[#1149] Addressed code review comments (code style, jobs pruning etc.).
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 16:08:56 +0000 (19:08 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 16:08:56 +0000 (19:08 +0300)
34 files changed:
CHANGELOG.md
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/activity_expiration_worker.ex
lib/pleroma/application.ex
lib/pleroma/digest_email_worker.ex
lib/pleroma/emails/mailer.ex
lib/pleroma/scheduled_activity_worker.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
lib/pleroma/web/activity_pub/publisher.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/publisher.ex
lib/pleroma/web/oauth/token/clean_worker.ex
lib/pleroma/web/push/push.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/workers/activity_expiration_worker.ex [new file with mode: 0644]
lib/pleroma/workers/background_worker.ex
lib/pleroma/workers/helper.ex [deleted file]
lib/pleroma/workers/mailer_worker.ex [moved from lib/pleroma/workers/mailer.ex with 58% similarity]
lib/pleroma/workers/publisher_worker.ex [moved from lib/pleroma/workers/publisher.ex with 76% similarity]
lib/pleroma/workers/receiver_worker.ex [moved from lib/pleroma/workers/receiver.ex with 83% similarity]
lib/pleroma/workers/scheduled_activity_worker.ex
lib/pleroma/workers/subscriber_worker.ex [moved from lib/pleroma/workers/subscriber.ex with 88% similarity]
lib/pleroma/workers/transmogrifier_worker.ex [moved from lib/pleroma/workers/transmogrifier.ex with 73% similarity]
lib/pleroma/workers/web_pusher_worker.ex [moved from lib/pleroma/workers/web_pusher.ex with 82% similarity]
lib/pleroma/workers/worker_helper.ex [new file with mode: 0644]
test/user_test.exs
test/web/activity_pub/activity_pub_controller_test.exs
test/web/federator_test.exs
test/web/websub/websub_test.exs

index 8b73c783fd2ce757a3f6bc59deaf39abb7950999..c9d6fef17bb412a6f284e5fcc0ffa0a6d31a7953 100644 (file)
@@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Mastodon API: Unsubscribe followers when they unfollow a user
 - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
 - Improve digest email template
-- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban)
+- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) and `Pleroma.Web.Federator.RetryQueue` with [Oban](https://github.com/sorentwo/oban) (see [`docs/config.md`](docs/config.md) on migrating customized worker / retry settings).
 - Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
 
 ### Fixed
index da89aa3e956074efb02ec43c64161788b08997cc..6fb4a09695524cdf1e522e01f05c30fdc5ccc390 100644 (file)
@@ -470,7 +470,7 @@ config :pleroma, Pleroma.User,
 config :pleroma, Oban,
   repo: Pleroma.Repo,
   verbose: false,
-  prune: {:maxage, 60 * 60 * 24 * 7},
+  prune: {:maxlen, 1500},
   queues: [
     activity_expiration: 10,
     federator_incoming: 50,
index 0ef809ac17ce801f5c02add520fd265b25dd74c9..df512b5d7c59f5422173d6c07dd1de4b67b3096e 100644 (file)
@@ -65,6 +65,8 @@ config :pleroma, Oban,
   queues: false,
   prune: :disabled
 
+config :pleroma, Pleroma.Scheduler, jobs: []
+
 config :pleroma, Pleroma.ScheduledActivity,
   daily_user_limit: 2,
   total_user_limit: 3,
index 2e351e27201b450bc994b393882d030a5ea51ac7..29a4d4c9722f9c85e370e53e6c7ef94ac3db6ef3 100644 (file)
@@ -404,20 +404,29 @@ curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerando
 
 [Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration.
 
+Configuration options described in [Oban readme](https://github.com/sorentwo/oban#usage):
+* `repo` - app's Ecto repo (`Pleroma.Repo`)
+* `verbose` - logs verbosity
+* `prune` - non-retryable jobs [pruning settings](https://github.com/sorentwo/oban#pruning) (`:disabled` / `{:maxlen, value}` / `{:maxage, value}`)
+* `queues` - job queues (see below)
+
 Pleroma has the following queues:
 
+* `activity_expiration` - Activity expiration
 * `federator_outgoing` - Outgoing federation
 * `federator_incoming` - Incoming federation
-* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleroma-emails-mailer)
+* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleromaemailsmailer)
 * `transmogrifier` - Transmogrifier
 * `web_push` - Web push notifications
-* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity)
+* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivity`](#pleromascheduledactivity)
 
 Example:
 
 ```elixir
 config :pleroma, Oban,
   repo: Pleroma.Repo,
+  verbose: false,
+  prune: {:maxlen, 1500},
   queues: [
     federator_incoming: 50,
     federator_outgoing: 50
@@ -426,12 +435,37 @@ config :pleroma, Oban,
 
 This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`.
 
+### Migrating `pleroma_job_queue` settings
+
+`config :pleroma_job_queue, :queues` is replaced by `config :pleroma, Oban, :queues` and uses the same format (keys are queues' names, values are max concurrent jobs numbers).
+
+### Note on running with PostgreSQL in silent mode
+
+If you are running PostgreSQL in [`silent_mode`](https://postgresqlco.nf/en/doc/param/silent_mode?version=9.1), it's advised to set [`log_destination`](https://postgresqlco.nf/en/doc/param/log_destination?version=9.1) to `syslog`, 
+otherwise `postmaster.log` file may grow because of "you don't own a lock of type ShareLock" warnings (see https://github.com/sorentwo/oban/issues/52). 
+
 ## :workers
 
 Includes custom worker options not interpretable directly by `Oban`.
 
 * `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs.
 
+Example:
+
+```elixir
+config :pleroma, :workers,
+  retries: [
+    federator_incoming: 5,
+    federator_outgoing: 5
+  ]
+```
+
+### Migrating `Pleroma.Web.Federator.RetryQueue` settings
+
+* `max_retries` is replaced with `config :pleroma, :workers, retries: [federator_outgoing: 5]`
+* `enabled: false` corresponds to `config :pleroma, :workers, retries: [federator_outgoing: 1]`
+* deprecated options: `max_jobs`, `initial_timeout`
+
 ## Pleroma.Web.Metadata
 * `providers`: a list of metadata providers to enable. Providers available:
   * Pleroma.Web.Metadata.Providers.OpenGraph
@@ -491,6 +525,24 @@ config :auto_linker,
   ]
 ```
 
+## Pleroma.Scheduler
+
+Configuration for [Quantum](https://github.com/quantum-elixir/quantum-core) jobs scheduler.
+
+See [Quantum readme](https://github.com/quantum-elixir/quantum-core#usage) for the list of supported options. 
+
+Example:
+
+```elixir
+config :pleroma, Pleroma.Scheduler,
+  global: true,
+  overlap: true,
+  timezone: :utc,
+  jobs: [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}]
+```
+
+The above example defines a single job which invokes `Pleroma.Web.Websub.refresh_subscriptions()` every 6 hours ("0 */6 * * * *", [crontab format](https://en.wikipedia.org/wiki/Cron)).
+
 ## Pleroma.ScheduledActivity
 
 * `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)
