[#1149] Replaced RetryQueue with oban-based retries.
authorIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 1 Aug 2019 14:28:00 +0000 (17:28 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 1 Aug 2019 14:28:00 +0000 (17:28 +0300)
18 files changed:
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/application.ex
lib/pleroma/web/activity_pub/publisher.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/publisher.ex
lib/pleroma/web/federator/retry_queue.ex [deleted file]
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/workers/publisher.ex [new file with mode: 0644]
mix.exs
mix.lock
priv/repo/migrations/20190730055101_add_oban_jobs_table.exs [new file with mode: 0644]
test/user_test.exs
test/web/activity_pub/publisher_test.exs
test/web/federator_test.exs
test/web/retry_queue_test.exs [deleted file]
test/web/salmon/salmon_test.exs

index 17770640a92f6833e5a670026a75ef4c53f455bf..1bb325bf558e584f2b16495fcd48deaa8fca479b 100644 (file)
@@ -440,13 +440,7 @@ config :pleroma, Pleroma.User,
     "web"
   ]
 
-config :pleroma, Pleroma.Web.Federator.RetryQueue,
-  enabled: false,
-  max_jobs: 20,
-  initial_timeout: 30,
-  max_retries: 5
-
-config :pleroma_job_queue, :queues,
+job_queues = [
   federator_incoming: 50,
   federator_outgoing: 50,
   web_push: 50,
@@ -454,6 +448,15 @@ config :pleroma_job_queue, :queues,
   transmogrifier: 20,
   scheduled_activities: 10,
   background: 5
+]
+
+config :pleroma_job_queue, :queues, job_queues
+
+config :pleroma, Oban,
+  repo: Pleroma.Repo,
+  verbose: false,
+  prune: {:maxage, 60 * 60 * 24 * 7},
+  queues: job_queues
 
 config :pleroma, :fetch_initial_posts,
   enabled: false,
index 92dca18bc166e746e28ea40965b1559c6e88e867..23d9bf77912fb1e3206f77d0d9e4e66b6a15e2d9 100644 (file)
@@ -62,6 +62,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
 
 config :pleroma_job_queue, disabled: true
 
+config :pleroma, Oban,
+  queues: false,
+  prune: :disabled
+
 config :pleroma, Pleroma.ScheduledActivity,
   daily_user_limit: 2,
   total_user_limit: 3,
index 02f86dc169abfb6f9be9215c9d7a949b885e40de..5c18ffdbfd814255721a8ace5995ef41445c6cb4 100644 (file)
@@ -412,13 +412,6 @@ config :pleroma_job_queue, :queues,
 
 This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
 
-## Pleroma.Web.Federator.RetryQueue
-
-* `enabled`: If set to `true`, failed federation jobs will be retried
-* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
-* `initial_timeout`: The initial timeout in seconds
-* `max_retries`: The maximum number of times a federation job is retried
-
 ## Pleroma.Web.Metadata
 * `providers`: a list of metadata providers to enable. Providers available:
   * Pleroma.Web.Metadata.Providers.OpenGraph
index 0353314914ba82c35d062adde2cf6db7caeedf46..ce7d8c4b28d989d709c542f2557df2086e493795 100644 (file)
@@ -120,8 +120,8 @@ defmodule Pleroma.Application do
         hackney_pool_children() ++
         [
           %{
-            id: Pleroma.Web.Federator.RetryQueue,
-            start: {Pleroma.Web.Federator.RetryQueue, :start_link, []}
+            id: Oban,
+            start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
           },
           %{
             id: Pleroma.Web.OAuth.Token.CleanWorker,
index 46edab0bd6c9e7144013b18c890d87adf34ef92e..29f3221d1a8d1c9718d68578fea5675e0d57d3eb 100644 (file)
@@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
     end
   end
 
+  def publish_one(%{actor_id: actor_id} = params) do
+    actor = User.get_by_id(actor_id)
+
+    params
+    |> Map.delete(:actor_id)
+    |> Map.put(:actor, actor)
+    |> publish_one()
+  end
+
   defp should_federate?(inbox, public) do
     if public do
       true
@@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
   Publishes an activity with BCC to all relevant peers.
   """
 
-  def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+      when is_list(bcc) and bcc != [] do
     public = is_public?(activity)
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
 
@@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
       Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
         inbox: inbox,
         json: json,
-        actor: actor,
+        actor_id: actor.id,
         id: activity.data["id"],
         unreachable_since: unreachable_since
       })
@@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
         %{
           inbox: inbox,
           json: json,
-          actor: actor,
+          actor_id: actor.id,
           id: activity.data["id"],
           unreachable_since: unreachable_since
         }
index f4f9e83e06ab55f71b715301631865bc46b11c00..97ec9d549db5c68c5cb83fcc117e154854af5d1e 100644 (file)
@@ -10,7 +10,6 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Federator.Publisher
-  alias Pleroma.Web.Federator.RetryQueue
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub
 
@@ -130,19 +129,6 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def perform(
-        :publish_single_websub,
-        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
-      ) do
-    case Websub.publish_one(params) do
-      {:ok, _} ->
-        :ok
-
-      {:error, _} ->
-        RetryQueue.enqueue(params, Websub)
-    end
-  end
-
   def perform(type, _) do
     Logger.debug(fn -> "Unknown task: #{type}" end)
     {:error, "Don't know what to do with this"}
index 70f870244fbb5f7af1283985620004f3791522f7..e8c1bf17f26c36e481c87bc8c8cee4b627edbd9c 100644 (file)
@@ -6,7 +6,6 @@ defmodule Pleroma.Web.Federator.Publisher do
   alias Pleroma.Activity
   alias Pleroma.Config
   alias Pleroma.User
-  alias Pleroma.Web.Federator.RetryQueue
 
   require Logger
 
@@ -30,23 +29,10 @@ defmodule Pleroma.Web.Federator.Publisher do
   Enqueue publishing a single activity.
   """
   @spec enqueue_one(module(), Map.t()) :: :ok
-  def enqueue_one(module, %{} = params),
-    do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
-  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
-  def perform(:publish_one, module, params) do
-    case apply(module, :publish_one, [params]) do
-      {:ok, _} ->
-        :ok
-
-      {:error, _e} ->
-        RetryQueue.enqueue(params, module)
-    end
-  end
-
-  def perform(type, _, _) do
-    Logger.debug("Unknown task: #{type}")
-    {:error, "Don't know what to do with this"}
+  def enqueue_one(module, %{} = params) do
+    %{module: to_string(module), params: params}
+    |> Pleroma.Workers.Publisher.new()
+    |> Pleroma.Repo.insert()
   end
 
   @doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644 (file)
index 3db948c..0000000
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
-  use GenServer
-
-  require Logger
-
-  def init(args) do
-    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
-    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
-  end
-
-  def start_link do
-    enabled =
-      if Pleroma.Config.get(:env) == :test,
-        do: true,
-        else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
-    if enabled do
-      Logger.info("Starting retry queue")
-
-      linkres =
-        GenServer.start_link(
-          __MODULE__,
-          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
-          name: __MODULE__
-        )
-
-      maybe_kickoff_timer()
-      linkres
-    else
-      Logger.info("Retry queue disabled")
-      :ignore
-    end
-  end
-
-  def enqueue(data, transport, retries \\ 0) do
-    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
-  end
-
-  def get_stats do
-    GenServer.call(__MODULE__, :get_stats)
-  end
-
-  def reset_stats do
-    GenServer.call(__MODULE__, :reset_stats)
-  end
-
-  def get_retry_params(retries) do
-    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
-      {:drop, "Max retries reached"}
-    else
-      {:retry, growth_function(retries)}
-    end
-  end
-
-  def get_retry_timer_interval do
-    Pleroma.Config.get([:retry_queue, :interval], 1000)
-  end
-
-  defp ets_count_expires(table, current_time) do
-    :ets.select_count(
-      table,
-      [
-        {
-          {:"$1", :"$2"},
-          [{:"=<", :"$1", {:const, current_time}}],
-          [true]
-        }
-      ]
-    )
-  end
-
-  defp ets_pop_n_expired(table, current_time, desired) do
-    {popped, _continuation} =
-      :ets.select(
-        table,
-        [
-          {
-            {:"$1", :"$2"},
-            [{:"=<", :"$1", {:const, current_time}}],
-            [:"$_"]
-          }
-        ],
-        desired
-      )
-
-    popped
-    |> Enum.each(fn e ->
-      :ets.delete_object(table, e)
-    end)
-
-    popped
-  end
-
-  def maybe_start_job(running_jobs, queue_table) do
-    # we don't want to hit the ets or the DateTime more times than we have to
-    # could optimize slightly further by not using the count, and instead grabbing
-    # up to N objects early...
-    current_time = DateTime.to_unix(DateTime.utc_now())
-    n_running_jobs = :sets.size(running_jobs)
-
-    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
-      n_ready_jobs = ets_count_expires(queue_table, current_time)
-
-      if n_ready_jobs > 0 do
-        # figure out how many we could start
-        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
-        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
-      else
-        running_jobs
-      end
-    else
-      running_jobs
-    end
-  end
-
-  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
-    running_jobs
-  end
-
-  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
-       when available_job_slots > 0 do
-    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
-    candidates
-    |> List.foldl(running_jobs, fn {_, e}, rj ->
-      {:ok, pid} = Task.start(fn -> worker(e) end)
-      mref = Process.monitor(pid)
-      :sets.add_element(mref, rj)
-    end)
-  end
-
-  def worker({:send, data, transport, retries}) do
-    case transport.publish_one(data) do
-      {:ok, _} ->
-        GenServer.cast(__MODULE__, :inc_delivered)
-        :delivered
-
-      {:error, _reason} ->
-        enqueue(data, transport, retries)
-        :retry
-    end
-  end
-
-  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
-    {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
-  end
-
-  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
-    {:reply, %{delivered: delivery_count, dropped: drop_count},
-     %{state | delivered: 0, dropped: 0}}
-  end
-
-  def handle_cast(:reset_stats, state) do
-    {:noreply, %{state | delivered: 0, dropped: 0}}
-  end
-
-  def handle_cast(
-        {:maybe_enqueue, data, transport, retries},
-        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
-      ) do
-    case get_retry_params(retries) do
-      {:retry, timeout} ->
-        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
-        running_jobs = maybe_start_job(running_jobs, queue_table)
-        {:noreply, %{state | running_jobs: running_jobs}}
-
-      {:drop, message} ->
-        Logger.debug(message)
-        {:noreply, %{state | dropped: drop_count + 1}}
-    end
-  end
-
-  def handle_cast(:kickoff_timer, state) do
-    retry_interval = get_retry_timer_interval()
-    Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
-    {:noreply, state}
-  end
-
-  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
-    {:noreply, %{state | delivered: delivery_count + 1}}
-  end
-
-  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
-    {:noreply, %{state | dropped: drop_count + 1}}
-  end
-
-  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
-    case transport.publish_one(data) do
-      {:ok, _} ->
-        {:noreply, %{state | delivered: delivery_count + 1}}
-
-      {:error, _reason} ->
-        enqueue(data, transport, retries)
-        {:noreply, state}
-    end
-  end
-
-  def handle_info(
-        :retry_timer_run,
-        %{queue_table: queue_table, running_jobs: running_jobs} = state
-      ) do
-    maybe_kickoff_timer()
-    running_jobs = maybe_start_job(running_jobs, queue_table)
-    {:noreply, %{state | running_jobs: running_jobs}}
-  end
-
-  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
-    %{running_jobs: running_jobs, queue_table: queue_table} = state
-    running_jobs = :sets.del_element(ref, running_jobs)
-    running_jobs = maybe_start_job(running_jobs, queue_table)
-    {:noreply, %{state | running_jobs: running_jobs}}
-  end
-
-  def handle_info(unknown, state) do
-    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
-    {:noreply, state}
-  end
-
-  if Pleroma.Config.get(:env) == :test do
-    defp growth_function(_retries) do
-      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
-      DateTime.to_unix(DateTime.utc_now()) - 1
-    end
-  else
-    defp growth_function(retries) do
-      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
-        DateTime.to_unix(DateTime.utc_now())
-    end
-  end
-
-  defp maybe_kickoff_timer do
-    GenServer.cast(__MODULE__, :kickoff_timer)
-  end
-end
index 9b01ebcc642ea30acc7f09024bc5f7c9c0fea58b..bbaa293fd3105a4b0f3e617ffa45c33190528a79 100644 (file)
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
     end
   end
 
+  def publish_one(%{recipient_id: recipient_id} = params) do
+    recipient = User.get_by_id(recipient_id)
+
+    params
+    |> Map.delete(:recipient_id)
+    |> Map.put(:recipient, recipient)
+    |> publish_one()
+  end
+
   def publish_one(_), do: :noop
 
   @supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
         Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
 
         Publisher.enqueue_one(__MODULE__, %{
-          recipient: remote_user,
+          recipient_id: remote_user.id,
           feed: feed,
           unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
         })
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex
new file mode 100644 (file)
index 0000000..6397948
--- /dev/null
@@ -0,0 +1,14 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Publisher do
+  use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
+    module_name
+    |> String.to_atom()
+    |> apply(:publish_one, [params])
+  end
+end
diff --git a/mix.exs b/mix.exs
index 2a8fe2e9da8b8e5289952ef69df58d90f06efc0c..1ca7a4a7722adbe42abfaecd85c66f0084494097 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do
       {:phoenix_ecto, "~> 4.0"},
       {:ecto_sql, "~> 3.1"},
       {:postgrex, ">= 0.13.5"},
+      {:oban, "~> 0.6"},
       {:gettext, "~> 0.15"},
       {:comeonin, "~> 4.1.1"},
       {:pbkdf2_elixir, "~> 0.12.3"},
index 65da7be8b53a8bfa30d14c0655ee1c78b1626632..8c0b9734e369f232a7945bb01e60adaa30a7d476 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -55,6 +55,7 @@
   "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
   "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
+  "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
   "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
   "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
   "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs
new file mode 100644 (file)
index 0000000..2f201bd
--- /dev/null
@@ -0,0 +1,6 @@
+defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
+  use Ecto.Migration
+
+  defdelegate up, to: Oban.Migrations
+  defdelegate down, to: Oban.Migrations
+end
index 556df45fd0f75a14276d8df6ddb54eb2207a0e14..70c376384947f53eb664b13fbbdeb10d3eec7aa2 100644 (file)
@@ -12,9 +12,9 @@ defmodule Pleroma.UserTest do
   alias Pleroma.Web.CommonAPI
 
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
 
   import Pleroma.Factory
-  import Mock
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -1034,11 +1034,7 @@ defmodule Pleroma.UserTest do
       refute Activity.get_by_id(repeat.id)
     end
 
-    test_with_mock "it sends out User Delete activity",
-                   %{user: user},
-                   Pleroma.Web.ActivityPub.Publisher,
-                   [:passthrough],
-                   [] do
+    test "it sends out User Delete activity", %{user: user} do
       config_path = [:instance, :federating]
       initial_setting = Pleroma.Config.get(config_path)
       Pleroma.Config.put(config_path, true)
@@ -1048,11 +1044,8 @@ defmodule Pleroma.UserTest do
 
       {:ok, _user} = User.delete(user)
 
-      assert called(
-               Pleroma.Web.ActivityPub.Publisher.publish_one(%{
-                 inbox: "http://mastodon.example.org/inbox"
-               })
-             )
+      assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] =
+               all_enqueued(worker: Pleroma.Workers.Publisher)
 
       Pleroma.Config.put(config_path, initial_setting)
     end
index 36a39c84c9208a0f38e4eb9d86a2940fac6b4fe4..26d019878b77a7efaffd2b7526e15df58cb93fce 100644 (file)
@@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
       assert called(
                Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
                  inbox: "https://domain.com/users/nick1/inbox",
-                 actor: actor,
+                 actor_id: actor.id,
                  id: note_activity.data["id"]
                })
              )
index 6e143eee48f03d467e4865fd18af8b01c204db67..5c170454838785953c05ea79ca9b31462e09f80f 100644 (file)
@@ -6,7 +6,10 @@ defmodule Pleroma.Web.FederatorTest do
   alias Pleroma.Instances
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Federator
+
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
+
   import Pleroma.Factory
   import Mock
 
@@ -22,15 +25,6 @@ defmodule Pleroma.Web.FederatorTest do
     :ok
   end
 
-  describe "Publisher.perform" do
-    test "call `perform` with unknown task" do
-      assert {
-               :error,
-               "Don't know what to do with this"
-             } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
-    end
-  end
-
   describe "Publish an activity" do
     setup do
       user = insert(:user)
@@ -73,10 +67,7 @@ defmodule Pleroma.Web.FederatorTest do
   end
 
   describe "Targets reachability filtering in `publish`" do
-    test_with_mock "it federates only to reachable instances via AP",
-                   Pleroma.Web.ActivityPub.Publisher,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via AP" do
       user = insert(:user)
 
       {inbox1, inbox2} =
@@ -104,20 +95,13 @@ defmodule Pleroma.Web.FederatorTest do
       {:ok, _activity} =
         CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
 
-      assert called(
-               Pleroma.Web.ActivityPub.Publisher.publish_one(%{
-                 inbox: inbox1,
-                 unreachable_since: dt
-               })
-             )
+      expected_dt = NaiveDateTime.to_iso8601(dt)
 
-      refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2}))
+      assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] =
+               all_enqueued(worker: Pleroma.Workers.Publisher)
     end
 
