4228041e7a0256d3b0ef7a2899b88ea03e93807e
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25 alias Pleroma.FollowingRelationship
26
27 def handle(object, meta \\ [])
28
29 # Task this handles
30 # - Follows
31 # - Sends a notification
32 def handle(
33 %{
34 data: %{
35 "actor" => actor,
36 "type" => "Accept",
37 "object" => follow_activity_id
38 }
39 } = object,
40 meta
41 ) do
42 with %Activity{actor: follower_id} = follow_activity <-
43 Activity.get_by_ap_id(follow_activity_id),
44 %User{} = followed <- User.get_cached_by_ap_id(actor),
45 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
46 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
47 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
48 Notification.update_notification_type(followed, follow_activity)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
51 end
52
53 {:ok, object, meta}
54 end
55
56 # Tasks this handle
57 # - Follows if possible
58 # - Sends a notification
59 # - Generates accept or reject if appropriate
60 def handle(
61 %{
62 data: %{
63 "id" => follow_id,
64 "type" => "Follow",
65 "object" => followed_user,
66 "actor" => following_user
67 }
68 } = object,
69 meta
70 ) do
71 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
72 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
73 {_, {:ok, _}, _, _} <-
74 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
75 if followed.local && !followed.locked do
76 {:ok, accept_data, _} = Builder.accept(followed, object)
77 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
78 end
79 else
80 {:following, {:error, _}, follower, followed} ->
81 Utils.update_follow_state_for_all(object, "reject")
82 FollowingRelationship.update(follower, followed, :follow_reject)
83
84 if followed.local do
85 %{
86 to: [follower.ap_id],
87 actor: followed,
88 object: follow_id,
89 local: true
90 }
91 |> ActivityPub.reject()
92 end
93
94 _ ->
95 nil
96 end
97
98 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
99
100 meta =
101 meta
102 |> add_notifications(notifications)
103
104 updated_object = Activity.get_by_ap_id(follow_id)
105
106 {:ok, updated_object, meta}
107 end
108
109 # Tasks this handles:
110 # - Unfollow and block
111 def handle(
112 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
113 object,
114 meta
115 ) do
116 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
117 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
118 User.block(blocker, blocked)
119 end
120
121 {:ok, object, meta}
122 end
123
124 # Tasks this handles:
125 # - Update the user
126 #
127 # For a local user, we also get a changeset with the full information, so we
128 # can update non-federating, non-activitypub settings as well.
129 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
130 if changeset = Keyword.get(meta, :user_update_changeset) do
131 changeset
132 |> User.update_and_set_cache()
133 else
134 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
135
136 User.get_by_ap_id(updated_object["id"])
137 |> User.remote_user_changeset(new_user_data)
138 |> User.update_and_set_cache()
139 end
140
141 {:ok, object, meta}
142 end
143
144 # Tasks this handles:
145 # - Add like to object
146 # - Set up notification
147 def handle(%{data: %{"type" => "Like"}} = object, meta) do
148 liked_object = Object.get_by_ap_id(object.data["object"])
149 Utils.add_like_to_object(object, liked_object)
150
151 Notification.create_notifications(object)
152
153 {:ok, object, meta}
154 end
155
156 # Tasks this handles
157 # - Actually create object
158 # - Rollback if we couldn't create it
159 # - Increase the user note count
160 # - Increase the reply count
161 # - Increase replies count
162 # - Set up ActivityExpiration
163 # - Set up notifications
164 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
165 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
166 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
167 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
168 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
169
170 if in_reply_to = object.data["inReplyTo"] do
171 Object.increase_replies_count(in_reply_to)
172 end
173
174 if expires_at = activity.data["expires_at"] do
175 ActivityExpiration.create(activity, expires_at)
176 end
177
178 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
179
180 meta =
181 meta
182 |> add_notifications(notifications)
183
184 {:ok, activity, meta}
185 else
186 e -> Repo.rollback(e)
187 end
188 end
189
190 # Tasks this handles:
191 # - Add announce to object
192 # - Set up notification
193 # - Stream out the announce
194 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
195 announced_object = Object.get_by_ap_id(object.data["object"])
196 user = User.get_cached_by_ap_id(object.data["actor"])
197
198 Utils.add_announce_to_object(object, announced_object)
199
200 if !User.is_internal_user?(user) do
201 Notification.create_notifications(object)
202
203 object
204 |> Topics.get_activity_topics()
205 |> Streamer.stream(object)
206 end
207
208 {:ok, object, meta}
209 end
210
211 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
212 with undone_object <- Activity.get_by_ap_id(undone_object),
213 :ok <- handle_undoing(undone_object) do
214 {:ok, object, meta}
215 end
216 end
217
218 # Tasks this handles:
219 # - Add reaction to object
220 # - Set up notification
221 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
222 reacted_object = Object.get_by_ap_id(object.data["object"])
223 Utils.add_emoji_reaction_to_object(object, reacted_object)
224
225 Notification.create_notifications(object)
226
227 {:ok, object, meta}
228 end
229
230 # Tasks this handles:
231 # - Delete and unpins the create activity
232 # - Replace object with Tombstone
233 # - Set up notification
234 # - Reduce the user note count
235 # - Reduce the reply count
236 # - Stream out the activity
237 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
238 deleted_object =
239 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
240
241 result =
242 case deleted_object do
243 %Object{} ->
244 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
245 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
246 User.remove_pinnned_activity(user, activity)
247
248 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
249
250 if in_reply_to = deleted_object.data["inReplyTo"] do
251 Object.decrease_replies_count(in_reply_to)
252 end
253
254 MessageReference.delete_for_object(deleted_object)
255
256 ActivityPub.stream_out(object)
257 ActivityPub.stream_out_participations(deleted_object, user)
258 :ok
259 end
260
261 %User{} ->
262 with {:ok, _} <- User.delete(deleted_object) do
263 :ok
264 end
265 end
266
267 if result == :ok do
268 Notification.create_notifications(object)
269 {:ok, object, meta}
270 else
271 {:error, result}
272 end
273 end
274
275 # Nothing to do
276 def handle(object, meta) do
277 {:ok, object, meta}
278 end
279
280 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
281 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
282 actor = User.get_cached_by_ap_id(object.data["actor"])
283 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
284
285 streamables =
286 [[actor, recipient], [recipient, actor]]
287 |> Enum.map(fn [user, other_user] ->
288 if user.local do
289 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
290 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
291
292 {
293 ["user", "user:pleroma_chat"],
294 {user, %{cm_ref | chat: chat, object: object}}
295 }
296 end
297 end)
298 |> Enum.filter(& &1)
299
300 meta =
301 meta
302 |> add_streamables(streamables)
303
304 {:ok, object, meta}
305 end
306 end
307
308 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
309 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
310 Object.increase_vote_count(
311 object.data["inReplyTo"],
312 object.data["name"],
313 object.data["actor"]
314 )
315
316 {:ok, object, meta}
317 end
318 end
319
320 def handle_object_creation(%{"type" => "Question"} = object, meta) do
321 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
322 {:ok, object, meta}
323 end
324 end
325
326 # Nothing to do
327 def handle_object_creation(object, meta) do
328 {:ok, object, meta}
329 end
330
331 defp undo_like(nil, object), do: delete_object(object)
332
333 defp undo_like(%Object{} = liked_object, object) do
334 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
335 delete_object(object)
336 end
337 end
338
339 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
340 object.data["object"]
341 |> Object.get_by_ap_id()
342 |> undo_like(object)
343 end
344
345 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
346 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
347 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
348 {:ok, _} <- Repo.delete(object) do
349 :ok
350 end
351 end
352
353 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
354 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
355 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
356 {:ok, _} <- Repo.delete(object) do
357 :ok
358 end
359 end
360
361 def handle_undoing(
362 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
363 ) do
364 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
365 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
366 {:ok, _} <- User.unblock(blocker, blocked),
367 {:ok, _} <- Repo.delete(object) do
368 :ok
369 end
370 end
371
372 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
373
374 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
375 defp delete_object(object) do
376 with {:ok, _} <- Repo.delete(object), do: :ok
377 end
378
379 defp send_notifications(meta) do
380 Keyword.get(meta, :notifications, [])
381 |> Enum.each(fn notification ->
382 Streamer.stream(["user", "user:notification"], notification)
383 Push.send(notification)
384 end)
385
386 meta
387 end
388
389 defp send_streamables(meta) do
390 Keyword.get(meta, :streamables, [])
391 |> Enum.each(fn {topics, items} ->
392 Streamer.stream(topics, items)
393 end)
394
395 meta
396 end
397
398 defp add_streamables(meta, streamables) do
399 existing = Keyword.get(meta, :streamables, [])
400
401 meta
402 |> Keyword.put(:streamables, streamables ++ existing)
403 end
404
405 defp add_notifications(meta, notifications) do
406 existing = Keyword.get(meta, :notifications, [])
407
408 meta
409 |> Keyword.put(:notifications, notifications ++ existing)
410 end
411
412 def handle_after_transaction(meta) do
413 meta
414 |> send_notifications()
415 |> send_streamables()
416 end
417 end