Pipeline Ingestion: Article
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Chat
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Repo
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Builder
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
24
25 require Logger
26
@doc """
Executes the side effects implied by `object`, dispatching on its `"type"`.

`meta` is a keyword list threaded through the clauses; it defaults to `[]`.
Types without a dedicated clause are returned unchanged as
`{:ok, object, meta}`.
"""
def handle(object, meta \\ [])

# Tasks this handles (Accept of a follow):
# - Switches the state of the matching follow activities to "accept"
# - Updates the following relationship
# - Updates the notification type and refreshes both users' follow counters
#
# Whenever a lookup or update step does not match, the remaining steps are
# skipped and the Accept is still passed through as `{:ok, object, meta}`.
def handle(
      %{data: %{"type" => "Accept", "actor" => accepter_ap_id, "object" => follow_ap_id}} =
        object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(accepter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _} <- FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
    User.update_follower_count(followed)
    User.update_following_count(follower)
  end

  {:ok, object, meta}
end
55
# Tasks this handles (Reject of a follow):
# - Switches the state of the matching follow activities to "reject"
# - Updates the following relationship
# - Dismisses the notification of the original follow activity
#
# A failed lookup or state update skips the remaining steps; the Reject is
# passed through as `{:ok, object, meta}` either way.
def handle(
      %{data: %{"type" => "Reject", "actor" => rejecter_ap_id, "object" => follow_ap_id}} =
        object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(rejecter_ap_id),
       %User{} = follower <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _} <- Utils.update_follow_state_for_all(follow, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The activity as originally fetched is dismissed, not the updated one.
    Notification.dismiss(follow)
  end

  {:ok, object, meta}
end
81
# Tasks this handles (Follow):
# - Follows if possible
# - Generates an Accept (local, unlocked target) or a Reject (follow failed)
# - Collects notifications into `meta` without sending them yet
#
# Returns the follow activity as re-read by its id, so the caller sees
# whatever the steps above did to it.
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_ap_id,
          "actor" => follower_ap_id,
          "object" => followed_ap_id
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
    case User.follow(follower, followed, :follow_pending) do
      {:ok, _} ->
        if followed.local && !followed.locked do
          {:ok, accept_data, _} = Builder.accept(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
        end

      {:error, _} ->
        {:ok, reject_data, _} = Builder.reject(followed, object)
        {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

      _ ->
        nil
    end
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta_with_notifications = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_ap_id), meta_with_notifications}
end
124
# Tasks this handles (Block):
# - Unfollow and block
#
# The block is only applied when both users can be resolved; the activity is
# passed through either way.
def handle(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
139
# Tasks this handles (Update):
# - Update the user
#
# For a local user, `meta` also carries a changeset with the full information
# under `:user_update_changeset`, so non-federating, non-ActivityPub settings
# can be updated as well. Without it the user is rebuilt from the object.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

      updated_object["id"]
      |> User.get_by_ap_id()
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
159
# Tasks this handles (Like):
# - Add like to object
# - Set up notification
def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
171
# Tasks this handles (Create):
# - Actually create the object (from `meta[:object_data]`)
# - Roll back if it could not be created or the actor cannot be resolved
# - Increase the user note count
# - Increase the replies count of the parent
# - Enqueue the "fetch_data_for_activity" background job
# - Collect notifications into `meta` without sending them yet
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  case handle_object_creation(meta[:object_data], meta) do
    {:ok, object, meta} ->
      case User.get_cached_by_ap_id(activity.data["actor"]) do
        %User{} = author ->
          {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
          {:ok, _author} = ActivityPub.increase_note_count_if_public(author, object)

          if parent_ap_id = object.data["inReplyTo"] do
            Object.increase_replies_count(parent_ap_id)
          end

          BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

          {:ok, activity, add_notifications(meta, notifications)}

        no_author ->
          Repo.rollback(no_author)
      end

    creation_failure ->
      Repo.rollback(creation_failure)
  end
end
201
# Tasks this handles (Announce):
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped for internal users.
def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
  announced = Object.get_by_ap_id(data["object"])
  announcer = User.get_cached_by_ap_id(data["actor"])

  Utils.add_announce_to_object(object, announced)

  internal? = User.is_internal_user?(announcer)

  if !internal? do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
222
# Undo: looks up the activity being undone and delegates to
# `handle_undoing/1`. Anything other than `:ok` from there is returned as-is.
def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
  case undone_ap_id |> Activity.get_by_ap_id() |> handle_undoing() do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
229
# Tasks this handles (EmojiReact):
# - Add reaction to object
# - Set up notification
def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
  target = Object.get_by_ap_id(data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
241
# Tasks this handles (Delete):
# - Delete and unpin the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
#
# The target is resolved as an object first and as a user second.
# NOTE(review): there is no clause for a target that resolves to neither, and
# the object branch only translates the missing-actor case; other failures in
# that `with` have no matching `else` clause. Confirm upstream validation
# makes those paths unreachable.
def handle(%{data: %{"type" => "Delete", "object" => target_ref}} = object, meta) do
  target = Object.normalize(target_ref, false) || User.get_cached_by_ap_id(target_ref)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, create_activity} <- Object.delete(target),
             {_, actor_ap_id} when is_binary(actor_ap_id) <- {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor_ap_id) do
          User.remove_pinnned_activity(author, create_activity)

          # Continue with the user returned by the note-count update.
          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          if parent_ap_id = deleted.data["inReplyTo"] do
            Object.decrease_replies_count(parent_ap_id)
          end

          MessageReference.delete_for_object(deleted)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # Logs the object as it was resolved before deletion.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
292
# Fallback for every other activity type: no side effects, pass it through.
def handle(object, meta), do: {:ok, object, meta}
297
# ChatMessage: runs the object through the common pipeline, then, for each
# local side of the conversation, bumps or creates the chat and a message
# reference and queues both for streaming via `meta`.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, object, meta} ->
      actor = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      streamables =
        for {user, other_user} <- [{actor, recipient}, {recipient, actor}], user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

          # Third argument is true only for the side that did not author the
          # message (presumably the unread flag; confirm against
          # MessageReference.create/3).
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

          {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
        end

      {:ok, object, add_streamables(meta, streamables)}

    failure ->
      failure
  end
end

# Answer: runs the common pipeline, then increases the vote count using the
# answer's "inReplyTo", "name" and "actor" fields.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, object, meta} ->
      answer = object.data

      Object.increase_vote_count(answer["inReplyTo"], answer["name"], answer["actor"])

      {:ok, object, meta}

    failure ->
      failure
  end
end

# These types only need the common pipeline; its result is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ["Audio", "Video", "Question", "Event", "Article"] do
  Pipeline.common_pipeline(object, meta)
end

# Nothing to do
def handle_object_creation(object, meta), do: {:ok, object, meta}
349
# Undoes a like. When the liked object is gone only the like itself is
# deleted; otherwise the like is first removed from the object, and a failure
# there is returned unchanged.
defp undo_like(nil, object) do
  delete_object(object)
end

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
357
# Undoing a Like: resolve the liked object (may be nil) and hand over to
# undo_like/2.
def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
  undo_like(Object.get_by_ap_id(data["object"]), object)
end

# Undoing an EmojiReact: remove the reaction from the object, then delete the
# reaction itself. The first step that does not match is returned as-is.
def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
  with %Object{} = reacted <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Undoing an Announce: remove the announce from the object, then delete the
# announce itself. The first step that does not match is returned as-is.
def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
  with %Object{} = announced <- Object.get_by_ap_id(data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Undoing a Block: unblock, then delete the block itself. The first step that
# does not match is returned as-is.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end

# Anything else (including nil) cannot be undone here.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
392
# Deletes the record through the repo, collapsing success to a bare `:ok` and
# returning any other result unchanged.
# NOTE(review): the spec says Object.t(), but undo_like/2 passes the activity
# being undone; confirm the intended type.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
397
# Streams and pushes every notification collected under `:notifications` in
# `meta`, then returns `meta` untouched so calls can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
407
# Streams every `{topics, items}` pair collected under `:streamables` in
# `meta`, then returns `meta` untouched so calls can be chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
416
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  merged = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, merged)
end
423
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  merged = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, merged)
end
430
# Delivers what the handlers collected in `meta`: notifications first, then
# streamables. Returns `meta` unchanged.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)

  send_streamables(notified_meta)
end
436 end