[#1149] Oban jobs implementation for :federator_incoming and :federator_outgoing...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Fri, 9 Aug 2019 17:08:01 +0000 (20:08 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Fri, 9 Aug 2019 17:08:01 +0000 (20:08 +0300)
13 files changed:
config/config.exs
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/publisher.ex
lib/pleroma/workers/publisher.ex
lib/pleroma/workers/receiver.ex [new file with mode: 0644]
lib/pleroma/workers/subscriber.ex [new file with mode: 0644]
test/activity_test.exs
test/support/oban_helpers.ex [new file with mode: 0644]
test/user_test.exs
test/web/activity_pub/activity_pub_controller_test.exs
test/web/federator_test.exs
test/web/websub/websub_test.exs

index 1bb325bf558e584f2b16495fcd48deaa8fca479b..5fd64365c2d575f75e1e0af9dca5a21e15578b0c 100644 (file)
@@ -458,6 +458,13 @@ config :pleroma, Oban,
   prune: {:maxage, 60 * 60 * 24 * 7},
   queues: job_queues
 
+config :pleroma, :workers,
+  retries: [
+    compile_time_default: 1,
+    federator_incoming: 5,
+    federator_outgoing: 5
+  ]
+
 config :pleroma, :fetch_initial_posts,
   enabled: false,
   pages: 5
index 39074888b5edc1d1910949c9dfc324edf37b4f92..f0917f9d470091e24eafcc2231a7a64648629014 100644 (file)
@@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
   """
   def maybe_federate(%Activity{local: true} = activity) do
     if Pleroma.Config.get!([:instance, :federating]) do
-      priority =
-        case activity.data["type"] do
-          "Delete" -> 10
-          "Create" -> 1
-          _ -> 5
-        end
-
-      Pleroma.Web.Federator.publish(activity, priority)
+      Pleroma.Web.Federator.publish(activity)
     end
 
     :ok
index 97ec9d549db5c68c5cb83fcc117e154854af5d1e..bb9eadfee6b620c5e24821ae46e4c4f4d6835f18 100644 (file)
@@ -3,22 +3,15 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Federator do
-  alias Pleroma.Activity
-  alias Pleroma.Object.Containment
-  alias Pleroma.User
-  alias Pleroma.Web.ActivityPub.ActivityPub
-  alias Pleroma.Web.ActivityPub.Transmogrifier
-  alias Pleroma.Web.ActivityPub.Utils
-  alias Pleroma.Web.Federator.Publisher
-  alias Pleroma.Web.OStatus
-  alias Pleroma.Web.Websub
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
+  alias Pleroma.Workers.Receiver, as: ReceiverWorker
+  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
 
   require Logger
 
   def init do
     # 1 minute
-    Process.sleep(1000 * 60)
-    refresh_subscriptions()
+    refresh_subscriptions(schedule_in: 60)
   end
 
   @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -36,111 +29,50 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+    %{"op" => "incoming_doc", "body" => doc}
+    |> ReceiverWorker.new(worker_args(:federator_incoming))
+    |> Pleroma.Repo.insert()
   end
 
   def incoming_ap_doc(params) do
-    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+    %{"op" => "incoming_ap_doc", "params" => params}
+    |> ReceiverWorker.new(worker_args(:federator_incoming))
+    |> Pleroma.Repo.insert()
   end
 
-  def publish(activity, priority \\ 1) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+  def publish(%{id: "pleroma:fakeid"} = activity) do
+    PublisherWorker.perform_publish(activity)
   end
 
-  def verify_websub(websub) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
-  end
-
-  def request_subscription(sub) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
-  end
-
-  def refresh_subscriptions do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
-  end
-
-  # Job Worker Callbacks
-
-  def perform(:refresh_subscriptions) do
-    Logger.debug("Federator running refresh subscriptions")
-    Websub.refresh_subscriptions()
-
-    spawn(fn ->
-      # 6 hours
-      Process.sleep(1000 * 60 * 60 * 6)
-      refresh_subscriptions()
-    end)
-  end
-
-  def perform(:request_subscription, websub) do
-    Logger.debug("Refreshing #{websub.topic}")
-
-    with {:ok, websub} <- Websub.request_subscription(websub) do
-      Logger.debug("Successfully refreshed #{websub.topic}")
-    else
-      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
-    end
+  def publish(activity) do
+    %{"op" => "publish", "activity_id" => activity.id}
+    |> PublisherWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def perform(:publish, activity) do
-    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
-
-    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
-         {:ok, actor} <- User.ensure_keys_present(actor) do
-      Publisher.publish(actor, activity)
-    end
-  end
-
-  def perform(:verify_websub, websub) do
-    Logger.debug(fn ->
-      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
-    end)
-
-    Websub.verify(websub)
-  end
-
-  def perform(:incoming_doc, doc) do
-    Logger.info("Got document, trying to parse")
-    OStatus.handle_incoming(doc)
+  def verify_websub(websub) do
+    %{"op" => "verify_websub", "websub_id" => websub.id}
+    |> SubscriberWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def perform(:incoming_ap_doc, params) do
-    Logger.info("Handling incoming AP activity")
-
-    params = Utils.normalize_params(params)
-
-    # NOTE: we use the actor ID to do the containment, this is fine because an
-    # actor shouldn't be acting on objects outside their own AP server.
-    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
-         nil <- Activity.normalize(params["id"]),
-         :ok <- Containment.contain_origin_from_id(params["actor"], params),
-         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
-      {:ok, activity}
-    else
-      %Activity{} ->
-        Logger.info("Already had #{params["id"]}")
-        :error
-
-      _e ->
-        # Just drop those for now
-        Logger.info("Unhandled activity")
-        Logger.info(Jason.encode!(params, pretty: true))
-        :error
-    end
+  def request_subscription(websub) do
+    %{"op" => "request_subscription", "websub_id" => websub.id}
+    |> SubscriberWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def perform(type, _) do
-    Logger.debug(fn -> "Unknown task: #{type}" end)
-    {:error, "Don't know what to do with this"}
+  def refresh_subscriptions(worker_args \\ []) do
+    %{"op" => "refresh_subscriptions"}
+    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def ap_enabled_actor(id) do
-    user = User.get_cached_by_ap_id(id)
-
-    if User.ap_enabled?(user) do
-      {:ok, user}
+  defp worker_args(queue) do
+    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
+      [max_attempts: max_attempts]
     else
-      ActivityPub.make_user_from_ap_id(id)
+      []
     end
   end
 end
index e8c1bf17f26c36e481c87bc8c8cee4b627edbd9c..05d2be61558381b3fe3cb54af62958a3ebb6ea86 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
   alias Pleroma.Activity
   alias Pleroma.Config
   alias Pleroma.User
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
 
   require Logger
 
@@ -30,8 +31,15 @@ defmodule Pleroma.Web.Federator.Publisher do
   """
   @spec enqueue_one(module(), Map.t()) :: :ok
   def enqueue_one(module, %{} = params) do
-    %{module: to_string(module), params: params}
-    |> Pleroma.Workers.Publisher.new()
+    worker_args =
+      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
+        [max_attempts: max_attempts]
+      else
+        []
+      end
+
+    %{"op" => "publish_one", "module" => to_string(module), "params" => params}
+    |> PublisherWorker.new(worker_args)
     |> Pleroma.Repo.insert()
   end
 
index 6397948308c5983b79a256f37eb44c529ec4b394..67871977a82254b4500fcf6bb274d7efd277eed1 100644 (file)
@@ -3,12 +3,33 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Workers.Publisher do
-  use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
+  alias Pleroma.Activity
+  alias Pleroma.User
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_outgoing",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
 
   @impl Oban.Worker
-  def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
+  def perform(%{"op" => "publish", "activity_id" => activity_id}) do
+    with %Activity{} = activity <- Activity.get_by_id(activity_id) do
+      perform_publish(activity)
+    else
+      _ -> raise "Non-existing activity: #{activity_id}"
+    end
+  end
+
+  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
     module_name
     |> String.to_atom()
     |> apply(:publish_one, [params])
   end
+
+  def perform_publish(%Activity{} = activity) do
+    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
+         {:ok, actor} <- User.ensure_keys_present(actor) do
+      Pleroma.Web.Federator.Publisher.publish(actor, activity)
+    end
+  end
 end
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex
new file mode 100644 (file)
index 0000000..43558b4
--- /dev/null
@@ -0,0 +1,61 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Receiver do
+  alias Pleroma.Activity
+  alias Pleroma.Object.Containment
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  alias Pleroma.Web.ActivityPub.Transmogrifier
+  alias Pleroma.Web.ActivityPub.Utils
+  alias Pleroma.Web.OStatus
+
+  require Logger
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_incoming",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "incoming_doc", "body" => doc}) do
+    Logger.info("Got incoming document, trying to parse")
+    OStatus.handle_incoming(doc)
+  end
+
+  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
+    Logger.info("Handling incoming AP activity")
+
+    params = Utils.normalize_params(params)
+
+    # NOTE: we use the actor ID to do the containment, this is fine because an
+    # actor shouldn't be acting on objects outside their own AP server.
+    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
+         nil <- Activity.normalize(params["id"]),
+         :ok <- Containment.contain_origin_from_id(params["actor"], params),
+         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
+      {:ok, activity}
+    else
+      %Activity{} ->
+        Logger.info("Already had #{params["id"]}")
+        :error
+
+      _e ->
+        # Just drop those for now
+        Logger.info("Unhandled activity")
+        Logger.info(Jason.encode!(params, pretty: true))
+        :error
+    end
+  end
+
+  defp ap_enabled_actor(id) do
+    user = User.get_cached_by_ap_id(id)
+
+    if User.ap_enabled?(user) do
+      {:ok, user}
+    else
+      ActivityPub.make_user_from_ap_id(id)
+    end
+  end
+end
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex
new file mode 100644 (file)
index 0000000..a8c01bb
--- /dev/null
@@ -0,0 +1,44 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Subscriber do
+  alias Pleroma.Repo
+  alias Pleroma.Web.Websub
+  alias Pleroma.Web.Websub.WebsubClientSubscription
+
+  require Logger
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_outgoing",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "refresh_subscriptions"}) do
+    Websub.refresh_subscriptions()
+    # Schedule the next run in 6 hours
+    Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6)
+  end
+
+  def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
+    websub = Repo.get(WebsubClientSubscription, websub_id)
+    Logger.debug("Refreshing #{websub.topic}")
+
+    with {:ok, websub} <- Websub.request_subscription(websub) do
+      Logger.debug("Successfully refreshed #{websub.topic}")
+    else
+      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
+    end
+  end
+
+  def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
+    websub = Repo.get(WebsubClientSubscription, websub_id)
+
+    Logger.debug(fn ->
+      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+    end)
+
+    Websub.verify(websub)
+  end
+end
index b27f6fd369ac82f20f07ee6f84445eafdf12cf1d..b9c12adb21b50fccdbf6cf8c5175ff19ead48660 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.ActivityTest do
   use Pleroma.DataCase
   alias Pleroma.Activity
   alias Pleroma.Bookmark
