Merge branch 'develop' into activation-meta
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Chat
10 alias Pleroma.Chat.MessageReference
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Pipeline
17 alias Pleroma.Web.ActivityPub.Utils
18 alias Pleroma.Web.Push
19 alias Pleroma.Web.Streamer
20
def handle(object, meta \\ [])

# Block activity.
#
# Looks up the blocking actor and then the blocked user by AP id. Only when
# both resolve to a %User{} is User.block/2 invoked (the original notes
# describe this as "unfollow and block"). A failed lookup is silently
# ignored: the activity and meta are always handed back as a success.
def handle(
      %{data: %{"type" => "Block", "object" => target_ap_id, "actor" => actor_ap_id}} = object,
      meta
    ) do
  case User.get_cached_by_ap_id(actor_ap_id) do
    %User{} = blocker ->
      # The blocked user is only looked up once the blocker is known.
      case User.get_cached_by_ap_id(target_ap_id) do
        %User{} = blocked -> User.block(blocker, blocked)
        _missing -> :noop
      end

    _missing ->
      :noop
  end

  {:ok, object, meta}
end
37
# Update activity: updates the user the activity describes.
#
# When meta carries a :user_update_changeset (per the original notes this is
# the case for local users, so non-federating settings can be updated too),
# that changeset is applied directly. Otherwise user data is derived from the
# embedded object and applied to the user found by the object's "id".
#
# The outcome of the update itself is not inspected; the clause always
# returns {:ok, object, meta}.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      known_user = User.get_by_ap_id(updated_object["id"])

      known_user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
57
# Like activity:
# - registers the like on the object the activity points at
# - creates notifications for the activity
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(Map.get(data, "object"))

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
69
# Create activity:
# - creates the embedded object (taken from meta[:object_data]) through
#   handle_object_creation/2
# - rolls the surrounding transaction back with the failing value if the
#   object could not be created
# - creates notifications without sending them (do_send: false) and stores
#   them in meta under :notifications for later delivery
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  case handle_object_creation(meta[:object_data], meta) do
    {:ok, _object, meta} ->
      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)

      {:ok, activity, add_notifications(meta, notifications)}

    failure ->
      Repo.rollback(failure)
  end
end
87
# Announce activity:
# - registers the announce on the announced object
# - unless the announcing actor is an internal user, creates notifications
#   and streams the announce out
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  target = Object.get_by_ap_id(Map.get(data, "object"))
  announcer = User.get_cached_by_ap_id(Map.get(data, "actor"))

  Utils.add_announce_to_object(object, target)

  if User.is_internal_user?(announcer) do
    # Internal actors announce silently: no notifications, no streaming.
    nil
  else
    Notification.create_notifications(object)
    ActivityPub.stream_out(object)
  end

  {:ok, object, meta}
