Merge remote-tracking branch 'upstream/develop' into restrict-domain
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.OAuth.Token
19 alias Pleroma.Web.Plugs.OAuthScopesPlug
20 alias Pleroma.Web.StreamerView
21
22 @mix_env Mix.env()
23 @registry Pleroma.Web.StreamerRegistry
24
25 def registry, do: @registry
26
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
29
30 @doc "Expands and authorizes a stream, and registers the process for streaming."
31 @spec get_topic_and_add_socket(
32 stream :: String.t(),
33 User.t() | nil,
34 Token.t() | nil,
35 Map.t() | nil
36 ) ::
37 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
38 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
39 case get_topic(stream, user, oauth_token, params) do
40 {:ok, topic} -> add_socket(topic, user)
41 error -> error
42 end
43 end
44
45 @doc "Expand and authorizes a stream"
46 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
47 {:ok, topic :: String.t()} | {:error, :bad_topic}
48 def get_topic(stream, user, oauth_token, params \\ %{})
49
50 # Allow all public steams.
51 def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
52 {:ok, stream}
53 end
54
55 # Allow all hashtags streams.
56 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
57 {:ok, "hashtag:" <> tag}
58 end
59
60 # Allow remote instance streams.
61 def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
62 {:ok, "public:remote:" <> instance}
63 end
64
65 def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
66 {:ok, "public:remote:media:" <> instance}
67 end
68
69 # Expand user streams.
70 def get_topic(
71 stream,
72 %User{id: user_id} = user,
73 %Token{user_id: token_user_id} = oauth_token,
74 _params
75 )
76 when stream in @user_streams and user_id == token_user_id do
77 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
78 required_scopes =
79 if stream == "user:notification" do
80 ["read:notifications"]
81 else
82 ["read:statuses"]
83 end
84
85 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
86 {:error, :unauthorized}
87 else
88 {:ok, stream <> ":" <> to_string(user.id)}
89 end
90 end
91
92 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
93 {:error, :unauthorized}
94 end
95
96 # List streams.
97 def get_topic(
98 "list",
99 %User{id: user_id} = user,
100 %Token{user_id: token_user_id} = oauth_token,
101 %{"list" => id}
102 )
103 when user_id == token_user_id do
104 cond do
105 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
106 {:error, :unauthorized}
107
108 Pleroma.List.get(id, user) ->
109 {:ok, "list:" <> to_string(id)}
110
111 true ->
112 {:error, :bad_topic}
113 end
114 end
115
116 def get_topic("list", _user, _oauth_token, _params) do
117 {:error, :unauthorized}
118 end
119
120 def get_topic(_stream, _user, _oauth_token, _params) do
121 {:error, :bad_topic}
122 end
123
124 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
125 def add_socket(topic, user) do
126 if should_env_send?() do
127 auth? = if user, do: true
128 Registry.register(@registry, topic, auth?)
129 end
130
131 {:ok, topic}
132 end
133
134 def remove_socket(topic) do
135 if should_env_send?(), do: Registry.unregister(@registry, topic)
136 end
137
138 def stream(topics, items) do
139 if should_env_send?() do
140 List.wrap(topics)
141 |> Enum.each(fn topic ->
142 List.wrap(items)
143 |> Enum.each(fn item ->
144 spawn(fn -> do_stream(topic, item) end)
145 end)
146 end)
147 end
148
149 :ok
150 end
151
152 def filtered_by_user?(user, item, streamed_type \\ :activity)
153
154 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
155 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
156 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
157
158 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
159 recipients = MapSet.new(item.recipients)
160 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
161
162 with parent <- Object.normalize(item) || item,
163 true <-
164 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
165 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
166 true <-
167 !(streamed_type == :activity && item.data["type"] == "Announce" &&
168 parent.data["actor"] == user.ap_id),
169 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
170 true <- MapSet.disjoint?(recipients, recipient_blocks),
171 %{host: item_host} <- URI.parse(item.actor),
172 %{host: parent_host} <- URI.parse(parent.data["actor"]),
173 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
174 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
175 true <- thread_containment(item, user),
176 false <- CommonAPI.thread_muted?(user, parent) do
177 false
178 else
179 _ -> true
180 end
181 end
182
183 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
184 filtered_by_user?(user, activity, :notification)
185 end
186
187 defp do_stream("direct", item) do
188 recipient_topics =
189 User.get_recipients_from_activity(item)
190 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
191
192 Enum.each(recipient_topics, fn user_topic ->
193 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
194 push_to_socket(user_topic, item)
195 end)
196 end
197
198 defp do_stream("participation", participation) do
199 user_topic = "direct:#{participation.user_id}"
200 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
201
202 push_to_socket(user_topic, participation)
203 end
204
205 defp do_stream("list", item) do
206 # filter the recipient list if the activity is not public, see #270.
207 recipient_lists =
208 case Visibility.is_public?(item) do
209 true ->
210 Pleroma.List.get_lists_from_activity(item)
211
212 _ ->
213 Pleroma.List.get_lists_from_activity(item)
214 |> Enum.filter(fn list ->
215 owner = User.get_cached_by_id(list.user_id)
216
217 Visibility.visible_for_user?(item, owner)
218 end)
219 end
220
221 recipient_topics =
222 recipient_lists
223 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
224
225 Enum.each(recipient_topics, fn list_topic ->
226 Logger.debug("Trying to push message to #{list_topic}\n\n")
227 push_to_socket(list_topic, item)
228 end)
229 end
230
231 defp do_stream(topic, %Notification{} = item)
232 when topic in ["user", "user:notification"] do
233 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
234 Enum.each(list, fn {pid, _auth} ->
235 send(pid, {:render_with_user, StreamerView, "notification.json", item})
236 end)
237 end)
238 end
239
240 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
241 when topic in ["user", "user:pleroma_chat"] do
242 topic = "#{topic}:#{user.id}"
243
244 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
245
246 Registry.dispatch(@registry, topic, fn list ->
247 Enum.each(list, fn {pid, _auth} ->
248 send(pid, {:text, text})
249 end)
250 end)
251 end
252
253 defp do_stream("user", item) do
254 Logger.debug("Trying to push to users")
255
256 recipient_topics =
257 User.get_recipients_from_activity(item)
258 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
259
260 Enum.each(recipient_topics, fn topic ->
261 push_to_socket(topic, item)
262 end)
263 end
264
265 defp do_stream(topic, item) do
266 Logger.debug("Trying to push to #{topic}")
267 Logger.debug("Pushing item to #{topic}")
268 push_to_socket(topic, item)
269 end
270
271 defp push_to_socket(topic, %Participation{} = participation) do
272 rendered = StreamerView.render("conversation.json", participation)
273
274 Registry.dispatch(@registry, topic, fn list ->
275 Enum.each(list, fn {pid, _} ->
276 send(pid, {:text, rendered})
277 end)
278 end)
279 end
280
281 defp push_to_socket(topic, %Activity{
282 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
283 }) do
284 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
285
286 Registry.dispatch(@registry, topic, fn list ->
287 Enum.each(list, fn {pid, _} ->
288 send(pid, {:text, rendered})
289 end)
290 end)
291 end
292
293 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
294
295 defp push_to_socket(topic, item) do
296 anon_render = StreamerView.render("update.json", item)
297
298 Registry.dispatch(@registry, topic, fn list ->
299 Enum.each(list, fn {pid, auth?} ->
300 if auth? do
301 send(pid, {:render_with_user, StreamerView, "update.json", item})
302 else
303 send(pid, {:text, anon_render})
304 end
305 end)
306 end)
307 end
308
309 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
310
311 defp thread_containment(activity, user) do
312 if Config.get([:instance, :skip_thread_containment]) do
313 true
314 else
315 ActivityPub.contain_activity(activity, user)
316 end
317 end
318
319 # In test environement, only return true if the registry is started.
320 # In benchmark environment, returns false.
321 # In any other environment, always returns true.
322 cond do
323 @mix_env == :test ->
324 def should_env_send? do
325 case Process.whereis(@registry) do
326 nil ->
327 false
328
329 pid ->
330 Process.alive?(pid)
331 end
332 end
333
334 @mix_env == :benchmark ->
335 def should_env_send?, do: false
336
337 true ->
338 def should_env_send?, do: true
339 end
340 end