Merge branch 'release/2.2.0' into 'stable'
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.OAuth.Token
19 alias Pleroma.Web.Plugs.OAuthScopesPlug
20 alias Pleroma.Web.StreamerView
21
22 @mix_env Mix.env()
23 @registry Pleroma.Web.StreamerRegistry
24
25 def registry, do: @registry
26
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
29
30 @doc "Expands and authorizes a stream, and registers the process for streaming."
31 @spec get_topic_and_add_socket(
32 stream :: String.t(),
33 User.t() | nil,
34 Token.t() | nil,
35 Map.t() | nil
36 ) ::
37 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
38 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
39 case get_topic(stream, user, oauth_token, params) do
40 {:ok, topic} -> add_socket(topic, user)
41 error -> error
42 end
43 end
44
45 @doc "Expand and authorizes a stream"
46 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
47 {:ok, topic :: String.t()} | {:error, :bad_topic}
48 def get_topic(stream, user, oauth_token, params \\ %{})
49
50 # Allow all public steams.
51 def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
52 {:ok, stream}
53 end
54
55 # Allow all hashtags streams.
56 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
57 {:ok, "hashtag:" <> tag}
58 end
59
60 # Expand user streams.
61 def get_topic(
62 stream,
63 %User{id: user_id} = user,
64 %Token{user_id: token_user_id} = oauth_token,
65 _params
66 )
67 when stream in @user_streams and user_id == token_user_id do
68 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
69 required_scopes =
70 if stream == "user:notification" do
71 ["read:notifications"]
72 else
73 ["read:statuses"]
74 end
75
76 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
77 {:error, :unauthorized}
78 else
79 {:ok, stream <> ":" <> to_string(user.id)}
80 end
81 end
82
83 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
84 {:error, :unauthorized}
85 end
86
87 # List streams.
88 def get_topic(
89 "list",
90 %User{id: user_id} = user,
91 %Token{user_id: token_user_id} = oauth_token,
92 %{"list" => id}
93 )
94 when user_id == token_user_id do
95 cond do
96 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
97 {:error, :unauthorized}
98
99 Pleroma.List.get(id, user) ->
100 {:ok, "list:" <> to_string(id)}
101
102 true ->
103 {:error, :bad_topic}
104 end
105 end
106
107 def get_topic("list", _user, _oauth_token, _params) do
108 {:error, :unauthorized}
109 end
110
111 def get_topic(_stream, _user, _oauth_token, _params) do
112 {:error, :bad_topic}
113 end
114
115 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
116 def add_socket(topic, user) do
117 if should_env_send?() do
118 auth? = if user, do: true
119 Registry.register(@registry, topic, auth?)
120 end
121
122 {:ok, topic}
123 end
124
125 def remove_socket(topic) do
126 if should_env_send?(), do: Registry.unregister(@registry, topic)
127 end
128
129 def stream(topics, items) do
130 if should_env_send?() do
131 List.wrap(topics)
132 |> Enum.each(fn topic ->
133 List.wrap(items)
134 |> Enum.each(fn item ->
135 spawn(fn -> do_stream(topic, item) end)
136 end)
137 end)
138 end
139
140 :ok
141 end
142
143 def filtered_by_user?(user, item, streamed_type \\ :activity)
144
145 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
146 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
147 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
148
149 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
150 recipients = MapSet.new(item.recipients)
151 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
152
153 with parent <- Object.normalize(item) || item,
154 true <-
155 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
156 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
157 true <-
158 !(streamed_type == :activity && item.data["type"] == "Announce" &&
159 parent.data["actor"] == user.ap_id),
160 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
161 true <- MapSet.disjoint?(recipients, recipient_blocks),
162 %{host: item_host} <- URI.parse(item.actor),
163 %{host: parent_host} <- URI.parse(parent.data["actor"]),
164 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
165 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
166 true <- thread_containment(item, user),
167 false <- CommonAPI.thread_muted?(user, parent) do
168 false
169 else
170 _ -> true
171 end
172 end
173
174 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
175 filtered_by_user?(user, activity, :notification)
176 end
177
178 defp do_stream("direct", item) do
179 recipient_topics =
180 User.get_recipients_from_activity(item)
181 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
182
183 Enum.each(recipient_topics, fn user_topic ->
184 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
185 push_to_socket(user_topic, item)
186 end)
187 end
188
189 defp do_stream("participation", participation) do
190 user_topic = "direct:#{participation.user_id}"
191 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
192
193 push_to_socket(user_topic, participation)
194 end
195
196 defp do_stream("list", item) do
197 # filter the recipient list if the activity is not public, see #270.
198 recipient_lists =
199 case Visibility.is_public?(item) do
200 true ->
201 Pleroma.List.get_lists_from_activity(item)
202
203 _ ->
204 Pleroma.List.get_lists_from_activity(item)
205 |> Enum.filter(fn list ->
206 owner = User.get_cached_by_id(list.user_id)
207
208 Visibility.visible_for_user?(item, owner)
209 end)
210 end
211
212 recipient_topics =
213 recipient_lists
214 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
215
216 Enum.each(recipient_topics, fn list_topic ->
217 Logger.debug("Trying to push message to #{list_topic}\n\n")
218 push_to_socket(list_topic, item)
219 end)
220 end
221
222 defp do_stream(topic, %Notification{} = item)
223 when topic in ["user", "user:notification"] do
224 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
225 Enum.each(list, fn {pid, _auth} ->
226 send(pid, {:render_with_user, StreamerView, "notification.json", item})
227 end)
228 end)
229 end
230
231 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
232 when topic in ["user", "user:pleroma_chat"] do
233 topic = "#{topic}:#{user.id}"
234
235 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
236
237 Registry.dispatch(@registry, topic, fn list ->
238 Enum.each(list, fn {pid, _auth} ->
239 send(pid, {:text, text})
240 end)
241 end)
242 end
243
244 defp do_stream("user", item) do
245 Logger.debug("Trying to push to users")
246
247 recipient_topics =
248 User.get_recipients_from_activity(item)
249 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
250
251 Enum.each(recipient_topics, fn topic ->
252 push_to_socket(topic, item)
253 end)
254 end
255
256 defp do_stream(topic, item) do
257 Logger.debug("Trying to push to #{topic}")
258 Logger.debug("Pushing item to #{topic}")
259 push_to_socket(topic, item)
260 end
261
262 defp push_to_socket(topic, %Participation{} = participation) do
263 rendered = StreamerView.render("conversation.json", participation)
264
265 Registry.dispatch(@registry, topic, fn list ->
266 Enum.each(list, fn {pid, _} ->
267 send(pid, {:text, rendered})
268 end)
269 end)
270 end
271
272 defp push_to_socket(topic, %Activity{
273 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
274 }) do
275 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
276
277 Registry.dispatch(@registry, topic, fn list ->
278 Enum.each(list, fn {pid, _} ->
279 send(pid, {:text, rendered})
280 end)
281 end)
282 end
283
284 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
285
286 defp push_to_socket(topic, item) do
287 anon_render = StreamerView.render("update.json", item)
288
289 Registry.dispatch(@registry, topic, fn list ->
290 Enum.each(list, fn {pid, auth?} ->
291 if auth? do
292 send(pid, {:render_with_user, StreamerView, "update.json", item})
293 else
294 send(pid, {:text, anon_render})
295 end
296 end)
297 end)
298 end
299
300 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
301
302 defp thread_containment(activity, user) do
303 if Config.get([:instance, :skip_thread_containment]) do
304 true
305 else
306 ActivityPub.contain_activity(activity, user)
307 end
308 end
309
310 # In test environement, only return true if the registry is started.
311 # In benchmark environment, returns false.
312 # In any other environment, always returns true.
313 cond do
314 @mix_env == :test ->
315 def should_env_send? do
316 case Process.whereis(@registry) do
317 nil ->
318 false
319
320 pid ->
321 Process.alive?(pid)
322 end
323 end
324
325 @mix_env == :benchmark ->
326 def should_env_send?, do: false
327
328 true ->
329 def should_env_send?, do: true
330 end
331 end