a4169d41fdfef69f72467a45444cb38bc10b4c7c
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
# Resolves the module used to stream activities out. The lookup happens on
# every call under `[:side_effects, :ap_streamer]` and falls back to
# `ActivityPub` when nothing is configured.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36
@impl true
def handle(object, meta \\ [])

# Side effects of an `Accept` activity:
# - marks the referenced follow activity (and its siblings) as accepted
# - moves the following relationship to the accepted state
# - updates the type of the notification belonging to the follow
#
# If any lookup or update does not return the expected shape, the remaining
# steps are skipped. The activity and meta are always returned unchanged.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, updated_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _follower, followed} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, updated_follow)
  end

  {:ok, object, meta}
end
66
# Side effects of a `Reject` activity:
# - marks the referenced follow activity (and its siblings) as rejected
# - moves the following relationship to the rejected state
# - dismisses the notification that belongs to the follow activity
#
# If any lookup or the state update does not return the expected shape, the
# remaining steps are skipped. The activity and meta are always returned
# unchanged.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _updated_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The notification is dismissed using the follow activity as originally loaded.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
93
# Side effects of a `Follow` activity:
# - registers the follow as pending between the two users
# - when the followed user is local and not locked, immediately runs an
#   `Accept` through the pipeline
# - when the follow attempt returns an error, runs a `Reject` through the
#   pipeline instead
# - creates notifications with `do_send: false` and stores them in `meta`
#
# Returns the follow activity re-read by its AP id (presumably so the caller
# sees any state change made by the accept/reject above).
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_ap_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id),
       {:follow, {:ok, _, _}, _followed} <-
         {:follow, User.follow(follower, followed, :follow_pending), followed} do
    if followed.local && !followed.is_locked do
      {:ok, accept_data, _} = Builder.accept(followed, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    # The follow itself failed: answer with a Reject issued by the followed user.
    {:follow, {:error, _}, followed} ->
      {:ok, reject_data, _} = Builder.reject(followed, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    # Unknown users or any other result: nothing to accept or reject.
    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_ap_id), meta}
end
137
# Side effects of a `Block` activity:
# - calls `User.block/2` for the blocking and blocked users
#
# Nothing happens when either user cannot be resolved. The activity and meta
# are always returned unchanged.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
153
# Side effects of an `Update` activity:
# - updates the user and refreshes the cache
#
# When `meta` carries a `:user_update_changeset` (the case for local users, per
# the original note), that changeset is applied as-is so that non-federated
# settings are updated too. Otherwise the user data is derived from the
# embedded object and applied through a remote-user changeset.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

      updated_object["id"]
      |> User.get_by_ap_id()
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
174
# Side effects of a `Like` activity:
# - records the like on the liked object
# - creates the notifications for the like
@impl true
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  liked_object = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, liked_object)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
187
# Side effects of a `Create` activity:
# - creates the contained object via `handle_object_creation/3`
# - calls `Repo.rollback/1` if the object could not be created or the actor is
#   unknown
# - creates notifications with `do_send: false` and stores them in `meta`
# - lets `ActivityPub.increase_note_count_if_public/2` adjust the actor's
#   note count
# - increases the replies count of the parent object (poll answers excluded)
# - enqueues fetching of the ids listed under "replies" while the thread
#   distance is still allowed
# - starts fetching rich media data in a separate task
# - streams the activity out
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id on its own line: `=` binds looser than `&&`, so
    # `x = a && b` would store the boolean result of the whole condition
    # instead of the id that must be passed on.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
235
# Side effects of an `Announce` activity:
# - records the announce on the announced object
# - unless the actor is an internal user: creates notifications and streams
#   the announce out
@impl true
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced_object = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)
    ap_streamer().stream_out(object)
  end

  {:ok, object, meta}
end
255
# Side effects of an `Undo` activity:
# - loads the activity being undone and delegates to `handle_undoing/1`
#
# Returns `{:ok, object, meta}` when the undo succeeded; any other result of
# `handle_undoing/1` is returned to the caller untouched.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undo_result =
    undone_ap_id
    |> Activity.get_by_ap_id()
    |> handle_undoing()

  case undo_result do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
263
# Side effects of an `EmojiReact` activity:
# - records the reaction on the object that was reacted to
# - creates the notifications for the reaction
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  reacted_object = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, reacted_object)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
276
# Side effects of a `Delete` activity. The deleted thing is resolved first as
# an object (without fetching) and otherwise as a cached user.
#
# For an object:
# - deletes the object via `Object.delete/1`
# - removes it from the author's pinned objects
# - lets `ActivityPub.decrease_note_count_if_public/2` adjust the author's
#   note count
# - decreases the replies count of the parent, if the object was a reply
# - deletes chat message references for the object
# - streams out the delete activity and the participations
#
# For a user:
# - deletes the user via `User.delete/1`
#
# On success notifications are created and `{:ok, object, meta}` is returned;
# otherwise the failing result is returned wrapped as `{:error, result}`.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_ap_id}} = object, meta) do
  target =
    Object.normalize(deleted_ap_id, fetch: false) || User.get_cached_by_ap_id(deleted_ap_id)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, _activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinned_object_id(author, deleted.data["id"])

          # Keep the refreshed user: it is the one passed to the streamer below.
          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          parent_ap_id = deleted.data["inReplyTo"]
          if parent_ap_id, do: Object.decrease_replies_count(parent_ap_id)

          MessageReference.delete_for_object(deleted)

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was resolved before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
328
# Side effects of an `Add` activity:
# - adds the object to the actor's pinned objects
# - cancels the expiration job of the pinned activity, if one is scheduled
#
# Returns `{:error, :user_not_found}` when the actor is unknown and
# `{:error, :pinned_statuses_limit_reached}` when the changeset reports an
# error on `:pinned_objects`.
# NOTE(review): any other changeset failure returns the bare `errors` list
# rather than an `{:error, _}` tuple - confirm callers expect that shape.
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # A pinned activity must not be purged, so drop its scheduled job if any.
    expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id])
    if expiration, do: Oban.cancel_job(expiration.id)

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      errors = changeset.errors

      if errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        errors
      end
  end
