bcd6fd2fb1d55b10b31f37aa9586aa655dad33bc
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25
26 require Logger
27
@doc """
Runs the side effects implied by `object`, selecting a clause by
`object.data["type"]`.

`meta` is handed through the clauses and defaults to `[]`. Types without a
dedicated clause are returned untouched as `{:ok, object, meta}`.
"""
def handle(object, meta \\ [])

# Accept of a follow request. When the referenced Follow activity, the
# accepting user and the follower can all be loaded, this:
# - calls Utils.update_follow_state_for_all/2 with "accept"
# - updates the following relationship with :follow_accept
# - updates the notification type and both users' follow counters
# As soon as one step does not match, the remaining ones are skipped. The
# Accept itself is returned as {:ok, object, meta} either way.
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follow.actor),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _relationship} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
    User.update_follower_count(followed)
    User.update_following_count(follower)
  end

  {:ok, object, meta}
end
56
# Reject of a follow request. When the referenced Follow activity, the
# rejecting user and the follower can all be loaded, this:
# - calls Utils.update_follow_state_for_all/2 with "reject"
# - updates the following relationship with :follow_reject
# - dismisses the notification of the original follow activity
# The Reject itself is returned as {:ok, object, meta} either way.
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(follow.actor),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
82
# Follow request:
# - registers the follow as :follow_pending when both users can be loaded
# - pushes an Accept through the pipeline when the followed user is local and
#   not locked
# - pushes a Reject through the pipeline when User.follow/3 returns an error
# - creates notifications without sending them and stores them in meta
# Returns the Follow activity re-read by its id, so callers get the stored
# version rather than the one passed in.
def handle(
      %{
        data: %{
          "id" => follow_id,
          "type" => "Follow",
          "object" => followed_user,
          "actor" => following_user
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(following_user),
       %User{} = followed <- User.get_cached_by_ap_id(followed_user) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _} ->
        if followed.local && !followed.locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta_with_notifications}
end
125
# Block: when both the blocking and the blocked user can be loaded, delegates
# to User.block/2. The Block activity is returned unchanged in every case.
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
140
# Update of a user.
#
# When meta carries a :user_update_changeset, that changeset is applied
# directly. Otherwise the user data is derived from the updated object and
# applied to the user found by the object's "id" through
# User.remote_user_changeset/2.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      user = User.get_by_ap_id(updated_object["id"])
      remote_changeset = User.remote_user_changeset(user, new_user_data)
      User.update_and_set_cache(remote_changeset)

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
160
# Like:
# - adds the like to the object referenced by data["object"]
# - creates notifications for the Like activity
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
172
# Create:
# - creates the object from meta[:object_data] via handle_object_creation/2
# - creates notifications without sending them and stores them in meta
# - increases the author's note count if the object is public
# - increases the replies count of the object named by "inReplyTo", if any
# - creates an ActivityExpiration when the activity has "expires_at"
# - enqueues the "fetch_data_for_activity" background job
# If the object cannot be created or the actor cannot be loaded, the failing
# value is passed to Repo.rollback/1.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = author <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _author} = ActivityPub.increase_note_count_if_public(author, object)

    parent_id = object.data["inReplyTo"]
    if parent_id, do: Object.increase_replies_count(parent_id)

    expires_at = activity.data["expires_at"]
    if expires_at, do: ActivityExpiration.create(activity, expires_at)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity, add_notifications(meta, notifications)}
  else
    failure -> Repo.rollback(failure)
  end
end
206
# Announce:
# - adds the announce to the object referenced by data["object"]
# - unless the actor is an internal user, creates notifications and streams
#   the activity to the topics computed by Topics.get_activity_topics/1
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
227
# Undo: loads the activity being undone and delegates to handle_undoing/1.
# Anything other than :ok coming back from handle_undoing/1 is returned to the
# caller as-is.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  outcome =
    undone_ap_id
    |> Activity.get_by_ap_id()
    |> handle_undoing()

  case outcome do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
234
# EmojiReact:
# - adds the reaction to the object referenced by data["object"]
# - creates notifications for the reaction activity
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
246
# Delete of an object or of a user.
#
# The "object" of the activity is resolved first as an Object (without
# fetching) and then as a cached User.
#
# For an Object:
# - deletes it via Object.delete/1
# - removes the pinned activity from the actor and decreases the actor's note
#   count if the object was public
# - decreases the replies count of the object named by "inReplyTo", if any
# - deletes the chat message references of the object
# - streams out the Delete activity and the participations
#
# For a User: deletes the user via User.delete/1.
#
# Returns {:ok, object, meta} after creating notifications when everything
# succeeded, {:error, reason} otherwise. Failures that used to raise
# (Object.delete/1 not returning {:ok, _, _}, an actor that is not a known
# user, a target that is neither an object nor a user) are now reported
# through the {:error, reason} return as well.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  deleted_object =
    Object.normalize(deleted_object, false) ||
      User.get_cached_by_ap_id(deleted_object)

  result =
    case deleted_object do
      %Object{} ->
        with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
             {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinnned_activity(user, activity)

          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)

          if in_reply_to = deleted_object.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          MessageReference.delete_for_object(deleted_object)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(deleted_object, user)
          :ok
        else
          {:actor, _} ->
            Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
            :no_object_actor

          # The actor id is a binary but no such user is known.
          nil ->
            :no_actor_user

          # Object.delete/1 did not return {:ok, _, _}: report its value
          # instead of raising WithClauseError.
          other ->
            other
        end

      %User{} ->
        with {:ok, _} <- User.delete(deleted_object) do
          :ok
        end

      # Neither an object nor a user could be resolved.
      _ ->
        :not_found
    end

  if result == :ok do
    Notification.create_notifications(object)
    {:ok, object, meta}
  else
    {:error, result}
  end
