Let pins federate
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
@impl true
def handle(object, meta \\ [])

# Tasks this handles (Accept):
# - Looks up the follow activity referenced by the Accept and both users
# - Sets the follow state to "accept" via `Utils.update_follow_state_for_all/2`
# - Updates the following relationship with `:follow_accept`
# - Updates the type of the related notification
#
# Every step runs only if the previous one matched. A step that does not match
# is skipped silently, and `{:ok, object, meta}` is returned in all cases.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follow.actor),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _follower, updated_followed} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(updated_followed, accepted_follow)
  end

  {:ok, object, meta}
end
65
# Tasks this handles (Reject):
# - Looks up the follow activity referenced by the Reject and both users
# - Sets the follow state to "reject" via `Utils.update_follow_state_for_all/2`
# - Updates the following relationship with `:follow_reject`
# - Dismisses the notification of the original follow activity
#
# A step that does not match is skipped silently; the result is always
# `{:ok, object, meta}`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follow.actor),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The notification is dismissed using the activity as it was loaded above,
    # not the one returned by the state update.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
92
# Tasks this handles (Follow):
# - Registers the follow as `:follow_pending` when both users are known
# - Runs an Accept through the pipeline when the followed user is local and
#   not locked
# - Runs a Reject through the pipeline when `User.follow/3` returns an error
# - Creates notifications without sending them and stores them in `meta`
# - Returns the follow activity re-read from the database
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  # The outcome of this expression is deliberately discarded; only its side
  # effects matter.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)

  # Re-read the activity by its id and return that copy.
  {:ok, Activity.get_by_ap_id(follow_id), meta_with_notifications}
end
136
# Tasks this handles (Block):
# - Calls `User.block/2` when both the blocking and the blocked user are found
#
# A missing user is ignored; the result is always `{:ok, object, meta}`.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
152
# Tasks this handles (Update):
# - Updates the user
#
# When `meta` carries a `:user_update_changeset`, that changeset is applied as
# is (for a local user this allows updating non-federating, non-ActivityPub
# settings as well). Otherwise the user data is rebuilt from the updated object
# and applied through a remote-user changeset.
#
# The result of the update itself is not inspected.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  local_changeset = Keyword.get(meta, :user_update_changeset)

  if local_changeset do
    User.update_and_set_cache(local_changeset)
  else
    {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
    existing_user = User.get_by_ap_id(updated_object["id"])

    existing_user
    |> User.remote_user_changeset(new_user_data)
    |> User.update_and_set_cache()
  end

  {:ok, object, meta}
end
173
# Tasks this handles (Like):
# - Adds the like to the liked object
# - Creates notifications for the like
@impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
186
# Tasks this handles (Create):
# - Creates the object via `handle_object_creation/2`
# - Rolls back the transaction if the object could not be created or the actor
#   is unknown
# - Creates notifications (not sent yet; stored in `meta`)
# - Increases the user note count if the object is public
# - Increases the replies count of the parent, unless the object is an Answer
# - Starts rich media fetching for the activity in a separate task
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id on its own line. `=` binds looser than `&&`, so
    # `if in_reply_to = a && b` would bind the boolean result of `a && b`
    # (i.e. `true`) instead of the parent id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta =
      meta
      |> add_notifications(notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
219
# Tasks this handles (Announce):
# - Adds the announce to the announced object
# - Creates notifications, unless the actor is an internal user
# - Streams out the announce, unless the actor is an internal user
@impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, target)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
241
# Tasks this handles (Undo):
# - Loads the activity being undone and delegates to `handle_undoing/1`
#
# Returns `{:ok, object, meta}` when the undo succeeded; any other result of
# `handle_undoing/1` is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    other -> other
  end
end
249
# Tasks this handles (EmojiReact):
# - Adds the emoji reaction to the reacted object
# - Creates notifications for the reaction
@impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
262
# Tasks this handles (Delete):
# - Resolves the deletion target to a known object or, failing that, a user
# - For an object: deletes it, removes it from its actor's pinned objects,
#   decreases the actor's note count if public, decreases the replies count of
#   its parent, deletes chat message references for it, and streams out the
#   activity and the participations
# - For a user: deletes the user
# - Creates notifications when the deletion succeeded
#
# Returns `{:ok, object, meta}` on success and `{:error, reason}` otherwise.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => target_ref}} = object, meta) do
  target =
    Object.normalize(target_ref, fetch: false) ||
      User.get_cached_by_ap_id(target_ref)

  outcome =
    case target do
      %Object{} ->
        # NOTE(review): only the `{:actor, _}` mismatch is handled in `else`;
        # any other non-matching step raises instead of returning.
        with {:ok, removed, _activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, removed.data["actor"]},
             %User{} = owner <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinned_object_id(owner, removed.data["id"])

          {:ok, owner} = ActivityPub.decrease_note_count_if_public(owner, removed)

          if parent_id = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent_id)
          end

          MessageReference.delete_for_object(removed)

          @ap_streamer.stream_out(object)
          @ap_streamer.stream_out_participations(removed, owner)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
