# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.OStatus
- alias Pleroma.Web.Websub
- alias Pleroma.Workers.Publisher, as: PublisherWorker
- alias Pleroma.Workers.Receiver, as: ReceiverWorker
- alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
require Logger
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
-
- def init do
- # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
- refresh_subscriptions(schedule_in: 60)
- end
-
- @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
+ @doc """
+ Returns `true` if the distance to target object does not exceed max configured value.
+ Serves to prevent fetching of very long threads, especially useful on smaller instances.
+ Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
+ Applies to fetching of both ancestor (reply-to) and child (reply) objects.
+ """
# credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
- def allowed_incoming_reply_depth?(depth) do
- max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
+ def allowed_thread_distance?(distance) do
+ max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
- if max_replies_depth do
- (depth || 1) <= max_replies_depth
+ if max_distance && max_distance >= 0 do
+ # Default depth is 0 (an object has zero distance from itself in its thread)
+ (distance || 0) <= max_distance
else
true
end
# Client API
- def incoming_doc(doc) do
- %{"op" => "incoming_doc", "body" => doc}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
- end
-
def incoming_ap_doc(params) do
- %{"op" => "incoming_ap_doc", "params" => params}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
def publish(%{id: "pleroma:fakeid"} = activity) do
end
def publish(activity) do
- %{"op" => "publish", "activity_id" => activity.id}
- |> PublisherWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def verify_websub(websub) do
- %{"op" => "verify_websub", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def request_subscription(websub) do
- %{"op" => "request_subscription", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
- end
-
- def refresh_subscriptions(worker_args \\ []) do
- %{"op" => "refresh_subscriptions"}
- |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
# Job Worker Callbacks
end
end
- def perform(:incoming_doc, doc) do
- Logger.info("Got document, trying to parse")
- OStatus.handle_incoming(doc)
- end
-
def perform(:incoming_ap_doc, params) do
- Logger.info("Handling incoming AP activity")
+ Logger.debug("Handling incoming AP activity")
params = Utils.normalize_params(params)
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
- :ok <- Containment.contain_origin_from_id(params["actor"], params),
+ {_, :ok} <-
+ {:correct_origin?, Containment.contain_origin_from_id(params["actor"], params)},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else
+ {:correct_origin?, _} ->
+ Logger.debug("Origin containment failure for #{params["id"]}")
+ {:error, :origin_containment_failed}
+
%Activity{} ->
- Logger.info("Already had #{params["id"]}")
- :error
+ Logger.debug("Already had #{params["id"]}")
+ {:error, :already_present}
- _e ->
+ e ->
# Just drop those for now
- Logger.info("Unhandled activity")
- Logger.info(Jason.encode!(params, pretty: true))
- :error
- end
- end
-
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
-
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
+ Logger.debug("Unhandled activity")
+ Logger.debug(Jason.encode!(params, pretty: true))
+ {:error, e}
end
end
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
- end
-
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)