index 5c0c532325a670098fbe92a3c2b1febe31ba88b2..7aba7eeceee37d918897d0716734a1e6f6e71c84 100644 (file)
@@ -9,13 +9,13 @@ defmodule Pleroma.ActivityExpirationWorker do
   alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
-  alias Pleroma.Workers.BackgroundWorker
+  alias Pleroma.Workers.ActivityExpirationWorker
 
   require Logger
   use GenServer
   import Ecto.Query
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   @schedule_interval :timer.minutes(1)
 
@@ -57,7 +57,7 @@ defmodule Pleroma.ActivityExpirationWorker do
         "op" => "activity_expiration",
         "activity_expiration_id" => expiration.id
       }
-      |> BackgroundWorker.new(worker_args(:activity_expiration))
+      |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
       |> Repo.insert()
     end)
 
index 7d38ed5c4c1c247fafbe46a7e27ea9876432a733..f8f866dbde63e04e3ca996acad1339a861f2ec68 100644 (file)
@@ -43,7 +43,7 @@ defmodule Pleroma.Application do
         hackney_pool_children() ++
         [
           Pleroma.Stats,
-          {Oban, Application.get_env(:pleroma, Oban)},
+          {Oban, Pleroma.Config.get(Oban)},
           %{
             id: :web_push_init,
             start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
index ffc48bfabd93c16318b1038d60f960671a906ba7..4ab2a4ef4f8cb781da9e8222b76cc45fc581bc6d 100644 (file)
@@ -4,11 +4,11 @@
 
 defmodule Pleroma.DigestEmailWorker do
   alias Pleroma.Repo
-  alias Pleroma.Workers.Mailer, as: MailerWorker
+  alias Pleroma.Workers.MailerWorker
 
   import Ecto.Query
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def perform do
     config = Pleroma.Config.get([:email_notifications, :digest])
index bb534f6026cd31860e490d3d86f9bff7447cb398..9cbe7313c05c2912508d9976001e611bc1cd24cf 100644 (file)
@@ -10,7 +10,7 @@ defmodule Pleroma.Emails.Mailer do
   """
 
   alias Pleroma.Repo
-  alias Pleroma.Workers.Mailer, as: MailerWorker
+  alias Pleroma.Workers.MailerWorker
   alias Swoosh.DeliveryError
 
   @otp_app :pleroma
@@ -19,7 +19,7 @@ defmodule Pleroma.Emails.Mailer do
   @spec enabled?() :: boolean()
   def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   @doc "add email to queue"
   def deliver_async(email, config \\ []) do
index a01fb4fcb8db6a977bf59b1b1efa6b14ba364fed..8bf534f4290b8a16bdb7725b03fe2ffd4350dfe2 100644 (file)
@@ -18,7 +18,7 @@ defmodule Pleroma.ScheduledActivityWorker do
 
   @schedule_interval :timer.minutes(1)
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def start_link(_) do
     GenServer.start_link(__MODULE__, nil)
index 18bba0fbbd765bae3ecfc4fb58131aa0b8a14b9f..abfa063fb2c74d9a725674f01746236963505152 100644 (file)
@@ -41,7 +41,7 @@ defmodule Pleroma.User do
   @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
   @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   schema "users" do
     field(:bio, :string)
index 50279cca5350bfaad0947a38c0536df8880eb0c8..74c5eb91c63a0c6f3f4c88a8f652314cef9b12d9 100644 (file)
@@ -26,7 +26,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   require Logger
   require Pleroma.Constants
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   # For Announce activities, we filter the recipients based on following status for any actors
   # that match actual users.  See issue #164 for more information about why this is necessary.
index b188164ee36cd6cad90f66f3f9c8379ed5cf0a7c..178321558a4f9b5213a1ee8267792f33915f96a2 100644 (file)
@@ -18,7 +18,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
     recv_timeout: 10_000
   ]
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def perform(:prefetch, url) do
     Logger.info("Prefetching #{inspect(url)}")
index 24d101dc8d04e3a8d9427a8ef2977ef2f5dba922..a6322e25a075624eb2669956e7e0b1e193807407 100644 (file)
@@ -85,7 +85,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
   end
 
   def publish_one(%{actor_id: actor_id} = params) do
-    actor = User.get_by_id(actor_id)
+    actor = User.get_cached_by_id(actor_id)
 
     params
     |> Map.delete(:actor_id)
index b068d28a775b657b679b63458b455d883b4a1c9f..9437f9a16d034d55de7d00165a72e2c903f82904 100644 (file)
@@ -15,14 +15,14 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Web.Federator
-  alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker
+  alias Pleroma.Workers.TransmogrifierWorker
 
   import Ecto.Query
 
   require Logger
   require Pleroma.Constants
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   @doc """
   Modifies an incoming AP object (mastodon format) to our internal format.
index cf7e50fee28bd61829473a5671a992019083a19c..8f43066e33ca28c96d182a3c5f6b0dde9cab4570 100644 (file)
@@ -12,13 +12,13 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.Web.Federator.Publisher
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub
-  alias Pleroma.Workers.Publisher, as: PublisherWorker
-  alias Pleroma.Workers.Receiver, as: ReceiverWorker
-  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+  alias Pleroma.Workers.PublisherWorker
+  alias Pleroma.Workers.ReceiverWorker
+  alias Pleroma.Workers.SubscriberWorker
 
   require Logger
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def init do
     # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
index 05d2be61558381b3fe3cb54af62958a3ebb6ea86..42be109ab958cc459adf2dc44c48dbe412958cd3 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
   alias Pleroma.Activity
   alias Pleroma.Config
   alias Pleroma.User
-  alias Pleroma.Workers.Publisher, as: PublisherWorker
+  alias Pleroma.Workers.PublisherWorker
 
   require Logger
 
@@ -31,12 +31,7 @@ defmodule Pleroma.Web.Federator.Publisher do
   """
   @spec enqueue_one(module(), Map.t()) :: :ok
   def enqueue_one(module, %{} = params) do
-    worker_args =
-      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
-        [max_attempts: max_attempts]
-      else
-        []
-      end
+    worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
 
     %{"op" => "publish_one", "module" => to_string(module), "params" => params}
     |> PublisherWorker.new(worker_args)
index 943e732891be94e6a3535a426fbcd7c28670bb17..b150a68a70545263c591634726d79af507e4ee41 100644 (file)
@@ -20,7 +20,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
   alias Pleroma.Web.OAuth.Token
   alias Pleroma.Workers.BackgroundWorker
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def start_link(_), do: GenServer.start_link(__MODULE__, %{})
 
index b4f0e5127e37518efb603fde9dd1a307017aed65..4973b529ceec978a4fcd489a20155c00e100ca71 100644 (file)
@@ -4,11 +4,11 @@
 
 defmodule Pleroma.Web.Push do
   alias Pleroma.Repo
-  alias Pleroma.Workers.WebPusher
+  alias Pleroma.Workers.WebPusherWorker
 
   require Logger
 
-  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+  import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
 
   def init do
     unless enabled() do
@@ -36,7 +36,7 @@ defmodule Pleroma.Web.Push do
 
   def send(notification) do
     %{"op" => "web_push", "notification_id" => notification.id}
-    |> WebPusher.new(worker_args(:web_push))
+    |> WebPusherWorker.new(worker_args(:web_push))
     |> Repo.insert()
   end
 end
index bbaa293fd3105a4b0f3e617ffa45c33190528a79..8ba7380c0902b9df9a30a72f1efe5ee02cf64532 100644 (file)
@@ -171,7 +171,7 @@ defmodule Pleroma.Web.Salmon do
   end
 
   def publish_one(%{recipient_id: recipient_id} = params) do
-    recipient = User.get_by_id(recipient_id)
+    recipient = User.get_cached_by_id(recipient_id)
 
     params
     |> Map.delete(:recipient_id)
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
new file mode 100644 (file)
index 0000000..0b491ea
--- /dev/null
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ActivityExpirationWorker do
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
+  use Oban.Worker,
+    queue: "activity_expiration",
+    max_attempts: 1
+
+  @impl Oban.Worker
+  def perform(
+        %{
+          "op" => "activity_expiration",
+          "activity_expiration_id" => activity_expiration_id
+        },
+        _job
+      ) do
+    Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
+  end
+end
index fbce7d789242e79d2f56c19bd6cc07744d9ea6d0..7b5575a5fef0feddc9924af70bdbff2f94c4dd87 100644 (file)
@@ -8,24 +8,24 @@ defmodule Pleroma.Workers.BackgroundWorker do
   alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
   alias Pleroma.Web.OAuth.Token.CleanWorker
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "background",
     max_attempts: 1
 
   @impl Oban.Worker
   def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
-    user = User.get_by_id(user_id)
+    user = User.get_cached_by_id(user_id)
     User.perform(:fetch_initial_posts, user)
   end
 
   def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
-    user = User.get_by_id(user_id)
+    user = User.get_cached_by_id(user_id)
     User.perform(:deactivate_async, user, status)
   end
 
   def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
-    user = User.get_by_id(user_id)
+    user = User.get_cached_by_id(user_id)
     User.perform(:delete, user)
   end
 
@@ -37,7 +37,7 @@ defmodule Pleroma.Workers.BackgroundWorker do
         },
         _job
       ) do
-    blocker = User.get_by_id(blocker_id)
+    blocker = User.get_cached_by_id(blocker_id)
     User.perform(:blocks_import, blocker, blocked_identifiers)
   end
 
@@ -49,7 +49,7 @@ defmodule Pleroma.Workers.BackgroundWorker do
         },
         _job
       ) do
-    follower = User.get_by_id(follower_id)
+    follower = User.get_cached_by_id(follower_id)
     User.perform(:follow_import, follower, followed_identifiers)
   end
 
@@ -69,11 +69,4 @@ defmodule Pleroma.Workers.BackgroundWorker do
     activity = Activity.get_by_id(activity_id)
     Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
   end
-
-  def perform(
-        %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id},
-        _job
-      ) do
-    Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
-  end
 end
diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex
deleted file mode 100644 (file)
index 3286ce0..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Helper do
-  def worker_args(queue) do
-    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
-      [max_attempts: max_attempts]
-    else
-      []
-    end
-  end
-end
similarity index 58%
rename from lib/pleroma/workers/mailer.ex
rename to lib/pleroma/workers/mailer_worker.ex
index 1cce2ea03f2d74309c8fca3b049c065fd197aeb8..4f73d61bc8b219e559c1bf2f5d1cdf3bbdcfe8b4 100644 (file)
@@ -2,26 +2,25 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Mailer do
+defmodule Pleroma.Workers.MailerWorker do
   alias Pleroma.User
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "mailer",
     max_attempts: 1
 
   @impl Oban.Worker
   def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
-    email =
-      encoded_email
-      |> Base.decode64!()
-      |> :erlang.binary_to_term()
-
-    Pleroma.Emails.Mailer.deliver(email, config)
+    encoded_email
+    |> Base.decode64!()
+    |> :erlang.binary_to_term()
+    |> Pleroma.Emails.Mailer.deliver(config)
   end
 
   def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
-    user = User.get_by_id(user_id)
-    Pleroma.DigestEmailWorker.perform(user)
+    user_id
+    |> User.get_cached_by_id()
+    |> Pleroma.DigestEmailWorker.perform()
   end
 end
similarity index 76%
rename from lib/pleroma/workers/publisher.ex
rename to lib/pleroma/workers/publisher_worker.ex
index 00fae99c723d949e7d32f212b9282c74ffd60494..5671d2a293277a03256fd20a8cd8a7b3cdc36ff3 100644 (file)
@@ -2,15 +2,19 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Publisher do
+defmodule Pleroma.Workers.PublisherWorker do
   alias Pleroma.Activity
   alias Pleroma.Web.Federator
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "federator_outgoing",
     max_attempts: 1
 
+  def backoff(attempt) when is_integer(attempt) do
+    Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+  end
+
   @impl Oban.Worker
   def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
     activity = Activity.get_by_id(activity_id)
similarity index 83%
rename from lib/pleroma/workers/receiver.ex
rename to lib/pleroma/workers/receiver_worker.ex
index 4ee270d7411cf2cec8cecda7797a514a9a06e12c..cdce630f2e0f745cc0aff7fdb26e26ec8d99fea1 100644 (file)
@@ -2,10 +2,10 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Receiver do
+defmodule Pleroma.Workers.ReceiverWorker do
   alias Pleroma.Web.Federator
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "federator_incoming",
     max_attempts: 1
index d9724c78af6bc93dfd5bc4948439a6d041acc500..4094411ae37f16206140ca7a19b1bad609ed5b98 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Workers.ScheduledActivityWorker do
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "scheduled_activities",
     max_attempts: 1
similarity index 88%
rename from lib/pleroma/workers/subscriber.ex
rename to lib/pleroma/workers/subscriber_worker.ex
index e960b35bf655292796635966c10ca6ce4275f55a..22d1dc956cd53b6cf40184b85ce5abeae78dae39 100644 (file)
@@ -2,12 +2,12 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Subscriber do
+defmodule Pleroma.Workers.SubscriberWorker do
   alias Pleroma.Repo
   alias Pleroma.Web.Federator
   alias Pleroma.Web.Websub
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "federator_outgoing",
     max_attempts: 1
similarity index 73%
rename from lib/pleroma/workers/transmogrifier.ex
rename to lib/pleroma/workers/transmogrifier_worker.ex
index e13202c061a559c09c3aa3df38dda69ddc721052..6f5c1a2f29cd184aa3fa822651d339cd5a1e201e 100644 (file)
@@ -2,17 +2,17 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Transmogrifier do
+defmodule Pleroma.Workers.TransmogrifierWorker do
   alias Pleroma.User
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "transmogrifier",
     max_attempts: 1
 
   @impl Oban.Worker
   def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
-    user = User.get_by_id(user_id)
+    user = User.get_cached_by_id(user_id)
     Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
   end
 end
similarity index 82%
rename from lib/pleroma/workers/web_pusher.ex
rename to lib/pleroma/workers/web_pusher_worker.ex
index 7b78bb3ea153dd51b2bd697fe560958d22df7a39..2b1d3b99a783117b2a0c647cded5b54dd814bfe5 100644 (file)
@@ -2,11 +2,11 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.WebPusher do
+defmodule Pleroma.Workers.WebPusherWorker do
   alias Pleroma.Notification
   alias Pleroma.Repo
 
-  # Note: `max_attempts` is intended to be overridden in `new/1` call
+  # Note: `max_attempts` is intended to be overridden in `new/2` call
   use Oban.Worker,
     queue: "web_push",
     max_attempts: 1
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644 (file)
index 0000000..f9ed2e6
--- /dev/null
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+  alias Pleroma.Config
+
+  def worker_args(queue) do
+    case Config.get([:workers, :retries, queue]) do
+      nil -> []
+      max_attempts -> [max_attempts: max_attempts]
+    end
+  end
+
+  def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+    backoff =
+      :math.pow(attempt, pow) +
+        base_backoff +
+        :rand.uniform(2 * base_backoff) * attempt
+
+    trunc(backoff)
+  end
+end
index 86232de998ed89624368f44daa13b4d7d2f285fb..0acd0db4e69c06f2541a4b65095401edf7e1ab0b 100644 (file)
@@ -1123,7 +1123,7 @@ defmodule Pleroma.UserTest do
                    "id" => "pleroma:fakeid"
                  }
                },
-               all_enqueued(worker: Pleroma.Workers.Publisher)
+               all_enqueued(worker: Pleroma.Workers.PublisherWorker)
              )
     end
   end
index a1b567a464cba26c10066eb37769093b268d3923..f1c1bb5039a927af971812edac7c4108f8029d05 100644 (file)
@@ -17,7 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
   alias Pleroma.Web.ActivityPub.UserView
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.CommonAPI
-  alias Pleroma.Workers.Receiver, as: ReceiverWorker
+  alias Pleroma.Workers.ReceiverWorker
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
index 5724672fdcc6933989b7723a1394b003bfe532d6..4096d46908283873a9a2b0274ecf57ef1ab9810f 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.Web.FederatorTest do
   alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Federator
-  alias Pleroma.Workers.Publisher, as: PublisherWorker
+  alias Pleroma.Workers.PublisherWorker
 
   use Pleroma.DataCase
   use Oban.Testing, repo: Pleroma.Repo
index 414610879629263e41d3320fc010b10896fa8bac..929acf5a2a57fd5dc82e64ce4aedc324ded6dbf1 100644 (file)
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.WebsubTest do
   alias Pleroma.Web.Websub
   alias Pleroma.Web.Websub.WebsubClientSubscription
   alias Pleroma.Web.Websub.WebsubServerSubscription
-  alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+  alias Pleroma.Workers.SubscriberWorker
 
   import Pleroma.Factory
   import Tesla.Mock