01561a0df2606c70c55c4044fc26003ad5e4aa2e
[akkoma] / lib / pleroma / elasticsearch / store.ex
1 defmodule Pleroma.Elasticsearch do
2 alias Pleroma.Activity
3 alias Pleroma.User
4 alias Pleroma.Object
5 alias Pleroma.Elasticsearch.DocumentMappings
6 alias Pleroma.Config
7 require Logger
8
9 defp url do
10 Config.get([:elasticsearch, :url])
11 end
12
13 defp enabled? do
14 Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
15 end
16
17 def delete_by_id(:activity, id) do
18 if enabled?() do
19 Elastix.Document.delete(url(), "activities", "activity", id)
20 end
21 end
22
23 def put_by_id(:activity, id) do
24 id
25 |> Activity.get_by_id_with_object()
26 |> maybe_put_into_elasticsearch()
27 end
28
29 def maybe_put_into_elasticsearch({:ok, item}) do
30 maybe_put_into_elasticsearch(item)
31 end
32
33 def maybe_put_into_elasticsearch(
34 %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
35 ) do
36 if enabled?() do
37 actor = Pleroma.Activity.user_actor(activity)
38
39 activity
40 |> Map.put(:user_actor, actor)
41 |> put()
42 end
43 end
44
45 def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
46 if enabled?() do
47 put(user)
48 end
49 end
50
51 def maybe_put_into_elasticsearch(_) do
52 {:ok, :skipped}
53 end
54
55 def maybe_bulk_post(data, type) do
56 if enabled?() do
57 bulk_post(data, type)
58 end
59 end
60
61 def put(%Activity{} = activity) do
62 with {:ok, _} <-
63 Elastix.Document.index(
64 url(),
65 "activities",
66 "activity",
67 DocumentMappings.Activity.id(activity),
68 DocumentMappings.Activity.encode(activity)
69 ) do
70 activity
71 |> Map.get(:object)
72 |> Object.hashtags()
73 |> Enum.map(fn x ->
74 %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
75 end)
76 |> bulk_post(:hashtags)
77 else
78 {:error, %{reason: err}} ->
79 Logger.error("Could not put activity: #{err}")
80 :skipped
81 end
82 end
83
84 def put(%User{} = user) do
85 with {:ok, _} <-
86 Elastix.Document.index(
87 url(),
88 "users",
89 "user",
90 DocumentMappings.User.id(user),
91 DocumentMappings.User.encode(user)
92 ) do
93 :ok
94 else
95 {:error, %{reason: err}} ->
96 Logger.error("Could not put user: #{err}")
97 :skipped
98 end
99 end
100
101 def bulk_post(data, :activities) do
102 d =
103 data
104 |> Enum.filter(fn x ->
105 t =
106 x.object
107 |> Map.get(:data, %{})
108 |> Map.get("type", "")
109
110 t == "Note"
111 end)
112 |> Enum.map(fn d ->
113 [
114 %{index: %{_id: DocumentMappings.Activity.id(d)}},
115 DocumentMappings.Activity.encode(d)
116 ]
117 end)
118 |> List.flatten()
119
120 with {:ok, %{body: %{"errors" => false}}} <-
121 Elastix.Bulk.post(
122 url(),
123 d,
124 index: "activities",
125 type: "activity"
126 ) do
127 :ok
128 else
129 {:error, %{reason: err}} ->
130 Logger.error("Could not bulk put activity: #{err}")
131 :skipped
132 {:ok, %{body: body}} ->
133 IO.inspect(body)
134 :skipped
135 end
136 end
137
138 def bulk_post(data, :users) do
139 d =
140 data
141 |> Enum.filter(fn x -> x.actor_type == "Person" end)
142 |> Enum.map(fn d ->
143 [
144 %{index: %{_id: DocumentMappings.User.id(d)}},
145 DocumentMappings.User.encode(d)
146 ]
147 end)
148 |> List.flatten()
149
150 with {:ok, %{body: %{"errors" => false}}} <-
151 Elastix.Bulk.post(
152 url(),
153 d,
154 index: "users",
155 type: "user"
156 ) do
157 :ok
158 else
159 {:error, %{reason: err}} ->
160 Logger.error("Could not bulk put users: #{err}")
161 :skipped
162 {:ok, %{body: body}} ->
163 IO.inspect(body)
164 :skipped
165 end
166 end
167
168 def bulk_post(data, :hashtags) when is_list(data) do
169 d =
170 data
171 |> Enum.map(fn d ->
172 [
173 %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
174 DocumentMappings.Hashtag.encode(d)
175 ]
176 end)
177 |> List.flatten()
178
179 with {:ok, %{body: %{"errors" => false}}} <-
180 Elastix.Bulk.post(
181 url(),
182 d,
183 index: "hashtags",
184 type: "hashtag"
185 ) do
186 :ok
187 else
188 {:error, %{reason: err}} ->
189 Logger.error("Could not bulk put hashtags: #{err}")
190 :skipped
191 {:ok, %{body: body}} ->
192 IO.inspect(body)
193 :skipped
194 end
195 end
196
197 def bulk_post(_, :hashtags), do: {:ok, nil}
198
199 def search(_, _, _, :skip), do: []
200
201 def search(:raw, index, type, q) do
202 with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
203 results =
204 raw_results
205 |> Map.get(:body, %{})
206 |> Map.get("hits", %{})
207 |> Map.get("hits", [])
208
209 {:ok, results}
210 else
211 {:error, e} ->
212 Logger.error(e)
213 {:error, e}
214 end
215 end
216
217 def search(:activities, q) do
218 with {:ok, results} <- search(:raw, "activities", "activity", q) do
219 results
220 |> Enum.map(fn result -> result["_id"] end)
221 |> Pleroma.Activity.all_by_ids_with_object()
222 |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
223 else
224 e ->
225 Logger.error(e)
226 []
227 end
228 end
229
230 def search(:users, q) do
231 with {:ok, results} <- search(:raw, "users", "user", q) do
232 results
233 |> Enum.map(fn result -> result["_id"] end)
234 |> Pleroma.User.get_all_by_ids()
235 else
236 e ->
237 Logger.error(e)
238 []
239 end
240 end
241
242 def search(:hashtags, q) do
243 with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
244 results
245 |> Enum.map(fn result -> result["_source"]["hashtag"] end)
246 else
247 e ->
248 Logger.error(e)
249 []
250 end
251 end
252 end