+  alias Pleroma.ObanHelpers
   alias Pleroma.Object
   alias Pleroma.ThreadMute
   import Pleroma.Factory
@@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do
       }
 
       {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"})
-      {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params)
+      {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params)
+      {:ok, remote_activity} = ObanHelpers.perform(job)
       %{local_activity: local_activity, remote_activity: remote_activity, user: user}
     end
 
diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex
new file mode 100644 (file)
index 0000000..54b5a95
--- /dev/null
@@ -0,0 +1,36 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ObanHelpers do
+  @moduledoc """
+  Oban test helpers.
+  """
+
+  alias Pleroma.Repo
+
+  def perform(%Oban.Job{} = job) do
+    res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
+    Repo.delete(job)
+    res
+  end
+
+  def perform(jobs) when is_list(jobs) do
+    for job <- jobs, do: perform(job)
+  end
+
+  def member?(%{} = job_args, jobs) when is_list(jobs) do
+    Enum.any?(jobs, fn job ->
+      member?(job_args, job.args)
+    end)
+  end
+
+  def member?(%{} = test_attrs, %{} = attrs) do
+    Enum.all?(
+      test_attrs,
+      fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
+    )
+  end
+
+  def member?(x, y), do: x == y
+end
index 70c376384947f53eb664b13fbbdeb10d3eec7aa2..ee6d8e8f370bdc023c60bc6f4cd29820dd4374b8 100644 (file)
@@ -5,6 +5,7 @@
 defmodule Pleroma.UserTest do
   alias Pleroma.Activity
   alias Pleroma.Builders.UserBuilder
+  alias Pleroma.ObanHelpers
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
@@ -1044,8 +1045,16 @@ defmodule Pleroma.UserTest do
 
       {:ok, _user} = User.delete(user)
 
-      assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] =
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{
+                   "inbox" => "http://mastodon.example.org/inbox",
+                   "id" => "pleroma:fakeid"
+                 }
+               },
                all_enqueued(worker: Pleroma.Workers.Publisher)
