Pipeline Ingestion: Note
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
36 @impl true
37 def handle(object, meta \\ [])
38
39 # Task this handles
40 # - Follows
41 # - Sends a notification
42 @impl true
43 def handle(
44 %{
45 data: %{
46 "actor" => actor,
47 "type" => "Accept",
48 "object" => follow_activity_id
49 }
50 } = object,
51 meta
52 ) do
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
61 end
62
63 {:ok, object, meta}
64 end
65
66 # Task this handles
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
70 @impl true
71 def handle(
72 %{
73 data: %{
74 "actor" => actor,
75 "type" => "Reject",
76 "object" => follow_activity_id
77 }
78 } = object,
79 meta
80 ) do
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
88 end
89
90 {:ok, object, meta}
91 end
92
93 # Tasks this handle
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
97 @impl true
98 def handle(
99 %{
100 data: %{
101 "id" => follow_id,
102 "type" => "Follow",
103 "object" => followed_user,
104 "actor" => following_user
105 }
106 } = object,
107 meta
108 ) do
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
116 end
117 else
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
121
122 _ ->
123 nil
124 end
125
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
127
128 meta =
129 meta
130 |> add_notifications(notifications)
131
132 updated_object = Activity.get_by_ap_id(follow_id)
133
134 {:ok, updated_object, meta}
135 end
136
137 # Tasks this handles:
138 # - Unfollow and block
139 @impl true
140 def handle(
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
142 object,
143 meta
144 ) do
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
148 end
149
150 {:ok, object, meta}
151 end
152
153 # Tasks this handles:
154 # - Update the user
155 #
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
158 @impl true
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
161 changeset
162 |> User.update_and_set_cache()
163 else
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
165
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
169 end
170
171 {:ok, object, meta}
172 end
173
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
177 @impl true
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
181
182 Notification.create_notifications(object)
183
184 {:ok, object, meta}
185 end
186
187 # Tasks this handles
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
195 @impl true
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
201
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
204 end
205
206 reply_depth = (meta[:depth] || 0) + 1
207
208 # FIXME: Force inReplyTo to replies
209 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
210 object.data["replies"] != nil do
211 for reply_id <- object.data["replies"] do
212 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
213 "id" => reply_id,
214 "depth" => reply_depth
215 })
216 end
217 end
218
219 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
220 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
221 end)
222
223 meta =
224 meta
225 |> add_notifications(notifications)
226
227 {:ok, activity, meta}
228 else
229 e -> Repo.rollback(e)
230 end
231 end
232
233 # Tasks this handles:
234 # - Add announce to object
235 # - Set up notification
236 # - Stream out the announce
237 @impl true
238 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
239 announced_object = Object.get_by_ap_id(object.data["object"])
240 user = User.get_cached_by_ap_id(object.data["actor"])
241
242 Utils.add_announce_to_object(object, announced_object)
243
244 if !User.is_internal_user?(user) do
245 Notification.create_notifications(object)
246
247 object
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(object)
250 end
251
252 {:ok, object, meta}
253 end
254
255 @impl true
256 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
257 with undone_object <- Activity.get_by_ap_id(undone_object),
258 :ok <- handle_undoing(undone_object) do
259 {:ok, object, meta}
260 end
261 end
262
263 # Tasks this handles:
264 # - Add reaction to object
265 # - Set up notification
266 @impl true
267 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
268 reacted_object = Object.get_by_ap_id(object.data["object"])
269 Utils.add_emoji_reaction_to_object(object, reacted_object)
270
271 Notification.create_notifications(object)
272
273 {:ok, object, meta}
274 end
275
276 # Tasks this handles:
277 # - Delete and unpins the create activity
278 # - Replace object with Tombstone
279 # - Set up notification
280 # - Reduce the user note count
281 # - Reduce the reply count
282 # - Stream out the activity
283 @impl true
284 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
285 deleted_object =
286 Object.normalize(deleted_object, fetch: false) ||
287 User.get_cached_by_ap_id(deleted_object)
288
289 result =
290 case deleted_object do
291 %Object{} ->
292 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
293 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
294 %User{} = user <- User.get_cached_by_ap_id(actor) do
295 User.remove_pinnned_activity(user, activity)
296
297 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
298
299 if in_reply_to = deleted_object.data["inReplyTo"] do
300 Object.decrease_replies_count(in_reply_to)
301 end
302
303 MessageReference.delete_for_object(deleted_object)
304
305 @ap_streamer.stream_out(object)
306 @ap_streamer.stream_out_participations(deleted_object, user)
307 :ok
308 else
309 {:actor, _} ->
310 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
311 :no_object_actor
312 end
313
314 %User{} ->
315 with {:ok, _} <- User.delete(deleted_object) do
316 :ok
317 end
318 end
319
320 if result == :ok do
321 Notification.create_notifications(object)
322 {:ok, object, meta}
323 else
324 {:error, result}
325 end
326 end
327
328 # Nothing to do
329 @impl true
330 def handle(object, meta) do
331 {:ok, object, meta}
332 end
333
334 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
335 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
336 actor = User.get_cached_by_ap_id(object.data["actor"])
337 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
338
339 streamables =
340 [[actor, recipient], [recipient, actor]]
341 |> Enum.uniq()
342 |> Enum.map(fn [user, other_user] ->
343 if user.local do
344 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
345 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
346
347 @cachex.put(
348 :chat_message_id_idempotency_key_cache,
349 cm_ref.id,
350 meta[:idempotency_key]
351 )
352
353 {
354 ["user", "user:pleroma_chat"],
355 {user, %{cm_ref | chat: chat, object: object}}
356 }
357 end
358 end)
359 |> Enum.filter(& &1)
360
361 meta =
362 meta
363 |> add_streamables(streamables)
364
365 {:ok, object, meta}
366 end
367 end
368
369 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
370 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
371 Object.increase_vote_count(
372 object.data["inReplyTo"],
373 object.data["name"],
374 object.data["actor"]
375 )
376
377 {:ok, object, meta}
378 end
379 end
380
381 def handle_object_creation(%{"type" => objtype} = object, meta)
382 when objtype in ~w[Audio Video Question Event Article Note] do
383 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
384 {:ok, object, meta}
385 end
386 end
387
388 # Nothing to do
389 def handle_object_creation(object, meta) do
390 {:ok, object, meta}
391 end
392
393 defp undo_like(nil, object), do: delete_object(object)
394
395 defp undo_like(%Object{} = liked_object, object) do
396 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
397 delete_object(object)
398 end
399 end
400
401 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
402 object.data["object"]
403 |> Object.get_by_ap_id()
404 |> undo_like(object)
405 end
406
407 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
408 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
409 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
410 {:ok, _} <- Repo.delete(object) do
411 :ok
412 end
413 end
414
415 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
416 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
417 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
418 {:ok, _} <- Repo.delete(object) do
419 :ok
420 end
421 end
422
423 def handle_undoing(
424 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
425 ) do
426 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
427 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
428 {:ok, _} <- User.unblock(blocker, blocked),
429 {:ok, _} <- Repo.delete(object) do
430 :ok
431 end
432 end
433
434 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
435
436 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
437 defp delete_object(object) do
438 with {:ok, _} <- Repo.delete(object), do: :ok
439 end
440
441 defp send_notifications(meta) do
442 Keyword.get(meta, :notifications, [])
443 |> Enum.each(fn notification ->
444 Streamer.stream(["user", "user:notification"], notification)
445 Push.send(notification)
446 end)
447
448 meta
449 end
450
451 defp send_streamables(meta) do
452 Keyword.get(meta, :streamables, [])
453 |> Enum.each(fn {topics, items} ->
454 Streamer.stream(topics, items)
455 end)
456
457 meta
458 end
459
460 defp add_streamables(meta, streamables) do
461 existing = Keyword.get(meta, :streamables, [])
462
463 meta
464 |> Keyword.put(:streamables, streamables ++ existing)
465 end
466
467 defp add_notifications(meta, notifications) do
468 existing = Keyword.get(meta, :notifications, [])
469
470 meta
471 |> Keyword.put(:notifications, notifications ++ existing)
472 end
473
474 @impl true
475 def handle_after_transaction(meta) do
476 meta
477 |> send_notifications()
478 |> send_streamables()
479 end
480 end