IO list, not concatenation
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Chat
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.Repo
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Pipeline
18 alias Pleroma.Web.ActivityPub.Utils
19 alias Pleroma.Web.Push
20 alias Pleroma.Web.Streamer
21
@doc """
Executes the side effects implied by `object`.

Each clause below matches on the activity's `"type"`. Clauses that succeed
return `{:ok, object, meta}`; `meta` defaults to `[]`.
"""
def handle(object, meta \\ [])

# Tasks this handles:
# - Unfollow and block
#
# Both users are looked up by AP id; if either lookup does not yield a
# `%User{}` nothing is done, and the activity is still reported as handled.
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       do: User.block(blocker, blocked)

  {:ok, object, meta}
end
38
# Tasks this handles:
# - Update the user
#
# When `meta` carries a `:user_update_changeset` (the local-user case, which
# also covers non-federating, non-ActivityPub settings) that changeset is
# applied directly. Otherwise the user data is rebuilt from the updated object
# and applied to the user stored under the object's `"id"`.
#
# NOTE(review): the result of `User.update_and_set_cache/1` is discarded, so a
# failed update still yields `{:ok, object, meta}` — confirm this is intended.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      stored_user = User.get_by_ap_id(updated_object["id"])

      stored_user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
58
# Tasks this handles:
# - Add like to object
# - Set up notification
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
70
# Tasks this handles:
# - Actually create object
# - Rollback if we couldn't create it
# - Set up notifications
#
# Notifications are created with `do_send: false` and stashed in `meta`; they
# are delivered later by `handle_after_transaction/1`.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  case handle_object_creation(meta[:object_data], meta) do
    {:ok, _object, meta} ->
      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)

      {:ok, activity, add_notifications(meta, notifications)}

    failure ->
      # Anything other than a successful creation aborts the transaction.
      Repo.rollback(failure)
  end
end
88
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced_object = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
109
# Tasks this handles:
# - Look up the activity being undone and revert it via `handle_undoing/1`
#
# Any result of `handle_undoing/1` other than `:ok` is returned unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_ap_id)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    other -> other
  end
end
116
# Tasks this handles:
# - Add reaction to object
# - Set up notification
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
128
# Tasks this handles:
# - Delete and unpins the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
#
# The deleted target is resolved first as an object and, failing that, as a
# user. Notifications are only created when the deletion reported `:ok`;
# any other outcome is returned wrapped as `{:error, outcome}`.
#
# NOTE(review): if the target resolves to neither an `%Object{}` nor a
# `%User{}`, the `case` below has no matching clause and raises — presumably
# earlier validation guarantees the target exists; confirm.
def handle(%{data: %{"type" => "Delete", "object" => target_ap_id}} = object, meta) do
  target = Object.normalize(target_ap_id, false) || User.get_cached_by_ap_id(target_ap_id)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, activity} <- Object.delete(target),
             %User{} = author <- User.get_cached_by_ap_id(removed.data["actor"]) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          parent = removed.data["inReplyTo"]

          if parent do
            Object.decrease_replies_count(parent)
          end

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
173
# Nothing to do: any activity not matched by an earlier clause has no side
# effects and is passed through unchanged.
def handle(object, meta), do: {:ok, object, meta}
178
# Creates a `ChatMessage` by running it through the common pipeline, then, for
# each local participant, bumps (or creates) their chat with the other party
# and records a message reference for it. The resulting streamables are added
# to `meta` for delivery after the transaction.
#
# The third argument to `MessageReference.create/3` is true for the
# participant who is not the message's actor.
#
# A pipeline result other than `{:ok, object, meta}` is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    streamables =
      for {user, other_user} <- [{actor, recipient}, {recipient, actor}], user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
206
# Nothing to do for any other object type.
#
# The `Create` handler calls this function with two arguments and matches on
# `{:ok, object, meta}`, so the fallback must exist at arity 2 and return that
# shape; otherwise every non-`ChatMessage` creation raises a
# `FunctionClauseError`.
def handle_object_creation(object, meta) do
  {:ok, object, meta}
end

# One-argument form, kept so existing callers of `handle_object_creation/1`
# keep working. Returns `{:ok, object}`.
def handle_object_creation(object) do
  {:ok, object}
end
211
# Reverts a `Like`: removes the like from the liked object, then deletes the
# activity itself. Returns `:ok`, or the first non-matching intermediate
# result unchanged.
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  with %Object{} = target <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_like_from_object(object, target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
219
# Reverts an `EmojiReact`: removes the reaction from the reacted-to object,
# then deletes the activity itself. Returns `:ok`, or the first non-matching
# intermediate result unchanged.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = target <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
227
# Reverts an `Announce`: removes the announce from the announced object, then
# deletes the activity itself. Returns `:ok`, or the first non-matching
# intermediate result unchanged.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
235
# Reverts a `Block`: unblocks the blocked user on behalf of the blocker, then
# deletes the activity itself. Returns `:ok`, or the first non-matching
# intermediate result unchanged (e.g. `nil` when a user cannot be found).
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
246
# Fallback for anything we cannot undo. The error payload is a list of the
# message and the offending term rather than a concatenated string.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
248
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged so it can be
# piped further.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
258
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged so it can be piped
# further.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
267
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, combined)
end
274
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, combined)
end
281
# Delivers everything that was queued in `meta` while handling the activity:
# first the notifications, then the streamables. Returns `meta`.
def handle_after_transaction(meta) do
  meta_after_notifications = send_notifications(meta)

  send_streamables(meta_after_notifications)
end
287 end