Merge branch 'delete_orphaned_activities' into develop
[akkoma] / lib / pleroma / activity.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Activity do
6 use Ecto.Schema
7
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Queries
10 alias Pleroma.Bookmark
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.ReportNote
15 alias Pleroma.ThreadMute
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18
19 import Ecto.Changeset
20 import Ecto.Query
21
22 @type t :: %__MODULE__{}
23 @type actor :: String.t()
24
25 @primary_key {:id, FlakeId.Ecto.CompatType, autogenerate: true}
26
27 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
28
schema "activities" do
  field(:data, :map)
  field(:local, :boolean, default: true)
  field(:actor, :string)
  field(:recipients, {:array, :string}, default: [])
  # Virtual: only populated by with_set_thread_muted_field/2.
  field(:thread_muted?, :boolean, virtual: true)

  # Virtual field that can hold an id joined in from elsewhere when you need
  # to order / paginate by something other than the activity's own id.
  field(:pagination_id, :string, virtual: true)

  # This is a fake relation,
  # do not use outside of with_preloaded_user_actor/with_joined_user_actor
  has_one(:user_actor, User, on_delete: :nothing, foreign_key: :id)
  # This is a fake relation, do not use outside of with_preloaded_bookmark/get_bookmark
  has_one(:bookmark, Bookmark)
  # This is a fake relation, do not use outside of with_preloaded_report_notes
  has_many(:report_notes, ReportNote)
  has_many(:notifications, Notification, on_delete: :delete_all)

  # Attention: this is a fake relation, don't try to preload it blindly and expect it to work!
  # The foreign key is embedded in a jsonb field (the activity's `data`).
  #
  # To use it, you probably want to do an inner join and a preload:
  #
  # ```
  # |> join(:inner, [activity], o in Object,
  #      on: fragment("(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
  #        o.data, activity.data, activity.data))
  # |> preload([activity, object], [object: object])
  # ```
  #
  # As a convenience, Activity.with_preloaded_object() sets up an inner join and preload for the
  # typical case.
  has_one(:object, Object, on_delete: :nothing, foreign_key: :id)

  timestamps()
end
67
@doc """
Joins the `Object` an activity refers to and exposes it as the named binding `:object`.

The object id is taken from the activity's `data`: `data->'object'->>'id'` when
present, otherwise `data->>'object'`. `join_type` defaults to `:inner`.
"""
def with_joined_object(query, join_type \\ :inner) do
  join(
    query,
    join_type,
    [a],
    obj in Object,
    as: :object,
    on:
      fragment(
        "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
        obj.data,
        a.data,
        a.data
      )
  )
end
80
@doc """
Preloads the `:object` association from the `:object` named binding.

The join is added through `with_joined_object/2` only when the query does not
already carry an `:object` binding.
"""
def with_preloaded_object(query, join_type \\ :inner) do
  joined =
    if has_named_binding?(query, :object) do
      query
    else
      with_joined_object(query, join_type)
    end

  preload(joined, [object: obj], object: obj)
end
87
@doc """
Returns the `User` behind an activity's actor.

A `%User{}` already present in the `user_actor` field is returned as is;
otherwise the user is looked up with `User.get_cached_by_ap_id/1`.
"""
# Note: the nil-actor clause applies to fake activities
# (ActivityPub.Utils.get_notified_from_object/1 etc.)
def user_actor(%Activity{actor: nil}), do: nil

def user_actor(%Activity{user_actor: %User{} = preloaded}), do: preloaded

def user_actor(%Activity{actor: ap_id}), do: User.get_cached_by_ap_id(ap_id)
98
@doc """
Joins the `User` whose `ap_id` equals the activity's `actor`, as the named
binding `:user_actor`. `join_type` defaults to `:inner`.
"""
def with_joined_user_actor(query, join_type \\ :inner) do
  join(query, join_type, [a], user in User, as: :user_actor, on: user.ap_id == a.actor)
