Merge branch 'develop' of https://git.pleroma.social/pleroma/pleroma into develop
[akkoma] / lib / pleroma / web / federator / federator.ex
index cf7e50fee28bd61829473a5671a992019083a19c..1a2da014ae054999325a55eaf1c844a12d1210e4 100644 (file)
@@ -12,14 +12,12 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.Web.Federator.Publisher
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub
-  alias Pleroma.Workers.Publisher, as: PublisherWorker
-  alias Pleroma.Workers.Receiver, as: ReceiverWorker
-  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+  alias Pleroma.Workers.PublisherWorker
+  alias Pleroma.Workers.ReceiverWorker
+  alias Pleroma.Workers.SubscriberWorker
 
   require Logger
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
-
   def init do
     # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
     refresh_subscriptions(schedule_in: 60)
@@ -40,15 +38,11 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    %{"op" => "incoming_doc", "body" => doc}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
   end
 
   def incoming_ap_doc(params) do
-    %{"op" => "incoming_ap_doc", "params" => params}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
   end
 
   def publish(%{id: "pleroma:fakeid"} = activity) do
@@ -56,27 +50,19 @@ defmodule Pleroma.Web.Federator do
   end
 
   def publish(activity) do
-    %{"op" => "publish", "activity_id" => activity.id}
-    |> PublisherWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
   end
 
   def verify_websub(websub) do
-    %{"op" => "verify_websub", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
   end
 
   def request_subscription(websub) do
-    %{"op" => "request_subscription", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
   end
 
   def refresh_subscriptions(worker_args \\ []) do
-    %{"op" => "refresh_subscriptions"}
-    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
   end
 
   # Job Worker Callbacks