Mark only content as searchable for Meilisearch
[akkoma] lib/mix/tasks/pleroma/search/meilisearch.ex
index 1fece96e5ea602de24f229cfa31d1d2f8adbc2a9..ebd3cc81fdc19ed34857a538bb32cf5156eba9b2 100644
@@ -4,6 +4,7 @@
 
 defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
   require Logger
+  require Pleroma.Constants
 
   import Mix.Pleroma
   import Ecto.Query
@@ -17,7 +18,7 @@ defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
       Pleroma.HTTP.post(
         "#{endpoint}/indexes/objects/settings/ranking-rules",
         Jason.encode!([
-          "desc(id)",
+          "desc(published)",
           "typo",
           "words",
           "proximity",
@@ -27,28 +28,80 @@ defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
         ])
       )
 
-    Pleroma.Repo.chunk_stream(
-      from(Pleroma.Object,
-        where: fragment("data->>'type' = 'Note'") and fragment("LENGTH(data->>'source') > 0")
-      ),
-      200,
-      :batches
-    )
-    |> Stream.map(fn objects ->
-      Enum.map(objects, fn object ->
-        data = object.data
-        %{id: object.id, source: data["source"], ap: data["id"]}
-      end)
-    end)
-    |> Stream.each(fn objects ->
-      {:ok, _} =
-        Pleroma.HTTP.post(
-          "#{endpoint}/indexes/objects/documents",
-          Jason.encode!(objects)
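+    # Restrict full-text matching to the "content" field; the other document
+    # fields (id, ap, published) are still stored and returned with results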
+    {:ok, _} =
+      Pleroma.HTTP.post(
+        "#{endpoint}/indexes/objects/settings/searchable-attributes",
+        Jason.encode!([
+          "content"
+        ])
+      )
+
+    chunk_size = 10_000
+
+    Pleroma.Repo.transaction(
+      fn ->
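+        # Repo.stream must run inside a transaction, so keep one open (with no
+        # timeout) until the whole dump has been streamed out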
+        Pleroma.Repo.stream(
+          from(Pleroma.Object,
+            # Only index public posts which are notes and have some text
+            where:
+              fragment("data->>'type' = 'Note'") and
+                fragment("LENGTH(data->>'content') > 0") and
+                fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()),
+            order_by: [desc: fragment("data->'published'")]
+          ),
+          timeout: :infinity
         )
+        |> Stream.chunk_every(chunk_size)
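+        # Thread a running count through the stream to print progress per batch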
+        |> Stream.transform(0, fn objects, acc ->
+          new_acc = acc + Enum.count(objects)
+
+          IO.puts("Indexed #{new_acc} entries")
+
+          {[objects], new_acc}
+        end)
+        |> Stream.map(fn objects ->
+          Enum.map(objects, fn object ->
+            data = object.data
+
+            content_str =
+              case data["content"] do
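+                # content is normally an HTML string, but some remote objects
+                # deliver it as a list; join the tail into a single string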
+                [nil | rest] -> to_string(rest)
+                str -> str
+              end
+
+            {:ok, published, _} = DateTime.from_iso8601(data["published"])
+            {:ok, content} = FastSanitize.strip_tags(content_str)
+
+            %{
+              id: object.id,
+              content: content,
+              ap: data["id"],
+              published: published |> DateTime.to_unix()
+            }
+          end)
+        end)
+        |> Stream.each(fn objects ->
+          {:ok, result} =
+            Pleroma.HTTP.post(
+              "#{endpoint}/indexes/objects/documents",
+              Jason.encode!(objects)
+            )
+
+          if not Map.has_key?(Jason.decode!(result.body), "updateId") do
+            IO.puts("Failed to index: #{inspect(result)}")
+          end
+        end)
+        |> Stream.run()
+      end,
+      timeout: :infinity
+    )
+  end
+
+  def run(["clear"]) do
+    start_pleroma()
+
+    endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
 
-      IO.puts("Indexed #{Enum.count(objects)} entries")
-    end)
-    |> Stream.run()
+    {:ok, _} = Pleroma.HTTP.request(:delete, "#{endpoint}/indexes/objects", "", [], [])
   end
 end
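
Usage sketch (the subcommand name for the indexing clause is not visible in
these hunks and is assumed here; "clear" is the clause added by this patch):

    mix pleroma.search.meilisearch index   # (re)index public Notes into the "objects" index
    mix pleroma.search.meilisearch clear   # delete the "objects" index entirely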