a78ec411fb5fb136d53bb59db410d4580742b66a
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Chat
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Repo
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Pipeline
19 alias Pleroma.Web.ActivityPub.Utils
20 alias Pleroma.Web.Push
21 alias Pleroma.Web.Streamer
22
@doc """
Runs the side effects implied by an already-inserted activity.

Dispatches on the activity's `"type"`. `meta` is a keyword list that is
threaded through and handed back to the caller; some clauses add
`:notifications` or `:streamables` entries to it, which
`handle_after_transaction/1` later delivers.
"""
def handle(object, meta \\ [])

# Tasks this handles:
# - Calls `User.follow/3` with `:follow_pending` once both users resolve
# - Accepts the follow right away when the followed user is local and not locked
# - Rejects the follow when `User.follow/3` returns an error
# - Creates notifications without sending them and stores them in `meta`
def handle(
      %{
        data: %{
          "id" => follow_id,
          "type" => "Follow",
          "object" => followed_ap_id,
          "actor" => follower_ap_id
        }
      } = object,
      meta
    ) do
  # The value of this `with` is discarded; it only exists for its effects.
  # If either user cannot be resolved, nothing is accepted or rejected.
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _} ->
        if followed.local && !followed.locked do
          Utils.update_follow_state_for_all(object, "accept")
          FollowingRelationship.update(follower, followed, :follow_accept)
          User.update_follower_count(followed)
          User.update_following_count(follower)

          ActivityPub.accept(%{
            to: [follower_ap_id],
            actor: followed,
            object: follow_id,
            local: true
          })
        end

      {:error, _} ->
        Utils.update_follow_state_for_all(object, "reject")
        FollowingRelationship.update(follower, followed, :follow_reject)

        # A Reject activity is only generated on behalf of local users.
        if followed.local do
          ActivityPub.reject(%{
            to: [follower.ap_id],
            actor: followed,
            object: follow_id,
            local: true
          })
        end

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  # Re-read the activity, since its follow state may have been updated above.
  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
