Heavily inspired by https://git.pleroma.social/pleroma/pleroma/-/merge_requests/3777
Co-authored-by: FloatingGhost <hannah@coffee-and-dreams.uk>
Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/273
federator_incoming: 5,
federator_outgoing: 5,
search_indexing: 2
+ ],
+ timeout: [
+ activity_expiration: :timer.seconds(5),
+ token_expiration: :timer.seconds(5),
+ filter_expiration: :timer.seconds(5),
+ backup: :timer.seconds(900),
+ federator_incoming: :timer.seconds(10),
+ federator_outgoing: :timer.seconds(10),
+ ingestion_queue: :timer.seconds(5),
+ web_push: :timer.seconds(5),
+ mailer: :timer.seconds(5),
+ transmogrifier: :timer.seconds(5),
+ scheduled_activities: :timer.seconds(5),
+ poll_notifications: :timer.seconds(5),
+ background: :timer.seconds(5),
+ remote_fetcher: :timer.seconds(10),
+ attachments_cleanup: :timer.seconds(900),
+ new_users_digest: :timer.seconds(10),
+ mute_expire: :timer.seconds(5),
+ search_indexing: :timer.seconds(5),
+ nodeinfo_fetcher: :timer.seconds(10)
]
config :pleroma, Pleroma.Formatter,
federator_incoming: 5,
federator_outgoing: 5
]
+ },
+ %{
+ key: :timeout,
+ type: {:keyword, :integer},
+ description: "Timeout for jobs, per `Oban` queue, in ms",
+ suggestions: [
+ activity_expiration: :timer.seconds(5),
+ token_expiration: :timer.seconds(5),
+ filter_expiration: :timer.seconds(5),
+ backup: :timer.seconds(900),
+ federator_incoming: :timer.seconds(10),
+ federator_outgoing: :timer.seconds(10),
+ ingestion_queue: :timer.seconds(5),
+ web_push: :timer.seconds(5),
+ mailer: :timer.seconds(5),
+ transmogrifier: :timer.seconds(5),
+ scheduled_activities: :timer.seconds(5),
+ poll_notifications: :timer.seconds(5),
+ background: :timer.seconds(5),
+ remote_fetcher: :timer.seconds(10),
+ attachments_cleanup: :timer.seconds(900),
+ new_users_digest: :timer.seconds(10),
+ mute_expire: :timer.seconds(5),
+ search_indexing: :timer.seconds(5),
+ nodeinfo_fetcher: :timer.seconds(10)
+ ]
}
]
},
|> Oban.insert()
end
+ @impl Oban.Worker
+ def timeout(_job) do
+ Pleroma.Config.get([:workers, :timeout, :backup]) || :timer.minutes(1)
+ end
+
def schedule_deletion(backup) do
days = Pleroma.Config.get([Backup, :purge_after_days])
time = 60 * 60 * 24 * days
|> Oban.insert()
end
+ @impl true
def perform(%Job{
args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
}) do
end
end
+ @impl Oban.Worker
+ def timeout(_job) do
+ Pleroma.Config.get([:workers, :timeout, :activity_expiration]) || :timer.minutes(1)
+ end
+
@impl true
def perform(%Oban.Job{args: %{"activity_id" => id}}) do
with %Activity{} = activity <- find_activity(id),
|> Oban.insert()
end
+ @impl Oban.Worker
+ def timeout(_job) do
+ Pleroma.Config.get([:workers, :timeout, :filter_expiration]) || :timer.minutes(1)
+ end
+
@impl true
def perform(%Job{args: %{"filter_id" => id}}) do
Pleroma.Filter
|> Oban.insert()
end
+ @impl Oban.Worker
+ def timeout(_job) do
+ Pleroma.Config.get([:workers, :timeout, :token_expiration]) || :timer.minutes(1)
+ end
+
@impl true
def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
module
|> apply(:new, [params, worker_args])
|> Oban.insert()
end
+
+ @impl Oban.Worker
+ def timeout(_job) do
+ queue_atom = String.to_atom(unquote(queue))
+ Config.get([:workers, :timeout, queue_atom]) || :timer.minutes(1)
+ end
end
end
end
assert {:error, :activity_not_found} =
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})
end
+
+ test "has a timeout" do
+ clear_config([:workers, :timeout, :activity_expiration], 50)
+ assert Pleroma.Workers.PurgeExpiredActivity.timeout(%Oban.Job{}) == 50
+ end
end
ScheduledActivityWorker.perform(%Oban.Job{args: %{"activity_id" => 42}})
end) =~ "Couldn't find scheduled activity: 42"
end
+
+ test "has a timeout" do
+ clear_config([:workers, :timeout, :scheduled_activities], :timer.minutes(5))
+ assert ScheduledActivityWorker.timeout(nil) == :timer.minutes(5)
+ end
end