MastoAPI: Profile directory
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
35 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
36
37 @impl true
38 def handle(object, meta \\ [])
39
40 # Task this handles
41 # - Follows
42 # - Sends a notification
43 @impl true
44 def handle(
45 %{
46 data: %{
47 "actor" => actor,
48 "type" => "Accept",
49 "object" => follow_activity_id
50 }
51 } = object,
52 meta
53 ) do
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
62 end
63
64 {:ok, object, meta}
65 end
66
67 # Task this handles
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
71 @impl true
72 def handle(
73 %{
74 data: %{
75 "actor" => actor,
76 "type" => "Reject",
77 "object" => follow_activity_id
78 }
79 } = object,
80 meta
81 ) do
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
89 end
90
91 {:ok, object, meta}
92 end
93
94 # Tasks this handle
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
98 @impl true
99 def handle(
100 %{
101 data: %{
102 "id" => follow_id,
103 "type" => "Follow",
104 "object" => followed_user,
105 "actor" => following_user
106 }
107 } = object,
108 meta
109 ) do
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
117 end
118 else
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
122
123 _ ->
124 nil
125 end
126
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
128
129 meta =
130 meta
131 |> add_notifications(notifications)
132
133 updated_object = Activity.get_by_ap_id(follow_id)
134
135 {:ok, updated_object, meta}
136 end
137
138 # Tasks this handles:
139 # - Unfollow and block
140 @impl true
141 def handle(
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
143 object,
144 meta
145 ) do
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
149 end
150
151 {:ok, object, meta}
152 end
153
154 # Tasks this handles:
155 # - Update the user
156 #
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
159 @impl true
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
161 if changeset = Keyword.get(meta, :user_update_changeset) do
162 changeset
163 |> User.update_and_set_cache()
164 else
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
170 end
171
172 {:ok, object, meta}
173 end
174
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
178 @impl true
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
182
183 Notification.create_notifications(object)
184
185 {:ok, object, meta}
186 end
187
188 # Tasks this handles
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
196 @impl true
197 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
198 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
199 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
200 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
201 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
203
204 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
205 Object.increase_replies_count(in_reply_to)
206 end
207
208 reply_depth = (meta[:depth] || 0) + 1
209
210 # FIXME: Force inReplyTo to replies
211 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
212 object.data["replies"] != nil do
213 for reply_id <- object.data["replies"] do
214 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
215 "id" => reply_id,
216 "depth" => reply_depth
217 })
218 end
219 end
220
221 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
222 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
223 end)
224
225 meta =
226 meta
227 |> add_notifications(notifications)
228
229 ap_streamer().stream_out(activity)
230
231 {:ok, activity, meta}
232 else
233 e -> Repo.rollback(e)
234 end
235 end
236
237 # Tasks this handles:
238 # - Add announce to object
239 # - Set up notification
240 # - Stream out the announce
241 @impl true
242 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
243 announced_object = Object.get_by_ap_id(object.data["object"])
244 user = User.get_cached_by_ap_id(object.data["actor"])
245
246 Utils.add_announce_to_object(object, announced_object)
247
248 if !User.is_internal_user?(user) do
249 Notification.create_notifications(object)
250
251 ap_streamer().stream_out(object)
252 end
253
254 {:ok, object, meta}
255 end
256
257 @impl true
258 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
259 with undone_object <- Activity.get_by_ap_id(undone_object),
260 :ok <- handle_undoing(undone_object) do
261 {:ok, object, meta}
262 end
263 end
264
265 # Tasks this handles:
266 # - Add reaction to object
267 # - Set up notification
268 @impl true
269 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
270 reacted_object = Object.get_by_ap_id(object.data["object"])
271 Utils.add_emoji_reaction_to_object(object, reacted_object)
272
273 Notification.create_notifications(object)
274
275 {:ok, object, meta}
276 end
277
278 # Tasks this handles:
279 # - Delete and unpins the create activity
280 # - Replace object with Tombstone
281 # - Set up notification
282 # - Reduce the user note count
283 # - Reduce the reply count
284 # - Stream out the activity
285 @impl true
286 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
287 deleted_object =
288 Object.normalize(deleted_object, fetch: false) ||
289 User.get_cached_by_ap_id(deleted_object)
290
291 result =
292 case deleted_object do
293 %Object{} ->
294 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
295 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
296 %User{} = user <- User.get_cached_by_ap_id(actor) do
297 User.remove_pinned_object_id(user, deleted_object.data["id"])
298
299 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
300
301 if in_reply_to = deleted_object.data["inReplyTo"] do
302 Object.decrease_replies_count(in_reply_to)
303 end
304
305 MessageReference.delete_for_object(deleted_object)
306
307 ap_streamer().stream_out(object)
308 ap_streamer().stream_out_participations(deleted_object, user)
309 :ok
310 else
311 {:actor, _} ->
312 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
313 :no_object_actor
314 end
315
316 %User{} ->
317 with {:ok, _} <- User.delete(deleted_object) do
318 :ok
319 end
320 end
321
322 if result == :ok do
323 Notification.create_notifications(object)
324 {:ok, object, meta}
325 else
326 {:error, result}
327 end
328 end
329
330 # Tasks this handles:
331 # - adds pin to user
332 # - removes expiration job for pinned activity, if was set for expiration
333 @impl true
334 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
335 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
336 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
337 # if pinned activity was scheduled for deletion, we remove job
338 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
339 Oban.cancel_job(expiration.id)
340 end
341
342 {:ok, object, meta}
343 else
344 nil ->
345 {:error, :user_not_found}
346
347 {:error, changeset} ->
348 if changeset.errors[:pinned_objects] do
349 {:error, :pinned_statuses_limit_reached}
350 else
351 changeset.errors
352 end
353 end
354 end
355
356 # Tasks this handles:
357 # - removes pin from user
358 # - removes corresponding Add activity
359 # - if activity had expiration, recreates activity expiration job
360 @impl true
361 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
362 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
363 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
364 data["object"]
365 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
366 |> Repo.delete_all()
367
368 # if pinned activity was scheduled for deletion, we reschedule it for deletion
369 if meta[:expires_at] do
370 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
371 {:ok, expires_at} =
372 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
373
374 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
375 activity_id: meta[:activity_id],
376 expires_at: expires_at
377 })
378 end
379
380 {:ok, object, meta}
381 else
382 nil -> {:error, :user_not_found}
383 error -> error
384 end
385 end
386
387 # Nothing to do
388 @impl true
389 def handle(object, meta) do
390 {:ok, object, meta}
391 end
392
393 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
394 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
395 actor = User.get_cached_by_ap_id(object.data["actor"])
396 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
397
398 streamables =
399 [[actor, recipient], [recipient, actor]]
400 |> Enum.uniq()
401 |> Enum.map(fn [user, other_user] ->
402 if user.local do
403 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
404 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
405
406 @cachex.put(
407 :chat_message_id_idempotency_key_cache,
408 cm_ref.id,
409 meta[:idempotency_key]
410 )
411
412 {
413 ["user", "user:pleroma_chat"],
414 {user, %{cm_ref | chat: chat, object: object}}
415 }
416 end
417 end)
418 |> Enum.filter(& &1)
419
420 meta =
421 meta
422 |> add_streamables(streamables)
423
424 {:ok, object, meta}
425 end
426 end
427
428 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
429 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
430 PollWorker.schedule_poll_end(activity)
431 {:ok, object, meta}
432 end
433 end
434
435 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
436 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
437 Object.increase_vote_count(
438 object.data["inReplyTo"],
439 object.data["name"],
440 object.data["actor"]
441 )
442
443 {:ok, object, meta}
444 end
445 end
446
447 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
448 when objtype in ~w[Audio Video Event Article Note Page] do
449 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
450 {:ok, object, meta}
451 end
452 end
453
454 # Nothing to do
455 def handle_object_creation(object, _activity, meta) do
456 {:ok, object, meta}
457 end
458
459 defp undo_like(nil, object), do: delete_object(object)
460
461 defp undo_like(%Object{} = liked_object, object) do
462 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
463 delete_object(object)
464 end
465 end
466
467 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
468 object.data["object"]
469 |> Object.get_by_ap_id()
470 |> undo_like(object)
471 end
472
473 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
474 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
475 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
476 {:ok, _} <- Repo.delete(object) do
477 :ok
478 end
479 end
480
481 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
482 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
483 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
484 {:ok, _} <- Repo.delete(object) do
485 :ok
486 end
487 end
488
489 def handle_undoing(
490 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
491 ) do
492 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
493 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
494 {:ok, _} <- User.unblock(blocker, blocked),
495 {:ok, _} <- Repo.delete(object) do
496 :ok
497 end
498 end
499
500 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
501
502 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
503 defp delete_object(object) do
504 with {:ok, _} <- Repo.delete(object), do: :ok
505 end
506
507 defp send_notifications(meta) do
508 Keyword.get(meta, :notifications, [])
509 |> Enum.each(fn notification ->
510 Streamer.stream(["user", "user:notification"], notification)
511 Push.send(notification)
512 end)
513
514 meta
515 end
516
517 defp send_streamables(meta) do
518 Keyword.get(meta, :streamables, [])
519 |> Enum.each(fn {topics, items} ->
520 Streamer.stream(topics, items)
521 end)
522
523 meta
524 end
525
526 defp add_streamables(meta, streamables) do
527 existing = Keyword.get(meta, :streamables, [])
528
529 meta
530 |> Keyword.put(:streamables, streamables ++ existing)
531 end
532
533 defp add_notifications(meta, notifications) do
534 existing = Keyword.get(meta, :notifications, [])
535
536 meta
537 |> Keyword.put(:notifications, notifications ++ existing)
538 end
539
540 @impl true
541 def handle_after_transaction(meta) do
542 meta
543 |> send_notifications()
544 |> send_streamables()
545 end
546 end