[#1149] Refactored Oban workers API (introduced `enqueue/3`).
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 18:58:42 +0000 (21:58 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 18:58:42 +0000 (21:58 +0300)
23 files changed:
lib/pleroma/activity_expiration_worker.ex
lib/pleroma/digest_email_worker.ex
lib/pleroma/emails/mailer.ex
lib/pleroma/scheduled_activity_worker.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/publisher.ex
lib/pleroma/web/oauth/token/clean_worker.ex
lib/pleroma/web/push/push.ex
lib/pleroma/workers/activity_expiration_worker.ex
lib/pleroma/workers/background_worker.ex
lib/pleroma/workers/digest_emails_worker.ex [new file with mode: 0644]
lib/pleroma/workers/mailer_worker.ex
lib/pleroma/workers/publisher_worker.ex
lib/pleroma/workers/receiver_worker.ex
lib/pleroma/workers/scheduled_activity_worker.ex
lib/pleroma/workers/subscriber_worker.ex
lib/pleroma/workers/transmogrifier_worker.ex
lib/pleroma/workers/web_pusher_worker.ex
lib/pleroma/workers/worker_helper.ex

index 7aba7eeceee37d918897d0716734a1e6f6e71c84..c0820c202253d6e41faf96a64a4e550b664d85ce 100644 (file)
@@ -9,14 +9,11 @@ defmodule Pleroma.ActivityExpirationWorker do
   alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
-  alias Pleroma.Workers.ActivityExpirationWorker
 
   require Logger
   use GenServer
   import Ecto.Query
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   @schedule_interval :timer.minutes(1)
 
   def start_link(_) do
@@ -53,12 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
   def handle_info(:perform, state) do
     ActivityExpiration.due_expirations(@schedule_interval)
     |> Enum.each(fn expiration ->
-      %{
-        "op" => "activity_expiration",
-        "activity_expiration_id" => expiration.id
-      }
-      |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
-      |> Repo.insert()
+      Pleroma.Workers.ActivityExpirationWorker.enqueue(
+        "activity_expiration",
+        %{"activity_expiration_id" => expiration.id}
+      )
     end)
 
     schedule_next()
index 4ab2a4ef4f8cb781da9e8222b76cc45fc581bc6d..5be7cf26b3820e50496d89aa1b82e4532d870c72 100644 (file)
@@ -4,12 +4,10 @@
 
 defmodule Pleroma.DigestEmailWorker do
   alias Pleroma.Repo
-  alias Pleroma.Workers.MailerWorker
+  alias Pleroma.Workers.DigestEmailsWorker
 
   import Ecto.Query
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def perform do
     config = Pleroma.Config.get([:email_notifications, :digest])
     negative_interval = -Map.fetch!(config, :interval)
@@ -23,11 +21,9 @@ defmodule Pleroma.DigestEmailWorker do
       where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
       select: u
     )
-    |> Pleroma.Repo.all()
+    |> Repo.all()
     |> Enum.each(fn user ->
-      %{"op" => "digest_email", "user_id" => user.id}
-      |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
-      |> Repo.insert()
+      DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
     end)
   end
 
index 9cbe7313c05c2912508d9976001e611bc1cd24cf..eb96f2e8b7b81b4d0557a59334411aa7181eb406 100644 (file)
@@ -9,7 +9,6 @@ defmodule Pleroma.Emails.Mailer do
   The module contains functions to delivery email using Swoosh.Mailer.
   """
 
-  alias Pleroma.Repo
   alias Pleroma.Workers.MailerWorker
   alias Swoosh.DeliveryError
 
@@ -19,8 +18,6 @@ defmodule Pleroma.Emails.Mailer do
   @spec enabled?() :: boolean()
   def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   @doc "add email to queue"
   def deliver_async(email, config \\ []) do
     encoded_email =
@@ -28,9 +25,7 @@ defmodule Pleroma.Emails.Mailer do
       |> :erlang.term_to_binary()
       |> Base.encode64()
 
-    %{"op" => "email", "encoded_email" => encoded_email, "config" => config}
-    |> MailerWorker.new(worker_args(:mailer))
-    |> Repo.insert()
+    MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
   end
 
   @doc "callback to perform send email from queue"
index 8bf534f4290b8a16bdb7725b03fe2ffd4350dfe2..c41a542de4003818d810b50ec256592b53d37a58 100644 (file)
@@ -8,7 +8,6 @@ defmodule Pleroma.ScheduledActivityWorker do
   """
 
   alias Pleroma.Config
-  alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
@@ -18,8 +17,6 @@ defmodule Pleroma.ScheduledActivityWorker do
 
   @schedule_interval :timer.minutes(1)
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def start_link(_) do
     GenServer.start_link(__MODULE__, nil)
   end
@@ -49,9 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
   def handle_info(:perform, state) do
     ScheduledActivity.due_activities(@schedule_interval)
     |> Enum.each(fn scheduled_activity ->
-      %{"op" => "execute", "activity_id" => scheduled_activity.id}
-      |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
-      |> Repo.insert()
+      Pleroma.Workers.ScheduledActivityWorker.enqueue(
+        "execute",
+        %{"activity_id" => scheduled_activity.id}
+      )
     end)
 
     schedule_next()
index abfa063fb2c74d9a725674f01746236963505152..2fe7e1748c678e601503b20c6a87499979758953 100644 (file)
@@ -41,8 +41,6 @@ defmodule Pleroma.User do
   @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
   @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   schema "users" do
     field(:bio, :string)
     field(:email, :string)
@@ -623,9 +621,7 @@ defmodule Pleroma.User do
 
   @doc "Fetch some posts when the user has just been federated with"
   def fetch_initial_posts(user) do
-    %{"op" => "fetch_initial_posts", "user_id" => user.id}
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
   end
 
   @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
@@ -1056,9 +1052,7 @@ defmodule Pleroma.User do
   end
 
   def deactivate_async(user, status \\ true) do
-    %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
   end
 
   def deactivate(%User{} = user, status \\ true) do
@@ -1087,9 +1081,7 @@ defmodule Pleroma.User do
   end
 
   def delete(%User{} = user) do
-    %{"op" => "delete_user", "user_id" => user.id}
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
   end
 
   @spec perform(atom(), User.t()) :: {:ok, User.t()}
@@ -1198,24 +1190,18 @@ defmodule Pleroma.User do
   end
 
   def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
-    %{
-      "op" => "blocks_import",
+    BackgroundWorker.enqueue("blocks_import", %{
       "blocker_id" => blocker.id,
       "blocked_identifiers" => blocked_identifiers
-    }
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    })
   end
 
   def follow_import(%User{} = follower, followed_identifiers)
       when is_list(followed_identifiers) do
-    %{
-      "op" => "follow_import",
+    BackgroundWorker.enqueue("follow_import", %{
       "follower_id" => follower.id,
       "followed_identifiers" => followed_identifiers
-    }
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    })
   end
 
   def delete_user_activities(%User{ap_id: ap_id} = user) do
index 74c5eb91c63a0c6f3f4c88a8f652314cef9b12d9..90b409606906407d02b4d07b9ceaf37e1510e6ff 100644 (file)
@@ -26,8 +26,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   require Logger
   require Pleroma.Constants
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   # For Announce activities, we filter the recipients based on following status for any actors
   # that match actual users.  See issue #164 for more information about why this is necessary.
   defp get_recipients(%{"type" => "Announce"} = data) do
@@ -148,9 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           activity
         end
 
-      %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
-      |> BackgroundWorker.new(worker_args(:background))
-      |> Repo.insert()
+      BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
 
       Notification.create_notifications(activity)
 
index 178321558a4f9b5213a1ee8267792f33915f96a2..26b8539fe43283b652f67512bf4e00a575bf116c 100644 (file)
@@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
   @behaviour Pleroma.Web.ActivityPub.MRF
 
   alias Pleroma.HTTP
-  alias Pleroma.Repo
   alias Pleroma.Web.MediaProxy
   alias Pleroma.Workers.BackgroundWorker
 
@@ -18,8 +17,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
     recv_timeout: 10_000
   ]
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def perform(:prefetch, url) do
     Logger.info("Prefetching #{inspect(url)}")
 
