cb54eb89aeca810d83e10a6bff31e57fad66ceba
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do
  @moduledoc """
  Behaviour implemented by modules that execute the side effects of an
  activity (see `Pleroma.Web.ActivityPub.SideEffects`).
  """

  @doc """
  Executes the side effects implied by the given activity/object.

  Receives the data together with the `meta` keyword list and returns
  `{:ok, data, meta}` (both possibly updated) or `{:error, reason}`.
  """
  @callback handle(map(), keyword()) :: {:ok, map(), keyword()} | {:error, any()}

  @doc """
  Processes the `meta` keyword list once the surrounding work is finished.

  `meta` is the same keyword list that `handle/2` returns; the implementation
  in `Pleroma.Web.ActivityPub.SideEffects` reads it with `Keyword.get/3`,
  sends the notifications and streamables stored in it and returns it as is.
  """
  @callback handle_after_transaction(keyword()) :: keyword()
end
9
10 defmodule Pleroma.Web.ActivityPub.SideEffects do
11 @moduledoc """
12 This module looks at an inserted object and executes the side effects that it
13 implies. For example, a `Like` activity will increase the like count on the
14 liked object, a `Follow` activity will add the user to the follower
15 collection, and so on.
16 """
17 alias Pleroma.Activity
18 alias Pleroma.Activity.Ir.Topics
19 alias Pleroma.Chat
20 alias Pleroma.Chat.MessageReference
21 alias Pleroma.FollowingRelationship
22 alias Pleroma.Notification
23 alias Pleroma.Object
24 alias Pleroma.Repo
25 alias Pleroma.User
26 alias Pleroma.Web.ActivityPub.ActivityPub
27 alias Pleroma.Web.ActivityPub.Builder
28 alias Pleroma.Web.ActivityPub.Pipeline
29 alias Pleroma.Web.ActivityPub.Utils
30 alias Pleroma.Web.Push
31 alias Pleroma.Web.Streamer
32
33 require Logger
34
35 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
36
37 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
38
@doc """
Executes the side effects implied by `object`.

Returns `{:ok, object, meta}` on success; individual clauses may return an
error tuple or roll the transaction back. `meta` defaults to `[]`.
"""
@impl true
def handle(object, meta \\ [])

# Tasks this handles:
# - Marks the follow activities as accepted ("accept" state)
# - Updates the following relationship (:follow_accept)
# - Updates the type of the follow notification
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _requester, accepter} <-
         FollowingRelationship.update(requester, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, accepted_follow)
  end

  # The outcome of the chain above is deliberately not inspected: the Accept
  # is reported as handled even when a lookup or update did not match.
  {:ok, object, meta}
end
68
# Tasks this handles:
# - Marks the follow activities as rejected ("reject" state)
# - Updates the following relationship (:follow_reject)
# - Dismisses the notification of the follow
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _rejected_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "reject") do
    # Results of both calls are ignored, exactly as the Reject itself is
    # always reported as handled below.
    FollowingRelationship.update(requester, rejecter, :follow_reject)
    Notification.dismiss(pending_follow)
  end

  {:ok, object, meta}
end
95
# Tasks this handles:
# - Follows (as pending) if possible
# - Generates an Accept (local, unlocked target) or a Reject (follow failed)
# - Creates the notifications, to be sent later
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_ap_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  # When either user cannot be resolved nothing is followed, accepted or
  # rejected; the notifications below are still created.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)

  # The follow activity is read again by its id and that copy is returned
  # (presumably because the steps above may have changed it — confirm).
  {:ok, Activity.get_by_ap_id(follow_ap_id), meta_with_notifications}
end
139
# Tasks this handles:
# - Blocks the target user on behalf of the actor (`User.block/2`)
@impl true
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  # Nothing is blocked unless both users can be resolved; the activity is
  # reported as handled either way.
  with %User{} = blocking_user <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked_user <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocking_user, blocked_user)
  end

  {:ok, object, meta}