87
# Tasks this handles:
# - Blocks the target through `User.block/2` once both users resolve
#   (the original note here reads "Unfollow and block"; the unfollow part is
#   presumably done inside `User.block/2` and is not visible in this module)
#
# The activity and `meta` are returned unchanged whether or not a block happened.
def handle(
      %{data: %{"type" => "Block", "object" => blocked_ap_id, "actor" => blocker_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
102
# Tasks this handles:
# - Update the user
#
# When `meta` carries a `:user_update_changeset` (per the original note, this
# is supplied for local users so non-federating settings can be updated too),
# that changeset is applied as-is. Otherwise a changeset is built from the
# user data extracted from the updated object.
#
# The outcome of the update itself is not inspected.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  changeset =
    case Keyword.get(meta, :user_update_changeset) do
      missing when missing in [nil, false] ->
        {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
        user = User.get_by_ap_id(updated_object["id"])
        User.remote_user_changeset(user, new_user_data)

      supplied ->
        supplied
    end

  User.update_and_set_cache(changeset)

  {:ok, object, meta}
end
122
# Tasks this handles:
# - Add the like to the liked object
# - Create notifications
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
134
# Tasks this handles:
# - Create the object via `handle_object_creation/2`, using `meta[:object_data]`
# - Hand any other result of that call to `Repo.rollback/1`
# - Create notifications without sending them and store them in `meta`
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  case handle_object_creation(meta[:object_data], meta) do
    {:ok, _object, meta} ->
      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)

      {:ok, activity, add_notifications(meta, notifications)}

    failure ->
      Repo.rollback(failure)
  end
end
152
# Tasks this handles:
# - Add the announce to the announced object
# - Create notifications
# - Stream out the announce
#
# Notifications and streaming are skipped when the actor is an internal user.
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced_object = Object.get_by_ap_id(data["object"])
  actor = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  if !User.is_internal_user?(actor) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
173
# Tasks this handles:
# - Look up the activity being undone and delegate to `handle_undoing/1`
#
# Anything other than `:ok` coming back from `handle_undoing/1` is returned to
# the caller unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
180
# Tasks this handles:
# - Add the emoji reaction to the reacted-to object
# - Create notifications
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
192
# Tasks this handles:
# - Resolve the target as an object first, falling back to a user
# - For an object: delete it, unpin the activity returned by the deletion,
#   decrease the author's note count (if public) and the parent's reply count,
#   drop chat message references, and stream the deletion out
# - For a user: delete the user
# - Create notifications when the deletion succeeded
#
# Returns `{:ok, object, meta}` on success and `{:error, result}` otherwise,
# where `result` is whatever the failing step produced.
# NOTE(review): a target that resolves to neither an object nor a user has no
# matching `case` branch here — presumably ruled out before this is called.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target = Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, activity} <- Object.delete(target),
             %User{} = author <- User.get_cached_by_ap_id(removed.data["actor"]) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          if parent = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent)
          end

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
237
# Any other activity has no side effects: pass it through untouched.
def handle(object, meta), do: {:ok, object, meta}
242
@doc """
Creates the object carried by a `Create` activity.

`ChatMessage` and `Question` objects are run through
`Pipeline.common_pipeline/2`; every other object is passed through unchanged.
Returns `{:ok, object, meta}` on success, or whatever the pipeline returned.
"""
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    # Only the first entry of "to" is treated as the recipient.
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Each local side of the conversation gets its chat bumped (or created) and
    # a message reference, which is queued for streaming.
    streamables =
      for {user, other_user} <- [{actor, recipient}, {recipient, actor}], user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        # The third argument is true for the receiving side only.
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
270
# Questions only need the common pipeline; its result, success or failure, is
# returned to the caller as-is.
def handle_object_creation(%{"type" => "Question"} = object, meta) do
  Pipeline.common_pipeline(object, meta)
end
276
# Every other object type needs no extra work: pass it through untouched.
def handle_object_creation(object, meta), do: {:ok, object, meta}
281
# Undoes a like. When the liked object can no longer be found, only the like
# activity itself is deleted.
defp undo_like(nil, object), do: delete_object(object)

# Otherwise the like is first removed from the liked object; a failure there is
# returned as-is and the activity is left in place.
defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
289
@doc """
Reverts the effects of the given activity and deletes it.

Returns `:ok` on success. On failure it returns whatever the failing step
produced, or `{:error, ["don't know how to handle", object]}` for activities
it has no clause for.
"""
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  liked_object = Object.get_by_ap_id(data["object"])

  undo_like(liked_object, object)
end
295
# Removes the reaction from the reacted-to object, then deletes the reaction
# activity. If the object cannot be found, the lookup result is returned and
# nothing is deleted.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = target <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target) do
    delete_object(object)
  end
end
303
# Removes the announce from the announced object, then deletes the announce
# activity. If the object cannot be found, the lookup result is returned and
# nothing is deleted.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end
311
# Unblocks the blocked user and deletes the block activity. The first step
# that fails (user lookup, unblock, delete) has its result returned as-is.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end
322
# Fallback for anything without a dedicated clause, including `nil` when the
# activity to undo could not be found.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
324
# Deletes the given record through `Repo.delete/1`.
#
# Despite the name, every call site in this module passes the *activity* being
# undone (Like, EmojiReact, Announce, Block), so the spec takes an
# `Activity.t()` rather than an `Object.t()`.
#
# Returns `:ok` when the delete succeeds; any other result of `Repo.delete/1`
# is returned unchanged.
@spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
329
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged so the call
# can be piped.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
339
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged so the call can be
# piped.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
348
# Prepends `streamables` to whatever is already stored under `:streamables`
# in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, combined)
end
355
# Prepends `notifications` to whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, combined)
end
362
@doc """
Delivers what the `handle/2` clauses queued up in `meta`.

Sends the notifications first, then the streamables, and returns `meta`
unchanged. Judging by its name, it is meant to be called once the surrounding
transaction has finished.
"""
def handle_after_transaction(meta) do
  meta = send_notifications(meta)

  send_streamables(meta)
end
368 end