X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=13065414545829c8bf4696bbfff83246433ea551;hb=66e00ace7c0708f2f9361bc6e1008ccea08cb6ef;hp=f4c9fe28403d4991f6569b7bc91897529fbcd155;hpb=ad5263c647aea65dbeb4c329825671895e0a8863;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4c9fe284..130654145 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors +# Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do @@ -10,65 +10,48 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker require Logger - def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + @doc """ + Returns `true` if the distance to target object does not exceed max configured value. + Serves to prevent fetching of very long threads, especially useful on smaller instances. + Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161). + Applies to fetching of both ancestor (reply-to) and child (reply) objects. + """ + # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength + def allowed_thread_distance?(distance) do + max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) + + if max_distance && max_distance >= 0 do + # Default depth is 0 (an object has zero distance from itself in its thread) + (distance || 0) <= max_distance + else + true + end end # Client API - def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) - end - def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) - end - - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) - end - - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def publish(activity) do + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end # Job Worker Callbacks - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) - end - - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") - - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -80,62 +63,42 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) - end - def perform(:incoming_ap_doc, params) do - Logger.info("Handling incoming AP activity") + Logger.debug("Handling incoming AP activity") - params = Utils.normalize_params(params) + actor = + params + |> Map.get("actor") + |> Utils.get_ap_id() # NOTE: we use the actor ID to do the containment, this is fine because an # actor shouldn't be acting on objects outside their own AP server. - with {:ok, _user} <- ap_enabled_actor(params["actor"]), + with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)}, nil <- Activity.normalize(params["id"]), - :ok <- Containment.contain_origin_from_id(params["actor"], params), + {_, :ok} <- + {:correct_origin?, Containment.contain_origin_from_id(actor, params)}, {:ok, activity} <- Transmogrifier.handle_incoming(params) do {:ok, activity} else - %Activity{} -> - Logger.info("Already had #{params["id"]}") - :error + {:correct_origin?, _} -> + Logger.debug("Origin containment failure for #{params["id"]}") + {:error, :origin_containment_failed} - _e -> - # Just drop those for now - Logger.info("Unhandled activity") - Logger.info(Jason.encode!(params, pretty: true)) - :error - end - end + %Activity{} -> + Logger.debug("Already had #{params["id"]}") + {:error, :already_present} - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok + {:actor, e} -> + Logger.debug("Unhandled actor #{actor}, #{inspect(e)}") + {:error, e} - {:error, _} -> - RetryQueue.enqueue(params, Websub) + e -> + # Just drop those for now + Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end) + {:error, e} end end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} - end - def ap_enabled_actor(id) do user = User.get_cached_by_ap_id(id)