d08d6aa708594db5d81cbcd3b2b5013a4819450f
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
# Cache backend and logger used by this module. Both are module attributes, so
# the config lookup runs when the module is compiled, not on every call.
# Defaults are `Cachex` and `Logger` when no override is configured.
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
# Looks up the module used to stream activities out; falls back to `ActivityPub`
# when `[:side_effects, :ap_streamer]` is not configured.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36
# Bodiless head: declares the default for `meta` (an empty list) once for all
# `handle/2` clauses below.
@impl true
def handle(object, meta \\ [])
39
# Tasks this handles:
# - Marks the referenced follow activity as accepted
# - Updates the following relationship
# - Updates the notification type
#
# Any lookup or update that fails is skipped silently; the Accept itself is
# always returned as `{:ok, object, meta}`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = accept,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _follower, accepter} <-
         FollowingRelationship.update(follower, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, accepted_follow)
  end

  {:ok, accept, meta}
end
66
# Tasks this handles:
# - Rejects all existing follow activities for this person
# - Updates the follow state
# - Dismisses notification
#
# Failures along the way are ignored; the Reject is always returned as
# `{:ok, object, meta}`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = reject,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _updated_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, rejecter, :follow_reject)
    # The notification is dismissed for the follow activity as originally loaded.
    Notification.dismiss(follow)
  end

  {:ok, reject, meta}
