schedule activity expiration in Oban
[akkoma] / lib / pleroma / workers / purge_expired_activity.ex
defmodule Pleroma.Workers.PurgeExpiredActivity do
  @moduledoc """
  Oban worker that deletes an activity once its expiration time is reached.

  Jobs run on the `:activity_expiration` queue and are attempted only once
  (`max_attempts: 1`).
  """

  use Oban.Worker, queue: :activity_expiration, max_attempts: 1

  import Ecto.Query

  # Minimum distance between "now" and a requested expiration, in milliseconds.
  @min_lifetime_ms :timer.hours(1)

  @doc """
  Schedules a purge job for the given `args`.

  `args` is expected to be a map. Its `:expires_at` entry is removed and used
  as the job's `scheduled_at`; the remaining entries become the job arguments
  (`perform/1` later reads `"activity_id"` from them).

  Passing `validate: false` skips the "late enough" check; the `:validate`
  key itself is stripped before the job is built.

  Returns the result of `Oban.insert/1` on success. Otherwise the first
  failing step is returned unchanged:

    * `{:error, :expired_activities_disabled}` when the worker is disabled
    * `{:error, :expiration_too_close}` when `:expires_at` is not far enough
      in the future
  """
  def enqueue(args) do
    with true <- enabled?(),
         %{} = job_args <- validate_expires_at(args) do
      schedule(job_args)
    end
  end

  @doc """
  Deletes the activity referenced by the job's `"activity_id"` argument.

  The deletion is performed through `Pleroma.Web.CommonAPI.delete/2` on behalf
  of the user whose AP id is stored under `"actor"` in the activity's object
  data. Returns `{:error, :activity_not_found}` or `{:error, :user_not_found}`
  when the corresponding lookup yields `nil`.
  """
  @impl true
  def perform(%Oban.Job{args: %{"activity_id" => activity_id}}) do
    case find_activity(activity_id) do
      %Pleroma.Activity{} = activity ->
        purge(activity, find_user(activity.object.data["actor"]))

      lookup_result ->
        lookup_result
    end
  end

  @doc """
  Looks up the scheduled expiration job for the activity with the given `id`.

  The query selects jobs in state `"scheduled"` on the `"activity_expiration"`
  queue whose `activity_id` argument equals `id`, and returns whatever
  `Pleroma.Repo.one/1` yields for it.
  """
  def get_expiration(id) do
    Oban.Job
    |> where([job], job.state == "scheduled")
    |> where([job], job.queue == "activity_expiration")
    |> where([job], fragment("?->>'activity_id' = ?", job.args, ^id))
    |> Pleroma.Repo.one()
  end

  @doc """
  Tells whether `scheduled_at` lies strictly more than one hour after the
  current UTC time.
  """
  @spec expires_late_enough?(DateTime.t()) :: boolean()
  def expires_late_enough?(scheduled_at) do
    remaining_ms = DateTime.diff(scheduled_at, DateTime.utc_now(), :millisecond)
    remaining_ms > @min_lifetime_ms
  end

  # Splits the expiration time off the arguments and inserts the Oban job.
  defp schedule(job_args) do
    {run_at, payload} = Map.pop(job_args, :expires_at)
    job = __MODULE__.new(payload, scheduled_at: run_at)
    Oban.insert(job)
  end

  # Performs the deletion when the actor lookup produced a user; any other
  # lookup result is handed back to the caller untouched.
  defp purge(activity, %Pleroma.User{} = actor) do
    Pleroma.Web.CommonAPI.delete(activity.id, actor)
  end

  defp purge(_activity, lookup_result), do: lookup_result

  # Reads the per-worker `:enabled` setting (default `false`). A `false`
  # setting becomes an error tuple; any other value is returned as-is.
  defp enabled? do
    case Pleroma.Config.get([__MODULE__, :enabled], false) do
      false -> {:error, :expired_activities_disabled}
      setting -> setting
    end
  end

  # `validate: false` opts out of the lifetime check; the flag is dropped so
  # it does not end up in the job arguments.
  defp validate_expires_at(%{validate: false} = args), do: Map.delete(args, :validate)

  defp validate_expires_at(args) do
    case expires_late_enough?(args[:expires_at]) do
      true -> args
      false -> {:error, :expiration_too_close}
    end
  end

  # Fetches the activity together with its object, or an error tuple on `nil`.
  defp find_activity(id) do
    case Pleroma.Activity.get_by_id_with_object(id) do
      nil -> {:error, :activity_not_found}
      activity -> activity
    end
  end

  # Fetches the user by AP id, or an error tuple on `nil`.
  defp find_user(ap_id) do
    case Pleroma.User.get_by_ap_id(ap_id) do
      nil -> {:error, :user_not_found}
      user -> user
    end
  end
end