Stream follow updates
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.OAuth.Token
19 alias Pleroma.Web.Plugs.OAuthScopesPlug
20 alias Pleroma.Web.StreamerView
21
22 @mix_env Mix.env()
23 @registry Pleroma.Web.StreamerRegistry
24
25 def registry, do: @registry
26
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
29
30 @doc "Expands and authorizes a stream, and registers the process for streaming."
31 @spec get_topic_and_add_socket(
32 stream :: String.t(),
33 User.t() | nil,
34 Token.t() | nil,
35 Map.t() | nil
36 ) ::
37 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
38 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
39 with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
40 add_socket(topic, user)
41 end
42 end
43
44 @doc "Expand and authorizes a stream"
45 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
46 {:ok, topic :: String.t()} | {:error, :bad_topic}
47 def get_topic(stream, user, oauth_token, params \\ %{})
48
49 # Allow all public steams.
50 def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
51 {:ok, stream}
52 end
53
54 # Allow all hashtags streams.
55 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
56 {:ok, "hashtag:" <> tag}
57 end
58
59 # Allow remote instance streams.
60 def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
61 {:ok, "public:remote:" <> instance}
62 end
63
64 def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
65 {:ok, "public:remote:media:" <> instance}
66 end
67
68 # Expand user streams.
69 def get_topic(
70 stream,
71 %User{id: user_id} = user,
72 %Token{user_id: user_id} = oauth_token,
73 _params
74 )
75 when stream in @user_streams do
76 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
77 required_scopes =
78 if stream == "user:notification" do
79 ["read:notifications"]
80 else
81 ["read:statuses"]
82 end
83
84 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
85 {:error, :unauthorized}
86 else
87 {:ok, stream <> ":" <> to_string(user.id)}
88 end
89 end
90
91 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
92 {:error, :unauthorized}
93 end
94
95 # List streams.
96 def get_topic(
97 "list",
98 %User{id: user_id} = user,
99 %Token{user_id: user_id} = oauth_token,
100 %{"list" => id}
101 ) do
102 cond do
103 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
104 {:error, :unauthorized}
105
106 Pleroma.List.get(id, user) ->
107 {:ok, "list:" <> to_string(id)}
108
109 true ->
110 {:error, :bad_topic}
111 end
112 end
113
114 def get_topic("list", _user, _oauth_token, _params) do
115 {:error, :unauthorized}
116 end
117
118 def get_topic(_stream, _user, _oauth_token, _params) do
119 {:error, :bad_topic}
120 end
121
122 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
123 def add_socket(topic, user) do
124 if should_env_send?() do
125 auth? = if user, do: true
126 Registry.register(@registry, topic, auth?)
127 end
128
129 {:ok, topic}
130 end
131
132 def remove_socket(topic) do
133 if should_env_send?(), do: Registry.unregister(@registry, topic)
134 end
135
136 def stream(topics, items) do
137 if should_env_send?() do
138 for topic <- List.wrap(topics), item <- List.wrap(items) do
139 spawn(fn -> do_stream(topic, item) end)
140 end
141 end
142 end
143
144 def filtered_by_user?(user, item, streamed_type \\ :activity)
145
146 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
147 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
148 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
149
150 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
151 recipients = MapSet.new(item.recipients)
152 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
153
154 with parent <- Object.normalize(item) || item,
155 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
156 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
157 true <-
158 !(streamed_type == :activity && item.data["type"] == "Announce" &&
159 parent.data["actor"] == user.ap_id),
160 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
161 true <- MapSet.disjoint?(recipients, recipient_blocks),
162 %{host: item_host} <- URI.parse(item.actor),
163 %{host: parent_host} <- URI.parse(parent.data["actor"]),
164 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
165 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
166 true <- thread_containment(item, user),
167 false <- CommonAPI.thread_muted?(user, parent) do
168 false
169 else
170 _ -> true
171 end
172 end
173
174 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
175 filtered_by_user?(user, activity, :notification)
176 end
177
178 defp do_stream("direct", item) do
179 recipient_topics =
180 User.get_recipients_from_activity(item)
181 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
182
183 Enum.each(recipient_topics, fn user_topic ->
184 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
185 push_to_socket(user_topic, item)
186 end)
187 end
188
189 defp do_stream("relationships:update", item) do
190 text = StreamerView.render("relationships_update.json", item)
191
192 [item.follower, item.following]
193 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
194 |> Enum.each(fn user_topic ->
195 Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
196
197 Registry.dispatch(@registry, user_topic, fn list ->
198 Enum.each(list, fn {pid, _auth} ->
199 send(pid, {:text, text})
200 end)
201 end)
202 end)
203 end
204
205 defp do_stream("participation", participation) do
206 user_topic = "direct:#{participation.user_id}"
207 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
208
209 push_to_socket(user_topic, participation)
210 end
211
212 defp do_stream("list", item) do
213 # filter the recipient list if the activity is not public, see #270.
214 recipient_lists =
215 case Visibility.is_public?(item) do
216 true ->
217 Pleroma.List.get_lists_from_activity(item)
218
219 _ ->
220 Pleroma.List.get_lists_from_activity(item)
221 |> Enum.filter(fn list ->
222 owner = User.get_cached_by_id(list.user_id)
223
224 Visibility.visible_for_user?(item, owner)
225 end)
226 end
227
228 recipient_topics =
229 recipient_lists
230 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
231
232 Enum.each(recipient_topics, fn list_topic ->
233 Logger.debug("Trying to push message to #{list_topic}\n\n")
234 push_to_socket(list_topic, item)
235 end)
236 end
237
238 defp do_stream(topic, %Notification{} = item)
239 when topic in ["user", "user:notification"] do
240 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
241 Enum.each(list, fn {pid, _auth} ->
242 send(pid, {:render_with_user, StreamerView, "notification.json", item})
243 end)
244 end)
245 end
246
247 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
248 when topic in ["user", "user:pleroma_chat"] do
249 topic = "#{topic}:#{user.id}"
250
251 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
252
253 Registry.dispatch(@registry, topic, fn list ->
254 Enum.each(list, fn {pid, _auth} ->
255 send(pid, {:text, text})
256 end)
257 end)
258 end
259
260 defp do_stream("user", item) do
261 Logger.debug("Trying to push to users")
262
263 recipient_topics =
264 User.get_recipients_from_activity(item)
265 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
266
267 Enum.each(recipient_topics, fn topic ->
268 push_to_socket(topic, item)
269 end)
270 end
271
272 defp do_stream(topic, item) do
273 Logger.debug("Trying to push to #{topic}")
274 Logger.debug("Pushing item to #{topic}")
275 push_to_socket(topic, item)
276 end
277
278 defp push_to_socket(topic, %Participation{} = participation) do
279 rendered = StreamerView.render("conversation.json", participation)
280
281 Registry.dispatch(@registry, topic, fn list ->
282 Enum.each(list, fn {pid, _} ->
283 send(pid, {:text, rendered})
284 end)
285 end)
286 end
287
288 defp push_to_socket(topic, %Activity{
289 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
290 }) do
291 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
292
293 Registry.dispatch(@registry, topic, fn list ->
294 Enum.each(list, fn {pid, _} ->
295 send(pid, {:text, rendered})
296 end)
297 end)
298 end
299
300 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
301
302 defp push_to_socket(topic, item) do
303 anon_render = StreamerView.render("update.json", item)
304
305 Registry.dispatch(@registry, topic, fn list ->
306 Enum.each(list, fn {pid, auth?} ->
307 if auth? do
308 send(pid, {:render_with_user, StreamerView, "update.json", item})
309 else
310 send(pid, {:text, anon_render})
311 end
312 end)
313 end)
314 end
315
316 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
317
318 defp thread_containment(activity, user) do
319 if Config.get([:instance, :skip_thread_containment]) do
320 true
321 else
322 ActivityPub.contain_activity(activity, user)
323 end
324 end
325
326 # In test environement, only return true if the registry is started.
327 # In benchmark environment, returns false.
328 # In any other environment, always returns true.
329 cond do
330 @mix_env == :test ->
331 def should_env_send? do
332 case Process.whereis(@registry) do
333 nil ->
334 false
335
336 pid ->
337 Process.alive?(pid)
338 end
339 end
340
341 @mix_env == :benchmark ->
342 def should_env_send?, do: false
343
344 true ->
345 def should_env_send?, do: true
346 end
347 end