Cachex: Make caching provider switchable at runtime.
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31
32 def handle(object, meta \\ [])
33
34 # Task this handles
35 # - Follows
36 # - Sends a notification
37 def handle(
38 %{
39 data: %{
40 "actor" => actor,
41 "type" => "Accept",
42 "object" => follow_activity_id
43 }
44 } = object,
45 meta
46 ) do
47 with %Activity{actor: follower_id} = follow_activity <-
48 Activity.get_by_ap_id(follow_activity_id),
49 %User{} = followed <- User.get_cached_by_ap_id(actor),
50 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
51 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
52 {:ok, _follower, followed} <-
53 FollowingRelationship.update(follower, followed, :follow_accept) do
54 Notification.update_notification_type(followed, follow_activity)
55 end
56
57 {:ok, object, meta}
58 end
59
60 # Task this handles
61 # - Rejects all existing follow activities for this person
62 # - Updates the follow state
63 # - Dismisses notification
64 def handle(
65 %{
66 data: %{
67 "actor" => actor,
68 "type" => "Reject",
69 "object" => follow_activity_id
70 }
71 } = object,
72 meta
73 ) do
74 with %Activity{actor: follower_id} = follow_activity <-
75 Activity.get_by_ap_id(follow_activity_id),
76 %User{} = followed <- User.get_cached_by_ap_id(actor),
77 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
78 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
79 FollowingRelationship.update(follower, followed, :follow_reject)
80 Notification.dismiss(follow_activity)
81 end
82
83 {:ok, object, meta}
84 end
85
86 # Tasks this handle
87 # - Follows if possible
88 # - Sends a notification
89 # - Generates accept or reject if appropriate
90 def handle(
91 %{
92 data: %{
93 "id" => follow_id,
94 "type" => "Follow",
95 "object" => followed_user,
96 "actor" => following_user
97 }
98 } = object,
99 meta
100 ) do
101 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
102 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
103 {_, {:ok, _, _}, _, _} <-
104 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
105 if followed.local && !followed.is_locked do
106 {:ok, accept_data, _} = Builder.accept(followed, object)
107 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
108 end
109 else
110 {:following, {:error, _}, _follower, followed} ->
111 {:ok, reject_data, _} = Builder.reject(followed, object)
112 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
113
114 _ ->
115 nil
116 end
117
118 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
119
120 meta =
121 meta
122 |> add_notifications(notifications)
123
124 updated_object = Activity.get_by_ap_id(follow_id)
125
126 {:ok, updated_object, meta}
127 end
128
129 # Tasks this handles:
130 # - Unfollow and block
131 def handle(
132 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
133 object,
134 meta
135 ) do
136 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
137 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
138 User.block(blocker, blocked)
139 end
140
141 {:ok, object, meta}
142 end
143
144 # Tasks this handles:
145 # - Update the user
146 #
147 # For a local user, we also get a changeset with the full information, so we
148 # can update non-federating, non-activitypub settings as well.
149 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
150 if changeset = Keyword.get(meta, :user_update_changeset) do
151 changeset
152 |> User.update_and_set_cache()
153 else
154 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
155
156 User.get_by_ap_id(updated_object["id"])
157 |> User.remote_user_changeset(new_user_data)
158 |> User.update_and_set_cache()
159 end
160
161 {:ok, object, meta}
162 end
163
164 # Tasks this handles:
165 # - Add like to object
166 # - Set up notification
167 def handle(%{data: %{"type" => "Like"}} = object, meta) do
168 liked_object = Object.get_by_ap_id(object.data["object"])
169 Utils.add_like_to_object(object, liked_object)
170
171 Notification.create_notifications(object)
172
173 {:ok, object, meta}
174 end
175
176 # Tasks this handles
177 # - Actually create object
178 # - Rollback if we couldn't create it
179 # - Increase the user note count
180 # - Increase the reply count
181 # - Increase replies count
182 # - Set up ActivityExpiration
183 # - Set up notifications
184 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
185 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
186 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
187 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
188 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
189
190 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
191 Object.increase_replies_count(in_reply_to)
192 end
193
194 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
195 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
196 end)
197
198 meta =
199 meta
200 |> add_notifications(notifications)
201
202 {:ok, activity, meta}
203 else
204 e -> Repo.rollback(e)
205 end
206 end
207
208 # Tasks this handles:
209 # - Add announce to object
210 # - Set up notification
211 # - Stream out the announce
212 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
213 announced_object = Object.get_by_ap_id(object.data["object"])
214 user = User.get_cached_by_ap_id(object.data["actor"])
215
216 Utils.add_announce_to_object(object, announced_object)
217
218 if !User.is_internal_user?(user) do
219 Notification.create_notifications(object)
220
221 object
222 |> Topics.get_activity_topics()
223 |> Streamer.stream(object)
224 end
225
226 {:ok, object, meta}
227 end
228
229 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
230 with undone_object <- Activity.get_by_ap_id(undone_object),
231 :ok <- handle_undoing(undone_object) do
232 {:ok, object, meta}
233 end
234 end
235
236 # Tasks this handles:
237 # - Add reaction to object
238 # - Set up notification
239 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
240 reacted_object = Object.get_by_ap_id(object.data["object"])
241 Utils.add_emoji_reaction_to_object(object, reacted_object)
242
243 Notification.create_notifications(object)
244
245 {:ok, object, meta}
246 end
247
248 # Tasks this handles:
249 # - Delete and unpins the create activity
250 # - Replace object with Tombstone
251 # - Set up notification
252 # - Reduce the user note count
253 # - Reduce the reply count
254 # - Stream out the activity
255 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
256 deleted_object =
257 Object.normalize(deleted_object, false) ||
258 User.get_cached_by_ap_id(deleted_object)
259
260 result =
261 case deleted_object do
262 %Object{} ->
263 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
264 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
265 %User{} = user <- User.get_cached_by_ap_id(actor) do
266 User.remove_pinnned_activity(user, activity)
267
268 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
269
270 if in_reply_to = deleted_object.data["inReplyTo"] do
271 Object.decrease_replies_count(in_reply_to)
272 end
273
274 MessageReference.delete_for_object(deleted_object)
275
276 ActivityPub.stream_out(object)
277 ActivityPub.stream_out_participations(deleted_object, user)
278 :ok
279 else
280 {:actor, _} ->
281 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
282 :no_object_actor
283 end
284
285 %User{} ->
286 with {:ok, _} <- User.delete(deleted_object) do
287 :ok
288 end
289 end
290
291 if result == :ok do
292 Notification.create_notifications(object)
293 {:ok, object, meta}
294 else
295 {:error, result}
296 end
297 end
298
299 # Nothing to do
300 def handle(object, meta) do
301 {:ok, object, meta}
302 end
303
304 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
305 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
306 actor = User.get_cached_by_ap_id(object.data["actor"])
307 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
308
309 streamables =
310 [[actor, recipient], [recipient, actor]]
311 |> Enum.uniq()
312 |> Enum.map(fn [user, other_user] ->
313 if user.local do
314 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
315 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
316
317 @cachex.put(
318 :chat_message_id_idempotency_key_cache,
319 cm_ref.id,
320 meta[:idempotency_key]
321 )
322
323 {
324 ["user", "user:pleroma_chat"],
325 {user, %{cm_ref | chat: chat, object: object}}
326 }
327 end
328 end)
329 |> Enum.filter(& &1)
330
331 meta =
332 meta
333 |> add_streamables(streamables)
334
335 {:ok, object, meta}
336 end
337 end
338
339 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
340 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
341 Object.increase_vote_count(
342 object.data["inReplyTo"],
343 object.data["name"],
344 object.data["actor"]
345 )
346
347 {:ok, object, meta}
348 end
349 end
350
351 def handle_object_creation(%{"type" => objtype} = object, meta)
352 when objtype in ~w[Audio Video Question Event Article] do
353 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
354 {:ok, object, meta}
355 end
356 end
357
358 # Nothing to do
359 def handle_object_creation(object, meta) do
360 {:ok, object, meta}
361 end
362
363 defp undo_like(nil, object), do: delete_object(object)
364
365 defp undo_like(%Object{} = liked_object, object) do
366 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
367 delete_object(object)
368 end
369 end
370
371 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
372 object.data["object"]
373 |> Object.get_by_ap_id()
374 |> undo_like(object)
375 end
376
377 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
378 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
379 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
380 {:ok, _} <- Repo.delete(object) do
381 :ok
382 end
383 end
384
385 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
386 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
387 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
388 {:ok, _} <- Repo.delete(object) do
389 :ok
390 end
391 end
392
393 def handle_undoing(
394 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
395 ) do
396 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
397 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
398 {:ok, _} <- User.unblock(blocker, blocked),
399 {:ok, _} <- Repo.delete(object) do
400 :ok
401 end
402 end
403
404 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
405
406 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
407 defp delete_object(object) do
408 with {:ok, _} <- Repo.delete(object), do: :ok
409 end
410
411 defp send_notifications(meta) do
412 Keyword.get(meta, :notifications, [])
413 |> Enum.each(fn notification ->
414 Streamer.stream(["user", "user:notification"], notification)
415 Push.send(notification)
416 end)
417
418 meta
419 end
420
421 defp send_streamables(meta) do
422 Keyword.get(meta, :streamables, [])
423 |> Enum.each(fn {topics, items} ->
424 Streamer.stream(topics, items)
425 end)
426
427 meta
428 end
429
430 defp add_streamables(meta, streamables) do
431 existing = Keyword.get(meta, :streamables, [])
432
433 meta
434 |> Keyword.put(:streamables, streamables ++ existing)
435 end
436
437 defp add_notifications(meta, notifications) do
438 existing = Keyword.get(meta, :notifications, [])
439
440 meta
441 |> Keyword.put(:notifications, notifications ++ existing)
442 end
443
444 def handle_after_transaction(meta) do
445 meta
446 |> send_notifications()
447 |> send_streamables()
448 end
449 end