Add configurable timeline per oban job (#273)
authorfloatingghost <hannah@coffee-and-dreams.uk>
Sun, 13 Nov 2022 23:55:51 +0000 (23:55 +0000)
committerfloatingghost <hannah@coffee-and-dreams.uk>
Sun, 13 Nov 2022 23:55:51 +0000 (23:55 +0000)
Heavily inspired by https://git.pleroma.social/pleroma/pleroma/-/merge_requests/3777

Co-authored-by: FloatingGhost <hannah@coffee-and-dreams.uk>
Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/273

config/config.exs
config/description.exs
lib/pleroma/workers/backup_worker.ex
lib/pleroma/workers/purge_expired_activity.ex
lib/pleroma/workers/purge_expired_filter.ex
lib/pleroma/workers/purge_expired_token.ex
lib/pleroma/workers/worker_helper.ex
test/pleroma/workers/purge_expired_activity_test.exs
test/pleroma/workers/scheduled_activity_worker_test.exs

index fd470ed9329545de0d12e33c6e12c4ae463b042d..ba77d8b021a67d6d7a4047a1cef201085ee2896e 100644 (file)
@@ -584,6 +584,27 @@ config :pleroma, :workers,
     federator_incoming: 5,
     federator_outgoing: 5,
     search_indexing: 2
+  ],
+  timeout: [
+    activity_expiration: :timer.seconds(5),
+    token_expiration: :timer.seconds(5),
+    filter_expiration: :timer.seconds(5),
+    backup: :timer.seconds(900),
+    federator_incoming: :timer.seconds(10),
+    federator_outgoing: :timer.seconds(10),
+    ingestion_queue: :timer.seconds(5),
+    web_push: :timer.seconds(5),
+    mailer: :timer.seconds(5),
+    transmogrifier: :timer.seconds(5),
+    scheduled_activities: :timer.seconds(5),
+    poll_notifications: :timer.seconds(5),
+    background: :timer.seconds(5),
+    remote_fetcher: :timer.seconds(10),
+    attachments_cleanup: :timer.seconds(900),
+    new_users_digest: :timer.seconds(10),
+    mute_expire: :timer.seconds(5),
+    search_indexing: :timer.seconds(5),
+    nodeinfo_fetcher: :timer.seconds(10)
   ]
 
 config :pleroma, Pleroma.Formatter,
index 4843c0aaeeb905decdc9d591adb03e92044eecb9..287abb747b8bcfb035b89d70f63574c137d917f1 100644 (file)
@@ -1979,6 +1979,32 @@ config :pleroma, :config_description, [
           federator_incoming: 5,
           federator_outgoing: 5
         ]
+      },
+      %{
+        key: :timeout,
+        type: {:keyword, :integer},
+        description: "Timeout for jobs, per `Oban` queue, in ms",
+        suggestions: [
+          activity_expiration: :timer.seconds(5),
+          token_expiration: :timer.seconds(5),
+          filter_expiration: :timer.seconds(5),
+          backup: :timer.seconds(900),
+          federator_incoming: :timer.seconds(10),
+          federator_outgoing: :timer.seconds(10),
+          ingestion_queue: :timer.seconds(5),
+          web_push: :timer.seconds(5),
+          mailer: :timer.seconds(5),
+          transmogrifier: :timer.seconds(5),
+          scheduled_activities: :timer.seconds(5),
+          poll_notifications: :timer.seconds(5),
+          background: :timer.seconds(5),
+          remote_fetcher: :timer.seconds(10),
+          attachments_cleanup: :timer.seconds(900),
+          new_users_digest: :timer.seconds(10),
+          mute_expire: :timer.seconds(5),
+          search_indexing: :timer.seconds(5),
+          nodeinfo_fetcher: :timer.seconds(10)
+        ]
       }
     ]
   },
index 66c5c35916e9b59da4fe69ca84f43dfa9833ccc3..4ab08706e6a903109715dd49fcc837f6d47a2754 100644 (file)
@@ -14,6 +14,11 @@ defmodule Pleroma.Workers.BackupWorker do
     |> Oban.insert()
   end
 
