Pipeline Ingestion: Page
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
36 @impl true
37 def handle(object, meta \\ [])
38
39 # Task this handles
40 # - Follows
41 # - Sends a notification
42 @impl true
43 def handle(
44 %{
45 data: %{
46 "actor" => actor,
47 "type" => "Accept",
48 "object" => follow_activity_id
49 }
50 } = object,
51 meta
52 ) do
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
61 end
62
63 {:ok, object, meta}
64 end
65
66 # Task this handles
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
70 @impl true
71 def handle(
72 %{
73 data: %{
74 "actor" => actor,
75 "type" => "Reject",
76 "object" => follow_activity_id
77 }
78 } = object,
79 meta
80 ) do
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
88 end
89
90 {:ok, object, meta}
91 end
92
93 # Tasks this handle
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
97 @impl true
98 def handle(
99 %{
100 data: %{
101 "id" => follow_id,
102 "type" => "Follow",
103 "object" => followed_user,
104 "actor" => following_user
105 }
106 } = object,
107 meta
108 ) do
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
116 end
117 else
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
121
122 _ ->
123 nil
124 end
125
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
127
128 meta =
129 meta
130 |> add_notifications(notifications)
131
132 updated_object = Activity.get_by_ap_id(follow_id)
133
134 {:ok, updated_object, meta}
135 end
136
137 # Tasks this handles:
138 # - Unfollow and block
139 @impl true
140 def handle(
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
142 object,
143 meta
144 ) do
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
148 end
149
150 {:ok, object, meta}
151 end
152
153 # Tasks this handles:
154 # - Update the user
155 #
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
158 @impl true
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
161 changeset
162 |> User.update_and_set_cache()
163 else
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
165
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
169 end
170
171 {:ok, object, meta}
172 end
173
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
177 @impl true
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
181
182 Notification.create_notifications(object)
183
184 {:ok, object, meta}
185 end
186
187 # Tasks this handles
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
195 @impl true
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
201
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
204 end
205
206 reply_depth = (meta[:depth] || 0) + 1
207
208 # FIXME: Force inReplyTo to replies
209 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
210 object.data["replies"] != nil do
211 for reply_id <- object.data["replies"] do
212 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
213 "id" => reply_id,
214 "depth" => reply_depth
215 })
216 end
217 end
218
219 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
220 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
221 end)
222
223 meta =
224 meta
225 |> add_notifications(notifications)
226
227 {:ok, activity, meta}
228 else
229 e -> Repo.rollback(e)
230 end
231 end
232
233 # Tasks this handles:
234 # - Add announce to object
235 # - Set up notification
236 # - Stream out the announce
237 @impl true
238 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
239 announced_object = Object.get_by_ap_id(object.data["object"])
240 user = User.get_cached_by_ap_id(object.data["actor"])
241
242 Utils.add_announce_to_object(object, announced_object)
243
244 if !User.is_internal_user?(user) do
245 Notification.create_notifications(object)
246
247 object
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(object)
250 end
251
252 {:ok, object, meta}
253 end
254
255 @impl true
256 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
257 with undone_object <- Activity.get_by_ap_id(undone_object),
258 :ok <- handle_undoing(undone_object) do
259 {:ok, object, meta}
260 end
261 end
262
263 # Tasks this handles:
264 # - Add reaction to object
265 # - Set up notification
266 @impl true
267 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
268 reacted_object = Object.get_by_ap_id(object.data["object"])
269 Utils.add_emoji_reaction_to_object(object, reacted_object)
270
271 Notification.create_notifications(object)
272
273 {:ok, object, meta}
274 end
275
276 # Tasks this handles:
277 # - Delete and unpins the create activity
278 # - Replace object with Tombstone
279 # - Set up notification
280 # - Reduce the user note count
281 # - Reduce the reply count
282 # - Stream out the activity
283 @impl true
284 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
285 deleted_object =
286 Object.normalize(deleted_object, fetch: false) ||
287 User.get_cached_by_ap_id(deleted_object)
288
289 result =
290 case deleted_object do
291 %Object{} ->
292 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
293 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
294 %User{} = user <- User.get_cached_by_ap_id(actor) do
295 User.remove_pinned_object_id(user, deleted_object.data["id"])
296
297 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
298
299 if in_reply_to = deleted_object.data["inReplyTo"] do
300 Object.decrease_replies_count(in_reply_to)
301 end
302
303 MessageReference.delete_for_object(deleted_object)
304
305 @ap_streamer.stream_out(object)
306 @ap_streamer.stream_out_participations(deleted_object, user)
307 :ok
308 else
309 {:actor, _} ->
310 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
311 :no_object_actor
312 end
313
314 %User{} ->
315 with {:ok, _} <- User.delete(deleted_object) do
316 :ok
317 end
318 end
319
320 if result == :ok do
321 Notification.create_notifications(object)
322 {:ok, object, meta}
323 else
324 {:error, result}
325 end
326 end
327
328 # Tasks this handles:
329 # - adds pin to user
330 # - removes expiration job for pinned activity, if was set for expiration
331 @impl true
332 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
333 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
334 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
335 # if pinned activity was scheduled for deletion, we remove job
336 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
337 Oban.cancel_job(expiration.id)
338 end
339
340 {:ok, object, meta}
341 else
342 nil ->
343 {:error, :user_not_found}
344
345 {:error, changeset} ->
346 if changeset.errors[:pinned_objects] do
347 {:error, :pinned_statuses_limit_reached}
348 else
349 changeset.errors
350 end
351 end
352 end
353
354 # Tasks this handles:
355 # - removes pin from user
356 # - removes corresponding Add activity
357 # - if activity had expiration, recreates activity expiration job
358 @impl true
359 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
360 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
361 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
362 data["object"]
363 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
364 |> Repo.delete_all()
365
366 # if pinned activity was scheduled for deletion, we reschedule it for deletion
367 if meta[:expires_at] do
368 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
369 {:ok, expires_at} =
370 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
371
372 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
373 activity_id: meta[:activity_id],
374 expires_at: expires_at
375 })
376 end
377
378 {:ok, object, meta}
379 else
380 nil -> {:error, :user_not_found}
381 error -> error
382 end
383 end
384
385 # Nothing to do
386 @impl true
387 def handle(object, meta) do
388 {:ok, object, meta}
389 end
390
391 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
392 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
393 actor = User.get_cached_by_ap_id(object.data["actor"])
394 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
395
396 streamables =
397 [[actor, recipient], [recipient, actor]]
398 |> Enum.uniq()
399 |> Enum.map(fn [user, other_user] ->
400 if user.local do
401 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
402 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
403
404 @cachex.put(
405 :chat_message_id_idempotency_key_cache,
406 cm_ref.id,
407 meta[:idempotency_key]
408 )
409
410 {
411 ["user", "user:pleroma_chat"],
412 {user, %{cm_ref | chat: chat, object: object}}
413 }
414 end
415 end)
416 |> Enum.filter(& &1)
417
418 meta =
419 meta
420 |> add_streamables(streamables)
421
422 {:ok, object, meta}
423 end
424 end
425
426 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
427 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
428 Object.increase_vote_count(
429 object.data["inReplyTo"],
430 object.data["name"],
431 object.data["actor"]
432 )
433
434 {:ok, object, meta}
435 end
436 end
437
438 def handle_object_creation(%{"type" => objtype} = object, meta)
439 when objtype in ~w[Audio Video Question Event Article Note Page] do
440 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
441 {:ok, object, meta}
442 end
443 end
444
445 # Nothing to do
446 def handle_object_creation(object, meta) do
447 {:ok, object, meta}
448 end
449
450 defp undo_like(nil, object), do: delete_object(object)
451
452 defp undo_like(%Object{} = liked_object, object) do
453 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
454 delete_object(object)
455 end
456 end
457
458 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
459 object.data["object"]
460 |> Object.get_by_ap_id()
461 |> undo_like(object)
462 end
463
464 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
465 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
466 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
467 {:ok, _} <- Repo.delete(object) do
468 :ok
469 end
470 end
471
472 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
473 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
474 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
475 {:ok, _} <- Repo.delete(object) do
476 :ok
477 end
478 end
479
480 def handle_undoing(
481 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
482 ) do
483 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
484 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
485 {:ok, _} <- User.unblock(blocker, blocked),
486 {:ok, _} <- Repo.delete(object) do
487 :ok
488 end
489 end
490
491 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
492
493 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
494 defp delete_object(object) do
495 with {:ok, _} <- Repo.delete(object), do: :ok
496 end
497
498 defp send_notifications(meta) do
499 Keyword.get(meta, :notifications, [])
500 |> Enum.each(fn notification ->
501 Streamer.stream(["user", "user:notification"], notification)
502 Push.send(notification)
503 end)
504
505 meta
506 end
507
508 defp send_streamables(meta) do
509 Keyword.get(meta, :streamables, [])
510 |> Enum.each(fn {topics, items} ->
511 Streamer.stream(topics, items)
512 end)
513
514 meta
515 end
516
517 defp add_streamables(meta, streamables) do
518 existing = Keyword.get(meta, :streamables, [])
519
520 meta
521 |> Keyword.put(:streamables, streamables ++ existing)
522 end
523
524 defp add_notifications(meta, notifications) do
525 existing = Keyword.get(meta, :notifications, [])
526
527 meta
528 |> Keyword.put(:notifications, notifications ++ existing)
529 end
530
531 @impl true
532 def handle_after_transaction(meta) do
533 meta
534 |> send_notifications()
535 |> send_streamables()
536 end
537 end