end
93
# Tasks this handles:
# - Follows if possible
# - Sends a notification
# - Generates accept or reject if appropriate
#
# Returns the follow activity re-read from the database (its state may have been
# changed by the generated Accept/Reject) together with the updated `meta`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_ap_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = follow,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id),
       # Tag the result and carry the users along so the `else` branch can use them.
       {_, {:ok, _, _}, _, _} <-
         {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
    auto_accept? = followed.local && !followed.is_locked

    if auto_accept? do
      {:ok, accept_data, _} = Builder.accept(followed, follow)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    {:following, {:error, _}, _follower, followed} ->
      {:ok, reject_data, _} = Builder.reject(followed, follow)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(follow, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)
  refreshed_follow = Activity.get_by_ap_id(follow_ap_id)

  {:ok, refreshed_follow, meta_with_notifications}
end
137
# Tasks this handles:
# - Unfollow and block
#
# If either user cannot be resolved the block is skipped; the activity is
# returned unchanged either way.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        block,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, block, meta}
end
153
# Tasks this handles:
# - Update the user
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well. Without such a
# changeset in `meta`, the user is rebuilt from the federated object instead.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      remote_user = User.get_by_ap_id(updated_object["id"])
      remote_changeset = User.remote_user_changeset(remote_user, new_user_data)
      User.update_and_set_cache(remote_changeset)

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
174
# Tasks this handles:
# - Add like to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "Like"} = data} = like, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(like, target)
  Notification.create_notifications(like)

  {:ok, like, meta}
end
187
# Tasks this handles
# - Actually create object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the replies count of the parent object
# - Enqueue fetching of the replies listed on the object
# - Start rich media fetching for the activity
# - Set up notifications
# - Index incoming posts for search (if needed)
# - Stream out the activity
# NOTE(review): no activity-expiration scheduling is visible in this clause —
# confirm whether it happens elsewhere.
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  # Both steps must match; any other value lands in `else` and is rolled back.
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # Notifications are created with `do_send: false` and stashed in `meta`
    # further down instead of being delivered here.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
    {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)

    # `=` binds loosest: `in_reply_to` is `false` for "Answer" objects and the
    # "inReplyTo" value (possibly `nil`) otherwise, so poll votes never bump
    # the replies count.
    if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    # Depth of the replies relative to this activity; defaults to 1 when
    # `meta` carries no `:depth`.
    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    # Rich media is fetched in a separately started task, gated by the
    # concurrency limiter; its result is not awaited here.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # The freshly created object is attached under `:object` before indexing.
    Pleroma.Search.DatabaseSearch.add_to_index(Map.put(activity, :object, object))

    meta =
      meta
      |> add_notifications(notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    # Hands the non-matching value to `Repo.rollback/1`.
    e -> Repo.rollback(e)
  end
end
239
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped for announces made by internal users.
@impl true
def handle(%{data: %{"type" => "Announce"} = data} = announce, meta) do
  announced = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(announce, announced)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(announce)
    ap_streamer().stream_out(announce)
  end

  {:ok, announce, meta}
end
259
# Tasks this handles:
# - Looks up the activity being undone and reverts it via `handle_undoing/1`
#
# Anything other than `:ok` from `handle_undoing/1` is returned to the caller as is.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = undo, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, undo, meta}
    failure -> failure
  end
end
267
# Tasks this handles:
# - Add reaction to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = data} = reaction, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(reaction, target)
  Notification.create_notifications(reaction)

  {:ok, reaction, meta}
end
279
# Tasks this handles:
# - Delete and unpins the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
# - Removes posts from search index (if needed)
#
# Returns `{:ok, object, meta}` when the deletion succeeded and
# `{:error, result}` otherwise, where `result` is whatever the failing step produced.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  # The target is either a locally known object or, failing that, a user.
  # NOTE(review): if neither lookup succeeds `deleted_object` is `nil` and the
  # `case` below has no matching clause — confirm callers guarantee a hit.
  deleted_object =
    Object.normalize(deleted_object, fetch: false) ||
      User.get_cached_by_ap_id(deleted_object)

  result =
    case deleted_object do
      %Object{} ->
        with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
             # Tagged so the `else` branch can recognise a missing/non-binary actor.
             {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinned_object_id(user, deleted_object.data["id"])

          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)

          if in_reply_to = deleted_object.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          MessageReference.delete_for_object(deleted_object)

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(deleted_object, user)
          :ok
        else
          # NOTE(review): this is the only `else` clause, so a failed
          # `Object.delete/1` or an unknown actor raises `WithClauseError`
          # instead of returning an error — confirm that is intended.
          {:actor, _} ->
            @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
            :no_object_actor
        end

      %User{} ->
        # A non-`{:ok, _}` result from `User.delete/1` becomes `result` as is.
        with {:ok, _} <- User.delete(deleted_object) do
          :ok
        end
    end

  if result == :ok do
    Notification.create_notifications(object)

    # Called for both branches, i.e. also when `deleted_object` is a user.
    Pleroma.Search.DatabaseSearch.remove_from_index(deleted_object)

    {:ok, object, meta}
  else
    {:error, result}
  end
end
335
# Tasks this handles:
# - adds pin to user
# - removes expiration job for pinned activity, if was set for expiration
#
# Returns `{:ok, object, meta}` on success. Failures are always tagged tuples:
# - `{:error, :user_not_found}` when the actor cannot be resolved
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries a
#   `:pinned_objects` error
# - `{:error, errors}` with the changeset's error list for anything else
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Previously the bare error list was returned here, which made this the
        # only failure path that was not an `{:error, _}` tuple.
        {:error, changeset.errors}
      end
  end
end
361
# Tasks this handles:
# - removes pin from user
# - removes corresponding Add activity
# - if activity had expiration, recreates activity expiration job
#
# Returns `{:error, :user_not_found}` when the actor is unknown; any other
# failure from unpinning is passed through untouched.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities_query =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities_query)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    if raw_expires_at = meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      job_args = %{activity_id: meta[:activity_id], expires_at: expires_at}
      Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args)
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    error -> error
  end
end
392
# Catch-all: activity types without side effects pass through unchanged.
@impl true
def handle(object, meta), do: {:ok, object, meta}
398
# Creates a ChatMessage object through the common pipeline and, for every local
# participant, bumps/creates the chat, records a message reference and queues a
# streamable in `meta`. Returns `{:ok, object, meta}`; a failing pipeline result
# is returned as is.
def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    # Only the first entry of "to" is used as the recipient.
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    streamables =
      # One `[owner, counterpart]` pair per side of the conversation.
      # NOTE(review): `Enum.uniq/1` only collapses the pairs when actor and
      # recipient are the same user — presumably the message-to-self case.
      [[actor, recipient], [recipient, actor]]
      |> Enum.uniq()
      |> Enum.map(fn [user, other_user] ->
        # Remote users get no chat; the `if` then yields `nil`, filtered out below.
        if user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
          # Third argument is `true` when `user` is not the sender
          # (presumably an "unread" flag — TODO confirm).
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

          # Remember the idempotency key from `meta` under the reference id.
          @cachex.put(
            :chat_message_id_idempotency_key_cache,
            cm_ref.id,
            meta[:idempotency_key]
          )

          {
            ["user", "user:pleroma_chat"],
            {user, %{cm_ref | chat: chat, object: object}}
          }
        end
      end)
      |> Enum.filter(& &1)

    meta =
      meta
      |> add_streamables(streamables)

    {:ok, object, meta}
  end
end
433
# Creates a Question (poll) object and schedules the poll-end job for the
# creating activity. A failing pipeline result is returned untouched.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, question, updated_meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, question, updated_meta}

    failure ->
      failure
  end
end
440
# Creates an Answer (poll vote) object and increases the vote count on the
# object it replies to. A failing pipeline result is returned untouched.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, %{data: answer_data} = answer, updated_meta} ->
      Object.increase_vote_count(
        answer_data["inReplyTo"],
        answer_data["name"],
        answer_data["actor"]
      )

      {:ok, answer, updated_meta}

    failure ->
      failure
  end
end
452
# Plain content objects need nothing beyond the common pipeline, whose result
# (success or failure) is returned directly.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ["Audio", "Video", "Event", "Article", "Note", "Page"] do
  Pipeline.common_pipeline(object, meta)
end
459
# Catch-all: anything else is handed back unchanged, without running the pipeline.
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
464
# Reverts a like. When the liked object is gone (`nil`) only the like activity
# is deleted; otherwise the like is removed from the object first and the
# activity is deleted only if that succeeded.
defp undo_like(nil, like), do: delete_object(like)

defp undo_like(%Object{} = liked_object, like) do
  case Utils.remove_like_from_object(like, liked_object) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
472
# Undoes a Like: resolves the liked object (may be `nil`) and delegates to `undo_like/2`.
def handle_undoing(%{data: %{"type" => "Like"} = data} = like) do
  liked_object = Object.get_by_ap_id(data["object"])
  undo_like(liked_object, like)
end
478
# Undoes an EmojiReact: removes the reaction from the target object, then
# deletes the reaction activity. Returns `:ok`, or the first non-matching value.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = reaction) do
  with %Object{} = target <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(reaction, target) do
    delete_object(reaction)
  end
end
486
# Undoes an Announce: removes the announce from the announced object, then
# deletes the announce activity. Returns `:ok`, or the first non-matching value.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = announce) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(announce, announced_object) do
    delete_object(announce)
  end
end
494
# Undoes a Block: unblocks the user, then deletes the block activity.
# Returns `:ok`, or the first non-matching value (e.g. `nil` for an unknown user).
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        block
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(block)
  end
end
505
# Catch-all for activities that cannot be undone (including `nil`).
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
507
# Deletes the given record, collapsing a successful result to a bare `:ok`;
# any other result is returned as is.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _deleted} -> :ok
    failure -> failure
  end
end
512
# Streams and pushes every notification collected under `:notifications` in
# `meta`, then returns `meta` unchanged so calls can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
522
# Streams every `{topics, items}` pair collected under `:streamables` in
# `meta`, then returns `meta` unchanged so calls can be chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
531
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta`.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
538
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta`.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
545
# Post-transaction hook.
# - Create activities are written to Elasticsearch by id
# - Delete activities carrying "deleted_activity_id" remove that id from Elasticsearch
# - Any other activity or object is a no-op
# - Anything else is treated as `meta`: queued notifications and streamables are sent
@impl true
def handle_after_transaction(%Activity{id: activity_id, data: %{"type" => "Create"}}) do
  Pleroma.Elasticsearch.put_by_id(:activity, activity_id)
end

def handle_after_transaction(%Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_id}
    }) do
  Pleroma.Elasticsearch.delete_by_id(:activity, deleted_id)
end

def handle_after_transaction(%Activity{}), do: :ok

def handle_after_transaction(%Object{}), do: :ok

def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
570 end