# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
require Logger
require Pleroma.Constants

alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.StreamerView
# Name of the `Registry` in which streaming processes are registered per topic.
@registry Pleroma.Web.StreamerRegistry

@doc "Returns the name of the `Registry` used to track streaming subscriptions."
@spec registry() :: module()
def registry, do: @registry

# Streams that `get_topic/4` grants without looking at the user or token.
@public_streams ["public", "public:local", "public:media", "public:local:media"]
# Streams that `get_topic/4` expands to a per-user topic (`"<stream>:<user id>"`).
@user_streams ["user", "user:notification", "direct"]
@doc """
Expands and authorizes a stream, and registers the process for streaming.

The stream is first resolved with `get_topic/4`. On `{:ok, topic}` the topic is
passed to `add_socket/2` and its result is returned; any other result of
`get_topic/4` (for example `{:error, :unauthorized}`) is returned unchanged.
"""
@spec get_topic_and_add_socket(
        stream :: String.t() | nil,
        User.t() | nil,
        Token.t() | nil,
        map() | nil
      ) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
    add_socket(topic, oauth_token)
  end
end
@doc """
Expands and authorizes a stream.

Returns `{:ok, topic}` when the caller may subscribe to `stream`, and an
`{:error, reason}` tuple otherwise. `params` carries the extra request
parameters some streams need (`"tag"`, `"instance"`, `"list"`).
"""
@spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, oauth_token, params \\ %{})

# Allow all public streams; the topic is the stream name itself.
def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
  {:ok, stream}
end
# Allow all hashtag streams; the requested tag becomes part of the topic.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
  {:ok, "hashtag:" <> tag}
end

# Allow remote instance streams; the requested instance becomes part of the topic.
def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
  {:ok, "public:remote:" <> instance}
end

def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
  {:ok, "public:remote:media:" <> instance}
end
# Expand user streams.
# `user_id` is bound in both patterns, so this clause only matches when the
# token's `user_id` equals the user's `id`.
def get_topic(
      stream,
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      _params
    )
    when stream in @user_streams do
  # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
  required_scopes =
    if stream == "user:notification" do
      ["read:notifications"]
    else
      ["read:statuses"]
    end

  # An empty result means the token holds none of the required scopes.
  if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
    {:error, :unauthorized}
  else
    {:ok, stream <> ":" <> to_string(user.id)}
  end
end

# A user stream requested without a matching user/token pair is rejected.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
  {:error, :unauthorized}
end
# List streams.
# As with user streams, the token must belong to the given user.
def get_topic(
      "list",
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      %{"list" => id}
    ) do
  cond do
    # The token holds neither "read" nor "read:lists".
    OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
      {:error, :unauthorized}

    # The list lookup for this user returned a truthy value.
    Pleroma.List.get(id, user) ->
      {:ok, "list:" <> to_string(id)}

    true ->
      {:error, :bad_topic}
  end
end

# A list stream requested without a matching user/token pair or list id is rejected.
def get_topic("list", _user, _oauth_token, _params) do
  {:error, :unauthorized}
end
# mastodon multi-topic WS: no stream was named when the connection was opened.
def get_topic(nil, _user, _oauth_token, _params) do
  {:ok, :multi}
end

# Anything not matched by an earlier clause is not a known stream.
def get_topic(_stream, _user, _oauth_token, _params) do
  {:error, :bad_topic}
end
@doc """
Registers the process for streaming. Use `get_topic/4` to get the full authorized topic.

When `should_env_send?/0` is true, the calling process is registered under
`topic` in the streamer registry. The registered value is the OAuth token's id,
or `false` when no token was given. Always returns `{:ok, topic}`.
"""
def add_socket(topic, oauth_token) do
  if should_env_send?() do
    oauth_token_id = if oauth_token, do: oauth_token.id, else: false
    Registry.register(@registry, topic, oauth_token_id)
  end

  {:ok, topic}
