From: Egor Kislitsyn Date: Mon, 11 Feb 2019 06:54:21 +0000 (+0700) Subject: Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs X-Git-Url: http://git.squeep.com/?a=commitdiff_plain;h=305d2194136d4560e02c110d528164034d3727b2;hp=-c;p=akkoma Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs # Conflicts: # lib/pleroma/web/federator/federator.ex # lib/pleroma/web/websub/websub.ex --- 305d2194136d4560e02c110d528164034d3727b2 diff --combined config/config.exs index 0db940fdc,5db0ea9aa..9e4ee8d02 --- a/config/config.exs +++ b/config/config.exs @@@ -115,7 -115,7 +115,7 @@@ config :logger, :console config :logger, :ex_syslogger, level: :debug, ident: "Pleroma", - format: "$date $time $metadata[$level] $message", + format: "$metadata[$level] $message", metadata: [:request_id] config :mime, :types, %{ @@@ -238,6 -238,11 +238,11 @@@ config :pleroma, :mrf_simple reject: [], accept: [] + config :pleroma, :mrf_keyword, + reject: [], + federated_timeline_removal: [], + replace: [] + config :pleroma, :rich_media, enabled: true config :pleroma, :media_proxy, @@@ -325,16 -330,14 +330,16 @@@ config :pleroma, Pleroma.User "web" ] -config :pleroma, Pleroma.Web.Federator, max_jobs: 50 - config :pleroma, Pleroma.Web.Federator.RetryQueue, enabled: false, max_jobs: 20, initial_timeout: 30, max_retries: 5 +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --combined docs/config.md index 5a32f9425,74badd0da..eb5f88e12 --- a/docs/config.md +++ b/docs/config.md @@@ -36,15 -36,14 +36,15 @@@ This filter replaces the filename (not An example for Sendgrid adapter: -``` +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.Sendgrid, api_key: "YOUR_API_KEY" ``` An example for SMTP adapter: -``` + +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.SMTP, relay: "smtp.gmail.com", @@@ -101,6 -100,26 +101,26 @@@ ## :logger * `backends`: `:console` is used to send logs to stdout, `{ExSyslogger, :ex_syslogger}` to log to syslog + + An example to enable ONLY ExSyslogger (f/ex in ``prod.secret.exs``) with info and debug suppressed: + ``` + config :logger, + backends: [{ExSyslogger, :ex_syslogger}] + + config :logger, :ex_syslogger, + level: :warn + ``` + + Another example, keeping console output and adding the pid to syslog output: + ``` + config :logger, + backends: [:console, {ExSyslogger, :ex_syslogger}] + + config :logger, :ex_syslogger, + level: :warn, + option: [:pid, :ndelay] + ``` + See: [logger’s documentation](https://hexdocs.pm/logger/Logger.html) and [ex_syslogger’s documentation](https://hexdocs.pm/ex_syslogger/) @@@ -152,6 -171,11 +172,11 @@@ This section is used to configure Plero * `delist_threshold`: Number of mentioned users after which the message gets delisted (the message can still be seen, but it will not show up in public timelines and mentioned users won't get notifications about it). Set to 0 to disable. * `reject_threshold`: Number of mentioned users after which the messaged gets rejected. Set to 0 to disable. + ## :mrf_keyword + * `reject`: A list of patterns which result in message being rejected, each pattern can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html) + * `federated_timeline_removal`: A list of patterns which result in message being removed from federated timelines (a.k.a unlisted), each pattern can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html) + * `replace`: A list of tuples containing `{pattern, replacement}`, `pattern` can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html) + ## :media_proxy * `enabled`: Enables proxying of remote media to the instance’s proxy * `base_url`: The base URL to access a user-uploaded file. Useful when you want to proxy the media files via another host/CDN fronts. @@@ -183,7 -207,7 +208,7 @@@ their ActivityPub ID An example: -``` +```exs config :pleroma, :mrf_user_allowlist, "example.org": ["https://example.org/users/admin"] ``` @@@ -212,34 -236,18 +237,34 @@@ the source code is here: https://github Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example: -``` +```exs config :pleroma, :admin_token, "somerandomtoken" ``` You can then do -``` + +```sh curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## Pleroma.Web.Federator +## Pleroma.Jobs + +A list of job queues and their settings. + +Job queue settings: + +* `max_jobs`: The maximum amount of parallel jobs running at the same time. + +Example: + +```exs +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] +``` + +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. ## Pleroma.Web.Federator.RetryQueue diff --combined lib/pleroma/web/activity_pub/activity_pub.ex index 464d39c04,c46d8233e..501c8b845 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@@ -3,13 -3,22 +3,22 @@@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.ActivityPub do - alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances} - alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF} + alias Pleroma.Activity + alias Pleroma.Repo + alias Pleroma.Object + alias Pleroma.Upload + alias Pleroma.User + alias Pleroma.Notification + alias Pleroma.Instances + alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.WebFinger alias Pleroma.Web.Federator alias Pleroma.Web.OStatus + import Ecto.Query import Pleroma.Web.ActivityPub.Utils + require Logger @httpoison Application.get_env(:pleroma, :httpoison) @@@ -19,19 -28,19 +28,19 @@@ defp get_recipients(%{"type" => "Announce"} = data) do to = data["to"] || [] cc = data["cc"] || [] - recipients = to ++ cc actor = User.get_cached_by_ap_id(data["actor"]) - recipients - |> Enum.filter(fn recipient -> - case User.get_cached_by_ap_id(recipient) do - nil -> - true + recipients = + (to ++ cc) + |> Enum.filter(fn recipient -> + case User.get_cached_by_ap_id(recipient) do + nil -> + true - user -> - User.following?(user, actor) - end - end) + user -> + User.following?(user, actor) + end + end) {recipients, to, cc} end @@@ -119,7 -128,7 +128,7 @@@ activity.data["object"] |> Map.get("tag", []) |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.map(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) + |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) if activity.data["object"]["attachment"] != [] do Pleroma.Web.Streamer.stream("public:media", activity) @@@ -744,19 -753,21 +753,19 @@@ public = is_public?(activity) - reachable_inboxes_metadata = - (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> - Federator.enqueue(:publish_single_ap, %{ + (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) + |> Enum.filter(fn user -> User.ap_enabled?(user) end) + |> Enum.map(fn %{info: %{source_data: data}} -> + (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + |> Enum.each(fn {inbox, unreachable_since} -> + Federator.publish_single_ap(%{ inbox: inbox, json: json, actor: actor, diff --combined lib/pleroma/web/activity_pub/activity_pub_controller.ex index afc01398f,69879476e..2bea51311 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@@ -5,12 -5,15 +5,15 @@@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do use Pleroma.Web, :controller - alias Pleroma.{Activity, User, Object} - alias Pleroma.Web.ActivityPub.{ObjectView, UserView} + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Object + alias Pleroma.Web.ActivityPub.ObjectView + alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Relay - alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator require Logger @@@ -152,13 -155,13 +155,13 @@@ with %User{} = user <- User.get_cached_by_nickname(nickname), true <- Utils.recipient_in_message(user.ap_id, params), params <- Utils.maybe_splice_recipient(user.ap_id, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end end def inbox(%{assigns: %{valid_signature: true}} = conn, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end diff --combined lib/pleroma/web/activity_pub/utils.ex index 83086dcec,da6cca4dd..e34dd1d41 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@@ -3,11 -3,19 +3,19 @@@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.Utils do - alias Pleroma.{Repo, Web, Object, Activity, User, Notification} + alias Pleroma.Repo + alias Pleroma.Web + alias Pleroma.Object + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Notification alias Pleroma.Web.Router.Helpers alias Pleroma.Web.Endpoint - alias Ecto.{Changeset, UUID} + alias Ecto.Changeset + alias Ecto.UUID + import Ecto.Query + require Logger @supported_object_types ["Article", "Note", "Video", "Page"] @@@ -134,14 -142,8 +142,8 @@@ context = context || generate_id("contexts") changeset = Object.context_mapping(context) - case Repo.insert(changeset) do - {:ok, object} -> - object - - # This should be solved by an upsert, but it seems ecto - # has problems accessing the constraint inside the jsonb. - {:error, _} -> - Object.get_cached_by_ap_id(context) + with {:ok, object} <- Object.insert_or_get(changeset) do + object end end @@@ -156,7 -158,7 +158,7 @@@ _ -> 5 end - Pleroma.Web.Federator.enqueue(:publish, activity, priority) + Pleroma.Web.Federator.publish(activity, priority) :ok end diff --combined lib/pleroma/web/federator/federator.ex index 37100c9e6,468959a65..7df75aca6 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@@ -3,79 -3,58 +3,82 @@@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - alias Pleroma.User - use GenServer - alias Pleroma.Activity - alias Pleroma.Jobs - alias Pleroma.Web.{WebFinger, Websub, Salmon} - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.User + alias Pleroma.Web.WebFinger + alias Pleroma.Web.Websub + alias Pleroma.Web.Salmon alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils + alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus ++ alias Pleroma.Jobs + require Logger @websub Application.get_env(:pleroma, :websub) @ostatus Application.get_env(:pleroma, :ostatus) - def init(args) do - {:ok, args} + def init() do + # 1 minute + Process.sleep(1000 * 60 * 1) + refresh_subscriptions() end - def start_link do - spawn(fn -> - # 1 minute - Process.sleep(1000 * 60) - enqueue(:refresh_subscriptions, nil) - end) + # Client API + + def incoming_doc(doc) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + end + + def incoming_ap_doc(params) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + end + + def publish(activity, priority \\ 1) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority) + end + + def publish_single_ap(params) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params]) + end - GenServer.start_link( - __MODULE__, - %{ - in: {:sets.new(), []}, - out: {:sets.new(), []} - }, - name: __MODULE__ - ) + def publish_single_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub]) end - def handle(:refresh_subscriptions, _) do + def verify_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub]) + end + + def request_subscription(sub) do + Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub]) + end + + def refresh_subscriptions() do + Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions]) + end + + def publish_single_salmon(params) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params]) + end + + # Job Worker Callbacks + + def perform(:refresh_subscriptions) do Logger.debug("Federator running refresh subscriptions") Websub.refresh_subscriptions() spawn(fn -> # 6 hours Process.sleep(1000 * 60 * 60 * 6) - enqueue(:refresh_subscriptions, nil) + refresh_subscriptions() end) end - def handle(:request_subscription, websub) do + def perform(:request_subscription, websub) do Logger.debug("Refreshing #{websub.topic}") with {:ok, websub} <- Websub.request_subscription(websub) do @@@ -85,7 -64,7 +88,7 @@@ end end - def handle(:publish, activity) do + def perform(:publish, activity) do Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do @@@ -111,7 -90,7 +114,7 @@@ end end - def handle(:verify_websub, websub) do + def perform(:verify_websub, websub) do Logger.debug(fn -> "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" end) @@@ -119,12 -98,12 +122,12 @@@ @websub.verify(websub) end - def handle(:incoming_doc, doc) do + def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") @ostatus.handle_incoming(doc) end - def handle(:incoming_ap_doc, params) do + def perform(:incoming_ap_doc, params) do Logger.info("Handling incoming AP activity") params = Utils.normalize_params(params) @@@ -149,11 -128,11 +152,11 @@@ end end - def handle(:publish_single_salmon, params) do + def perform(:publish_single_salmon, params) do Salmon.send_to_user(params) end - def handle(:publish_single_ap, params) do + def perform(:publish_single_ap, params) do case ActivityPub.publish_one(params) do {:ok, _} -> :ok @@@ -163,7 -142,7 +166,7 @@@ end end - def handle( + def perform( :publish_single_websub, %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params ) do @@@ -176,11 -155,74 +179,11 @@@ end end - def handle(type, _) do + def perform(type, _) do Logger.debug(fn -> "Unknown task: #{type}" end) {:error, "Don't know what to do with this"} end - if Mix.env() == :test do - def enqueue(type, payload, _priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - handle(type, payload) - end - end - else - def enqueue(type, payload, priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) - end - end - end - - def maybe_start_job(running_jobs, queue) do - if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do - {{type, payload}, queue} = queue_pop(queue) - {:ok, pid} = Task.start(fn -> handle(type, payload) end) - mref = Process.monitor(pid) - {:sets.add_element(mref, running_jobs), queue} - else - {running_jobs, queue} - end - end - - def handle_cast({:enqueue, type, payload, _priority}, state) - when type in [:incoming_doc, :incoming_ap_doc] do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_queue = enqueue_sorted(i_queue, {type, payload}, 1) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast({:enqueue, type, payload, _priority}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - o_queue = enqueue_sorted(o_queue, {type, payload}, 1) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast(_, state) do - {:noreply, state} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_running_jobs = :sets.del_element(ref, i_running_jobs) - o_running_jobs = :sets.del_element(ref, o_running_jobs) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def enqueue_sorted(queue, element, priority) do - [%{item: element, priority: priority} | queue] - |> Enum.sort_by(fn %{priority: priority} -> priority end) - end - - def queue_pop([%{item: element} | queue]) do - {element, queue} - end - def ap_enabled_actor(id) do user = User.get_by_ap_id(id) diff --combined lib/pleroma/web/ostatus/ostatus_controller.ex index 9ad3d3bd1,db4c8f4da..bab3da2b0 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@@ -5,13 -5,17 +5,17 @@@ defmodule Pleroma.Web.OStatus.OStatusController do use Pleroma.Web, :controller - alias Pleroma.{User, Activity, Object} - alias Pleroma.Web.OStatus.{FeedRepresenter, ActivityRepresenter} - alias Pleroma.Web.{OStatus, Federator} - alias Pleroma.Web.XML - alias Pleroma.Web.ActivityPub.ObjectView - alias Pleroma.Web.ActivityPub.ActivityPubController + alias Pleroma.Activity + alias Pleroma.Object + alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.ActivityPubController + alias Pleroma.Web.ActivityPub.ObjectView + alias Pleroma.Web.OStatus.ActivityRepresenter + alias Pleroma.Web.OStatus.FeedRepresenter + alias Pleroma.Web.Federator + alias Pleroma.Web.OStatus + alias Pleroma.Web.XML plug(Pleroma.Web.FederatingPlug when action in [:salmon_incoming]) @@@ -83,7 -87,7 +87,7 @@@ {:ok, body, _conn} = read_body(conn) {:ok, doc} = decode_or_retry(body) - Federator.enqueue(:incoming_doc, doc) + Federator.incoming_doc(doc) conn |> send_resp(200, "") diff --combined lib/pleroma/web/salmon/salmon.ex index 3db0c07fd,a5a9e16c6..0a69aa1ec --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@@ -6,10 -6,12 +6,12 @@@ defmodule Pleroma.Web.Salmon d @httpoison Application.get_env(:pleroma, :httpoison) use Bitwise + alias Pleroma.Instances + alias Pleroma.User alias Pleroma.Web.XML alias Pleroma.Web.OStatus.ActivityRepresenter - alias Pleroma.User + require Logger def decode(salmon) do @@@ -227,7 -229,7 +229,7 @@@ |> Enum.each(fn remote_user -> Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) - Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ + Pleroma.Web.Federator.publish_single_salmon(%{ recipient: remote_user, feed: feed, poster: poster, diff --combined lib/pleroma/web/websub/websub.ex index 82845f13d,a08d7993d..c00ec0858 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@@ -4,12 -4,15 +4,16 @@@ defmodule Pleroma.Web.Websub do alias Ecto.Changeset - alias Pleroma.Repo alias Pleroma.Instances - alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription} + alias Pleroma.Repo + alias Pleroma.Web.Websub.WebsubServerSubscription + alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.OStatus.FeedRepresenter - alias Pleroma.Web.{XML, Endpoint, OStatus, Federator} + alias Pleroma.Web.XML + alias Pleroma.Web.Endpoint + alias Pleroma.Web.OStatus alias Pleroma.Web.Router.Helpers ++ alias Pleroma.Web.Federator require Logger import Ecto.Query @@@ -84,7 -87,7 +88,7 @@@ unreachable_since: reachable_callbacks_metadata[sub.callback] } - Pleroma.Web.Federator.enqueue(:publish_single_websub, data) + Federator.publish_single_websub(data) end) end @@@ -116,7 -119,7 +120,7 @@@ websub = Repo.update!(change) - Pleroma.Web.Federator.enqueue(:verify_websub, websub) + Federator.verify_websub(websub) {:ok, websub} else @@@ -266,7 -269,7 +270,7 @@@ subs = Repo.all(query) Enum.each(subs, fn sub -> - Pleroma.Web.Federator.enqueue(:request_subscription, sub) + Federator.request_subscription(sub) end) end diff --combined lib/pleroma/web/websub/websub_controller.ex index 41bbc0369,1ad18a8a4..ad40f1b94 --- a/lib/pleroma/web/websub/websub_controller.ex +++ b/lib/pleroma/web/websub/websub_controller.ex @@@ -5,8 -5,10 +5,10 @@@ defmodule Pleroma.Web.Websub.WebsubController do use Pleroma.Web, :controller - alias Pleroma.{Repo, User} - alias Pleroma.Web.{Websub, Federator} + alias Pleroma.Repo + alias Pleroma.User + alias Pleroma.Web.Websub + alias Pleroma.Web.Federator alias Pleroma.Web.Websub.WebsubClientSubscription require Logger @@@ -82,7 -84,7 +84,7 @@@ %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id), {:ok, body, _conn} = read_body(conn), ^signature <- Websub.sign(websub.secret, body) do - Federator.enqueue(:incoming_doc, body) + Federator.incoming_doc(body) conn |> send_resp(200, "OK")