43b1b089b6e938496e325973f3d7a11133f6c391
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
25
26 require Pleroma.Constants
27 require Logger
28
29 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
30
31 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
32
33 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
34
35 @impl true
36 def handle(object, meta \\ [])
37
38 # Task this handles
39 # - Follows
40 # - Sends a notification
41 @impl true
42 def handle(
43 %{
44 data: %{
45 "actor" => actor,
46 "type" => "Accept",
47 "object" => follow_activity_id
48 }
49 } = object,
50 meta
51 ) do
52 with %Activity{actor: follower_id} = follow_activity <-
53 Activity.get_by_ap_id(follow_activity_id),
54 %User{} = followed <- User.get_cached_by_ap_id(actor),
55 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
56 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
57 {:ok, _follower, followed} <-
58 FollowingRelationship.update(follower, followed, :follow_accept) do
59 Notification.update_notification_type(followed, follow_activity)
60 end
61
62 {:ok, object, meta}
63 end
64
65 # Task this handles
66 # - Rejects all existing follow activities for this person
67 # - Updates the follow state
68 # - Dismisses notification
69 @impl true
70 def handle(
71 %{
72 data: %{
73 "actor" => actor,
74 "type" => "Reject",
75 "object" => follow_activity_id
76 }
77 } = object,
78 meta
79 ) do
80 with %Activity{actor: follower_id} = follow_activity <-
81 Activity.get_by_ap_id(follow_activity_id),
82 %User{} = followed <- User.get_cached_by_ap_id(actor),
83 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
84 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
85 FollowingRelationship.update(follower, followed, :follow_reject)
86 Notification.dismiss(follow_activity)
87 end
88
89 {:ok, object, meta}
90 end
91
92 # Tasks this handle
93 # - Follows if possible
94 # - Sends a notification
95 # - Generates accept or reject if appropriate
96 @impl true
97 def handle(
98 %{
99 data: %{
100 "id" => follow_id,
101 "type" => "Follow",
102 "object" => followed_user,
103 "actor" => following_user
104 }
105 } = object,
106 meta
107 ) do
108 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
109 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
110 {_, {:ok, _, _}, _, _} <-
111 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
112 if followed.local && !followed.is_locked do
113 {:ok, accept_data, _} = Builder.accept(followed, object)
114 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
115 end
116 else
117 {:following, {:error, _}, _follower, followed} ->
118 {:ok, reject_data, _} = Builder.reject(followed, object)
119 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
120
121 _ ->
122 nil
123 end
124
125 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
126
127 meta =
128 meta
129 |> add_notifications(notifications)
130
131 updated_object = Activity.get_by_ap_id(follow_id)
132
133 {:ok, updated_object, meta}
134 end
135
136 # Tasks this handles:
137 # - Unfollow and block
138 @impl true
139 def handle(
140 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
141 object,
142 meta
143 ) do
144 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
145 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
146 User.block(blocker, blocked)
147 end
148
149 {:ok, object, meta}
150 end
151
152 # Tasks this handles:
153 # - Update the user
154 # - Update a non-user object (Note, Question, etc.)
155 #
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
158 @impl true
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 updated_object_id = updated_object["id"]
161
162 with {_, true} <- {:has_id, is_binary(updated_object_id)},
163 %{"type" => type} <- updated_object,
164 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
165 if is_user do
166 handle_update_user(object, meta)
167 else
168 handle_update_object(object, meta)
169 end
170 else
171 _ ->
172 {:ok, object, meta}
173 end
174 end
175
176 # Tasks this handles:
177 # - Add like to object
178 # - Set up notification
179 @impl true
180 def handle(%{data: %{"type" => "Like"}} = object, meta) do
181 liked_object = Object.get_by_ap_id(object.data["object"])
182 Utils.add_like_to_object(object, liked_object)
183
184 Notification.create_notifications(object)
185
186 {:ok, object, meta}
187 end
188
189 # Tasks this handles
190 # - Actually create object
191 # - Rollback if we couldn't create it
192 # - Increase the user note count
193 # - Increase the reply count
194 # - Increase replies count
195 # - Set up ActivityExpiration
196 # - Set up notifications
197 # - Index incoming posts for search (if needed)
198 @impl true
199 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
200 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
201 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
202 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
203 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
204 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
205
206 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
207 Object.increase_replies_count(in_reply_to)
208 end
209
210 reply_depth = (meta[:depth] || 0) + 1
211
212 # FIXME: Force inReplyTo to replies
213 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
214 object.data["replies"] != nil do
215 for reply_id <- object.data["replies"] do
216 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
217 "id" => reply_id,
218 "depth" => reply_depth
219 })
220 end
221 end
222
223 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
224 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
225 end)
226
227 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
228
229 meta =
230 meta
231 |> add_notifications(notifications)
232
233 ap_streamer().stream_out(activity)
234
235 {:ok, activity, meta}
236 else
237 e -> Repo.rollback(e)
238 end
239 end
240
241 # Tasks this handles:
242 # - Add announce to object
243 # - Set up notification
244 # - Stream out the announce
245 @impl true
246 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
247 announced_object = Object.get_by_ap_id(object.data["object"])
248 user = User.get_cached_by_ap_id(object.data["actor"])
249
250 Utils.add_announce_to_object(object, announced_object)
251
252 if !User.is_internal_user?(user) do
253 Notification.create_notifications(object)
254
255 ap_streamer().stream_out(object)
256 end
257
258 {:ok, object, meta}
259 end
260
261 @impl true
262 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
263 with undone_object <- Activity.get_by_ap_id(undone_object),
264 :ok <- handle_undoing(undone_object) do
265 {:ok, object, meta}
266 end
267 end
268
269 # Tasks this handles:
270 # - Add reaction to object
271 # - Set up notification
272 @impl true
273 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
274 reacted_object = Object.get_by_ap_id(object.data["object"])
275 Utils.add_emoji_reaction_to_object(object, reacted_object)
276
277 Notification.create_notifications(object)
278
279 {:ok, object, meta}
280 end
281
282 # Tasks this handles:
283 # - Delete and unpins the create activity
284 # - Replace object with Tombstone
285 # - Set up notification
286 # - Reduce the user note count
287 # - Reduce the reply count
288 # - Stream out the activity
289 # - Removes posts from search index (if needed)
290 @impl true
291 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
292 deleted_object =
293 Object.normalize(deleted_object, fetch: false) ||
294 User.get_cached_by_ap_id(deleted_object)
295
296 result =
297 case deleted_object do
298 %Object{} ->
299 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
300 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
301 %User{} = user <- User.get_cached_by_ap_id(actor) do
302 User.remove_pinned_object_id(user, deleted_object.data["id"])
303
304 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
305
306 if in_reply_to = deleted_object.data["inReplyTo"] do
307 Object.decrease_replies_count(in_reply_to)
308 end
309
310 ap_streamer().stream_out(object)
311 ap_streamer().stream_out_participations(deleted_object, user)
312 :ok
313 else
314 {:actor, _} ->
315 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
316 :no_object_actor
317 end
318
319 %User{} ->
320 with {:ok, _} <- User.delete(deleted_object) do
321 :ok
322 end
323 end
324
325 if result == :ok do
326 Notification.create_notifications(object)
327
328 # Only remove from index when deleting actual objects, not users or anything else
329 with %Pleroma.Object{} <- deleted_object do
330 Pleroma.Search.remove_from_index(deleted_object)
331 end
332
333 {:ok, object, meta}
334 else
335 {:error, result}
336 end
337 end
338
339 # Tasks this handles:
340 # - adds pin to user
341 # - removes expiration job for pinned activity, if was set for expiration
342 @impl true
343 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
344 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
345 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
346 # if pinned activity was scheduled for deletion, we remove job
347 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
348 Oban.cancel_job(expiration.id)
349 end
350
351 {:ok, object, meta}
352 else
353 nil ->
354 {:error, :user_not_found}
355
356 {:error, changeset} ->
357 if changeset.errors[:pinned_objects] do
358 {:error, :pinned_statuses_limit_reached}
359 else
360 changeset.errors
361 end
362 end
363 end
364
365 # Tasks this handles:
366 # - removes pin from user
367 # - removes corresponding Add activity
368 # - if activity had expiration, recreates activity expiration job
369 @impl true
370 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
371 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
372 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
373 data["object"]
374 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
375 |> Repo.delete_all()
376
377 # if pinned activity was scheduled for deletion, we reschedule it for deletion
378 if meta[:expires_at] do
379 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
380 {:ok, expires_at} =
381 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
382
383 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
384 activity_id: meta[:activity_id],
385 expires_at: expires_at
386 })
387 end
388
389 {:ok, object, meta}
390 else
391 nil -> {:error, :user_not_found}
392 error -> error
393 end
394 end
395
396 # Nothing to do
397 @impl true
398 def handle(object, meta) do
399 {:ok, object, meta}
400 end
401
402 defp handle_update_user(
403 %{data: %{"type" => "Update", "object" => updated_object}} = object,
404 meta
405 ) do
406 if changeset = Keyword.get(meta, :user_update_changeset) do
407 changeset
408 |> User.update_and_set_cache()
409 else
410 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
411
412 User.get_by_ap_id(updated_object["id"])
413 |> User.remote_user_changeset(new_user_data)
414 |> User.update_and_set_cache()
415 end
416
417 {:ok, object, meta}
418 end
419
420 defp handle_update_object(
421 %{data: %{"type" => "Update", "object" => updated_object}} = object,
422 meta
423 ) do
424 orig_object_ap_id = updated_object["id"]
425 orig_object = Object.get_by_ap_id(orig_object_ap_id)
426 orig_object_data = orig_object.data
427
428 updated_object =
429 if meta[:local] do
430 # If this is a local Update, we don't process it by transmogrifier,
431 # so we use the embedded object as-is.
432 updated_object
433 else
434 meta[:object_data]
435 end
436
437 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
438 %{
439 updated_data: updated_object_data,
440 updated: updated,
441 used_history_in_new_object?: used_history_in_new_object?
442 } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, updated_object)
443
444 changeset =
445 orig_object
446 |> Repo.preload(:hashtags)
447 |> Object.change(%{data: updated_object_data})
448
449 with {:ok, new_object} <- Repo.update(changeset),
450 {:ok, _} <- Object.invalid_object_cache(new_object),
451 {:ok, _} <- Object.set_cache(new_object),
452 # The metadata/utils.ex uses the object id for the cache.
453 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
454 if used_history_in_new_object? do
455 with create_activity when not is_nil(create_activity) <-
456 Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id),
457 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id) do
458 nil
459 else
460 _ -> nil
461 end
462 end
463
464 if updated do
465 object
466 |> Activity.normalize()
467 |> ActivityPub.notify_and_stream()
468 end
469 end
470 end
471
472 {:ok, object, meta}
473 end
474
475 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
476 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
477 PollWorker.schedule_poll_end(activity)
478 {:ok, object, meta}
479 end
480 end
481
482 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
483 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
484 Object.increase_vote_count(
485 object.data["inReplyTo"],
486 object.data["name"],
487 object.data["actor"]
488 )
489
490 {:ok, object, meta}
491 end
492 end
493
494 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
495 when objtype in ~w[Audio Video Event Article Note Page] do
496 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
497 {:ok, object, meta}
498 end
499 end
500
501 # Nothing to do
502 def handle_object_creation(object, _activity, meta) do
503 {:ok, object, meta}
504 end
505
506 defp undo_like(nil, object), do: delete_object(object)
507
508 defp undo_like(%Object{} = liked_object, object) do
509 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
510 delete_object(object)
511 end
512 end
513
514 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
515 object.data["object"]
516 |> Object.get_by_ap_id()
517 |> undo_like(object)
518 end
519
520 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
521 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
522 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
523 {:ok, _} <- Repo.delete(object) do
524 :ok
525 end
526 end
527
528 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
529 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
530 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
531 {:ok, _} <- Repo.delete(object) do
532 :ok
533 end
534 end
535
536 def handle_undoing(
537 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
538 ) do
539 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
540 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
541 {:ok, _} <- User.unblock(blocker, blocked),
542 {:ok, _} <- Repo.delete(object) do
543 :ok
544 end
545 end
546
547 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
548
549 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
550 defp delete_object(object) do
551 with {:ok, _} <- Repo.delete(object), do: :ok
552 end
553
554 defp send_notifications(meta) do
555 Keyword.get(meta, :notifications, [])
556 |> Enum.each(fn notification ->
557 Streamer.stream(["user", "user:notification"], notification)
558 Push.send(notification)
559 end)
560
561 meta
562 end
563
564 defp send_streamables(meta) do
565 Keyword.get(meta, :streamables, [])
566 |> Enum.each(fn {topics, items} ->
567 Streamer.stream(topics, items)
568 end)
569
570 meta
571 end
572
573 defp add_notifications(meta, notifications) do
574 existing = Keyword.get(meta, :notifications, [])
575
576 meta
577 |> Keyword.put(:notifications, notifications ++ existing)
578 end
579
580 @impl true
581 def handle_after_transaction(meta) do
582 meta
583 |> send_notifications()
584 |> send_streamables()
585 end
586 end