b30ca1bd7534db9ba6c63baf37d985c0477b42f3
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Chat
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Repo
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Builder
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
24
25 require Logger
26
# Entry point for all side effects; `meta` defaults to an empty keyword list.
def handle(object, meta \\ [])

# Tasks this handles (Accept):
# - Sets the state of the referenced follow activity to "accept"
#   (via Utils.update_follow_state_for_all/2)
# - Updates the following relationship to :follow_accept
# - Updates the notification type for the followed user
# - Refreshes the follower / following counters of both users
#
# If any lookup or update step does not match, the remaining steps are
# skipped. The Accept is returned as `{:ok, object, meta}` in every case.
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _} <- FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
    User.update_follower_count(followed)
    User.update_following_count(follower)
  end

  {:ok, object, meta}
end
55
# Tasks this handles (Reject):
# - Sets the state of the referenced follow activity to "reject"
#   (via Utils.update_follow_state_for_all/2)
# - Updates the following relationship to :follow_reject
# - Dismisses the notification of the follow activity
#
# If the follow activity or either user cannot be resolved, nothing is
# changed. The Reject is returned as `{:ok, object, meta}` in every case.
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The activity fetched before the state update is the one dismissed.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
81
# Tasks this handles (Follow):
# - Follows if possible (User.follow/3 is called with :follow_pending)
# - Generates an Accept when the followed user is local and not locked
# - Generates a Reject when the follow attempt returns an error
# - Creates notifications with `do_send: false` and stores them in `meta`
#
# The activity is re-read by its id before being returned.
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  # When either user cannot be resolved, no follow is attempted at all.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _} ->
        if followed.local && !followed.locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
124
# Tasks this handles (Block):
# - Calls User.block/2 once both the blocker and the blocked user resolve
#
# The second user is only looked up when the first one was found. The
# activity is returned as `{:ok, object, meta}` in every case.
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
139
# Tasks this handles (Update):
# - Update the user
#
# When `meta` carries a :user_update_changeset, that changeset is applied
# directly (this is how non-federating, non-ActivityPub settings of a local
# user get updated). Otherwise the user data is rebuilt from the updated
# object and applied through a remote user changeset.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, fresh_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      user = User.get_by_ap_id(updated_object["id"])

      user
      |> User.remote_user_changeset(fresh_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
159
# Tasks this handles (Like):
# - Add the like to the liked object
# - Set up notifications
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
171
# Tasks this handles (Create):
# - Actually create the object (from `meta[:object_data]`)
# - Roll back if the object could not be created or the actor is unknown
# - Increase the user note count (if public)
# - Increase the replies count of the object being replied to
# - Enqueue PurgeExpiredActivity when the activity carries "expires_at"
# - Enqueue the "fetch_data_for_activity" background job
# - Set up notifications (created with `do_send: false`, stored in `meta`)
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = author <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _} = ActivityPub.increase_note_count_if_public(author, object)

    parent_id = object.data["inReplyTo"]
    if parent_id, do: Object.increase_replies_count(parent_id)

    expiry = activity.data["expires_at"]

    if expiry do
      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: activity.id,
        expires_at: expiry
      })
    end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity, add_notifications(meta, notifications)}
  else
    # Whatever failed to match is handed to the rollback as its reason.
    failure -> Repo.rollback(failure)
  end
end
208
# Tasks this handles (Announce):
# - Add the announce to the announced object
# - Set up notifications
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)
    Streamer.stream(Topics.get_activity_topics(object), object)
  end

  {:ok, object, meta}
