RepoStreamer.chunk_stream -> Repo.chunk_stream
authorMaksim Pechnikov <parallel588@gmail.com>
Wed, 16 Sep 2020 06:47:18 +0000 (09:47 +0300)
committerMaksim Pechnikov <parallel588@gmail.com>
Wed, 16 Sep 2020 06:47:18 +0000 (09:47 +0300)
lib/mix/tasks/pleroma/database.ex
lib/mix/tasks/pleroma/user.ex
lib/pleroma/repo.ex
lib/pleroma/repo_streamer.ex [deleted file]
lib/pleroma/user.ex
test/repo_test.exs

index 7f1108dcfa87cee488266a2cc1343be7824ece75..a01c36ece30754ada443c22dffc5e3d29b9f73cf 100644 (file)
@@ -99,7 +99,7 @@ defmodule Mix.Tasks.Pleroma.Database do
       where: fragment("(?)->>'likes' is not null", object.data),
       select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
     )
-    |> Pleroma.RepoStreamer.chunk_stream(100)
+    |> Pleroma.Repo.chunk_stream(100, :batches)
     |> Stream.each(fn objects ->
       ids =
         objects
@@ -145,7 +145,7 @@ defmodule Mix.Tasks.Pleroma.Database do
     |> where(local: true)
     |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
     |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
-    |> Pleroma.RepoStreamer.chunk_stream(100)
+    |> Pleroma.Repo.chunk_stream(100, :batches)
     |> Stream.each(fn activities ->
       Enum.each(activities, fn activity ->
         expires_at =
index 01824aa18b0f45dda65716d6189856f3394f63c1..b20c49d89538db746214e7b6b17256995d3879c8 100644 (file)
@@ -179,7 +179,7 @@ defmodule Mix.Tasks.Pleroma.User do
     start_pleroma()
 
     Pleroma.User.Query.build(%{nickname: "@#{instance}"})
-    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Pleroma.Repo.chunk_stream(500, :batches)
     |> Stream.each(fn users ->
       users
       |> Enum.each(fn user ->
@@ -370,7 +370,7 @@ defmodule Mix.Tasks.Pleroma.User do
     start_pleroma()
 
     Pleroma.User.Query.build(%{local: true})
-    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Pleroma.Repo.chunk_stream(500, :batches)
     |> Stream.each(fn users ->
       users
       |> Enum.each(fn user ->
index a75610879a6c7a78a12a5461ef14160b690a5380..4524bd5e2c766636382ba2c0c34d409d10c289b4 100644 (file)
@@ -49,6 +49,20 @@ defmodule Pleroma.Repo do
     end
   end
 
+  @doc """
+  Returns a lazy enumerable that emits all entries from the data store matching the given query.
+
+  `returns_as` use to group records. use the `batches` option to fetch records in bulk.
+
+  ## Examples
+
+  # fetch records one-by-one
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
+
+  # fetch records in bulk
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
+  """
+  @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
   def chunk_stream(query, chunk_size, returns_as \\ :one) do
     # We don't actually need start and end funcitons of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex
deleted file mode 100644 (file)
index cb4d7bb..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.RepoStreamer do
-  alias Pleroma.Repo
-  import Ecto.Query
-
-  def chunk_stream(query, chunk_size) do
-    Stream.unfold(0, fn
-      :halt ->
-        {[], :halt}
-
-      last_id ->
-        query
-        |> order_by(asc: :id)
-        |> where([r], r.id > ^last_id)
-        |> limit(^chunk_size)
-        |> Repo.all()
-        |> case do
-          [] ->
-            {[], :halt}
-
-          records ->
-            last_id = List.last(records).id
-            {records, last_id}
-        end
-    end)
-    |> Stream.take_while(fn
-      [] -> false
-      _ -> true
-    end)
-  end
-end
index e73d199648f616d0a57a1c9ad2014cc83fc77ac9..57497eb83e66da2f9a799346703df4c7d997f11d 100644 (file)
@@ -25,7 +25,6 @@ defmodule Pleroma.User do
   alias Pleroma.Object
   alias Pleroma.Registration
   alias Pleroma.Repo
-  alias Pleroma.RepoStreamer
   alias Pleroma.User
   alias Pleroma.UserRelationship
   alias Pleroma.Web
@@ -1775,7 +1774,7 @@ defmodule Pleroma.User do
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     ap_id
     |> Activity.Queries.by_actor()
-    |> RepoStreamer.chunk_stream(50)
+    |> Repo.chunk_stream(50, :batches)
     |> Stream.each(fn activities ->
       Enum.each(activities, fn activity -> delete_activity(activity, user) end)
     end)
index 72c0b5071ccb99dfecfbef9b4c18f4e3111284a8..155791be22fd1fb2905ca0c82d97d9d1f14d4f01 100644 (file)
@@ -49,4 +49,32 @@ defmodule Pleroma.RepoTest do
       assert Repo.get_assoc(token, :user) == {:error, :not_found}
     end
   end
+
+  describe "chunk_stream/3" do
+    test "fetch records one-by-one" do
+      users = insert_list(50, :user)
+
+      {fetch_users, 50} =
+        from(t in User)
+        |> Repo.chunk_stream(5)
+        |> Enum.reduce({[], 0}, fn %User{} = user, {acc, count} ->
+          {acc ++ [user], count + 1}
+        end)
+
+      assert users == fetch_users
+    end
+
+    test "fetch records in bulk" do
+      users = insert_list(50, :user)
+
+      {fetch_users, 10} =
+        from(t in User)
+        |> Repo.chunk_stream(5, :batches)
+        |> Enum.reduce({[], 0}, fn users, {acc, count} ->
+          {acc ++ users, count + 1}
+        end)
+
+      assert users == fetch_users
+    end
+  end
 end