fix tests
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Chat
10 alias Pleroma.Chat.MessageReference
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Pipeline
17 alias Pleroma.Web.ActivityPub.Utils
18 alias Pleroma.Web.Push
19 alias Pleroma.Web.Streamer
20
def handle(object, meta \\ [])

# Side effects of a `Like` activity:
# - add the like to the liked object
# - create notifications for the activity
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
34
# Side effects of a `Create` activity:
# - run the creation side effects for the object data found in
#   `meta[:object_data]`
# - create the notifications with `do_send: false` and stash them in `meta`
#   (they are delivered later by `handle_after_transaction/1`)
# - hand any failure from object creation to `Repo.rollback/1`
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  case handle_object_creation(meta[:object_data], meta) do
    {:ok, _object, meta} ->
      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)

      {:ok, activity, add_notifications(meta, notifications)}

    failure ->
      Repo.rollback(failure)
  end
end
52
# Side effects of an `Announce` activity:
# - add the announce to the announced object
# - when the actor is not an internal user, create notifications and stream
#   the announce out
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])
  actor = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, target)

  # Announces made by internal users are recorded but not broadcast.
  if !User.is_internal_user?(actor) do
    Notification.create_notifications(object)
    ActivityPub.stream_out(object)
  end

  {:ok, object, meta}
end
70
# Side effects of an `Undo` activity: look up the activity being undone and
# revert it through `handle_undoing/1`. Any result other than `:ok` is
# returned to the caller unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  outcome =
    undone_object
    |> Activity.get_by_ap_id()
    |> handle_undoing()

  case outcome do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
77
# Side effects of an `EmojiReact` activity:
# - add the reaction to the object that was reacted to
# - create notifications for the activity
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
89
# Side effects of a `Delete` activity.
#
# When the deleted thing resolves to an object:
# - delete the object and unpin the activity returned by `Object.delete/1`
#   from the object's actor
# - decrease the actor's note count (if public) and, for replies, the reply
#   count of the parent
# - delete the chat message references for the object
# - stream out the activity and the participations
#
# When it resolves to a user, the user is deleted.
#
# Notifications are created only when the deletion returned `:ok`; any other
# outcome is returned wrapped as `{:error, outcome}`.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  # Try to resolve the reference as an object first, then as a user.
  target = Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, removed, activity} <- Object.delete(target),
             %User{} = author <- User.get_cached_by_ap_id(removed.data["actor"]) do
          User.remove_pinnned_activity(author, activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

          if parent = removed.data["inReplyTo"] do
            Object.decrease_replies_count(parent)
          end

          MessageReference.delete_for_object(removed)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(removed, author)
          :ok
        end

      %User{} ->
        with {:ok, _} <- User.delete(target), do: :ok
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
134
# Catch-all: activities without dedicated side effects pass through untouched.
def handle(object, meta), do: {:ok, object, meta}
139
# Creation side effects for a `ChatMessage` object:
# - run the object through `Pipeline.common_pipeline/2`
# - for each of actor and recipient that is local, bump or create their chat
#   and create a message reference for it (the third argument to
#   `MessageReference.create/3` is true only when the user is not the actor)
# - queue one streamable per created reference in `meta`
#
# A non-`{:ok, object, meta}` result from the pipeline is returned unchanged.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    # Each pair is {owner of the chat, the other party}; remote owners are skipped.
    streamables =
      for [user, other_user] <- [[actor, recipient], [recipient, actor]], user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
167
# Fallback for objects that need no creation side effects.
#
# The `Create` clause of `handle/2` calls this function with two arguments and
# matches the result against `{:ok, object, meta}`, so the fallback has to
# accept `meta` and hand it back unchanged. Without this clause every object
# that is not a `ChatMessage` raises a `FunctionClauseError`.
def handle_object_creation(object, meta) do
  {:ok, object, meta}
end

# One-argument form, kept so existing callers of `handle_object_creation/1`
# keep working. Returns `{:ok, object}`.
def handle_object_creation(object) do
  {:ok, object}
end
172
# Reverts a `Like`: remove it from the liked object, then delete the activity.
# Returns `:ok`, or the value produced by the first step that failed.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  case Object.get_by_ap_id(object.data["object"]) do
    %Object{} = target ->
      with {:ok, _} <- Utils.remove_like_from_object(object, target),
           {:ok, _} <- Repo.delete(object) do
        :ok
      end

    missing ->
      missing
  end
end
180
# Reverts an `EmojiReact`: remove the reaction from the object that was
# reacted to, then delete the activity.
# Returns `:ok`, or the value produced by the first step that failed.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  case Object.get_by_ap_id(object.data["object"]) do
    %Object{} = target ->
      with {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
           {:ok, _} <- Repo.delete(object) do
        :ok
      end

    missing ->
      missing
  end
end
188
# Reverts an `Announce`: remove it from the announced object, then delete the
# activity.
# Returns `:ok`, or the value produced by the first step that failed.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  case Object.get_by_ap_id(object.data["object"]) do
    %Object{} = announced_object ->
      with {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
           {:ok, _} <- Repo.delete(object) do
        :ok
      end

    missing ->
      missing
  end
end
196
# Reverts a `Block`: resolve both users from their AP ids, unblock the blocked
# user for the blocker, then delete the block activity.
# Returns `:ok`, or the value produced by the first step that failed.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end
207
# Catch-all: anything without a dedicated undo clause (including `nil`) is
# reported as an error that carries the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
209
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). Returns `meta` unchanged so the call
# can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  Enum.each(pending, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
219
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent). Returns `meta` unchanged so the call can be
# chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  Enum.each(pending, fn {topics, items} ->
    Streamer.stream(topics, items)
  end)

  meta
end
228
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  combined = streamables ++ Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, combined)
end
235
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  combined = notifications ++ Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, combined)
end
242
# Delivers everything that was queued in `meta` while the side effects ran:
# first the notifications, then the streamables. Returns `meta` unchanged.
def handle_after_transaction(meta) do
  notified = send_notifications(meta)

  send_streamables(notified)
end
248 end