e1fa75e1c49308bec166e6abe7d03c028c91a044
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25
26 def handle(object, meta \\ [])
27
28 # Task this handles
29 # - Follows
30 # - Sends a notification
31 def handle(
32 %{
33 data: %{
34 "actor" => actor,
35 "type" => "Accept",
36 "object" => follow_activity_id
37 }
38 } = object,
39 meta
40 ) do
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
50 end
51
52 {:ok, object, meta}
53 end
54
55 # Tasks this handle
56 # - Follows if possible
57 # - Sends a notification
58 # - Generates accept or reject if appropriate
59 def handle(
60 %{
61 data: %{
62 "id" => follow_id,
63 "type" => "Follow",
64 "object" => followed_user,
65 "actor" => following_user
66 }
67 } = object,
68 meta
69 ) do
70 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
71 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
72 {_, {:ok, _}, _, _} <-
73 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
74 if followed.local && !followed.locked do
75 {:ok, accept_data, _} = Builder.accept(followed, object)
76 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
77 end
78 else
79 {:following, {:error, _}, follower, followed} ->
80 Utils.update_follow_state_for_all(object, "reject")
81 FollowingRelationship.update(follower, followed, :follow_reject)
82
83 if followed.local do
84 %{
85 to: [follower.ap_id],
86 actor: followed,
87 object: follow_id,
88 local: true
89 }
90 |> ActivityPub.reject()
91 end
92
93 _ ->
94 nil
95 end
96
97 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
98
99 meta =
100 meta
101 |> add_notifications(notifications)
102
103 updated_object = Activity.get_by_ap_id(follow_id)
104
105 {:ok, updated_object, meta}
106 end
107
108 # Tasks this handles:
109 # - Unfollow and block
110 def handle(
111 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
112 object,
113 meta
114 ) do
115 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
116 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
117 User.block(blocker, blocked)
118 end
119
120 {:ok, object, meta}
121 end
122
123 # Tasks this handles:
124 # - Update the user
125 #
126 # For a local user, we also get a changeset with the full information, so we
127 # can update non-federating, non-activitypub settings as well.
128 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
129 if changeset = Keyword.get(meta, :user_update_changeset) do
130 changeset
131 |> User.update_and_set_cache()
132 else
133 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
134
135 User.get_by_ap_id(updated_object["id"])
136 |> User.remote_user_changeset(new_user_data)
137 |> User.update_and_set_cache()
138 end
139
140 {:ok, object, meta}
141 end
142
143 # Tasks this handles:
144 # - Add like to object
145 # - Set up notification
146 def handle(%{data: %{"type" => "Like"}} = object, meta) do
147 liked_object = Object.get_by_ap_id(object.data["object"])
148 Utils.add_like_to_object(object, liked_object)
149
150 Notification.create_notifications(object)
151
152 {:ok, object, meta}
153 end
154
155 # Tasks this handles
156 # - Actually create object
157 # - Rollback if we couldn't create it
158 # - Increase the user note count
159 # - Increase the reply count
160 # - Increase replies count
161 # - Set up ActivityExpiration
162 # - Set up notifications
163 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
164 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
165 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
166 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
167 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
168
169 if in_reply_to = object.data["inReplyTo"] do
170 Object.increase_replies_count(in_reply_to)
171 end
172
173 if expires_at = activity.data["expires_at"] do
174 ActivityExpiration.create(activity, expires_at)
175 end
176
177 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
178
179 meta =
180 meta
181 |> add_notifications(notifications)
182
183 {:ok, activity, meta}
184 else
185 e -> Repo.rollback(e)
186 end
187 end
188
189 # Tasks this handles:
190 # - Add announce to object
191 # - Set up notification
192 # - Stream out the announce
193 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
194 announced_object = Object.get_by_ap_id(object.data["object"])
195 user = User.get_cached_by_ap_id(object.data["actor"])
196
197 Utils.add_announce_to_object(object, announced_object)
198
199 if !User.is_internal_user?(user) do
200 Notification.create_notifications(object)
201
202 object
203 |> Topics.get_activity_topics()
204 |> Streamer.stream(object)
205 end
206
207 {:ok, object, meta}
208 end
209
210 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
211 with undone_object <- Activity.get_by_ap_id(undone_object),
212 :ok <- handle_undoing(undone_object) do
213 {:ok, object, meta}
214 end
215 end
216
217 # Tasks this handles:
218 # - Add reaction to object
219 # - Set up notification
220 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
221 reacted_object = Object.get_by_ap_id(object.data["object"])
222 Utils.add_emoji_reaction_to_object(object, reacted_object)
223
224 Notification.create_notifications(object)
225
226 {:ok, object, meta}
227 end
228
229 # Tasks this handles:
230 # - Delete and unpins the create activity
231 # - Replace object with Tombstone
232 # - Set up notification
233 # - Reduce the user note count
234 # - Reduce the reply count
235 # - Stream out the activity
236 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
237 deleted_object =
238 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
239
240 result =
241 case deleted_object do
242 %Object{} ->
243 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
244 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
245 User.remove_pinnned_activity(user, activity)
246
247 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
248
249 if in_reply_to = deleted_object.data["inReplyTo"] do
250 Object.decrease_replies_count(in_reply_to)
251 end
252
253 MessageReference.delete_for_object(deleted_object)
254
255 ActivityPub.stream_out(object)
256 ActivityPub.stream_out_participations(deleted_object, user)
257 :ok
258 end
259
260 %User{} ->
261 with {:ok, _} <- User.delete(deleted_object) do
262 :ok
263 end
264 end
265
266 if result == :ok do
267 Notification.create_notifications(object)
268 {:ok, object, meta}
269 else
270 {:error, result}
271 end
272 end
273
274 # Nothing to do
275 def handle(object, meta) do
276 {:ok, object, meta}
277 end
278
279 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
280 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
281 actor = User.get_cached_by_ap_id(object.data["actor"])
282 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
283
284 streamables =
285 [[actor, recipient], [recipient, actor]]
286 |> Enum.map(fn [user, other_user] ->
287 if user.local do
288 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
289 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
290
291 {
292 ["user", "user:pleroma_chat"],
293 {user, %{cm_ref | chat: chat, object: object}}
294 }
295 end
296 end)
297 |> Enum.filter(& &1)
298
299 meta =
300 meta
301 |> add_streamables(streamables)
302
303 {:ok, object, meta}
304 end
305 end
306
307 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
308 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
309 Object.increase_vote_count(
310 object.data["inReplyTo"],
311 object.data["name"],
312 object.data["actor"]
313 )
314
315 {:ok, object, meta}
316 end
317 end
318
319 def handle_object_creation(%{"type" => "Question"} = object, meta) do
320 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
321 {:ok, object, meta}
322 end
323 end
324
325 # Nothing to do
326 def handle_object_creation(object, meta) do
327 {:ok, object, meta}
328 end
329
330 defp undo_like(nil, object), do: delete_object(object)
331
332 defp undo_like(%Object{} = liked_object, object) do
333 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
334 delete_object(object)
335 end
336 end
337
338 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
339 object.data["object"]
340 |> Object.get_by_ap_id()
341 |> undo_like(object)
342 end
343
344 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
345 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
346 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
347 {:ok, _} <- Repo.delete(object) do
348 :ok
349 end
350 end
351
352 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
353 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
354 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
355 {:ok, _} <- Repo.delete(object) do
356 :ok
357 end
358 end
359
360 def handle_undoing(
361 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
362 ) do
363 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
364 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
365 {:ok, _} <- User.unblock(blocker, blocked),
366 {:ok, _} <- Repo.delete(object) do
367 :ok
368 end
369 end
370
371 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
372
373 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
374 defp delete_object(object) do
375 with {:ok, _} <- Repo.delete(object), do: :ok
376 end
377
378 defp send_notifications(meta) do
379 Keyword.get(meta, :notifications, [])
380 |> Enum.each(fn notification ->
381 Streamer.stream(["user", "user:notification"], notification)
382 Push.send(notification)
383 end)
384
385 meta
386 end
387
388 defp send_streamables(meta) do
389 Keyword.get(meta, :streamables, [])
390 |> Enum.each(fn {topics, items} ->
391 Streamer.stream(topics, items)
392 end)
393
394 meta
395 end
396
397 defp add_streamables(meta, streamables) do
398 existing = Keyword.get(meta, :streamables, [])
399
400 meta
401 |> Keyword.put(:streamables, streamables ++ existing)
402 end
403
404 defp add_notifications(meta, notifications) do
405 existing = Keyword.get(meta, :notifications, [])
406
407 meta
408 |> Keyword.put(:notifications, notifications ++ existing)
409 end
410
411 def handle_after_transaction(meta) do
412 meta
413 |> send_notifications()
414 |> send_streamables()
415 end
416 end