end
105
@doc """
Preloads the `:user_actor` association from the `:user_actor` named binding.

Mirrors `with_preloaded_object/2`: the join is only added when the query does
not already carry a `:user_actor` binding, so calling this on a query that was
previously passed through `with_joined_user_actor/2` no longer attempts to add
a second join under the same name. When the binding already exists,
`join_type` is ignored and the existing join is reused.
"""
def with_preloaded_user_actor(query, join_type \\ :inner) do
  joined =
    if has_named_binding?(query, :user_actor) do
      query
    else
      with_joined_user_actor(query, join_type)
    end

  preload(joined, [user_actor: user_actor], user_actor: user_actor)
end
111
@doc """
Left-joins the given user's `Bookmark` for each activity (named binding
`:bookmark`) and preloads it into the `bookmark` field.

When the second argument is not a `%User{}` the query is returned untouched.
"""
def with_preloaded_bookmark(query, %User{id: user_id}) do
  query
  |> join(:left, [a], b in Bookmark,
    on: b.user_id == ^user_id and b.activity_id == a.id,
    as: :bookmark
  )
  |> preload([bookmark: b], bookmark: b)
end

def with_preloaded_bookmark(query, _not_a_user), do: query
122
@doc """
Left-joins `ReportNote` rows on `activity_id` (named binding `:report_note`)
and preloads them into `report_notes`.

The two-argument form ignores its second argument and returns the query as is.
"""
def with_preloaded_report_notes(query) do
  query
  |> join(:left, [a], note in ReportNote, on: a.id == note.activity_id, as: :report_note)
  |> preload([report_note: note], report_notes: note)
end

def with_preloaded_report_notes(query, _ignored), do: query
133
@doc """
Selects each activity with `thread_muted?` set to whether the given user has a
`ThreadMute` for the activity's `data->>'context'`.

When the second argument is not a `%User{}` the query is returned untouched.
"""
def with_set_thread_muted_field(query, %User{id: user_id}) do
  query
  |> join(:left, [a], mute in ThreadMute,
    on: mute.user_id == ^user_id and mute.context == fragment("?->>'context'", a.data),
    as: :thread_mute
  )
  |> select([a, thread_mute: mute], %Activity{a | thread_muted?: not is_nil(mute.id)})
end

def with_set_thread_muted_field(query, _not_a_user), do: query
144
@doc """
Runs the query built by `Queries.by_ap_id/1` for `ap_id` through `Repo.one/1`.
"""
def get_by_ap_id(ap_id) do
  by_ap_id_query = Queries.by_ap_id(ap_id)
  Repo.one(by_ap_id_query)
end
150
@doc """
Returns the user's bookmark for an activity.

Uses the preloaded `bookmark` association when it is loaded, otherwise calls
`Bookmark.get/2`. Returns `nil` unless given an `%Activity{}` and a `%User{}`.
"""
def get_bookmark(%Activity{id: activity_id, bookmark: bookmark}, %User{id: user_id}) do
  case Ecto.assoc_loaded?(bookmark) do
    true -> bookmark
    false -> Bookmark.get(user_id, activity_id)
  end
end

def get_bookmark(_activity, _user), do: nil
160
@doc """
Fetches the "Flag" activity with the given id via
`ActivityPub.fetch_activities_query/2`, requesting report notes to be preloaded.
"""
def get_report(activity_id) do
  report_opts = %{type: "Flag", skip_preload: true, preload_report_notes: true}

  []
  |> ActivityPub.fetch_activities_query(report_opts)
  |> where(id: ^activity_id)
  |> Repo.one()
end
172
@doc """
Builds a changeset that casts `:data` and `:recipients`, requires `:data`, and
registers the `activities_unique_apid_index` unique constraint under `:ap_id`.
"""
def change(struct, params \\ %{}) do
  casted = cast(struct, params, [:data, :recipients])
  validated = validate_required(casted, [:data])

  unique_constraint(validated, :ap_id, name: :activities_unique_apid_index)
end
179
@doc """
Like `get_by_ap_id/1`, but also preloads the object using a left join.
"""
def get_by_ap_id_with_object(ap_id) do
  with_object =
    ap_id
    |> Queries.by_ap_id()
    |> with_preloaded_object(:left)

  Repo.one(with_object)
