Don't try removing deleted users and such from index as posts
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
# Returns the module used to stream activities out. The
# `[:side_effects, :ap_streamer]` config entry takes precedence; without it the
# `ActivityPub` module is used.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36
# Bodiless function head: declares the default value of `meta` (an empty
# keyword list) once for all `handle/2` clauses.
@impl true
def handle(object, meta \\ [])
39
# Tasks this handles:
# - Sets the state of the referenced follow activity to "accept"
# - Updates the following relationship (`:follow_accept`)
# - Updates the notification type for the followed user
#
# The steps are best-effort: as soon as one of them does not match, the rest
# are skipped, and the Accept is still returned as `{:ok, object, meta}`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => acceptor_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = acceptor <- User.get_cached_by_ap_id(acceptor_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _follower, acceptor} <-
         FollowingRelationship.update(follower, acceptor, :follow_accept) do
    Notification.update_notification_type(acceptor, accepted_follow)
  end

  {:ok, object, meta}
end
66
# Tasks this handles:
# - Sets the state of the referenced follow activity to "reject"
# - Updates the following relationship (`:follow_reject`)
# - Dismisses the notification of the follow activity
#
# The steps are best-effort: as soon as one of them does not match, the rest
# are skipped, and the Reject is still returned as `{:ok, object, meta}`.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _rejected_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "reject") do
    FollowingRelationship.update(follower, rejecter, :follow_reject)
    # The notification is dismissed for the activity fetched above, not for
    # the value returned by the state update.
    Notification.dismiss(pending_follow)
  end

  {:ok, object, meta}
end
93
# Tasks this handles:
# - Follows if possible (as a pending follow)
# - Generates an Accept when the followed user is local and not locked
# - Generates a Reject when the follow attempt returns an error
# - Creates notifications (unsent) and stores them in `meta`
#
# Returns the follow activity re-read by its id, so callers see the state
# after any Accept/Reject generated here.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  # When either user cannot be resolved nothing is followed, accepted or
  # rejected; the non-matching value is simply discarded.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
137
# Tasks this handles:
# - Unfollow and block
#
# The block is only applied when both the blocking and the blocked user can be
# resolved; otherwise the activity passes through unchanged.
@impl true
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
153
# Tasks this handles:
# - Update the user
#
# For a local user, we also get a changeset with the full information (under
# `:user_update_changeset` in `meta`), so we can update non-federating,
# non-activitypub settings as well. Without that changeset the user is looked
# up by the id of the updated object and updated from the object's data.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      user = User.get_by_ap_id(updated_object["id"])

      user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
174
# Tasks this handles:
# - Add like to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
187
# Tasks this handles
# - Actually create object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the reply count
# - Increase replies count
# - Set up ActivityExpiration
# - Set up notifications
# - Index incoming posts for search (if needed)
#
# On success returns `{:ok, activity, meta}` with the created notifications
# added to `meta`. If object creation or the actor lookup does not match, the
# non-matching value is handed to `Repo.rollback/1`.
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  # The object to create is taken from `meta[:object_data]`; its creation may
  # also return an updated `meta`, which shadows the argument from here on.
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # Notifications are created unsent (`do_send: false`) and only collected
    # into `meta` further down.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
    {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)

    # `=` binds loosest here: `in_reply_to` receives the result of the whole
    # `!= ... && ...` expression, i.e. the "inReplyTo" value for anything that
    # is not an "Answer", and a falsy value otherwise.
    if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    # Replies are fetched one level deeper than the depth recorded in `meta`
    # (0 when absent).
    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      # Enqueue one remote fetch job per listed reply id.
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    # Rich media data is fetched in a separately started task, wrapped in the
    # concurrency limiter keyed on the rich media helpers module.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # The activity is indexed together with the object that was just created.
    Pleroma.Search.add_to_index(Map.put(activity, :object, object))

    meta =
      meta
      |> add_notifications(notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    # Anything that did not match above aborts via `Repo.rollback/1`.
    e -> Repo.rollback(e)
  end
end
239
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
@impl true
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced_object = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)
    ap_streamer().stream_out(object)
  end

  {:ok, object, meta}
end
259
# Tasks this handles:
# - Looks up the activity being undone and delegates to `handle_undoing/1`
#
# Returns `{:ok, object, meta}` when the undo reports `:ok`; any other result
# of `handle_undoing/1` is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    other -> other
  end
end
267
# Tasks this handles:
# - Add reaction to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
279
# Tasks this handles:
# - Delete and unpins the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
# - Removes posts from search index (if needed)
#
# The deleted target may be either an object or a user. Returns
# `{:ok, object, meta}` when the deletion reported `:ok`, and
# `{:error, result}` with whatever the deletion produced otherwise.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  # Resolve the target without fetching: first as an object, and only if
  # that yields nothing, as a user.
  deleted_object =
    Object.normalize(deleted_object, fetch: false) ||
      User.get_cached_by_ap_id(deleted_object)

  # NOTE(review): there is no clause for `nil`, so a target that is neither
  # a known object nor a known user raises a CaseClauseError here.
  result =
    case deleted_object do
      %Object{} ->
        # NOTE(review): the `else` below only matches `{:actor, _}`; a
        # non-matching result of `Object.delete/1` or of the user lookup
        # raises a WithClauseError instead of being returned.
        with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
             {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinned_object_id(user, deleted_object.data["id"])

          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)

          if in_reply_to = deleted_object.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          MessageReference.delete_for_object(deleted_object)

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(deleted_object, user)
          :ok
        else
          {:actor, _} ->
            @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
            :no_object_actor
        end

      %User{} ->
        # Any result other than `{:ok, _}` falls through as `result`.
        with {:ok, _} <- User.delete(deleted_object) do
          :ok
        end
    end

  if result == :ok do
    Notification.create_notifications(object)

    # Only remove from index when deleting actual objects, not users or anything else
    with %Pleroma.Object{} <- deleted_object do
      Pleroma.Search.remove_from_index(deleted_object)
    end

    {:ok, object, meta}
  else
    {:error, result}
  end
