X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Ffederator%2Fpublisher.ex;h=ad020136175660888fc58dbe371f3f024b06a6ce;hb=e6dab6513da0dcab19179b1a3448c0cce0c9f206;hp=fb4e8548da10a5f48ceafa93d5faa9f06cbecc0a;hpb=fc7246d7159a97a8b9bb878e848db6f8ac0df988;p=akkoma
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index fb4e8548d..ad0201361 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -1,12 +1,12 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
+# Copyright © 2017-2020 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.PublisherWorker
require Logger
@@ -30,36 +30,24 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
-
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ def enqueue_one(module, %{} = params) do
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
Relays an activity to all specified peers.
"""
- @callback publish(Pleroma.User.t(), Pleroma.Activity.t()) :: :ok | {:error, any()}
+ @callback publish(User.t(), Activity.t()) :: :ok | {:error, any()}
- @spec publish(Pleroma.User.t(), Pleroma.Activity.t()) :: :ok
+ @spec publish(User.t(), Activity.t()) :: :ok
def publish(%User{} = user, %Activity{} = activity) do
Config.get([:instance, :federation_publisher_modules])
|> Enum.each(fn module ->
if module.is_representable?(activity) do
- Logger.info("Publishing #{activity.data["id"]} using #{inspect(module)}")
+ Logger.debug("Publishing #{activity.data["id"]} using #{inspect(module)}")
module.publish(user, activity)
end
end)
@@ -70,9 +58,9 @@ defmodule Pleroma.Web.Federator.Publisher do
@doc """
Gathers links used by an outgoing federation module for WebFinger output.
"""
- @callback gather_webfinger_links(Pleroma.User.t()) :: list()
+ @callback gather_webfinger_links(User.t()) :: list()
- @spec gather_webfinger_links(Pleroma.User.t()) :: list()
+ @spec gather_webfinger_links(User.t()) :: list()
def gather_webfinger_links(%User{} = user) do
Config.get([:instance, :federation_publisher_modules])
|> Enum.reduce([], fn module, links ->
@@ -92,4 +80,30 @@ defmodule Pleroma.Web.Federator.Publisher do
links ++ module.gather_nodeinfo_protocol_names()
end)
end
+
+ @doc """
+ Gathers a set of remote users given an IR envelope.
+ """
+ def remote_users(%User{id: user_id}, %{data: %{"to" => to} = data}) do
+ cc = Map.get(data, "cc", [])
+
+ bcc =
+ data
+ |> Map.get("bcc", [])
+ |> Enum.reduce([], fn ap_id, bcc ->
+ case Pleroma.List.get_by_ap_id(ap_id) do
+ %Pleroma.List{user_id: ^user_id} = list ->
+ {:ok, following} = Pleroma.List.get_following(list)
+ bcc ++ Enum.map(following, & &1.ap_id)
+
+ _ ->
+ bcc
+ end
+ end)
+
+ [to, cc, bcc]
+ |> Enum.concat()
+ |> Enum.map(&User.get_cached_by_ap_id/1)
+ |> Enum.filter(fn user -> user && !user.local end)
+ end
end