- if Mix.env() == :test do
- def enqueue(type, payload, _priority \\ 1) do
- if Pleroma.Config.get([:instance, :federating]) do
- handle(type, payload)
- end
- end
- else
- def enqueue(type, payload, priority \\ 1) do
- if Pleroma.Config.get([:instance, :federating]) do
- GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
- end
- end
- end
-
- def maybe_start_job(running_jobs, queue) do
- if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
- {{type, payload}, queue} = queue_pop(queue)
- {:ok, pid} = Task.start(fn -> handle(type, payload) end)
- mref = Process.monitor(pid)
- {:sets.add_element(mref, running_jobs), queue}
- else
- {running_jobs, queue}
- end
- end
-
- def handle_cast({:enqueue, type, payload, _priority}, state)
- when type in [:incoming_doc, :incoming_ap_doc] do
- %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
- i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
- {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
- {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
- end
-
- def handle_cast({:enqueue, type, payload, _priority}, state) do
- %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
- o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
- {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
- {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
- end
-
- def handle_cast(_, state) do
- {:noreply, state}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
- i_running_jobs = :sets.del_element(ref, i_running_jobs)
- o_running_jobs = :sets.del_element(ref, o_running_jobs)
- {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
- {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
-
- {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
- end
-
- def enqueue_sorted(queue, element, priority) do
- [%{item: element, priority: priority} | queue]
- |> Enum.sort_by(fn %{priority: priority} -> priority end)
- end
-
- def queue_pop([%{item: element} | queue]) do
- {element, queue}
- end
-