+ defp ets_count_expires(table, current_time) do
+ :ets.select_count(
+ table,
+ [
+ {
+ {:"$1", :"$2"},
+ [{:"=<", :"$1", {:const, current_time}}],
+ [true]
+ }
+ ]
+ )
+ end
+
+ defp ets_pop_n_expired(table, current_time, desired) do
+ {popped, _continuation} =
+ :ets.select(
+ table,
+ [
+ {
+ {:"$1", :"$2"},
+ [{:"=<", :"$1", {:const, current_time}}],
+ [:"$_"]
+ }
+ ],
+ desired
+ )
+
+ popped
+ |> Enum.each(fn e ->
+ :ets.delete_object(table, e)
+ end)
+
+ popped
+ end
+
+ def maybe_start_job(running_jobs, queue_table) do
+ # we don't want to hit the ets or the DateTime more times than we have to
+ # could optimize slightly further by not using the count, and instead grabbing
+ # up to N objects early...
+ current_time = DateTime.to_unix(DateTime.utc_now())
+ n_running_jobs = :sets.size(running_jobs)
+
+ if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
+ n_ready_jobs = ets_count_expires(queue_table, current_time)
+
+ if n_ready_jobs > 0 do
+ # figure out how many we could start
+ available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
+ start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
+ else
+ running_jobs
+ end
+ else
+ running_jobs
+ end
+ end
+
+ defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
+ running_jobs
+ end
+
+ defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
+ when available_job_slots > 0 do
+ candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
+
+ candidates
+ |> List.foldl(running_jobs, fn {_, e}, rj ->
+ {:ok, pid} = Task.start(fn -> worker(e) end)
+ mref = Process.monitor(pid)
+ :sets.add_element(mref, rj)
+ end)
+ end
+
+ def worker({:send, data, transport, retries}) do
+ case transport.publish_one(data) do
+ {:ok, _} ->
+ GenServer.cast(__MODULE__, :inc_delivered)
+ :delivered
+
+ {:error, _reason} ->
+ enqueue(data, transport, retries)
+ :retry
+ end
+ end
+
+ def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
+ {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
+ end
+
+ def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
+ {:reply, %{delivered: delivery_count, dropped: drop_count},
+ %{state | delivered: 0, dropped: 0}}
+ end
+
+ def handle_cast(:reset_stats, state) do
+ {:noreply, %{state | delivered: 0, dropped: 0}}
+ end
+
+ def handle_cast(
+ {:maybe_enqueue, data, transport, retries},
+ %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
+ ) do
+ case get_retry_params(retries) do
+ {:retry, timeout} ->
+ :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
+ running_jobs = maybe_start_job(running_jobs, queue_table)
+ {:noreply, %{state | running_jobs: running_jobs}}