X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=fd904ef0a47767b5e46d3f1f5f26010bf6cf3b5e;hb=86f2fbc81e5f3df6c16e05127e9e39c8fca0cde9;hp=d85fe824fd8f7cd46dca214083ef0baef6e4a2f6;hpb=8778c16dac1789eeb6dd51524573163216200b5d;p=akkoma
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index d85fe824f..fd904ef0a 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
+# Copyright © 2017-2020 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
@@ -10,28 +10,24 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.OStatus
- alias Pleroma.Web.Websub
- alias Pleroma.Workers.Publisher, as: PublisherWorker
- alias Pleroma.Workers.Receiver, as: ReceiverWorker
- alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
require Logger
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
-
- def init do
- # 1 minute
- refresh_subscriptions(schedule_in: 60)
- end
-
- @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
+ @doc """
+ Returns `true` if the distance to target object does not exceed max configured value.
+ Serves to prevent fetching of very long threads, especially useful on smaller instances.
+ Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
+ Applies to fetching of both ancestor (reply-to) and child (reply) objects.
+ """
# credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
- def allowed_incoming_reply_depth?(depth) do
- max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
+ def allowed_thread_distance?(distance) do
+ max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
- if max_replies_depth do
- (depth || 1) <= max_replies_depth
+ if max_distance && max_distance >= 0 do
+ # Default depth is 0 (an object has zero distance from itself in its thread)
+ (distance || 0) <= max_distance
else
true
end
@@ -39,16 +35,8 @@ defmodule Pleroma.Web.Federator do
# Client API
- def incoming_doc(doc) do
- %{"op" => "incoming_doc", "body" => doc}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
- end
-
def incoming_ap_doc(params) do
- %{"op" => "incoming_ap_doc", "params" => params}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
def publish(%{id: "pleroma:fakeid"} = activity) do
@@ -56,27 +44,7 @@ defmodule Pleroma.Web.Federator do
end
def publish(activity) do
- %{"op" => "publish", "activity_id" => activity.id}
- |> PublisherWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def verify_websub(websub) do
- %{"op" => "verify_websub", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def request_subscription(websub) do
- %{"op" => "request_subscription", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def refresh_subscriptions(worker_args \\ []) do
- %{"op" => "refresh_subscriptions"}
- |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
# Job Worker Callbacks
@@ -95,13 +63,8 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(:incoming_doc, doc) do
- Logger.info("Got document, trying to parse")
- OStatus.handle_incoming(doc)
- end
-
def perform(:incoming_ap_doc, params) do
- Logger.info("Handling incoming AP activity")
+ Logger.debug("Handling incoming AP activity")
params = Utils.normalize_params(params)
@@ -114,46 +77,17 @@ defmodule Pleroma.Web.Federator do
{:ok, activity}
else
%Activity{} ->
- Logger.info("Already had #{params["id"]}")
+ Logger.debug("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
- Logger.info("Unhandled activity")
- Logger.info(Jason.encode!(params, pretty: true))
+ Logger.debug("Unhandled activity")
+ Logger.debug(Jason.encode!(params, pretty: true))
:error
end
end
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
-
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
- end
-
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
- end
-
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)