Scrape instance nodeinfo (#251)
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
25
26 require Pleroma.Constants
27 require Logger
28
29 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
30
31 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
32
# Module used to stream activities out. It is read from the configuration on
# every call and defaults to `ActivityPub`.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
34
@impl true
def handle(object, meta \\ [])

# Accept of a follow request. When the follow activity and both users can be
# looked up, this:
# - calls `Utils.update_follow_state_for_all/2` with the "accept" state
# - records the relationship through `FollowingRelationship.update/3`
# - updates the type of the notification tied to the follow activity
# The Accept is handed back unchanged whether or not those steps ran.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepting_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepting_user <- User.get_cached_by_ap_id(accepting_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _requester, accepting_user} <-
         FollowingRelationship.update(requester, accepting_user, :follow_accept) do
    Notification.update_notification_type(accepting_user, accepted_follow)
  end

  {:ok, object, meta}
end
64
# Reject of a follow request. When the follow activity and both users can be
# looked up, this:
# - calls `Utils.update_follow_state_for_all/2` with the "reject" state
# - records the rejection through `FollowingRelationship.update/3`
# - dismisses the notification tied to the follow activity
# The Reject is handed back unchanged whether or not those steps ran.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecting_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecting_user <- User.get_cached_by_ap_id(rejecting_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(requester, rejecting_user, :follow_reject)
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
91
# Tasks this handles:
# - Registers the follow as pending via `User.follow/3`
# - Generates an Accept right away when the followed user is local and not
#   locked, or a Reject when `User.follow/3` returns an error
# - Creates the notifications (unsent) and stores them in `meta`
# - Returns the follow activity as re-read from the database
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => following_user,
          "object" => followed_user
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(following_user),
       %User{} = followed <- User.get_cached_by_ap_id(followed_user),
       {_, {:ok, _, _}, _, _} <-
         {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
    if followed.local && !followed.is_locked do
      {:ok, accept_data, _} = Builder.accept(followed, object)
      {:ok, _accept, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    {:following, {:error, _}, _follower, followed} ->
      {:ok, reject_data, _} = Builder.reject(followed, object)
      {:ok, _reject, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
135
# Tasks this handles:
# - Calls `User.block/2` once both the blocking and the blocked user are found
# The Block activity is returned unchanged either way.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocking_user, "object" => blocked_user}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
       %User{} = target <- User.get_cached_by_ap_id(blocked_user) do
    User.block(blocker, target)
  end

  {:ok, object, meta}
end
151
# Tasks this handles:
# - Update the user
# - Update a non-user object (Note, Question, etc.)
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well.
#
# Updates whose embedded object has no binary "id" or no "type" are passed
# through untouched.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  object_id = updated_object["id"]

  case updated_object do
    %{"type" => type} when is_binary(object_id) ->
      # Actor types are routed to the user update path, everything else to
      # the object update path.
      if type in Pleroma.Constants.actor_types() do
        handle_update_user(object, meta)
      else
        handle_update_object(object, meta)
      end

    _ ->
      {:ok, object, meta}
  end
end
175
# Tasks this handles:
# - Adds the like to the liked object
# - Creates the notifications for the like
@impl true
def handle(%{data: %{"type" => "Like"} = like_data} = object, meta) do
  target = Object.get_by_ap_id(like_data["object"])
  Utils.add_like_to_object(object, target)

  Notification.create_notifications(object)

  {:ok, object, meta}
end
188
# Tasks this handles
# - Actually create object (see `handle_object_creation/3`)
# - Rollback if we couldn't create it or the actor is unknown
# - Create the notifications (unsent) and store them in `meta`
# - Increase the user note count and update the last status timestamp
# - Increase the replies count of the parent object
# - Ask for scraping of nodeinfo for the actor
# - Enqueue fetching of the listed replies while the thread depth allows it
# - Start fetching rich media data
# - Index the post for search
# - Stream out the activity
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = author <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(author, object)
    {:ok, _user} = ActivityPub.update_last_status_at_if_public(author, object)

    # Objects of type "Answer" are skipped here: `parent_id` is `false` for them.
    parent_id = object.data["type"] != "Answer" && object.data["inReplyTo"]

    if parent_id do
      Object.increase_replies_count(parent_id)
    end

    depth = (meta[:depth] || 0) + 1

    Pleroma.Workers.NodeInfoFetcherWorker.enqueue("process", %{
      "source_url" => activity.data["actor"]
    })

    # FIXME: Force inReplyTo to replies
    replies = object.data["replies"]

    if Pleroma.Web.Federator.allowed_thread_distance?(depth) and not is_nil(replies) do
      Enum.each(replies, fn reply_id ->
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => depth
        })
      end)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    indexable = Map.put(activity, :object, object)
    Pleroma.Search.add_to_index(indexable)

    meta = add_notifications(meta, notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    failure ->
      Logger.error(inspect(failure))
      Repo.rollback(failure)
  end
end
247
# Tasks this handles:
# - Adds the announce to the announced object
# - Creates the notifications and streams the announce out, except when the
#   announcing actor is an internal user
@impl true
def handle(%{data: %{"type" => "Announce"} = announce_data} = object, meta) do
  target = Object.get_by_ap_id(announce_data["object"])
  announcer = User.get_cached_by_ap_id(announce_data["actor"])

  Utils.add_announce_to_object(object, target)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)

    ap_streamer().stream_out(object)
  end

  {:ok, object, meta}
end
267
# Looks up the activity being undone and delegates to `handle_undoing/1`.
# Anything other than `:ok` coming back from it is returned to the caller
# as-is.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_object)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
275
# Tasks this handles:
# - Adds the emoji reaction to the reacted object
# - Creates the notifications for the reaction
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = react_data} = object, meta) do
  target = Object.get_by_ap_id(react_data["object"])
  Utils.add_emoji_reaction_to_object(object, target)

  Notification.create_notifications(object)

  {:ok, object, meta}
