from(e in "activity_expirations",
select: %{id: e.id, activity_id: e.activity_id, scheduled_at: e.scheduled_at}
)
- |> Pleroma.RepoStreamer.chunk_stream(500)
- |> Stream.each(fn expirations ->
- Enum.each(expirations, fn expiration ->
- with {:ok, expires_at} <- DateTime.from_naive(expiration.scheduled_at, "Etc/UTC") do
- Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
- activity_id: FlakeId.to_string(expiration.activity_id),
- expires_at: expires_at,
- validate: false
- })
- end
- end)
+ |> Pleroma.Repo.stream()
+ |> Enum.each(fn expiration ->
+ with {:ok, expires_at} <- DateTime.from_naive(expiration.scheduled_at, "Etc/UTC") do
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: FlakeId.to_string(expiration.activity_id),
+ expires_at: expires_at,
+ validate: false
+ })
+ end
end)
|> Stream.run()
end