Revert "Merge branch 'streamer-refactoring' into 'develop'"
[akkoma] / lib / pleroma / web / federator / federator.ex
index d85fe824fd8f7cd46dca214083ef0baef6e4a2f6..1a2da014ae054999325a55eaf1c844a12d1210e4 100644 (file)
@@ -12,16 +12,14 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.Web.Federator.Publisher
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub
-  alias Pleroma.Workers.Publisher, as: PublisherWorker
-  alias Pleroma.Workers.Receiver, as: ReceiverWorker
-  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+  alias Pleroma.Workers.PublisherWorker
+  alias Pleroma.Workers.ReceiverWorker
+  alias Pleroma.Workers.SubscriberWorker
 
   require Logger
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
-
   def init do
-    # 1 minute
+    # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
     refresh_subscriptions(schedule_in: 60)
   end
 
@@ -40,15 +38,11 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    %{"op" => "incoming_doc", "body" => doc}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
   end
 
   def incoming_ap_doc(params) do
-    %{"op" => "incoming_ap_doc", "params" => params}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
   end
 
   def publish(%{id: "pleroma:fakeid"} = activity) do
@@ -56,27 +50,19 @@ defmodule Pleroma.Web.Federator do
   end
 
   def publish(activity) do
-    %{"op" => "publish", "activity_id" => activity.id}
-    |> PublisherWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
   end
 
   def verify_websub(websub) do
-    %{"op" => "verify_websub", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
   end
 
   def request_subscription(websub) do
-    %{"op" => "request_subscription", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
   end
 
   def refresh_subscriptions(worker_args \\ []) do
-    %{"op" => "refresh_subscriptions"}
-    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
   end
 
   # Job Worker Callbacks
@@ -146,12 +132,6 @@ defmodule Pleroma.Web.Federator do
   def perform(:refresh_subscriptions) do
     Logger.debug("Federator running refresh subscriptions")
     Websub.refresh_subscriptions()
-
-    spawn(fn ->
-      # 6 hours
-      Process.sleep(1000 * 60 * 60 * 6)
-      refresh_subscriptions()
-    end)
   end
 
   def ap_enabled_actor(id) do