Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs
authorEgor Kislitsyn <egor@kislitsyn.com>
Mon, 11 Feb 2019 06:54:21 +0000 (13:54 +0700)
committerEgor Kislitsyn <egor@kislitsyn.com>
Mon, 11 Feb 2019 06:54:21 +0000 (13:54 +0700)
# Conflicts:
# lib/pleroma/web/federator/federator.ex
# lib/pleroma/web/websub/websub.ex

1  2 
config/config.exs
docs/config.md
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/activity_pub_controller.ex
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/ostatus/ostatus_controller.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/websub/websub.ex
lib/pleroma/web/websub/websub_controller.ex

diff --combined config/config.exs
index 0db940fdc84afe5d18d59121d14c7eba01672beb,5db0ea9aa9a5397834fa53514c7b73fb71392724..9e4ee8d02e1ecc576f649843195db6223362c0f8
@@@ -115,7 -115,7 +115,7 @@@ config :logger, :console
  config :logger, :ex_syslogger,
    level: :debug,
    ident: "Pleroma",
-   format: "$date $time $metadata[$level] $message",
+   format: "$metadata[$level] $message",
    metadata: [:request_id]
  
  config :mime, :types, %{
@@@ -238,6 -238,11 +238,11 @@@ config :pleroma, :mrf_simple
    reject: [],
    accept: []
  
+ config :pleroma, :mrf_keyword,
+   reject: [],
+   federated_timeline_removal: [],
+   replace: []
  config :pleroma, :rich_media, enabled: true
  
  config :pleroma, :media_proxy,
@@@ -325,16 -330,14 +330,16 @@@ config :pleroma, Pleroma.User
      "web"
    ]
  
 -config :pleroma, Pleroma.Web.Federator, max_jobs: 50
 -
  config :pleroma, Pleroma.Web.Federator.RetryQueue,
    enabled: false,
    max_jobs: 20,
    initial_timeout: 30,
    max_retries: 5
  
 +config :pleroma, Pleroma.Jobs,
 +  federator_incoming: [max_jobs: 50],
 +  federator_outgoing: [max_jobs: 50]
 +
  # Import environment specific config. This must remain at the bottom
  # of this file so it overrides the configuration defined above.
  import_config "#{Mix.env()}.exs"