end
288
# Tasks this handles:
# - Resolves the target to a known object or, failing that, a cached user
# - For objects: deletes the object, unpins it from its author, reduces the
#   author's note count, reduces the parent's replies count and streams out
#   the activity and the participations
# - For users: deletes the user
# - Removes deleted objects (not users) from the search index
#
# Returns `{:error, reason}` when the deletion did not end with `:ok`.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target =
    Object.normalize(deleted_object, fetch: false) ||
      User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, _activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinned_object_id(author, deleted.data["id"])

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          if parent_id = deleted.data["inReplyTo"] do
            Object.decrease_replies_count(parent_id)
          end

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was resolved before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target) do
          :ok
        end
    end

  case outcome do
    :ok ->
      # Only remove from index when deleting actual objects, not users or anything else
      if match?(%Pleroma.Object{}, target) do
        Pleroma.Search.remove_from_index(target)
      end

      {:ok, object, meta}

    _ ->
      {:error, outcome}
  end
end
343
# Tasks this handles:
# - adds pin to user
# - removes expiration job for pinned activity, if was set for expiration
#
# Returns `{:ok, object, meta}` on success and an `{:error, reason}` tuple
# otherwise:
# - `{:error, :user_not_found}` when the actor is not a known user
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries an
#   error on `:pinned_objects`
# - `{:error, errors}` with the changeset errors for any other failed pin
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Keep the error shape consistent with the other failure branches
        # instead of returning the bare errors list.
        {:error, changeset.errors}
      end
  end
end
369
# Tasks this handles:
# - removes pin from user
# - removes corresponding Add activity
# - if activity had expiration, recreates activity expiration job
#
# Returns `{:error, :user_not_found}` when the actor is unknown; any other
# failure of the unpin step is returned as-is.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    raw_expires_at = meta[:expires_at]

    if raw_expires_at do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    failure ->
      failure
  end
