From abf82a63ec242885672e7add20ddfc9554d7f81d Mon Sep 17 00:00:00 2001 From: Ekaterina Vaartis Date: Mon, 16 Aug 2021 22:30:56 +0300 Subject: [PATCH] Make the indexing batch differently and more, show number indexed --- lib/mix/tasks/pleroma/search/meilisearch.ex | 63 +++++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex index 0b86fdece..2a6438528 100644 --- a/lib/mix/tasks/pleroma/search/meilisearch.ex +++ b/lib/mix/tasks/pleroma/search/meilisearch.ex @@ -28,33 +28,46 @@ defmodule Mix.Tasks.Pleroma.Search.Meilisearch do ]) ) - Pleroma.Repo.chunk_stream( - from(Pleroma.Object, - # Only index public posts which are notes and have some text - where: - fragment("data->>'type' = 'Note'") and - fragment("LENGTH(data->>'source') > 0") and - fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) - ), - 200, - :batches - ) - |> Stream.map(fn objects -> - Enum.map(objects, fn object -> - data = object.data - %{id: object.id, source: data["source"], ap: data["id"]} - end) - end) - |> Stream.each(fn objects -> - {:ok, _} = - Pleroma.HTTP.post( - "#{endpoint}/indexes/objects/documents", - Jason.encode!(objects) + chunk_size = 100_000 + + Pleroma.Repo.transaction( + fn -> + Pleroma.Repo.stream( + from(Pleroma.Object, + # Only index public posts which are notes and have some text + where: + fragment("data->>'type' = 'Note'") and + fragment("LENGTH(data->>'source') > 0") and + fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()), + order_by: fragment("data->'published' DESC") + ), + timeout: :infinity ) + |> Stream.chunk_every(chunk_size) + |> Stream.transform(0, fn objects, acc -> + new_acc = acc + Enum.count(objects) - IO.puts("Indexed #{Enum.count(objects)} entries") - end) - |> Stream.run() + IO.puts("Indexed #{new_acc} entries") + + {[objects], new_acc} + end) + |> Stream.map(fn objects -> + Enum.map(objects, fn object -> + data = object.data + %{id: object.id, source: data["source"], ap: data["id"]} + end) + end) + |> Stream.each(fn objects -> + {:ok, _} = + Pleroma.HTTP.post( + "#{endpoint}/indexes/objects/documents", + Jason.encode!(objects) + ) + end) + |> Stream.run() + end, + timeout: :infinity + ) end def run(["clear"]) do -- 2.45.2