end
229
# Tasks this handles (Undo):
# - Looks up the activity being undone and reverts it via handle_undoing/1
#
# Anything other than `:ok` from handle_undoing/1 is returned unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
236
# Tasks this handles (EmojiReact):
# - Add the reaction to the reacted-to object
# - Set up notifications
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
248
# Tasks this handles (Delete):
# - Deletes the target, which is either an object or a user
# - For objects: removes the pinned activity, reduces the user note count
#   (if public), reduces the replies count of the parent, deletes chat
#   message references and streams out the activity
# - Set up notifications (only when the deletion returned :ok)
#
# Returns `{:ok, object, meta}` on success and `{:error, reason}` otherwise.
#
# NOTE(review): the `else` of the object branch only covers a missing actor.
# A failing Object.delete/1 or an unknown actor raises WithClauseError, and a
# target that is neither an object nor a user raises CaseClauseError.
# Confirm that crashing is the intended outcome for those inputs.
def handle(%{data: %{"type" => "Delete", "object" => deleted_ref}} = object, meta) do
  target = Object.normalize(deleted_ref, false) || User.get_cached_by_ap_id(deleted_ref)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, activity} <- Object.delete(target),
             {_, actor} when is_binary(actor) <- {:actor, removed.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          parent_id = removed.data["inReplyTo"]
          if parent_id, do: Object.decrease_replies_count(parent_id)

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before Object.delete/1 ran.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
299
# Catch-all: activity types without side effects pass through untouched.
def handle(object, meta), do: {:ok, object, meta}
304
# Creates a ChatMessage object through the common pipeline and then, for each
# local participant (actor and first recipient), bumps or creates the chat
# and creates a message reference. The resulting items are queued in `meta`
# as streamables for the "user" and "user:pleroma_chat" topics.
#
# A pipeline result that is not `{:ok, object, meta}` is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    streamables =
      for {owner, peer} <- [{actor, recipient}, {recipient, actor}], owner.local do
        {:ok, chat} = Chat.bump_or_create(owner.id, peer.ap_id)

        # The third argument is true only for the side that did not author
        # the message.
        {:ok, cm_ref} = MessageReference.create(chat, object, owner.ap_id != actor.ap_id)

        {
          ["user", "user:pleroma_chat"],
          {owner, %{cm_ref | chat: chat, object: object}}
        }
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
332
# Creates an Answer object through the common pipeline and increases the vote
# count on the object it replies to, using the answer's name and actor.
#
# A pipeline result that is not `{:ok, object, meta}` is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, object, meta} ->
      Object.increase_vote_count(
        object.data["inReplyTo"],
        object.data["name"],
        object.data["actor"]
      )

      {:ok, object, meta}

    other ->
      other
  end
end
344
# Audio, Question and Event objects need nothing beyond the common pipeline;
# its result is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Question", "Event"] do
  Pipeline.common_pipeline(object, meta)
end
351
# Catch-all: every other object type is passed through without further work.
def handle_object_creation(object, meta), do: {:ok, object, meta}
356
# Reverts a like. When the liked object no longer resolves (nil), only the
# like itself is deleted; otherwise the like is first removed from the
# object. A failed removal is returned unchanged.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
364
# Undo of a Like: resolves the liked object (possibly nil) and delegates to
# undo_like/2.
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  liked_object = Object.get_by_ap_id(data["object"])
  undo_like(liked_object, object)
end
370
# Undo of an EmojiReact: removes the reaction from the reacted-to object and
# deletes the reaction. Returns :ok, or the first step's result that did not
# match (nil when the object cannot be found).
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = reacted <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
378
# Undo of an Announce: removes the announce from the announced object and
# deletes the announce. Returns :ok, or the first step's result that did not
# match (nil when the object cannot be found).
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
386
# Undo of a Block: unblocks the blocked user and deletes the block activity.
# Returns :ok, or the first step's result that did not match (nil when either
# user cannot be found).
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
397
# Catch-all: anything else (including nil) cannot be undone.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
399
# Deletes the given record through the repo.
#
# Returns :ok on success; any other result of Repo.delete/1 is returned
# unchanged.
#
# The spec accepts activities as well as objects: the undo path in this
# module hands over the activity being undone (fetched with
# Activity.get_by_ap_id/1).
@spec delete_object(Activity.t() | Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
404
# Streams every notification stored under :notifications in `meta` to the
# "user" and "user:notification" topics and hands it to Push.send/1.
# Returns `meta` unchanged so the call can be piped.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
414
# Streams every `{topics, items}` pair stored under :streamables in `meta`.
# Returns `meta` unchanged so the call can be piped.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
423
# Puts `streamables` in front of whatever is already stored under
# :streamables in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, combined)
end
430
# Puts `notifications` in front of whatever is already stored under
# :notifications in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, combined)
end
437
# Delivers what the handlers queued in `meta`: notifications first, then
# streamables. Returns `meta` unchanged.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
443 end