Merge branch 'develop' of https://git.pleroma.social/pleroma/pleroma into develop
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31
32 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
33
@impl true
def handle(object, meta \\ [])

# Tasks this handles:
# - Sets the follow state to "accept" via `Utils.update_follow_state_for_all/2`
# - Updates the following relationship with `:follow_accept`
# - Updates the notification type for the followed user
#
# When any step of the chain does not match, the remaining steps are skipped
# and the `Accept` is still returned as handled.
@impl true
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _requester, accepter} <-
         FollowingRelationship.update(requester, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, follow)
  end

  {:ok, object, meta}
end
63
# Tasks this handles:
# - Sets the follow state to "reject" via `Utils.update_follow_state_for_all/2`
# - Updates the following relationship with `:follow_reject`
# - Dismisses the notification of the follow activity
#
# When any lookup or the state update does not match, the remaining steps are
# skipped and the `Reject` is still returned as handled.
@impl true
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _updated_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(requester, rejecter, :follow_reject)
    # The notification is dismissed for the follow as it was originally loaded.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
90
# Tasks this handles:
# - Follows if possible
# - Generates an Accept when the followed user is local and not locked,
#   or a Reject when the follow attempt returns an error
# - Sets up notifications (sending is deferred via `do_send: false`)
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  # If either user cannot be resolved nothing is followed, accepted or rejected.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  # The activity is re-read by id and the re-read version is what gets returned.
  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
134
# Tasks this handles:
# - Unfollow and block
#
# The block is only applied when both users can be resolved; the activity is
# returned as handled either way.
@impl true
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
150
# Tasks this handles:
# - Update the user
#
# For a local user, we also get a changeset with the full information
# (`meta[:user_update_changeset]`), so we can update non-federating,
# non-activitypub settings as well. Without it, the user is rebuilt from the
# updated object and stored through the remote user changeset.
@impl true
def handle(%{data: %{"type" => "Update", "object" => user_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, fresh_user_data} = ActivityPub.user_data_from_user_object(user_object)

      user_object["id"]
      |> User.get_by_ap_id()
      |> User.remote_user_changeset(fresh_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
171
# Tasks this handles:
# - Add like to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
184
# Tasks this handles:
# - Actually create the object (see `handle_object_creation/2`)
# - Rollback if we couldn't create it or the actor can't be resolved
# - Increase the user note count
# - Increase the replies count of the parent object (skipped for "Answer")
# - Start a rich media fetch for the activity
# - Set up notifications (sending is deferred via `do_send: false`)
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id on its own line. Writing
    # `in_reply_to = a && b` binds the result of the whole `&&` expression
    # (i.e. `true`), because `=` has lower precedence than `&&`, and `true`
    # would then be passed on as the parent id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    # Any failure aborts the surrounding transaction with the failing value.
    e -> Repo.rollback(e)
  end
end
217
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
@impl true
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
239
# Tasks this handles:
# - Looks up the activity being undone and delegates to `handle_undoing/1`
#
# Anything other than `:ok` from `handle_undoing/1` is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
247
# Tasks this handles:
# - Add reaction to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
260
# Tasks this handles:
# - Delete and unpins the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
#
# The "object" of the Delete is resolved to an object first and to a user
# otherwise. Notifications are only created when the deletion yields `:ok`;
# any other outcome is returned as `{:error, outcome}`.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => target_ref}} = object, meta) do
  target = Object.normalize(target_ref, false) || User.get_cached_by_ap_id(target_ref)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          if in_reply_to = deleted.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          MessageReference.delete_for_object(deleted)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(deleted, author)
          :ok
        else
          # Only the missing-actor case is translated here. The log line shows
          # the object as it was before `Object.delete/1` ran.
          {:actor, _} ->
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
312
# Catch-all: anything not matched by the clauses above has no side effects
# and is passed through unchanged.
@impl true
def handle(object, meta), do: {:ok, object, meta}
318
# Creates a ChatMessage object through the common pipeline. For each local
# participant (actor and first recipient) it then:
# - bumps or creates the chat with the other participant
# - creates a message reference (flagged when the participant is not the actor)
# - stores the idempotency key from `meta` under the reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# A non-matching pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    sender = User.get_cached_by_ap_id(object.data["actor"])
    receiver = User.get_cached_by_ap_id(hd(object.data["to"]))

    # `Enum.uniq/1` collapses the two pairs into one when sender == receiver.
    participant_pairs = Enum.uniq([[sender, receiver], [receiver, sender]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != sender.ap_id)

        @cachex.put(:chat_message_id_idempotency_key_cache, cm_ref.id, meta[:idempotency_key])

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
353
# Creates an Answer object through the common pipeline and then increases the
# vote count on the object it replies to, using the answer's "name" and
# "actor". A non-matching pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, meta} ->
      data = answer.data
      Object.increase_vote_count(data["inReplyTo"], data["name"], data["actor"])

      {:ok, answer, meta}

    failure ->
      failure
  end
end
365
# These object types need no extra work beyond the common pipeline, whose
# result is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Video", "Question", "Event", "Article"] do
  Pipeline.common_pipeline(object, meta)
end
372
# Catch-all: object data with no dedicated creation step is passed through
# unchanged.
def handle_object_creation(object, meta), do: {:ok, object, meta}
377
# Removes the like from the liked object (when there is one) and deletes the
# like itself. With no liked object only the like is deleted. A failed removal
# is returned unchanged and the like is kept.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
385
# Undoes a Like: looks up the liked object and hands both to `undo_like/2`.
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  liked_object = Object.get_by_ap_id(data["object"])
  undo_like(liked_object, object)
end
391
# Undoes an EmojiReact: removes the reaction from the reacted object, then
# deletes the reaction itself. The first non-matching step is returned as-is.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = reacted <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
399
# Undoes an Announce: removes the announce from the announced object, then
# deletes the announce itself. The first non-matching step is returned as-is.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
407
# Undoes a Block: unblocks the blocked user, then deletes the block itself.
# The first non-matching step is returned as-is.
def handle_undoing(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
418
# Catch-all: undoing anything else is not supported and is reported as an
# error that carries the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
420
# Deletes the given record through the repo, collapsing a successful delete
# to a bare `:ok`. Any other result is returned unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
425
# Streams and pushes every notification queued under `:notifications` in
# `meta`. Returns `meta` unchanged so the call can be piped.
defp send_notifications(meta) do
  queued = Keyword.get(meta, :notifications, [])

  Enum.each(queued, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
435
# Streams every `{topics, items}` pair queued under `:streamables` in `meta`.
# Returns `meta` unchanged so the call can be piped.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
444
# Queues `streamables` in front of whatever is already stored under
# `:streamables` in `meta`.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
451
# Queues `notifications` in front of whatever is already stored under
# `:notifications` in `meta`.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
458
# Sends out everything queued in `meta`: notifications first, then
# streamables. Returns `meta`.
@impl true
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end
465 end