Merge remote-tracking branch 'origin/patch/readd-mastofe' into develop
[akkoma] / lib / pleroma / elasticsearch / store.ex
1 defmodule Pleroma.Elasticsearch do
2 alias Pleroma.Activity
3 alias Pleroma.User
4 alias Pleroma.Object
5 alias Pleroma.Elasticsearch.DocumentMappings
6 alias Pleroma.Config
7 require Logger
8
9 defp url do
10 Config.get([:elasticsearch, :url])
11 end
12
13 defp enabled? do
14 Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
15 end
16
17 def delete_by_id(:activity, id) do
18 if enabled?() do
19 Elastix.Document.delete(url(), "activities", "activity", id)
20 end
21 end
22
23 def put_by_id(:activity, id) do
24 id
25 |> Activity.get_by_id_with_object()
26 |> maybe_put_into_elasticsearch()
27 end
28
29 def maybe_put_into_elasticsearch({:ok, item}) do
30 maybe_put_into_elasticsearch(item)
31 end
32
33 def maybe_put_into_elasticsearch(
34 %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
35 ) do
36 if enabled?() do
37 actor = Pleroma.Activity.user_actor(activity)
38
39 activity
40 |> Map.put(:user_actor, actor)
41 |> put()
42 end
43 end
44
45 def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
46 if enabled?() do
47 put(user)
48 end
49 end
50
51 def maybe_put_into_elasticsearch(_) do
52 {:ok, :skipped}
53 end
54
55 def maybe_bulk_post(data, type) do
56 if enabled?() do
57 bulk_post(data, type)
58 end
59 end
60
61 def put(%Activity{} = activity) do
62 with {:ok, _} <-
63 Elastix.Document.index(
64 url(),
65 "activities",
66 "activity",
67 DocumentMappings.Activity.id(activity),
68 DocumentMappings.Activity.encode(activity)
69 ) do
70 activity
71 |> Map.get(:object)
72 |> Object.hashtags()
73 |> Enum.map(fn x ->
74 %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
75 end)
76 |> bulk_post(:hashtags)
77 else
78 {:error, %{reason: err}} ->
79 Logger.error("Could not put activity: #{err}")
80 :skipped
81 end
82 end
83
84 def put(%User{} = user) do
85 with {:ok, _} <-
86 Elastix.Document.index(
87 url(),
88 "users",
89 "user",
90 DocumentMappings.User.id(user),
91 DocumentMappings.User.encode(user)
92 ) do
93 :ok
94 else
95 {:error, %{reason: err}} ->
96 Logger.error("Could not put user: #{err}")
97 :skipped
98 end
99 end
100
101 def bulk_post(data, :activities) do
102 d =
103 data
104 |> Enum.filter(fn x ->
105 t =
106 x.object
107 |> Map.get(:data, %{})
108 |> Map.get("type", "")
109
110 t == "Note"
111 end)
112 |> Enum.map(fn d ->
113 [
114 %{index: %{_id: DocumentMappings.Activity.id(d)}},
115 DocumentMappings.Activity.encode(d)
116 ]
117 end)
118 |> List.flatten()
119
120 with {:ok, %{body: %{"errors" => false}}} <-
121 Elastix.Bulk.post(
122 url(),
123 d,
124 index: "activities",
125 type: "activity"
126 ) do
127 :ok
128 else
129 {:error, %{reason: err}} ->
130 Logger.error("Could not bulk put activity: #{err}")
131 :skipped
132
133 {:ok, %{body: body}} ->
134 IO.inspect(body)
135 :skipped
136 end
137 end
138
139 def bulk_post(data, :users) do
140 d =
141 data
142 |> Enum.filter(fn x -> x.actor_type == "Person" end)
143 |> Enum.map(fn d ->
144 [
145 %{index: %{_id: DocumentMappings.User.id(d)}},
146 DocumentMappings.User.encode(d)
147 ]
148 end)
149 |> List.flatten()
150
151 with {:ok, %{body: %{"errors" => false}}} <-
152 Elastix.Bulk.post(
153 url(),
154 d,
155 index: "users",
156 type: "user"
157 ) do
158 :ok
159 else
160 {:error, %{reason: err}} ->
161 Logger.error("Could not bulk put users: #{err}")
162 :skipped
163
164 {:ok, %{body: body}} ->
165 IO.inspect(body)
166 :skipped
167 end
168 end
169
170 def bulk_post(data, :hashtags) when is_list(data) do
171 d =
172 data
173 |> Enum.map(fn d ->
174 [
175 %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
176 DocumentMappings.Hashtag.encode(d)
177 ]
178 end)
179 |> List.flatten()
180
181 with {:ok, %{body: %{"errors" => false}}} <-
182 Elastix.Bulk.post(
183 url(),
184 d,
185 index: "hashtags",
186 type: "hashtag"
187 ) do
188 :ok
189 else
190 {:error, %{reason: err}} ->
191 Logger.error("Could not bulk put hashtags: #{err}")
192 :skipped
193
194 {:ok, %{body: body}} ->
195 IO.inspect(body)
196 :skipped
197 end
198 end
199
200 def bulk_post(_, :hashtags), do: {:ok, nil}
201
202 def search(_, _, _, :skip), do: []
203
204 def search(:raw, index, type, q) do
205 with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
206 results =
207 raw_results
208 |> Map.get(:body, %{})
209 |> Map.get("hits", %{})
210 |> Map.get("hits", [])
211
212 {:ok, results}
213 else
214 {:error, e} ->
215 Logger.error(e)
216 {:error, e}
217 end
218 end
219
220 def search(:activities, q) do
221 with {:ok, results} <- search(:raw, "activities", "activity", q) do
222 results
223 |> Enum.map(fn result -> result["_id"] end)
224 |> Pleroma.Activity.all_by_ids_with_object()
225 |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
226 else
227 e ->
228 Logger.error(e)
229 []
230 end
231 end
232
233 def search(:users, q) do
234 with {:ok, results} <- search(:raw, "users", "user", q) do
235 results
236 |> Enum.map(fn result -> result["_id"] end)
237 |> Pleroma.User.get_all_by_ids()
238 else
239 e ->
240 Logger.error(e)
241 []
242 end
243 end
244
245 def search(:hashtags, q) do
246 with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
247 results
248 |> Enum.map(fn result -> result["_source"]["hashtag"] end)
249 else
250 e ->
251 Logger.error(e)
252 []
253 end
254 end
255 end