X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=e4ab9ba32a538845dedbba040164d3f90a68aae7;hb=da876d09e89bcfec6f2d1eaddb396f68ce48e12a;hp=1a2da014ae054999325a55eaf1c844a12d1210e4;hpb=95a4a194173cb056c4ceeb422cbf8dfcdc234467;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 1a2da014a..e4ab9ba32 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors +# Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do @@ -10,26 +10,24 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub alias Pleroma.Workers.PublisherWorker alias Pleroma.Workers.ReceiverWorker - alias Pleroma.Workers.SubscriberWorker require Logger - def init do - # To do: consider removing this call in favor of scheduled execution (`quantum`-based) - refresh_subscriptions(schedule_in: 60) - end - - @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" + @doc """ + Returns `true` if the distance to target object does not exceed max configured value. + Serves to prevent fetching of very long threads, especially useful on smaller instances. + Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161). + Applies to fetching of both ancestor (reply-to) and child (reply) objects. + """ # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength - def allowed_incoming_reply_depth?(depth) do - max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) + def allowed_thread_distance?(distance) do + max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) - if max_replies_depth do - (depth || 1) <= max_replies_depth + if max_distance && max_distance >= 0 do + # Default depth is 0 (an object has zero distance from itself in its thread) + (distance || 0) <= max_distance else true end @@ -37,10 +35,6 @@ defmodule Pleroma.Web.Federator do # Client API - def incoming_doc(doc) do - ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) - end - def incoming_ap_doc(params) do ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end @@ -53,18 +47,6 @@ defmodule Pleroma.Web.Federator do PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end - def verify_websub(websub) do - SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) - end - - def request_subscription(websub) do - SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) - end - - def refresh_subscriptions(worker_args \\ []) do - SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) - end - # Job Worker Callbacks @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} @@ -81,59 +63,42 @@ defmodule Pleroma.Web.Federator do end end - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) - end - def perform(:incoming_ap_doc, params) do - Logger.info("Handling incoming AP activity") + Logger.debug("Handling incoming AP activity") - params = Utils.normalize_params(params) + actor = + params + |> Map.get("actor") + |> Utils.get_ap_id() # NOTE: we use the actor ID to do the containment, this is fine because an # actor shouldn't be acting on objects outside their own AP server. - with {:ok, _user} <- ap_enabled_actor(params["actor"]), + with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)}, nil <- Activity.normalize(params["id"]), - :ok <- Containment.contain_origin_from_id(params["actor"], params), + {_, :ok} <- + {:correct_origin?, Containment.contain_origin_from_id(actor, params)}, {:ok, activity} <- Transmogrifier.handle_incoming(params) do {:ok, activity} else - %Activity{} -> - Logger.info("Already had #{params["id"]}") - :error + {:correct_origin?, _} -> + Logger.debug("Origin containment failure for #{params["id"]}") + {:error, :origin_containment_failed} - _e -> - # Just drop those for now - Logger.info("Unhandled activity") - Logger.info(Jason.encode!(params, pretty: true)) - :error - end - end + %Activity{} -> + Logger.debug("Already had #{params["id"]}") + {:error, :already_present} - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + {:actor, e} -> + Logger.debug("Unhandled actor #{actor}, #{inspect(e)}") + {:error, e} - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") + e -> + # Just drop those for now + Logger.debug("Unhandled activity\n" <> Jason.encode!(params, pretty: true)) + {:error, e} end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - end - def ap_enabled_actor(id) do user = User.get_cached_by_ap_id(id)