Merge branch 'fix-tests' into 'develop'
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.OAuth.Token
19 alias Pleroma.Web.Plugs.OAuthScopesPlug
20 alias Pleroma.Web.StreamerView
21
22 @mix_env Mix.env()
23 @registry Pleroma.Web.StreamerRegistry
24
@doc "Returns the name of the `Registry` under which streaming processes are registered."
def registry do
  @registry
end
26
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
29
@doc """
Expands and authorizes a stream, and registers the process for streaming.

`stream` is resolved to a full topic with `get_topic/4`. On success the result
of `add_socket/2` for that topic is returned; any other value returned by
`get_topic/4` (an error tuple) is passed through unchanged.
"""
# `map()` is the built-in map type; `Map.t()` is not defined by the `Map` module.
@spec get_topic_and_add_socket(
        stream :: String.t(),
        User.t() | nil,
        Token.t() | nil,
        map() | nil
      ) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
    add_socket(topic, user)
  end
end
43
@doc """
Expands and authorizes a stream.

Returns `{:ok, topic}` with the expanded topic name, `{:error, :unauthorized}`
when the stream requires a user/token combination that was not supplied or lacks
the needed scopes, and `{:error, :bad_topic}` for anything that cannot be
resolved to a topic.

`params` carries the stream-specific parameters: `"tag"` for `"hashtag"`,
`"instance"` for the `"public:remote"` streams and `"list"` for `"list"`.
"""
# `map()` is the built-in map type; `Map.t()` is not defined by the `Map` module.
@spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, oauth_token, params \\ %{})

# Allow all public streams.
def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtag streams.
# The `is_binary/1` guard keeps a non-string "tag" (e.g. a list built from `tag[]=`
# request params) from raising in `<>`; such requests fall through to the
# catch-all clause and get `{:error, :bad_topic}`.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
    when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Allow remote instance streams. Same guard rationale as for hashtags.
def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:" <> instance}
end

def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:media:" <> instance}
end
67
# Expand user streams: the topic is the stream name suffixed with the user's id.
# Both patterns bind the same `user_id`, so this clause only matches when the
# token belongs to the given user.
def get_topic(
      stream,
      %User{id: user_id},
      %Token{user_id: user_id} = oauth_token,
      _params
    )
    when stream in @user_streams do
  # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
  needed_scopes =
    case stream do
      "user:notification" -> ["read:notifications"]
      _other -> ["read:statuses"]
    end

  case OAuthScopesPlug.filter_descendants(needed_scopes, oauth_token.scopes) do
    [] -> {:error, :unauthorized}
    _granted -> {:ok, "#{stream}:#{user_id}"}
  end
end

# A user stream requested without a matching user/token pair is rejected.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
  {:error, :unauthorized}
end
94
# List streams: require a token owned by the user with a suitable read scope,
# and a list that `Pleroma.List.get/2` finds for that user.
def get_topic(
      "list",
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      %{"list" => id}
    ) do
  case OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) do
    [] ->
      {:error, :unauthorized}

    _granted ->
      if Pleroma.List.get(id, user) do
        {:ok, "list:#{id}"}
      else
        {:error, :bad_topic}
      end
  end
end

# A list stream requested in any other way (no user, foreign token, no "list" param).
def get_topic("list", _user, _oauth_token, _params) do
  {:error, :unauthorized}
end

# Everything else is not a known stream.
def get_topic(_stream, _user, _oauth_token, _params) do
  {:error, :bad_topic}
end
121
@doc """
Registers the process for streaming on `topic`.

Use `get_topic/4` to obtain the full authorized topic first. The value stored
with the registration is `true` when a `user` is given and `nil` otherwise.
Nothing is registered when `should_env_send?/0` returns false. Always returns
`{:ok, topic}`.
"""
def add_socket(topic, user) do
  if should_env_send?() do
    registration_value =
      case user do
        absent when absent in [nil, false] -> nil
        _present -> true
      end

    Registry.register(@registry, topic, registration_value)
  end

  {:ok, topic}
end
131
@doc """
Unregisters the process from `topic` in the streamer registry.

Does nothing (and returns `nil`) when `should_env_send?/0` returns false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
135
@doc """
Streams every item to every topic.

`topics` and `items` may each be a single value or a list. One process is
spawned per topic/item pair to do the actual delivery. Returns the list of
spawned pids, or `nil` when `should_env_send?/0` returns false.
"""
def stream(topics, items) do
  if should_env_send?() do
    Enum.flat_map(List.wrap(topics), fn topic ->
      Enum.map(List.wrap(items), fn item ->
        spawn(fn -> do_stream(topic, item) end)
      end)
    end)
  end
end
143
@doc """
Tells whether `item` must be withheld from `user`'s stream.

Returns `false` only when every check below passes, and `true` as soon as one
of them fails:

  * the activity's actor is neither blocked nor muted by the user
  * an `Announce` does not come from an actor whose reblogs the user muted
  * when streamed as `:activity`, an `Announce` is not of the user's own object
  * the actor of the normalized object is neither blocked nor muted
  * none of the activity's recipients is blocked or muted
  * neither actor's host matches the user's domain blocks
  * thread containment allows the activity
  * the thread is not muted by the user

