14a1da0c125d7d378e82d061189f2c2ad1160116
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25
26 def handle(object, meta \\ [])
27
28 # Task this handles
29 # - Follows
30 # - Sends a notification
31 def handle(
32 %{
33 data: %{
34 "actor" => actor,
35 "type" => "Accept",
36 "object" => follow_activity_id
37 }
38 } = object,
39 meta
40 ) do
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
50 end
51
52 {:ok, object, meta}
53 end
54
55 # Task this handles
56 # - Rejects all existing follow activities for this person
57 # - Updates the follow state
58 # - Dismisses notificatios
59 def handle(
60 %{
61 data: %{
62 "actor" => actor,
63 "type" => "Reject",
64 "object" => follow_activity_id
65 }
66 } = object,
67 meta
68 ) do
69 with %Activity{actor: follower_id} = follow_activity <-
70 Activity.get_by_ap_id(follow_activity_id),
71 %User{} = followed <- User.get_cached_by_ap_id(actor),
72 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
73 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
74 FollowingRelationship.update(follower, followed, :follow_reject)
75 Notification.dismiss(follow_activity)
76 end
77
78 {:ok, object, meta}
79 end
80
81 # Tasks this handle
82 # - Follows if possible
83 # - Sends a notification
84 # - Generates accept or reject if appropriate
85 def handle(
86 %{
87 data: %{
88 "id" => follow_id,
89 "type" => "Follow",
90 "object" => followed_user,
91 "actor" => following_user
92 }
93 } = object,
94 meta
95 ) do
96 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
97 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
98 {_, {:ok, _}, _, _} <-
99 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
100 if followed.local && !followed.locked do
101 {:ok, accept_data, _} = Builder.accept(followed, object)
102 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
103 end
104 else
105 {:following, {:error, _}, _follower, followed} ->
106 {:ok, reject_data, _} = Builder.reject(followed, object)
107 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
108
109 _ ->
110 nil
111 end
112
113 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
114
115 meta =
116 meta
117 |> add_notifications(notifications)
118
119 updated_object = Activity.get_by_ap_id(follow_id)
120
121 {:ok, updated_object, meta}
122 end
123
124 # Tasks this handles:
125 # - Unfollow and block
126 def handle(
127 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
128 object,
129 meta
130 ) do
131 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
132 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
133 User.block(blocker, blocked)
134 end
135
136 {:ok, object, meta}
137 end
138
139 # Tasks this handles:
140 # - Update the user
141 #
142 # For a local user, we also get a changeset with the full information, so we
143 # can update non-federating, non-activitypub settings as well.
144 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
145 if changeset = Keyword.get(meta, :user_update_changeset) do
146 changeset
147 |> User.update_and_set_cache()
148 else
149 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
150
151 User.get_by_ap_id(updated_object["id"])
152 |> User.remote_user_changeset(new_user_data)
153 |> User.update_and_set_cache()
154 end
155
156 {:ok, object, meta}
157 end
158
159 # Tasks this handles:
160 # - Add like to object
161 # - Set up notification
162 def handle(%{data: %{"type" => "Like"}} = object, meta) do
163 liked_object = Object.get_by_ap_id(object.data["object"])
164 Utils.add_like_to_object(object, liked_object)
165
166 Notification.create_notifications(object)
167
168 {:ok, object, meta}
169 end
170
171 # Tasks this handles
172 # - Actually create object
173 # - Rollback if we couldn't create it
174 # - Increase the user note count
175 # - Increase the reply count
176 # - Increase replies count
177 # - Set up ActivityExpiration
178 # - Set up notifications
179 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
180 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
181 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
182 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
183 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
184
185 if in_reply_to = object.data["inReplyTo"] do
186 Object.increase_replies_count(in_reply_to)
187 end
188
189 if expires_at = activity.data["expires_at"] do
190 ActivityExpiration.create(activity, expires_at)
191 end
192
193 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
194
195 meta =
196 meta
197 |> add_notifications(notifications)
198
199 {:ok, activity, meta}
200 else
201 e -> Repo.rollback(e)
202 end
203 end
204
205 # Tasks this handles:
206 # - Add announce to object
207 # - Set up notification
208 # - Stream out the announce
209 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
210 announced_object = Object.get_by_ap_id(object.data["object"])
211 user = User.get_cached_by_ap_id(object.data["actor"])
212
213 Utils.add_announce_to_object(object, announced_object)
214
215 if !User.is_internal_user?(user) do
216 Notification.create_notifications(object)
217
218 object
219 |> Topics.get_activity_topics()
220 |> Streamer.stream(object)
221 end
222
223 {:ok, object, meta}
224 end
225
226 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
227 with undone_object <- Activity.get_by_ap_id(undone_object),
228 :ok <- handle_undoing(undone_object) do
229 {:ok, object, meta}
230 end
231 end
232
233 # Tasks this handles:
234 # - Add reaction to object
235 # - Set up notification
236 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
237 reacted_object = Object.get_by_ap_id(object.data["object"])
238 Utils.add_emoji_reaction_to_object(object, reacted_object)
239
240 Notification.create_notifications(object)
241
242 {:ok, object, meta}
243 end
244
245 # Tasks this handles:
246 # - Delete and unpins the create activity
247 # - Replace object with Tombstone
248 # - Set up notification
249 # - Reduce the user note count
250 # - Reduce the reply count
251 # - Stream out the activity
252 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
253 deleted_object =
254 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
255
256 result =
257 case deleted_object do
258 %Object{} ->
259 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
260 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
261 User.remove_pinnned_activity(user, activity)
262
263 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
264
265 if in_reply_to = deleted_object.data["inReplyTo"] do
266 Object.decrease_replies_count(in_reply_to)
267 end
268
269 MessageReference.delete_for_object(deleted_object)
270
271 ActivityPub.stream_out(object)
272 ActivityPub.stream_out_participations(deleted_object, user)
273 :ok
274 end
275
276 %User{} ->
277 with {:ok, _} <- User.delete(deleted_object) do
278 :ok
279 end
280 end
281
282 if result == :ok do
283 Notification.create_notifications(object)
284 {:ok, object, meta}
285 else
286 {:error, result}
287 end
288 end
289
290 # Nothing to do
291 def handle(object, meta) do
292 {:ok, object, meta}
293 end
294
295 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
296 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
297 actor = User.get_cached_by_ap_id(object.data["actor"])
298 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
299
300 streamables =
301 [[actor, recipient], [recipient, actor]]
302 |> Enum.map(fn [user, other_user] ->
303 if user.local do
304 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
305 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
306
307 {
308 ["user", "user:pleroma_chat"],
309 {user, %{cm_ref | chat: chat, object: object}}
310 }
311 end
312 end)
313 |> Enum.filter(& &1)
314
315 meta =
316 meta
317 |> add_streamables(streamables)
318
319 {:ok, object, meta}
320 end
321 end
322
323 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
324 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
325 Object.increase_vote_count(
326 object.data["inReplyTo"],
327 object.data["name"],
328 object.data["actor"]
329 )
330
331 {:ok, object, meta}
332 end
333 end
334
335 def handle_object_creation(%{"type" => "Question"} = object, meta) do
336 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
337 {:ok, object, meta}
338 end
339 end
340
341 # Nothing to do
342 def handle_object_creation(object, meta) do
343 {:ok, object, meta}
344 end
345
346 defp undo_like(nil, object), do: delete_object(object)
347
348 defp undo_like(%Object{} = liked_object, object) do
349 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
350 delete_object(object)
351 end
352 end
353
354 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
355 object.data["object"]
356 |> Object.get_by_ap_id()
357 |> undo_like(object)
358 end
359
360 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
361 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
362 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
363 {:ok, _} <- Repo.delete(object) do
364 :ok
365 end
366 end
367
368 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
369 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
370 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
371 {:ok, _} <- Repo.delete(object) do
372 :ok
373 end
374 end
375
376 def handle_undoing(
377 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
378 ) do
379 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
380 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
381 {:ok, _} <- User.unblock(blocker, blocked),
382 {:ok, _} <- Repo.delete(object) do
383 :ok
384 end
385 end
386
387 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
388
389 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
390 defp delete_object(object) do
391 with {:ok, _} <- Repo.delete(object), do: :ok
392 end
393
394 defp send_notifications(meta) do
395 Keyword.get(meta, :notifications, [])
396 |> Enum.each(fn notification ->
397 Streamer.stream(["user", "user:notification"], notification)
398 Push.send(notification)
399 end)
400
401 meta
402 end
403
404 defp send_streamables(meta) do
405 Keyword.get(meta, :streamables, [])
406 |> Enum.each(fn {topics, items} ->
407 Streamer.stream(topics, items)
408 end)
409
410 meta
411 end
412
413 defp add_streamables(meta, streamables) do
414 existing = Keyword.get(meta, :streamables, [])
415
416 meta
417 |> Keyword.put(:streamables, streamables ++ existing)
418 end
419
420 defp add_notifications(meta, notifications) do
421 existing = Keyword.get(meta, :notifications, [])
422
423 meta
424 |> Keyword.put(:notifications, notifications ++ existing)
425 end
426
427 def handle_after_transaction(meta) do
428 meta
429 |> send_notifications()
430 |> send_streamables()
431 end
432 end