1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
26 def handle(object, meta \\ [])
30 # - Sends a notification
36 "object" => follow_activity_id
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
56 # - Rejects all existing follow activities for this person
57 # - Updates the follow state
63 "object" => follow_activity_id
68 with %Activity{actor: follower_id} = follow_activity <-
69 Activity.get_by_ap_id(follow_activity_id),
70 %User{} = followed <- User.get_cached_by_ap_id(actor),
71 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
72 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
73 FollowingRelationship.update(follower, followed, :follow_reject)
80 # - Follows if possible
81 # - Sends a notification
82 # - Generates accept or reject if appropriate
88 "object" => followed_user,
89 "actor" => following_user
94 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
95 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
96 {_, {:ok, _}, _, _} <-
97 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
98 if followed.local && !followed.locked do
99 {:ok, accept_data, _} = Builder.accept(followed, object)
100 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
103 {:following, {:error, _}, follower, followed} ->
104 Utils.update_follow_state_for_all(object, "reject")
105 FollowingRelationship.update(follower, followed, :follow_reject)
109 to: [follower.ap_id],
114 |> ActivityPub.reject()
121 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
125 |> add_notifications(notifications)
127 updated_object = Activity.get_by_ap_id(follow_id)
129 {:ok, updated_object, meta}
132 # Tasks this handles:
133 # - Unfollow and block
135 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
139 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
140 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
141 User.block(blocker, blocked)
147 # Tasks this handles:
150 # For a local user, we also get a changeset with the full information, so we
151 # can update non-federating, non-activitypub settings as well.
152 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
153 if changeset = Keyword.get(meta, :user_update_changeset) do
155 |> User.update_and_set_cache()
157 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
159 User.get_by_ap_id(updated_object["id"])
160 |> User.remote_user_changeset(new_user_data)
161 |> User.update_and_set_cache()
167 # Tasks this handles:
168 # - Add like to object
169 # - Set up notification
170 def handle(%{data: %{"type" => "Like"}} = object, meta) do
171 liked_object = Object.get_by_ap_id(object.data["object"])
172 Utils.add_like_to_object(object, liked_object)
174 Notification.create_notifications(object)
180 # - Actually create object
181 # - Rollback if we couldn't create it
182 # - Increase the user note count
183 # - Increase the reply count
184 # - Increase replies count
185 # - Set up ActivityExpiration
186 # - Set up notifications
187 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
188 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
189 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
190 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
191 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
193 if in_reply_to = object.data["inReplyTo"] do
194 Object.increase_replies_count(in_reply_to)
197 if expires_at = activity.data["expires_at"] do
198 ActivityExpiration.create(activity, expires_at)
201 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
205 |> add_notifications(notifications)
207 {:ok, activity, meta}
209 e -> Repo.rollback(e)
213 # Tasks this handles:
214 # - Add announce to object
215 # - Set up notification
216 # - Stream out the announce
217 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
218 announced_object = Object.get_by_ap_id(object.data["object"])
219 user = User.get_cached_by_ap_id(object.data["actor"])
221 Utils.add_announce_to_object(object, announced_object)
223 if !User.is_internal_user?(user) do
224 Notification.create_notifications(object)
227 |> Topics.get_activity_topics()
228 |> Streamer.stream(object)
234 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
235 with undone_object <- Activity.get_by_ap_id(undone_object),
236 :ok <- handle_undoing(undone_object) do
241 # Tasks this handles:
242 # - Add reaction to object
243 # - Set up notification
244 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
245 reacted_object = Object.get_by_ap_id(object.data["object"])
246 Utils.add_emoji_reaction_to_object(object, reacted_object)
248 Notification.create_notifications(object)
253 # Tasks this handles:
254 # - Delete and unpins the create activity
255 # - Replace object with Tombstone
256 # - Set up notification
257 # - Reduce the user note count
258 # - Reduce the reply count
259 # - Stream out the activity
260 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
262 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
265 case deleted_object do
267 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
268 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
269 User.remove_pinnned_activity(user, activity)
271 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
273 if in_reply_to = deleted_object.data["inReplyTo"] do
274 Object.decrease_replies_count(in_reply_to)
277 MessageReference.delete_for_object(deleted_object)
279 ActivityPub.stream_out(object)
280 ActivityPub.stream_out_participations(deleted_object, user)
285 with {:ok, _} <- User.delete(deleted_object) do
291 Notification.create_notifications(object)
299 def handle(object, meta) do
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
309 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
316 ["user", "user:pleroma_chat"],
317 {user, %{cm_ref | chat: chat, object: object}}
325 |> add_streamables(streamables)
331 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
332 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
333 Object.increase_vote_count(
334 object.data["inReplyTo"],
343 def handle_object_creation(%{"type" => "Question"} = object, meta) do
344 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
350 def handle_object_creation(object, meta) do
354 defp undo_like(nil, object), do: delete_object(object)
356 defp undo_like(%Object{} = liked_object, object) do
357 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
358 delete_object(object)
362 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
363 object.data["object"]
364 |> Object.get_by_ap_id()
368 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
369 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
370 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
371 {:ok, _} <- Repo.delete(object) do
376 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
377 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
378 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
379 {:ok, _} <- Repo.delete(object) do
385 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
387 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
388 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
389 {:ok, _} <- User.unblock(blocker, blocked),
390 {:ok, _} <- Repo.delete(object) do
395 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
397 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
398 defp delete_object(object) do
399 with {:ok, _} <- Repo.delete(object), do: :ok
402 defp send_notifications(meta) do
403 Keyword.get(meta, :notifications, [])
404 |> Enum.each(fn notification ->
405 Streamer.stream(["user", "user:notification"], notification)
406 Push.send(notification)
412 defp send_streamables(meta) do
413 Keyword.get(meta, :streamables, [])
414 |> Enum.each(fn {topics, items} ->
415 Streamer.stream(topics, items)
421 defp add_streamables(meta, streamables) do
422 existing = Keyword.get(meta, :streamables, [])
425 |> Keyword.put(:streamables, streamables ++ existing)
428 defp add_notifications(meta, notifications) do
429 existing = Keyword.get(meta, :notifications, [])
432 |> Keyword.put(:notifications, notifications ++ existing)
435 def handle_after_transaction(meta) do
437 |> send_notifications()
438 |> send_streamables()