A `Notification` is checked through its activity, streamed as `:notification`.
"""
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  alias Pleroma.Web.ActivityPub.MRF

  %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  silenced_set = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
  recipient_set = MapSet.new(item.recipients)
  blocked_domains = MRF.subdomains_regex(user.domain_blocks)

  # True when the given AP id is blocked or muted by the user.
  silenced? = fn ap_id -> ap_id in blocked_ap_ids or ap_id in muted_ap_ids end

  # The order of the clauses is significant: later clauses are only evaluated
  # once all earlier ones have passed.
  with parent <- Object.normalize(item, fetch: false) || item,
       false <- silenced?.(item.actor),
       false <- item.data["type"] == "Announce" and item.actor in reblog_muted_ap_ids,
       false <-
         streamed_type == :activity and item.data["type"] == "Announce" and
           parent.data["actor"] == user.ap_id,
       false <- silenced?.(parent.data["actor"]),
       true <- MapSet.disjoint?(recipient_set, silenced_set),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- MRF.subdomain_match?(blocked_domains, item_host),
       false <- MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _ -> true
  end
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type) do
  filtered_by_user?(user, activity, :notification)
end
177
# Pushes a direct message to the "direct:<id>" topic of each recipient of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "direct:#{recipient_id}" end)
  |> Enum.each(fn direct_topic ->
    Logger.debug("Trying to push direct message to #{direct_topic}\n\n")
    push_to_socket(direct_topic, item)
  end)
end
188
# Sends a rendered follow-relationship update to every process registered on
# the follower's "user:<id>" topic.
defp do_stream("follow_relationship", item) do
  payload = StreamerView.render("follow_relationships_update.json", item)
  follower_topic = "user:#{item.follower.id}"

  Logger.debug("Trying to push follow relationship update to #{follower_topic}\n\n")

  Registry.dispatch(@registry, follower_topic, fn subscribers ->
    for {pid, _auth} <- subscribers do
      send(pid, {:text, payload})
    end
  end)
end
201
# Pushes a conversation participation to its user's "direct:<id>" topic.
defp do_stream("participation", participation) do
  direct_topic = "direct:#{participation.user_id}"

  Logger.debug("Trying to push a conversation participation to #{direct_topic}\n\n")
  push_to_socket(direct_topic, participation)
end
208
# Pushes an activity to the "list:<id>" topic of every list it belongs to.
defp do_stream("list", item) do
  public? = Visibility.is_public?(item) == true
  candidate_lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    if public? do
      candidate_lists
    else
      Enum.filter(candidate_lists, fn list ->
        list_owner = User.get_cached_by_id(list.user_id)
        Visibility.visible_for_user?(item, list_owner)
      end)
    end

  recipient_lists
  |> Enum.map(fn %{id: list_id} -> "list:#{list_id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
234
# Notifications go to the notified user's topic; each registered process is
# sent a :render_with_user message for the notification.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  message = {:render_with_user, StreamerView, "notification.json", item}

  Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn subscribers ->
    for {pid, _auth} <- subscribers do
      send(pid, message)
    end
  end)
end
243
# Chat message references are rendered once and sent as text to every process
# registered on the given user's topic.
defp do_stream(topic, {user, %MessageReference{} = cm_ref})
     when topic in ["user", "user:pleroma_chat"] do
  user_topic = "#{topic}:#{user.id}"
  payload = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    for {pid, _auth} <- subscribers do
      send(pid, {:text, payload})
    end
  end)
end
256
# Pushes an activity to the "user:<id>" topic of each of its recipients.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "user:#{recipient_id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
end
268
# Any other topic: push the item to the topic as given.
defp do_stream(stream_topic, payload) do
  Logger.debug("Trying to push to #{stream_topic}")
  Logger.debug("Pushing item to #{stream_topic}")

  push_to_socket(stream_topic, payload)
end
274
# Conversation participations are rendered once and sent as text to every
# process registered on the topic.
defp push_to_socket(topic, %Participation{} = participation) do
  message = {:text, StreamerView.render("conversation.json", participation)}

  Registry.dispatch(@registry, topic, fn subscribers ->
    for {pid, _auth} <- subscribers do
      send(pid, message)
    end
  end)
end
284
# Deletes carrying a "deleted_activity_id" are sent as a JSON "delete" event
# whose payload is that id as a string.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  delete_event = %{event: "delete", payload: to_string(deleted_activity_id)}
  message = {:text, Jason.encode!(delete_event)}

  Registry.dispatch(@registry, topic, fn subscribers ->
    for {pid, _auth} <- subscribers do
      send(pid, message)
    end
  end)
end

# Deletes without a "deleted_activity_id" are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
298
# Generic push: processes registered with a truthy value are asked to render
# the update themselves (:render_with_user); all others receive the render
# produced here without a user.
defp push_to_socket(topic, item) do
  anonymous_message = {:text, StreamerView.render("update.json", item)}
  authenticated_message = {:render_with_user, StreamerView, "update.json", item}

  Registry.dispatch(@registry, topic, fn subscribers ->
    for {pid, auth?} <- subscribers do
      message = if auth?, do: authenticated_message, else: anonymous_message
      send(pid, message)
    end
  end)
end
312
# Users who opted out of thread containment always pass.
defp thread_containment(_activity, %User{skip_thread_containment: true}) do
  true
end

# Otherwise the check passes when the instance-wide skip setting is truthy;
# if not, the decision is delegated to ActivityPub.contain_activity/2.
defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _enabled -> true
  end
end
322
# The definition of should_env_send?/0 is chosen at compile time from the Mix env:
# - in the test environment it returns true only if the registry process is
#   registered and alive;
# - in the benchmark environment it always returns false;
# - in any other environment it always returns true.
cond do
  @mix_env == :test ->
    def should_env_send? do
      registry_pid = Process.whereis(@registry)
      registry_pid != nil and Process.alive?(registry_pid)
    end

  @mix_env == :benchmark ->
    def should_env_send? do
      false
    end

  true ->
    def should_env_send? do
      true
    end
end
344 end