diff --combined docs/config.md
index 5a32f9425c81007c1ca8cea985cc586ea6ebbb6b,74badd0da29d875e40330b0dafb0f754687fe5de..eb5f88e12ed89036091ac16451de655286f16d61
@@@ -36,15 -36,14 +36,15 @@@ This filter replaces the filename (not 
  
  An example for Sendgrid adapter:
  
 -```
 +```exs
  config :pleroma, Pleroma.Mailer,
    adapter: Swoosh.Adapters.Sendgrid,
    api_key: "YOUR_API_KEY"
  ```
  
  An example for SMTP adapter:
 -```
 +
 +```exs
  config :pleroma, Pleroma.Mailer,
    adapter: Swoosh.Adapters.SMTP,
    relay: "smtp.gmail.com",
  
  ## :logger
  * `backends`: `:console` is used to send logs to stdout, `{ExSyslogger, :ex_syslogger}` to log to syslog
+ An example to enable ONLY ExSyslogger (f/ex in ``prod.secret.exs``) with info and debug suppressed:
+ ```
+ config :logger, 
+   backends: [{ExSyslogger, :ex_syslogger}]
+ config :logger, :ex_syslogger,
+   level: :warn
+ ```
+ Another example, keeping console output and adding the pid to syslog output:
+ ```
+ config :logger,
+   backends: [:console, {ExSyslogger, :ex_syslogger}]
+ config :logger, :ex_syslogger,
+   level: :warn,
+   option: [:pid, :ndelay]
+ ```
  See: [logger’s documentation](https://hexdocs.pm/logger/Logger.html) and [ex_syslogger’s documentation](https://hexdocs.pm/ex_syslogger/)
  
  
@@@ -152,6 -171,11 +172,11 @@@ This section is used to configure Plero
  * `delist_threshold`: Number of mentioned users after which the message gets delisted (the message can still be seen, but it will not show up in public timelines and mentioned users won't get notifications about it). Set to 0 to disable.
  * `reject_threshold`: Number of mentioned users after which the messaged gets rejected. Set to 0 to disable.
  
+ ## :mrf_keyword
+ * `reject`: A list of patterns which result in message being rejected, each pattern can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html)
+ * `federated_timeline_removal`: A list of patterns which result in message being removed from federated timelines (a.k.a unlisted), each pattern can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html)
+ * `replace`: A list of tuples containing `{pattern, replacement}`, `pattern` can be a string or a [regular expression](https://hexdocs.pm/elixir/Regex.html)
  ## :media_proxy
  * `enabled`: Enables proxying of remote media to the instance’s proxy
  * `base_url`: The base URL to access a user-uploaded file. Useful when you want to proxy the media files via another host/CDN fronts.
@@@ -183,7 -207,7 +208,7 @@@ their ActivityPub ID
  
  An example:
  
 -```
 +```exs
  config :pleroma, :mrf_user_allowlist,
    "example.org": ["https://example.org/users/admin"]
  ```
@@@ -212,34 -236,18 +237,34 @@@ the source code is here: https://github
  
  Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example:
  
 -```
 +```exs
  config :pleroma, :admin_token, "somerandomtoken"
  ```
  
  You can then do
 -```
 +
 +```sh
  curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
  ```
  
 -## Pleroma.Web.Federator
 +## Pleroma.Jobs
 +
 +A list of job queues and their settings.
 +
 +Job queue settings:
 +
 +* `max_jobs`: The maximum amount of parallel jobs running at the same time.
 +
 +Example:
 +
 +```exs
 +config :pleroma, Pleroma.Jobs,
 +  federator_incoming: [max_jobs: 50],
 +  federator_outgoing: [max_jobs: 50]
 +```
 +
 +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
  
 -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
  
  ## Pleroma.Web.Federator.RetryQueue
  
index 464d39c047d7b1d1210aacf20397420d18d5efc9,c46d8233e04aa77dfef3a4da5e0d49d66679454f..501c8b845656b747cc99c4827e6b3d8cfbb5a610
@@@ -3,13 -3,22 +3,22 @@@
  # SPDX-License-Identifier: AGPL-3.0-only
  
  defmodule Pleroma.Web.ActivityPub.ActivityPub do
-   alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances}
-   alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
+   alias Pleroma.Activity
+   alias Pleroma.Repo
+   alias Pleroma.Object
+   alias Pleroma.Upload
+   alias Pleroma.User
+   alias Pleroma.Notification
+   alias Pleroma.Instances
+   alias Pleroma.Web.ActivityPub.Transmogrifier
+   alias Pleroma.Web.ActivityPub.MRF
    alias Pleroma.Web.WebFinger
    alias Pleroma.Web.Federator
    alias Pleroma.Web.OStatus
    import Ecto.Query
    import Pleroma.Web.ActivityPub.Utils
    require Logger
  
    @httpoison Application.get_env(:pleroma, :httpoison)
    defp get_recipients(%{"type" => "Announce"} = data) do
      to = data["to"] || []
      cc = data["cc"] || []
-     recipients = to ++ cc
      actor = User.get_cached_by_ap_id(data["actor"])
  
-     recipients
-     |> Enum.filter(fn recipient ->
-       case User.get_cached_by_ap_id(recipient) do
-         nil ->
-           true
+     recipients =
+       (to ++ cc)
+       |> Enum.filter(fn recipient ->
+         case User.get_cached_by_ap_id(recipient) do
+           nil ->
+             true
  
-         user ->
-           User.following?(user, actor)
-       end
-     end)
+           user ->
+             User.following?(user, actor)
+         end
+       end)
  
      {recipients, to, cc}
    end
            activity.data["object"]
            |> Map.get("tag", [])
            |> Enum.filter(fn tag -> is_bitstring(tag) end)