end
186
@doc """
Gets an activity by ID.

By default (`filter: [:restrict_deactivated]`) activities from deactivated
actors are not loaded; pass different `opts` to change that.
"""
@spec get_by_id(String.t(), keyword()) :: t() | nil
def get_by_id(id, opts \\ [filter: [:restrict_deactivated]]) do
  get_by_id_with_opts(id, opts)
end
192
@doc """
Gets an activity by ID with its `:user_actor` preloaded.
"""
@spec get_by_id_with_user_actor(String.t()) :: t() | nil
def get_by_id_with_user_actor(id) do
  get_by_id_with_opts(id, preload: [:user_actor])
end
195
@doc """
Gets an activity by ID with its `:object` preloaded.
"""
@spec get_by_id_with_object(String.t()) :: t() | nil
def get_by_id_with_object(id) do
  get_by_id_with_opts(id, preload: [:object])
end
198
# Looks an activity up by id, applying the optional `:filter` and `:preload`
# lists from `opts`. Returns nil without querying when `id` is not a flake id.
defp get_by_id_with_opts(id, opts) do
  if FlakeId.flake_id?(id) do
    id
    |> Queries.by_id()
    |> apply_id_filters(opts[:filter])
    |> apply_id_preloads(opts[:preload])
    |> Repo.one()
  end
end

# Applies each recognised filter in order; unknown entries are ignored.
# A non-list `filters` value leaves the query unchanged.
defp apply_id_filters(query, filters) when is_list(filters) do
  Enum.reduce(filters, query, fn
    {:type, type}, q -> Queries.by_type(q, type)
    :restrict_deactivated, q -> restrict_deactivated_users(q)
    _unknown, q -> q
  end)
end

defp apply_id_filters(query, _filters), do: query

# Applies each recognised preload in order; unknown entries are ignored.
# A non-list `preloads` value leaves the query unchanged.
defp apply_id_preloads(query, preloads) when is_list(preloads) do
  Enum.reduce(preloads, query, fn
    :user_actor, q -> with_preloaded_user_actor(q)
    :object, q -> with_preloaded_object(q)
    _unknown, q -> q
  end)
end

defp apply_id_preloads(query, _preloads), do: query
228
@doc """
Returns all activities whose id is in `ids`, with their objects preloaded
(default join type of `with_preloaded_object/2`).
"""
def all_by_ids_with_object(ids) do
  by_ids = from(a in Activity, where: a.id in ^ids)

  by_ids
  |> with_preloaded_object()
  |> Repo.all()
end
235
@doc """
Builds a query for "Create" activities by object `ap_id`.

Accepts an `ap_id` or a list of `ap_id`s. Returns a query, not results.
"""
@spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
def create_by_object_ap_id(ap_id) do
  by_object = Queries.by_object_id(ap_id)
  Queries.by_type(by_object, "Create")
end
246
@doc """
Returns every activity matched by `create_by_object_ap_id/1` for `ap_id`.
"""
def get_all_create_by_object_ap_id(ap_id) do
  Repo.all(create_by_object_ap_id(ap_id))
end
252
@doc """
Fetches the "Create" activity for the object `ap_id`, restricted to active
actors via `restrict_deactivated_users/1`. Returns `nil` for non-binary input.
"""
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
  ap_id
  |> create_by_object_ap_id()
  |> restrict_deactivated_users()
  |> Repo.one()
end

def get_create_by_object_ap_id(_not_binary), do: nil
260
@doc """
Same query as `create_by_object_ap_id/1`, with the object joined and preloaded.

Accepts an `ap_id` or a list of `ap_id`s. Returns a query, not results.
"""
@spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
def create_by_object_ap_id_with_object(ap_id) do
  create_query = create_by_object_ap_id(ap_id)
  with_preloaded_object(create_query)
end
271
@doc """
Fetches the "Create" activity for the object `ap_id` with its object
preloaded. Returns `nil` for non-binary input.
"""
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
  Repo.one(create_by_object_ap_id_with_object(ap_id))
end