end
@doc """
Unregisters the calling process from `topic` in the streamer registry.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?(), do: Registry.unregister(@registry, topic)
end
@doc """
Pushes every item to every topic.

`topics` and `items` may each be a single value or a list. One process is
spawned per `{topic, item}` pair so that delivery does not block the caller.
Nothing is sent when `should_env_send?/0` is false.
"""
def stream(topics, items) do
  if should_env_send?() do
    for topic <- List.wrap(topics), item <- List.wrap(items) do
      spawn(fn -> do_stream(topic, item) end)
    end
  end
end
@doc """
Tells whether `item` must be withheld from `user`.

Returns `false` only when every check in the `with` chain passes; as soon as
one check fails the result is `true`. `streamed_type` is `:activity` by default
and `:notification` when called for a notification.
"""
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
  recipients = MapSet.new(item.recipients)
  domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  with parent <- Object.normalize(item, fetch: false) || item,
       # The activity's actor is neither blocked nor muted.
       true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
       # Announces from reblog-muted actors are dropped.
       true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
       # In the activity stream, drop Announces whose parent actor is the user.
       true <-
         !(streamed_type == :activity && item.data["type"] == "Announce" &&
             parent.data["actor"] == user.ap_id),
       # The parent's actor is neither blocked nor muted.
       true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
       # No recipient of the activity is blocked or muted.
       true <- MapSet.disjoint?(recipients, recipient_blocks),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       # Neither host matches the user's domain blocks.
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _ -> true
  end
end
# A notification is filtered exactly like its activity, evaluated as `:notification`;
# the caller-supplied streamed type is ignored.
def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
  filtered_by_user?(user, activity, :notification)
end
# Pushes a direct message to the "direct:<id>" topic of every recipient of the activity.
defp do_stream("direct", item) do
  recipient_topics =
    User.get_recipients_from_activity(item)
    |> Enum.map(fn %{id: id} -> "direct:#{id}" end)

  Enum.each(recipient_topics, fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end
# Sends a follow-relationship update to the follower's "user:<id>" topic.
# The payload is rendered once and sent as-is to every registered process.
defp do_stream("follow_relationship", item) do
  user_topic = "user:#{item.follower.id}"
  text = StreamerView.render("follow_relationships_update.json", item, user_topic)

  Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

  Registry.dispatch(@registry, user_topic, fn list ->
    Enum.each(list, fn {pid, _auth} ->
      send(pid, {:text, text})
    end)
  end)
end
# Pushes a conversation participation to its owner's "direct:<user_id>" topic.
defp do_stream("participation", participation) do
  user_topic = "direct:#{participation.user_id}"
  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  push_to_socket(user_topic, participation)
end
# Pushes an activity to the "list:<id>" topic of every list it belongs to.
defp do_stream("list", item) do
  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    case Visibility.is_public?(item) do
      true ->
        Pleroma.List.get_lists_from_activity(item)

      _ ->
        # Keep only the lists whose owner is allowed to see the activity.
        Pleroma.List.get_lists_from_activity(item)
        |> Enum.filter(fn list ->
          owner = User.get_cached_by_id(list.user_id)

          Visibility.visible_for_user?(item, owner)
        end)
    end

  recipient_topics =
    recipient_lists
    |> Enum.map(fn %{id: id} -> "list:#{id}" end)

  Enum.each(recipient_topics, fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
# Delivers a notification to "<topic>:<user_id>". Rendering is left to each
# receiving process, which gets the view, template and item instead of text.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
    Enum.each(list, fn {pid, _auth} ->
      send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
    end)
  end)
end
# Pushes an activity to the "user:<id>" topic of each of its recipients and, when
# the activity is addressed to the public collection, of each hashtag recipient.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  recipient_topics =
    User.get_recipients_from_activity(item)
    |> Enum.map(fn %{id: id} -> "user:#{id}" end)

  hashtag_recipients =
    if Pleroma.Constants.as_public() in item.recipients do
      Pleroma.Hashtag.get_recipients_for_activity(item)
      |> Enum.map(fn id -> "user:#{id}" end)
    else
      []
    end

  # A user may appear in both lists; push to each topic only once.
  all_recipients = Enum.uniq(recipient_topics ++ hashtag_recipients)

  Enum.each(all_recipients, fn topic ->
    push_to_socket(topic, item)
  end)
end
# Fallback for every other topic: the item is pushed to the topic unchanged.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")
  push_to_socket(topic, item)
end
# Renders a conversation participation once and sends the text to every
# process registered under `topic`.
defp push_to_socket(topic, %Participation{} = participation) do
  rendered = StreamerView.render("conversation.json", participation, topic)

  Registry.dispatch(@registry, topic, fn list ->
    Enum.each(list, fn {pid, _} ->
      send(pid, {:text, rendered})
    end)
  end)
end
# A Delete that names the deleted activity is sent as a "delete" event whose
# payload is that activity's id as a string.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})

  Registry.dispatch(@registry, topic, fn list ->
    Enum.each(list, fn {pid, _} ->
      send(pid, {:text, rendered})
    end)
  end)
end

# A Delete without "deleted_activity_id" is not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# An Update is streamed as a status update of the object's Create activity,
# with the updated object attached to it.
defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
  # NOTE(review): if no Create activity is found this presumably yields nil and
  # `Map.put/3` raises — confirm whether that case can occur here.
  create_activity =
    Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
    |> Map.put(:object, item.object)

  anon_render = StreamerView.render("status_update.json", create_activity, topic)

  Registry.dispatch(@registry, topic, fn list ->
    Enum.each(list, fn {pid, auth?} ->
      # `auth?` is the value stored at registration: a token id or `false`.
      if auth? do
        send(
          pid,
          {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
        )
      else
        send(pid, {:text, anon_render})
      end
    end)
  end)
end
# Default push: processes registered with a token render the item themselves;
# all others receive one shared anonymous rendering.
defp push_to_socket(topic, item) do
  anon_render = StreamerView.render("update.json", item, topic)

  Registry.dispatch(@registry, topic, fn list ->
    Enum.each(list, fn {pid, auth?} ->
      # `auth?` is the value stored at registration: a token id or `false`.
      if auth? do
        send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
      else
        send(pid, {:text, anon_render})
      end
    end)
  end)
end
# Thread containment check. Passes immediately when the user or the instance
# has `skip_thread_containment` set; otherwise the decision is delegated to
# `ActivityPub.contain_activity/2`.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

defp thread_containment(activity, user) do
  if Config.get([:instance, :skip_thread_containment]) do
    true
  else
    ActivityPub.contain_activity(activity, user)
  end
end
@doc """
Sends `:close` to every process registered with the given OAuth token.

Entries are selected from the streamer registry by their registered value,
which `add_socket/2` sets to the token id. Does nothing when
`should_env_send?/0` is false.
"""
def close_streams_by_oauth_token(oauth_token) do
  if should_env_send?() do
    Registry.select(
      @registry,
      [
        {
          # Registry entries are matched as {key, pid, value}.
          {:"$1", :"$2", :"$3"},
          [{:==, :"$3", oauth_token.id}],
          [:"$2"]
        }
      ]
    )
    |> Enum.each(fn pid -> send(pid, :close) end)
  end
end
# Build environment, read once at compile time; it selects which definition of
# `should_env_send?/0` is compiled into the module.
@mix_env Mix.env()

# In test environment, only return true if the registry is started.
# In benchmark environment, returns false.
# In any other environment, always returns true.
cond do
  @mix_env == :test ->
    def should_env_send? do
      case Process.whereis(@registry) do
        nil ->
          false

        _ ->
          true
      end
    end

  @mix_env == :benchmark ->
    def should_env_send?, do: false

  true ->
    def should_env_send?, do: true
end