end
354
# Side effects of a `Remove` activity:
# - removes the object from the actor's pinned objects
# - deletes the corresponding `Add` activities
# - when `meta[:expires_at]` is set, schedules the activity for purging again
#
# Returns `{:error, :user_not_found}` when the actor is unknown; any other
# failure is returned as-is.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities)

    # If the pinned activity was scheduled for deletion, reschedule it.
    if meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in
      # the original implementation.
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])

      purge_args = %{activity_id: meta[:activity_id], expires_at: expires_at}
      Pleroma.Workers.PurgeExpiredActivity.enqueue(purge_args)
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    error -> error
  end
end
385
# Fallback for every activity without dedicated side effects: the activity and
# meta are passed through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
391
# Creates a `ChatMessage` object through the common pipeline. Then, for each
# local participant (sender and first recipient, de-duplicated):
# - bumps or creates the chat with the other participant
# - creates a message reference; the flag passed as third argument is true
#   when the participant is not the sender
# - stores `meta[:idempotency_key]` in the cache under the reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# Returns `{:ok, object, meta}` with the streamables added to `meta`; any other
# pipeline result is returned untouched.
def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

        not_sender = user.ap_id != actor.ap_id
        {:ok, cm_ref} = MessageReference.create(chat, object, not_sender)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
426
# Creates a `Question` object through the common pipeline and, on success,
# schedules the end of the poll for the creating activity. Any other pipeline
# result is returned untouched.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created, meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, created, meta}

    failure ->
      failure
  end
end
433
# Creates an `Answer` object through the common pipeline and, on success,
# increases the vote count on the object named by "inReplyTo", passing the
# answer's "name" and "actor". Any other pipeline result is returned untouched.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, %{data: answer} = object, meta} ->
      Object.increase_vote_count(answer["inReplyTo"], answer["name"], answer["actor"])
      {:ok, object, meta}

    failure ->
      failure
  end
end
445
# Creates Audio, Video, Event, Article, Note and Page objects through the
# common pipeline. The pipeline result is returned exactly as received, whether
# it is `{:ok, object, meta}` or anything else.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ~w[Audio Video Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end
452
# Fallback for every other object (or missing object data): nothing is
# created, the input is passed through untouched.
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
457
# Undoes a like. When the liked object no longer exists only the like activity
# is deleted; otherwise the like is first removed from the object. A failure
# while removing the like is returned as-is and the activity is kept.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
465
# Undoes a `Like`: looks up the liked object (which may be gone) and lets
# `undo_like/2` remove the like and delete the activity.
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  liked_object = Object.get_by_ap_id(data["object"])
  undo_like(liked_object, object)
end
471
# Undoes an `EmojiReact`: removes the reaction from the object and deletes the
# reaction activity. Returns `:ok` on success, otherwise the first result that
# did not match (including `nil` when the object is unknown).
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
479
# Undoes an `Announce`: removes the announce from the announced object and
# deletes the announce activity. Returns `:ok` on success, otherwise the first
# result that did not match (including `nil` when the object is unknown).
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
487
# Undoes a `Block`: unblocks the blocked user and deletes the block activity.
# Returns `:ok` on success, otherwise the first result that did not match
# (including `nil` when either user is unknown).
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
498
# Fallback for anything that cannot be undone (including `nil` when the undone
# activity was not found): returns an error carrying the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
500
# Deletes the given record through the repo. Returns `:ok` on success and the
# repo's result unchanged otherwise.
# NOTE(review): the callers in this module appear to pass activities rather
# than objects - confirm the argument type in the spec.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
505
# Delivers every notification stored under `:notifications` in `meta`: each one
# is streamed to the "user" and "user:notification" topics and handed to
# `Push.send/1`. Returns `meta` unchanged so the call can be piped.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  for notification <- pending do
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end

  meta
end
515
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`.
# Returns `meta` unchanged so the call can be piped.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
524
# Puts `streamables` in front of the ones already stored under `:streamables`
# in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  Keyword.put(meta, :streamables, streamables ++ Keyword.get(meta, :streamables, []))
end
531
# Puts `notifications` in front of the ones already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  Keyword.put(meta, :notifications, notifications ++ Keyword.get(meta, :notifications, []))
end
538
# Passes the activity id to `Pleroma.Elasticsearch.put_by_id/1`, then delivers
# the notifications and streamables that were collected in `meta`. Returns
# `meta`.
@impl true
def handle_after_transaction(activity, meta) do
  Pleroma.Elasticsearch.put_by_id(activity.id)

  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
547 end