X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=13065414545829c8bf4696bbfff83246433ea551;hb=66e00ace7c0708f2f9361bc6e1008ccea08cb6ef;hp=d85fe824fd8f7cd46dca214083ef0baef6e4a2f6;hpb=7101ba1a21ded292ba057f9aadec1756914b45e0;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index d85fe824f..130654145 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors +# Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do @@ -10,28 +10,24 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub - alias Pleroma.Workers.Publisher, as: PublisherWorker - alias Pleroma.Workers.Receiver, as: ReceiverWorker - alias Pleroma.Workers.Subscriber, as: SubscriberWorker + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker require Logger - defdelegate worker_args(queue), to: Pleroma.Workers.Helper - - def init do - # 1 minute - refresh_subscriptions(schedule_in: 60) - end - - @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" + @doc """ + Returns `true` if the distance to target object does not exceed max configured value. + Serves to prevent fetching of very long threads, especially useful on smaller instances. + Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161). + Applies to fetching of both ancestor (reply-to) and child (reply) objects. + """ # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength - def allowed_incoming_reply_depth?(depth) do - max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) + def allowed_thread_distance?(distance) do + max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) - if max_replies_depth do - (depth || 1) <= max_replies_depth + if max_distance && max_distance >= 0 do + # Default depth is 0 (an object has zero distance from itself in its thread) + (distance || 0) <= max_distance else true end @@ -39,16 +35,8 @@ defmodule Pleroma.Web.Federator do # Client API - def incoming_doc(doc) do - %{"op" => "incoming_doc", "body" => doc} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() - end - def incoming_ap_doc(params) do - %{"op" => "incoming_ap_doc", "params" => params} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end def publish(%{id: "pleroma:fakeid"} = activity) do @@ -56,27 +44,7 @@ defmodule Pleroma.Web.Federator do end def publish(activity) do - %{"op" => "publish", "activity_id" => activity.id} - |> PublisherWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def verify_websub(websub) do - %{"op" => "verify_websub", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def request_subscription(websub) do - %{"op" => "request_subscription", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() - end - - def refresh_subscriptions(worker_args \\ []) do - %{"op" => "refresh_subscriptions"} - |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end # Job Worker Callbacks @@ -95,65 +63,42 @@ defmodule Pleroma.Web.Federator do end end - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) - end - def perform(:incoming_ap_doc, params) do - Logger.info("Handling incoming AP activity") + Logger.debug("Handling incoming AP activity") - params = Utils.normalize_params(params) + actor = + params + |> Map.get("actor") + |> Utils.get_ap_id() # NOTE: we use the actor ID to do the containment, this is fine because an # actor shouldn't be acting on objects outside their own AP server. - with {:ok, _user} <- ap_enabled_actor(params["actor"]), + with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)}, nil <- Activity.normalize(params["id"]), - :ok <- Containment.contain_origin_from_id(params["actor"], params), + {_, :ok} <- + {:correct_origin?, Containment.contain_origin_from_id(actor, params)}, {:ok, activity} <- Transmogrifier.handle_incoming(params) do {:ok, activity} else - %Activity{} -> - Logger.info("Already had #{params["id"]}") - :error + {:correct_origin?, _} -> + Logger.debug("Origin containment failure for #{params["id"]}") + {:error, :origin_containment_failed} - _e -> - # Just drop those for now - Logger.info("Unhandled activity") - Logger.info(Jason.encode!(params, pretty: true)) - :error - end - end + %Activity{} -> + Logger.debug("Already had #{params["id"]}") + {:error, :already_present} - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + {:actor, e} -> + Logger.debug("Unhandled actor #{actor}, #{inspect(e)}") + {:error, e} - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") + e -> + # Just drop those for now + Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end) + {:error, e} end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) - end - def ap_enabled_actor(id) do user = User.get_cached_by_ap_id(id)