def get_create_by_object_ap_id_with_object(_not_binary), do: nil
279
@doc """
Gets a "Create" activity by ID with its `:object` preloaded.
"""
@spec create_by_id_with_object(String.t()) :: t() | nil
def create_by_id_with_object(id) do
  lookup_opts = [preload: [:object], filter: [type: "Create"]]
  get_by_id_with_opts(id, lookup_opts)
end
284
# Resolves the Create activity for an object's "inReplyTo" target.
# Anything that is not an %Object{} carrying "inReplyTo" yields nil.
defp get_in_reply_to_activity_from_object(object) do
  case object do
    %Object{data: %{"inReplyTo" => parent_ap_id}} ->
      get_create_by_object_ap_id_with_object(parent_ap_id)

    _ ->
      nil
  end
end
290
@doc """
Returns the Create activity this activity's object replies to, or `nil`.

The object is obtained with `Object.normalize/2` using `fetch: false`.
"""
def get_in_reply_to_activity(%Activity{} = activity) do
  activity
  |> Object.normalize(fetch: false)
  |> get_in_reply_to_activity_from_object()
end
294
@doc """
Returns the Create activity for an object's "quoteUri" target, or `nil` when
the argument is not an `%Object{}` carrying "quoteUri".
"""
def get_quoted_activity_from_object(object) do
  case object do
    %Object{data: %{"quoteUri" => quoted_ap_id}} ->
      get_create_by_object_ap_id_with_object(quoted_ap_id)

    _ ->
      nil
  end
end
300
@doc """
Loads the activity (with object) for an `%Activity{}`, a map with an "id"
key, or a binary `ap_id`. Any other input returns `nil`.
"""
def normalize(activity_or_ap_id) do
  case activity_or_ap_id do
    %Activity{data: %{"id" => ap_id}} -> get_by_ap_id_with_object(ap_id)
    %{"id" => ap_id} -> get_by_ap_id_with_object(ap_id)
    ap_id when is_binary(ap_id) -> get_by_ap_id_with_object(ap_id)
    _ -> nil
  end
end
305
@doc """
Deletes every non-"Delete" activity that references the object `id`.

Among the deleted rows, the "Create" activity whose object is `id` (given as
a plain string or as an embedded map with an "id") is passed to the web
response cache purge and returned. Returns `nil` if there is none, or if `id`
is not a binary.
"""
def delete_all_by_object_ap_id(id) when is_binary(id) do
  {_count, deleted} =
    id
    |> Queries.by_object_id()
    |> Queries.exclude_type("Delete")
    |> select([a], a)
    |> Repo.delete_all(timeout: :infinity)

  create_activity =
    Enum.find(deleted, fn
      %{data: %{"type" => "Create", "object" => ^id}} -> true
      %{data: %{"type" => "Create", "object" => %{"id" => ^id}}} -> true
      _other -> false
    end)

  purge_web_resp_cache(create_activity)
end

def delete_all_by_object_ap_id(_not_binary), do: nil
322
# Deletes the :web_resp_cache entry keyed by the path component of the
# activity's "id" URI, then returns the activity. Any argument that is not an
# %Activity{} with a binary "id" is returned untouched.
defp purge_web_resp_cache(%Activity{data: %{"id" => id}} = activity) when is_binary(id) do
  case URI.parse(id) do
    %{path: path} -> @cachex.del(:web_resp_cache, path)
    _ -> :ok
  end

  activity
end

defp purge_web_resp_cache(other), do: other
332
@doc """
Tells whether the actor of a "Follow" activity is following its target,
according to `Pleroma.FollowingRelationship.following?/2`.

Returns `false` when either user cannot be resolved or the argument is not a
"Follow" activity.
"""
def follow_accepted?(
      %Activity{data: %{"type" => "Follow", "object" => followed_ap_id}} = activity
    ) do
  case Activity.user_actor(activity) do
    %User{} = follower ->
      case User.get_cached_by_ap_id(followed_ap_id) do
        %User{} = followed -> Pleroma.FollowingRelationship.following?(follower, followed)
        _ -> false
      end

    _ ->
      false
  end
end

