Merge branch 'features/ingestion-page' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
# Module used to stream out activities. It is read from the
# `[:side_effects, :ap_streamer]` config key on every call and falls back to
# `ActivityPub` when the key is not set.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36
@impl true
def handle(object, meta \\ [])

# Side effects of an `Accept` activity:
# - sets the state of the referenced follow activity to "accept" through
#   `Utils.update_follow_state_for_all/2`
# - updates the following relationship with `:follow_accept`
# - updates the notification type for the followed user
#
# If any lookup or update does not match, the remaining steps are skipped.
# The activity and meta are returned unchanged in every case.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => actor,
          "object" => follow_activity_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _follower, followed} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
  end

  {:ok, object, meta}
end
66
# Side effects of a `Reject` activity:
# - sets the state of the referenced follow activity to "reject" through
#   `Utils.update_follow_state_for_all/2`
# - updates the following relationship with `:follow_reject`
# - dismisses the notification of the follow activity
#
# If any lookup or update does not match, the remaining steps are skipped.
# The activity and meta are returned unchanged in every case.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => actor,
          "object" => follow_activity_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = rejected_follow <-
         Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _updated_follow} <- Utils.update_follow_state_for_all(rejected_follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    Notification.dismiss(rejected_follow)
  end

  {:ok, object, meta}
end
93
# Side effects of a `Follow` activity:
# - calls `User.follow/3` with `:follow_pending` for the two users
# - on success, when the followed user is local and not locked, builds an
#   `Accept` and runs it through the pipeline
# - when `User.follow/3` returns an error tuple, builds a `Reject` and runs
#   it through the pipeline
# - creates notifications with `do_send: false` and stores them in `meta`
#
# Returns the follow activity re-read by its AP id, so the caller sees the
# result of `Activity.get_by_ap_id/1` rather than the value passed in.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)
  refreshed_follow = Activity.get_by_ap_id(follow_id)

  {:ok, refreshed_follow, meta_with_notifications}
end
137
# Side effects of a `Block` activity:
# - calls `User.block/2` when both the actor and the target resolve to users
#
# The activity and meta are returned unchanged whether or not the block ran.
@impl true
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
153
# Side effects of an `Update` activity:
# - updates the user and refreshes the cache
#
# When `meta` carries a `:user_update_changeset` it is applied as-is (per the
# original note, this is the local-user case, where the changeset may also
# include non-federating settings). Otherwise a changeset is built from the
# user data extracted out of the updated object.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  changeset =
    case Keyword.get(meta, :user_update_changeset) do
      absent when absent in [nil, false] ->
        {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

        updated_object["id"]
        |> User.get_by_ap_id()
        |> User.remote_user_changeset(new_user_data)

      provided ->
        provided
    end

  User.update_and_set_cache(changeset)

  {:ok, object, meta}
end
174
# Side effects of a `Like` activity:
# - adds the like to the liked object
# - creates notifications
@impl true
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
187
# Side effects of a `Create` activity:
# - creates the contained object via `handle_object_creation/2`
# - rolls the transaction back if the object could not be created or the
#   actor cannot be found
# - creates notifications with `do_send: false` and stores them in `meta`
# - increases the actor's note count if the object is public
# - increases the replies count of the parent object (skipped for `Answer`)
# - enqueues remote fetches for the ids listed in the object's "replies"
# - starts a rich media fetch for the activity
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id on its own line. `x = a && b` parses as `x = (a && b)`,
    # which would bind the boolean result of the type comparison and pass
    # `true` to `increase_replies_count/1` instead of the parent's AP id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta =
      meta
      |> add_notifications(notifications)

    {:ok, activity, meta}
  else
    # Any non-matching step aborts the surrounding transaction.
    e -> Repo.rollback(e)
  end
end
233
# Side effects of an `Announce` activity:
# - adds the announce to the announced object
# - unless the actor is an internal user: creates notifications and streams
#   the announce to its activity topics
@impl true
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced_object = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
255
# Side effects of an `Undo` activity: looks up the undone activity and
# delegates to `handle_undoing/1`. Anything other than `:ok` from that call
# is returned to the caller as-is.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_object)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
263
# Side effects of an `EmojiReact` activity:
# - adds the reaction to the reacted object
# - creates notifications
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
276
# Side effects of a `Delete` activity. The target is resolved first as an
# object and, failing that, as a user.
#
# For an object:
# - deletes the object
# - removes it from the actor's pinned objects
# - decreases the actor's note count if the object is public
# - decreases the replies count of the parent, if there is one
# - deletes chat message references for the object
# - streams out the activity and the participations
#
# For a user:
# - deletes the user
#
# Notifications are created only when the deletion returned `:ok`; any other
# outcome is returned wrapped in an `{:error, outcome}` tuple.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target =
    Object.normalize(deleted_object, fetch: false) ||
      User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, _activity} <- Object.delete(target),
             {_, actor} when is_binary(actor) <- {:actor, removed.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinned_object_id(user, removed.data["id"])

          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, removed)

          parent_id = removed.data["inReplyTo"]

          if parent_id do
            Object.decrease_replies_count(parent_id)
          end

          MessageReference.delete_for_object(removed)

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(removed, user)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was resolved before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        case User.delete(target) do
          {:ok, _} -> :ok
          failure -> failure
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
328
# Side effects of an `Add` activity:
# - adds the object to the actor's pinned objects
# - cancels the expiration job found for `meta[:activity_id]`, if any
#
# Returns `{:ok, object, meta}` on success, or one of:
# - `{:error, :user_not_found}` when the actor cannot be resolved
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries an
#   error on `:pinned_objects`
# - `{:error, errors}` with the changeset errors for any other failure
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Keep the error-tuple shape used by every other failure path rather
        # than returning the bare keyword list of errors.
        {:error, changeset.errors}
      end
  end
end
354
# Side effects of a `Remove` activity:
# - removes the object from the actor's pinned objects
# - deletes the matching `Add` activities
# - when `meta[:expires_at]` is set, enqueues a new expiration job for
#   `meta[:activity_id]`
#
# Returns `{:error, :user_not_found}` when the actor cannot be resolved; any
# other non-matching value is returned untouched.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities_query =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities_query)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    raw_expires_at = meta[:expires_at]

    if raw_expires_at do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    error -> error
  end
end
385
# Fallback clause: activities not matched by an earlier clause have no side
# effects and are passed through unchanged.
@impl true
def handle(object, meta), do: {:ok, object, meta}
391
# Creates a `ChatMessage` object through the common pipeline. For each local
# participant (actor and first recipient) it then:
# - bumps or creates the chat with the other participant
# - creates a message reference (the boolean argument is true when the
#   participant is not the actor)
# - stores `meta[:idempotency_key]` in the cache under the reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# The streamables are stored in `meta`. A non-matching pipeline result is
# returned as-is.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Deduplicate so a message to oneself is only processed once.
    participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
426
# Creates an `Answer` object through the common pipeline and then increases
# the vote count on the object it replies to, using the answer's name and
# actor. A non-matching pipeline result is returned as-is.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, updated_meta} ->
      %{"inReplyTo" => _} = %{"inReplyTo" => nil}
      answer_data = answer.data

      Object.increase_vote_count(
        answer_data["inReplyTo"],
        answer_data["name"],
        answer_data["actor"]
      )

      {:ok, answer, updated_meta}

    other ->
      other
  end
end
438
# Objects of these types need nothing beyond the common pipeline, whose
# result is returned directly.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ~w[Audio Video Question Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end
445
# Fallback clause: any other object is passed through without being created.
def handle_object_creation(object, meta), do: {:ok, object, meta}
450
# Undoes a like. When the liked object is missing, only the like activity is
# deleted; otherwise the like is first removed from the object. A failed
# removal is returned as-is and the activity is kept.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
458
# Undoes a `Like`: looks up the liked object and hands over to `undo_like/2`.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])

  undo_like(liked_object, object)
end
464
# Undoes an `EmojiReact`: removes the reaction from the reacted object and
# deletes the reaction activity. Returns `:ok`, or the first value that did
# not match.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end
472
# Undoes an `Announce`: removes the announce from the announced object and
# deletes the announce activity. Returns `:ok`, or the first value that did
# not match.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end
480
# Undoes a `Block`: unblocks the target for the actor and deletes the block
# activity. Returns `:ok`, or the first value that did not match.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end
491
# Fallback clause: anything else cannot be undone and yields an error tuple
# that carries the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
493
# Deletes the given record through the repo. Returns `:ok` on success and the
# repo's result unchanged otherwise.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
498
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged so the call
# can be piped.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
508
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged so the call can be
# piped.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
517
# Puts `streamables` in front of those already stored under `:streamables`
# in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, combined)
end
524
# Puts `notifications` in front of those already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, combined)
end
531
# Delivers what the handlers collected in `meta`: notifications first, then
# streamables. Returns `meta` unchanged.
@impl true
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)

  send_streamables(notified_meta)
end
538 end