Also use actor_type to determine if an account is a bot in antiFollowbotPolicy
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Chat
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Repo
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
32
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
34
# Module used to stream activities out. Read from the
# `[:side_effects, :ap_streamer]` config key at call time, with `ActivityPub`
# as the default.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36
@impl true
def handle(object, meta \\ [])

# Tasks this handles:
# - Sets the state of the referenced follow activity to "accept"
# - Updates the following relationship with `:follow_accept`
# - Updates the notification type of the follow
#
# If any lookup or update step does not match, the remaining steps are
# skipped; the Accept activity is returned unchanged either way.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _requester, accepter} <-
         FollowingRelationship.update(requester, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, accepted_follow)
  end

  {:ok, object, meta}
end
66
# Tasks this handles:
# - Sets the state of the referenced follow activity to "reject"
# - Updates the following relationship with `:follow_reject`
# - Dismisses the notification of the follow
#
# If any lookup or update step does not match, the remaining steps are
# skipped; the Reject activity is returned unchanged either way.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(pending_follow, "reject") do
    FollowingRelationship.update(requester, rejecter, :follow_reject)
    # The notification is looked up through the original (pre-update) activity.
    Notification.dismiss(pending_follow)
  end

  {:ok, object, meta}
end
93
# Tasks this handles:
# - Follows if possible (as a pending follow)
# - Generates an Accept when the followed user is local and not locked
# - Generates a Reject when the follow attempt returns an error
# - Creates notifications without sending them; they are added to `meta`
#
# Returns the follow activity as re-read from the database.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id),
       # Tag the follow result so the `else` branch can tell it apart from a
       # failed user lookup and still has access to the followed user.
       {:following, {:ok, _, _}, _, _} <-
         {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
    if followed.local && !followed.is_locked do
      {:ok, accept_data, _} = Builder.accept(followed, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    {:following, {:error, _}, _, rejecting_user} ->
      {:ok, reject_data, _} = Builder.reject(rejecting_user, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
137
# Tasks this handles:
# - Unfollow and block
#
# The block is only applied when both users can be looked up; the activity is
# returned unchanged either way.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
153
# Tasks this handles:
# - Update the user
#
# For a local user, we also get a changeset with the full information (under
# the `:user_update_changeset` key of `meta`), so we can update
# non-federating, non-activitypub settings as well. Without it, the user is
# looked up by the id of the updated object and updated from the object data.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    missing when missing in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      existing_user = User.get_by_ap_id(updated_object["id"])

      existing_user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
174
# Tasks this handles:
# - Add like to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  liked_object =
    object.data["object"]
    |> Object.get_by_ap_id()

  Utils.add_like_to_object(object, liked_object)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
187
# Tasks this handles:
# - Actually create the object (see `handle_object_creation/3`)
# - Rollback if we couldn't create it or the actor can't be found
# - Increase the user note count and update the last status time, if public
# - Increase the replies count of the object being replied to
# - Enqueue fetching of the object's replies, within the allowed thread distance
# - Start fetching rich media data for the activity
# - Set up notifications (added to `meta`, not sent here)
# - Stream out the activity
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
    {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)

    # Poll answers carry an inReplyTo as well, but are not counted as replies.
    parent_id = object.data["type"] != "Answer" && object.data["inReplyTo"]

    if parent_id do
      Object.increase_replies_count(parent_id)
    end

    reply_depth = (meta[:depth] || 0) + 1
    reply_ids = object.data["replies"]

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and reply_ids != nil do
      Enum.each(reply_ids, fn reply_id ->
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    failure -> Repo.rollback(failure)
  end
end
236
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
@impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  object.data["object"]
  |> Object.get_by_ap_id()
  |> then(&Utils.add_announce_to_object(object, &1))

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    ap_streamer().stream_out(object)
  end

  {:ok, object, meta}
end
256
# Tasks this handles:
# - Looks up the activity being undone and hands it to `handle_undoing/1`
#
# Anything other than `:ok` from `handle_undoing/1` is returned as-is.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
264
# Tasks this handles:
# - Add reaction to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  reacted_object =
    object.data["object"]
    |> Object.get_by_ap_id()

  Utils.add_emoji_reaction_to_object(object, reacted_object)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
276
# Tasks this handles:
# - Deletes the object (or the user, when the target is a user)
# - Unpins the deleted object from its actor
# - Reduce the user note count
# - Reduce the reply count
# - Deletes chat message references for the object
# - Stream out the activity
# - Set up notification
#
# Returns `{:error, reason}` when the deletion did not end with `:ok`.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => target_ref}} = object, meta) do
  # The target is either a locally known object or, failing that, a user.
  target =
    Object.normalize(target_ref, fetch: false) ||
      User.get_cached_by_ap_id(target_ref)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, _activity} <- Object.delete(target),
             {:actor, actor_ap_id} when is_binary(actor_ap_id) <-
               {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinned_object_id(author, deleted.data["id"])

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          if parent_id = deleted.data["inReplyTo"] do
            Object.decrease_replies_count(parent_id)
          end

          MessageReference.delete_for_object(deleted)

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before the delete call.
            @logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        case User.delete(target) do
          {:ok, _} -> :ok
          failure -> failure
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    reason ->
      {:error, reason}
  end
end
328
# Tasks this handles:
# - adds pin to user
# - removes expiration job for pinned activity, if was set for expiration
#
# Returns `{:ok, object, meta}` on success, otherwise:
# - `{:error, :user_not_found}` when the actor cannot be looked up
# - `{:error, :pinned_statuses_limit_reached}` when the changeset carries a
#   `:pinned_objects` error
# - `{:error, errors}` with the changeset's error list for any other
#   changeset failure
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Keep the error contract uniform: always a tagged tuple, never the
        # bare keyword list of changeset errors.
        {:error, changeset.errors}
      end
  end
end
354
# Tasks this handles:
# - removes pin from user
# - removes corresponding Add activity
# - if activity had expiration, recreates activity expiration job
#
# Returns `{:error, :user_not_found}` when the actor cannot be looked up; any
# other failure from removing the pin is returned as-is.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  pinned_id = data["object"]

  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, pinned_id) do
    add_activities = Activity.add_by_params_query(pinned_id, user.ap_id, user.featured_address)
    Repo.delete_all(add_activities)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    if raw_expires_at = meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    failure -> failure
  end
end
385
# Catch-all: no side effects for this object, pass it through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
391
# Creates a ChatMessage object through the common pipeline. For each local
# participant (actor and first recipient) the chat is bumped or created, a
# message reference is created, the idempotency key from `meta` is cached
# under the reference id, and a streamable for the "user" and
# "user:pleroma_chat" topics is added to `meta`.
def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # De-duplicated so a message to oneself is only processed once.
    participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

        # Third argument is true for the side that did not author the message.
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {
          ["user", "user:pleroma_chat"],
          {user, %{cm_ref | chat: chat, object: object}}
        }
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
426
# Creates a Question object through the common pipeline and, on success,
# schedules the poll end for the creating activity. A pipeline failure is
# returned as-is.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created_object, updated_meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, created_object, updated_meta}

    failure ->
      failure
  end
end
433
# Creates an Answer object through the common pipeline and, on success,
# increases the vote count on the object it replies to, passing the answer's
# name and actor. A pipeline failure is returned as-is.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, %{data: answer} = created_object, updated_meta} ->
      Object.increase_vote_count(answer["inReplyTo"], answer["name"], answer["actor"])

      {:ok, created_object, updated_meta}

    failure ->
      failure
  end
end
445
# Audio, Video, Event, Article, Note and Page objects need nothing beyond the
# common pipeline; its result (success or failure) is returned untouched.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ~w[Audio Video Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end
452
# Catch-all: any other object is passed through without being created here.
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
457
# Undoes a like. When the liked object is gone (`nil`), only the like
# activity itself is deleted; otherwise the like is first removed from the
# object, and a failure of that step is returned as-is.
defp undo_like(nil, object) do
  delete_object(object)
end

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
465
# Reverts the effects of a previously handled activity and deletes it.
# Returns `:ok` on success. A step that does not match is returned as-is, and
# activities of an unsupported kind yield an `{:error, _}` tuple.

# Like: remove the like from the liked object (if it still exists), then
# delete the like activity.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end

# EmojiReact: remove the reaction from the object, then delete the activity.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

# Announce: remove the announce from the object, then delete the activity.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

# Block: unblock the blocked user, then delete the block activity.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

# Anything else (including `nil`) cannot be undone.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
500
# Deletes the given record via the repo, collapsing a successful
# `{:ok, _}` into `:ok`; any other result is returned as-is.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
505
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none if the key is absent). Returns `meta` unchanged so it can be
# piped further.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
515
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none if the key is absent). Returns `meta` unchanged so it can be piped
# further.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
524
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, combined)
end
531
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, combined)
end
538
# Work performed by `handle_after_transaction/1`:
# - Create activity: put it into Elasticsearch by id
# - Delete activity carrying a "deleted_activity_id": delete that id from
#   Elasticsearch
# - any other activity or object: nothing to do
# - anything else is treated as `meta`: send the collected notifications,
#   then the collected streamables
@impl true
def handle_after_transaction(%Activity{id: activity_id, data: %{"type" => "Create"}}) do
  Pleroma.Elasticsearch.put_by_id(:activity, activity_id)
end

def handle_after_transaction(%Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_id}
    }) do
  Pleroma.Elasticsearch.delete_by_id(:activity, deleted_id)
end

def handle_after_transaction(%struct{}) when struct in [Activity, Object], do: :ok

def handle_after_transaction(meta) do
  send_streamables(send_notifications(meta))
end
563 end