{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
- (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
- Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
- Federator.enqueue(:publish_single_ap, %{
++ (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
+ |> Enum.filter(fn user -> User.ap_enabled?(user) end)
+ |> Enum.map(fn %{info: %{source_data: data}} ->
+ (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
+ end)
+ |> Enum.uniq()
+ |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
- |> Enum.each(fn inbox ->
++ |> Instances.filter_reachable()
++ |> Enum.each(fn {inbox, unreachable_since} ->
+ Federator.publish_single_ap(%{
inbox: inbox,
json: json,
actor: actor,
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
- use GenServer
alias Pleroma.User
alias Pleroma.Activity
- alias Pleroma.Web.{WebFinger, Websub}
+ alias Pleroma.Jobs
+ alias Pleroma.Web.{WebFinger, Websub, Salmon}
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
- def init(args) do
- {:ok, args}
+ def init() do
+ # 1 minute
+ Process.sleep(1000 * 60 * 1)
+ refresh_subscriptions()
end
- def start_link do
- spawn(fn ->
- # 1 minute
- Process.sleep(1000 * 60 * 1)
- enqueue(:refresh_subscriptions, nil)
- end)
+ # Client API
+
+ def incoming_doc(doc) do
+ Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ end
+
+ def incoming_ap_doc(params) do
+ Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ end
- GenServer.start_link(
- __MODULE__,
- %{
- in: {:sets.new(), []},
- out: {:sets.new(), []}
- },
- name: __MODULE__
- )
+ def publish(activity, priority \\ 1) do
+ Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority)
end
- def handle(:refresh_subscriptions, _) do
+ def publish_single_ap(params) do
+ Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params])
+ end
+
+ def publish_single_websub(websub) do
+ Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub])
+ end
+
+ def verify_websub(websub) do
+ Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub])
+ end
+
+ def request_subscription(sub) do
+ Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub])
+ end
+
+ def refresh_subscriptions() do
+ Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions])
+ end
+
++ def publish_single_salmon(params) do
++ Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params])
++ end
++
+ # Job Worker Callbacks
+
+ def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
end
end
- def handle(:publish_single_salmon, params) do
++ def perform(:publish_single_salmon, params) do
+ Salmon.send_to_user(params)
+ end
+
- def handle(:publish_single_ap, params) do
+ def perform(:publish_single_ap, params) do
case ActivityPub.publish_one(params) do
{:ok, _} ->
:ok
{:ok, private, _} = keys_from_pem(keys)
{:ok, feed} = encode(private, feed)
- remote_users(activity)
+ remote_users = remote_users(activity)
+
+ salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+ reachable_urls_metadata = Instances.filter_reachable(salmon_urls)
+ reachable_urls = Map.keys(reachable_urls_metadata)
+
+ remote_users
+ |> Enum.filter(&(&1.info.salmon in reachable_urls))
|> Enum.each(fn remote_user ->
- Task.start(fn ->
- Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
- send_to_user(remote_user, feed, poster)
- end)
+ Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
+
- Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
++ Pleroma.Web.Federator.publish_single_salmon(%{
+ recipient: remote_user,
+ feed: feed,
+ poster: poster,
+ unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
+ })
end)
end
end
defmodule Pleroma.Web.Websub do
alias Ecto.Changeset
alias Pleroma.Repo
+ alias Pleroma.Instances
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
alias Pleroma.Web.OStatus.FeedRepresenter
- alias Pleroma.Web.{XML, Endpoint, OStatus}
+ alias Pleroma.Web.{XML, Endpoint, OStatus, Federator}
alias Pleroma.Web.Router.Helpers
require Logger
xml: response,
topic: topic,
callback: sub.callback,
- secret: sub.secret
+ secret: sub.secret,
+ unreachable_since: reachable_callbacks_metadata[sub.callback]
}
- Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
+ Federator.publish_single_websub(data)
end)
end