end
297
# Catch-all: activities without a dedicated clause have no side effects and
# are returned unchanged.
def handle(object, meta), do: {:ok, object, meta}
302
# ChatMessage creation: runs the object through the common pipeline, then, for
# each local participant (actor and first recipient), bumps or creates the
# chat and creates a message reference. The third argument of
# MessageReference.create/3 is true for the participant who is not the actor.
# The resulting streamables are stored in meta. A pipeline result other than
# {:ok, object, meta} is returned as-is.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, message, meta} ->
      actor = User.get_cached_by_ap_id(message.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(message.data["to"]))

      streamables =
        for {user, other_user} <- [{actor, recipient}, {recipient, actor}], user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
          {:ok, cm_ref} = MessageReference.create(chat, message, user.ap_id != actor.ap_id)

          {
            ["user", "user:pleroma_chat"],
            {user, %{cm_ref | chat: chat, object: message}}
          }
        end

      {:ok, message, add_streamables(meta, streamables)}

    failure ->
      failure
  end
end
330
# Answer creation: runs the object through the common pipeline and then
# increases the vote count, passing the answer's "inReplyTo", "name" and
# "actor". A pipeline result other than {:ok, object, meta} is returned as-is.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, meta} ->
      data = answer.data
      Object.increase_vote_count(data["inReplyTo"], data["name"], data["actor"])

      {:ok, answer, meta}

    failure ->
      failure
  end
end
342
# Question creation: only needs the common pipeline; its result is returned
# directly, whether it is {:ok, object, meta} or a failure.
def handle_object_creation(%{"type" => "Question"} = object, meta),
  do: Pipeline.common_pipeline(object, meta)
348
# Catch-all: every other object type needs no extra work at creation time and
# is returned unchanged.
def handle_object_creation(object, meta), do: {:ok, object, meta}
353
# The liked object could not be found: only the Like activity is deleted.
defp undo_like(nil, object), do: delete_object(object)

# The liked object exists: remove the like from it first and delete the Like
# activity only when that succeeded; otherwise return the failure as-is.
defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
361
# Undo of a Like: looks up the liked object (which may be nil) and lets
# undo_like/2 decide how to clean up.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
367
# Undo of an EmojiReact: removes the reaction from the reacted object and then
# deletes the reaction activity. Returns :ok on success; the first value that
# does not match is returned as-is.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = target <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
375
# Undo of an Announce: removes the announce from the announced object and then
# deletes the announce activity. Returns :ok on success; the first value that
# does not match is returned as-is.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
383
# Undo of a Block: unblocks the blocked user for the blocker and then deletes
# the Block activity. Returns :ok on success; the first value that does not
# match is returned as-is.
def handle_undoing(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
394
# Catch-all: anything else (including nil, when the undone activity could not
# be found) cannot be undone and is reported as an error.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
396
# Deletes the given activity with Repo.delete/1.
#
# Returns :ok when the delete succeeds; any other result of Repo.delete/1 is
# returned unchanged. Every caller in this module passes the activity that is
# being undone (not an Object), so the spec names Activity.t().
@spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(activity) do
  with {:ok, _} <- Repo.delete(activity), do: :ok
end
401
# Streams every notification stored under :notifications in meta to the
# "user" and "user:notification" topics and hands it to Push.send/1.
# Returns meta unchanged so the call can be piped.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  for notification <- pending do
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end

  meta
end
411
# Streams every {topics, items} pair stored under :streamables in meta.
# Returns meta unchanged so the call can be piped.
defp send_streamables(meta) do
  meta
  |> Keyword.get(:streamables, [])
  |> Enum.each(fn {topics, payload} -> Streamer.stream(topics, payload) end)

  meta
end
420
# Puts `streamables` in front of whatever is already stored under
# :streamables in meta and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
427
# Puts `notifications` in front of whatever is already stored under
# :notifications in meta and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
434
# Sends what the handlers queued in meta: first the notifications, then the
# streamables. Returns meta.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
440 end