X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=013fb5b70d29e349ef8bd7b484076a9df10b7d3e;hb=61d9f43e463a3b3b0c1e4b9c98c22e222797bd82;hp=f4f9e83e06ab55f71b715301631865bc46b11c00;hpb=a7994185739522dee80e22f76e5fdac1a3b8279b;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..013fb5b70 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,25 +10,24 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker require Logger - def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() - end - - @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" + @doc """ + Returns `true` if the distance to target object does not exceed max configured value. + Serves to prevent fetching of very long threads, especially useful on smaller instances. + Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161). + Applies to fetching of both ancestor (reply-to) and child (reply) objects. + """ # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength - def allowed_incoming_reply_depth?(depth) do - max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) + def allowed_thread_distance?(distance) do + max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) - if max_replies_depth do - (depth || 1) <= max_replies_depth + if max_distance && max_distance >= 0 do + # Default depth is 0 (an object has zero distance from itself in its thread) + (distance || 0) <= max_distance else true end @@ -36,51 +35,23 @@ defmodule Pleroma.Web.Federator do # Client API - def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) - end - def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) - end - - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) - end - - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def publish(activity) do + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end # Job Worker Callbacks - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) - end - - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") - - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -92,21 +63,8 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) - end - def perform(:incoming_ap_doc, params) do - Logger.info("Handling incoming AP activity") + Logger.debug("Handling incoming AP activity") params = Utils.normalize_params(params) @@ -119,35 +77,17 @@ defmodule Pleroma.Web.Federator do {:ok, activity} else %Activity{} -> - Logger.info("Already had #{params["id"]}") + Logger.debug("Already had #{params["id"]}") :error _e -> # Just drop those for now - Logger.info("Unhandled activity") - Logger.info(Jason.encode!(params, pretty: true)) + Logger.debug("Unhandled activity") + Logger.debug(Jason.encode!(params, pretty: true)) :error end end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok - - {:error, _} -> - RetryQueue.enqueue(params, Websub) - end - end - - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} - end - def ap_enabled_actor(id) do user = User.get_cached_by_ap_id(id)