1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Pipeline
18 alias Pleroma.Web.ActivityPub.Utils
19 alias Pleroma.Web.Push
20 alias Pleroma.Web.Streamer
22 def handle(object, meta \\ [])
25 # - Unfollow and block
27 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
31 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
32 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
33 User.block(blocker, blocked)
42 # For a local user, we also get a changeset with the full information, so we
43 # can update non-federating, non-activitypub settings as well.
44 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
45 if changeset = Keyword.get(meta, :user_update_changeset) do
47 |> User.update_and_set_cache()
49 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
51 User.get_by_ap_id(updated_object["id"])
52 |> User.remote_user_changeset(new_user_data)
53 |> User.update_and_set_cache()
60 # - Add like to object
61 # - Set up notification
62 def handle(%{data: %{"type" => "Like"}} = object, meta) do
63 liked_object = Object.get_by_ap_id(object.data["object"])
64 Utils.add_like_to_object(object, liked_object)
66 Notification.create_notifications(object)
72 # - Actually create object
73 # - Rollback if we couldn't create it
74 # - Set up notifications
75 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
76 with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
77 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
81 |> add_notifications(notifications)
90 # - Add announce to object
91 # - Set up notification
92 # - Stream out the announce
93 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
94 announced_object = Object.get_by_ap_id(object.data["object"])
95 user = User.get_cached_by_ap_id(object.data["actor"])
97 Utils.add_announce_to_object(object, announced_object)
99 if !User.is_internal_user?(user) do
100 Notification.create_notifications(object)
103 |> Topics.get_activity_topics()
104 |> Streamer.stream(object)
110 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
111 with undone_object <- Activity.get_by_ap_id(undone_object),
112 :ok <- handle_undoing(undone_object) do
117 # Tasks this handles:
118 # - Add reaction to object
119 # - Set up notification
120 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
121 reacted_object = Object.get_by_ap_id(object.data["object"])
122 Utils.add_emoji_reaction_to_object(object, reacted_object)
124 Notification.create_notifications(object)
129 # Tasks this handles:
130 # - Deletes and unpins the create activity
131 # - Replace object with Tombstone
132 # - Set up notification
133 # - Reduce the user note count
134 # - Reduce the reply count
135 # - Stream out the activity
136 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
138 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
141 case deleted_object do
143 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
144 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
145 User.remove_pinnned_activity(user, activity)
147 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
149 if in_reply_to = deleted_object.data["inReplyTo"] do
150 Object.decrease_replies_count(in_reply_to)
153 MessageReference.delete_for_object(deleted_object)
155 ActivityPub.stream_out(object)
156 ActivityPub.stream_out_participations(deleted_object, user)
161 with {:ok, _} <- User.delete(deleted_object) do
167 Notification.create_notifications(object)
175 def handle(object, meta) do
179 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
180 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
181 actor = User.get_cached_by_ap_id(object.data["actor"])
182 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
185 [[actor, recipient], [recipient, actor]]
186 |> Enum.map(fn [user, other_user] ->
188 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
189 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
192 ["user", "user:pleroma_chat"],
193 {user, %{cm_ref | chat: chat, object: object}}
201 |> add_streamables(streamables)
208 def handle_object_creation(object) do
212 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
213 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
214 {:ok, _} <- Utils.remove_like_from_object(object, liked_object),
215 {:ok, _} <- Repo.delete(object) do
220 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
221 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
222 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
223 {:ok, _} <- Repo.delete(object) do
228 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
229 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
230 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
231 {:ok, _} <- Repo.delete(object) do
237 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
239 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
240 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
241 {:ok, _} <- User.unblock(blocker, blocked),
242 {:ok, _} <- Repo.delete(object) do
# Catch-all clause: reached only when none of the more specific
# `handle_undoing/1` clauses matched. Nothing is undone; the unhandled value is
# handed back inside the error reason so the caller can inspect it.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
249 defp send_notifications(meta) do
250 Keyword.get(meta, :notifications, [])
251 |> Enum.each(fn notification ->
252 Streamer.stream(["user", "user:notification"], notification)
253 Push.send(notification)
259 defp send_streamables(meta) do
260 Keyword.get(meta, :streamables, [])
261 |> Enum.each(fn {topics, items} ->
262 Streamer.stream(topics, items)
268 defp add_streamables(meta, streamables) do
269 existing = Keyword.get(meta, :streamables, [])
272 |> Keyword.put(:streamables, streamables ++ existing)
275 defp add_notifications(meta, notifications) do
276 existing = Keyword.get(meta, :notifications, [])
279 |> Keyword.put(:notifications, notifications ++ existing)
282 def handle_after_transaction(meta) do
284 |> send_notifications()
285 |> send_streamables()