Merge branch 'issue/2256' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
def handle(object, meta \\ [])

# Tasks this handles:
# - Marks the referenced follow activity (and its siblings) as accepted
# - Updates the following relationship to the accepted state
# - Updates the notification type and the follower/following counters
#
# When any lookup or update step fails, the remaining steps are skipped and
# the Accept is still reported as handled.
def handle(
      %{
        data: %{
          "actor" => accepter_ap_id,
          "type" => "Accept",
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, follow} <- Utils.update_follow_state_for_all(follow, "accept"),
       {:ok, _relationship} <-
         FollowingRelationship.update(requester, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, follow)
    User.update_follower_count(accepter)
    User.update_following_count(requester)
  end

  {:ok, object, meta}
end
58
# Tasks this handles:
# - Marks the referenced follow activity (and its siblings) as rejected
# - Updates the following relationship to the rejected state
# - Dismisses the notification for the follow activity
#
# When any lookup or update step fails, the remaining steps are skipped and
# the Reject is still reported as handled.
def handle(
      %{
        data: %{
          "actor" => rejecter_ap_id,
          "type" => "Reject",
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _updated_follow} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(requester, rejecter, :follow_reject)
    # The original (pre-update) follow activity is the one passed on here.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
84
# Tasks this handles:
# - Follows (in the pending state) if possible
# - Generates an Accept when the followed user is local and not locked,
#   or a Reject when the follow attempt returned an error
# - Creates the notifications without sending them; they are stored in `meta`
#
# Returns the follow activity re-read by its id rather than the one passed in.
def handle(
      %{
        data: %{
          "id" => follow_id,
          "type" => "Follow",
          "object" => target_ap_id,
          "actor" => requester_ap_id
        }
      } = object,
      meta
    ) do
  # The follow result is wrapped in a tagged tuple that also carries both
  # users, so the `else` branch can tell a failed follow apart from a failed
  # user lookup and still has the followed user at hand for the Reject.
  with %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       %User{} = target <- User.get_cached_by_ap_id(target_ap_id),
       {_, {:ok, _}, _, _} <-
         {:following, User.follow(requester, target, :follow_pending), requester, target} do
    if target.local && !target.is_locked do
      {:ok, accept_data, _} = Builder.accept(target, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    {:following, {:error, _}, _requester, target} ->
      {:ok, reject_data, _} = Builder.reject(target, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
127
# Tasks this handles:
# - Blocks the target user on behalf of the actor
#
# Nothing happens when either user cannot be found.
def handle(
      %{data: %{"type" => "Block", "object" => blocked_ap_id, "actor" => blocker_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocking_user <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked_user <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocking_user, blocked_user)
  end

  {:ok, object, meta}
end
142
# Tasks this handles:
# - Update the user
#
# When `meta` carries a `:user_update_changeset`, that changeset is applied
# as-is. Otherwise the user data is derived from the updated object and applied
# to the user found by the object's "id".
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      existing_user = User.get_by_ap_id(updated_object["id"])

      existing_user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
162
# Tasks this handles:
# - Add the like to the liked object
# - Create the notifications
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  target =
    object.data
    |> Map.get("object")
    |> Object.get_by_ap_id()

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
174
# Tasks this handles:
# - Actually create the object
# - Roll back if we couldn't create it (or couldn't find the actor)
# - Increase the user note count
# - Increase the replies count of the object being replied to, unless the
#   created object is a poll "Answer"
# - Start fetching rich media data for the activity
# - Create the notifications without sending them; they are stored in `meta`
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id first and test afterwards. Writing
    # `if in_reply_to = a && b` binds `in_reply_to` to the result of `a && b`
    # (`=` has lower precedence than `&&`), which would hand the boolean `true`
    # to `increase_replies_count/1` instead of the parent's id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity, add_notifications(meta, notifications)}
  else
    e -> Repo.rollback(e)
  end
end
206
# Tasks this handles:
# - Add the announce to the announced object
# - Create the notifications
# - Stream out the announce
#
# Notifications and streaming are skipped when the actor is an internal user.
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, target)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
227
# Tasks this handles:
# - Look up the activity being undone and revert it via `handle_undoing/1`
#
# Anything other than `:ok` from `handle_undoing/1` is returned unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undo_result =
    undone_ap_id
    |> Activity.get_by_ap_id()
    |> handle_undoing()

  case undo_result do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
234
# Tasks this handles:
# - Add the reaction to the reacted-to object
# - Create the notifications
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  target =
    object.data
    |> Map.get("object")
    |> Object.get_by_ap_id()

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
246
# Tasks this handles:
# - Delete the object and unpin its create activity
# - Reduce the user note count
# - Reduce the reply count of the parent object
# - Delete the chat message references for the object
# - Stream out the activity
# - Create the notifications (only when everything above succeeded)
#
# The delete target is resolved as an object first and as a user second; for a
# user, only `User.delete/1` is called.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target =
    Object.normalize(deleted_object, false) ||
      User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, create_activity} <- Object.delete(target),
             {_, author_ap_id} when is_binary(author_ap_id) <-
               {:actor, removed.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(author_ap_id) do
          User.remove_pinnned_activity(author, create_activity)

          # Keep working with the user as returned by the counter update.
          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          if parent_ap_id = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent_ap_id)
          end

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        else
          {:actor, _} ->
            # Variables bound inside `with` are not visible here, so the
            # resolved (pre-delete) object is what gets logged.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target) do
          :ok
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
297
# Fallback for everything not matched above: no side effects, the input is
# passed through unchanged.
def handle(object, meta), do: {:ok, object, meta}
302
# Creates a ChatMessage through the common pipeline. For every local
# participant it then bumps or creates the chat, creates a message reference,
# records the idempotency key for that reference and queues a streamable in
# `meta`.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    sender = User.get_cached_by_ap_id(object.data["actor"])
    receiver = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Both directions are considered; `Enum.uniq/1` collapses them into one
    # entry when sender and receiver are the same user.
    participant_pairs = Enum.uniq([[sender, receiver], [receiver, sender]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

        # Third argument is true only on the receiving side of the message.
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != sender.ap_id)

        Cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {
          ["user", "user:pleroma_chat"],
          {user, %{cm_ref | chat: chat, object: object}}
        }
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
337
# Creates an Answer through the common pipeline and then increases the vote
# count, passing along the answer's "inReplyTo", "name" and "actor".
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  with {:ok, answer, meta} <- Pipeline.common_pipeline(object_map, meta) do
    %{"inReplyTo" => question_ap_id, "name" => option_name, "actor" => voter_ap_id} =
      Map.merge(%{"inReplyTo" => nil, "name" => nil, "actor" => nil}, answer.data)

    Object.increase_vote_count(question_ap_id, option_name, voter_ap_id)

    {:ok, answer, meta}
  end
end
349
# Audio, Video, Question, Event and Article objects only need to go through
# the common pipeline; whatever the pipeline returns is handed back as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ~w[Audio Video Question Event Article] do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created, updated_meta} -> {:ok, created, updated_meta}
    failure -> failure
  end
end
356
# Fallback for every other object: nothing is created here, the input is
# passed through unchanged.
def handle_object_creation(object, meta), do: {:ok, object, meta}
361
# Reverts a like. When the liked object no longer exists, only the like
# activity itself is deleted; otherwise the like is first removed from the
# object, and a failure there is returned without deleting anything.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
369
# Undoes a Like: looks up the liked object and hands over to `undo_like/2`.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
375
# Undoes an EmojiReact: removes the reaction from the reacted-to object and
# deletes the reaction activity. The first step that fails is returned as-is.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  target_ap_id = object.data["object"]

  with %Object{} = target <- Object.get_by_ap_id(target_ap_id),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
383
# Undoes an Announce: removes the announce from the announced object and
# deletes the announce activity. The first step that fails is returned as-is.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  target_ap_id = object.data["object"]

  with %Object{} = announced_object <- Object.get_by_ap_id(target_ap_id),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
391
# Undoes a Block: unblocks the target user and deletes the block activity.
# The first step that fails is returned as-is.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocking_user <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked_user <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocking_user, blocked_user),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
402
# Fallback: anything else (including `nil` for an unknown activity) cannot be
# undone and is reported as an error together with the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
404
# Deletes the given record through the repo, collapsing a successful delete
# to a bare `:ok`; any other result is returned unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
409
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is missing), then returns `meta` unchanged.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
419
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is missing), then returns `meta` unchanged.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
428
# Puts the given streamables in front of any already stored under
# `:streamables` in `meta`.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
435
# Puts the given notifications in front of any already stored under
# `:notifications` in `meta`.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
442
# Sends out everything that was queued in `meta` while handling side effects:
# first the notifications, then the streamables. Returns `meta`.
def handle_after_transaction(meta) do
  send_streamables(send_notifications(meta))
end
448 end