def follow_accepted?(_other), do: false
345
@doc """
Returns the activities by `actor` whose id is in `status_ids`.

An empty id list short-circuits to `[]` without querying.
"""
def all_by_actor_and_id(actor, status_ids \\ [])
def all_by_actor_and_id(_actor, []), do: []

def all_by_actor_and_id(actor, status_ids) do
  owned_statuses =
    from(a in Activity,
      where: a.id in ^status_ids,
      where: a.actor == ^actor
    )

  Repo.all(owned_statuses)
end
355
@doc """
Builds a query for pending "Follow" activities whose object is the user's
`ap_id`. Returns the query without executing it.
"""
def follow_requests_for_actor(%User{ap_id: ap_id}) do
  follows_of_user =
    ap_id
    |> Queries.by_object_id()
    |> Queries.by_type("Follow")

  where(follows_of_user, [a], fragment("? ->> 'state' = 'pending'", a.data))
end
362
@doc """
Returns the pending "Follow" activities whose actor is the user's `ap_id`.
"""
def following_requests_for_actor(%User{ap_id: ap_id}) do
  pending_by_actor =
    from(a in Queries.by_type("Follow"),
      where: fragment("?->>'state' = 'pending'", a.data),
      where: a.actor == ^ap_id
    )

  Repo.all(pending_by_actor)
end
369
@doc """
Fetches the "Follow" activity from the first user to the second whose state
is "pending" or "accept".
"""
def follow_activity(%User{ap_id: ap_id}, %User{ap_id: followed_ap_id}) do
  live_follow =
    from(a in Queries.by_type("Follow"),
      where: a.actor == ^ap_id,
      where: fragment("?->>'object' = ?", a.data, ^followed_ap_id),
      where: fragment("?->>'state'", a.data) in ["pending", "accept"]
    )

  Repo.one(live_follow)
end
377
@doc """
Restricts a query to activities whose actor has an active row in `users`.

Implemented as an inner lateral join on a subselect over `users` filtered by
`ap_id = activity.actor AND is_active = TRUE`.
"""
def restrict_deactivated_users(query) do
  join(
    query,
    :inner_lateral,
    [a],
    active_user in fragment(
      "SELECT is_active from users WHERE ap_id = ? AND is_active = TRUE",
      a.actor
    )
  )
end
389
@doc """
Forwards to `Pleroma.Search.DatabaseSearch.search/3`; `options` defaults to `[]`.
"""
def search(user, query, options \\ []) do
  Pleroma.Search.DatabaseSearch.search(user, query, options)
end
391
@doc """
Returns the id of `for_user`'s participation in the conversation identified
by the activity's "context", or `nil` when the context is missing or not a
string, or no conversation or participation is found.
"""
def direct_conversation_id(%{data: %{"context" => context}}, for_user)
    when is_binary(context) do
  with %Pleroma.Conversation{} = conversation <- Pleroma.Conversation.get_for_ap_id(context),
       %Pleroma.Conversation.Participation{id: participation_id} <-
         Pleroma.Conversation.Participation.for_user_and_conversation(for_user, conversation) do
    participation_id
  else
    _ -> nil
  end
end

def direct_conversation_id(_activity, _for_user), do: nil
404
@doc """
Returns the first activity (per `Ecto.Query.first/1`) referencing the object
`ap_id`, with the object preloaded. Returns `nil` for non-binary input.
"""
@spec get_by_object_ap_id_with_object(String.t()) :: t() | nil
def get_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
  first_match =
    ap_id
    |> Queries.by_object_id()
    |> with_preloaded_object()
    |> first()

  Repo.one(first_match)
end

def get_by_object_ap_id_with_object(_not_binary), do: nil
415
@doc """
Builds a query for "Add" activities with the given object id and actor whose
`data->>'target'` equals `target`. Returns the query without executing it.
"""
@spec add_by_params_query(String.t(), String.t(), String.t()) :: Ecto.Query.t()
def add_by_params_query(object_id, actor, target) do
  adds_by_actor =
    object_id
    |> Queries.by_object_id()
    |> Queries.by_type("Add")
    |> Queries.by_actor(actor)

  where(adds_by_actor, [a], fragment("?->>'target' = ?", a.data, ^target))
end
424 end