end
105
# Undo activity: fetches the activity being undone and delegates to
# handle_undoing/1. Anything other than :ok coming back from there is
# returned to the caller untouched.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
112
# EmojiReact activity:
# - registers the reaction on the object the activity points at
# - creates notifications for the activity
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(Map.get(data, "object"))

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
124
# Delete activity. The target is resolved to a stored object first and, if
# that yields nothing, to a cached user.
#
# For an object:
# - delete it through Object.delete/1 and unpin the activity it returns
#   from the object's actor
# - adjust the actor's note count (decrease_note_count_if_public/2)
# - decrease the reply count of the object it replied to, if any
# - remove chat message references for the object
# - stream out the Delete activity and the affected participations
#
# For a user:
# - delete the user through User.delete/1
#
# Notifications are created only when the deletion reported :ok; any other
# outcome is returned as {:error, outcome}.
#
# NOTE(review): there is no branch for a target that is neither an object
# nor a user, so an unresolvable target raises a CaseClauseError.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target = Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, create_activity} <- Object.delete(target),
             %User{} = author <- User.get_cached_by_ap_id(removed.data["actor"]) do
          User.remove_pinnned_activity(author, create_activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          if parent = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent)
          end

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        end

      %User{} ->
        with {:ok, _} <- User.delete(target) do
          :ok
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
169
# Catch-all: activities without a dedicated clause have no side effects and
# are passed through unchanged together with their meta.
def handle(object, meta), do: {:ok, object, meta}
174
# Object creation for ChatMessage objects.
#
# Runs the object through Pipeline.common_pipeline/2; a non-successful
# pipeline result is returned as is. On success, for each side of the
# conversation (actor first, then the first "to" recipient) that is a local
# user:
# - the chat with the other party is bumped or created
# - a message reference for the new object is created in that chat; the
#   flag passed along is true when the user is not the message's actor
# - a streamable for the "user" and "user:pleroma_chat" topics is prepared
#
# The streamables are stored in meta rather than sent here.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, object, meta} ->
      actor = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      streamables =
        for {user, other_user} <- [{actor, recipient}, {recipient, actor}], user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

          {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
        end

      {:ok, object, add_streamables(meta, streamables)}

    failure ->
      failure
  end
end
202
# Nothing to do for any other object type: the object and meta are handed
# back unchanged, in the same {:ok, object, meta} shape the ChatMessage
# clause returns. The Create handler calls this function with two arguments
# and matches on that three-element tuple, so without this clause every
# non-ChatMessage Create would raise a FunctionClauseError.
def handle_object_creation(object, meta) do
  {:ok, object, meta}
end

# One-argument variant, kept so existing callers of the old fallback keep
# working. It has nothing to do either and returns {:ok, object}.
def handle_object_creation(object) do
  {:ok, object}
end
207
# Undo of a Like: removes the like from the liked object, then deletes the
# Like activity itself. Returns :ok, or the first value that did not match
# the expected shape.
def handle_undoing(%{data: %{"type" => "Like"} = data} = like) do
  with %Object{} = target <- Object.get_by_ap_id(Map.get(data, "object")),
       {:ok, _updated} <- Utils.remove_like_from_object(like, target),
       {:ok, _deleted} <- Repo.delete(like) do
    :ok
  end
end
215
# Undo of an EmojiReact: removes the reaction from the reacted-to object,
# then deletes the EmojiReact activity itself. Returns :ok, or the first
# value that did not match the expected shape.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = reaction) do
  with %Object{} = target <- Object.get_by_ap_id(Map.get(data, "object")),
       {:ok, _updated} <- Utils.remove_emoji_reaction_from_object(reaction, target),
       {:ok, _deleted} <- Repo.delete(reaction) do
    :ok
  end
end
223
# Undo of an Announce: removes the announce from the announced object, then
# deletes the Announce activity itself. Returns :ok, or the first value that
# did not match the expected shape.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = announce) do
  with %Object{} = announced <- Object.get_by_ap_id(Map.get(data, "object")),
       {:ok, _updated} <- Utils.remove_announce_from_object(announce, announced),
       {:ok, _deleted} <- Repo.delete(announce) do
    :ok
  end
end
231
# Undo of a Block: resolves both users, unblocks, then deletes the Block
# activity itself. Returns :ok, or the first value that did not match the
# expected shape (for instance nil when a user cannot be found).
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} = block
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _unblocked} <- User.unblock(blocker, blocked),
       {:ok, _deleted} <- Repo.delete(block) do
    :ok
  end
end
242
# Fallback: any other value cannot be undone and is reported back inside
# the error payload.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
244
# Delivers what the handlers queued up in meta: first the notifications
# (streamed and pushed), then the streamables. Going by its name this is
# meant to run once the surrounding transaction is done. Returns meta
# unchanged.
def handle_after_transaction(meta) do
  meta = send_notifications(meta)
  send_streamables(meta)
end

# Prepends `notifications` to the list stored under :notifications in meta.
defp add_notifications(meta, notifications) do
  queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ queued)
end

# Prepends `streamables` to the list stored under :streamables in meta.
defp add_streamables(meta, streamables) do
  queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ queued)
end

# Streams every queued notification to the "user" and "user:notification"
# topics and hands it to Push. Returns meta unchanged.
defp send_notifications(meta) do
  queued = Keyword.get(meta, :notifications, [])

  Enum.each(queued, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end

# Streams every queued {topics, items} pair. Returns meta unchanged.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
283 end