Stream follow updates
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6 @moduledoc """
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
11 """
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
14 alias Pleroma.Chat
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
18 alias Pleroma.Object
19 alias Pleroma.Repo
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27
28 require Logger
29
@doc """
Runs the side effects implied by an activity.

Dispatches on the activity's `"type"`; activities without a dedicated clause
are passed through untouched. Clauses that succeed return
`{:ok, activity, meta}`. `meta` is a keyword list and may gain
`:notifications` / `:streamables` entries, which `handle_after_transaction/1`
delivers afterwards.
"""
def handle(object, meta \\ [])

# Tasks this handles:
# - Marks the referenced follow activity as accepted
# - Updates the following relationship
# - Updates the notification type for the follow
#
# Any lookup or update that fails simply stops the chain; the Accept itself is
# still returned as handled.
def handle(
      %{
        data: %{
          "type" => "Accept",
          "actor" => accepter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _requester, accepter} <-
         FollowingRelationship.update(requester, accepter, :follow_accept) do
    Notification.update_notification_type(accepter, accepted_follow)
  end

  {:ok, object, meta}
end
57
# Tasks this handles:
# - Marks all follow activities for this follow as rejected
# - Updates the following relationship
# - Dismisses the notification of the original follow activity
def handle(
      %{
        data: %{
          "type" => "Reject",
          "actor" => rejecter_ap_id,
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _rejected_follow} <-
         Utils.update_follow_state_for_all(pending_follow, "reject") do
    FollowingRelationship.update(requester, rejecter, :follow_reject)
    # The notification is looked up through the activity as it was fetched,
    # not through the updated copy returned above.
    Notification.dismiss(pending_follow)
  end

  {:ok, object, meta}
end
83
# Tasks this handles:
# - Follows if possible (as a pending follow)
# - Generates an Accept (local, unlocked target) or a Reject (follow failed)
# - Collects notifications in `meta` without sending them yet
# - Returns the follow activity re-read from the database
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_ap_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _, _} ->
        # Only local accounts that are not locked accept automatically.
        if followed.local && !followed.is_locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  # Re-read the activity: the accept/reject pipelines above may have changed it.
  refreshed_follow = Activity.get_by_ap_id(follow_ap_id)

  {:ok, refreshed_follow, meta}
end
126
# Tasks this handles:
# - Blocks the target user on behalf of the actor (via `User.block/2`)
#
# Nothing happens when either user cannot be resolved.
def handle(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
141
# Tasks this handles:
# - Update the user
#
# When `meta` carries a `:user_update_changeset` (local users), that changeset
# is applied as-is, so non-federating, non-ActivityPub settings are updated as
# well. Otherwise the user is rebuilt from the federated user object.
def handle(%{data: %{"type" => "Update", "object" => user_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    missing when missing in [nil, false] ->
      {:ok, remote_attrs} = ActivityPub.user_data_from_user_object(user_object)
      stored_user = User.get_by_ap_id(user_object["id"])

      stored_user
      |> User.remote_user_changeset(remote_attrs)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
161
# Tasks this handles:
# - Add like to object
# - Set up notification
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
173
# Tasks this handles:
# - Actually create the object (see `handle_object_creation/2`)
# - Roll back the transaction if the object or its actor can't be resolved
# - Increase the user note count
# - Increase the replies count of the parent object (not for poll answers)
# - Kick off the rich media fetch for the activity
# - Collect notifications in `meta` without sending them yet
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id on its own line. `=` binds looser than `&&`, so
    # `if in_reply_to = a && b` would bind the boolean result of `a && b`
    # and pass `true` to `increase_replies_count/1` instead of the id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    # Fetched in a separate task so the caller doesn't wait for it.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
205
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped for internal users.
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, target)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
226
# Tasks this handles:
# - Looks up the undone activity and reverts it through `handle_undoing/1`
#
# Anything other than `:ok` from `handle_undoing/1` is returned unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  outcome =
    undone_ap_id
    |> Activity.get_by_ap_id()
    |> handle_undoing()

  case outcome do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
233
# Tasks this handles:
# - Add reaction to object
# - Set up notification
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
245
# Tasks this handles:
# - Deletes the referenced object (or user)
# - Unpins the activity returned by the object deletion
# - Reduces the user note count
# - Reduces the reply count of the parent, if any
# - Removes chat message references to the object
# - Streams out the activity
# - Sets up notifications once everything succeeded
def handle(%{data: %{"type" => "Delete", "object" => deleted_ap_id}} = object, meta) do
  # The id may point either at an object or at a user.
  target = Object.normalize(deleted_ap_id, false) || User.get_cached_by_ap_id(deleted_ap_id)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, removed_activity} <- Object.delete(target),
             {_, author_ap_id} when is_binary(author_ap_id) <-
               {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(author_ap_id) do
          User.remove_pinnned_activity(author, removed_activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          parent_ap_id = deleted.data["inReplyTo"]

          if parent_ap_id do
            Object.decrease_replies_count(parent_ap_id)
          end

          MessageReference.delete_for_object(deleted)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was before deletion.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        case User.delete(target) do
          {:ok, _} -> :ok
          failure -> failure
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
296
# Every other activity type has no side effects: pass it through unchanged.
def handle(object, meta), do: {:ok, object, meta}
301
@doc """
Creates the object that belongs to a `Create` activity.

Object types with a dedicated clause are run through
`Pipeline.common_pipeline/2` and get their type-specific side effects; any
other value is returned untouched as `{:ok, object, meta}`.
"""
# ChatMessage: bumps (or creates) the chat of every local participant, stores a
# message reference for it and queues the reference for streaming.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    sender = User.get_cached_by_ap_id(object.data["actor"])
    receiver = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Both directions are considered; `uniq` collapses them when a user
    # messages themselves.
    participant_pairs = Enum.uniq([[sender, receiver], [receiver, sender]])

    streamables =
      for [owner, peer] <- participant_pairs, owner.local do
        {:ok, chat} = Chat.bump_or_create(owner.id, peer.ap_id)

        # The third argument is true for everyone except the sender.
        {:ok, cm_ref} = MessageReference.create(chat, object, owner.ap_id != sender.ap_id)

        Cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {["user", "user:pleroma_chat"], {owner, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
336
# Answer: after the object went through the pipeline, count the vote on the
# object referenced by "inReplyTo".
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, meta} ->
      data = answer.data
      Object.increase_vote_count(data["inReplyTo"], data["name"], data["actor"])

      {:ok, answer, meta}

    failure ->
      failure
  end
end
348
# These types only need the common pipeline; its result is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Video", "Question", "Event", "Article"] do
  Pipeline.common_pipeline(object, meta)
end
355
# Anything else needs no creation step here: hand it back unchanged.
def handle_object_creation(object, meta), do: {:ok, object, meta}
360
# Reverts a like. When the liked object is not available (nil) only the like
# itself is deleted; otherwise the like is first removed from the object.
defp undo_like(nil, like), do: delete_object(like)

defp undo_like(%Object{} = liked_object, like) do
  case Utils.remove_like_from_object(like, liked_object) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
368
@doc """
Reverts the side effects of a previously handled activity and deletes it.

Returns `:ok` on success. Activities that cannot be undone yield
`{:error, ["don't know how to handle", activity]}`; a failing step is
returned as-is.
"""
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  liked_object = Object.get_by_ap_id(data["object"])
  undo_like(liked_object, object)
end
374
# EmojiReact: take the reaction off the object, then delete the reaction.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = target <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end
382
# Announce: take the announce off the object, then delete the announce.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end
390
# Block: unblock the user, then delete the block activity.
def handle_undoing(
      %{
        data: %{
          "type" => "Block",
          "actor" => blocker_ap_id,
          "object" => blocked_ap_id
        }
      } = object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end
401
# Anything else (including nil, when the activity was not found) can't be undone.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
403
# Deletes the record from the repo, collapsing the success tuple into `:ok`.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
408
# Streams and pushes every notification collected under `:notifications`,
# then returns `meta` unchanged so the call can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
418
# Streams every `{topics, items}` pair collected under `:streamables`, then
# returns `meta` unchanged so the call can be chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
427
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta`.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
434
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta`.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
441
@doc """
Delivers what the handlers queued in `meta`.

Sends the collected `:notifications` first, then the collected
`:streamables`, and returns `meta` unchanged.
"""
def handle_after_transaction(meta) do
  meta = send_notifications(meta)
  send_streamables(meta)
end
447 end