end
338
# Tasks this handles:
# - adds pin to user
# - removes expiration job for pinned activity, if was set for expiration
#
# Returns `{:ok, object, meta}` on success, or a tagged error:
# - `{:error, :user_not_found}` when the actor cannot be resolved
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries a
#   `:pinned_objects` error
# - `{:error, errors}` with the changeset's error list for any other
#   validation failure
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Keep the error tagged: a bare keyword list here would be the only
        # return of this clause that is neither `{:ok, _, _}` nor
        # `{:error, _}`, and callers matching on `{:error, _}` would miss it.
        {:error, changeset.errors}
      end
  end
end
364
# Tasks this handles:
# - removes pin from user
# - removes corresponding Add activity
# - if activity had expiration, recreates activity expiration job
#
# Returns `{:error, :user_not_found}` when the actor cannot be resolved; any
# other non-matching result is returned as is.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities_query =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities_query)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    if raw_expires_at = meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    error -> error
  end
end
395
# Catch-all: anything not matched by an earlier clause has no side effects and
# is passed through unchanged.
@impl true
def handle(object, meta), do: {:ok, object, meta}
401
# Creates a ChatMessage object through the common pipeline and, for every
# local participant, bumps or creates the chat, creates a message reference,
# caches the idempotency key under the reference id and queues a streamable
# in `meta`. A non-`{:ok, _, _}` pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, object, meta} ->
      sender = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      # Each participant is visited from their own point of view; `uniq`
      # collapses the two pairs when they are identical.
      perspectives = Enum.uniq([[sender, recipient], [recipient, sender]])

      streamables =
        for [user, other_user] <- perspectives, user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

          # The third argument is true only for the participant who is not the sender.
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != sender.ap_id)

          @cachex.put(
            :chat_message_id_idempotency_key_cache,
            cm_ref.id,
            meta[:idempotency_key]
          )

          {
            ["user", "user:pleroma_chat"],
            {user, %{cm_ref | chat: chat, object: object}}
          }
        end

      {:ok, object, add_streamables(meta, streamables)}

    other ->
      other
  end
end
436
# Creates a Question through the common pipeline and, on success, schedules
# the poll end for the creating activity.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, _object, _meta} = created ->
      PollWorker.schedule_poll_end(activity)
      created

    other ->
      other
  end
end

# Creates an Answer through the common pipeline and, on success, increases the
# vote count on the object it replies to.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, %{data: data}, _meta} = created ->
      Object.increase_vote_count(data["inReplyTo"], data["name"], data["actor"])
      created

    other ->
      other
  end
end

# These types need nothing beyond the common pipeline; its result is returned
# as is.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ~w[Audio Video Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end

# Nothing to do
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
467
# Undoes a like. When the liked object could not be found (`nil`) only the
# like itself is deleted; otherwise the like is first removed from the liked
# object, and a failure of that removal is returned unchanged.
defp undo_like(nil, like), do: delete_object(like)

defp undo_like(%Object{} = liked_object, like) do
  case Utils.remove_like_from_object(like, liked_object) do
    {:ok, _} -> delete_object(like)
    other -> other
  end
end
475
# Reverts the effects of the given activity and deletes it. Returns `:ok` on
# success; a step that does not match is returned unchanged.

# Like: delegates to `undo_like/2` with the liked object (or `nil`).
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  undo_like(Object.get_by_ap_id(data["object"]), object)
end

# EmojiReact: removes the reaction from the reacted object, then deletes it.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Announce: removes the announce from the announced object, then deletes it.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Block: unblocks the blocked user, then deletes the block activity.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Anything else (including `nil`) cannot be undone.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
510
# Deletes the given record through the repo, collapsing a successful delete to
# a bare `:ok`. Any other result is returned unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    other -> other
  end
end
515
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged so it can be
# piped further.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  for notification <- pending do
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end

  meta
end
525
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged so it can be piped
# further.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  for {topics, items} <- pending do
    Streamer.stream(topics, items)
  end

  meta
end
534
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, combined)
end
541
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, combined)
end
548
# Work performed once the surrounding transaction is done.
# - Create activity: put the activity into Elasticsearch by id
# - Delete activity carrying "deleted_activity_id": delete that id from
#   Elasticsearch
# - Any other activity or object: nothing to do
# - Anything else is treated as `meta`: send the notifications and streamables
#   collected in it
@impl true
def handle_after_transaction(%Activity{data: %{"type" => "Create"}, id: activity_id}) do
  Pleroma.Elasticsearch.put_by_id(:activity, activity_id)
end

def handle_after_transaction(%Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_id}
    }) do
  Pleroma.Elasticsearch.delete_by_id(:activity, deleted_id)
end

def handle_after_transaction(%Activity{}), do: :ok

def handle_after_transaction(%Object{}), do: :ok

def handle_after_transaction(meta) do
  send_streamables(send_notifications(meta))
end
573 end