-           |> Enum.map(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
+           |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
  
            if activity.data["object"]["attachment"] != [] do
              Pleroma.Web.Streamer.stream("public:media", activity)
  
      public = is_public?(activity)
  
 -    reachable_inboxes_metadata =
 -      (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
 -      |> Enum.filter(fn user -> User.ap_enabled?(user) end)
 -      |> Enum.map(fn %{info: %{source_data: data}} ->
 -        (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
 -      end)
 -      |> Enum.uniq()
 -      |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
 -      |> Instances.filter_reachable()
 -
      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
      json = Jason.encode!(data)
  
 -    Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
 -      Federator.enqueue(:publish_single_ap, %{
 +    (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
 +    |> Enum.filter(fn user -> User.ap_enabled?(user) end)
 +    |> Enum.map(fn %{info: %{source_data: data}} ->
 +      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
 +    end)
 +    |> Enum.uniq()
 +    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
 +    |> Instances.filter_reachable()
 +    |> Enum.each(fn {inbox, unreachable_since} ->
 +      Federator.publish_single_ap(%{
          inbox: inbox,
          json: json,
          actor: actor,
index afc01398fe73efef0c7d6edfe5dab708d2602f9b,69879476eb4b26f855d3b65d373c410f20c454ec..2bea5131148330f7a3d87e5236dc648d89815ba6
@@@ -5,12 -5,15 +5,15 @@@
  defmodule Pleroma.Web.ActivityPub.ActivityPubController do
    use Pleroma.Web, :controller
  
-   alias Pleroma.{Activity, User, Object}
-   alias Pleroma.Web.ActivityPub.{ObjectView, UserView}
+   alias Pleroma.Activity
+   alias Pleroma.User
+   alias Pleroma.Object
+   alias Pleroma.Web.ActivityPub.ObjectView
+   alias Pleroma.Web.ActivityPub.UserView
    alias Pleroma.Web.ActivityPub.ActivityPub
    alias Pleroma.Web.ActivityPub.Relay
-   alias Pleroma.Web.ActivityPub.Utils
    alias Pleroma.Web.ActivityPub.Transmogrifier
+   alias Pleroma.Web.ActivityPub.Utils
    alias Pleroma.Web.Federator
  
    require Logger
      with %User{} = user <- User.get_cached_by_nickname(nickname),
           true <- Utils.recipient_in_message(user.ap_id, params),
           params <- Utils.maybe_splice_recipient(user.ap_id, params) do
 -      Federator.enqueue(:incoming_ap_doc, params)
 +      Federator.incoming_ap_doc(params)
        json(conn, "ok")
      end
    end
  
    def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
 -    Federator.enqueue(:incoming_ap_doc, params)
 +    Federator.incoming_ap_doc(params)
      json(conn, "ok")
    end
  
index 83086dcec7192f928e0fdd7361a86879f02ea01c,da6cca4ddd4052b93013b8c0c3b30a4fd88002b0..e34dd1d4112a1cfd29ac7a9190d670f6456cb1a8
@@@ -3,11 -3,19 +3,19 @@@
  # SPDX-License-Identifier: AGPL-3.0-only
  
  defmodule Pleroma.Web.ActivityPub.Utils do
-   alias Pleroma.{Repo, Web, Object, Activity, User, Notification}
+   alias Pleroma.Repo
+   alias Pleroma.Web
+   alias Pleroma.Object
+   alias Pleroma.Activity
+   alias Pleroma.User
+   alias Pleroma.Notification
    alias Pleroma.Web.Router.Helpers
    alias Pleroma.Web.Endpoint
-   alias Ecto.{Changeset, UUID}
+   alias Ecto.Changeset
+   alias Ecto.UUID
    import Ecto.Query
    require Logger
  
    @supported_object_types ["Article", "Note", "Video", "Page"]
      context = context || generate_id("contexts")
      changeset = Object.context_mapping(context)
  
-     case Repo.insert(changeset) do
-       {:ok, object} ->
-         object
-       # This should be solved by an upsert, but it seems ecto
-       # has problems accessing the constraint inside the jsonb.
-       {:error, _} ->
-         Object.get_cached_by_ap_id(context)
+     with {:ok, object} <- Object.insert_or_get(changeset) do
+       object
      end
    end
  
          _ -> 5
        end
  
 -    Pleroma.Web.Federator.enqueue(:publish, activity, priority)
 +    Pleroma.Web.Federator.publish(activity, priority)
      :ok
    end
  
index 37100c9e6277f0fdabb7dced65ea42df0dff9ce1,468959a65f34883b55d07593d149b043374195eb..7df75aca602264b13a49e6d88705b96d8575dc9b
@@@ -3,79 -3,58 +3,82 @@@
  # SPDX-License-Identifier: AGPL-3.0-only
  
  defmodule Pleroma.Web.Federator do
-   alias Pleroma.User
 -  use GenServer
 -
    alias Pleroma.Activity
-   alias Pleroma.Jobs
-   alias Pleroma.Web.{WebFinger, Websub, Salmon}
-   alias Pleroma.Web.Federator.RetryQueue
+   alias Pleroma.User
+   alias Pleroma.Web.WebFinger
+   alias Pleroma.Web.Websub
+   alias Pleroma.Web.Salmon
    alias Pleroma.Web.ActivityPub.ActivityPub
    alias Pleroma.Web.ActivityPub.Relay
    alias Pleroma.Web.ActivityPub.Transmogrifier
    alias Pleroma.Web.ActivityPub.Utils
+   alias Pleroma.Web.Federator.RetryQueue
    alias Pleroma.Web.OStatus
++  alias Pleroma.Jobs
    require Logger
  
    @websub Application.get_env(:pleroma, :websub)
    @ostatus Application.get_env(:pleroma, :ostatus)
  
 -  def init(args) do
 -    {:ok, args}
 +  def init() do
 +    # 1 minute
 +    Process.sleep(1000 * 60 * 1)
 +    refresh_subscriptions()
    end
  
 -  def start_link do
 -    spawn(fn ->
 -      # 1 minute
 -      Process.sleep(1000 * 60)
 -      enqueue(:refresh_subscriptions, nil)
 -    end)
 +  # Client API
 +
 +  def incoming_doc(doc) do
 +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
 +  end
 +
 +  def incoming_ap_doc(params) do
 +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
 +  end
 +
 +  def publish(activity, priority \\ 1) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority)
 +  end
 +
 +  def publish_single_ap(params) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params])
 +  end
  
 -    GenServer.start_link(
 -      __MODULE__,
 -      %{
 -        in: {:sets.new(), []},
 -        out: {:sets.new(), []}
 -      },
 -      name: __MODULE__
 -    )
 +  def publish_single_websub(websub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub])
    end
  
 -  def handle(:refresh_subscriptions, _) do
 +  def verify_websub(websub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub])
 +  end
 +
 +  def request_subscription(sub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub])
 +  end
 +
 +  def refresh_subscriptions() do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions])
 +  end
 +
 +  def publish_single_salmon(params) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params])
 +  end
 +
 +  # Job Worker Callbacks
 +
 +  def perform(:refresh_subscriptions) do
      Logger.debug("Federator running refresh subscriptions")
      Websub.refresh_subscriptions()
  
      spawn(fn ->
        # 6 hours
        Process.sleep(1000 * 60 * 60 * 6)
 -      enqueue(:refresh_subscriptions, nil)
 +      refresh_subscriptions()
      end)
    end
  
 -  def handle(:request_subscription, websub) do
 +  def perform(:request_subscription, websub) do
      Logger.debug("Refreshing #{websub.topic}")
  
      with {:ok, websub} <- Websub.request_subscription(websub) do
