0fff5faf2d4a138978f6ff8c85097574e7f0320d
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Workers.BackgroundWorker
28
29 require Logger
30
def handle(object, meta \\ [])

# Accept of a follow request. Tasks this handles:
# - Sets the follow state to "accept" through Utils.update_follow_state_for_all/2
# - Updates the following relationship with :follow_accept
# - Updates the notification type and the follower / following counts
#
# When any lookup or update in the chain does not match, the remaining steps
# are skipped. The Accept activity is returned unchanged either way.
def handle(
      %{data: %{"type" => "Accept", "actor" => actor, "object" => follow_activity_id}} = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _relationship} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
    User.update_follower_count(followed)
    User.update_following_count(follower)
  end

  {:ok, object, meta}
end
59
# Reject of a follow request. Tasks this handles:
# - Sets the follow state to "reject" through Utils.update_follow_state_for_all/2
# - Updates the following relationship with :follow_reject
# - Dismisses the notification of the original follow activity
#
# When any lookup or the state update does not match, the remaining steps are
# skipped. The Reject activity is returned unchanged either way.
def handle(
      %{data: %{"type" => "Reject", "actor" => actor, "object" => follow_activity_id}} = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The notification is dismissed for the activity as originally fetched.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
85
# Follow request. Tasks this handles:
# - Registers the follow as pending (User.follow/3 with :follow_pending)
# - Generates an Accept when the followed user is local and not locked
# - Generates a Reject when User.follow/3 returns an error
# - Creates notifications (sending is deferred, see handle_after_transaction/1)
#
# Returns the follow activity as re-read from the database by its id.
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => following_user,
          "object" => followed_user
        }
      } = object,
      meta
    ) do
  # The follow result is wrapped in a tagged tuple that also carries the users,
  # so the `else` branch can still reach `followed` to build the Reject.
  with %User{} = follower <- User.get_cached_by_ap_id(following_user),
       %User{} = followed <- User.get_cached_by_ap_id(followed_user),
       {:following, {:ok, _}, _, _} <-
         {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
    if followed.local && !followed.is_locked do
      {:ok, accept_data, _} = Builder.accept(followed, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    {:following, {:error, _}, _follower, followed} ->
      {:ok, reject_data, _} = Builder.reject(followed, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)

  {:ok, Activity.get_by_ap_id(follow_id), add_notifications(meta, notifications)}
end
128
# Block. Tasks this handles:
# - Calls User.block/2 once both the blocking and the blocked user are found
#
# If either user cannot be resolved nothing happens; the activity is returned
# unchanged in both cases.
def handle(
      %{data: %{"type" => "Block", "actor" => blocking_user, "object" => blocked_user}} = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_user),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
143
# Update. Tasks this handles:
# - Update the user
#
# When `meta` carries a :user_update_changeset, that changeset is applied
# directly, so non-federating, non-ActivityPub settings can be updated as
# well. Otherwise the user data is rebuilt from the updated object and applied
# to the user looked up by the object's "id".
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    missing when missing in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

      updated_object["id"]
      |> User.get_by_ap_id()
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
163
# Like. Tasks this handles:
# - Add the like to the liked object
# - Set up notifications
def handle(%{data: %{"type" => "Like", "object" => liked_ap_id}} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(liked_ap_id))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
175
# Create. Tasks this handles:
# - Actually create the object (handle_object_creation/2 on meta[:object_data])
# - Roll back the transaction if creation or the actor lookup fails
# - Increase the user's note count if the object is public
# - Increase the replies count of the object being replied to
# - Enqueue the "fetch_data_for_activity" background job
# - Set up notifications (sending is deferred, see handle_after_transaction/1)
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    in_reply_to = object.data["inReplyTo"]
    if in_reply_to, do: Object.increase_replies_count(in_reply_to)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity, add_notifications(meta, notifications)}
  else
    failure -> Repo.rollback(failure)
  end
end
205
# Announce. Tasks this handles:
# - Add the announce to the announced object
# - Set up notifications
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  announced_object = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
226
# Undo. Tasks this handles:
# - Looks up the activity being undone and delegates to handle_undoing/1
#
# Anything other than :ok from handle_undoing/1 is returned to the caller as is.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
233
# EmojiReact. Tasks this handles:
# - Add the reaction to the reacted object
# - Set up notifications
def handle(%{data: %{"type" => "EmojiReact", "object" => reacted_ap_id}} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(reacted_ap_id))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
245
# Delete. The target is resolved either as an object or, failing that, as a user.
#
# For an object, tasks this handles:
# - Delete the object (Object.delete/1) and unpin the returned activity
# - Reduce the author's note count if the object was public
# - Reduce the replies count of the object that was replied to
# - Delete chat message references for the object
# - Stream out the Delete activity and the participations
#
# For a user: delete the user.
#
# On success notifications are set up and {:ok, object, meta} is returned;
# any other outcome is returned as {:error, outcome}.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target = Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, activity} <- Object.delete(target),
             {_, actor} when is_binary(actor) <- {:actor, removed.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinnned_activity(user, activity)

          # Rebind so the streaming call below sees the user as returned here.
          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, removed)

          in_reply_to = removed.data["inReplyTo"]
          if in_reply_to, do: Object.decrease_replies_count(in_reply_to)

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, user)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before Object.delete/1 ran.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
296
# Any other activity type has no side effects: pass everything through.
def handle(object, meta), do: {:ok, object, meta}
301
# ChatMessage creation. Tasks this handles:
# - Runs the object through Pipeline.common_pipeline/2
# - For each local participant (actor and first "to" recipient): bumps or
#   creates the chat and creates a message reference, flagged with whether the
#   participant is someone other than the actor
# - Queues the resulting chat updates as streamables in `meta`
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Enum.uniq/1 collapses the two pairs into one when actor and recipient
    # are the same user.
    participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
