revert 4a94c9a31ef11f63ea71ad9c1f085c18cf8ef083
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.OAuth.Token
18 alias Pleroma.Web.Plugs.OAuthScopesPlug
19 alias Pleroma.Web.StreamerView
20
# Build environment captured at compile time; it selects which `should_env_send?/0`
# definition gets compiled into this module.
@mix_env Mix.env()

# Name of the Registry in which streaming processes are registered per topic.
@registry Pleroma.Web.StreamerRegistry

@doc "Returns the name of the Registry used to track streaming processes."
def registry do
  @registry
end

# Streams that `get_topic/4` accepts without looking at the user or the token.
@public_streams ~w(public public:local public:media public:local:media)

# Streams that `get_topic/4` expands into a per-user topic ("<stream>:<user id>").
@user_streams ~w(user user:notification direct)
28
@doc """
Expands and authorizes a stream, and registers the process for streaming.

`stream` is resolved through `get_topic/4`. On success the resulting topic is handed to
`add_socket/2` and `{:ok, topic}` is returned. Any other result of `get_topic/4`
(`{:error, :bad_topic}` or `{:error, :unauthorized}`) is returned unchanged and nothing
is registered.
"""
@spec get_topic_and_add_socket(
        stream :: String.t() | nil,
        User.t() | nil,
        Token.t() | nil,
        map() | nil
      ) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
    add_socket(topic, oauth_token)
  end
end
42
@doc """
Expands and authorizes a stream.

Returns:

  * `{:ok, topic}` with the fully expanded topic name when the stream is known and the
    caller may subscribe to it;
  * `{:ok, :multi}` when `stream` is `nil` (Mastodon multi-topic websocket);
  * `{:error, :unauthorized}` for user and list streams when the user/token pair does not
    match or the token lacks the required scope;
  * `{:error, :bad_topic}` for anything else, including a hashtag or remote-instance
    stream whose parameter is missing or is not a string.
"""
@spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map() | nil) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, oauth_token, params \\ %{})

# Allow all public streams.
def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtag streams.
# The guard keeps a non-string "tag" (e.g. a list-valued parameter) from raising in `<>`;
# such requests fall through to the final clause and get `{:error, :bad_topic}`.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
    when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Allow remote instance streams (same string-only rule as for hashtags).
def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:" <> instance}
end

def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:media:" <> instance}
end

# Expand user streams.
# `user_id` is bound twice in the head, so this clause only matches when the token
# belongs to the given user.
def get_topic(
      stream,
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      _params
    )
    when stream in @user_streams do
  # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
  required_scopes =
    case stream do
      "user:notification" -> ["read:notifications"]
      _ -> ["read:statuses"]
    end

  case OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) do
    [] -> {:error, :unauthorized}
    _ -> {:ok, stream <> ":" <> to_string(user.id)}
  end
end

# User streams without a matching user/token pair.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
  {:error, :unauthorized}
end

# List streams.
def get_topic(
      "list",
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      %{"list" => id}
    ) do
  cond do
    OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
      {:error, :unauthorized}

    # Only a list that `Pleroma.List.get/2` returns for this user can be streamed.
    Pleroma.List.get(id, user) ->
      {:ok, "list:" <> to_string(id)}

    true ->
      {:error, :bad_topic}
  end
end

# List streams without a matching user/token pair or without a "list" parameter.
def get_topic("list", _user, _oauth_token, _params) do
  {:error, :unauthorized}
end

# mastodon multi-topic WS
def get_topic(nil, _user, _oauth_token, _params) do
  {:ok, :multi}
end

def get_topic(_stream, _user, _oauth_token, _params) do
  {:error, :bad_topic}
end
125
@doc """
Registers the process for streaming on `topic`.

The value stored alongside the registration is the id of `oauth_token`, or `false` when
no token is given. Nothing is registered when `should_env_send?/0` is false. Always
returns `{:ok, topic}`. Use `get_topic/4` to get the full authorized topic.
"""
def add_socket(topic, oauth_token) do
  if should_env_send?() do
    registered_value =
      case oauth_token do
        absent when absent in [nil, false] -> false
        token -> token.id
      end

    Registry.register(@registry, topic, registered_value)
  end

  {:ok, topic}
end
135
@doc """
Unregisters the process from `topic`.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
139
@doc """
Streams every item to every topic.

`topics` and `items` may each be a single value or a list. One process is spawned per
topic/item pair. Returns the list of spawned pids, or `nil` when `should_env_send?/0`
is false.
"""
def stream(topics, items) do
  if should_env_send?() do
    topics
    |> List.wrap()
    |> Enum.flat_map(fn topic ->
      items
      |> List.wrap()
      |> Enum.map(fn item -> spawn(fn -> do_stream(topic, item) end) end)
    end)
  end
end
147
@doc """
Tells whether `item` is filtered out for `user`.

Returns `false` only when every check below passes, `true` as soon as one of them fails:

  * neither the item's actor nor the parent's actor is blocked or muted by the user;
  * the item is not an Announce by an actor whose reblogs the user muted;
  * when streamed as an `:activity`, the item is not an Announce of the user's own object;
  * none of the item's recipients is blocked or muted by the user;
  * neither actor's host matches the user's domain blocks;
  * the thread containment check passes and the user has not muted the thread.

