Merge remote-tracking branch 'remotes/upstream/develop' into 1149-oban-job-queue
authorIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 14 Aug 2019 18:44:50 +0000 (21:44 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 14 Aug 2019 18:44:50 +0000 (21:44 +0300)
52 files changed:
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/application.ex
lib/pleroma/digest_email_worker.ex
lib/pleroma/emails/mailer.ex
lib/pleroma/instances/instance.ex
lib/pleroma/scheduled_activity_worker.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
lib/pleroma/web/activity_pub/publisher.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/publisher.ex
lib/pleroma/web/federator/retry_queue.ex [deleted file]
lib/pleroma/web/oauth/token/clean_worker.ex
lib/pleroma/web/push/push.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/twitter_api/controllers/util_controller.ex
lib/pleroma/workers/background_worker.ex [new file with mode: 0644]
lib/pleroma/workers/helper.ex [new file with mode: 0644]
lib/pleroma/workers/mailer.ex [new file with mode: 0644]
lib/pleroma/workers/publisher.ex [new file with mode: 0644]
lib/pleroma/workers/receiver.ex [new file with mode: 0644]
lib/pleroma/workers/scheduled_activity_worker.ex [new file with mode: 0644]
lib/pleroma/workers/subscriber.ex [new file with mode: 0644]
lib/pleroma/workers/transmogrifier.ex [new file with mode: 0644]
lib/pleroma/workers/web_pusher.ex [new file with mode: 0644]
mix.exs
mix.lock
priv/repo/migrations/20190730055101_add_oban_jobs_table.exs [new file with mode: 0644]
test/activity_test.exs
test/conversation_test.exs
test/mix/tasks/pleroma.digest_test.exs
test/notification_test.exs
test/support/oban_helpers.ex [new file with mode: 0644]
test/user_test.exs
test/web/activity_pub/activity_pub_controller_test.exs
test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs
test/web/activity_pub/publisher_test.exs
test/web/activity_pub/transmogrifier_test.exs
test/web/federator_test.exs
test/web/instances/instance_test.exs
test/web/mastodon_api/mastodon_api_controller_test.exs
test/web/retry_queue_test.exs [deleted file]
test/web/salmon/salmon_test.exs
test/web/twitter_api/twitter_api_controller_test.exs
test/web/twitter_api/twitter_api_test.exs
test/web/twitter_api/util_controller_test.exs
test/web/websub/websub_test.exs

index 75866112058d1a61f1cd5ab1e54f0e11b02281e4..80517646154b68144e62f3d0412c1ef7c1cef2b2 100644 (file)
@@ -445,13 +445,7 @@ config :pleroma, Pleroma.User,
     "web"
   ]
 
-config :pleroma, Pleroma.Web.Federator.RetryQueue,
-  enabled: false,
-  max_jobs: 20,
-  initial_timeout: 30,
-  max_retries: 5
-
-config :pleroma_job_queue, :queues,
+job_queues = [
   federator_incoming: 50,
   federator_outgoing: 50,
   web_push: 50,
@@ -459,6 +453,22 @@ config :pleroma_job_queue, :queues,
   transmogrifier: 20,
   scheduled_activities: 10,
   background: 5
+]
+
+config :pleroma_job_queue, :queues, job_queues
+
+config :pleroma, Oban,
+  repo: Pleroma.Repo,
+  verbose: false,
+  prune: {:maxage, 60 * 60 * 24 * 7},
+  queues: job_queues
+
+config :pleroma, :workers,
+  retries: [
+    compile_time_default: 1,
+    federator_incoming: 5,
+    federator_outgoing: 5
+  ]
 
 config :pleroma, :fetch_initial_posts,
   enabled: false,
index 6f75f39b552cdd155cd77e524ebee9cd6ab70e8b..f897b5d48e62d23f6457bf40f488f5a33412b4f5 100644 (file)
@@ -63,6 +63,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
 
 config :pleroma_job_queue, disabled: true
 
+config :pleroma, Oban,
+  queues: false,
+  prune: :disabled
+
 config :pleroma, Pleroma.ScheduledActivity,
   daily_user_limit: 2,
   total_user_limit: 3,
index 20311db541a75dfc9edb53f8ccd8125550dff29a..a558a51d9f76d8fb0f90b6564228ef61940e627d 100644 (file)
@@ -419,13 +419,6 @@ config :pleroma_job_queue, :queues,
 
 This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
 
-## Pleroma.Web.Federator.RetryQueue
-
-* `enabled`: If set to `true`, failed federation jobs will be retried
-* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
-* `initial_timeout`: The initial timeout in seconds
-* `max_retries`: The maximum number of times a federation job is retried
-
 ## Pleroma.Web.Metadata
 * `providers`: a list of metadata providers to enable. Providers available:
   * Pleroma.Web.Metadata.Providers.OpenGraph
