de02baf0f8db15e3bfa8a024755ec388e846dc66
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Chat
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Repo
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Pipeline
19 alias Pleroma.Web.ActivityPub.Utils
20 alias Pleroma.Web.Push
21 alias Pleroma.Web.Streamer
22
23 def handle(object, meta \\ [])
24
25 # Tasks this handle
26 # - Follows if possible
27 # - Sends a notification
28 # - Generates accept or reject if appropriate
29 def handle(
30 %{
31 data: %{
32 "id" => follow_id,
33 "type" => "Follow",
34 "object" => followed_user,
35 "actor" => following_user
36 }
37 } = object,
38 meta
39 ) do
40 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
41 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
42 {_, {:ok, _}, _, _} <-
43 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
44 if followed.local && !followed.locked do
45 Utils.update_follow_state_for_all(object, "accept")
46 FollowingRelationship.update(follower, followed, :follow_accept)
47 User.update_follower_count(followed)
48 User.update_following_count(follower)
49
50 %{
51 to: [following_user],
52 actor: followed,
53 object: follow_id,
54 local: true
55 }
56 |> ActivityPub.accept()
57 end
58 else
59 {:following, {:error, _}, follower, followed} ->
60 Utils.update_follow_state_for_all(object, "reject")
61 FollowingRelationship.update(follower, followed, :follow_reject)
62
63 if followed.local do
64 %{
65 to: [follower.ap_id],
66 actor: followed,
67 object: follow_id,
68 local: true
69 }
70 |> ActivityPub.reject()
71 end
72
73 _ ->
74 nil
75 end
76
77 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
78
79 meta =
80 meta
81 |> add_notifications(notifications)
82
83 updated_object = Activity.get_by_ap_id(follow_id)
84
85 {:ok, updated_object, meta}
86 end
87
88 # Tasks this handles:
89 # - Unfollow and block
90 def handle(
91 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
92 object,
93 meta
94 ) do
95 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
96 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
97 User.block(blocker, blocked)
98 end
99
100 {:ok, object, meta}
101 end
102
103 # Tasks this handles:
104 # - Update the user
105 #
106 # For a local user, we also get a changeset with the full information, so we
107 # can update non-federating, non-activitypub settings as well.
108 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
109 if changeset = Keyword.get(meta, :user_update_changeset) do
110 changeset
111 |> User.update_and_set_cache()
112 else
113 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
114
115 User.get_by_ap_id(updated_object["id"])
116 |> User.remote_user_changeset(new_user_data)
117 |> User.update_and_set_cache()
118 end
119
120 {:ok, object, meta}
121 end
122
123 # Tasks this handles:
124 # - Add like to object
125 # - Set up notification
126 def handle(%{data: %{"type" => "Like"}} = object, meta) do
127 liked_object = Object.get_by_ap_id(object.data["object"])
128 Utils.add_like_to_object(object, liked_object)
129
130 Notification.create_notifications(object)
131
132 {:ok, object, meta}
133 end
134
135 # Tasks this handles
136 # - Actually create object
137 # - Rollback if we couldn't create it
138 # - Set up notifications
139 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
140 with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
141 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
142
143 meta =
144 meta
145 |> add_notifications(notifications)
146
147 {:ok, activity, meta}
148 else
149 e -> Repo.rollback(e)
150 end
151 end
152
153 # Tasks this handles:
154 # - Add announce to object
155 # - Set up notification
156 # - Stream out the announce
157 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
158 announced_object = Object.get_by_ap_id(object.data["object"])
159 user = User.get_cached_by_ap_id(object.data["actor"])
160
161 Utils.add_announce_to_object(object, announced_object)
162
163 if !User.is_internal_user?(user) do
164 Notification.create_notifications(object)
165
166 object
167 |> Topics.get_activity_topics()
168 |> Streamer.stream(object)
169 end
170
171 {:ok, object, meta}
172 end
173
174 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
175 with undone_object <- Activity.get_by_ap_id(undone_object),
176 :ok <- handle_undoing(undone_object) do
177 {:ok, object, meta}
178 end
179 end
180
181 # Tasks this handles:
182 # - Add reaction to object
183 # - Set up notification
184 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
185 reacted_object = Object.get_by_ap_id(object.data["object"])
186 Utils.add_emoji_reaction_to_object(object, reacted_object)
187
188 Notification.create_notifications(object)
189
190 {:ok, object, meta}
191 end
192
193 # Tasks this handles:
194 # - Delete and unpins the create activity
195 # - Replace object with Tombstone
196 # - Set up notification
197 # - Reduce the user note count
198 # - Reduce the reply count
199 # - Stream out the activity
200 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
201 deleted_object =
202 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
203
204 result =
205 case deleted_object do
206 %Object{} ->
207 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
208 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
209 User.remove_pinnned_activity(user, activity)
210
211 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
212
213 if in_reply_to = deleted_object.data["inReplyTo"] do
214 Object.decrease_replies_count(in_reply_to)
215 end
216
217 MessageReference.delete_for_object(deleted_object)
218
219 ActivityPub.stream_out(object)
220 ActivityPub.stream_out_participations(deleted_object, user)
221 :ok
222 end
223
224 %User{} ->
225 with {:ok, _} <- User.delete(deleted_object) do
226 :ok
227 end
228 end
229
230 if result == :ok do
231 Notification.create_notifications(object)
232 {:ok, object, meta}
233 else
234 {:error, result}
235 end
236 end
237
238 # Nothing to do
239 def handle(object, meta) do
240 {:ok, object, meta}
241 end
242
243 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
244 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
245 actor = User.get_cached_by_ap_id(object.data["actor"])
246 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
247
248 streamables =
249 [[actor, recipient], [recipient, actor]]
250 |> Enum.map(fn [user, other_user] ->
251 if user.local do
252 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
253 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
254
255 {
256 ["user", "user:pleroma_chat"],
257 {user, %{cm_ref | chat: chat, object: object}}
258 }
259 end
260 end)
261 |> Enum.filter(& &1)
262
263 meta =
264 meta
265 |> add_streamables(streamables)
266
267 {:ok, object, meta}
268 end
269 end
270
271 # Nothing to do
272 def handle_object_creation(object) do
273 {:ok, object}
274 end
275
276 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
277 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
278 {:ok, _} <- Utils.remove_like_from_object(object, liked_object),
279 {:ok, _} <- Repo.delete(object) do
280 :ok
281 end
282 end
283
284 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
285 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
286 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
287 {:ok, _} <- Repo.delete(object) do
288 :ok
289 end
290 end
291
292 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
293 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
294 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
295 {:ok, _} <- Repo.delete(object) do
296 :ok
297 end
298 end
299
300 def handle_undoing(
301 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
302 ) do
303 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
304 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
305 {:ok, _} <- User.unblock(blocker, blocked),
306 {:ok, _} <- Repo.delete(object) do
307 :ok
308 end
309 end
310
311 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
312
313 defp send_notifications(meta) do
314 Keyword.get(meta, :notifications, [])
315 |> Enum.each(fn notification ->
316 Streamer.stream(["user", "user:notification"], notification)
317 Push.send(notification)
318 end)
319
320 meta
321 end
322
323 defp send_streamables(meta) do
324 Keyword.get(meta, :streamables, [])
325 |> Enum.each(fn {topics, items} ->
326 Streamer.stream(topics, items)
327 end)
328
329 meta
330 end
331
332 defp add_streamables(meta, streamables) do
333 existing = Keyword.get(meta, :streamables, [])
334
335 meta
336 |> Keyword.put(:streamables, streamables ++ existing)
337 end
338
339 defp add_notifications(meta, notifications) do
340 existing = Keyword.get(meta, :notifications, [])
341
342 meta
343 |> Keyword.put(:notifications, notifications ++ existing)
344 end
345
346 def handle_after_transaction(meta) do
347 meta
348 |> send_notifications()
349 |> send_streamables()
350 end
351 end