e2371b6939d4240e5f305d196afb25f9d657a929
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
35 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
36
37 @impl true
38 def handle(object, meta \\ [])
39
40 # Task this handles
41 # - Follows
42 # - Sends a notification
43 @impl true
44 def handle(
45 %{
46 data: %{
47 "actor" => actor,
48 "type" => "Accept",
49 "object" => follow_activity_id
50 }
51 } = object,
52 meta
53 ) do
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
62 end
63
64 {:ok, object, meta}
65 end
66
67 # Task this handles
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
71 @impl true
72 def handle(
73 %{
74 data: %{
75 "actor" => actor,
76 "type" => "Reject",
77 "object" => follow_activity_id
78 }
79 } = object,
80 meta
81 ) do
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
89 end
90
91 {:ok, object, meta}
92 end
93
94 # Tasks this handle
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
98 @impl true
99 def handle(
100 %{
101 data: %{
102 "id" => follow_id,
103 "type" => "Follow",
104 "object" => followed_user,
105 "actor" => following_user
106 }
107 } = object,
108 meta
109 ) do
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
117 end
118 else
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
122
123 _ ->
124 nil
125 end
126
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
128
129 meta =
130 meta
131 |> add_notifications(notifications)
132
133 updated_object = Activity.get_by_ap_id(follow_id)
134
135 {:ok, updated_object, meta}
136 end
137
138 # Tasks this handles:
139 # - Unfollow and block
140 @impl true
141 def handle(
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
143 object,
144 meta
145 ) do
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
149 end
150
151 {:ok, object, meta}
152 end
153
154 # Tasks this handles:
155 # - Update the user
156 #
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
159 @impl true
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
161 if changeset = Keyword.get(meta, :user_update_changeset) do
162 changeset
163 |> User.update_and_set_cache()
164 else
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
170 end
171
172 {:ok, object, meta}
173 end
174
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
178 @impl true
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
182
183 Notification.create_notifications(object)
184
185 {:ok, object, meta}
186 end
187
188 # Tasks this handles
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
196 # - Index incoming posts for search (if needed)
197 @impl true
198 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
199 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
200 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
201 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
202 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
203 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
204
205 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
206 Object.increase_replies_count(in_reply_to)
207 end
208
209 reply_depth = (meta[:depth] || 0) + 1
210
211 # FIXME: Force inReplyTo to replies
212 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
213 object.data["replies"] != nil do
214 for reply_id <- object.data["replies"] do
215 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
216 "id" => reply_id,
217 "depth" => reply_depth
218 })
219 end
220 end
221
222 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
223 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
224 end)
225
226 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
227
228 meta =
229 meta
230 |> add_notifications(notifications)
231
232 ap_streamer().stream_out(activity)
233
234 {:ok, activity, meta}
235 else
236 e -> Repo.rollback(e)
237 end
238 end
239
240 # Tasks this handles:
241 # - Add announce to object
242 # - Set up notification
243 # - Stream out the announce
244 @impl true
245 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
246 announced_object = Object.get_by_ap_id(object.data["object"])
247 user = User.get_cached_by_ap_id(object.data["actor"])
248
249 Utils.add_announce_to_object(object, announced_object)
250
251 if !User.is_internal_user?(user) do
252 Notification.create_notifications(object)
253
254 ap_streamer().stream_out(object)
255 end
256
257 {:ok, object, meta}
258 end
259
260 @impl true
261 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
262 with undone_object <- Activity.get_by_ap_id(undone_object),
263 :ok <- handle_undoing(undone_object) do
264 {:ok, object, meta}
265 end
266 end
267
268 # Tasks this handles:
269 # - Add reaction to object
270 # - Set up notification
271 @impl true
272 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
273 reacted_object = Object.get_by_ap_id(object.data["object"])
274 Utils.add_emoji_reaction_to_object(object, reacted_object)
275
276 Notification.create_notifications(object)
277
278 {:ok, object, meta}
279 end
280
281 # Tasks this handles:
282 # - Delete and unpins the create activity
283 # - Replace object with Tombstone
284 # - Set up notification
285 # - Reduce the user note count
286 # - Reduce the reply count
287 # - Stream out the activity
288 # - Removes posts from search index (if needed)
289 @impl true
290 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
291 deleted_object =
292 Object.normalize(deleted_object, fetch: false) ||
293 User.get_cached_by_ap_id(deleted_object)
294
295 result =
296 case deleted_object do
297 %Object{} ->
298 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
299 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
300 %User{} = user <- User.get_cached_by_ap_id(actor) do
301 User.remove_pinned_object_id(user, deleted_object.data["id"])
302
303 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
304
305 if in_reply_to = deleted_object.data["inReplyTo"] do
306 Object.decrease_replies_count(in_reply_to)
307 end
308
309 MessageReference.delete_for_object(deleted_object)
310
311 ap_streamer().stream_out(object)
312 ap_streamer().stream_out_participations(deleted_object, user)
313 :ok
314 else
315 {:actor, _} ->
316 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
317 :no_object_actor
318 end
319
320 %User{} ->
321 with {:ok, _} <- User.delete(deleted_object) do
322 :ok
323 end
324 end
325
326 if result == :ok do
327 Notification.create_notifications(object)
328
329 # Only remove from index when deleting actual objects, not users or anything else
330 with %Pleroma.Object{} <- deleted_object do
331 Pleroma.Search.remove_from_index(deleted_object)
332 end
333
334 {:ok, object, meta}
335 else
336 {:error, result}
337 end
338 end
339
340 # Tasks this handles:
341 # - adds pin to user
342 # - removes expiration job for pinned activity, if was set for expiration
343 @impl true
344 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
345 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
346 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
347 # if pinned activity was scheduled for deletion, we remove job
348 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
349 Oban.cancel_job(expiration.id)
350 end
351
352 {:ok, object, meta}
353 else
354 nil ->
355 {:error, :user_not_found}
356
357 {:error, changeset} ->
358 if changeset.errors[:pinned_objects] do
359 {:error, :pinned_statuses_limit_reached}
360 else
361 changeset.errors
362 end
363 end
364 end
365
366 # Tasks this handles:
367 # - removes pin from user
368 # - removes corresponding Add activity
369 # - if activity had expiration, recreates activity expiration job
370 @impl true
371 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
372 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
373 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
374 data["object"]
375 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
376 |> Repo.delete_all()
377
378 # if pinned activity was scheduled for deletion, we reschedule it for deletion
379 if meta[:expires_at] do
380 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
381 {:ok, expires_at} =
382 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
383
384 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
385 activity_id: meta[:activity_id],
386 expires_at: expires_at
387 })
388 end
389
390 {:ok, object, meta}
391 else
392 nil -> {:error, :user_not_found}
393 error -> error
394 end
395 end
396
397 # Nothing to do
398 @impl true
399 def handle(object, meta) do
400 {:ok, object, meta}
401 end
402
403 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
404 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
405 actor = User.get_cached_by_ap_id(object.data["actor"])
406 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
407
408 streamables =
409 [[actor, recipient], [recipient, actor]]
410 |> Enum.uniq()
411 |> Enum.map(fn [user, other_user] ->
412 if user.local do
413 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
414 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
415
416 @cachex.put(
417 :chat_message_id_idempotency_key_cache,
418 cm_ref.id,
419 meta[:idempotency_key]
420 )
421
422 {
423 ["user", "user:pleroma_chat"],
424 {user, %{cm_ref | chat: chat, object: object}}
425 }
426 end
427 end)
428 |> Enum.filter(& &1)
429
430 meta =
431 meta
432 |> add_streamables(streamables)
433
434 {:ok, object, meta}
435 end
436 end
437
438 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
439 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
440 PollWorker.schedule_poll_end(activity)
441 {:ok, object, meta}
442 end
443 end
444
445 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
446 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
447 Object.increase_vote_count(
448 object.data["inReplyTo"],
449 object.data["name"],
450 object.data["actor"]
451 )
452
453 {:ok, object, meta}
454 end
455 end
456
457 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
458 when objtype in ~w[Audio Video Event Article Note Page] do
459 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
460 {:ok, object, meta}
461 end
462 end
463
464 # Nothing to do
465 def handle_object_creation(object, _activity, meta) do
466 {:ok, object, meta}
467 end
468
469 defp undo_like(nil, object), do: delete_object(object)
470
471 defp undo_like(%Object{} = liked_object, object) do
472 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
473 delete_object(object)
474 end
475 end
476
477 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
478 object.data["object"]
479 |> Object.get_by_ap_id()
480 |> undo_like(object)
481 end
482
483 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
484 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
485 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
486 {:ok, _} <- Repo.delete(object) do
487 :ok
488 end
489 end
490
491 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
492 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
493 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
494 {:ok, _} <- Repo.delete(object) do
495 :ok
496 end
497 end
498
499 def handle_undoing(
500 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
501 ) do
502 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
503 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
504 {:ok, _} <- User.unblock(blocker, blocked),
505 {:ok, _} <- Repo.delete(object) do
506 :ok
507 end
508 end
509
510 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
511
512 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
513 defp delete_object(object) do
514 with {:ok, _} <- Repo.delete(object), do: :ok
515 end
516
517 defp send_notifications(meta) do
518 Keyword.get(meta, :notifications, [])
519 |> Enum.each(fn notification ->
520 Streamer.stream(["user", "user:notification"], notification)
521 Push.send(notification)
522 end)
523
524 meta
525 end
526
527 defp send_streamables(meta) do
528 Keyword.get(meta, :streamables, [])
529 |> Enum.each(fn {topics, items} ->
530 Streamer.stream(topics, items)
531 end)
532
533 meta
534 end
535
536 defp add_streamables(meta, streamables) do
537 existing = Keyword.get(meta, :streamables, [])
538
539 meta
540 |> Keyword.put(:streamables, streamables ++ existing)
541 end
542
543 defp add_notifications(meta, notifications) do
544 existing = Keyword.get(meta, :notifications, [])
545
546 meta
547 |> Keyword.put(:notifications, notifications ++ existing)
548 end
549
550 @impl true
551 def handle_after_transaction(meta) do
552 meta
553 |> send_notifications()
554 |> send_streamables()
555 end
556 end