purge chat and shout endpoints
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.OAuth.Token
18 alias Pleroma.Web.Plugs.OAuthScopesPlug
19 alias Pleroma.Web.StreamerView
20
# Build environment captured at compile time; it selects which `should_env_send?/0`
# definition gets compiled into this module.
@mix_env Mix.env()

# Name of the Registry under which streaming processes are registered by topic.
@registry Pleroma.Web.StreamerRegistry

@doc "Returns the name of the registry used by the streamer."
def registry do
  @registry
end

# Streams that `get_topic/4` accepts as-is, without looking at the user or token.
@public_streams ~w(public public:local public:media public:local:media)

# Streams that `get_topic/4` expands into a per-user topic.
@user_streams ~w(user user:notification direct)
28
@doc """
Expands and authorizes a stream, and registers the process for streaming.

`stream` is resolved to a topic with `get_topic/4`. On success the topic is handed to
`add_socket/2` together with `user`, and `{:ok, topic}` is returned. Any error tuple
produced by `get_topic/4` is returned unchanged and nothing is registered.
"""
@spec get_topic_and_add_socket(
        stream :: String.t(),
        User.t() | nil,
        Token.t() | nil,
        map() | nil
      ) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
    add_socket(topic, user)
  end
end
42
@doc """
Expands and authorizes a stream.

Returns `{:ok, topic}` when the stream is known and the caller may subscribe to it,
`{:error, :unauthorized}` when a user or list stream is requested without a matching
user/token pair or without the required OAuth scopes, and `{:error, :bad_topic}` for
anything else.
"""
@spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, oauth_token, params \\ %{})
47
# Allow all public streams: the stream name is already the topic, and neither the user
# nor the token is inspected.
def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams,
  do: {:ok, stream}
52
# Allow all hashtag streams. The topic is "hashtag:" followed by the requested tag.
#
# The guard keeps non-binary `tag` values (e.g. a list or map coming from request
# params) out of the `<>` concatenation, which would raise; such requests do not
# match this clause and are left to the later `get_topic/4` clauses.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
    when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end
57
# Allow remote instance streams. The topic is the stream name followed by ":" and the
# requested instance.
#
# The guards keep non-binary `instance` values (e.g. a list or map coming from request
# params) out of the `<>` concatenation, which would raise; such requests do not match
# these clauses and are left to the later `get_topic/4` clauses.
def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:" <> instance}
end

def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:media:" <> instance}
end
66
# Expand user streams. This clause only matches when the token's `user_id` equals the
# id of the given user; the resulting topic is "<stream>:<user id>".
def get_topic(
      stream,
      %User{id: user_id},
      %Token{user_id: user_id, scopes: granted_scopes},
      _params
    )
    when stream in @user_streams do
  # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
  needed_scopes =
    case stream do
      "user:notification" -> ["read:notifications"]
      _other -> ["read:statuses"]
    end

  case OAuthScopesPlug.filter_descendants(needed_scopes, granted_scopes) do
    [] -> {:error, :unauthorized}
    _granted -> {:ok, "#{stream}:#{user_id}"}
  end
end

# Any other request for a user stream (no user, no token, or a token whose `user_id`
# does not match the user) is rejected.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams,
  do: {:error, :unauthorized}
93
# List streams. Requires a user, a token belonging to that user and a "list" param.
# The scope check runs first; only then is the list looked up for this user.
def get_topic(
      "list",
      %User{id: user_id} = user,
      %Token{user_id: user_id, scopes: granted_scopes},
      %{"list" => id}
    ) do
  if OAuthScopesPlug.filter_descendants(["read", "read:lists"], granted_scopes) == [] do
    {:error, :unauthorized}
  else
    # A falsy lookup result means there is no such list for this user.
    if Pleroma.List.get(id, user) do
      {:ok, "list:#{id}"}
    else
      {:error, :bad_topic}
    end
  end
end

# Any other request for the list stream is rejected.
def get_topic("list", _user, _oauth_token, _params), do: {:error, :unauthorized}
116
# Fallback: whatever no earlier clause accepted is not a valid topic.
def get_topic(_stream, _user, _oauth_token, _params), do: {:error, :bad_topic}
120
@doc """
Registers the process for streaming. Use `get_topic/4` to get the full authorized topic.

When `should_env_send?/0` is true, the registration is made in the streamer registry
under `topic`, with `true` as the value when a user is given and `nil` otherwise.
Always returns `{:ok, topic}`.
"""
def add_socket(topic, user) do
  if should_env_send?() do
    # The registered value marks the socket as authenticated (`true`) or not (`nil`).
    authenticated = if user, do: true
    Registry.register(@registry, topic, authenticated)
  end

  {:ok, topic}
end
130
# Removes the registration under `topic` from the streamer registry. Does nothing
# (and returns `nil`) when `should_env_send?/0` is false.
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
134
# Streams every item to every topic. Both arguments may be a single value or a list.
# Each {topic, item} pair is handled by `do_stream/2` in its own spawned process.
# Returns the list of spawned pids, or `nil` when `should_env_send?/0` is false.
def stream(topics, items) do
  if should_env_send?() do
    Enum.flat_map(List.wrap(topics), fn topic ->
      Enum.map(List.wrap(items), fn item ->
        spawn(fn -> do_stream(topic, item) end)
      end)
    end)
  end