330
# Answer creation (a poll vote): runs the common pipeline, then increases the
# vote count using the answer's "inReplyTo", "name" and "actor" fields.
# A pipeline result that is not {:ok, object, meta} is returned as is.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, %{data: answer}, _meta} = created ->
      Object.increase_vote_count(answer["inReplyTo"], answer["name"], answer["actor"])
      created

    failure ->
      failure
  end
end
342
# These object types only need to go through the common pipeline; its result
# is returned untouched.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ~w[Audio Video Question Event Article],
    do: Pipeline.common_pipeline(object, meta)
349
# Every other object type needs no extra work on creation.
def handle_object_creation(object, meta), do: {:ok, object, meta}
354
# Undoes a like. When the liked object no longer exists only the like
# activity is deleted; otherwise the like is first removed from the object.
# A failure to remove the like is returned as is and nothing is deleted.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
362
# Undo of a Like: look up the liked object (may be nil) and hand off to
# undo_like/2.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
368
# Undo of an EmojiReact: remove the reaction from the reacted object, then
# delete the reaction activity. Any step that does not match is returned as is.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end
376
# Undo of an Announce: remove the announce from the announced object, then
# delete the announce activity. Any step that does not match is returned as is.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end
384
# Undo of a Block: unblock the blocked user, then delete the block activity.
# Any step that does not match (missing user, failed unblock or delete) is
# returned as is.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end
395
# Anything else cannot be undone; report it back together with the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
397
# Deletes the given record through the Repo and normalises success to :ok.
# A failed delete is returned unchanged as {:error, changeset}.
#
# Despite the name, the callers in this module pass the activity that is
# being undone (a Like, for instance), not a Pleroma.Object. The spec
# therefore accepts any Ecto schema struct, which is what Repo.delete/1 takes.
@spec delete_object(Ecto.Schema.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  with {:ok, _} <- Repo.delete(object), do: :ok
end
402
# Streams and pushes every notification queued under :notifications in `meta`
# (none when the key is absent). Returns `meta` untouched so it can be piped.
defp send_notifications(meta) do
  meta
  |> Keyword.get(:notifications, [])
  |> Enum.each(fn queued ->
    Streamer.stream(["user", "user:notification"], queued)
    Push.send(queued)
  end)

  meta
end
412
# Streams every {topics, items} pair queued under :streamables in `meta`
# (none when the key is absent). Returns `meta` untouched so it can be piped.
defp send_streamables(meta) do
  meta
  |> Keyword.get(:streamables, [])
  |> Enum.each(fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
421
# Queues `streamables` in `meta`, placing them in front of any that are
# already stored under :streamables.
defp add_streamables(meta, streamables) do
  queued = streamables ++ Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, queued)
end
428
# Queues `notifications` in `meta`, placing them in front of any that are
# already stored under :notifications.
defp add_notifications(meta, notifications) do
  queued = notifications ++ Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, queued)
end
435
# Delivers what the handlers queued in `meta`: first the notifications
# (streamed and pushed), then the streamables. Returns `meta` unchanged.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
441 end