fix and delete purge activities duplicates
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Thu, 28 Jan 2021 13:50:21 +0000 (16:50 +0300)
committerAlexander Strizhakov <alex.strizhakov@gmail.com>
Thu, 28 Jan 2021 13:57:41 +0000 (16:57 +0300)
lib/pleroma/workers/purge_expired_activity.ex
priv/repo/migrations/20210128092834_remove_duplicates_from_activity_expiration_queue.exs [new file with mode: 0644]

index 01256831bc6490a93f18475bfc5b114db234fecc..027171c1e66fbf7b61eb746b6ba975405f890699 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do
   Worker which purges expired activity.
   """
 
-  use Oban.Worker, queue: :activity_expiration, max_attempts: 1
+  use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity]
 
   import Ecto.Query
 
diff --git a/priv/repo/migrations/20210128092834_remove_duplicates_from_activity_expiration_queue.exs b/priv/repo/migrations/20210128092834_remove_duplicates_from_activity_expiration_queue.exs
new file mode 100644 (file)
index 0000000..3090092
--- /dev/null
@@ -0,0 +1,29 @@
+defmodule Pleroma.Repo.Migrations.RemoveDuplicatesFromActivityExpirationQueue do
+  use Ecto.Migration
+
+  import Ecto.Query, only: [from: 2]
+
+  def up do
+    duplicate_ids =
+      from(j in Oban.Job,
+        where: j.queue == "activity_expiration",
+        where: j.worker == "Pleroma.Workers.PurgeExpiredActivity",
+        where: j.state == "scheduled",
+        select:
+          {fragment("(?)->>'activity_id'", j.args), fragment("array_agg(?)", j.id), count(j.id)},
+        group_by: fragment("(?)->>'activity_id'", j.args),
+        having: count(j.id) > 1
+      )
+      |> Pleroma.Repo.all()
+      |> Enum.map(fn {_, ids, _} ->
+        max_id = Enum.max(ids)
+        List.delete(ids, max_id)
+      end)
+      |> List.flatten()
+
+    from(j in Oban.Job, where: j.id in ^duplicate_ids)
+    |> Pleroma.Repo.delete_all()
+  end
+
+  def down, do: :noop
+end