1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.OAuth.Token
18 alias Pleroma.Web.Plugs.OAuthScopesPlug
19 alias Pleroma.Web.StreamerView
22 @registry Pleroma.Web.StreamerRegistry
24 def registry, do: @registry
26 @public_streams ["public", "public:local", "public:media", "public:local:media"]
27 @user_streams ["user", "user:notification", "direct"]
29 @doc "Expands and authorizes a stream, and registers the process for streaming."
30 @spec get_topic_and_add_socket(
36 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
37 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
38 with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
39 add_socket(topic, oauth_token)
43 @doc "Expand and authorizes a stream"
44 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
45 {:ok, topic :: String.t()} | {:error, :bad_topic}
46 def get_topic(stream, user, oauth_token, params \\ %{})
48 # Allow all public steams.
49 def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
53 # Allow all hashtags streams.
54 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
55 {:ok, "hashtag:" <> tag}
58 # Allow remote instance streams.
59 def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
60 {:ok, "public:remote:" <> instance}
63 def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
64 {:ok, "public:remote:media:" <> instance}
67 # Expand user streams.
70 %User{id: user_id} = user,
71 %Token{user_id: user_id} = oauth_token,
74 when stream in @user_streams do
75 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
77 if stream == "user:notification" do
78 ["read:notifications"]
83 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
84 {:error, :unauthorized}
86 {:ok, stream <> ":" <> to_string(user.id)}
90 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
91 {:error, :unauthorized}
97 %User{id: user_id} = user,
98 %Token{user_id: user_id} = oauth_token,
102 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
103 {:error, :unauthorized}
105 Pleroma.List.get(id, user) ->
106 {:ok, "list:" <> to_string(id)}
113 def get_topic("list", _user, _oauth_token, _params) do
114 {:error, :unauthorized}
117 # mastodon multi-topic WS
118 def get_topic(nil, _user, _oauth_token, _params) do
122 def get_topic(_stream, _user, _oauth_token, _params) do
126 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
127 def add_socket(topic, oauth_token) do
128 if should_env_send?() do
129 oauth_token_id = if oauth_token, do: oauth_token.id, else: false
130 Registry.register(@registry, topic, oauth_token_id)
136 def remove_socket(topic) do
137 if should_env_send?(), do: Registry.unregister(@registry, topic)
140 def stream(topics, items) do
141 if should_env_send?() do
142 for topic <- List.wrap(topics), item <- List.wrap(items) do
143 spawn(fn -> do_stream(topic, item) end)
148 def filtered_by_user?(user, item, streamed_type \\ :activity)
150 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
151 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
152 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
154 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
155 recipients = MapSet.new(item.recipients)
156 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
158 with parent <- Object.normalize(item, fetch: false) || item,
159 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
160 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
162 !(streamed_type == :activity && item.data["type"] == "Announce" &&
163 parent.data["actor"] == user.ap_id),
164 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
165 true <- MapSet.disjoint?(recipients, recipient_blocks),
166 %{host: item_host} <- URI.parse(item.actor),
167 %{host: parent_host} <- URI.parse(parent.data["actor"]),
168 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
169 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
170 true <- thread_containment(item, user),
171 false <- CommonAPI.thread_muted?(user, parent) do
178 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
179 filtered_by_user?(user, activity, :notification)
182 defp do_stream("direct", item) do
184 User.get_recipients_from_activity(item)
185 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
187 Enum.each(recipient_topics, fn user_topic ->
188 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
189 push_to_socket(user_topic, item)
193 defp do_stream("follow_relationship", item) do
194 user_topic = "user:#{item.follower.id}"
195 text = StreamerView.render("follow_relationships_update.json", item, user_topic)
197 Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
199 Registry.dispatch(@registry, user_topic, fn list ->
200 Enum.each(list, fn {pid, _auth} ->
201 send(pid, {:text, text})
206 defp do_stream("participation", participation) do
207 user_topic = "direct:#{participation.user_id}"
208 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
210 push_to_socket(user_topic, participation)
213 defp do_stream("list", item) do
214 # filter the recipient list if the activity is not public, see #270.
216 case Visibility.is_public?(item) do
218 Pleroma.List.get_lists_from_activity(item)
221 Pleroma.List.get_lists_from_activity(item)
222 |> Enum.filter(fn list ->
223 owner = User.get_cached_by_id(list.user_id)
225 Visibility.visible_for_user?(item, owner)
231 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
233 Enum.each(recipient_topics, fn list_topic ->
234 Logger.debug("Trying to push message to #{list_topic}\n\n")
235 push_to_socket(list_topic, item)
239 defp do_stream(topic, %Notification{} = item)
240 when topic in ["user", "user:notification"] do
241 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
242 Enum.each(list, fn {pid, _auth} ->
243 send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
248 defp do_stream("user", item) do
249 Logger.debug("Trying to push to users")
252 User.get_recipients_from_activity(item)
253 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
255 Enum.each(recipient_topics, fn topic ->
256 push_to_socket(topic, item)
260 defp do_stream(topic, item) do
261 Logger.debug("Trying to push to #{topic}")
262 Logger.debug("Pushing item to #{topic}")
263 push_to_socket(topic, item)
266 defp push_to_socket(topic, %Participation{} = participation) do
267 rendered = StreamerView.render("conversation.json", participation, topic)
269 Registry.dispatch(@registry, topic, fn list ->
270 Enum.each(list, fn {pid, _} ->
271 send(pid, {:text, rendered})
276 defp push_to_socket(topic, %Activity{
277 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
279 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
281 Registry.dispatch(@registry, topic, fn list ->
282 Enum.each(list, fn {pid, _} ->
283 send(pid, {:text, rendered})
288 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
290 defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
292 Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
293 |> Map.put(:object, item.object)
295 anon_render = StreamerView.render("status_update.json", create_activity, topic)
297 Registry.dispatch(@registry, topic, fn list ->
298 Enum.each(list, fn {pid, auth?} ->
302 {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
305 send(pid, {:text, anon_render})
311 defp push_to_socket(topic, item) do
312 anon_render = StreamerView.render("update.json", item, topic)
314 Registry.dispatch(@registry, topic, fn list ->
315 Enum.each(list, fn {pid, auth?} ->
317 send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
319 send(pid, {:text, anon_render})
325 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
327 defp thread_containment(activity, user) do
328 if Config.get([:instance, :skip_thread_containment]) do
331 ActivityPub.contain_activity(activity, user)
335 def close_streams_by_oauth_token(oauth_token) do
336 if should_env_send?() do
341 {:"$1", :"$2", :"$3"},
342 [{:==, :"$3", oauth_token.id}],
347 |> Enum.each(fn pid -> send(pid, :close) end)
351 # In test environement, only return true if the registry is started.
352 # In benchmark environment, returns false.
353 # In any other environment, always returns true.
356 def should_env_send? do
357 case Process.whereis(@registry) do
366 @mix_env == :benchmark ->
367 def should_env_send?, do: false
370 def should_env_send?, do: true