end
142
# Tells whether `item` must be withheld from `user`.
# Returns `false` only when every check in the chain below passes; the first check that
# fails makes the function return `true`.
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  hidden_recipients = MapSet.new(blocked ++ muted)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  with parent <- Object.normalize(item, fetch: false) || item,
       # the item's actor is neither blocked nor muted
       true <- item.actor not in blocked and item.actor not in muted,
       # announces from reblog-muted actors are dropped
       true <- not (item.data["type"] == "Announce" and item.actor in reblog_muted),
       # as a plain activity, an announce of the user's own object is dropped
       true <-
         streamed_type != :activity or item.data["type"] != "Announce" or
           parent.data["actor"] != user.ap_id,
       # the parent's actor is neither blocked nor muted
       true <- parent.data["actor"] not in blocked and parent.data["actor"] not in muted,
       # no recipient of the item is blocked or muted
       true <- MapSet.disjoint?(item_recipients, hidden_recipients),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       # neither host matches the user's domain blocks
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _failed_check -> true
  end
end

# A notification is filtered according to its activity, checked as a `:notification`.
def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type),
  do: filtered_by_user?(user, activity, :notification)
176
# Pushes the item to the "direct:<id>" topic of every recipient of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
  |> Enum.each(fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end
187
# Renders the follow relationship update once and sends it as text to every process
# registered under the follower's "user:<id>" topic.
defp do_stream("follow_relationship", item) do
  payload = StreamerView.render("follow_relationships_update.json", item)
  follower_topic = "user:#{item.follower.id}"

  Logger.debug("Trying to push follow relationship update to #{follower_topic}\n\n")

  Registry.dispatch(@registry, follower_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, {:text, payload}) end)
  end)
end
200
# Pushes a conversation participation to the "direct:<user id>" topic of its user.
defp do_stream("participation", %{user_id: user_id} = participation) do
  user_topic = "direct:#{user_id}"

  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
  push_to_socket(user_topic, participation)
end
207
# Pushes the item to the "list:<id>" topic of every list returned for the activity.
defp do_stream("list", item) do
  public? = Visibility.is_public?(item)
  candidate_lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    case public? do
      true ->
        candidate_lists

      _ ->
        # Keep only the lists whose owner is allowed to see the activity.
        Enum.filter(candidate_lists, fn list ->
          Visibility.visible_for_user?(item, User.get_cached_by_id(list.user_id))
        end)
    end

  recipient_lists
  |> Enum.map(fn %{id: id} -> "list:#{id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
233
# Notifications on the "user" and "user:notification" streams go to the topic of the
# notified user. Each registered process is asked to render the notification itself.
defp do_stream(topic, %Notification{user_id: user_id} = item)
     when topic in ["user", "user:notification"] do
  render_request = {:render_with_user, StreamerView, "notification.json", item}

  Registry.dispatch(@registry, "#{topic}:#{user_id}", fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, render_request) end)
  end)
end
242
# Pushes the item to the "user:<id>" topic of every recipient of the activity.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "user:#{id}" end)
  |> Enum.each(&push_to_socket(&1, item))
end
254
# Every other topic: the item is pushed to the topic exactly as given.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topic, item)
end
260
# Conversation participations are rendered once and sent as text to every subscriber.
defp push_to_socket(topic, %Participation{} = participation) do
  broadcast_text(topic, StreamerView.render("conversation.json", participation))
end

# Deletes that carry the id of the deleted activity are announced as a "delete" event
# whose payload is that id as a string.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  event = %{event: "delete", payload: to_string(deleted_activity_id)}
  broadcast_text(topic, Jason.encode!(event))
end

# Deletes without a "deleted_activity_id" are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

# Everything else: subscribers registered with a truthy value are asked to render the
# update themselves, the others receive a rendering made once without a user.
defp push_to_socket(topic, item) do
  anon_render = StreamerView.render("update.json", item)

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, auth?} ->
      message =
        if auth? do
          {:render_with_user, StreamerView, "update.json", item}
        else
          {:text, anon_render}
        end

      send(pid, message)
    end)
  end)
end

# Sends `{:text, text}` to every process registered under `topic`.
defp broadcast_text(topic, text) do
  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, text}) end)
  end)
end
298
# Thread containment check used by `filtered_by_user?/3`.
# Passes right away for users that have `skip_thread_containment` set.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

# Also passes when the instance-wide `skip_thread_containment` setting is truthy;
# otherwise the decision is delegated to `ActivityPub.contain_activity/2`.
defp thread_containment(activity, user) do
  cond do
    Config.get([:instance, :skip_thread_containment]) -> true
    true -> ActivityPub.contain_activity(activity, user)
  end
end
308
# Exactly one definition of `should_env_send?/0` is compiled, chosen by the build
# environment captured in `@mix_env`:
#   * test: true only if the registry process is registered and alive;
#   * benchmark: always false;
#   * any other environment: always true.
cond do
  @mix_env == :test ->
    def should_env_send? do
      registry_pid = Process.whereis(@registry)
      registry_pid != nil and Process.alive?(registry_pid)
    end

  @mix_env == :benchmark ->
    def should_env_send? do
      false
    end

  true ->
    def should_env_send? do
      true
    end
end
330 end