1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
# Cache and logger modules are read from config when these attributes are
# evaluated, with `Cachex` and `Logger` passed as the second (fallback)
# argument. Module attributes are evaluated at compile time, so later config
# changes are not picked up by `@cachex` / `@logger`.
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out. Unlike the module
# attributes above, the lookup under [:side_effects, :ap_streamer] happens on
# every call, with `ActivityPub` passed as the fallback argument.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
# Bodiless function head: declares the default for `meta` (an empty list) once
# for all handle/2 clauses that follow.
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
# Resolve the referenced Follow activity, the followed user (looked up from
# `actor`) and the follower (the Follow's own actor), set the follow state to
# "accept", then record the relationship as :follow_accept. The body below is
# only reached when every step matched.
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
# Uses the `followed` and `follow_activity` values returned by the update steps
# above, not the ones loaded at the start.
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
77 "object" => follow_activity_id
# Same lookups as for an accept, but the follow state is set to "reject".
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# These two calls sit in the body rather than in the `with` chain, so their
# results are not pattern-matched here.
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
# The follow result is wrapped in a :following-tagged tuple together with both
# users, so a failed follow can be matched further down with `followed` still
# at hand.
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# A local account that is not locked accepts right away: build the Accept and
# run it through the pipeline as a local activity.
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# The follow itself returned an error: answer with a Reject instead.
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with `do_send: false` and stored in `meta`;
# send_notifications/1 later reads them from there.
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
# The activity is loaded again before being returned.
# NOTE(review): presumably because the accept/reject above may have changed
# its stored state — confirm.
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
# Block handling: both users must resolve from the cache, otherwise
# User.block/2 is not called.
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# When the caller supplied a changeset under :user_update_changeset in `meta`,
# that changeset is used for the update.
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
# Otherwise the user data is rebuilt from the object carried by the activity
# and applied through a remote-user changeset.
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# NOTE(review): the lookup result is piped on without a nil check — confirm
# the user for updated_object["id"] is always known at this point.
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# NOTE(review): the looked-up object is passed on without a nil check —
# confirm add_like_to_object/2 copes with a missing object.
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
# Called without `do_send: false`, unlike the Follow and Create clauses, which
# collect their notifications in `meta` instead.
183 Notification.create_notifications(object)
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Enqueue fetching of the object's replies (within the allowed thread distance)
194 # - Set up ActivityExpiration
195 # - Set up notifications
196 # - Index incoming posts for search (if needed)
198 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# The object itself is created by handle_object_creation/3 from
# meta[:object_data]; the `meta` it returns replaces the incoming one.
199 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
200 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Notifications are created but not sent here (`do_send: false`); they are
# added to `meta` further down.
201 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
202 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
203 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
# `!=` binds tighter than `&&`: in_reply_to is `false` for an "Answer" and the
# object's "inReplyTo" value otherwise (nil when absent), so answers never
# bump the replies count.
205 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
206 Object.increase_replies_count(in_reply_to)
# Replies are fetched one level deeper than the depth recorded in `meta`
# (treated as 0 when unset).
209 reply_depth = (meta[:depth] || 0) + 1
211 # FIXME: Force inReplyTo to replies
212 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
213 object.data["replies"] != nil do
214 for reply_id <- object.data["replies"] do
215 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
217 "depth" => reply_depth
# Rich media data is fetched in a task started with Task.start/1 under the
# limiter for RichMedia.Helpers; its result is not awaited here.
222 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
223 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# The activity is indexed with the created object attached under :object.
226 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
230 |> add_notifications(notifications)
232 ap_streamer().stream_out(activity)
234 {:ok, activity, meta}
# NOTE(review): Repo.rollback/1 assumes this runs inside a Repo transaction
# opened by the caller — confirm.
236 e -> Repo.rollback(e)
240 # Tasks this handles:
241 # - Add announce to object
242 # - Set up notification
243 # - Stream out the announce
245 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
246 announced_object = Object.get_by_ap_id(object.data["object"])
247 user = User.get_cached_by_ap_id(object.data["actor"])
# NOTE(review): neither lookup above is checked for nil before use — confirm
# both are guaranteed to exist for a validated Announce.
249 Utils.add_announce_to_object(object, announced_object)
# Announces made by internal users do not create notifications.
251 if !User.is_internal_user?(user) do
252 Notification.create_notifications(object)
254 ap_streamer().stream_out(object)
261 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object` is first the id taken from the activity and is then rebound
# to the loaded activity. A bare variable on the left of `<-` always matches,
# so whatever the lookup returns is handed to handle_undoing/1, whose last
# clause turns anything unsupported into an {:error, _} tuple.
262 with undone_object <- Activity.get_by_ap_id(undone_object),
263 :ok <- handle_undoing(undone_object) do
268 # Tasks this handles:
269 # - Add reaction to object
270 # - Set up notification
272 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# NOTE(review): as in the Like clause, the looked-up object is passed on
# without a nil check — confirm add_emoji_reaction_to_object/2 handles that.
273 reacted_object = Object.get_by_ap_id(object.data["object"])
274 Utils.add_emoji_reaction_to_object(object, reacted_object)
276 Notification.create_notifications(object)
281 # Tasks this handles:
282 # - Deletes and unpins the create activity
283 # - Replace object with Tombstone
284 # - Set up notification
285 # - Reduce the user note count
286 # - Reduce the reply count
287 # - Stream out the activity
288 # - Removes posts from search index (if needed)
290 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The target may be either an object or a user: try the object lookup first
# (with `fetch: false`), then fall back to the user cache.
292 Object.normalize(deleted_object, fetch: false) ||
293 User.get_cached_by_ap_id(deleted_object)
296 case deleted_object do
# Object branch. The actor is wrapped in an {:actor, _} tuple so that an
# object without a binary actor produces a distinguishable non-match.
298 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
299 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
300 %User{} = user <- User.get_cached_by_ap_id(actor) do
301 User.remove_pinned_object_id(user, deleted_object.data["id"])
# `user` is rebound to the value returned by the count update and is the one
# passed to stream_out_participations/2 below.
303 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
305 if in_reply_to = deleted_object.data["inReplyTo"] do
306 Object.decrease_replies_count(in_reply_to)
309 MessageReference.delete_for_object(deleted_object)
311 ap_streamer().stream_out(object)
312 ap_streamer().stream_out_participations(deleted_object, user)
316 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# User branch: the delete target resolved to a user.
321 with {:ok, _} <- User.delete(deleted_object) do
327 Notification.create_notifications(object)
329 # Only remove from index when deleting actual objects, not users or anything else
330 with %Pleroma.Object{} <- deleted_object do
331 Pleroma.Search.remove_from_index(deleted_object)
340 # Tasks this handles:
342 # - removes the expiration job for the pinned activity, if one was scheduled
344 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
# The pin is stored on the acting user; data["object"] is the id being pinned.
345 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
346 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
347 # if pinned activity was scheduled for deletion, we remove job
348 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
349 Oban.cancel_job(expiration.id)
355 {:error, :user_not_found}
# A changeset error on :pinned_objects is reported as the pin limit having
# been reached.
357 {:error, changeset} ->
358 if changeset.errors[:pinned_objects] do
359 {:error, :pinned_statuses_limit_reached}
366 # Tasks this handles:
367 # - removes pin from user
368 # - removes corresponding Add activity
369 # - if activity had expiration, recreates activity expiration job
371 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
372 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
373 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
# Query built from the user's ap_id and featured address.
# NOTE(review): presumably selects the matching Add activity named in the task
# list above this clause — confirm.
375 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
378 # if pinned activity was scheduled for deletion, we reschedule it for deletion
379 if meta[:expires_at] do
380 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
382 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
# The job is re-created for the same activity id, using the cast expiry value.
384 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
385 activity_id: meta[:activity_id],
386 expires_at: expires_at
# No cached user for the actor.
392 nil -> {:error, :user_not_found}
# Catch-all clause: reached by any activity that none of the type-specific
# clauses above matched.
399 def handle(object, meta) do
403 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
404 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
405 actor = User.get_cached_by_ap_id(object.data["actor"])
# Only the first entry of "to" is treated as the recipient; hd/1 raises on an
# empty list.
406 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
# The message is processed once from each side of the conversation.
409 [[actor, recipient], [recipient, actor]]
411 |> Enum.map(fn [user, other_user] ->
413 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is true only for the side that did not author the message.
# NOTE(review): presumably the "unread" flag — confirm against
# MessageReference.create/3.
414 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
417 :chat_message_id_idempotency_key_cache,
419 meta[:idempotency_key]
# Streamable entry: the topics plus the user and the message reference with
# its chat and object filled in.
423 ["user", "user:pleroma_chat"],
424 {user, %{cm_ref | chat: chat, object: object}}
432 |> add_streamables(streamables)
438 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
439 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Scheduled only after the poll object went through the pipeline successfully;
# this is the one clause that uses the `activity` argument.
440 PollWorker.schedule_poll_end(activity)
445 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
446 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
# The vote is counted on the object the answer replies to.
447 Object.increase_vote_count(
448 object.data["inReplyTo"],
# These object types share one clause that runs the object through the common
# pipeline.
457 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
458 when objtype in ~w[Audio Video Event Article Note Page] do
459 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause for object data that none of the type-specific clauses
# above matched.
465 def handle_object_creation(object, _activity, meta) do
# No liked object was found, so there is nothing to remove the like from; only
# the given record itself is deleted.
defp undo_like(nil, object) do
  delete_object(object)
end
# The liked object exists: remove the like from it first, and delete the given
# record only when that succeeded.
471 defp undo_like(%Object{} = liked_object, object) do
472 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
473 delete_object(object)
# Undo of a Like: starts by looking up the liked object from the activity's
# "object" field.
# NOTE(review): the lookup result presumably feeds undo_like/2, which has
# clauses for both nil and %Object{} — confirm.
477 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
478 object.data["object"]
479 |> Object.get_by_ap_id()
# Undo of an EmojiReact: the reacted-to object must exist (unlike the Like
# case there is no nil branch), the reaction is removed from it, and then the
# given record is deleted.
483 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
484 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
485 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
486 {:ok, _} <- Repo.delete(object) do
# Undo of an Announce: the announced object must exist, the announce is removed
# from it, and then the given record is deleted.
491 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
# NOTE(review): the binding is named `liked_object` but holds the announced
# object.
492 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
493 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
494 {:ok, _} <- Repo.delete(object) do
# Undo of a Block: `blocker` and `blocked` arrive as the "actor" and "object"
# values of the activity and are rebound to the loaded users below; the given
# record is deleted only after the unblock succeeded.
500 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
502 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
503 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
504 {:ok, _} <- User.unblock(blocker, blocked),
505 {:ok, _} <- Repo.delete(object) do
# Fallback for anything the clauses above do not know how to undo: returns an
# error tuple carrying the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes the given record and collapses a successful result to a bare :ok;
# any other result of Repo.delete/1 is returned unchanged.
# NOTE(review): the spec says Object.t(), but undo_like/2 passes on the value
# that handle_undoing/1 received — confirm the intended argument type.
512 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
513 defp delete_object(object) do
514 with {:ok, _} <- Repo.delete(object), do: :ok
# Streams and pushes every notification stored under :notifications in `meta`
# (none when the key is absent).
517 defp send_notifications(meta) do
518 Keyword.get(meta, :notifications, [])
519 |> Enum.each(fn notification ->
520 Streamer.stream(["user", "user:notification"], notification)
521 Push.send(notification)
# Streams every {topics, items} pair stored under :streamables in `meta`
# (none when the key is absent).
527 defp send_streamables(meta) do
528 Keyword.get(meta, :streamables, [])
529 |> Enum.each(fn {topics, items} ->
530 Streamer.stream(topics, items)
# Stores `streamables` under :streamables in `meta`, placing the new entries in
# front of any that are already there.
536 defp add_streamables(meta, streamables) do
537 existing = Keyword.get(meta, :streamables, [])
540 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` under :notifications in `meta`, placing the new
# entries in front of any that are already there.
543 defp add_notifications(meta, notifications) do
544 existing = Keyword.get(meta, :notifications, [])
547 |> Keyword.put(:notifications, notifications ++ existing)
# Delivers what the handle/2 clauses collected in `meta`: notifications first,
# then streamables.
# NOTE(review): piping send_notifications/1 into send_streamables/1 relies on
# the former returning `meta` — confirm.
551 def handle_after_transaction(meta) do
553 |> send_notifications()
554 |> send_streamables()