Merge branch 'develop' into feature/elasticsearch
[akkoma] / lib / pleroma / elasticsearch / store.ex
1 defmodule Pleroma.Elasticsearch do
2 alias Pleroma.Activity
3 alias Pleroma.User
4 alias Pleroma.Object
5 alias Pleroma.Elasticsearch.DocumentMappings
6 alias Pleroma.Config
7 require Logger
8
9 defp url do
10 Config.get([:elasticsearch, :url])
11 end
12
13 defp enabled? do
14 Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
15 end
16
17 def put_by_id(:activity, id) do
18 id
19 |> Activity.get_by_id_with_object()
20 |> maybe_put_into_elasticsearch()
21 end
22
23 def maybe_put_into_elasticsearch({:ok, item}) do
24 maybe_put_into_elasticsearch(item)
25 end
26
27 def maybe_put_into_elasticsearch(
28 %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
29 ) do
30 if enabled?() do
31 actor = Pleroma.Activity.user_actor(activity)
32
33 activity
34 |> Map.put(:user_actor, actor)
35 |> put()
36 end
37 end
38
39 def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
40 if enabled?() do
41 put(user)
42 end
43 end
44
45 def maybe_put_into_elasticsearch(_) do
46 {:ok, :skipped}
47 end
48
49 def maybe_bulk_post(data, type) do
50 if enabled?() do
51 bulk_post(data, type)
52 end
53 end
54
55 def put(%Activity{} = activity) do
56 {:ok, _} =
57 Elastix.Document.index(
58 url(),
59 "activities",
60 "activity",
61 DocumentMappings.Activity.id(activity),
62 DocumentMappings.Activity.encode(activity)
63 )
64
65 activity
66 |> Map.get(:object)
67 |> Object.hashtags()
68 |> Enum.map(fn x -> %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())} end)
69 |> bulk_post(:hashtags)
70 end
71
72 def put(%User{} = user) do
73 {:ok, _} =
74 Elastix.Document.index(
75 url(),
76 "users",
77 "user",
78 DocumentMappings.User.id(user),
79 DocumentMappings.User.encode(user)
80 )
81 end
82
83 def bulk_post(data, :activities) do
84 d =
85 data
86 |> Enum.filter(fn x ->
87 t =
88 x.object
89 |> Map.get(:data, %{})
90 |> Map.get("type", "")
91
92 t == "Note"
93 end)
94 |> Enum.map(fn d ->
95 [
96 %{index: %{_id: DocumentMappings.Activity.id(d)}},
97 DocumentMappings.Activity.encode(d)
98 ]
99 end)
100 |> List.flatten()
101
102 {:ok, %{body: %{"errors" => false}}} =
103 Elastix.Bulk.post(
104 url(),
105 d,
106 index: "activities",
107 type: "activity"
108 )
109 end
110
111 def bulk_post(data, :users) do
112 d =
113 data
114 |> Enum.map(fn d ->
115 [
116 %{index: %{_id: DocumentMappings.User.id(d)}},
117 DocumentMappings.User.encode(d)
118 ]
119 end)
120 |> List.flatten()
121
122 Elastix.Bulk.post(
123 url(),
124 d,
125 index: "users",
126 type: "user"
127 )
128 end
129
130 def bulk_post(data, :hashtags) when is_list(data) do
131 d =
132 data
133 |> Enum.map(fn d ->
134 [
135 %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
136 DocumentMappings.Hashtag.encode(d)
137 ]
138 end)
139 |> List.flatten()
140
141 Elastix.Bulk.post(
142 url(),
143 d,
144 index: "hashtags",
145 type: "hashtag"
146 )
147 end
148
149 def bulk_post(_, :hashtags), do: {:ok, nil}
150
151 def search(_, _, _, :skip), do: []
152
153 def search(:raw, index, type, q) do
154 with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
155 results =
156 raw_results
157 |> Map.get(:body, %{})
158 |> Map.get("hits", %{})
159 |> Map.get("hits", [])
160
161 {:ok, results}
162 else
163 {:error, e} ->
164 Logger.error(e)
165 {:error, e}
166 end
167 end
168
169 def search(:activities, q) do
170 with {:ok, results} <- search(:raw, "activities", "activity", q) do
171 results
172 |> Enum.map(fn result -> result["_id"] end)
173 |> Pleroma.Activity.all_by_ids_with_object()
174 |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
175 else
176 e ->
177 Logger.error(e)
178 []
179 end
180 end
181
182 def search(:users, q) do
183 with {:ok, results} <- search(:raw, "users", "user", q) do
184 results
185 |> Enum.map(fn result -> result["_id"] end)
186 |> Pleroma.User.get_all_by_ids()
187 else
188 e ->
189 Logger.error(e)
190 []
191 end
192 end
193
194 def search(:hashtags, q) do
195 with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
196 results
197 |> Enum.map(fn result -> result["_source"]["hashtag"] end)
198 else
199 e ->
200 Logger.error(e)
201 []
202 end
203 end
204 end