X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=f506a7d245eace7c06bff5bcb2dde6a22e6a84b0;hb=6fab5fe7f892b5a9cf4d605b079e544583f0ce73;hp=bb9eadfee6b620c5e24821ae46e4c4f4d6835f18;hpb=267262491ecf7b413052708062abac69b1d8f643;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index bb9eadfee..f506a7d24 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,17 +3,18 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - alias Pleroma.Workers.Publisher, as: PublisherWorker - alias Pleroma.Workers.Receiver, as: ReceiverWorker - alias Pleroma.Workers.Subscriber, as: SubscriberWorker + alias Pleroma.Activity + alias Pleroma.Object.Containment + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.ActivityPub.Utils + alias Pleroma.Web.Federator.Publisher + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker require Logger - def init do - # 1 minute - refresh_subscriptions(schedule_in: 60) - end - @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength def allowed_incoming_reply_depth?(depth) do @@ -28,51 +29,66 @@ defmodule Pleroma.Web.Federator do # Client API - def incoming_doc(doc) do - %{"op" => "incoming_doc", "body" => doc} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() - end - def incoming_ap_doc(params) do - %{"op" => "incoming_ap_doc", "params" => params} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end def publish(%{id: "pleroma:fakeid"} = activity) do - PublisherWorker.perform_publish(activity) + perform(:publish, activity) end def publish(activity) do - %{"op" => "publish", "activity_id" => activity.id} - |> PublisherWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end - def verify_websub(websub) do - %{"op" => "verify_websub", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + # Job Worker Callbacks + + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end - def request_subscription(websub) do - %{"op" => "request_subscription", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + def perform(:publish, activity) do + Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) + + with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), + {:ok, actor} <- User.ensure_keys_present(actor) do + Publisher.publish(actor, activity) + end end - def refresh_subscriptions(worker_args \\ []) do - %{"op" => "refresh_subscriptions"} - |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + def perform(:incoming_ap_doc, params) do + Logger.debug("Handling incoming AP activity") + + params = Utils.normalize_params(params) + + # NOTE: we use the actor ID to do the containment, this is fine because an + # actor shouldn't be acting on objects outside their own AP server. + with {:ok, _user} <- ap_enabled_actor(params["actor"]), + nil <- Activity.normalize(params["id"]), + :ok <- Containment.contain_origin_from_id(params["actor"], params), + {:ok, activity} <- Transmogrifier.handle_incoming(params) do + {:ok, activity} + else + %Activity{} -> + Logger.debug("Already had #{params["id"]}") + :error + + _e -> + # Just drop those for now + Logger.debug("Unhandled activity") + Logger.debug(Jason.encode!(params, pretty: true)) + :error + end end - defp worker_args(queue) do - if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do - [max_attempts: max_attempts] + def ap_enabled_actor(id) do + user = User.get_cached_by_ap_id(id) + + if User.ap_enabled?(user) do + {:ok, user} else - [] + ActivityPub.make_user_from_ap_id(id) end end end