Merge remote-tracking branch 'pleroma/develop' into optional-config
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
@impl true
def handle(object, meta \\ [])

# Tasks this handles (Accept):
# - Sets the follow state to "accept" via Utils.update_follow_state_for_all/2
# - Updates the following relationship with :follow_accept
# - Updates the notification type for the accepted follow
#
# If any lookup or update step does not match, the remaining steps are
# skipped; the Accept activity is returned as handled either way.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _follower, accepter} <-
         FollowingRelationship.update(follower, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, accepted_follow)
  end

  {:ok, object, meta}
end
65
# Tasks this handles (Reject):
# - Sets the follow state to "reject" via Utils.update_follow_state_for_all/2
# - Updates the following relationship with :follow_reject
# - Dismisses the notification of the follow activity
#
# If any lookup or the state update does not match, the remaining steps are
# skipped; the Reject activity is returned as handled either way.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, rejecter, :follow_reject)
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
92
# Tasks this handles (Follow):
# - Follows (as pending) if both users can be found
# - Generates an Accept when the followed user is local and not locked
# - Generates a Reject when the follow attempt returns an error
# - Creates notifications without sending them and stores them in meta
#
# Returns the follow activity re-read by its id, so that any state change made
# by the generated Accept/Reject is visible to the caller.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => following_user,
          "object" => followed_user
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(following_user),
       %User{} = followed <- User.get_cached_by_ap_id(followed_user) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)

  {:ok, Activity.get_by_ap_id(follow_id), add_notifications(meta, notifications)}
end
136
# Tasks this handles (Block):
# - Calls User.block/2 when both the blocker and the blocked user are found
#
# The Block activity is returned as handled even when a user is missing.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocking_user, "object" => blocked_user}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_user),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
152
# Tasks this handles (Update):
# - Update the user
#
# When meta carries a :user_update_changeset it is applied directly, which lets
# non-federating, non-ActivityPub settings be updated as well. Otherwise the
# user is rebuilt from the updated object's data.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      existing_user = User.get_by_ap_id(updated_object["id"])
      remote_changeset = User.remote_user_changeset(existing_user, new_user_data)
      User.update_and_set_cache(remote_changeset)

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
173
# Tasks this handles (Like):
# - Add like to the liked object
# - Set up notifications
@impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
186
# Tasks this handles (Create):
# - Actually create the object (handle_object_creation/2)
# - Roll back the transaction if the object or its actor is unavailable
# - Increase the user note count
# - Increase the replies count of the replied-to object (except for "Answer")
# - Start fetching rich media data for the activity
# - Create notifications without sending them and store them in meta
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the id on its own line: `=` binds looser than `&&`, so writing
    # `if in_reply_to = a && b` would bind `in_reply_to` to the boolean result
    # of the whole expression instead of to the parent object's id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
