X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Ffederator.ex;h=1a2da014ae054999325a55eaf1c844a12d1210e4;hb=c623b4324deaf236334a0f77a81435b5bffadf3c;hp=bb9eadfee6b620c5e24821ae46e4c4f4d6835f18;hpb=b7fad8d395c2bd1afe445a370e539571f5ec0c18;p=akkoma diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index bb9eadfee..1a2da014a 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,14 +3,23 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - alias Pleroma.Workers.Publisher, as: PublisherWorker - alias Pleroma.Workers.Receiver, as: ReceiverWorker - alias Pleroma.Workers.Subscriber, as: SubscriberWorker + alias Pleroma.Activity + alias Pleroma.Object.Containment + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.ActivityPub.Utils + alias Pleroma.Web.Federator.Publisher + alias Pleroma.Web.OStatus + alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker + alias Pleroma.Workers.SubscriberWorker require Logger def init do - # 1 minute + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) refresh_subscriptions(schedule_in: 60) end @@ -29,50 +38,109 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - %{"op" => "incoming_doc", "body" => doc} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) end def incoming_ap_doc(params) do - %{"op" => "incoming_ap_doc", "params" => params} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end def publish(%{id: "pleroma:fakeid"} = activity) do - PublisherWorker.perform_publish(activity) + perform(:publish, activity) end def publish(activity) do - %{"op" => "publish", "activity_id" => activity.id} - |> PublisherWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end def verify_websub(websub) do - %{"op" => "verify_websub", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) end def request_subscription(websub) do - %{"op" => "request_subscription", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) end def refresh_subscriptions(worker_args \\ []) do - %{"op" => "refresh_subscriptions"} - |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) end - defp worker_args(queue) do - if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do - [max_attempts: max_attempts] + # Job Worker Callbacks + + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) + end + + def perform(:publish, activity) do + Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) + + with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), + {:ok, actor} <- User.ensure_keys_present(actor) do + Publisher.publish(actor, activity) + end + end + + def perform(:incoming_doc, doc) do + Logger.info("Got document, trying to parse") + OStatus.handle_incoming(doc) + end + + def perform(:incoming_ap_doc, params) do + Logger.info("Handling incoming AP activity") + + params = Utils.normalize_params(params) + + # NOTE: we use the actor ID to do the containment, this is fine because an + # actor shouldn't be acting on objects outside their own AP server. + with {:ok, _user} <- ap_enabled_actor(params["actor"]), + nil <- Activity.normalize(params["id"]), + :ok <- Containment.contain_origin_from_id(params["actor"], params), + {:ok, activity} <- Transmogrifier.handle_incoming(params) do + {:ok, activity} + else + %Activity{} -> + Logger.info("Already had #{params["id"]}") + :error + + _e -> + # Just drop those for now + Logger.info("Unhandled activity") + Logger.info(Jason.encode!(params, pretty: true)) + :error + end + end + + def perform(:request_subscription, websub) do + Logger.debug("Refreshing #{websub.topic}") + + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") + end + end + + def perform(:verify_websub, websub) do + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end + + def perform(:refresh_subscriptions) do + Logger.debug("Federator running refresh subscriptions") + Websub.refresh_subscriptions() + end + + def ap_enabled_actor(id) do + user = User.get_cached_by_ap_id(id) + + if User.ap_enabled?(user) do + {:ok, user} else - [] + ActivityPub.make_user_from_ap_id(id) end end end