index 00b06f723ab224bc5e86a8cf46f4ae546bb72c4e..7cf60f44a961d846fda1df23581ef85969135d01 100644 (file)
@@ -120,8 +120,8 @@ defmodule Pleroma.Application do
         hackney_pool_children() ++
         [
           %{
-            id: Pleroma.Web.Federator.RetryQueue,
-            start: {Pleroma.Web.Federator.RetryQueue, :start_link, []}
+            id: Oban,
+            start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
           },
           %{
             id: Pleroma.Web.OAuth.Token.CleanWorker,
@@ -233,6 +233,7 @@ defmodule Pleroma.Application do
   defp after_supervisor_start do
     with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
          true <- digest_config[:active] do
+      # TODO: consider replacing with `quantum` scheduler
       PleromaJobQueue.schedule(
         digest_config[:schedule],
         :digest_emails,
index 18e67d39b07cd5d7176f36c95cdd84b539f1299c..6e44cc955aa2b22d3008bdd54a28cd487e3f58d6 100644 (file)
@@ -1,7 +1,10 @@
 defmodule Pleroma.DigestEmailWorker do
+  alias Pleroma.Repo
+  alias Pleroma.Workers.Mailer, as: MailerWorker
+
   import Ecto.Query
 
-  @queue_name :digest_emails
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
 
   def perform do
     config = Pleroma.Config.get([:email_notifications, :digest])
@@ -17,7 +20,11 @@ defmodule Pleroma.DigestEmailWorker do
       select: u
     )
     |> Pleroma.Repo.all()
-    |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
+    |> Enum.each(fn user ->
+      %{"op" => "digest_email", "user_id" => user.id}
+      |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
+      |> Repo.insert()
+    end)
   end
 
   @doc """
index 2e4657b7c33b4a1ce9762f5a0e6facc80dfe9e53..bb534f6026cd31860e490d3d86f9bff7447cb398 100644 (file)
@@ -9,6 +9,8 @@ defmodule Pleroma.Emails.Mailer do
   The module contains functions to delivery email using Swoosh.Mailer.
   """
 
+  alias Pleroma.Repo
+  alias Pleroma.Workers.Mailer, as: MailerWorker
   alias Swoosh.DeliveryError
 
   @otp_app :pleroma
@@ -17,9 +19,18 @@ defmodule Pleroma.Emails.Mailer do
   @spec enabled?() :: boolean()
   def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   @doc "add email to queue"
   def deliver_async(email, config \\ []) do
-    PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+    encoded_email =
+      email
+      |> :erlang.term_to_binary()
+      |> Base.encode64()
+
+    %{"op" => "email", "encoded_email" => encoded_email, "config" => config}
+    |> MailerWorker.new(worker_args(:mailer))
+    |> Repo.insert()
   end
 
   @doc "callback to perform send email from queue"
index 4d7ed4ca1076abe35b697f2e29002f739b2124c8..544c4b687c8d5e0a9c13a3247fa8e731609b0188 100644 (file)
@@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
   def set_unreachable(url_or_host, unreachable_since \\ nil)
 
   def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
-    unreachable_since = unreachable_since || DateTime.utc_now()
+    unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
     host = host(url_or_host)
     existing_record = Repo.get_by(Instance, %{host: host})
 
@@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
   end
 
   def set_unreachable(_, _), do: {:error, nil}
+
+  defp parse_datetime(datetime) when is_binary(datetime) do
+    NaiveDateTime.from_iso8601(datetime)
+  end
+
+  defp parse_datetime(datetime), do: datetime
 end
index 65b38622f212e2399ddecf9dc1d83af593bdb296..cabea51ca98b7b67a88be061bc7b308dd3ad6575 100644 (file)
@@ -8,14 +8,18 @@ defmodule Pleroma.ScheduledActivityWorker do
   """
 
   alias Pleroma.Config
+  alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
+
   use GenServer
   require Logger
 
   @schedule_interval :timer.minutes(1)
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   def start_link do
     GenServer.start_link(__MODULE__, nil)
   end
@@ -45,7 +49,9 @@ defmodule Pleroma.ScheduledActivityWorker do
   def handle_info(:perform, state) do
     ScheduledActivity.due_activities(@schedule_interval)
     |> Enum.each(fn scheduled_activity ->
-      PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+      %{"op" => "execute", "activity_id" => scheduled_activity.id}
+      |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
+      |> Repo.insert()
     end)
 
     schedule_next()
index b67743846877596a4fffe812f3c76e173f7ea236..32fde2b6bbd8fef1d98780a1d376af7678b034df 100644 (file)
@@ -26,6 +26,7 @@ defmodule Pleroma.User do
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.RelMe
   alias Pleroma.Web.Websub
+  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -39,6 +40,8 @@ defmodule Pleroma.User do
   @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
   @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   schema "users" do
     field(:bio, :string)
     field(:email, :string)
@@ -583,8 +586,11 @@ defmodule Pleroma.User do
   end
 
   @doc "Fetch some posts when the user has just been federated with"
-  def fetch_initial_posts(user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+  def fetch_initial_posts(user) do
+    %{"op" => "fetch_initial_posts", "user_id" => user.id}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
   def get_followers_query(%User{} = user, nil) do
@@ -1005,7 +1011,9 @@ defmodule Pleroma.User do
   end
 
   def deactivate_async(user, status \\ true) do
-    PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+    %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
   end
 
   def deactivate(%User{} = user, status \\ true) do
@@ -1033,9 +1041,11 @@ defmodule Pleroma.User do
     |> update_and_set_cache()
   end
 
-  @spec delete(User.t()) :: :ok
-  def delete(%User{} = user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+  def delete(%User{} = user) do
+    %{"op" => "delete_user", "user_id" => user.id}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   @spec perform(atom(), User.t()) :: {:ok, User.t()}
   def perform(:delete, %User{} = user) do
@@ -1142,21 +1152,26 @@ defmodule Pleroma.User do
     Repo.all(query)
   end
 
-  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :blocks_import,
-        blocker,
-        blocked_identifiers
-      ])
-
-  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :follow_import,
-        follower,
-        followed_identifiers
-      ])
+  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+    %{
+      "op" => "blocks_import",
+      "blocker_id" => blocker.id,
+      "blocked_identifiers" => blocked_identifiers
+    }
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
+
+  def follow_import(%User{} = follower, followed_identifiers)
+      when is_list(followed_identifiers) do
+    %{
+      "op" => "follow_import",
+      "follower_id" => follower.id,
+      "followed_identifiers" => followed_identifiers
+    }
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     ap_id
index cf55c95203fc2b7cd119f8774567464ccacb96f6..23374604747dd7f527a8fbd59c3501dc3eb447f8 100644 (file)
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.Web.ActivityPub.MRF
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.WebFinger
+  alias Pleroma.Workers.BackgroundWorker
 
   import Ecto.Query
   import Pleroma.Web.ActivityPub.Utils
@@ -25,6 +26,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   require Logger
   require Pleroma.Constants
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   # For Announce activities, we filter the recipients based on following status for any actors
   # that match actual users.  See issue #164 for more information about why this is necessary.
   defp get_recipients(%{"type" => "Announce"} = data) do
@@ -145,7 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           activity
         end
 
-      PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+      %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
+      |> BackgroundWorker.new(worker_args(:background))
+      |> Repo.insert()
 
       Notification.create_notifications(activity)
 
index a179dd54d3850f9987d1fa13968d49039d043f41..b188164ee36cd6cad90f66f3f9c8379ed5cf0a7c 100644 (file)
@@ -7,7 +7,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
   @behaviour Pleroma.Web.ActivityPub.MRF
 
   alias Pleroma.HTTP
+  alias Pleroma.Repo
   alias Pleroma.Web.MediaProxy
+  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -16,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
     recv_timeout: 10_000
   ]
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   def perform(:prefetch, url) do
     Logger.info("Prefetching #{inspect(url)}")
 
@@ -30,7 +34,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
         url
         |> Enum.each(fn
           %{"href" => href} ->
-            PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
+            %{"op" => "media_proxy_prefetch", "url" => href}
+            |> BackgroundWorker.new(worker_args(:background))
+            |> Repo.insert()
 
           x ->
             Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -46,7 +52,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
         %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
       )
       when is_list(attachments) and length(attachments) > 0 do
-    PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
+    %{"op" => "media_proxy_preload", "message" => message}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
 
     {:ok, message}
   end
index 46edab0bd6c9e7144013b18c890d87adf34ef92e..29f3221d1a8d1c9718d68578fea5675e0d57d3eb 100644 (file)
@@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
     end
   end
 
+  def publish_one(%{actor_id: actor_id} = params) do
+    actor = User.get_by_id(actor_id)
+
+    params
+    |> Map.delete(:actor_id)
+    |> Map.put(:actor, actor)
+    |> publish_one()
+  end
+
   defp should_federate?(inbox, public) do
     if public do
       true
@@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
   Publishes an activity with BCC to all relevant peers.
   """
 
-  def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+      when is_list(bcc) and bcc != [] do
     public = is_public?(activity)
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
 
@@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
       Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
         inbox: inbox,
         json: json,
-        actor: actor,
+        actor_id: actor.id,
         id: activity.data["id"],
         unreachable_since: unreachable_since
       })
@@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
         %{
           inbox: inbox,
           json: json,
-          actor: actor,
+          actor_id: actor.id,
           id: activity.data["id"],
           unreachable_since: unreachable_since
         }
