1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
25 def handle(object, meta \\ [])
28 # - Follows if possible
29 # - Sends a notification
30 # - Generates accept or reject if appropriate
36 "object" => followed_user,
37 "actor" => following_user
42 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
43 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
44 {_, {:ok, _}, _, _} <-
45 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
46 if followed.local && !followed.locked do
47 Utils.update_follow_state_for_all(object, "accept")
48 FollowingRelationship.update(follower, followed, :follow_accept)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
58 |> ActivityPub.accept()
61 {:following, {:error, _}, follower, followed} ->
62 Utils.update_follow_state_for_all(object, "reject")
63 FollowingRelationship.update(follower, followed, :follow_reject)
72 |> ActivityPub.reject()
79 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
83 |> add_notifications(notifications)
85 updated_object = Activity.get_by_ap_id(follow_id)
87 {:ok, updated_object, meta}
91 # - Unfollow and block
93 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
97 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
98 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
99 User.block(blocker, blocked)
105 # Tasks this handles:
108 # For a local user, we also get a changeset with the full information, so we
109 # can update non-federating, non-activitypub settings as well.
110 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
111 if changeset = Keyword.get(meta, :user_update_changeset) do
113 |> User.update_and_set_cache()
115 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
117 User.get_by_ap_id(updated_object["id"])
118 |> User.remote_user_changeset(new_user_data)
119 |> User.update_and_set_cache()
125 # Tasks this handles:
126 # - Add like to object
127 # - Set up notification
128 def handle(%{data: %{"type" => "Like"}} = object, meta) do
129 liked_object = Object.get_by_ap_id(object.data["object"])
130 Utils.add_like_to_object(object, liked_object)
132 Notification.create_notifications(object)
138 # - Actually create object
139 # - Rollback if we couldn't create it
140 # - Increase the user note count
141 # - Increase the reply count
142 # - Increase replies count
143 # - Set up ActivityExpiration
144 # - Set up notifications
145 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
146 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
147 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
148 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
149 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
151 if in_reply_to = object.data["inReplyTo"] do
152 Object.increase_replies_count(in_reply_to)
155 if expires_at = activity.data["expires_at"] do
156 ActivityExpiration.create(activity, expires_at)
159 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
163 |> add_notifications(notifications)
165 {:ok, activity, meta}
167 e -> Repo.rollback(e)
171 # Tasks this handles:
172 # - Add announce to object
173 # - Set up notification
174 # - Stream out the announce
175 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
176 announced_object = Object.get_by_ap_id(object.data["object"])
177 user = User.get_cached_by_ap_id(object.data["actor"])
179 Utils.add_announce_to_object(object, announced_object)
181 if !User.is_internal_user?(user) do
182 Notification.create_notifications(object)
185 |> Topics.get_activity_topics()
186 |> Streamer.stream(object)
192 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
193 with undone_object <- Activity.get_by_ap_id(undone_object),
194 :ok <- handle_undoing(undone_object) do
199 # Tasks this handles:
200 # - Add reaction to object
201 # - Set up notification
202 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
203 reacted_object = Object.get_by_ap_id(object.data["object"])
204 Utils.add_emoji_reaction_to_object(object, reacted_object)
206 Notification.create_notifications(object)
211 # Tasks this handles:
212 # - Delete and unpins the create activity
213 # - Replace object with Tombstone
214 # - Set up notification
215 # - Reduce the user note count
216 # - Reduce the reply count
217 # - Stream out the activity
218 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
220 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
223 case deleted_object do
225 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
226 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
227 User.remove_pinnned_activity(user, activity)
229 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
231 if in_reply_to = deleted_object.data["inReplyTo"] do
232 Object.decrease_replies_count(in_reply_to)
235 MessageReference.delete_for_object(deleted_object)
237 ActivityPub.stream_out(object)
238 ActivityPub.stream_out_participations(deleted_object, user)
243 with {:ok, _} <- User.delete(deleted_object) do
249 Notification.create_notifications(object)
257 def handle(object, meta) do
261 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
262 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
263 actor = User.get_cached_by_ap_id(object.data["actor"])
264 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
267 [[actor, recipient], [recipient, actor]]
268 |> Enum.map(fn [user, other_user] ->
270 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
271 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
274 ["user", "user:pleroma_chat"],
275 {user, %{cm_ref | chat: chat, object: object}}
283 |> add_streamables(streamables)
289 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
290 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
291 Object.increase_vote_count(
292 object.data["inReplyTo"],
301 def handle_object_creation(%{"type" => "Question"} = object, meta) do
302 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
308 def handle_object_creation(object, meta) do
312 defp undo_like(nil, object), do: delete_object(object)
314 defp undo_like(%Object{} = liked_object, object) do
315 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
316 delete_object(object)
320 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
321 object.data["object"]
322 |> Object.get_by_ap_id()
326 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
327 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
328 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
329 {:ok, _} <- Repo.delete(object) do
334 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
335 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
336 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
337 {:ok, _} <- Repo.delete(object) do
343 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
345 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
346 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
347 {:ok, _} <- User.unblock(blocker, blocked),
348 {:ok, _} <- Repo.delete(object) do
353 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
355 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
356 defp delete_object(object) do
357 with {:ok, _} <- Repo.delete(object), do: :ok
360 defp send_notifications(meta) do
361 Keyword.get(meta, :notifications, [])
362 |> Enum.each(fn notification ->
363 Streamer.stream(["user", "user:notification"], notification)
364 Push.send(notification)
370 defp send_streamables(meta) do
371 Keyword.get(meta, :streamables, [])
372 |> Enum.each(fn {topics, items} ->
373 Streamer.stream(topics, items)
379 defp add_streamables(meta, streamables) do
380 existing = Keyword.get(meta, :streamables, [])
383 |> Keyword.put(:streamables, streamables ++ existing)
386 defp add_notifications(meta, notifications) do
387 existing = Keyword.get(meta, :notifications, [])
390 |> Keyword.put(:notifications, notifications ++ existing)
393 def handle_after_transaction(meta) do
395 |> send_notifications()
396 |> send_streamables()