314
# Tasks this handles (Add):
# - Adds the pinned object id to the actor
# - Cancels the expiration job of the pinned activity, if one is scheduled
#
# Returns `{:ok, object, meta}` on success, otherwise one of:
# - `{:error, :user_not_found}` when the actor is unknown
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries a
#   `:pinned_objects` error
# - `{:error, errors}` with the changeset errors for any other failure, so
#   every failure is a tagged `{:error, _}` tuple
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        {:error, changeset.errors}
      end
  end
end
340
# Tasks this handles (Remove):
# - Removes the pinned object id from the actor
# - Re-enqueues the expiration job when `meta` carries `:expires_at`
#
# Returns `{:ok, object, meta}` on success, `{:error, :user_not_found}` when a
# step yields `nil`, and any other failing value unchanged.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = actor <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _actor} <- User.remove_pinned_object_id(actor, data["object"]) do
    # The activity was scheduled for deletion before being pinned, so schedule
    # it again now that the pin is gone.
    if raw_expires_at = meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      job_args = %{activity_id: meta[:activity_id], expires_at: expires_at}
      Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args)
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    failure -> failure
  end
end
366
# Any other activity has no side effects; pass it through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
372
# Creates a ChatMessage object through the common pipeline. For each local
# participant (actor and first recipient) it then:
# - bumps or creates the chat with the other participant
# - creates a message reference, flagged when the participant is not the actor
# - stores the idempotency key from `meta` in the cache under the reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# A non-matching pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # When actor and recipient are the same user both pairs are equal and only
    # one of them survives `Enum.uniq/1`.
    pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
407
# Creates an Answer object through the common pipeline and increases the vote
# count using the answer's "inReplyTo", "name" and "actor" fields.
# A non-matching pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, new_meta} ->
      %{"inReplyTo" => question_id, "name" => option_name, "actor" => voter} =
        Map.merge(
          %{"inReplyTo" => nil, "name" => nil, "actor" => nil},
          Map.take(answer.data, ["inReplyTo", "name", "actor"])
        )

      Object.increase_vote_count(question_id, option_name, voter)

      {:ok, answer, new_meta}

    other ->
      other
  end
end
419
# Audio, Video, Question, Event and Article objects only need to go through
# the common pipeline; its result is returned as is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ~w[Audio Video Question Event Article] do
  Pipeline.common_pipeline(object, meta)
end

# Every other object type needs no creation step.
def handle_object_creation(object, meta), do: {:ok, object, meta}
431
# Undoes a like. When the liked object is not found (`nil`), only the like
# itself is deleted; otherwise the like is first removed from the object.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
439
# Reverts the side effects of a previously handled activity and deletes it.
# Returns `:ok` on success; a step that does not match is returned unchanged.

# Like: remove the like from the liked object (if it is found), then delete
# the like.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end

# EmojiReact: remove the reaction from the reacted object, then delete the
# reaction.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end

# Announce: remove the announce from the announced object, then delete the
# announce.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end

# Block: unblock the blocked user, then delete the block.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end

# Anything else (including `nil`) cannot be undone.
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
474
# Deletes the given record from the repo, collapsing a successful delete to
# `:ok` and passing any other result through.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
479
# Streams and pushes every notification stored under `:notifications` in
# `meta`. Returns `meta` unchanged so it can be piped further.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end

# Streams every `{topics, items}` pair stored under `:streamables` in `meta`.
# Returns `meta` unchanged so it can be piped further.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
498
# Puts `streamables` in front of the ones already stored under `:streamables`.
defp add_streamables(meta, streamables) do
  merged = streamables ++ Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, merged)
end

# Puts `notifications` in front of the ones already stored under
# `:notifications`.
defp add_notifications(meta, notifications) do
  merged = notifications ++ Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, merged)
end
512
# Delivers what was collected in `meta` while handling: first the
# notifications, then the streamables. Returns `meta`.
@impl true
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
519 end