219
# Tasks this handles (Announce):
# - Add announce to the announced object
# - Set up notifications (skipped for internal users)
# - Stream out the announce (skipped for internal users)
@impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  announced_object = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
241
# Tasks this handles (Undo):
# - Looks up the undone activity and delegates to handle_undoing/1
#
# Anything other than :ok from handle_undoing/1 is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_object)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
249
# Tasks this handles (EmojiReact):
# - Add reaction to the reacted object
# - Set up notifications
@impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
262
# Tasks this handles (Delete):
# - Resolves the target as an Object first, falling back to a User
# - For an object: deletes it, unpins it from its actor, reduces the actor's
#   note count, reduces the replies count of the replied-to object, deletes
#   chat message references and streams the deletion out
# - For a user: deletes the user
# - Sets up notifications when the deletion succeeded
#
# Returns {:error, reason} when the deletion did not yield :ok.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target =
    Object.normalize(deleted_object, fetch: false) ||
      User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, _activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, removed.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinned_object_id(author, removed.data["id"])

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          if parent_id = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent_id)
          end

          MessageReference.delete_for_object(removed)

          @ap_streamer.stream_out(object)
          @ap_streamer.stream_out_participations(removed, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
314
# Tasks this handles (Add):
# - Adds the pin to the user
# - Cancels the expiration job of the pinned activity, if one was scheduled
#
# Returns {:ok, object, meta} on success, otherwise one of:
# - {:error, :user_not_found} when the actor cannot be found
# - {:error, :pinned_statuses_limit_reached} when the changeset reports an
#   error on :pinned_objects
# - {:error, errors} with the changeset errors for any other failed update
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Keep the error tagged so callers matching on {:error, _} see it,
        # instead of receiving a bare keyword list.
        {:error, changeset.errors}
      end
  end
end
340
# Tasks this handles (Remove):
# - Removes the pin from the user
# - Deletes the corresponding Add activity
# - Re-enqueues the expiration job when meta carries :expires_at
#
# Returns {:error, :user_not_found} when the actor cannot be found; any other
# failure is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities_query =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities_query)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    if raw_expires_at = meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    failure -> failure
  end
end
371
# Fallback for every other activity: there are no side effects to run, so the
# input is passed through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
377
# Creates a ChatMessage object through the common pipeline, then, for each
# local participant:
# - bumps or creates the chat with the other participant
# - creates a message reference (the third argument to
#   MessageReference.create/3 is true when the participant is not the sender)
# - stores the idempotency key from meta in the cache under the reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# Any non-{:ok, _, _} pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, object, meta} ->
      sender = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      # Enum.uniq/1 collapses the two pairs into one when sender == recipient.
      pairs = Enum.uniq([[sender, recipient], [recipient, sender]])

      streamables =
        for [user, other_user] <- pairs, user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != sender.ap_id)

          @cachex.put(
            :chat_message_id_idempotency_key_cache,
            cm_ref.id,
            meta[:idempotency_key]
          )

          {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
        end

      {:ok, object, add_streamables(meta, streamables)}

    failure ->
      failure
  end
end
412
# Creates an Answer object through the common pipeline and then increases the
# vote count, passing the answer's "inReplyTo", "name" and "actor".
# Any non-{:ok, _, _} pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, object, meta} ->
      %{"inReplyTo" => _} = _ignored = Map.put_new(object.data, "inReplyTo", nil)
      question_id = object.data["inReplyTo"]
      option_name = object.data["name"]
      voter_ap_id = object.data["actor"]

      Object.increase_vote_count(question_id, option_name, voter_ap_id)

      {:ok, object, meta}

    failure ->
      failure
  end
end
424
# Audio, Video, Question, Event and Article objects only need to go through the
# common pipeline; its result is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Video", "Question", "Event", "Article"] do
  Pipeline.common_pipeline(object, meta)
end
431
# Fallback for every other object type: nothing is created here, the input is
# passed through untouched.
def handle_object_creation(object, meta), do: {:ok, object, meta}
436
# Undoes a like. When the liked object no longer exists only the like itself
# is deleted; otherwise the like is first removed from the object, and a
# failure of that removal is returned unchanged.
defp undo_like(nil, object) do
  delete_object(object)
end

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
444
# Undo of a Like: looks up the liked object (which may be missing) and hands
# both over to undo_like/2.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  undo_like(Object.get_by_ap_id(object.data["object"]), object)
end
450
# Undo of an EmojiReact: removes the reaction from the reacted object and
# deletes the reaction itself. Returns :ok, or the first non-matching result.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
458
# Undo of an Announce: removes the announce from the announced object and
# deletes the announce itself. Returns :ok, or the first non-matching result.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
466
# Undo of a Block: unblocks the blocked user and deletes the block itself.
# Returns :ok, or the first non-matching result.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
477
# Fallback: undoing anything else is not supported and yields an error that
# carries the offending input.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
479
# Deletes the given record from the repo, collapsing a successful delete into
# :ok and returning any other result unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
484
# Streams every notification stored under :notifications in meta to the
# "user" and "user:notification" topics and hands it to Push. Returns meta
# unchanged so the call can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
494
# Streams every {topics, items} pair stored under :streamables in meta.
# Returns meta unchanged so the call can be chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
503
# Puts the given streamables in front of the ones already stored under
# :streamables in meta.
defp add_streamables(meta, streamables) do
  Keyword.put(meta, :streamables, streamables ++ Keyword.get(meta, :streamables, []))
end
510
# Puts the given notifications in front of the ones already stored under
# :notifications in meta.
defp add_notifications(meta, notifications) do
  Keyword.put(meta, :notifications, notifications ++ Keyword.get(meta, :notifications, []))
end
517
# Delivers what the handlers queued in meta: notifications first, then
# streamables. Returns meta unchanged.
@impl true
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
524 end