+             )
 
       Pleroma.Config.put(config_path, initial_setting)
     end
index 40344f17ea8c076fdb57a6fdea4c1b5f0ca39829..1d809164faba5a8dbeb6fc6db6754040a6262482 100644 (file)
@@ -4,15 +4,19 @@
 
 defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
   use Pleroma.Web.ConnCase
+  use Oban.Testing, repo: Pleroma.Repo
+
   import Pleroma.Factory
   alias Pleroma.Activity
   alias Pleroma.Instances
+  alias Pleroma.ObanHelpers
   alias Pleroma.Object
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ObjectView
   alias Pleroma.Web.ActivityPub.UserView
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.CommonAPI
+  alias Pleroma.Workers.Receiver, as: ReceiverWorker
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -232,7 +236,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -274,7 +279,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{user.nickname}/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -303,7 +308,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{recipient.nickname}/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -382,6 +387,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
       |> post("/users/#{recipient.nickname}/inbox", data)
       |> json_response(200)
 
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
+
       activity = Activity.get_by_ap_id(data["id"])
 
       assert activity.id
@@ -457,6 +464,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{user.nickname}/outbox", data)
 
       result = json_response(conn, 201)
+
       assert Activity.get_by_ap_id(result["id"])
     end
 
index 5c170454838785953c05ea79ca9b31462e09f80f..ebe962da2958834134658fc4d1128b65e727f647 100644 (file)
@@ -4,8 +4,10 @@
 
 defmodule Pleroma.Web.FederatorTest do
   alias Pleroma.Instances