index 0fcc81bf3c03da529c95ea3e6c2b790d6ecc2de6..2cb7ca8d18d3900d34feedf5a510c623e3e8e566 100644 (file)
@@ -15,12 +15,15 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Web.Federator
+  alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker
 
   import Ecto.Query
 
   require Logger
   require Pleroma.Constants
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   @doc """
   Modifies an incoming AP object (mastodon format) to our internal format.
   """
@@ -1043,7 +1046,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
          already_ap <- User.ap_enabled?(user),
          {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
       unless already_ap do
-        PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
+        %{"op" => "user_upgrade", "user_id" => user.id}
+        |> TransmogrifierWorker.new(worker_args(:transmogrifier))
+        |> Repo.insert()
       end
 
       {:ok, user}
index fc5305c589a426f9965db3a20aa78e6271aea276..4f68acc78c4c98b95f42f83c2c636d837397f85d 100644 (file)
@@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
   """
   def maybe_federate(%Activity{local: true} = activity) do
     if Pleroma.Config.get!([:instance, :federating]) do
-      priority =
-        case activity.data["type"] do
-          "Delete" -> 10
-          "Create" -> 1
-          _ -> 5
-        end
-
-      Pleroma.Web.Federator.publish(activity, priority)
+      Pleroma.Web.Federator.publish(activity)
     end
 
     :ok
index f4f9e83e06ab55f71b715301631865bc46b11c00..d85fe824fd8f7cd46dca214083ef0baef6e4a2f6 100644 (file)
@@ -10,16 +10,19 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Federator.Publisher
-  alias Pleroma.Web.Federator.RetryQueue
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
+  alias Pleroma.Workers.Receiver, as: ReceiverWorker
+  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
 
   require Logger
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   def init do
     # 1 minute
-    Process.sleep(1000 * 60)
-    refresh_subscriptions()
+    refresh_subscriptions(schedule_in: 60)
   end
 
   @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,50 +40,50 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+    %{"op" => "incoming_doc", "body" => doc}
+    |> ReceiverWorker.new(worker_args(:federator_incoming))
+    |> Pleroma.Repo.insert()
   end
 
   def incoming_ap_doc(params) do
-    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+    %{"op" => "incoming_ap_doc", "params" => params}
+    |> ReceiverWorker.new(worker_args(:federator_incoming))
+    |> Pleroma.Repo.insert()
   end
 
-  def publish(activity, priority \\ 1) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+  def publish(%{id: "pleroma:fakeid"} = activity) do
+    perform(:publish, activity)
   end
 
-  def verify_websub(websub) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+  def publish(activity) do
+    %{"op" => "publish", "activity_id" => activity.id}
+    |> PublisherWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def request_subscription(sub) do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+  def verify_websub(websub) do
+    %{"op" => "verify_websub", "websub_id" => websub.id}
+    |> SubscriberWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def refresh_subscriptions do
-    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+  def request_subscription(websub) do
+    %{"op" => "request_subscription", "websub_id" => websub.id}
+    |> SubscriberWorker.new(worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  # Job Worker Callbacks
-
-  def perform(:refresh_subscriptions) do
-    Logger.debug("Federator running refresh subscriptions")
-    Websub.refresh_subscriptions()
-
-    spawn(fn ->
-      # 6 hours
-      Process.sleep(1000 * 60 * 60 * 6)
-      refresh_subscriptions()
-    end)
+  def refresh_subscriptions(worker_args \\ []) do
+    %{"op" => "refresh_subscriptions"}
+    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
+    |> Pleroma.Repo.insert()
   end
 
-  def perform(:request_subscription, websub) do
-    Logger.debug("Refreshing #{websub.topic}")
+  # Job Worker Callbacks
 
-    with {:ok, websub} <- Websub.request_subscription(websub) do
-      Logger.debug("Successfully refreshed #{websub.topic}")
-    else
-      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
-    end
+  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
+  def perform(:publish_one, module, params) do
+    apply(module, :publish_one, [params])
   end
 
   def perform(:publish, activity) do
@@ -92,14 +95,6 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def perform(:verify_websub, websub) do
-    Logger.debug(fn ->
-      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
-    end)
-
-    Websub.verify(websub)
-  end
-
   def perform(:incoming_doc, doc) do
     Logger.info("Got document, trying to parse")
     OStatus.handle_incoming(doc)
@@ -130,22 +125,33 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def perform(
-        :publish_single_websub,
-        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
-      ) do
-    case Websub.publish_one(params) do
-      {:ok, _} ->
-        :ok
+  def perform(:request_subscription, websub) do
+    Logger.debug("Refreshing #{websub.topic}")
 
-      {:error, _} ->
-        RetryQueue.enqueue(params, Websub)
+    with {:ok, websub} <- Websub.request_subscription(websub) do
+      Logger.debug("Successfully refreshed #{websub.topic}")
+    else
+      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
     end
   end
 
-  def perform(type, _) do
-    Logger.debug(fn -> "Unknown task: #{type}" end)
-    {:error, "Don't know what to do with this"}
+  def perform(:verify_websub, websub) do
+    Logger.debug(fn ->
+      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+    end)
+
+    Websub.verify(websub)
+  end
+
+  def perform(:refresh_subscriptions) do
+    Logger.debug("Federator running refresh subscriptions")
+    Websub.refresh_subscriptions()
+
+    spawn(fn ->
+      # 6 hours
+      Process.sleep(1000 * 60 * 60 * 6)
+      refresh_subscriptions()
+    end)
   end
 
   def ap_enabled_actor(id) do
index 70f870244fbb5f7af1283985620004f3791522f7..05d2be61558381b3fe3cb54af62958a3ebb6ea86 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
   alias Pleroma.Activity
   alias Pleroma.Config
   alias Pleroma.User
-  alias Pleroma.Web.Federator.RetryQueue
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
 
   require Logger
 
@@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do
   Enqueue publishing a single activity.
   """
   @spec enqueue_one(module(), Map.t()) :: :ok
-  def enqueue_one(module, %{} = params),
-    do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
-  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
-  def perform(:publish_one, module, params) do
-    case apply(module, :publish_one, [params]) do
-      {:ok, _} ->
-        :ok
-
-      {:error, _e} ->
-        RetryQueue.enqueue(params, module)
-    end
-  end
+  def enqueue_one(module, %{} = params) do
+    worker_args =
+      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
+        [max_attempts: max_attempts]
+      else
+        []
+      end
 
-  def perform(type, _, _) do
-    Logger.debug("Unknown task: #{type}")
-    {:error, "Don't know what to do with this"}
+    %{"op" => "publish_one", "module" => to_string(module), "params" => params}
+    |> PublisherWorker.new(worker_args)
+    |> Pleroma.Repo.insert()
   end
 
   @doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644 (file)
index 3db948c..0000000
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
-  use GenServer
-
-  require Logger
-
-  def init(args) do
-    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
-    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
-  end
-
-  def start_link do
-    enabled =
-      if Pleroma.Config.get(:env) == :test,
-        do: true,
-        else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
-    if enabled do
-      Logger.info("Starting retry queue")
-
-      linkres =
-        GenServer.start_link(
-          __MODULE__,
-          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
-          name: __MODULE__
-        )
-
-      maybe_kickoff_timer()
-      linkres
-    else
-      Logger.info("Retry queue disabled")
-      :ignore
-    end
-  end
-
-  def enqueue(data, transport, retries \\ 0) do
-    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
-  end
-
-  def get_stats do
-    GenServer.call(__MODULE__, :get_stats)
-  end
-
-  def reset_stats do
-    GenServer.call(__MODULE__, :reset_stats)
-  end
-
-  def get_retry_params(retries) do
-    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
-      {:drop, "Max retries reached"}
-    else
-      {:retry, growth_function(retries)}
-    end
-  end
-
-  def get_retry_timer_interval do
-    Pleroma.Config.get([:retry_queue, :interval], 1000)
-  end
-
-  defp ets_count_expires(table, current_time) do
-    :ets.select_count(
-      table,
-      [
-        {
-          {:"$1", :"$2"},
-          [{:"=<", :"$1", {:const, current_time}}],
-          [true]
-        }
-      ]
-    )
-  end
-
-  defp ets_pop_n_expired(table, current_time, desired) do
-    {popped, _continuation} =
-      :ets.select(
-        table,
-        [
-          {
-            {:"$1", :"$2"},
-            [{:"=<", :"$1", {:const, current_time}}],
-            [:"$_"]
-          }
-        ],
-        desired
-      )
-
-    popped
-    |> Enum.each(fn e ->
-      :ets.delete_object(table, e)
-    end)
-
-    popped
-  end
-
-  def maybe_start_job(running_jobs, queue_table) do
-    # we don't want to hit the ets or the DateTime more times than we have to
-    # could optimize slightly further by not using the count, and instead grabbing
-    # up to N objects early...
-    current_time = DateTime.to_unix(DateTime.utc_now())
-    n_running_jobs = :sets.size(running_jobs)
-
-    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
-      n_ready_jobs = ets_count_expires(queue_table, current_time)
-
-      if n_ready_jobs > 0 do
-        # figure out how many we could start
-        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
-        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
-      else
-        running_jobs
-      end
-    else
-      running_jobs
-    end
-  end
-
-  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
-    running_jobs
-  end
-
-  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
-       when available_job_slots > 0 do
-    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
-    candidates
-    |> List.foldl(running_jobs, fn {_, e}, rj ->
-      {:ok, pid} = Task.start(fn -> worker(e) end)
-      mref = Process.monitor(pid)
-      :sets.add_element(mref, rj)
-    end)
-  end
-
-  def worker({:send, data, transport, retries}) do
-    case transport.publish_one(data) do
-      {:ok, _} ->
-        GenServer.cast(__MODULE__, :inc_delivered)
-        :delivered
-
-      {:error, _reason} ->
-        enqueue(data, transport, retries)
-        :retry
-    end
-  end
-
-  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
-    {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
-  end
-
-  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
-    {:reply, %{delivered: delivery_count, dropped: drop_count},
-     %{state | delivered: 0, dropped: 0}}
-  end
-
-  def handle_cast(:reset_stats, state) do
-    {:noreply, %{state | delivered: 0, dropped: 0}}
-  end
-
-  def handle_cast(
-        {:maybe_enqueue, data, transport, retries},
-        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
-      ) do
-    case get_retry_params(retries) do
-      {:retry, timeout} ->
-        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
-        running_jobs = maybe_start_job(running_jobs, queue_table)
-        {:noreply, %{state | running_jobs: running_jobs}}
-
-      {:drop, message} ->
-        Logger.debug(message)
-        {:noreply, %{state | dropped: drop_count + 1}}
-    end
-  end
-
-  def handle_cast(:kickoff_timer, state) do
-    retry_interval = get_retry_timer_interval()
-    Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
-    {:noreply, state}
-  end
-
-  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
-    {:noreply, %{state | delivered: delivery_count + 1}}
-  end
-
-  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
-    {:noreply, %{state | dropped: drop_count + 1}}
-  end
-
-  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
-    case transport.publish_one(data) do
-      {:ok, _} ->
-        {:noreply, %{state | delivered: delivery_count + 1}}
-
-      {:error, _reason} ->
-        enqueue(data, transport, retries)
-        {:noreply, state}
-    end
-  end
-
-  def handle_info(
-        :retry_timer_run,
-        %{queue_table: queue_table, running_jobs: running_jobs} = state
-      ) do
-    maybe_kickoff_timer()
-    running_jobs = maybe_start_job(running_jobs, queue_table)
-    {:noreply, %{state | running_jobs: running_jobs}}
-  end
-
-  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
-    %{running_jobs: running_jobs, queue_table: queue_table} = state
-    running_jobs = :sets.del_element(ref, running_jobs)
-    running_jobs = maybe_start_job(running_jobs, queue_table)
-    {:noreply, %{state | running_jobs: running_jobs}}
-  end
-
-  def handle_info(unknown, state) do
-    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
-    {:noreply, state}
-  end
-
-  if Pleroma.Config.get(:env) == :test do
-    defp growth_function(_retries) do
-      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
-      DateTime.to_unix(DateTime.utc_now()) - 1
-    end
-  else
-    defp growth_function(retries) do
-      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
-        DateTime.to_unix(DateTime.utc_now())
-    end
-  end
-
-  defp maybe_kickoff_timer do
-    GenServer.cast(__MODULE__, :kickoff_timer)
-  end
-end
index dca85244931d5cd2c664ebb41f3bd9c305b113f9..c0c9c3653827f8036155d79de331bb4b2840e932 100644 (file)
@@ -14,9 +14,12 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
               [:oauth2, :clean_expired_tokens_interval],
               86_400_000
             )
-  @queue :background
 
+  alias Pleroma.Repo
   alias Pleroma.Web.OAuth.Token
+  alias Pleroma.Workers.BackgroundWorker
+
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
 
   def start_link, do: GenServer.start_link(__MODULE__, nil)
 
@@ -31,8 +34,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
 
   @doc false
   def handle_info(:perform, state) do
+    %{"op" => "clean_expired_tokens"}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+
     Process.send_after(self(), :perform, @interval)
-    PleromaJobQueue.enqueue(@queue, __MODULE__, [:clean])
     {:noreply, state}
   end
 
index 729dad02a22a0cec0698c6501cf2671e939ec49f..b4f0e5127e37518efb603fde9dd1a307017aed65 100644 (file)
@@ -3,10 +3,13 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Push do
-  alias Pleroma.Web.Push.Impl
+  alias Pleroma.Repo
+  alias Pleroma.Workers.WebPusher
 
   require Logger
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   def init do
     unless enabled() do
       Logger.warn("""
@@ -31,6 +34,9 @@ defmodule Pleroma.Web.Push do
     end
   end
 
-  def send(notification),
-    do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
+  def send(notification) do
+    %{"op" => "web_push", "notification_id" => notification.id}
+    |> WebPusher.new(worker_args(:web_push))
+    |> Repo.insert()
+  end
 end
index 9b01ebcc642ea30acc7f09024bc5f7c9c0fea58b..bbaa293fd3105a4b0f3e617ffa45c33190528a79 100644 (file)
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
     end
   end
 
+  def publish_one(%{recipient_id: recipient_id} = params) do
+    recipient = User.get_by_id(recipient_id)
+
+    params
+    |> Map.delete(:recipient_id)
+    |> Map.put(:recipient, recipient)
+    |> publish_one()
+  end
+
   def publish_one(_), do: :noop
 
   @supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
         Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
 
         Publisher.enqueue_one(__MODULE__, %{
-          recipient: remote_user,
+          recipient_id: remote_user.id,
           feed: feed,
           unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
         })
index 3405bd3b7f5c2ab5d551183ec7fd32666d76f9a9..7ba4ad305f288be53c0da3ec311bb81bed3ab9ba 100644 (file)
@@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
              String.split(line, ",") |> List.first()
            end)
            |> List.delete("Account address") do
-      PleromaJobQueue.enqueue(:background, User, [
-        :follow_import,
-        follower,
-        followed_identifiers
-      ])
-
+      User.follow_import(follower, followed_identifiers)
       json(conn, "job started")
     end
   end
@@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
 
   def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
     with blocked_identifiers <- String.split(list) do
-      PleromaJobQueue.enqueue(:background, User, [
-        :blocks_import,
-        blocker,
-        blocked_identifiers
-      ])
-
+      User.blocks_import(blocker, blocked_identifiers)
       json(conn, "job started")
     end
   end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644 (file)
index 0000000..3ab2b6b
--- /dev/null
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+  alias Pleroma.Activity
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+  alias Pleroma.Web.OAuth.Token.CleanWorker
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "background",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}) do
+    user = User.get_by_id(user_id)
+    User.perform(:fetch_initial_posts, user)
+  end
+
+  def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}) do
+    user = User.get_by_id(user_id)
+    User.perform(:deactivate_async, user, status)
+  end
+
+  def perform(%{"op" => "delete_user", "user_id" => user_id}) do
+    user = User.get_by_id(user_id)
+    User.perform(:delete, user)
+  end
+
+  def perform(%{
+        "op" => "blocks_import",
+        "blocker_id" => blocker_id,
+        "blocked_identifiers" => blocked_identifiers
+      }) do
+    blocker = User.get_by_id(blocker_id)
+    User.perform(:blocks_import, blocker, blocked_identifiers)
+  end
+
+  def perform(%{
+        "op" => "follow_import",
+        "follower_id" => follower_id,
+        "followed_identifiers" => followed_identifiers
+      }) do
+    follower = User.get_by_id(follower_id)
+    User.perform(:follow_import, follower, followed_identifiers)
+  end
+
+  def perform(%{"op" => "clean_expired_tokens"}) do
+    CleanWorker.perform(:clean)
+  end
+
+  def perform(%{"op" => "media_proxy_preload", "message" => message}) do
+    MediaProxyWarmingPolicy.perform(:preload, message)
+  end
+
+  def perform(%{"op" => "media_proxy_prefetch", "url" => url}) do
+    MediaProxyWarmingPolicy.perform(:prefetch, url)
+  end
+
+  def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}) do
+    activity = Activity.get_by_id(activity_id)
+    Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+  end
+end
diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex
new file mode 100644 (file)
index 0000000..3286ce0
--- /dev/null
@@ -0,0 +1,13 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Helper do
+  def worker_args(queue) do
+    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
+      [max_attempts: max_attempts]
+    else
+      []
+    end
+  end
+end
diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer.ex
new file mode 100644 (file)
index 0000000..8bf9952
--- /dev/null
@@ -0,0 +1,27 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Mailer do
+  alias Pleroma.User
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "mailer",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}) do
+    email =
+      encoded_email
+      |> Base.decode64!()
+      |> :erlang.binary_to_term()
+
+    Pleroma.Emails.Mailer.deliver(email, config)
+  end
+
+  def perform(%{"op" => "digest_email", "user_id" => user_id}) do
+    user = User.get_by_id(user_id)
+    Pleroma.DigestEmailWorker.perform(user)
+  end
+end
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex
new file mode 100644 (file)
index 0000000..c890ffb
--- /dev/null
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Publisher do
+  alias Pleroma.Activity
+  alias Pleroma.Web.Federator
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_outgoing",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "publish", "activity_id" => activity_id}) do
+    activity = Activity.get_by_id(activity_id)
+    Federator.perform(:publish, activity)
+  end
+
+  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
+    Federator.perform(:publish_one, String.to_atom(module_name), params)
+  end
+end
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex
new file mode 100644 (file)
index 0000000..d3de957
--- /dev/null
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Receiver do
+  alias Pleroma.Web.Federator
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_incoming",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "incoming_doc", "body" => doc}) do
+    Federator.perform(:incoming_doc, doc)
+  end
+
+  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
+    Federator.perform(:incoming_ap_doc, params)
+  end
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644 (file)
index 0000000..a49834f
--- /dev/null
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "scheduled_activities",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "execute", "activity_id" => activity_id}) do
+    Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
+  end
+end
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex
new file mode 100644 (file)
index 0000000..6af3ad0
--- /dev/null
@@ -0,0 +1,29 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Subscriber do
+  alias Pleroma.Repo
+  alias Pleroma.Web.Federator
+  alias Pleroma.Web.Websub.WebsubClientSubscription
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "federator_outgoing",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "refresh_subscriptions"}) do
+    Federator.perform(:refresh_subscriptions)
+  end
+
+  def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
+    websub = Repo.get(WebsubClientSubscription, websub_id)
+    Federator.perform(:request_subscription, websub)
+  end
+
+  def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
+    websub = Repo.get(WebsubClientSubscription, websub_id)
+    Federator.perform(:verify_websub, websub)
+  end
+end
diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier.ex
new file mode 100644 (file)
index 0000000..c6b4fab
--- /dev/null
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Transmogrifier do
+  alias Pleroma.User
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "transmogrifier",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "user_upgrade", "user_id" => user_id}) do
+    user = User.get_by_id(user_id)
+    Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+  end
+end
diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher.ex
new file mode 100644 (file)
index 0000000..b99581e
--- /dev/null
@@ -0,0 +1,19 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusher do
+  alias Pleroma.Notification
+  alias Pleroma.Repo
+
+  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  use Oban.Worker,
+    queue: "web_push",
+    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+  @impl Oban.Worker
+  def perform(%{"op" => "web_push", "notification_id" => notification_id}) do
+    notification = Repo.get(Notification, notification_id)
+    Pleroma.Web.Push.Impl.perform(notification)
+  end
+end
diff --git a/mix.exs b/mix.exs
index 3170d6f2d7ca8413660297439c3fbc092569a458..b651520edce1b7884427c2e63764ce1f0938176c 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do
       {:phoenix_ecto, "~> 4.0"},
       {:ecto_sql, "~> 3.1"},
       {:postgrex, ">= 0.13.5"},
+      {:oban, "~> 0.6"},
       {:gettext, "~> 0.15"},
       {:comeonin, "~> 4.1.1"},
       {:pbkdf2_elixir, "~> 0.12.3"},
index 2639e96e95ad118bc30360793cc7fad4eb939f1c..52932c9ef3f352fddf32a565477ae2964f7ef028 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -57,6 +57,7 @@
   "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
   "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
+  "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
   "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
   "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
   "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs
new file mode 100644 (file)
index 0000000..2f201bd
--- /dev/null
@@ -0,0 +1,6 @@
+defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
+  use Ecto.Migration
+
+  defdelegate up, to: Oban.Migrations
+  defdelegate down, to: Oban.Migrations
+end
index b27f6fd369ac82f20f07ee6f84445eafdf12cf1d..658c478375097b8f7c9a9d193b7976d73c427606 100644 (file)
@@ -7,6 +7,7 @@ defmodule Pleroma.ActivityTest do
   alias Pleroma.Activity
   alias Pleroma.Bookmark
   alias Pleroma.Object
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.ThreadMute
   import Pleroma.Factory
 
@@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do
       }
 
       {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"})
-      {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params)
+      {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params)
+      {:ok, remote_activity} = ObanHelpers.perform(job)
       %{local_activity: local_activity, remote_activity: remote_activity, user: user}
     end
 
index aa193e0d40dadf907f974ef2cfa378ee7788844e..f917aa69105d7c2369738f63e8b298abc2a49f30 100644 (file)
@@ -28,6 +28,8 @@ defmodule Pleroma.ConversationTest do
     {:ok, _activity} =
       CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"})
 
+    Pleroma.Tests.ObanHelpers.perform_all()
+
     Repo.delete_all(Conversation)
     Repo.delete_all(Conversation.Participation)
 
index 595f64ed70e3d95e8f4d45ab1ee94d6ba9fa5926..5fbeac0d66c93dfab8d220b6a8aaf06aa11704d9 100644 (file)
@@ -4,6 +4,7 @@ defmodule Mix.Tasks.Pleroma.DigestTest do
   import Pleroma.Factory
   import Swoosh.TestAssertions
 
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.CommonAPI
 
   setup_all do
@@ -39,6 +40,8 @@ defmodule Mix.Tasks.Pleroma.DigestTest do
 
       :ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date])
 
+      ObanHelpers.perform_all()
+
       assert_receive {:mix_shell, :info, [message]}
       assert message =~ "Digest email have been sent"
 
index 80ea2a085b5f1153109d3a4e95358c49437debbd..e1c9f4f93ba151d4587be144a1c27a16f84810af 100644 (file)
@@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do
   import Pleroma.Factory
 
   alias Pleroma.Notification
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.CommonAPI
@@ -621,7 +622,8 @@ defmodule Pleroma.NotificationTest do
 
       refute Enum.empty?(Notification.for_user(other_user))
 
-      User.delete(user)
+      {:ok, job} = User.delete(user)
+      ObanHelpers.perform(job)
 
       assert Enum.empty?(Notification.for_user(other_user))
     end
@@ -666,6 +668,7 @@ defmodule Pleroma.NotificationTest do
       }
 
       {:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message)
+      ObanHelpers.perform_all()
 
       assert Enum.empty?(Notification.for_user(local_user))
     end
diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex
new file mode 100644 (file)
index 0000000..d379c9e
--- /dev/null
@@ -0,0 +1,42 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Tests.ObanHelpers do
+  @moduledoc """
+  Oban test helpers.
+  """
+
+  alias Pleroma.Repo
+
+  def perform_all do
+    Oban.Job
+    |> Repo.all()
+    |> perform()
+  end
+
+  def perform(%Oban.Job{} = job) do
+    res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
+    Repo.delete(job)
+    res
+  end
+
+  def perform(jobs) when is_list(jobs) do
+    for job <- jobs, do: perform(job)
+  end
+
+  def member?(%{} = job_args, jobs) when is_list(jobs) do
+    Enum.any?(jobs, fn job ->
+      member?(job_args, job.args)
+    end)
+  end
+
+  def member?(%{} = test_attrs, %{} = attrs) do
+    Enum.all?(
+      test_attrs,
+      fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
+    )
+  end
+
+  def member?(x, y), do: x == y
+end
index b363b322c28590daac9a09375a9d11c7ed66d72c..2b955ced0c73a9994921fe811a4c95c185fef483 100644 (file)
@@ -7,14 +7,15 @@ defmodule Pleroma.UserTest do
   alias Pleroma.Builders.UserBuilder
   alias Pleroma.Object
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.CommonAPI
 
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
 
   import Pleroma.Factory
-  import Mock
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -678,7 +679,9 @@ defmodule Pleroma.UserTest do
         user3.nickname
       ]
 
-      result = User.follow_import(user1, identifiers)
+      {:ok, job} = User.follow_import(user1, identifiers)
+      result = ObanHelpers.perform(job)
+
       assert is_list(result)
       assert result == [user2, user3]
     end
@@ -889,7 +892,9 @@ defmodule Pleroma.UserTest do
         user3.nickname
       ]
 
-      result = User.blocks_import(user1, identifiers)
+      {:ok, job} = User.blocks_import(user1, identifiers)
+      result = ObanHelpers.perform(job)
+
       assert is_list(result)
       assert result == [user2, user3]
     end
@@ -1015,7 +1020,8 @@ defmodule Pleroma.UserTest do
       {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)
       {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)
 
-      {:ok, _} = User.delete(user)
+      {:ok, job} = User.delete(user)
+      {:ok, _user} = ObanHelpers.perform(job)
 
       follower = User.get_cached_by_id(follower.id)
 
@@ -1037,11 +1043,7 @@ defmodule Pleroma.UserTest do
       refute Activity.get_by_id(repeat.id)
     end
 
-    test_with_mock "it sends out User Delete activity",
-                   %{user: user},
-                   Pleroma.Web.ActivityPub.Publisher,
-                   [:passthrough],
-                   [] do
+    test "it sends out User Delete activity", %{user: user} do
       config_path = [:instance, :federating]
       initial_setting = Pleroma.Config.get(config_path)
       Pleroma.Config.put(config_path, true)
@@ -1049,12 +1051,18 @@ defmodule Pleroma.UserTest do
       {:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin")
       {:ok, _} = User.follow(follower, user)
 
-      {:ok, _user} = User.delete(user)
-
-      assert called(
-               Pleroma.Web.ActivityPub.Publisher.publish_one(%{
-                 inbox: "http://mastodon.example.org/inbox"
-               })
+      {:ok, job} = User.delete(user)
+      {:ok, _user} = ObanHelpers.perform(job)
+
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{
+                   "inbox" => "http://mastodon.example.org/inbox",
+                   "id" => "pleroma:fakeid"
+                 }
+               },
+               all_enqueued(worker: Pleroma.Workers.Publisher)
              )
 
       Pleroma.Config.put(config_path, initial_setting)
@@ -1101,7 +1109,8 @@ defmodule Pleroma.UserTest do
     test "User.delete() plugs any possible zombie objects" do
       user = insert(:user)
 
-      {:ok, _} = User.delete(user)
+      {:ok, job} = User.delete(user)
+      {:ok, _} = ObanHelpers.perform(job)
 
       {:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}")
 
index 251055ee17ea8458ff7245bd502a615817e7fafa..f46353fddb52a66ece35708a694930feae161b9c 100644 (file)
@@ -4,15 +4,19 @@
 
 defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
   use Pleroma.Web.ConnCase
+  use Oban.Testing, repo: Pleroma.Repo
+
   import Pleroma.Factory
   alias Pleroma.Activity
   alias Pleroma.Instances
   alias Pleroma.Object
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ObjectView
   alias Pleroma.Web.ActivityPub.UserView
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.CommonAPI
+  alias Pleroma.Workers.Receiver, as: ReceiverWorker
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -279,7 +283,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -321,7 +326,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{user.nickname}/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -350,7 +355,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{recipient.nickname}/inbox", data)
 
       assert "ok" == json_response(conn, 200)
-      :timer.sleep(500)
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
       assert Activity.get_by_ap_id(data["id"])
     end
 
@@ -429,6 +434,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
       |> post("/users/#{recipient.nickname}/inbox", data)
       |> json_response(200)
 
+      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
+
       activity = Activity.get_by_ap_id(data["id"])
 
       assert activity.id
@@ -504,6 +511,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
         |> post("/users/#{user.nickname}/outbox", data)
 
       result = json_response(conn, 201)
+
       assert Activity.get_by_ap_id(result["id"])
     end
 
index 372e789be34e585da6d68e654e36f22bb958e883..95a809d25a1a5551bad1de39da6d46dc7208c678 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
   use Pleroma.DataCase
 
   alias Pleroma.HTTP
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
 
   import Mock
@@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
   test "it prefetches media proxy URIs" do
     with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
       MediaProxyWarmingPolicy.filter(@message)
+
+      ObanHelpers.perform_all()
+      # Performing jobs which has been just enqueued
+      ObanHelpers.perform_all()
+
       assert called(HTTP.get(:_, :_, :_))
     end
   end
index 36a39c84c9208a0f38e4eb9d86a2940fac6b4fe4..26d019878b77a7efaffd2b7526e15df58cb93fce 100644 (file)
@@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
       assert called(
                Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
                  inbox: "https://domain.com/users/nick1/inbox",
-                 actor: actor,
+                 actor_id: actor.id,
                  id: note_activity.data["id"]
                })
              )
index 060b91e29794ee1c4a6ba7a07c55fae2f051893d..e80263328c818f7b01d9ea373b60cb2713345ed5 100644 (file)
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
   alias Pleroma.Object
   alias Pleroma.Object.Fetcher
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -584,6 +585,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
         |> Poison.decode!()
 
       {:ok, _} = Transmogrifier.handle_incoming(data)
+      ObanHelpers.perform_all()
 
       refute User.get_cached_by_ap_id(ap_id)
     end
@@ -1146,6 +1148,8 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
       assert user.info.note_count == 1
 
       {:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye")
+      ObanHelpers.perform_all()
+
       assert user.info.ap_enabled
       assert user.info.note_count == 1
       assert user.follower_address == "https://niu.moe/users/rye/followers"
index 73cfaa8f1ec96dbb681ba44ccd4427dfd1a139ed..9ca341b6da988d710bde2325b54d1e641f999a42 100644 (file)
@@ -4,9 +4,14 @@
 
 defmodule Pleroma.Web.FederatorTest do
   alias Pleroma.Instances
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Federator
+  alias Pleroma.Workers.Publisher, as: PublisherWorker
+
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
+
   import Pleroma.Factory
   import Mock
 
@@ -22,15 +27,6 @@ defmodule Pleroma.Web.FederatorTest do
     :ok
   end
 
-  describe "Publisher.perform" do
-    test "call `perform` with unknown task" do
-      assert {
-               :error,
-               "Don't know what to do with this"
-             } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
-    end
-  end
-
   describe "Publish an activity" do
     setup do
       user = insert(:user)
@@ -51,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do
     } do
       with_mocks([relay_mock]) do
         Federator.publish(activity)
+        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
       end
 
       assert_received :relay_publish
@@ -64,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do
 
       with_mocks([relay_mock]) do
         Federator.publish(activity)
+        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
       end
 
       refute_received :relay_publish
@@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do
   end
 
   describe "Targets reachability filtering in `publish`" do
-    test_with_mock "it federates only to reachable instances via AP",
-                   Pleroma.Web.ActivityPub.Publisher,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via AP" do
       user = insert(:user)
 
       {inbox1, inbox2} =
@@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do
       {:ok, _activity} =
         CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
 
-      assert called(
-               Pleroma.Web.ActivityPub.Publisher.publish_one(%{
-                 inbox: inbox1,
-                 unreachable_since: dt
-               })
-             )
+      expected_dt = NaiveDateTime.to_iso8601(dt)
 
-      refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2}))
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
+
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt}
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
 
-    test_with_mock "it federates only to reachable instances via Websub",
-                   Pleroma.Web.Websub,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via Websub" do
       user = insert(:user)
       websub_topic = Pleroma.Web.OStatus.feed_path(user)
 
@@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do
 
       {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
 
-      assert called(
-               Pleroma.Web.Websub.publish_one(%{
-                 callback: sub2.callback,
-                 unreachable_since: dt
-               })
-             )
+      expected_callback = sub2.callback
+      expected_dt = NaiveDateTime.to_iso8601(dt)
+
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
 
-      refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback}))
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{
+                   "callback" => expected_callback,
+                   "unreachable_since" => expected_dt
+                 }
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
 
-    test_with_mock "it federates only to reachable instances via Salmon",
-                   Pleroma.Web.Salmon,
-                   [:passthrough],
-                   [] do
+    test "it federates only to reachable instances via Salmon" do
       user = insert(:user)
 
-      remote_user1 =
+      _remote_user1 =
         insert(:user, %{
           local: false,
           nickname: "nick1@domain.com",
@@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do
           info: %{salmon: "https://domain2.com/salmon"}
         })
 
+      remote_user2_id = remote_user2.id
+
       dt = NaiveDateTime.utc_now()
       Instances.set_unreachable(remote_user2.ap_id, dt)
 
@@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do
       {:ok, _activity} =
         CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
 
-      assert called(
-               Pleroma.Web.Salmon.publish_one(%{
-                 recipient: remote_user2,
-                 unreachable_since: dt
-               })
-             )
+      expected_dt = NaiveDateTime.to_iso8601(dt)
+
+      ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
 
-      refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1}))
+      assert ObanHelpers.member?(
+               %{
+                 "op" => "publish_one",
+                 "params" => %{
+                   "recipient_id" => remote_user2_id,
+                   "unreachable_since" => expected_dt
+                 }
+               },
+               all_enqueued(worker: PublisherWorker)
+             )
     end
   end
 
@@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do
         "to" => ["https://www.w3.org/ns/activitystreams#Public"]
       }
 
-      {:ok, _activity} = Federator.incoming_ap_doc(params)
+      assert {:ok, job} = Federator.incoming_ap_doc(params)
+      assert {:ok, _activity} = ObanHelpers.perform(job)
     end
 
     test "rejects incoming AP docs with incorrect origin" do
@@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do
         "to" => ["https://www.w3.org/ns/activitystreams#Public"]
       }
 
-      :error = Federator.incoming_ap_doc(params)
+      assert {:ok, job} = Federator.incoming_ap_doc(params)
+      assert :error = ObanHelpers.perform(job)
     end
 
     test "it does not crash if MRF rejects the post" do
@@ -240,7 +249,8 @@ defmodule Pleroma.Web.FederatorTest do
         File.read!("test/fixtures/mastodon-post-activity.json")
         |> Poison.decode!()
 
-      assert Federator.incoming_ap_doc(params) == :error
+      assert {:ok, job} = Federator.incoming_ap_doc(params)
+      assert :error = ObanHelpers.perform(job)
 
       Pleroma.Config.put([:instance, :rewrite_policy], policies)
       Pleroma.Config.put(:mrf_keyword, mrf_keyword_policy)
index d28730994567b66bff6f812a137729fc257e46f8..a1bdd45d3d2b115387c656bc55bbe379b8d9ac97 100644 (file)
@@ -22,7 +22,8 @@ defmodule Pleroma.Instances.InstanceTest do
 
   describe "set_reachable/1" do
     test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do
-      instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now())
+      unreachable_since = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now())
+      instance = insert(:instance, unreachable_since: unreachable_since)
 
       assert {:ok, instance} = Instance.set_reachable(instance.host)
       refute instance.unreachable_since
index 2febe8b3a395591b1ef6e5ee4cec71b90c7147fb..f18b58a1bf4c86a0f95cda7e0ce01a59d5d60c4e 100644 (file)
@@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.CommonAPI
@@ -3874,6 +3875,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
     end
 
     test "it sends an email to user", %{user: user} do
+      ObanHelpers.perform_all()
       token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
 
       email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
@@ -3937,6 +3939,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
       |> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}")
       |> json_response(:no_content)
 
+      ObanHelpers.perform_all()
+
       email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
       notify_email = Config.get([:instance, :notify_email])
       instance_name = Config.get([:instance, :name])
diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs
deleted file mode 100644 (file)
index ecb3ce5..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule MockActivityPub do
-  def publish_one({ret, waiter}) do
-    send(waiter, :complete)
-    {ret, "success"}
-  end
-end
-
-defmodule Pleroma.Web.Federator.RetryQueueTest do
-  use Pleroma.DataCase
-  alias Pleroma.Web.Federator.RetryQueue
-
-  @small_retry_count 0
-  @hopeless_retry_count 10
-
-  setup do
-    RetryQueue.reset_stats()
-  end
-
-  test "RetryQueue responds to stats request" do
-    assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
-  end
-
-  test "failed posts are retried" do
-    {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
-
-    wait_task =
-      Task.async(fn ->
-        receive do
-          :complete -> :ok
-        end
-      end)
-
-    RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
-    Task.await(wait_task)
-    assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
-  end
-
-  test "posts that have been tried too many times are dropped" do
-    {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
-
-    RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
-    assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
-  end
-end
index e86e76fe931efdf6220cb46773da8cc60af8d772..0186f3fef541613be5efa352bf523ac424bd40c6 100644 (file)
@@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do
 
     Salmon.publish(user, activity)
 
-    assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user}))
+    assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
   end
 end
index 8bb8aa36d9d5eca967119e83f5a7f933a93bd475..9ac4ff9297c01375cbb9dd13eb67ec62af1edf6a 100644 (file)
@@ -12,6 +12,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
   alias Pleroma.Notification
   alias Pleroma.Object
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.CommonAPI
@@ -1099,6 +1100,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
     end
 
     test "it sends an email to user", %{user: user} do
+      ObanHelpers.perform_all()
       token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
 
       email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
@@ -1209,6 +1211,8 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
       |> assign(:user, user)
       |> post("/api/account/resend_confirmation_email?email=#{user.email}")
 
+      ObanHelpers.perform_all()
+
       email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
       notify_email = Pleroma.Config.get([:instance, :notify_email])
       instance_name = Pleroma.Config.get([:instance, :name])
index cbe83852e9bade09009cfc2407e0a594da4883d2..bf063a0dec5307d93a750c89427d4155a5b5661d 100644 (file)
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do
   alias Pleroma.Activity
   alias Pleroma.Object
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.UserInviteToken
   alias Pleroma.Web.ActivityPub.ActivityPub
@@ -321,6 +322,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do
     }
 
     {:ok, user} = TwitterAPI.register_user(data)
+    ObanHelpers.perform_all()
 
     assert user.info.confirmation_pending
 
index 640579c09fe49e4e962b838d77ece35afd52c1b3..e3f129f72ada47723fc9ca6feb8d493dadc0ef26 100644 (file)
@@ -4,9 +4,11 @@
 
 defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
   use Pleroma.Web.ConnCase
+  use Oban.Testing, repo: Pleroma.Repo
 
   alias Pleroma.Notification
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   import Pleroma.Factory
@@ -50,8 +52,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
         {File, [],
          read!: fn "follow_list.txt" ->
            "Account address,Show boosts\n#{user2.ap_id},true"
-         end},
-        {PleromaJobQueue, [:passthrough], []}
+         end}
       ]) do
         response =
           conn
@@ -59,15 +60,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
           |> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}})
           |> json_response(:ok)
 
-        assert called(
-                 PleromaJobQueue.enqueue(
-                   :background,
-                   User,
-                   [:follow_import, user1, [user2.ap_id]]
-                 )
-               )
-
         assert response == "job started"
+
+        assert ObanHelpers.member?(
+                 %{
+                   "op" => "follow_import",
+                   "follower_id" => user1.id,
+                   "followed_identifiers" => [user2.ap_id]
+                 },
+                 all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
+               )
       end
     end
 
@@ -126,8 +128,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
       user3 = insert(:user)
 
       with_mocks([
-        {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end},
-        {PleromaJobQueue, [:passthrough], []}
+        {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
       ]) do
         response =
           conn
@@ -135,15 +136,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
           |> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}})
           |> json_response(:ok)
 
-        assert called(
-                 PleromaJobQueue.enqueue(
-                   :background,
-                   User,
-                   [:blocks_import, user1, [user2.ap_id, user3.ap_id]]
-                 )
-               )
-
         assert response == "job started"
+
+        assert ObanHelpers.member?(
+                 %{
+                   "op" => "blocks_import",
+                   "blocker_id" => user1.id,
+                   "blocked_identifiers" => [user2.ap_id, user3.ap_id]
+                 },
+                 all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
+               )
       end
     end
   end
@@ -607,6 +609,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
         |> json_response(:ok)
 
       assert response == %{"status" => "success"}
+      ObanHelpers.perform_all()
 
       user = User.get_cached_by_id(user.id)
 
index 74386d7dbb98a149fbb8afa1d2e3106782c79d28..414610879629263e41d3320fc010b10896fa8bac 100644 (file)
@@ -4,11 +4,14 @@
 
 defmodule Pleroma.Web.WebsubTest do
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
 
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.Router.Helpers
   alias Pleroma.Web.Websub
   alias Pleroma.Web.Websub.WebsubClientSubscription
   alias Pleroma.Web.Websub.WebsubServerSubscription
+  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
 
   import Pleroma.Factory
   import Tesla.Mock
@@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do
         })
 
       _refresh = Websub.refresh_subscriptions()
+      ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))
 
       assert still_good == Repo.get(WebsubClientSubscription, still_good.id)
       refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)