A `Notification` is checked through its activity with `streamed_type` set to
`:notification`.
"""
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  # One set serves both the actor checks and the recipient check.
  hidden_ap_ids = MapSet.new(blocked ++ muted)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  # Each step names a reason to filter; the first one that does not match sends us to
  # `else`, so the later (potentially more expensive) checks are skipped.
  with parent <- Object.normalize(item, fetch: false) || item,
       false <- MapSet.member?(hidden_ap_ids, item.actor),
       false <- item.data["type"] == "Announce" and item.actor in reblog_muted,
       false <-
         streamed_type == :activity and item.data["type"] == "Announce" and
           parent.data["actor"] == user.ap_id,
       false <- MapSet.member?(hidden_ap_ids, parent.data["actor"]),
       true <- MapSet.disjoint?(item_recipients, hidden_ap_ids),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _ -> true
  end
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
  filtered_by_user?(user, activity, :notification)
end
181
# Delivers one item for one stream name; called from `stream/2` inside a spawned process.
# The special stream names below fan the item out to per-user or per-list topics;
# anything else is pushed to the topic as given.

# Direct messages go to the "direct:<id>" topic of every recipient of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
  |> Enum.each(fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end

# Follow relationship updates are rendered once and sent as text to the follower's
# "user:<id>" topic.
defp do_stream("follow_relationship", item) do
  user_topic = "user:#{item.follower.id}"
  message = {:text, StreamerView.render("follow_relationships_update.json", item, user_topic)}

  Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
  end)
end

# Conversation participations go to the "direct:<id>" topic of the participating user.
defp do_stream("participation", participation) do
  user_topic = "direct:" <> to_string(participation.user_id)
  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  push_to_socket(user_topic, participation)
end

defp do_stream("list", item) do
  public? = Visibility.is_public?(item) == true
  lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  visible_lists =
    if public? do
      lists
    else
      Enum.filter(lists, fn list ->
        Visibility.visible_for_user?(item, User.get_cached_by_id(list.user_id))
      end)
    end

  visible_lists
  |> Enum.map(fn %{id: id} -> "list:#{id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end

# Notifications are not rendered here: each subscriber of "<topic>:<user id>" is asked
# to render the notification itself.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  message = {:render_with_user, StreamerView, "notification.json", item, topic}

  Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
  end)
end

# Other "user" items go to the "user:<id>" topic of every recipient of the activity.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "user:#{id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
end

# Every other stream name is used as the topic unchanged.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")
  push_to_socket(topic, item)
end
265
# Renders `item` for `topic` and delivers it to every process registered under that topic.

# Conversation participations are rendered once and sent as text to everyone.
defp push_to_socket(topic, %Participation{} = participation) do
  rendered = StreamerView.render("conversation.json", participation, topic)
  send_text_to_topic(topic, rendered)
end

# Deletes that carry the id of the deleted activity become a "delete" event.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
  send_text_to_topic(topic, rendered)
end

# Deletes without a "deleted_activity_id" are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

# Updates are streamed as the Create activity of the updated object, carrying the
# object as it is attached to the Update.
defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
  case Activity.get_create_by_object_ap_id(item.object.data["id"]) do
    %Activity{} = create_activity ->
      send_rendered_to_topic(topic, "status_update.json", %{create_activity | object: item.object})

    _ ->
      # NOTE(review): the lookup presumably returns nil when no Create activity exists
      # for the object; previously that value went straight into Map.put/3 and raised.
      Logger.debug("No Create activity found for Update, not pushing to #{topic}")
      :noop
  end
end

defp push_to_socket(topic, item) do
  send_rendered_to_topic(topic, "update.json", item)
end

# Sends an already rendered payload to every process registered under `topic`.
defp send_text_to_topic(topic, rendered) do
  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, rendered}) end)
  end)
end

# Delivers `item` rendered with `template`. Registrations with a truthy value
# (`add_socket/2` stores the OAuth token id, or `false` without a token) are asked to
# render for their own user; the others receive the render made once up front.
defp send_rendered_to_topic(topic, template, item) do
  anon_render = StreamerView.render(template, item, topic)

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, auth?} ->
      if auth? do
        send(pid, {:render_with_user, StreamerView, template, item, topic})
      else
        send(pid, {:text, anon_render})
      end
    end)
  end)
end
324
# Thread containment check for `activity` as seen by `user`.
# Users with `skip_thread_containment: true` always pass; so does everyone when the
# instance-level `skip_thread_containment` setting is truthy. Otherwise the decision is
# delegated to `ActivityPub.contain_activity/2`.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _enabled -> true
  end
end
334
@doc """
Sends `:close` to every process whose registration value equals the id of `oauth_token`.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def close_streams_by_oauth_token(oauth_token) do
  if should_env_send?() do
    # Registry entries are {key, pid, value}: select the pid where value == token id.
    match_spec = [
      {{:"$1", :"$2", :"$3"}, [{:==, :"$3", oauth_token.id}], [:"$2"]}
    ]

    @registry
    |> Registry.select(match_spec)
    |> Enum.each(&send(&1, :close))
  end
end
350
# Which definition of `should_env_send?/0` is compiled depends on the build environment:
#   * test: returns true only while the registry process is registered and alive;
#   * benchmark: always returns false;
#   * any other environment: always returns true.
case @mix_env do
  :test ->
    def should_env_send? do
      registry_pid = Process.whereis(@registry)
      registry_pid != nil and Process.alive?(registry_pid)
    end

  :benchmark ->
    def should_env_send?, do: false

  _other ->
    def should_env_send?, do: true
end
372 end