Merge branch 'chore/remove-reference-to-distsn' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
1 defmodule Pleroma.Web.ActivityPub.SideEffects do
2 @moduledoc """
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
6 collection, and so on.
7 """
8 alias Pleroma.Activity
9 alias Pleroma.Chat
10 alias Pleroma.Chat.MessageReference
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Pipeline
17 alias Pleroma.Web.ActivityPub.Utils
18 alias Pleroma.Web.Push
19 alias Pleroma.Web.Streamer
20
21 def handle(object, meta \\ [])
22
23 # Tasks this handles:
24 # - Update the user
25 #
26 # For a local user, we also get a changeset with the full information, so we
27 # can update non-federating, non-activitypub settings as well.
28 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
29 if changeset = Keyword.get(meta, :user_update_changeset) do
30 changeset
31 |> User.update_and_set_cache()
32 else
33 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
34
35 User.get_by_ap_id(updated_object["id"])
36 |> User.remote_user_changeset(new_user_data)
37 |> User.update_and_set_cache()
38 end
39
40 {:ok, object, meta}
41 end
42
43 # Tasks this handles:
44 # - Add like to object
45 # - Set up notification
46 def handle(%{data: %{"type" => "Like"}} = object, meta) do
47 liked_object = Object.get_by_ap_id(object.data["object"])
48 Utils.add_like_to_object(object, liked_object)
49
50 Notification.create_notifications(object)
51
52 {:ok, object, meta}
53 end
54
55 # Tasks this handles
56 # - Actually create object
57 # - Rollback if we couldn't create it
58 # - Set up notifications
59 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
60 with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
61 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
62
63 meta =
64 meta
65 |> add_notifications(notifications)
66
67 {:ok, activity, meta}
68 else
69 e -> Repo.rollback(e)
70 end
71 end
72
73 # Tasks this handles:
74 # - Add announce to object
75 # - Set up notification
76 # - Stream out the announce
77 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
78 announced_object = Object.get_by_ap_id(object.data["object"])
79 user = User.get_cached_by_ap_id(object.data["actor"])
80
81 Utils.add_announce_to_object(object, announced_object)
82
83 if !User.is_internal_user?(user) do
84 Notification.create_notifications(object)
85 ActivityPub.stream_out(object)
86 end
87
88 {:ok, object, meta}
89 end
90
91 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
92 with undone_object <- Activity.get_by_ap_id(undone_object),
93 :ok <- handle_undoing(undone_object) do
94 {:ok, object, meta}
95 end
96 end
97
98 # Tasks this handles:
99 # - Add reaction to object
100 # - Set up notification
101 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
102 reacted_object = Object.get_by_ap_id(object.data["object"])
103 Utils.add_emoji_reaction_to_object(object, reacted_object)
104
105 Notification.create_notifications(object)
106
107 {:ok, object, meta}
108 end
109
110 # Tasks this handles:
111 # - Delete and unpins the create activity
112 # - Replace object with Tombstone
113 # - Set up notification
114 # - Reduce the user note count
115 # - Reduce the reply count
116 # - Stream out the activity
117 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
118 deleted_object =
119 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
120
121 result =
122 case deleted_object do
123 %Object{} ->
124 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
125 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
126 User.remove_pinnned_activity(user, activity)
127
128 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
129
130 if in_reply_to = deleted_object.data["inReplyTo"] do
131 Object.decrease_replies_count(in_reply_to)
132 end
133
134 MessageReference.delete_for_object(deleted_object)
135
136 ActivityPub.stream_out(object)
137 ActivityPub.stream_out_participations(deleted_object, user)
138 :ok
139 end
140
141 %User{} ->
142 with {:ok, _} <- User.delete(deleted_object) do
143 :ok
144 end
145 end
146
147 if result == :ok do
148 Notification.create_notifications(object)
149 {:ok, object, meta}
150 else
151 {:error, result}
152 end
153 end
154
155 # Nothing to do
156 def handle(object, meta) do
157 {:ok, object, meta}
158 end
159
160 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
161 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
162 actor = User.get_cached_by_ap_id(object.data["actor"])
163 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
164
165 streamables =
166 [[actor, recipient], [recipient, actor]]
167 |> Enum.map(fn [user, other_user] ->
168 if user.local do
169 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
170 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
171
172 {
173 ["user", "user:pleroma_chat"],
174 {user, %{cm_ref | chat: chat, object: object}}
175 }
176 end
177 end)
178 |> Enum.filter(& &1)
179
180 meta =
181 meta
182 |> add_streamables(streamables)
183
184 {:ok, object, meta}
185 end
186 end
187
188 # Nothing to do
189 def handle_object_creation(object) do
190 {:ok, object}
191 end
192
193 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
194 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
195 {:ok, _} <- Utils.remove_like_from_object(object, liked_object),
196 {:ok, _} <- Repo.delete(object) do
197 :ok
198 end
199 end
200
201 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
202 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
203 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
204 {:ok, _} <- Repo.delete(object) do
205 :ok
206 end
207 end
208
209 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
210 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
211 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
212 {:ok, _} <- Repo.delete(object) do
213 :ok
214 end
215 end
216
217 def handle_undoing(
218 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
219 ) do
220 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
221 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
222 {:ok, _} <- User.unblock(blocker, blocked),
223 {:ok, _} <- Repo.delete(object) do
224 :ok
225 end
226 end
227
228 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
229
230 defp send_notifications(meta) do
231 Keyword.get(meta, :notifications, [])
232 |> Enum.each(fn notification ->
233 Streamer.stream(["user", "user:notification"], notification)
234 Push.send(notification)
235 end)
236
237 meta
238 end
239
240 defp send_streamables(meta) do
241 Keyword.get(meta, :streamables, [])
242 |> Enum.each(fn {topics, items} ->
243 Streamer.stream(topics, items)
244 end)
245
246 meta
247 end
248
249 defp add_streamables(meta, streamables) do
250 existing = Keyword.get(meta, :streamables, [])
251
252 meta
253 |> Keyword.put(:streamables, streamables ++ existing)
254 end
255
256 defp add_notifications(meta, notifications) do
257 existing = Keyword.get(meta, :notifications, [])
258
259 meta
260 |> Keyword.put(:notifications, notifications ++ existing)
261 end
262
263 def handle_after_transaction(meta) do
264 meta
265 |> send_notifications()
266 |> send_streamables()
267 end
268 end