-    test_with_mock "it federates only to reachable instances via Websub",
-                   Pleroma.Web.Websub,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via Websub" do
       user = insert(:user)
       websub_topic = Pleroma.Web.OStatus.feed_path(user)
 
@@ -142,23 +126,25 @@ defmodule Pleroma.Web.FederatorTest do
 
       {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
 
-      assert called(
-               Pleroma.Web.Websub.publish_one(%{
-                 callback: sub2.callback,
-                 unreachable_since: dt
-               })
-             )
-
-      refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback}))
+      expected_callback = sub2.callback
+      expected_dt = NaiveDateTime.to_iso8601(dt)
+
+      assert [
+               %{
+                 args: %{
+                   "params" => %{
+                     "callback" => ^expected_callback,
+                     "unreachable_since" => ^expected_dt
+                   }
+                 }
+               }
+             ] = all_enqueued(worker: Pleroma.Workers.Publisher)
     end
 
-    test_with_mock "it federates only to reachable instances via Salmon",
-                   Pleroma.Web.Salmon,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via Salmon" do
       user = insert(:user)
 
-      remote_user1 =
+      _remote_user1 =
         insert(:user, %{
           local: false,
           nickname: "nick1@domain.com",
@@ -174,6 +160,8 @@ defmodule Pleroma.Web.FederatorTest do
           info: %{salmon: "https://domain2.com/salmon"}
         })
 
