Merge remote-tracking branch 'pleroma/develop' into features/poll-validation
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
24
25 def handle(object, meta \\ [])
26
27 # Tasks this handle
28 # - Follows if possible
29 # - Sends a notification
30 # - Generates accept or reject if appropriate
31 def handle(
32 %{
33 data: %{
34 "id" => follow_id,
35 "type" => "Follow",
36 "object" => followed_user,
37 "actor" => following_user
38 }
39 } = object,
40 meta
41 ) do
42 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
43 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
44 {_, {:ok, _}, _, _} <-
45 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
46 if followed.local && !followed.locked do
47 Utils.update_follow_state_for_all(object, "accept")
48 FollowingRelationship.update(follower, followed, :follow_accept)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
51
52 %{
53 to: [following_user],
54 actor: followed,
55 object: follow_id,
56 local: true
57 }
58 |> ActivityPub.accept()
59 end
60 else
61 {:following, {:error, _}, follower, followed} ->
62 Utils.update_follow_state_for_all(object, "reject")
63 FollowingRelationship.update(follower, followed, :follow_reject)
64
65 if followed.local do
66 %{
67 to: [follower.ap_id],
68 actor: followed,
69 object: follow_id,
70 local: true
71 }
72 |> ActivityPub.reject()
73 end
74
75 _ ->
76 nil
77 end
78
79 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
80
81 meta =
82 meta
83 |> add_notifications(notifications)
84
85 updated_object = Activity.get_by_ap_id(follow_id)
86
87 {:ok, updated_object, meta}
88 end
89
90 # Tasks this handles:
91 # - Unfollow and block
92 def handle(
93 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
94 object,
95 meta
96 ) do
97 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
98 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
99 User.block(blocker, blocked)
100 end
101
102 {:ok, object, meta}
103 end
104
105 # Tasks this handles:
106 # - Update the user
107 #
108 # For a local user, we also get a changeset with the full information, so we
109 # can update non-federating, non-activitypub settings as well.
110 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
111 if changeset = Keyword.get(meta, :user_update_changeset) do
112 changeset
113 |> User.update_and_set_cache()
114 else
115 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
116
117 User.get_by_ap_id(updated_object["id"])
118 |> User.remote_user_changeset(new_user_data)
119 |> User.update_and_set_cache()
120 end
121
122 {:ok, object, meta}
123 end
124
125 # Tasks this handles:
126 # - Add like to object
127 # - Set up notification
128 def handle(%{data: %{"type" => "Like"}} = object, meta) do
129 liked_object = Object.get_by_ap_id(object.data["object"])
130 Utils.add_like_to_object(object, liked_object)
131
132 Notification.create_notifications(object)
133
134 {:ok, object, meta}
135 end
136
137 # Tasks this handles
138 # - Actually create object
139 # - Rollback if we couldn't create it
140 # - Increase the user note count
141 # - Increase the reply count
142 # - Increase replies count
143 # - Set up ActivityExpiration
144 # - Set up notifications
145 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
146 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
147 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
148 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
149 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
150
151 if in_reply_to = object.data["inReplyTo"] do
152 Object.increase_replies_count(in_reply_to)
153 end
154
155 if expires_at = activity.data["expires_at"] do
156 ActivityExpiration.create(activity, expires_at)
157 end
158
159 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
160
161 meta =
162 meta
163 |> add_notifications(notifications)
164
165 {:ok, activity, meta}
166 else
167 e -> Repo.rollback(e)
168 end
169 end
170
171 # Tasks this handles:
172 # - Add announce to object
173 # - Set up notification
174 # - Stream out the announce
175 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
176 announced_object = Object.get_by_ap_id(object.data["object"])
177 user = User.get_cached_by_ap_id(object.data["actor"])
178
179 Utils.add_announce_to_object(object, announced_object)
180
181 if !User.is_internal_user?(user) do
182 Notification.create_notifications(object)
183
184 object
185 |> Topics.get_activity_topics()
186 |> Streamer.stream(object)
187 end
188
189 {:ok, object, meta}
190 end
191
192 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
193 with undone_object <- Activity.get_by_ap_id(undone_object),
194 :ok <- handle_undoing(undone_object) do
195 {:ok, object, meta}
196 end
197 end
198
199 # Tasks this handles:
200 # - Add reaction to object
201 # - Set up notification
202 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
203 reacted_object = Object.get_by_ap_id(object.data["object"])
204 Utils.add_emoji_reaction_to_object(object, reacted_object)
205
206 Notification.create_notifications(object)
207
208 {:ok, object, meta}
209 end
210
211 # Tasks this handles:
212 # - Delete and unpins the create activity
213 # - Replace object with Tombstone
214 # - Set up notification
215 # - Reduce the user note count
216 # - Reduce the reply count
217 # - Stream out the activity
218 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
219 deleted_object =
220 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
221
222 result =
223 case deleted_object do
224 %Object{} ->
225 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
226 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
227 User.remove_pinnned_activity(user, activity)
228
229 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
230
231 if in_reply_to = deleted_object.data["inReplyTo"] do
232 Object.decrease_replies_count(in_reply_to)
233 end
234
235 MessageReference.delete_for_object(deleted_object)
236
237 ActivityPub.stream_out(object)
238 ActivityPub.stream_out_participations(deleted_object, user)
239 :ok
240 end
241
242 %User{} ->
243 with {:ok, _} <- User.delete(deleted_object) do
244 :ok
245 end
246 end
247
248 if result == :ok do
249 Notification.create_notifications(object)
250 {:ok, object, meta}
251 else
252 {:error, result}
253 end
254 end
255
256 # Nothing to do
257 def handle(object, meta) do
258 {:ok, object, meta}
259 end
260
261 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
262 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
263 actor = User.get_cached_by_ap_id(object.data["actor"])
264 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
265
266 streamables =
267 [[actor, recipient], [recipient, actor]]
268 |> Enum.map(fn [user, other_user] ->
269 if user.local do
270 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
271 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
272
273 {
274 ["user", "user:pleroma_chat"],
275 {user, %{cm_ref | chat: chat, object: object}}
276 }
277 end
278 end)
279 |> Enum.filter(& &1)
280
281 meta =
282 meta
283 |> add_streamables(streamables)
284
285 {:ok, object, meta}
286 end
287 end
288
289 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
290 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
291 Object.increase_vote_count(
292 object.data["inReplyTo"],
293 object.data["name"],
294 object.data["actor"]
295 )
296
297 {:ok, object, meta}
298 end
299 end
300
301 def handle_object_creation(%{"type" => "Question"} = object, meta) do
302 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
303 {:ok, object, meta}
304 end
305 end
306
307 # Nothing to do
308 def handle_object_creation(object, meta) do
309 {:ok, object, meta}
310 end
311
312 defp undo_like(nil, object), do: delete_object(object)
313
314 defp undo_like(%Object{} = liked_object, object) do
315 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
316 delete_object(object)
317 end
318 end
319
320 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
321 object.data["object"]
322 |> Object.get_by_ap_id()
323 |> undo_like(object)
324 end
325
326 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
327 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
328 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
329 {:ok, _} <- Repo.delete(object) do
330 :ok
331 end
332 end
333
334 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
335 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
336 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
337 {:ok, _} <- Repo.delete(object) do
338 :ok
339 end
340 end
341
342 def handle_undoing(
343 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
344 ) do
345 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
346 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
347 {:ok, _} <- User.unblock(blocker, blocked),
348 {:ok, _} <- Repo.delete(object) do
349 :ok
350 end
351 end
352
353 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
354
355 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
356 defp delete_object(object) do
357 with {:ok, _} <- Repo.delete(object), do: :ok
358 end
359
360 defp send_notifications(meta) do
361 Keyword.get(meta, :notifications, [])
362 |> Enum.each(fn notification ->
363 Streamer.stream(["user", "user:notification"], notification)
364 Push.send(notification)
365 end)
366
367 meta
368 end
369
370 defp send_streamables(meta) do
371 Keyword.get(meta, :streamables, [])
372 |> Enum.each(fn {topics, items} ->
373 Streamer.stream(topics, items)
374 end)
375
376 meta
377 end
378
379 defp add_streamables(meta, streamables) do
380 existing = Keyword.get(meta, :streamables, [])
381
382 meta
383 |> Keyword.put(:streamables, streamables ++ existing)
384 end
385
386 defp add_notifications(meta, notifications) do
387 existing = Keyword.get(meta, :notifications, [])
388
389 meta
390 |> Keyword.put(:notifications, notifications ++ existing)
391 end
392
393 def handle_after_transaction(meta) do
394 meta
395 |> send_notifications()
396 |> send_streamables()
397 end
398 end