Make the indexing batch differently and more, show number indexed
author Ekaterina Vaartis <vaartis@kotobank.ch>
Mon, 16 Aug 2021 19:30:56 +0000 (22:30 +0300)
committer FloatingGhost <hannah@coffee-and-dreams.uk>
Wed, 29 Jun 2022 19:48:29 +0000 (20:48 +0100)
lib/mix/tasks/pleroma/search/meilisearch.ex

index 0b86fdececf550a1f83b0ea66f132e3b1009ac3b..2a64385289922cf25645135f5be6b433054d761a 100644 (file)
@@ -28,33 +28,46 @@ defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
         ])
       )
 
-    Pleroma.Repo.chunk_stream(
-      from(Pleroma.Object,
-        # Only index public posts which are notes and have some text
-        where:
-          fragment("data->>'type' = 'Note'") and
-            fragment("LENGTH(data->>'source') > 0") and
-            fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public())
-      ),
-      200,
-      :batches
-    )
-    |> Stream.map(fn objects ->
-      Enum.map(objects, fn object ->
-        data = object.data
-        %{id: object.id, source: data["source"], ap: data["id"]}
-      end)
-    end)
-    |> Stream.each(fn objects ->
-      {:ok, _} =
-        Pleroma.HTTP.post(
-          "#{endpoint}/indexes/objects/documents",
-          Jason.encode!(objects)
+    chunk_size = 100_000
+
+    Pleroma.Repo.transaction(
+      fn ->
+        Pleroma.Repo.stream(
+          from(Pleroma.Object,
+            # Only index public posts which are notes and have some text
+            where:
+              fragment("data->>'type' = 'Note'") and
+                fragment("LENGTH(data->>'source') > 0") and
+                fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()),
+            order_by: fragment("data->'published' DESC")
+          ),
+          timeout: :infinity
         )
+        |> Stream.chunk_every(chunk_size)
+        |> Stream.transform(0, fn objects, acc ->
+          new_acc = acc + Enum.count(objects)
 
-      IO.puts("Indexed #{Enum.count(objects)} entries")
-    end)
-    |> Stream.run()
+          IO.puts("Indexed #{new_acc} entries")
+
+          {[objects], new_acc}
+        end)
+        |> Stream.map(fn objects ->
+          Enum.map(objects, fn object ->
+            data = object.data
+            %{id: object.id, source: data["source"], ap: data["id"]}
+          end)
+        end)
+        |> Stream.each(fn objects ->
+          {:ok, _} =
+            Pleroma.HTTP.post(
+              "#{endpoint}/indexes/objects/documents",
+              Jason.encode!(objects)
+            )
+        end)
+        |> Stream.run()
+      end,
+      timeout: :infinity
+    )
   end
 
   def run(["clear"]) do