+      remote_user2_id = remote_user2.id
+
       dt = NaiveDateTime.utc_now()
       Instances.set_unreachable(remote_user2.ap_id, dt)
 
@@ -182,14 +170,18 @@ defmodule Pleroma.Web.FederatorTest do
       {:ok, _activity} =
         CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
 
-      assert called(
-               Pleroma.Web.Salmon.publish_one(%{
-                 recipient: remote_user2,
-                 unreachable_since: dt
-               })
-             )
-
-      refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1}))
+      expected_dt = NaiveDateTime.to_iso8601(dt)
+
+      assert [
+               %{
+                 args: %{
+                   "params" => %{
+                     "recipient_id" => ^remote_user2_id,
+                     "unreachable_since" => ^expected_dt
+                   }
+                 }
+               }
+             ] = all_enqueued(worker: Pleroma.Workers.Publisher)
     end
   end
 
diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs
deleted file mode 100644 (file)
index ecb3ce5..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule MockActivityPub do
-  def publish_one({ret, waiter}) do
-    send(waiter, :complete)
-    {ret, "success"}
-  end
-end
-
-defmodule Pleroma.Web.Federator.RetryQueueTest do
-  use Pleroma.DataCase
-  alias Pleroma.Web.Federator.RetryQueue
-
-  @small_retry_count 0
-  @hopeless_retry_count 10
-
-  setup do
-    RetryQueue.reset_stats()
-  end
-
-  test "RetryQueue responds to stats request" do
-    assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
-  end
-
-  test "failed posts are retried" do
-    {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
-
-    wait_task =
-      Task.async(fn ->
-        receive do
-          :complete -> :ok
-        end
-      end)
-
-    RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
-    Task.await(wait_task)
-    assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
-  end
-
-  test "posts that have been tried too many times are dropped" do
-    {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
-
-    RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
-    assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
-  end
-end
index e86e76fe931efdf6220cb46773da8cc60af8d772..0186f3fef541613be5efa352bf523ac424bd40c6 100644 (file)
@@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do
 
     Salmon.publish(user, activity)
 
-    assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user}))
+    assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
   end
 end