end
155
# Tasks this handles:
# - Update the user
#
# When `meta` carries a `:user_update_changeset` it is applied directly (the
# original comment notes this is the local-user case, where non-federating,
# non-ActivityPub settings can be updated as well). Otherwise the user is
# looked up by the id of the updated object and refreshed from that object.
@impl true
def handle(%{data: %{"type" => "Update", "object" => user_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    missing when missing in [nil, false] ->
      {:ok, remote_user_data} = ActivityPub.user_data_from_user_object(user_object)

      user_object["id"]
      |> User.get_by_ap_id()
      |> User.remote_user_changeset(remote_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  # The result of the update is not inspected.
  {:ok, object, meta}
end
176
# Tasks this handles:
# - Add the like to the liked object
# - Create the notifications
@impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  like_target =
    object.data["object"]
    |> Object.get_by_ap_id()

  Utils.add_like_to_object(object, like_target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
189
# Tasks this handles:
# - Actually create the object (via `handle_object_creation/2`)
# - Roll back if it could not be created or the actor is unknown
# - Create the notifications, to be sent later
# - Increase the user note count
# - Increase the replies count of the object replied to (not for "Answer")
# - Start fetching rich media data for the activity
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # The id has to be bound on its own line: `=` binds looser than `&&`, so
    # `if in_reply_to = a && b` binds the boolean result of `a && b` and the
    # counter would be increased for `true` instead of for the parent id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    # Fire-and-forget: the task is started without waiting for its result.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity, add_notifications(meta, notifications)}
  else
    # Anything that is not the happy path aborts the transaction with that
    # value as the rollback reason.
    e -> Repo.rollback(e)
  end
end
222
# Tasks this handles:
# - Add the announce to the announced object
# - Create the notifications (skipped for internal users)
# - Stream out the announce (skipped for internal users)
@impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  announce_target = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, announce_target)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
244
# Tasks this handles:
# - Looks up the undone activity and delegates to `handle_undoing/1`
#
# Anything other than `:ok` coming back from `handle_undoing/1` is returned
# to the caller unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
252
# Tasks this handles:
# - Add the emoji reaction to the reacted-to object
# - Create the notifications
@impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  reaction_target =
    object.data["object"]
    |> Object.get_by_ap_id()

  Utils.add_emoji_reaction_to_object(object, reaction_target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
265
# Tasks this handles:
# - Delete the object (or the user, when the target is a user)
# - Unpin the activity returned by the deletion from its author
# - Reduce the user note count
# - Reduce the replies count of the object that was replied to
# - Delete the chat message references of the object
# - Stream out the activity
# - Create the notifications
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_ref}} = object, meta) do
  # The target is either an object we already know or, failing that, a user.
  target =
    Object.normalize(deleted_ref, false) ||
      User.get_cached_by_ap_id(deleted_ref)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed_object, activity} <- Object.delete(target),
             {_, author_ap_id} when is_binary(author_ap_id) <-
               {:actor, removed_object.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(author_ap_id) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed_object)

          if parent_ap_id = removed_object.data["inReplyTo"] do
            Object.decrease_replies_count(parent_ap_id)
          end

          MessageReference.delete_for_object(removed_object)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed_object, author)
          :ok
        else
          # Only the missing-actor case is handled here; the log shows the
          # object as it was before the deletion attempt.
          {:actor, _} ->
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target) do
          :ok
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
317
# Catch-all: no side effects for any other input, pass it through untouched.
@impl true
def handle(object, meta), do: {:ok, object, meta}
323
# Creates a chat message: runs it through the common pipeline, then, for each
# local participant, bumps/creates their chat, creates a message reference,
# stores the idempotency key for that reference and queues a streamable.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    sender = User.get_cached_by_ap_id(object.data["actor"])
    receiver = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Both directions of the conversation; `Enum.uniq/1` collapses them into
    # one pair when sender and receiver are the same user.
    participant_pairs = Enum.uniq([[sender, receiver], [receiver, sender]])

    streamables =
      for [owner, counterpart] <- participant_pairs, owner.local do
        {:ok, chat} = Chat.bump_or_create(owner.id, counterpart.ap_id)

        owner_is_not_sender = owner.ap_id != sender.ap_id
        {:ok, cm_ref} = MessageReference.create(chat, object, owner_is_not_sender)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {
          ["user", "user:pleroma_chat"],
          {owner, %{cm_ref | chat: chat, object: object}}
        }
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
358
# Creates an "Answer": runs it through the common pipeline and increases the
# vote count on the object it replies to, for the option given by its name.
# A pipeline result that is not `{:ok, _, _}` is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, meta} ->
      answer_data = answer.data

      Object.increase_vote_count(
        answer_data["inReplyTo"],
        answer_data["name"],
        answer_data["actor"]
      )

      {:ok, answer, meta}

    failure ->
      failure
  end
end
370
# Object types that only need to go through the common pipeline; its result
# (success or failure) is returned as is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Video", "Question", "Event", "Article"] do
  Pipeline.common_pipeline(object, meta)
end
377
# Catch-all: nothing to create here, hand the input back untouched.
def handle_object_creation(object, meta), do: {:ok, object, meta}
382
# Undoes a like. When the liked object is gone (`nil`) the like is simply
# deleted; otherwise it is first removed from the liked object and deleted
# only if that succeeded. A failed removal is returned unchanged.
defp undo_like(nil, like) do
  delete_object(like)
end

defp undo_like(%Object{} = like_target, like) do
  case Utils.remove_like_from_object(like, like_target) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
390
# Undo of a Like: look up the liked object (which may be missing) and let
# `undo_like/2` take the like back.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  like_target = Object.get_by_ap_id(object.data["object"])
  undo_like(like_target, object)
end
396
# Undo of an EmojiReact: remove the reaction from the reacted-to object, then
# delete the reaction itself. The first step that does not match (missing
# object, failed removal, failed delete) is returned unchanged.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  reaction_target_id = object.data["object"]

  with %Object{} = reaction_target <- Object.get_by_ap_id(reaction_target_id),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reaction_target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
404
# Undo of an Announce: remove the announce from the announced object, then
# delete the announce itself. The first step that does not match is returned
# unchanged.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  announce_target_id = object.data["object"]

  with %Object{} = announce_target <- Object.get_by_ap_id(announce_target_id),
       {:ok, _} <- Utils.remove_announce_from_object(object, announce_target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
412
# Undo of a Block: unblock the blocked user on behalf of the blocker, then
# delete the block itself. The first step that does not match (unknown user,
# failed unblock, failed delete) is returned unchanged.
def handle_undoing(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object
    ) do
  with %User{} = blocking_user <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked_user <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocking_user, blocked_user),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
423
# Catch-all: anything without a dedicated clause (including `nil`) cannot be
# undone and is reported back inside the error.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
425
# Deletes the given record through the repo, collapsing a successful delete
# to `:ok`; any other result is returned unchanged.
# NOTE(review): the callers in this module pass the value looked up with
# `Activity.get_by_ap_id/1` — confirm whether `Object.t()` is the right type.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
430
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent), then returns `meta` unchanged.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
440
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent), then returns `meta` unchanged.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
449
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  Keyword.put(meta, :streamables, streamables ++ Keyword.get(meta, :streamables, []))
end
456
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  Keyword.put(meta, :notifications, notifications ++ Keyword.get(meta, :notifications, []))
end
463
# Sends what `handle/2` only collected in `meta`: first the notifications,
# then the streamables. Returns `meta` unchanged.
@impl true
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
470 end