Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into features/validators...
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
11 alias Pleroma.Chat
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25
26 require Logger
27
28 def handle(object, meta \\ [])
29
30 # Task this handles
31 # - Follows
32 # - Sends a notification
33 def handle(
34 %{
35 data: %{
36 "actor" => actor,
37 "type" => "Accept",
38 "object" => follow_activity_id
39 }
40 } = object,
41 meta
42 ) do
43 with %Activity{actor: follower_id} = follow_activity <-
44 Activity.get_by_ap_id(follow_activity_id),
45 %User{} = followed <- User.get_cached_by_ap_id(actor),
46 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
47 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
48 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
49 Notification.update_notification_type(followed, follow_activity)
50 User.update_follower_count(followed)
51 User.update_following_count(follower)
52 end
53
54 {:ok, object, meta}
55 end
56
57 # Task this handles
58 # - Rejects all existing follow activities for this person
59 # - Updates the follow state
60 # - Dismisses notification
61 def handle(
62 %{
63 data: %{
64 "actor" => actor,
65 "type" => "Reject",
66 "object" => follow_activity_id
67 }
68 } = object,
69 meta
70 ) do
71 with %Activity{actor: follower_id} = follow_activity <-
72 Activity.get_by_ap_id(follow_activity_id),
73 %User{} = followed <- User.get_cached_by_ap_id(actor),
74 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
75 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
76 FollowingRelationship.update(follower, followed, :follow_reject)
77 Notification.dismiss(follow_activity)
78 end
79
80 {:ok, object, meta}
81 end
82
83 # Tasks this handle
84 # - Follows if possible
85 # - Sends a notification
86 # - Generates accept or reject if appropriate
87 def handle(
88 %{
89 data: %{
90 "id" => follow_id,
91 "type" => "Follow",
92 "object" => followed_user,
93 "actor" => following_user
94 }
95 } = object,
96 meta
97 ) do
98 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
99 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
100 {_, {:ok, _}, _, _} <-
101 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
102 if followed.local && !followed.locked do
103 {:ok, accept_data, _} = Builder.accept(followed, object)
104 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
105 end
106 else
107 {:following, {:error, _}, _follower, followed} ->
108 {:ok, reject_data, _} = Builder.reject(followed, object)
109 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
110
111 _ ->
112 nil
113 end
114
115 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
116
117 meta =
118 meta
119 |> add_notifications(notifications)
120
121 updated_object = Activity.get_by_ap_id(follow_id)
122
123 {:ok, updated_object, meta}
124 end
125
126 # Tasks this handles:
127 # - Unfollow and block
128 def handle(
129 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
130 object,
131 meta
132 ) do
133 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
134 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
135 User.block(blocker, blocked)
136 end
137
138 {:ok, object, meta}
139 end
140
141 # Tasks this handles:
142 # - Update the user
143 #
144 # For a local user, we also get a changeset with the full information, so we
145 # can update non-federating, non-activitypub settings as well.
146 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
147 if changeset = Keyword.get(meta, :user_update_changeset) do
148 changeset
149 |> User.update_and_set_cache()
150 else
151 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
152
153 User.get_by_ap_id(updated_object["id"])
154 |> User.remote_user_changeset(new_user_data)
155 |> User.update_and_set_cache()
156 end
157
158 {:ok, object, meta}
159 end
160
161 # Tasks this handles:
162 # - Add like to object
163 # - Set up notification
164 def handle(%{data: %{"type" => "Like"}} = object, meta) do
165 liked_object = Object.get_by_ap_id(object.data["object"])
166 Utils.add_like_to_object(object, liked_object)
167
168 Notification.create_notifications(object)
169
170 {:ok, object, meta}
171 end
172
173 # Tasks this handles
174 # - Actually create object
175 # - Rollback if we couldn't create it
176 # - Increase the user note count
177 # - Increase the reply count
178 # - Increase replies count
179 # - Set up ActivityExpiration
180 # - Set up notifications
181 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
182 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
183 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
184 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
185 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
186
187 if in_reply_to = object.data["inReplyTo"] do
188 Object.increase_replies_count(in_reply_to)
189 end
190
191 if expires_at = activity.data["expires_at"] do
192 ActivityExpiration.create(activity, expires_at)
193 end
194
195 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
196
197 meta =
198 meta
199 |> add_notifications(notifications)
200
201 {:ok, activity, meta}
202 else
203 e -> Repo.rollback(e)
204 end
205 end
206
207 # Tasks this handles:
208 # - Add announce to object
209 # - Set up notification
210 # - Stream out the announce
211 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
212 announced_object = Object.get_by_ap_id(object.data["object"])
213 user = User.get_cached_by_ap_id(object.data["actor"])
214
215 Utils.add_announce_to_object(object, announced_object)
216
217 if !User.is_internal_user?(user) do
218 Notification.create_notifications(object)
219
220 object
221 |> Topics.get_activity_topics()
222 |> Streamer.stream(object)
223 end
224
225 {:ok, object, meta}
226 end
227
228 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
229 with undone_object <- Activity.get_by_ap_id(undone_object),
230 :ok <- handle_undoing(undone_object) do
231 {:ok, object, meta}
232 end
233 end
234
235 # Tasks this handles:
236 # - Add reaction to object
237 # - Set up notification
238 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
239 reacted_object = Object.get_by_ap_id(object.data["object"])
240 Utils.add_emoji_reaction_to_object(object, reacted_object)
241
242 Notification.create_notifications(object)
243
244 {:ok, object, meta}
245 end
246
247 # Tasks this handles:
248 # - Delete and unpins the create activity
249 # - Replace object with Tombstone
250 # - Set up notification
251 # - Reduce the user note count
252 # - Reduce the reply count
253 # - Stream out the activity
254 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
255 deleted_object =
256 Object.normalize(deleted_object, false) ||
257 User.get_cached_by_ap_id(deleted_object)
258
259 result =
260 case deleted_object do
261 %Object{} ->
262 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
263 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
264 %User{} = user <- User.get_cached_by_ap_id(actor) do
265 User.remove_pinnned_activity(user, activity)
266
267 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
268
269 if in_reply_to = deleted_object.data["inReplyTo"] do
270 Object.decrease_replies_count(in_reply_to)
271 end
272
273 MessageReference.delete_for_object(deleted_object)
274
275 ActivityPub.stream_out(object)
276 ActivityPub.stream_out_participations(deleted_object, user)
277 :ok
278 else
279 {:actor, _} ->
280 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
281 :no_object_actor
282 end
283
284 %User{} ->
285 with {:ok, _} <- User.delete(deleted_object) do
286 :ok
287 end
288 end
289
290 if result == :ok do
291 Notification.create_notifications(object)
292 {:ok, object, meta}
293 else
294 {:error, result}
295 end
296 end
297
298 # Nothing to do
299 def handle(object, meta) do
300 {:ok, object, meta}
301 end
302
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
307
308 streamables =
309 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
311 if user.local do
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
314
315 {
316 ["user", "user:pleroma_chat"],
317 {user, %{cm_ref | chat: chat, object: object}}
318 }
319 end
320 end)
321 |> Enum.filter(& &1)
322
323 meta =
324 meta
325 |> add_streamables(streamables)
326
327 {:ok, object, meta}
328 end
329 end
330
331 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
332 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
333 Object.increase_vote_count(
334 object.data["inReplyTo"],
335 object.data["name"],
336 object.data["actor"]
337 )
338
339 {:ok, object, meta}
340 end
341 end
342
343 def handle_object_creation(%{"type" => objtype} = object, meta)
344 when objtype in ~w[Audio Question] do
345 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
346 {:ok, object, meta}
347 end
348 end
349
350 # Nothing to do
351 def handle_object_creation(object, meta) do
352 {:ok, object, meta}
353 end
354
355 defp undo_like(nil, object), do: delete_object(object)
356
357 defp undo_like(%Object{} = liked_object, object) do
358 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
359 delete_object(object)
360 end
361 end
362
363 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
364 object.data["object"]
365 |> Object.get_by_ap_id()
366 |> undo_like(object)
367 end
368
369 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
370 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
371 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
372 {:ok, _} <- Repo.delete(object) do
373 :ok
374 end
375 end
376
377 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
378 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
379 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
380 {:ok, _} <- Repo.delete(object) do
381 :ok
382 end
383 end
384
385 def handle_undoing(
386 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
387 ) do
388 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
389 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
390 {:ok, _} <- User.unblock(blocker, blocked),
391 {:ok, _} <- Repo.delete(object) do
392 :ok
393 end
394 end
395
396 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
397
398 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
399 defp delete_object(object) do
400 with {:ok, _} <- Repo.delete(object), do: :ok
401 end
402
403 defp send_notifications(meta) do
404 Keyword.get(meta, :notifications, [])
405 |> Enum.each(fn notification ->
406 Streamer.stream(["user", "user:notification"], notification)
407 Push.send(notification)
408 end)
409
410 meta
411 end
412
413 defp send_streamables(meta) do
414 Keyword.get(meta, :streamables, [])
415 |> Enum.each(fn {topics, items} ->
416 Streamer.stream(topics, items)
417 end)
418
419 meta
420 end
421
422 defp add_streamables(meta, streamables) do
423 existing = Keyword.get(meta, :streamables, [])
424
425 meta
426 |> Keyword.put(:streamables, streamables ++ existing)
427 end
428
429 defp add_notifications(meta, notifications) do
430 existing = Keyword.get(meta, :notifications, [])
431
432 meta
433 |> Keyword.put(:notifications, notifications ++ existing)
434 end
435
436 def handle_after_transaction(meta) do
437 meta
438 |> send_notifications()
439 |> send_streamables()
440 end
441 end