Merge remote-tracking branch 'upstream/develop' into develop
[akkoma] / lib / pleroma / elasticsearch / store.ex
defmodule Pleroma.Elasticsearch do
  @moduledoc """
  Storage and search helpers for Elasticsearch, built on `Elastix`.

  Documents are written to three indices: `"activities"`, `"users"` and
  `"hashtags"`. The Elasticsearch base URL is read from the
  `[:elasticsearch, :url]` config key. The `maybe_*` functions only talk to
  Elasticsearch when `[:search, :provider]` is `Pleroma.Search.Elasticsearch`.
  """

  alias Pleroma.Activity
  alias Pleroma.Config
  alias Pleroma.Elasticsearch.DocumentMappings
  alias Pleroma.User

  require Logger

  @doc """
  Loads the activity with the given `id` (together with its object) and hands
  it to `maybe_put_into_elasticsearch/1`.
  """
  def put_by_id(:activity, id) do
    id
    |> Activity.get_by_id_with_object()
    |> maybe_put_into_elasticsearch()
  end

  @doc """
  Indexes `item` if it is a kind of record this module stores and the
  Elasticsearch search provider is enabled.

  Handled inputs:

    * `{:ok, item}` - unwrapped and processed as `item`
    * a `"Create"` activity whose object is a `"Note"` - indexed with its
      actor attached under `:user_actor`
    * a `Pleroma.User` whose `actor_type` is `"Person"`

  Returns the result of `put/1` when the item was indexed, `nil` when the item
  is eligible but the provider is disabled, and `{:ok, :skipped}` for any
  other input.
  """
  def maybe_put_into_elasticsearch({:ok, item}), do: maybe_put_into_elasticsearch(item)

  def maybe_put_into_elasticsearch(
        %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
      ) do
    if enabled?() do
      activity
      |> Map.put(:user_actor, Activity.user_actor(activity))
      |> put()
    end
  end

  def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
    if enabled?() do
      put(user)
    end
  end

  def maybe_put_into_elasticsearch(_), do: {:ok, :skipped}

  @doc """
  Indexes a single activity or user document, unconditionally.

  Raises `MatchError` when `Elastix.Document.index/5` does not return
  `{:ok, _}`.
  """
  def put(%Activity{} = activity) do
    {:ok, _} =
      Elastix.Document.index(
        url(),
        "activities",
        "activity",
        DocumentMappings.Activity.id(activity),
        DocumentMappings.Activity.encode(activity)
      )
  end

  def put(%User{} = user) do
    {:ok, _} =
      Elastix.Document.index(
        url(),
        "users",
        "user",
        DocumentMappings.User.id(user),
        DocumentMappings.User.encode(user)
      )
  end

  @doc """
  Bulk-indexes `data` into the index selected by the second argument.

    * `:activities` - only entries whose object is a `"Note"` are sent; raises
      `MatchError` unless the bulk response reports `"errors" => false`
    * `:users` - only entries with `actor_type: "Person"` are sent; returns the
      `Elastix.Bulk.post/3` result as-is
    * `:hashtags` - a non-empty list is sent and the `Elastix.Bulk.post/3`
      result is returned as-is; anything else (including `[]`) is a no-op
      returning `{:ok, nil}`
  """
  def bulk_post(data, :activities) do
    payload =
      data
      |> Enum.filter(&match?(%{object: %{data: %{"type" => "Note"}}}, &1))
      |> Enum.flat_map(fn activity ->
        [
          %{index: %{_id: DocumentMappings.Activity.id(activity)}},
          DocumentMappings.Activity.encode(activity)
        ]
      end)

    {:ok, %{body: %{"errors" => false}}} =
      Elastix.Bulk.post(url(), payload, index: "activities", type: "activity")
  end

  def bulk_post(data, :users) do
    payload =
      data
      |> Enum.filter(&(&1.actor_type == "Person"))
      |> Enum.flat_map(fn user ->
        [
          %{index: %{_id: DocumentMappings.User.id(user)}},
          DocumentMappings.User.encode(user)
        ]
      end)

    Elastix.Bulk.post(url(), payload, index: "users", type: "user")
  end

  # Must match a NON-empty list: matching `[]` here would send every real batch
  # to the no-op clause below and never index a single hashtag.
  def bulk_post([_ | _] = data, :hashtags) do
    payload =
      Enum.flat_map(data, fn hashtag ->
        [
          %{index: %{_id: DocumentMappings.Hashtag.id(hashtag)}},
          DocumentMappings.Hashtag.encode(hashtag)
        ]
      end)

    Elastix.Bulk.post(url(), payload, index: "hashtags", type: "hashtag")
  end

  def bulk_post(_, :hashtags), do: {:ok, nil}

  @doc """
  Calls `bulk_post/2` when the Elasticsearch search provider is enabled.

  Returns `nil` when it is disabled.
  """
  def maybe_bulk_post(data, type) do
    if enabled?() do
      bulk_post(data, type)
    end
  end

  @doc """
  Runs the raw query `q` against `index`/`type`.

  Returns `{:ok, hits}` where `hits` is the `"hits" -> "hits"` list of the
  response body (`[]` if absent), or `{:error, reason}` after logging the
  failure. A query of `:skip` short-circuits to `[]` without contacting
  Elasticsearch.
  """
  def search(_, _, _, :skip), do: []

  def search(:raw, index, type, q) do
    case Elastix.Search.search(url(), index, [type], q) do
      {:ok, raw_results} ->
        hits =
          raw_results
          |> Map.get(:body, %{})
          |> Map.get("hits", %{})
          |> Map.get("hits", [])

        {:ok, hits}

      {:error, reason} ->
        # `reason` is an arbitrary term; Logger only accepts chardata/reports,
        # so it has to go through inspect/1.
        Logger.error("Elasticsearch search failed: #{inspect(reason)}")
        {:error, reason}
    end
  end

  @doc """
  Searches one of the known indices with the query `q`.

    * `:activities` - returns the matching activities (with objects), newest
      first by `inserted_at`
    * `:users` - returns the matching users
    * `:hashtags` - returns the `"hashtag"` value of each hit's `"_source"`

  Returns `[]` when the query is skipped or the search fails; failures are
  logged by `search/4`.
  """
  def search(:activities, q) do
    with {:ok, results} <- search(:raw, "activities", "activity", q) do
      results
      |> Enum.map(& &1["_id"])
      |> Activity.all_by_ids_with_object()
      # `>=` on date/time structs compares them structurally, not
      # chronologically. NOTE(review): assumes `inserted_at` is a
      # NaiveDateTime-compatible value - confirm against the Activity schema.
      |> Enum.sort(&(NaiveDateTime.compare(&1.inserted_at, &2.inserted_at) != :lt))
    else
      # Either a skipped query (`[]`) or an error already logged by search/4.
      _ -> []
    end
  end

  def search(:users, q) do
    with {:ok, results} <- search(:raw, "users", "user", q) do
      results
      |> Enum.map(& &1["_id"])
      |> User.get_all_by_ids()
    else
      # Either a skipped query (`[]`) or an error already logged by search/4.
      _ -> []
    end
  end

  def search(:hashtags, q) do
    with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
      Enum.map(results, & &1["_source"]["hashtag"])
    else
      # Either a skipped query (`[]`) or an error already logged by search/4.
      _ -> []
    end
  end

  # Base URL of the Elasticsearch instance, read from config at call time.
  defp url, do: Config.get([:elasticsearch, :url])

  # True when Elasticsearch is the configured search provider.
  defp enabled?, do: Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
end