Separate Subscription Notifications from regular Notifications
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 use GenServer
7 require Logger
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.SubscriptionNotification
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.MastodonAPI.NotificationView
19
20 @keepalive_interval :timer.seconds(30)
21
22 def start_link(_) do
23 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
24 end
25
26 def add_socket(topic, socket) do
27 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
28 end
29
30 def remove_socket(topic, socket) do
31 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
32 end
33
34 def stream(topic, item) do
35 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
36 end
37
38 def init(args) do
39 Process.send_after(self(), %{action: :ping}, @keepalive_interval)
40
41 {:ok, args}
42 end
43
44 def handle_info(%{action: :ping}, topics) do
45 topics
46 |> Map.values()
47 |> List.flatten()
48 |> Enum.each(fn socket ->
49 Logger.debug("Sending keepalive ping")
50 send(socket.transport_pid, {:text, ""})
51 end)
52
53 Process.send_after(self(), %{action: :ping}, @keepalive_interval)
54
55 {:noreply, topics}
56 end
57
58 def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
59 recipient_topics =
60 User.get_recipients_from_activity(item)
61 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
62
63 Enum.each(recipient_topics || [], fn user_topic ->
64 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
65 push_to_socket(topics, user_topic, item)
66 end)
67
68 {:noreply, topics}
69 end
70
71 def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
72 user_topic = "direct:#{participation.user_id}"
73 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
74
75 push_to_socket(topics, user_topic, participation)
76
77 {:noreply, topics}
78 end
79
80 def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
81 # filter the recipient list if the activity is not public, see #270.
82 recipient_lists =
83 case Visibility.is_public?(item) do
84 true ->
85 Pleroma.List.get_lists_from_activity(item)
86
87 _ ->
88 Pleroma.List.get_lists_from_activity(item)
89 |> Enum.filter(fn list ->
90 owner = User.get_cached_by_id(list.user_id)
91
92 Visibility.visible_for_user?(item, owner)
93 end)
94 end
95
96 recipient_topics =
97 recipient_lists
98 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
99
100 Enum.each(recipient_topics || [], fn list_topic ->
101 Logger.debug("Trying to push message to #{list_topic}\n\n")
102 push_to_socket(topics, list_topic, item)
103 end)
104
105 {:noreply, topics}
106 end
107
108 def handle_cast(
109 %{action: :stream, topic: topic, item: %Notification{} = item},
110 topics
111 )
112 when topic in ["user", "user:notification"] do
113 topics
114 |> Map.get("#{topic}:#{item.user_id}", [])
115 |> Enum.each(fn socket ->
116 with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
117 true <- should_send?(user, item) do
118 send(
119 socket.transport_pid,
120 {:text, represent_notification(socket.assigns[:user], item)}
121 )
122 end
123 end)
124
125 {:noreply, topics}
126 end
127
128 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
129 Logger.debug("Trying to push to users")
130
131 recipient_topics =
132 User.get_recipients_from_activity(item)
133 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
134
135 Enum.each(recipient_topics, fn topic ->
136 push_to_socket(topics, topic, item)
137 end)
138
139 {:noreply, topics}
140 end
141
142 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
143 Logger.debug("Trying to push to #{topic}")
144 Logger.debug("Pushing item to #{topic}")
145 push_to_socket(topics, topic, item)
146 {:noreply, topics}
147 end
148
149 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
150 topic = internal_topic(topic, socket)
151 sockets_for_topic = sockets[topic] || []
152 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
153 sockets = Map.put(sockets, topic, sockets_for_topic)
154 Logger.debug("Got new conn for #{topic}")
155 {:noreply, sockets}
156 end
157
158 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
159 topic = internal_topic(topic, socket)
160 sockets_for_topic = sockets[topic] || []
161 sockets_for_topic = List.delete(sockets_for_topic, socket)
162 sockets = Map.put(sockets, topic, sockets_for_topic)
163 Logger.debug("Removed conn for #{topic}")
164 {:noreply, sockets}
165 end
166
167 def handle_cast(m, state) do
168 Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
169 {:noreply, state}
170 end
171
172 defp represent_update(%Activity{} = activity, %User{} = user) do
173 %{
174 event: "update",
175 payload:
176 Pleroma.Web.MastodonAPI.StatusView.render(
177 "status.json",
178 activity: activity,
179 for: user
180 )
181 |> Jason.encode!()
182 }
183 |> Jason.encode!()
184 end
185
186 defp represent_update(%Activity{} = activity) do
187 %{
188 event: "update",
189 payload:
190 Pleroma.Web.MastodonAPI.StatusView.render(
191 "status.json",
192 activity: activity
193 )
194 |> Jason.encode!()
195 }
196 |> Jason.encode!()
197 end
198
199 def represent_conversation(%Participation{} = participation) do
200 %{
201 event: "conversation",
202 payload:
203 Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
204 participation: participation,
205 for: participation.user
206 })
207 |> Jason.encode!()
208 }
209 |> Jason.encode!()
210 end
211
212 @spec represent_notification(User.t(), Notification.t() | %SubscriptionNotification{}) ::
213 binary()
214 defp represent_notification(%User{} = user, notify) do
215 event =
216 case notify do
217 %Notification{} -> "notification"
218 %SubscriptionNotification{} -> "subscription_norification"
219 end
220
221 %{
222 event: event,
223 payload:
224 NotificationView.render(
225 "show.json",
226 %{notification: notify, for: user}
227 )
228 |> Jason.encode!()
229 }
230 |> Jason.encode!()
231 end
232
233 defp should_send?(%User{} = user, %Activity{} = item) do
234 blocks = user.info.blocks || []
235 mutes = user.info.mutes || []
236 reblog_mutes = user.info.muted_reblogs || []
237 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
238
239 with parent when not is_nil(parent) <- Object.normalize(item),
240 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
241 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
242 %{host: item_host} <- URI.parse(item.actor),
243 %{host: parent_host} <- URI.parse(parent.data["actor"]),
244 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
245 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
246 true <- thread_containment(item, user),
247 false <- CommonAPI.thread_muted?(user, item) do
248 true
249 else
250 _ -> false
251 end
252 end
253
254 defp should_send?(%User{} = user, %Notification{activity: activity}) do
255 should_send?(user, activity)
256 end
257
258 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
259 Enum.each(topics[topic] || [], fn socket ->
260 # Get the current user so we have up-to-date blocks etc.
261 if socket.assigns[:user] do
262 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
263
264 if should_send?(user, item) do
265 send(socket.transport_pid, {:text, represent_update(item, user)})
266 end
267 else
268 send(socket.transport_pid, {:text, represent_update(item)})
269 end
270 end)
271 end
272
273 def push_to_socket(topics, topic, %Participation{} = participation) do
274 Enum.each(topics[topic] || [], fn socket ->
275 send(socket.transport_pid, {:text, represent_conversation(participation)})
276 end)
277 end
278
279 def push_to_socket(topics, topic, %Activity{
280 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
281 }) do
282 Enum.each(topics[topic] || [], fn socket ->
283 send(
284 socket.transport_pid,
285 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
286 )
287 end)
288 end
289
290 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
291
292 def push_to_socket(topics, topic, item) do
293 Enum.each(topics[topic] || [], fn socket ->
294 # Get the current user so we have up-to-date blocks etc.
295 if socket.assigns[:user] do
296 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
297 blocks = user.info.blocks || []
298 mutes = user.info.mutes || []
299
300 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
301 true <- thread_containment(item, user) do
302 send(socket.transport_pid, {:text, represent_update(item, user)})
303 end
304 else
305 send(socket.transport_pid, {:text, represent_update(item)})
306 end
307 end)
308 end
309
310 defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
311 "#{topic}:#{socket.assigns[:user].id}"
312 end
313
314 defp internal_topic(topic, _), do: topic
315
316 @spec thread_containment(Activity.t(), User.t()) :: boolean()
317 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
318
319 defp thread_containment(activity, user) do
320 if Config.get([:instance, :skip_thread_containment]) do
321 true
322 else
323 ActivityPub.contain_activity(activity, user)
324 end
325 end
326 end