move old expirations into Oban
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Tue, 25 Aug 2020 09:30:00 +0000 (12:30 +0300)
committerrinpatch <rinpatch@sdf.org>
Thu, 10 Sep 2020 18:50:41 +0000 (21:50 +0300)
priv/repo/migrations/20200825061316_move_activity_expirations_to_oban.exs [new file with mode: 0644]

diff --git a/priv/repo/migrations/20200825061316_move_activity_expirations_to_oban.exs b/priv/repo/migrations/20200825061316_move_activity_expirations_to_oban.exs
new file mode 100644 (file)
index 0000000..585d1a6
--- /dev/null
@@ -0,0 +1,29 @@
+defmodule Pleroma.Repo.Migrations.MoveActivityExpirationsToOban do
+  use Ecto.Migration
+
+  import Ecto.Query, only: [from: 2]
+
+  def change do
+    Supervisor.start_link([{Oban, Pleroma.Config.get(Oban)}],
+      strategy: :one_for_one,
+      name: Pleroma.Supervisor
+    )
+
+    from(e in "activity_expirations",
+      select: %{id: e.id, activity_id: e.activity_id, scheduled_at: e.scheduled_at}
+    )
+    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Stream.each(fn expirations ->
+      Enum.each(expirations, fn expiration ->
+        with {:ok, expires_at} <- DateTime.from_naive(expiration.scheduled_at, "Etc/UTC") do
+          Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+            activity_id: FlakeId.to_string(expiration.activity_id),
+            expires_at: expires_at,
+            validate: false
+          })
+        end
+      end)
+    end)
+    |> Stream.run()
+  end
+end