@@@ -85,7 -64,7 +88,7 @@@
      end
    end
  
 -  def handle(:publish, activity) do
 +  def perform(:publish, activity) do
      Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
  
      with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
      end
    end
  
 -  def handle(:verify_websub, websub) do
 +  def perform(:verify_websub, websub) do
      Logger.debug(fn ->
        "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
      end)
      @websub.verify(websub)
    end
  
 -  def handle(:incoming_doc, doc) do
 +  def perform(:incoming_doc, doc) do
      Logger.info("Got document, trying to parse")
      @ostatus.handle_incoming(doc)
    end
  
 -  def handle(:incoming_ap_doc, params) do
 +  def perform(:incoming_ap_doc, params) do
      Logger.info("Handling incoming AP activity")
  
      params = Utils.normalize_params(params)
      end
    end
  
 -  def handle(:publish_single_salmon, params) do
 +  def perform(:publish_single_salmon, params) do
      Salmon.send_to_user(params)
    end
  
 -  def handle(:publish_single_ap, params) do
 +  def perform(:publish_single_ap, params) do
      case ActivityPub.publish_one(params) do
        {:ok, _} ->
          :ok
      end
    end
  
 -  def handle(
 +  def perform(
          :publish_single_websub,
          %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
        ) do
      end
    end
  
 -  def handle(type, _) do
 +  def perform(type, _) do
      Logger.debug(fn -> "Unknown task: #{type}" end)
      {:error, "Don't know what to do with this"}
    end
  
 -  if Mix.env() == :test do
 -    def enqueue(type, payload, _priority \\ 1) do
 -      if Pleroma.Config.get([:instance, :federating]) do
 -        handle(type, payload)
 -      end
 -    end
 -  else
 -    def enqueue(type, payload, priority \\ 1) do
 -      if Pleroma.Config.get([:instance, :federating]) do
 -        GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
 -      end
 -    end
 -  end
 -
 -  def maybe_start_job(running_jobs, queue) do
 -    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
 -      {{type, payload}, queue} = queue_pop(queue)
 -      {:ok, pid} = Task.start(fn -> handle(type, payload) end)
 -      mref = Process.monitor(pid)
 -      {:sets.add_element(mref, running_jobs), queue}
 -    else
 -      {running_jobs, queue}
 -    end
 -  end
 -
 -  def handle_cast({:enqueue, type, payload, _priority}, state)
 -      when type in [:incoming_doc, :incoming_ap_doc] do
 -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
 -    i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
 -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
 -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
 -  end
 -
 -  def handle_cast({:enqueue, type, payload, _priority}, state) do
 -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
 -    o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
 -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
 -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
 -  end
 -
 -  def handle_cast(_, state) do
 -    {:noreply, state}
 -  end
 -
 -  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
 -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
 -    i_running_jobs = :sets.del_element(ref, i_running_jobs)
 -    o_running_jobs = :sets.del_element(ref, o_running_jobs)
 -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
 -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
 -
 -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
 -  end
 -
 -  def enqueue_sorted(queue, element, priority) do
 -    [%{item: element, priority: priority} | queue]
 -    |> Enum.sort_by(fn %{priority: priority} -> priority end)
 -  end
 -
 -  def queue_pop([%{item: element} | queue]) do
 -    {element, queue}
 -  end
 -
    def ap_enabled_actor(id) do
      user = User.get_by_ap_id(id)
  
index 9ad3d3bd1bec89f8039da50cbf5043bb68a3976b,db4c8f4daf7d45c55656614379ffa196e7958778..bab3da2b06f385f4906c6d4031623353d931677b
@@@ -5,13 -5,17 +5,17 @@@
  defmodule Pleroma.Web.OStatus.OStatusController do
    use Pleroma.Web, :controller
  
-   alias Pleroma.{User, Activity, Object}
-   alias Pleroma.Web.OStatus.{FeedRepresenter, ActivityRepresenter}
-   alias Pleroma.Web.{OStatus, Federator}
-   alias Pleroma.Web.XML
-   alias Pleroma.Web.ActivityPub.ObjectView
-   alias Pleroma.Web.ActivityPub.ActivityPubController
+   alias Pleroma.Activity
+   alias Pleroma.Object
+   alias Pleroma.User
    alias Pleroma.Web.ActivityPub.ActivityPub
+   alias Pleroma.Web.ActivityPub.ActivityPubController
+   alias Pleroma.Web.ActivityPub.ObjectView
+   alias Pleroma.Web.OStatus.ActivityRepresenter
+   alias Pleroma.Web.OStatus.FeedRepresenter
+   alias Pleroma.Web.Federator
+   alias Pleroma.Web.OStatus
+   alias Pleroma.Web.XML
  
    plug(Pleroma.Web.FederatingPlug when action in [:salmon_incoming])
  
@@@ -83,7 -87,7 +87,7 @@@
      {:ok, body, _conn} = read_body(conn)
      {:ok, doc} = decode_or_retry(body)
  
 -    Federator.enqueue(:incoming_doc, doc)
 +    Federator.incoming_doc(doc)
  
      conn
      |> send_resp(200, "")
index 3db0c07fd6e6e334e9ecf65742195548d811faf4,a5a9e16c62cfd2f821234e960ce947f98d22ea19..0a69aa1ec9091530d8c99b4859afb898f4f74544
@@@ -6,10 -6,12 +6,12 @@@ defmodule Pleroma.Web.Salmon d
    @httpoison Application.get_env(:pleroma, :httpoison)
  
    use Bitwise
    alias Pleroma.Instances
+   alias Pleroma.User
    alias Pleroma.Web.XML
    alias Pleroma.Web.OStatus.ActivityRepresenter
-   alias Pleroma.User
    require Logger
  
    def decode(salmon) do
        |> Enum.each(fn remote_user ->
          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
  
 -        Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
 +        Pleroma.Web.Federator.publish_single_salmon(%{
            recipient: remote_user,
            feed: feed,
            poster: poster,
index 82845f13dbf100b293e0f97e65a96671dac43886,a08d7993d007915d63a00265ab4389ab2e825a37..c00ec08582aa328598d14fa02019d14833ecea1f
@@@ -4,12 -4,15 +4,16 @@@
  
  defmodule Pleroma.Web.Websub do
    alias Ecto.Changeset
-   alias Pleroma.Repo
    alias Pleroma.Instances
-   alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
+   alias Pleroma.Repo
+   alias Pleroma.Web.Websub.WebsubServerSubscription
+   alias Pleroma.Web.Websub.WebsubClientSubscription
    alias Pleroma.Web.OStatus.FeedRepresenter
-   alias Pleroma.Web.{XML, Endpoint, OStatus, Federator}
+   alias Pleroma.Web.XML
+   alias Pleroma.Web.Endpoint
+   alias Pleroma.Web.OStatus
    alias Pleroma.Web.Router.Helpers
++  alias Pleroma.Web.Federator
    require Logger
  
    import Ecto.Query
@@@ -84,7 -87,7 +88,7 @@@
          unreachable_since: reachable_callbacks_metadata[sub.callback]
        }
  
 -      Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
 +      Federator.publish_single_websub(data)
      end)
    end
  
  
        websub = Repo.update!(change)
  
 -      Pleroma.Web.Federator.enqueue(:verify_websub, websub)
 +      Federator.verify_websub(websub)
  
        {:ok, websub}
      else
      subs = Repo.all(query)
  
      Enum.each(subs, fn sub ->
 -      Pleroma.Web.Federator.enqueue(:request_subscription, sub)
 +      Federator.request_subscription(sub)
      end)
    end
  
index 41bbc0369ceb925c9a4c65d4a0f45018126e5d94,1ad18a8a4edad84fae8252a4ca5b266b6a62096e..ad40f1b9471f5ddbbbf601fa1c5967e55f933a3e
@@@ -5,8 -5,10 +5,10 @@@
  defmodule Pleroma.Web.Websub.WebsubController do
    use Pleroma.Web, :controller
  
-   alias Pleroma.{Repo, User}
-   alias Pleroma.Web.{Websub, Federator}
+   alias Pleroma.Repo
+   alias Pleroma.User
+   alias Pleroma.Web.Websub
+   alias Pleroma.Web.Federator
    alias Pleroma.Web.Websub.WebsubClientSubscription
  
    require Logger
@@@ -82,7 -84,7 +84,7 @@@
           %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id),
           {:ok, body, _conn} = read_body(conn),
           ^signature <- Websub.sign(websub.secret, body) do
 -      Federator.enqueue(:incoming_doc, body)
 +      Federator.incoming_doc(body)
  
        conn
        |> send_resp(200, "OK")