Priority queue fixes.
authorLain Iwakura <lain@soykaf.club>
Wed, 6 Dec 2017 15:51:11 +0000 (16:51 +0100)
committerLain Iwakura <lain@soykaf.club>
Wed, 6 Dec 2017 15:51:11 +0000 (16:51 +0100)
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/web/federator/federator.ex

index 51fac6fe27beacb21552577c5baf7aa8a921ba72..ac20a282266b635ab17cbbe681e23c2e6031bbe7 100644 (file)
@@ -29,7 +29,12 @@ defmodule Pleroma.Web.ActivityPub.Utils do
   Enqueues an activity for federation if it's local
   """
   def maybe_federate(%Activity{local: true} = activity) do
-    Pleroma.Web.Federator.enqueue(:publish, activity)
+    priority = case activity.data["type"] do
+                 "Delete" -> 10
+                 "Create" -> 1
+                 _ -> 5
+               end
+    Pleroma.Web.Federator.enqueue(:publish, activity, priority)
     :ok
   end
   def maybe_federate(_), do: :ok
index f384b313ca20ea3a0e29bcf12043e03a9db3c26f..b23ed5fcc71d15fa65f6de9aef6aacc397577407 100644 (file)
@@ -15,7 +15,7 @@ defmodule Pleroma.Web.Federator do
       enqueue(:refresh_subscriptions, nil)
     end)
     GenServer.start_link(__MODULE__, %{
-          in: {:sets.new(), [],
+          in: {:sets.new(), []},
           out: {:sets.new(), []}
                          }, name: __MODULE__)
   end
@@ -79,17 +79,17 @@ defmodule Pleroma.Web.Federator do
     {:error, "Don't know what do do with this"}
   end
 
-  def enqueue(type, payload) do
+  def enqueue(type, payload, priority \\ 1) do
     if Mix.env == :test do
       handle(type, payload)
     else
-      GenServer.cast(__MODULE__, {:enqueue, type, payload})
+      GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
     end
   end
 
   def maybe_start_job(running_jobs, queue) do
     if (:sets.size(running_jobs) < @max_jobs) && queue != [] do
-      {{:value, {type, payload}}, queue} = queue_pop(queue)
+      {{type, payload}, queue} = queue_pop(queue)
       {:ok, pid} = Task.start(fn -> handle(type, payload) end)
       mref = Process.monitor(pid)
       {:sets.add_element(mref, running_jobs), queue}
@@ -98,14 +98,14 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do
+  def handle_cast({:enqueue, type, payload, priority}, state) when type in [:incoming_doc] do
     %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
     i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
     {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
     {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
   end
 
-  def handle_cast({:enqueue, type, payload}, state) do
+  def handle_cast({:enqueue, type, payload, priority}, state) do
     %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
     o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
     {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)