purge chat and shout endpoints
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
25
26 require Logger
27
28 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
29
30 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
31
32 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
33
34 @impl true
35 def handle(object, meta \\ [])
36
37 # Task this handles
38 # - Follows
39 # - Sends a notification
40 @impl true
41 def handle(
42 %{
43 data: %{
44 "actor" => actor,
45 "type" => "Accept",
46 "object" => follow_activity_id
47 }
48 } = object,
49 meta
50 ) do
51 with %Activity{actor: follower_id} = follow_activity <-
52 Activity.get_by_ap_id(follow_activity_id),
53 %User{} = followed <- User.get_cached_by_ap_id(actor),
54 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
55 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
56 {:ok, _follower, followed} <-
57 FollowingRelationship.update(follower, followed, :follow_accept) do
58 Notification.update_notification_type(followed, follow_activity)
59 end
60
61 {:ok, object, meta}
62 end
63
64 # Task this handles
65 # - Rejects all existing follow activities for this person
66 # - Updates the follow state
67 # - Dismisses notification
68 @impl true
69 def handle(
70 %{
71 data: %{
72 "actor" => actor,
73 "type" => "Reject",
74 "object" => follow_activity_id
75 }
76 } = object,
77 meta
78 ) do
79 with %Activity{actor: follower_id} = follow_activity <-
80 Activity.get_by_ap_id(follow_activity_id),
81 %User{} = followed <- User.get_cached_by_ap_id(actor),
82 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
83 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
84 FollowingRelationship.update(follower, followed, :follow_reject)
85 Notification.dismiss(follow_activity)
86 end
87
88 {:ok, object, meta}
89 end
90
91 # Tasks this handle
92 # - Follows if possible
93 # - Sends a notification
94 # - Generates accept or reject if appropriate
95 @impl true
96 def handle(
97 %{
98 data: %{
99 "id" => follow_id,
100 "type" => "Follow",
101 "object" => followed_user,
102 "actor" => following_user
103 }
104 } = object,
105 meta
106 ) do
107 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
108 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
109 {_, {:ok, _, _}, _, _} <-
110 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
111 if followed.local && !followed.is_locked do
112 {:ok, accept_data, _} = Builder.accept(followed, object)
113 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
114 end
115 else
116 {:following, {:error, _}, _follower, followed} ->
117 {:ok, reject_data, _} = Builder.reject(followed, object)
118 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
119
120 _ ->
121 nil
122 end
123
124 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
125
126 meta =
127 meta
128 |> add_notifications(notifications)
129
130 updated_object = Activity.get_by_ap_id(follow_id)
131
132 {:ok, updated_object, meta}
133 end
134
135 # Tasks this handles:
136 # - Unfollow and block
137 @impl true
138 def handle(
139 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
140 object,
141 meta
142 ) do
143 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
144 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
145 User.block(blocker, blocked)
146 end
147
148 {:ok, object, meta}
149 end
150
151 # Tasks this handles:
152 # - Update the user
153 #
154 # For a local user, we also get a changeset with the full information, so we
155 # can update non-federating, non-activitypub settings as well.
156 @impl true
157 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
158 if changeset = Keyword.get(meta, :user_update_changeset) do
159 changeset
160 |> User.update_and_set_cache()
161 else
162 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
163
164 User.get_by_ap_id(updated_object["id"])
165 |> User.remote_user_changeset(new_user_data)
166 |> User.update_and_set_cache()
167 end
168
169 {:ok, object, meta}
170 end
171
172 # Tasks this handles:
173 # - Add like to object
174 # - Set up notification
175 @impl true
176 def handle(%{data: %{"type" => "Like"}} = object, meta) do
177 liked_object = Object.get_by_ap_id(object.data["object"])
178 Utils.add_like_to_object(object, liked_object)
179
180 Notification.create_notifications(object)
181
182 {:ok, object, meta}
183 end
184
185 # Tasks this handles
186 # - Actually create object
187 # - Rollback if we couldn't create it
188 # - Increase the user note count
189 # - Increase the reply count
190 # - Increase replies count
191 # - Set up ActivityExpiration
192 # - Set up notifications
193 # - Index incoming posts for search (if needed)
194 @impl true
195 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
196 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
197 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
198 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
199 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
200 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
201
202 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
203 Object.increase_replies_count(in_reply_to)
204 end
205
206 reply_depth = (meta[:depth] || 0) + 1
207
208 # FIXME: Force inReplyTo to replies
209 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
210 object.data["replies"] != nil do
211 for reply_id <- object.data["replies"] do
212 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
213 "id" => reply_id,
214 "depth" => reply_depth
215 })
216 end
217 end
218
219 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
220 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
221 end)
222
223 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
224
225 meta =
226 meta
227 |> add_notifications(notifications)
228
229 ap_streamer().stream_out(activity)
230
231 {:ok, activity, meta}
232 else
233 e -> Repo.rollback(e)
234 end
235 end
236
237 # Tasks this handles:
238 # - Add announce to object
239 # - Set up notification
240 # - Stream out the announce
241 @impl true
242 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
243 announced_object = Object.get_by_ap_id(object.data["object"])
244 user = User.get_cached_by_ap_id(object.data["actor"])
245
246 Utils.add_announce_to_object(object, announced_object)
247
248 if !User.is_internal_user?(user) do
249 Notification.create_notifications(object)
250
251 ap_streamer().stream_out(object)
252 end
253
254 {:ok, object, meta}
255 end
256
257 @impl true
258 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
259 with undone_object <- Activity.get_by_ap_id(undone_object),
260 :ok <- handle_undoing(undone_object) do
261 {:ok, object, meta}
262 end
263 end
264
265 # Tasks this handles:
266 # - Add reaction to object
267 # - Set up notification
268 @impl true
269 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
270 reacted_object = Object.get_by_ap_id(object.data["object"])
271 Utils.add_emoji_reaction_to_object(object, reacted_object)
272
273 Notification.create_notifications(object)
274
275 {:ok, object, meta}
276 end
277
278 # Tasks this handles:
279 # - Delete and unpins the create activity
280 # - Replace object with Tombstone
281 # - Set up notification
282 # - Reduce the user note count
283 # - Reduce the reply count
284 # - Stream out the activity
285 # - Removes posts from search index (if needed)
286 @impl true
287 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
288 deleted_object =
289 Object.normalize(deleted_object, fetch: false) ||
290 User.get_cached_by_ap_id(deleted_object)
291
292 result =
293 case deleted_object do
294 %Object{} ->
295 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
296 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
297 %User{} = user <- User.get_cached_by_ap_id(actor) do
298 User.remove_pinned_object_id(user, deleted_object.data["id"])
299
300 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
301
302 if in_reply_to = deleted_object.data["inReplyTo"] do
303 Object.decrease_replies_count(in_reply_to)
304 end
305
306 ap_streamer().stream_out(object)
307 ap_streamer().stream_out_participations(deleted_object, user)
308 :ok
309 else
310 {:actor, _} ->
311 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
312 :no_object_actor
313 end
314
315 %User{} ->
316 with {:ok, _} <- User.delete(deleted_object) do
317 :ok
318 end
319 end
320
321 if result == :ok do
322 Notification.create_notifications(object)
323
324 # Only remove from index when deleting actual objects, not users or anything else
325 with %Pleroma.Object{} <- deleted_object do
326 Pleroma.Search.remove_from_index(deleted_object)
327 end
328
329 {:ok, object, meta}
330 else
331 {:error, result}
332 end
333 end
334
335 # Tasks this handles:
336 # - adds pin to user
337 # - removes expiration job for pinned activity, if was set for expiration
338 @impl true
339 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
340 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
341 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
342 # if pinned activity was scheduled for deletion, we remove job
343 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
344 Oban.cancel_job(expiration.id)
345 end
346
347 {:ok, object, meta}
348 else
349 nil ->
350 {:error, :user_not_found}
351
352 {:error, changeset} ->
353 if changeset.errors[:pinned_objects] do
354 {:error, :pinned_statuses_limit_reached}
355 else
356 changeset.errors
357 end
358 end
359 end
360
361 # Tasks this handles:
362 # - removes pin from user
363 # - removes corresponding Add activity
364 # - if activity had expiration, recreates activity expiration job
365 @impl true
366 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
367 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
368 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
369 data["object"]
370 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
371 |> Repo.delete_all()
372
373 # if pinned activity was scheduled for deletion, we reschedule it for deletion
374 if meta[:expires_at] do
375 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
376 {:ok, expires_at} =
377 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
378
379 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
380 activity_id: meta[:activity_id],
381 expires_at: expires_at
382 })
383 end
384
385 {:ok, object, meta}
386 else
387 nil -> {:error, :user_not_found}
388 error -> error
389 end
390 end
391
392 # Nothing to do
393 @impl true
394 def handle(object, meta) do
395 {:ok, object, meta}
396 end
397
398 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
399 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
400 PollWorker.schedule_poll_end(activity)
401 {:ok, object, meta}
402 end
403 end
404
405 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
406 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
407 Object.increase_vote_count(
408 object.data["inReplyTo"],
409 object.data["name"],
410 object.data["actor"]
411 )
412
413 {:ok, object, meta}
414 end
415 end
416
417 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
418 when objtype in ~w[Audio Video Event Article Note Page] do
419 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
420 {:ok, object, meta}
421 end
422 end
423
424 # Nothing to do
425 def handle_object_creation(object, _activity, meta) do
426 {:ok, object, meta}
427 end
428
429 defp undo_like(nil, object), do: delete_object(object)
430
431 defp undo_like(%Object{} = liked_object, object) do
432 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
433 delete_object(object)
434 end
435 end
436
437 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
438 object.data["object"]
439 |> Object.get_by_ap_id()
440 |> undo_like(object)
441 end
442
443 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
444 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
445 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
446 {:ok, _} <- Repo.delete(object) do
447 :ok
448 end
449 end
450
451 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
452 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
453 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
454 {:ok, _} <- Repo.delete(object) do
455 :ok
456 end
457 end
458
459 def handle_undoing(
460 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
461 ) do
462 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
463 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
464 {:ok, _} <- User.unblock(blocker, blocked),
465 {:ok, _} <- Repo.delete(object) do
466 :ok
467 end
468 end
469
470 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
471
472 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
473 defp delete_object(object) do
474 with {:ok, _} <- Repo.delete(object), do: :ok
475 end
476
477 defp send_notifications(meta) do
478 Keyword.get(meta, :notifications, [])
479 |> Enum.each(fn notification ->
480 Streamer.stream(["user", "user:notification"], notification)
481 Push.send(notification)
482 end)
483
484 meta
485 end
486
487 defp send_streamables(meta) do
488 Keyword.get(meta, :streamables, [])
489 |> Enum.each(fn {topics, items} ->
490 Streamer.stream(topics, items)
491 end)
492
493 meta
494 end
495
496 defp add_notifications(meta, notifications) do
497 existing = Keyword.get(meta, :notifications, [])
498
499 meta
500 |> Keyword.put(:notifications, notifications ++ existing)
501 end
502
503 @impl true
504 def handle_after_transaction(meta) do
505 meta
506 |> send_notifications()
507 |> send_streamables()
508 end
509 end