X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Frepo.ex;h=6d85d70bc66c9ac6de1d0266c4937572402c2e95;hb=7e488cd4a7ac038dfe8a1f5b204cb134bb5ba549;hp=cb0b6653cd465bb361201ebd0dfb0d0f475f9b2f;hpb=503d966e9f7d4c41a1bfbd215b375443a858564a;p=akkoma
diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex
index cb0b6653c..6d85d70bc 100644
--- a/lib/pleroma/repo.ex
+++ b/lib/pleroma/repo.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
+# Copyright © 2017-2020 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo do
@@ -8,6 +8,7 @@ defmodule Pleroma.Repo do
adapter: Ecto.Adapters.Postgres,
migration_timestamps: [type: :naive_datetime_usec]
+ import Ecto.Query
require Logger
defmodule Instrumenter do
@@ -78,6 +79,33 @@ defmodule Pleroma.Repo do
:ok
end
end
+
+ def chunk_stream(query, chunk_size) do
+ # We don't actually need start and end funcitons of resource streaming,
+ # but it seems to be the only way to not fetch records one-by-one and
+ # have individual records be the elements of the stream, instead of
+ # lists of records
+ Stream.resource(
+ fn -> 0 end,
+ fn
+ last_id ->
+ query
+ |> order_by(asc: :id)
+ |> where([r], r.id > ^last_id)
+ |> limit(^chunk_size)
+ |> all()
+ |> case do
+ [] ->
+ {:halt, last_id}
+
+ records ->
+ last_id = List.last(records).id
+ {records, last_id}
+ end
+ end,
+ fn _ -> :ok end
+ )
+ end
end
defmodule Pleroma.Repo.UnappliedMigrationsError do