Merge branch 'develop' into refactor/deactivated_user_field
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
36 @impl true
37 def handle(object, meta \\ [])
38
39 # Task this handles
40 # - Follows
41 # - Sends a notification
42 @impl true
43 def handle(
44 %{
45 data: %{
46 "actor" => actor,
47 "type" => "Accept",
48 "object" => follow_activity_id
49 }
50 } = object,
51 meta
52 ) do
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
61 end
62
63 {:ok, object, meta}
64 end
65
66 # Task this handles
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
70 @impl true
71 def handle(
72 %{
73 data: %{
74 "actor" => actor,
75 "type" => "Reject",
76 "object" => follow_activity_id
77 }
78 } = object,
79 meta
80 ) do
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
88 end
89
90 {:ok, object, meta}
91 end
92
93 # Tasks this handle
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
97 @impl true
98 def handle(
99 %{
100 data: %{
101 "id" => follow_id,
102 "type" => "Follow",
103 "object" => followed_user,
104 "actor" => following_user
105 }
106 } = object,
107 meta
108 ) do
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
116 end
117 else
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
121
122 _ ->
123 nil
124 end
125
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
127
128 meta =
129 meta
130 |> add_notifications(notifications)
131
132 updated_object = Activity.get_by_ap_id(follow_id)
133
134 {:ok, updated_object, meta}
135 end
136
137 # Tasks this handles:
138 # - Unfollow and block
139 @impl true
140 def handle(
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
142 object,
143 meta
144 ) do
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
148 end
149
150 {:ok, object, meta}
151 end
152
153 # Tasks this handles:
154 # - Update the user
155 #
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
158 @impl true
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
161 changeset
162 |> User.update_and_set_cache()
163 else
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
165
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
169 end
170
171 {:ok, object, meta}
172 end
173
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
177 @impl true
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
181
182 Notification.create_notifications(object)
183
184 {:ok, object, meta}
185 end
186
187 # Tasks this handles
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
195 @impl true
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
201
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
204 end
205
206 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
207 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
208 end)
209
210 meta =
211 meta
212 |> add_notifications(notifications)
213
214 {:ok, activity, meta}
215 else
216 e -> Repo.rollback(e)
217 end
218 end
219
220 # Tasks this handles:
221 # - Add announce to object
222 # - Set up notification
223 # - Stream out the announce
224 @impl true
225 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
226 announced_object = Object.get_by_ap_id(object.data["object"])
227 user = User.get_cached_by_ap_id(object.data["actor"])
228
229 Utils.add_announce_to_object(object, announced_object)
230
231 if !User.is_internal_user?(user) do
232 Notification.create_notifications(object)
233
234 object
235 |> Topics.get_activity_topics()
236 |> Streamer.stream(object)
237 end
238
239 {:ok, object, meta}
240 end
241
242 @impl true
243 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
244 with undone_object <- Activity.get_by_ap_id(undone_object),
245 :ok <- handle_undoing(undone_object) do
246 {:ok, object, meta}
247 end
248 end
249
250 # Tasks this handles:
251 # - Add reaction to object
252 # - Set up notification
253 @impl true
254 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
255 reacted_object = Object.get_by_ap_id(object.data["object"])
256 Utils.add_emoji_reaction_to_object(object, reacted_object)
257
258 Notification.create_notifications(object)
259
260 {:ok, object, meta}
261 end
262
263 # Tasks this handles:
264 # - Delete and unpins the create activity
265 # - Replace object with Tombstone
266 # - Set up notification
267 # - Reduce the user note count
268 # - Reduce the reply count
269 # - Stream out the activity
270 @impl true
271 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
272 deleted_object =
273 Object.normalize(deleted_object, fetch: false) ||
274 User.get_cached_by_ap_id(deleted_object)
275
276 result =
277 case deleted_object do
278 %Object{} ->
279 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
280 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
281 %User{} = user <- User.get_cached_by_ap_id(actor) do
282 User.remove_pinnned_activity(user, activity)
283
284 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
285
286 if in_reply_to = deleted_object.data["inReplyTo"] do
287 Object.decrease_replies_count(in_reply_to)
288 end
289
290 MessageReference.delete_for_object(deleted_object)
291
292 @ap_streamer.stream_out(object)
293 @ap_streamer.stream_out_participations(deleted_object, user)
294 :ok
295 else
296 {:actor, _} ->
297 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
298 :no_object_actor
299 end
300
301 %User{} ->
302 with {:ok, _} <- User.delete(deleted_object) do
303 :ok
304 end
305 end
306
307 if result == :ok do
308 Notification.create_notifications(object)
309 {:ok, object, meta}
310 else
311 {:error, result}
312 end
313 end
314
315 # Nothing to do
316 @impl true
317 def handle(object, meta) do
318 {:ok, object, meta}
319 end
320
321 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
322 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
323 actor = User.get_cached_by_ap_id(object.data["actor"])
324 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
325
326 streamables =
327 [[actor, recipient], [recipient, actor]]
328 |> Enum.uniq()
329 |> Enum.map(fn [user, other_user] ->
330 if user.local do
331 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
332 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
333
334 @cachex.put(
335 :chat_message_id_idempotency_key_cache,
336 cm_ref.id,
337 meta[:idempotency_key]
338 )
339
340 {
341 ["user", "user:pleroma_chat"],
342 {user, %{cm_ref | chat: chat, object: object}}
343 }
344 end
345 end)
346 |> Enum.filter(& &1)
347
348 meta =
349 meta
350 |> add_streamables(streamables)
351
352 {:ok, object, meta}
353 end
354 end
355
356 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
357 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
358 Object.increase_vote_count(
359 object.data["inReplyTo"],
360 object.data["name"],
361 object.data["actor"]
362 )
363
364 {:ok, object, meta}
365 end
366 end
367
368 def handle_object_creation(%{"type" => objtype} = object, meta)
369 when objtype in ~w[Audio Video Question Event Article] do
370 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
371 {:ok, object, meta}
372 end
373 end
374
375 # Nothing to do
376 def handle_object_creation(object, meta) do
377 {:ok, object, meta}
378 end
379
380 defp undo_like(nil, object), do: delete_object(object)
381
382 defp undo_like(%Object{} = liked_object, object) do
383 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
384 delete_object(object)
385 end
386 end
387
388 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
389 object.data["object"]
390 |> Object.get_by_ap_id()
391 |> undo_like(object)
392 end
393
394 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
395 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
396 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
397 {:ok, _} <- Repo.delete(object) do
398 :ok
399 end
400 end
401
402 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
403 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
404 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
405 {:ok, _} <- Repo.delete(object) do
406 :ok
407 end
408 end
409
410 def handle_undoing(
411 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
412 ) do
413 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
414 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
415 {:ok, _} <- User.unblock(blocker, blocked),
416 {:ok, _} <- Repo.delete(object) do
417 :ok
418 end
419 end
420
421 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
422
423 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
424 defp delete_object(object) do
425 with {:ok, _} <- Repo.delete(object), do: :ok
426 end
427
428 defp send_notifications(meta) do
429 Keyword.get(meta, :notifications, [])
430 |> Enum.each(fn notification ->
431 Streamer.stream(["user", "user:notification"], notification)
432 Push.send(notification)
433 end)
434
435 meta
436 end
437
438 defp send_streamables(meta) do
439 Keyword.get(meta, :streamables, [])
440 |> Enum.each(fn {topics, items} ->
441 Streamer.stream(topics, items)
442 end)
443
444 meta
445 end
446
447 defp add_streamables(meta, streamables) do
448 existing = Keyword.get(meta, :streamables, [])
449
450 meta
451 |> Keyword.put(:streamables, streamables ++ existing)
452 end
453
454 defp add_notifications(meta, notifications) do
455 existing = Keyword.get(meta, :notifications, [])
456
457 meta
458 |> Keyword.put(:notifications, notifications ++ existing)
459 end
460
461 @impl true
462 def handle_after_transaction(meta) do
463 meta
464 |> send_notifications()
465 |> send_streamables()
466 end
467 end