end
400
# Catch-all clause: activities not matched by any clause above have no side
# effects and are passed through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
406
# Applies an Update to a user. When `meta` carries a `:user_update_changeset`
# that changeset is applied directly; otherwise a remote-user changeset is
# built from the embedded user object. The outcome of the update itself is
# not inspected: the Update activity is always returned with `:ok`.
defp handle_update_user(
       %{data: %{"type" => "Update", "object" => updated_object}} = object,
       meta
     ) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      existing_user = User.get_by_ap_id(updated_object["id"])
      remote_changeset = User.remote_user_changeset(existing_user, new_user_data)
      User.update_and_set_cache(remote_changeset)

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
424
# Applies an Update to a non-user object. Only objects whose stored type is
# one of `Pleroma.Constants.updatable_object_types()` are touched; for those
# the new data is persisted, the object caches and the cached HTML are
# refreshed, and the Update is notified/streamed when
# `Object.Updater.make_new_object_data_from_update_object/2` reports a change.
# The Update activity is always returned with `:ok`.
defp handle_update_object(
       %{data: %{"type" => "Update", "object" => updated_object}} = object,
       meta
     ) do
  orig_ap_id = updated_object["id"]
  orig_object = Object.get_by_ap_id(orig_ap_id)
  orig_data = orig_object.data

  # If this is a local Update, we don't process it by transmogrifier,
  # so we use the embedded object as-is.
  incoming = if meta[:local], do: updated_object, else: meta[:object_data]

  if orig_data["type"] in Pleroma.Constants.updatable_object_types() do
    %{
      updated_data: new_data,
      updated: changed?,
      used_history_in_new_object?: history_reused?
    } = Object.Updater.make_new_object_data_from_update_object(orig_data, incoming)

    preloaded = Repo.preload(orig_object, :hashtags)
    changeset = Object.change(preloaded, %{data: new_data})

    with {:ok, new_object} <- Repo.update(changeset),
         {:ok, _} <- Object.invalid_object_cache(new_object),
         {:ok, _} <- Object.set_cache(new_object),
         # The metadata/utils.ex uses the object id for the cache.
         {:ok, _} <- Activity.HTML.invalidate_cache_for(new_object.id) do
      if history_reused? do
        # The Create activity has its own cached HTML entry to invalidate.
        case Activity.get_create_by_object_ap_id(orig_ap_id) do
          nil -> nil
          create_activity -> Activity.HTML.invalidate_cache_for(create_activity.id)
        end
      end

      if changed? do
        normalized = Activity.normalize(object)
        ActivityPub.notify_and_stream(normalized)
      end
    end
  end

  {:ok, object, meta}
end
479
# Creates the object embedded in a Create activity by running it through
# `Pipeline.common_pipeline/2`. Any non-`{:ok, _, _}` pipeline result is
# returned to the caller unchanged.
#
# Questions additionally get their poll end scheduled.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created, new_meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, created, new_meta}

    failure ->
      failure
  end
end

# Answers additionally bump the vote count of the option they name on the
# object they reply to.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, new_meta} ->
      Object.increase_vote_count(
        answer.data["inReplyTo"],
        answer.data["name"],
        answer.data["actor"]
      )

      {:ok, answer, new_meta}

    failure ->
      failure
  end
end

# These types only go through the common pipeline; its result is the result.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ["Audio", "Video", "Event", "Article", "Note", "Page"] do
  Pipeline.common_pipeline(object, meta)
end

# Nothing to do
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
510
# Undoes a like. When the liked object is unknown (`nil`) only the like
# itself is deleted; otherwise the like is first removed from the object and
# a failure of that step is returned as-is.
defp undo_like(nil, object) do
  delete_object(object)
end

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
518
# Reverts the effects of the given activity and deletes it. Returns `:ok` on
# success; the first step that fails is returned as-is.
#
# Like: delegates to `undo_like/2` with the liked object (possibly `nil`).
def handle_undoing(%{data: %{"type" => "Like"} = like_data} = object) do
  liked_object = Object.get_by_ap_id(like_data["object"])
  undo_like(liked_object, object)
end

# EmojiReact: removes the reaction from the object, then deletes the activity.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = react_data} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(react_data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end

# Announce: removes the announce from the object, then deletes the activity.
def handle_undoing(%{data: %{"type" => "Announce"} = announce_data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(announce_data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end

# Block: unblocks the blocked user, then deletes the activity.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end

# Anything else (including `nil`) cannot be undone.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
553
# Deletes the given record through `Repo.delete/1`. Returns `:ok` on success
# and the failed result of `Repo.delete/1` otherwise.
#
# The argument is the activity being undone (it originates from
# `Activity.get_by_ap_id/1` in the Undo handler), hence `Activity.t()` and
# not `Object.t()` in the spec.
@spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
558
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged.
defp send_notifications(meta) do
  notifications = Keyword.get(meta, :notifications, [])

  Enum.each(notifications, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
568
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged.
defp send_streamables(meta) do
  streamables = Keyword.get(meta, :streamables, [])

  Enum.each(streamables, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
577
# Returns `meta` with `notifications` placed in front of whatever is already
# stored under `:notifications`.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
584
# Delivers what was collected in `meta`: first the notifications, then the
# streamables. Returns `meta`.
@impl true
def handle_after_transaction(meta) do
  meta = send_notifications(meta)
  send_streamables(meta)
end
591 end