provide instance thumbnail at /api/v1/instance (debug)
[akkoma] / lib / pleroma / web / federator / federator.ex
index 9f6f983aa873b86ec4baef6747f6ed1f1c894f59..c9f9dc7a1ec1eb677f2815f4e49e0766debc0b74 100644 (file)
@@ -15,8 +15,8 @@ defmodule Pleroma.Web.Federator do
       enqueue(:refresh_subscriptions, nil)
     end)
     GenServer.start_link(__MODULE__, %{
-          in: {:sets.new(), :queue.new()},
-          out: {:sets.new(), :queue.new()}
+          in: {:sets.new(), []},
+          out: {:sets.new(), []}
                          }, name: __MODULE__)
   end
 
@@ -41,12 +41,12 @@ defmodule Pleroma.Web.Federator do
   def handle(:publish, activity) do
     Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
     with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
-      Logger.debug(fn -> "Sending #{activity.data["id"]} out via websub" end)
-      Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
-
       {:ok, actor} = WebFinger.ensure_keys_present(actor)
       Logger.debug(fn -> "Sending #{activity.data["id"]} out via salmon" end)
       Pleroma.Web.Salmon.publish(actor, activity)
+
+      Logger.debug(fn -> "Sending #{activity.data["id"]} out via websub" end)
+      Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
     end
   end
 
@@ -79,17 +79,17 @@ defmodule Pleroma.Web.Federator do
     {:error, "Don't know what do do with this"}
   end
 
-  def enqueue(type, payload) do
+  def enqueue(type, payload, priority \\ 1) do
     if Mix.env == :test do
       handle(type, payload)
     else
-      GenServer.cast(__MODULE__, {:enqueue, type, payload})
+      GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
     end
   end
 
   def maybe_start_job(running_jobs, queue) do
-    if (:sets.size(running_jobs) < @max_jobs) && !:queue.is_empty(queue) do
-      {{:value, {type, payload}}, queue} = :queue.out(queue)
+    if (:sets.size(running_jobs) < @max_jobs) && queue != [] do
+      {{type, payload}, queue} = queue_pop(queue)
       {:ok, pid} = Task.start(fn -> handle(type, payload) end)
       mref = Process.monitor(pid)
       {:sets.add_element(mref, running_jobs), queue}
@@ -98,16 +98,16 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do
+  def handle_cast({:enqueue, type, payload, priority}, state) when type in [:incoming_doc] do
     %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
-    i_queue = :queue.in({type, payload}, i_queue)
+    i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
     {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
     {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
   end
 
-  def handle_cast({:enqueue, type, payload}, state) do
+  def handle_cast({:enqueue, type, payload, priority}, state) do
     %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
-    o_queue = :queue.in({type, payload}, o_queue)
+    o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
     {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
     {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
   end
@@ -126,4 +126,13 @@ defmodule Pleroma.Web.Federator do
 
     {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
   end
+
+  def enqueue_sorted(queue, element, priority) do
+    [%{item: element, priority: priority} | queue]
+    |> Enum.sort_by(fn (%{priority: priority}) -> priority end)
+  end
+
+  def queue_pop([%{item: element} | queue]) do
+    {element, queue}
+  end
 end