a4ad12d5372f494a537e90c6b21fcff6f25c74de
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25
26 def handle(object, meta \\ [])
27
28 # Task this handles
29 # - Follows
30 # - Sends a notification
31 def handle(
32 %{
33 data: %{
34 "actor" => actor,
35 "type" => "Accept",
36 "object" => follow_activity_id
37 }
38 } = object,
39 meta
40 ) do
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
50 end
51
52 {:ok, object, meta}
53 end
54
55 # Task this handles
56 # - Rejects all existing follow activities for this person
57 # - Updates the follow state
58 def handle(
59 %{
60 data: %{
61 "actor" => actor,
62 "type" => "Reject",
63 "object" => follow_activity_id
64 }
65 } = object,
66 meta
67 ) do
68 with %Activity{actor: follower_id} = follow_activity <-
69 Activity.get_by_ap_id(follow_activity_id),
70 %User{} = followed <- User.get_cached_by_ap_id(actor),
71 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
72 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
73 FollowingRelationship.update(follower, followed, :follow_reject)
74 end
75
76 {:ok, object, meta}
77 end
78
79 # Tasks this handle
80 # - Follows if possible
81 # - Sends a notification
82 # - Generates accept or reject if appropriate
83 def handle(
84 %{
85 data: %{
86 "id" => follow_id,
87 "type" => "Follow",
88 "object" => followed_user,
89 "actor" => following_user
90 }
91 } = object,
92 meta
93 ) do
94 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
95 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
96 {_, {:ok, _}, _, _} <-
97 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
98 if followed.local && !followed.locked do
99 {:ok, accept_data, _} = Builder.accept(followed, object)
100 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
101 end
102 else
103 {:following, {:error, _}, follower, followed} ->
104 Utils.update_follow_state_for_all(object, "reject")
105 FollowingRelationship.update(follower, followed, :follow_reject)
106
107 if followed.local do
108 %{
109 to: [follower.ap_id],
110 actor: followed,
111 object: follow_id,
112 local: true
113 }
114 |> ActivityPub.reject()
115 end
116
117 _ ->
118 nil
119 end
120
121 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
122
123 meta =
124 meta
125 |> add_notifications(notifications)
126
127 updated_object = Activity.get_by_ap_id(follow_id)
128
129 {:ok, updated_object, meta}
130 end
131
132 # Tasks this handles:
133 # - Unfollow and block
134 def handle(
135 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
136 object,
137 meta
138 ) do
139 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
140 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
141 User.block(blocker, blocked)
142 end
143
144 {:ok, object, meta}
145 end
146
147 # Tasks this handles:
148 # - Update the user
149 #
150 # For a local user, we also get a changeset with the full information, so we
151 # can update non-federating, non-activitypub settings as well.
152 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
153 if changeset = Keyword.get(meta, :user_update_changeset) do
154 changeset
155 |> User.update_and_set_cache()
156 else
157 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
158
159 User.get_by_ap_id(updated_object["id"])
160 |> User.remote_user_changeset(new_user_data)
161 |> User.update_and_set_cache()
162 end
163
164 {:ok, object, meta}
165 end
166
167 # Tasks this handles:
168 # - Add like to object
169 # - Set up notification
170 def handle(%{data: %{"type" => "Like"}} = object, meta) do
171 liked_object = Object.get_by_ap_id(object.data["object"])
172 Utils.add_like_to_object(object, liked_object)
173
174 Notification.create_notifications(object)
175
176 {:ok, object, meta}
177 end
178
179 # Tasks this handles
180 # - Actually create object
181 # - Rollback if we couldn't create it
182 # - Increase the user note count
183 # - Increase the reply count
184 # - Increase replies count
185 # - Set up ActivityExpiration
186 # - Set up notifications
187 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
188 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
189 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
190 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
191 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
192
193 if in_reply_to = object.data["inReplyTo"] do
194 Object.increase_replies_count(in_reply_to)
195 end
196
197 if expires_at = activity.data["expires_at"] do
198 ActivityExpiration.create(activity, expires_at)
199 end
200
201 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
202
203 meta =
204 meta
205 |> add_notifications(notifications)
206
207 {:ok, activity, meta}
208 else
209 e -> Repo.rollback(e)
210 end
211 end
212
213 # Tasks this handles:
214 # - Add announce to object
215 # - Set up notification
216 # - Stream out the announce
217 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
218 announced_object = Object.get_by_ap_id(object.data["object"])
219 user = User.get_cached_by_ap_id(object.data["actor"])
220
221 Utils.add_announce_to_object(object, announced_object)
222
223 if !User.is_internal_user?(user) do
224 Notification.create_notifications(object)
225
226 object
227 |> Topics.get_activity_topics()
228 |> Streamer.stream(object)
229 end
230
231 {:ok, object, meta}
232 end
233
234 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
235 with undone_object <- Activity.get_by_ap_id(undone_object),
236 :ok <- handle_undoing(undone_object) do
237 {:ok, object, meta}
238 end
239 end
240
241 # Tasks this handles:
242 # - Add reaction to object
243 # - Set up notification
244 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
245 reacted_object = Object.get_by_ap_id(object.data["object"])
246 Utils.add_emoji_reaction_to_object(object, reacted_object)
247
248 Notification.create_notifications(object)
249
250 {:ok, object, meta}
251 end
252
253 # Tasks this handles:
254 # - Delete and unpins the create activity
255 # - Replace object with Tombstone
256 # - Set up notification
257 # - Reduce the user note count
258 # - Reduce the reply count
259 # - Stream out the activity
260 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
261 deleted_object =
262 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
263
264 result =
265 case deleted_object do
266 %Object{} ->
267 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
268 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
269 User.remove_pinnned_activity(user, activity)
270
271 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
272
273 if in_reply_to = deleted_object.data["inReplyTo"] do
274 Object.decrease_replies_count(in_reply_to)
275 end
276
277 MessageReference.delete_for_object(deleted_object)
278
279 ActivityPub.stream_out(object)
280 ActivityPub.stream_out_participations(deleted_object, user)
281 :ok
282 end
283
284 %User{} ->
285 with {:ok, _} <- User.delete(deleted_object) do
286 :ok
287 end
288 end
289
290 if result == :ok do
291 Notification.create_notifications(object)
292 {:ok, object, meta}
293 else
294 {:error, result}
295 end
296 end
297
298 # Nothing to do
299 def handle(object, meta) do
300 {:ok, object, meta}
301 end
302
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
307
308 streamables =
309 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
311 if user.local do
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
314
315 {
316 ["user", "user:pleroma_chat"],
317 {user, %{cm_ref | chat: chat, object: object}}
318 }
319 end
320 end)
321 |> Enum.filter(& &1)
322
323 meta =
324 meta
325 |> add_streamables(streamables)
326
327 {:ok, object, meta}
328 end
329 end
330
331 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
332 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
333 Object.increase_vote_count(
334 object.data["inReplyTo"],
335 object.data["name"],
336 object.data["actor"]
337 )
338
339 {:ok, object, meta}
340 end
341 end
342
343 def handle_object_creation(%{"type" => "Question"} = object, meta) do
344 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
345 {:ok, object, meta}
346 end
347 end
348
349 # Nothing to do
350 def handle_object_creation(object, meta) do
351 {:ok, object, meta}
352 end
353
354 defp undo_like(nil, object), do: delete_object(object)
355
356 defp undo_like(%Object{} = liked_object, object) do
357 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
358 delete_object(object)
359 end
360 end
361
362 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
363 object.data["object"]
364 |> Object.get_by_ap_id()
365 |> undo_like(object)
366 end
367
368 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
369 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
370 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
371 {:ok, _} <- Repo.delete(object) do
372 :ok
373 end
374 end
375
376 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
377 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
378 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
379 {:ok, _} <- Repo.delete(object) do
380 :ok
381 end
382 end
383
384 def handle_undoing(
385 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
386 ) do
387 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
388 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
389 {:ok, _} <- User.unblock(blocker, blocked),
390 {:ok, _} <- Repo.delete(object) do
391 :ok
392 end
393 end
394
395 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
396
397 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
398 defp delete_object(object) do
399 with {:ok, _} <- Repo.delete(object), do: :ok
400 end
401
402 defp send_notifications(meta) do
403 Keyword.get(meta, :notifications, [])
404 |> Enum.each(fn notification ->
405 Streamer.stream(["user", "user:notification"], notification)
406 Push.send(notification)
407 end)
408
409 meta
410 end
411
412 defp send_streamables(meta) do
413 Keyword.get(meta, :streamables, [])
414 |> Enum.each(fn {topics, items} ->
415 Streamer.stream(topics, items)
416 end)
417
418 meta
419 end
420
421 defp add_streamables(meta, streamables) do
422 existing = Keyword.get(meta, :streamables, [])
423
424 meta
425 |> Keyword.put(:streamables, streamables ++ existing)
426 end
427
428 defp add_notifications(meta, notifications) do
429 existing = Keyword.get(meta, :notifications, [])
430
431 meta
432 |> Keyword.put(:notifications, notifications ++ existing)
433 end
434
435 def handle_after_transaction(meta) do
436 meta
437 |> send_notifications()
438 |> send_streamables()
439 end
440 end