Support reindexing meilisearch >=0.24.0
[akkoma] / lib / mix / tasks / pleroma / search / meilisearch.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
6 require Logger
7 require Pleroma.Constants
8
9 import Mix.Pleroma
10 import Ecto.Query
11
12 import Pleroma.Search.Meilisearch, only: [meili_post!: 2, meili_delete!: 1, meili_get!: 1]
13
# `mix pleroma.search.meilisearch index [--reindex]`
#
# Pushes the ranking-rule and searchable-attribute settings for the `objects`
# index to Meilisearch, then streams the matching objects from the database
# and posts them to Meilisearch in chunks of 10_000 documents.
#
# With `--reindex`, every candidate document is first looked up in
# Meilisearch and only the ones reported as "document_not_found" are posted,
# so documents that are already indexed are not sent again.
def run(["index" | args]) do
  start_pleroma()

  reindex? = "--reindex" in args

  meili_post!(
    "/indexes/objects/settings/ranking-rules",
    [
      "desc(published)",
      "words",
      "exactness",
      "proximity",
      "wordsPosition",
      "typo",
      "attribute"
    ]
  )

  meili_post!(
    "/indexes/objects/settings/searchable-attributes",
    [
      "content"
    ]
  )

  # With >= 0.24.0 the name for "errorCode" is just "code".
  # The key is resolved once up front (and only when reindexing) instead of
  # issuing a `/version` request for every single document inside the loop.
  error_code_key =
    if reindex? do
      if meili_get!("/version")["pkgVersion"] |> Version.match?(">= 0.24.0"),
        do: "code",
        else: "errorCode"
    end

  chunk_size = 10_000

  # The stream is consumed inside a transaction, as Repo.stream requires for
  # SQL adapters; both timeouts are disabled because a full index run can be long.
  Pleroma.Repo.transaction(
    fn ->
      query =
        from(Pleroma.Object,
          # Only index public posts which are notes and have some text
          where:
            fragment("data->>'type' = 'Note'") and
              fragment("LENGTH(data->>'content') > 0") and
              fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()),
          order_by: [desc: fragment("data->'published'")]
        )

      count = query |> Pleroma.Repo.aggregate(:count, :data)
      IO.puts("Entries to index: #{count}")

      Pleroma.Repo.stream(
        query,
        timeout: :infinity
      )
      |> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1)
      |> Stream.reject(&is_nil/1)
      |> Stream.chunk_every(chunk_size)
      |> Stream.transform(0, fn objects, acc ->
        new_acc = acc + length(objects)

        # Reset to the beginning of the line and rewrite it
        IO.write("\r")
        IO.write("Indexed #{new_acc} entries")

        {[objects], new_acc}
      end)
      |> Stream.each(fn objects ->
        objects =
          if reindex? do
            # Filter out the already indexed documents. The comparison is true
            # when the document does not exist in Meilisearch yet.
            Enum.filter(objects, fn o ->
              result = meili_get!("/indexes/objects/documents/#{o.id}")
              result[error_code_key] == "document_not_found"
            end)
          else
            objects
          end

        # A chunk can end up empty when reindexing and every document in it
        # is already present; there is nothing to send in that case.
        if objects != [] do
          result =
            meili_post!(
              "/indexes/objects/documents",
              objects
            )

          if not Map.has_key?(result, "updateId") do
            IO.puts("Failed to index: #{inspect(result)}")
          end
        end
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )

  IO.write("\n")
end
109
# `mix pleroma.search.meilisearch clear`
#
# Issues a DELETE against the documents endpoint of the `objects` index.
def run(["clear"]) do
  start_pleroma()

  documents_path = "/indexes/objects/documents"
  meili_delete!(documents_path)
end
115
# `mix pleroma.search.meilisearch show-private-key <master_key>`
#
# Requests `/keys` from the configured Meilisearch URL, authenticating with
# the given master key, and prints the "private" entry of the decoded
# response. When that entry is missing, an error with the full decoded
# response is printed instead.
def run(["show-private-key", master_key]) do
  start_pleroma()

  keys_url =
    [Pleroma.Search.Meilisearch, :url]
    |> Pleroma.Config.get()
    |> Path.join("/keys")

  {:ok, response} = Pleroma.HTTP.get(keys_url, [{"X-Meili-API-Key", master_key}])

  payload = Jason.decode!(response.body)

  case payload["private"] do
    missing when missing in [nil, false] ->
      IO.puts("Error fetching the key, check the master key is correct: #{inspect(payload)}")

    private_key ->
      IO.puts(private_key)
  end
end
135
# `mix pleroma.search.meilisearch stats`
#
# Fetches the stats of the `objects` index and prints its
# "numberOfDocuments" and "isIndexing" fields.
def run(["stats"]) do
  start_pleroma()

  stats = meili_get!("/indexes/objects/stats")

  document_count = stats["numberOfDocuments"]
  indexing = stats["isIndexing"]

  IO.puts("Number of entries: #{document_count}")
  IO.puts("Indexing? #{indexing}")
end
143 end