X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=e8a56ebd706d7f243328c7cad05202bcf1b5a626;hb=791bcfd90f41da9d77ab5a5ad6eec22ae8050b8a;hp=cf7e50fee28bd61829473a5671a992019083a19c;hpb=c34126f89ca58141cc927bddc2474bef76ea1575;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index cf7e50fee..e8a56ebd7 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,21 +10,11 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub - alias Pleroma.Workers.Publisher, as: PublisherWorker - alias Pleroma.Workers.Receiver, as: ReceiverWorker - alias Pleroma.Workers.Subscriber, as: SubscriberWorker + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker require Logger - defdelegate worker_args(queue), to: Pleroma.Workers.Helper - - def init do - # To do: consider removing this call in favor of scheduled execution (`quantum`-based) - refresh_subscriptions(schedule_in: 60) - end - @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength def allowed_incoming_reply_depth?(depth) do @@ -39,16 +29,8 @@ defmodule Pleroma.Web.Federator do # Client API - def incoming_doc(doc) do - %{"op" => "incoming_doc", "body" => doc} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() - end - def incoming_ap_doc(params) do - %{"op" => "incoming_ap_doc", "params" => params} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end def publish(%{id: "pleroma:fakeid"} = activity) do @@ -56,27 +38,7 @@ defmodule Pleroma.Web.Federator do end def publish(activity) do - %{"op" => "publish", "activity_id" => activity.id} - |> PublisherWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def verify_websub(websub) do - %{"op" => "verify_websub", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def request_subscription(websub) do - %{"op" => "request_subscription", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def refresh_subscriptions(worker_args \\ []) do - %{"op" => "refresh_subscriptions"} - |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end # Job Worker Callbacks @@ -95,11 +57,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) - end - def perform(:incoming_ap_doc, params) do Logger.info("Handling incoming AP activity") @@ -125,29 +82,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") - - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end - end - - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - end - def ap_enabled_actor(id) do user = User.get_cached_by_ap_id(id)