+  alias Pleroma.ObanHelpers
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Federator
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
 
   use Pleroma.DataCase
   use Oban.Testing, repo: Pleroma.Repo
@@ -45,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do
     } do
       with_mocks([relay_mock]) do
         Federator.publish(activity)
+        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
       end
 
       assert_received :relay_publish
@@ -58,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do
 
       with_mocks([relay_mock]) do
         Federator.publish(activity)
+        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
       end
 
       refute_received :relay_publish
@@ -97,8 +101,15 @@ defmodule Pleroma.Web.FederatorTest do
 
       expected_dt = NaiveDateTime.to_iso8601(dt)
 
-      assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] =
-               all_enqueued(worker: Pleroma.Workers.Publisher)
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
+
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt}
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
 
     test "it federates only to reachable instances via Websub" do
@@ -129,16 +140,18 @@ defmodule Pleroma.Web.FederatorTest do
       expected_callback = sub2.callback
       expected_dt = NaiveDateTime.to_iso8601(dt)
 
-      assert [
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
+
+      assert ObanHelpers.member?(
                %{
-                 args: %{
-                   "params" => %{
-                     "callback" => ^expected_callback,
-                     "unreachable_since" => ^expected_dt
-                   }
+                 "op" => "publish_one",
+                 "params" => %{
+                   "callback" => expected_callback,
+                   "unreachable_since" => expected_dt
                  }
-               }
-             ] = all_enqueued(worker: Pleroma.Workers.Publisher)
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
 
     test "it federates only to reachable instances via Salmon" do
@@ -172,16 +185,18 @@ defmodule Pleroma.Web.FederatorTest do
 
       expected_dt = NaiveDateTime.to_iso8601(dt)
 
-      assert [
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
+
+      assert ObanHelpers.member?(
                %{
-                 args: %{
-                   "params" => %{
-                     "recipient_id" => ^remote_user2_id,
-                     "unreachable_since" => ^expected_dt
-                   }
+                 "op" => "publish_one",
+                 "params" => %{
+                   "recipient_id" => remote_user2_id,
+                   "unreachable_since" => expected_dt
                  }
-               }
-             ] = all_enqueued(worker: Pleroma.Workers.Publisher)
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
   end
 
@@ -201,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do
         "to" => ["https://www.w3.org/ns/activitystreams#Public"]
       }
 
-      {:ok, _activity} = Federator.incoming_ap_doc(params)
+      assert {:ok, job} = Federator.incoming_ap_doc(params)
+      assert {:ok, _activity} = ObanHelpers.perform(job)
     end
 
     test "rejects incoming AP docs with incorrect origin" do
@@ -219,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do
         "to" => ["https://www.w3.org/ns/activitystreams#Public"]
       }
 
-      :error = Federator.incoming_ap_doc(params)
+      assert {:ok, job} = Federator.incoming_ap_doc(params)
+      assert :error = ObanHelpers.perform(job)
     end
   end
 end
index 74386d7dbb98a149fbb8afa1d2e3106782c79d28..b704a558a8c016960d288aeb288369759f2eef93 100644 (file)
@@ -4,11 +4,14 @@
 
 defmodule Pleroma.Web.WebsubTest do
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
 
+  alias Pleroma.ObanHelpers
   alias Pleroma.Web.Router.Helpers
   alias Pleroma.Web.Websub
   alias Pleroma.Web.Websub.WebsubClientSubscription
   alias Pleroma.Web.Websub.WebsubServerSubscription
+  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
 
   import Pleroma.Factory
   import Tesla.Mock
@@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do
         })
 
       _refresh = Websub.refresh_subscriptions()
+      ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))
 
       assert still_good == Repo.get(WebsubClientSubscription, still_good.id)
       refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)