From: Egor Kislitsyn Date: Mon, 4 Feb 2019 13:50:28 +0000 (+0700) Subject: Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=3a3a3996b7a37d281745586fa40bbabd5299a1ce;p=akkoma Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs # Conflicts: # lib/pleroma/web/activity_pub/activity_pub.ex # lib/pleroma/web/federator/federator.ex --- 3a3a3996b7a37d281745586fa40bbabd5299a1ce diff --cc lib/pleroma/web/activity_pub/activity_pub.ex index 6d784717e,4635e7fcd..bdc9456dd --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@@ -747,15 -747,18 +747,16 @@@ defmodule Pleroma.Web.ActivityPub.Activ {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - (Pleroma.Web.Salmon.remote_users(activity) ++ followers) - Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> - Federator.enqueue(:publish_single_ap, %{ ++ (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) + |> Enum.filter(fn user -> User.ap_enabled?(user) end) + |> Enum.map(fn %{info: %{source_data: data}} -> + (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Enum.each(fn inbox -> ++ |> Instances.filter_reachable() ++ |> Enum.each(fn {inbox, unreachable_since} -> + Federator.publish_single_ap(%{ inbox: inbox, json: json, actor: actor, diff --cc lib/pleroma/web/federator/federator.ex index 4d03b4622,bb7676cf0..37100c9e6 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@@ -3,10 -3,10 +3,10 @@@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - use GenServer alias Pleroma.User alias Pleroma.Activity + alias Pleroma.Jobs - alias Pleroma.Web.{WebFinger, Websub} + alias Pleroma.Web.{WebFinger, Websub, Salmon} alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Relay @@@ -18,49 -18,28 +18,53 @@@ @websub Application.get_env(:pleroma, :websub) @ostatus Application.get_env(:pleroma, :ostatus) - def init(args) do - {:ok, args} + def init() do + # 1 minute + Process.sleep(1000 * 60 * 1) + refresh_subscriptions() end - def start_link do - spawn(fn -> - # 1 minute - Process.sleep(1000 * 60 * 1) - enqueue(:refresh_subscriptions, nil) - end) + # Client API + + def incoming_doc(doc) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + end + + def incoming_ap_doc(params) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + end - GenServer.start_link( - __MODULE__, - %{ - in: {:sets.new(), []}, - out: {:sets.new(), []} - }, - name: __MODULE__ - ) + def publish(activity, priority \\ 1) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority) end - def handle(:refresh_subscriptions, _) do + def publish_single_ap(params) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params]) + end + + def publish_single_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub]) + end + + def verify_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub]) + end + + def request_subscription(sub) do + Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub]) + end + + def refresh_subscriptions() do + Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions]) + end + ++ def publish_single_salmon(params) do ++ Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params]) ++ end ++ + # Job Worker Callbacks + + def perform(:refresh_subscriptions) do Logger.debug("Federator running refresh subscriptions") Websub.refresh_subscriptions() @@@ -145,7 -124,11 +149,11 @@@ end end - def handle(:publish_single_salmon, params) do ++ def perform(:publish_single_salmon, params) do + Salmon.send_to_user(params) + end + - def handle(:publish_single_ap, params) do + def perform(:publish_single_ap, params) do case ActivityPub.publish_one(params) do {:ok, _} -> :ok diff --cc lib/pleroma/web/salmon/salmon.ex index e41657da1,b1c2dc7fa..3db0c07fd --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@@ -209,12 -216,23 +216,23 @@@ defmodule Pleroma.Web.Salmon d {:ok, private, _} = keys_from_pem(keys) {:ok, feed} = encode(private, feed) - remote_users(activity) + remote_users = remote_users(activity) + + salmon_urls = Enum.map(remote_users, & &1.info.salmon) + reachable_urls_metadata = Instances.filter_reachable(salmon_urls) + reachable_urls = Map.keys(reachable_urls_metadata) + + remote_users + |> Enum.filter(&(&1.info.salmon in reachable_urls)) |> Enum.each(fn remote_user -> - Task.start(fn -> - Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) - send_to_user(remote_user, feed, poster) - end) + Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) + - Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ ++ Pleroma.Web.Federator.publish_single_salmon(%{ + recipient: remote_user, + feed: feed, + poster: poster, + unreachable_since: reachable_urls_metadata[remote_user.info.salmon] + }) end) end end diff --cc lib/pleroma/web/websub/websub.ex index 652ffd92c,90ba79962..82845f13d --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@@ -5,9 -5,10 +5,10 @@@ defmodule Pleroma.Web.Websub do alias Ecto.Changeset alias Pleroma.Repo + alias Pleroma.Instances alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription} alias Pleroma.Web.OStatus.FeedRepresenter - alias Pleroma.Web.{XML, Endpoint, OStatus} + alias Pleroma.Web.{XML, Endpoint, OStatus, Federator} alias Pleroma.Web.Router.Helpers require Logger @@@ -74,10 -80,11 +80,11 @@@ xml: response, topic: topic, callback: sub.callback, - secret: sub.secret + secret: sub.secret, + unreachable_since: reachable_callbacks_metadata[sub.callback] } - Pleroma.Web.Federator.enqueue(:publish_single_websub, data) + Federator.publish_single_websub(data) end) end