@@ -34,9 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
         url
         |> Enum.each(fn
           %{"href" => href} ->
-            %{"op" => "media_proxy_prefetch", "url" => href}
-            |> BackgroundWorker.new(worker_args(:background))
-            |> Repo.insert()
+            BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
 
           x ->
             Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -52,9 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
         %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
       )
       when is_list(attachments) and length(attachments) > 0 do
-    %{"op" => "media_proxy_preload", "message" => message}
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
 
     {:ok, message}
   end
index 9437f9a16d034d55de7d00165a72e2c903f82904..f27455e8b6ef76839eaf849990637a3d4d5ba165 100644 (file)
@@ -22,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
   require Logger
   require Pleroma.Constants
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   @doc """
   Modifies an incoming AP object (mastodon format) to our internal format.
   """
@@ -1054,9 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
          already_ap <- User.ap_enabled?(user),
          {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
       unless already_ap do
-        %{"op" => "user_upgrade", "user_id" => user.id}
-        |> TransmogrifierWorker.new(worker_args(:transmogrifier))
-        |> Repo.insert()
+        TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
       end
 
       {:ok, user}
index 8f43066e33ca28c96d182a3c5f6b0dde9cab4570..1a2da014ae054999325a55eaf1c844a12d1210e4 100644 (file)
@@ -18,8 +18,6 @@ defmodule Pleroma.Web.Federator do
 
   require Logger
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def init do
     # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
     refresh_subscriptions(schedule_in: 60)
@@ -40,15 +38,11 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    %{"op" => "incoming_doc", "body" => doc}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
   end
 
   def incoming_ap_doc(params) do
-    %{"op" => "incoming_ap_doc", "params" => params}
-    |> ReceiverWorker.new(worker_args(:federator_incoming))
-    |> Pleroma.Repo.insert()
+    ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
   end
 
   def publish(%{id: "pleroma:fakeid"} = activity) do
@@ -56,27 +50,19 @@ defmodule Pleroma.Web.Federator do
   end
 
   def publish(activity) do
-    %{"op" => "publish", "activity_id" => activity.id}
-    |> PublisherWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
   end
 
   def verify_websub(websub) do
-    %{"op" => "verify_websub", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
   end
 
   def request_subscription(websub) do
-    %{"op" => "request_subscription", "websub_id" => websub.id}
-    |> SubscriberWorker.new(worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
   end
 
   def refresh_subscriptions(worker_args \\ []) do
-    %{"op" => "refresh_subscriptions"}
-    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
-    |> Pleroma.Repo.insert()
+    SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
   end
 
   # Job Worker Callbacks
index 42be109ab958cc459adf2dc44c48dbe412958cd3..937064638fc72ee9d8d1b21b80ac34656e17fc89 100644 (file)
@@ -31,11 +31,10 @@ defmodule Pleroma.Web.Federator.Publisher do
   """
   @spec enqueue_one(module(), Map.t()) :: :ok
   def enqueue_one(module, %{} = params) do
-    worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
-
-    %{"op" => "publish_one", "module" => to_string(module), "params" => params}
-    |> PublisherWorker.new(worker_args)
-    |> Pleroma.Repo.insert()
+    PublisherWorker.enqueue(
+      "publish_one",
+      %{"module" => to_string(module), "params" => params}
+    )
   end
 
   @doc """
index b150a68a70545263c591634726d79af507e4ee41..eb94bf86f6a47a6458666852bd61969ad5a17be5 100644 (file)
@@ -16,12 +16,9 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
               @one_day
             )
 
-  alias Pleroma.Repo
   alias Pleroma.Web.OAuth.Token
   alias Pleroma.Workers.BackgroundWorker
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def start_link(_), do: GenServer.start_link(__MODULE__, %{})
 
   def init(_) do
@@ -31,9 +28,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
 
   @doc false
   def handle_info(:perform, state) do
-    %{"op" => "clean_expired_tokens"}
-    |> BackgroundWorker.new(worker_args(:background))
-    |> Repo.insert()
+    BackgroundWorker.enqueue("clean_expired_tokens", %{})
 
     Process.send_after(self(), :perform, @interval)
     {:noreply, state}
index 4973b529ceec978a4fcd489a20155c00e100ca71..7ef1532acae45a6add18bbf9820229623d33bb0f 100644 (file)
@@ -3,13 +3,10 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Push do
-  alias Pleroma.Repo
   alias Pleroma.Workers.WebPusherWorker
 
   require Logger
 
-  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
   def init do
     unless enabled() do
       Logger.warn("""
@@ -35,8 +32,6 @@ defmodule Pleroma.Web.Push do
   end
 
   def send(notification) do
-    %{"op" => "web_push", "notification_id" => notification.id}
-    |> WebPusherWorker.new(worker_args(:web_push))
-    |> Repo.insert()
+    WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
   end
 end
index 0b491eabbd629db1d67c09bee05a31b6de401a01..60dd3febab951e15c2d58250ab64b2cefbda4b4c 100644 (file)
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ActivityExpirationWorker do
     queue: "activity_expiration",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
   @impl Oban.Worker
   def perform(
         %{
index 7b5575a5fef0feddc9924af70bdbff2f94c4dd87..b9aef3a926073fac9724119e0de4e53b7d260236 100644 (file)
@@ -13,6 +13,8 @@ defmodule Pleroma.Workers.BackgroundWorker do
     queue: "background",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "background"
+
   @impl Oban.Worker
   def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
     user = User.get_cached_by_id(user_id)
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644 (file)
index 0000000..ca073ce
--- /dev/null
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+  alias Pleroma.User
+
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
+  use Oban.Worker,
+    queue: "digest_emails",
+    max_attempts: 1
+
+  use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+  @impl Oban.Worker
+  def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+    user_id
+    |> User.get_cached_by_id()
+    |> Pleroma.DigestEmailWorker.perform()
+  end
+end
index 4f73d61bc8b219e559c1bf2f5d1cdf3bbdcfe8b4..a4bd54a6cd0467a39f9052d92aa62daaf95bcb0a 100644 (file)
@@ -3,13 +3,13 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Workers.MailerWorker do
-  alias Pleroma.User
-
   # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "mailer",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
   @impl Oban.Worker
   def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
     encoded_email
@@ -17,10 +17,4 @@ defmodule Pleroma.Workers.MailerWorker do
     |> :erlang.binary_to_term()
     |> Pleroma.Emails.Mailer.deliver(config)
   end
-
-  def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
-    user_id
-    |> User.get_cached_by_id()
-    |> Pleroma.DigestEmailWorker.perform()
-  end
 end
index 5671d2a293277a03256fd20a8cd8a7b3cdc36ff3..a3ac22635e540ace4a38f8fd957df3a09381ebc6 100644 (file)
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.PublisherWorker do
     queue: "federator_outgoing",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
   def backoff(attempt) when is_integer(attempt) do
     Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
   end
index cdce630f2e0f745cc0aff7fdb26e26ec8d99fea1..3cc415ce411379fd0511741064d9b1afb2ac70bd 100644 (file)
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.ReceiverWorker do
     queue: "federator_incoming",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
   @impl Oban.Worker
   def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
     Federator.perform(:incoming_doc, doc)
index 4094411ae37f16206140ca7a19b1bad609ed5b98..936bb64d366d6e8182de61c64fc3226b83c46b87 100644 (file)
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
     queue: "scheduled_activities",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
   @impl Oban.Worker
   def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
     Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
index 22d1dc956cd53b6cf40184b85ce5abeae78dae39..4fb994554e05c84d4dbb20cf38c338c89a02b727 100644 (file)
@@ -12,6 +12,8 @@ defmodule Pleroma.Workers.SubscriberWorker do
     queue: "federator_outgoing",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
   @impl Oban.Worker
   def perform(%{"op" => "refresh_subscriptions"}, _job) do
     Federator.perform(:refresh_subscriptions)
index 6f5c1a2f29cd184aa3fa822651d339cd5a1e201e..6fecc2bf946d498b371f094e90496d7a77ee5f31 100644 (file)
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.TransmogrifierWorker do
     queue: "transmogrifier",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
   @impl Oban.Worker
   def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
     user = User.get_cached_by_id(user_id)
index 2b1d3b99a783117b2a0c647cded5b54dd814bfe5..4c2591a5c32dc43f4f4877918f6257be9f510d0f 100644 (file)
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.WebPusherWorker do
     queue: "web_push",
     max_attempts: 1
 
+  use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
   @impl Oban.Worker
   def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
     notification = Repo.get(Notification, notification_id)
index f9ed2e64dc11f9caa6d3fde6a82aa9cf35431fc6..b12f198d47f5772b9a856448eff129ec61a3c2aa 100644 (file)
@@ -4,6 +4,7 @@
 
 defmodule Pleroma.Workers.WorkerHelper do
   alias Pleroma.Config
+  alias Pleroma.Workers.WorkerHelper
 
   def worker_args(queue) do
     case Config.get([:workers, :retries, queue]) do
@@ -20,4 +21,21 @@ defmodule Pleroma.Workers.WorkerHelper do
 
     trunc(backoff)
   end
+
+  defmacro __using__(opts) do
+    caller_module = __CALLER__.module
+    queue = Keyword.fetch!(opts, :queue)
+
+    quote do
+      def enqueue(op, params, worker_args \\ []) do
+        params = Map.merge(%{"op" => op}, params)
+        queue_atom = String.to_atom(unquote(queue))
+        worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+        unquote(caller_module)
+        |> apply(:new, [params, worker_args])
+        |> Pleroma.Repo.insert()
+      end
+    end
+  end
 end