+  @impl Oban.Worker
+  def timeout(_job) do
+    Pleroma.Config.get([:workers, :timeout, :backup]) || :timer.minutes(1)
+  end
+
   def schedule_deletion(backup) do
     days = Pleroma.Config.get([Backup, :purge_after_days])
     time = 60 * 60 * 24 * days
@@ -30,6 +35,7 @@ defmodule Pleroma.Workers.BackupWorker do
     |> Oban.insert()
   end
 
+  @impl true
   def perform(%Job{
         args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
       }) do
index 027171c1e66fbf7b61eb746b6ba975405f890699..ece84df3735cf9f5740970da1120beb70c1c2232 100644 (file)
@@ -27,6 +27,11 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do
     end
   end
 
+  @impl Oban.Worker
+  def timeout(_job) do
+    Pleroma.Config.get([:workers, :timeout, :activity_expiration]) || :timer.minutes(1)
+  end
+
   @impl true
   def perform(%Oban.Job{args: %{"activity_id" => id}}) do
     with %Activity{} = activity <- find_activity(id),
index 4740d52e91ac99349fdab64d1f27ffaa0f0f996b..dd6a22a689340f21b10cf809e5990d481ca36050 100644 (file)
@@ -24,6 +24,11 @@ defmodule Pleroma.Workers.PurgeExpiredFilter do
     |> Oban.insert()
   end
 
+  @impl Oban.Worker
+  def timeout(_job) do
+    Pleroma.Config.get([:workers, :timeout, :filter_expiration]) || :timer.minutes(1)
+  end
+
   @impl true
   def perform(%Job{args: %{"filter_id" => id}}) do
     Pleroma.Filter
index cfdf5c6dc71c9115baaaffb069a1383d32689351..1773aeff98f1b810873983b3f89b6d1c9736c556 100644 (file)
@@ -19,6 +19,11 @@ defmodule Pleroma.Workers.PurgeExpiredToken do
     |> Oban.insert()
   end
 
+  @impl Oban.Worker
+  def timeout(_job) do
+    Pleroma.Config.get([:workers, :timeout, :token_expiration]) || :timer.minutes(1)
+  end
+
   @impl true
   def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
     module
index 4befbeb3b2be34ea2073443fb1b94cc6a2cd1079..97c0e4e5c73e426ced7908c16e0dc9853d63e0e1 100644 (file)
@@ -43,6 +43,12 @@ defmodule Pleroma.Workers.WorkerHelper do
         |> apply(:new, [params, worker_args])
         |> Oban.insert()
       end
+
+      @impl Oban.Worker
+      def timeout(_job) do
+        queue_atom = String.to_atom(unquote(queue))
+        Config.get([:workers, :timeout, queue_atom]) || :timer.minutes(1)
+      end
     end
   end
 end
index 98f30f61f50cedda4bcbb39d44167f8e1cae5389..6285dca3ebefb3be6ff2692001b0db34017cdcc1 100644 (file)
@@ -56,4 +56,9 @@ defmodule Pleroma.Workers.PurgeExpiredActivityTest do
     assert {:error, :activity_not_found} =
              perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})
   end
+
+  test "has a timeout" do
+    clear_config([:workers, :timeout, :activity_expiration], 50)
+    assert Pleroma.Workers.PurgeExpiredActivity.timeout(%Oban.Job{}) == 50
+  end
 end
index 5558d5b5f9a789d14e1cf0557b09952bfe62d243..9f5f1b687f63fc99803a28d5c2cfbc2916ee5f40 100644 (file)
@@ -49,4 +49,9 @@ defmodule Pleroma.Workers.ScheduledActivityWorkerTest do
              ScheduledActivityWorker.perform(%Oban.Job{args: %{"activity_id" => 42}})
            end) =~ "Couldn't find scheduled activity: 42"
   end
+
+  test "has a timeout" do
+    clear_config([:workers, :timeout, :scheduled_activities], :timer.minutes(5))
+    assert ScheduledActivityWorker.timeout(nil) == :timer.minutes(5)
+  end
 end