1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
37 def handle(object, meta \\ [])
41 # - Sends a notification
48 "object" => follow_activity_id
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
76 "object" => follow_activity_id
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
103 "object" => followed_user,
104 "actor" => following_user
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
130 |> add_notifications(notifications)
132 updated_object = Activity.get_by_ap_id(follow_id)
134 {:ok, updated_object, meta}
137 # Tasks this handles:
138 # - Unfollow and block
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
153 # Tasks this handles:
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
162 |> User.update_and_set_cache()
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
182 Notification.create_notifications(object)
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
206 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
207 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
212 |> add_notifications(notifications)
214 {:ok, activity, meta}
216 e -> Repo.rollback(e)
220 # Tasks this handles:
221 # - Add announce to object
222 # - Set up notification
223 # - Stream out the announce
225 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
226 announced_object = Object.get_by_ap_id(object.data["object"])
227 user = User.get_cached_by_ap_id(object.data["actor"])
229 Utils.add_announce_to_object(object, announced_object)
231 if !User.is_internal_user?(user) do
232 Notification.create_notifications(object)
235 |> Topics.get_activity_topics()
236 |> Streamer.stream(object)
243 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
244 with undone_object <- Activity.get_by_ap_id(undone_object),
245 :ok <- handle_undoing(undone_object) do
250 # Tasks this handles:
251 # - Add reaction to object
252 # - Set up notification
254 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
255 reacted_object = Object.get_by_ap_id(object.data["object"])
256 Utils.add_emoji_reaction_to_object(object, reacted_object)
258 Notification.create_notifications(object)
263 # Tasks this handles:
264 # - Delete and unpins the create activity
265 # - Replace object with Tombstone
266 # - Set up notification
267 # - Reduce the user note count
268 # - Reduce the reply count
269 # - Stream out the activity
271 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
273 Object.normalize(deleted_object, fetch: false) ||
274 User.get_cached_by_ap_id(deleted_object)
277 case deleted_object do
279 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
280 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
281 %User{} = user <- User.get_cached_by_ap_id(actor) do
282 User.remove_pinned_object_id(user, deleted_object.data["id"])
284 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
286 if in_reply_to = deleted_object.data["inReplyTo"] do
287 Object.decrease_replies_count(in_reply_to)
290 MessageReference.delete_for_object(deleted_object)
292 @ap_streamer.stream_out(object)
293 @ap_streamer.stream_out_participations(deleted_object, user)
297 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
302 with {:ok, _} <- User.delete(deleted_object) do
308 Notification.create_notifications(object)
315 # Tasks this handles:
317 # - removes expiration job for pinned activity, if was set for expiration
319 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
320 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
321 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
322 # if pinned activity was scheduled for deletion, we remove job
323 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
324 Oban.cancel_job(expiration.id)
330 {:error, :user_not_found}
332 {:error, changeset} ->
333 if changeset.errors[:pinned_objects] do
334 {:error, :pinned_statuses_limit_reached}
341 # Tasks this handles:
342 # - removes pin from user
343 # - if activity had expiration, recreates activity expiration job
345 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
346 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
347 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
348 # if pinned activity was scheduled for deletion, we reschedule it for deletion
349 if meta[:expires_at] do
350 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
352 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
354 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
355 activity_id: meta[:activity_id],
356 expires_at: expires_at
362 nil -> {:error, :user_not_found}
369 def handle(object, meta) do
373 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
374 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
375 actor = User.get_cached_by_ap_id(object.data["actor"])
376 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
379 [[actor, recipient], [recipient, actor]]
381 |> Enum.map(fn [user, other_user] ->
383 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
384 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
387 :chat_message_id_idempotency_key_cache,
389 meta[:idempotency_key]
393 ["user", "user:pleroma_chat"],
394 {user, %{cm_ref | chat: chat, object: object}}
402 |> add_streamables(streamables)
408 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
409 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
410 Object.increase_vote_count(
411 object.data["inReplyTo"],
420 def handle_object_creation(%{"type" => objtype} = object, meta)
421 when objtype in ~w[Audio Video Question Event Article] do
422 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
428 def handle_object_creation(object, meta) do
432 defp undo_like(nil, object), do: delete_object(object)
434 defp undo_like(%Object{} = liked_object, object) do
435 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
436 delete_object(object)
440 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
441 object.data["object"]
442 |> Object.get_by_ap_id()
446 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
447 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
448 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
449 {:ok, _} <- Repo.delete(object) do
454 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
455 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
456 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
457 {:ok, _} <- Repo.delete(object) do
463 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
465 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
466 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
467 {:ok, _} <- User.unblock(blocker, blocked),
468 {:ok, _} <- Repo.delete(object) do
473 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
475 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
476 defp delete_object(object) do
477 with {:ok, _} <- Repo.delete(object), do: :ok
480 defp send_notifications(meta) do
481 Keyword.get(meta, :notifications, [])
482 |> Enum.each(fn notification ->
483 Streamer.stream(["user", "user:notification"], notification)
484 Push.send(notification)
490 defp send_streamables(meta) do
491 Keyword.get(meta, :streamables, [])
492 |> Enum.each(fn {topics, items} ->
493 Streamer.stream(topics, items)
499 defp add_streamables(meta, streamables) do
500 existing = Keyword.get(meta, :streamables, [])
503 |> Keyword.put(:streamables, streamables ++ existing)
506 defp add_notifications(meta, notifications) do
507 existing = Keyword.get(meta, :notifications, [])
510 |> Keyword.put(:notifications, notifications ++ existing)
514 def handle_after_transaction(meta) do
516 |> send_notifications()
517 |> send_streamables()