+ alias Pleroma.Activity
+ alias Pleroma.Chat.MessageReference
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Web.Plugs.OAuthScopesPlug
+ alias Pleroma.Web.StreamerView
+
+ @mix_env Mix.env()
+ @registry Pleroma.Web.StreamerRegistry
+
+ def registry, do: @registry
+
+ @public_streams ["public", "public:local", "public:media", "public:local:media"]
+ @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+
+ @doc "Expands and authorizes a stream, and registers the process for streaming."
+ @spec get_topic_and_add_socket(
+ stream :: String.t(),
+ User.t() | nil,
+ Token.t() | nil,
+ Map.t() | nil
+ ) ::
+ {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
+ def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
+ case get_topic(stream, user, oauth_token, params) do
+ {:ok, topic} -> add_socket(topic, user)
+ error -> error
+ end
+ end
+
+ @doc "Expand and authorizes a stream"
+ @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
+ {:ok, topic :: String.t()} | {:error, :bad_topic}
+ def get_topic(stream, user, oauth_token, params \\ %{})
+
+ # Allow all public steams.
+ def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
+ {:ok, stream}
+ end
+
+ # Allow all hashtags streams.
+ def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
+ {:ok, "hashtag:" <> tag}
+ end
+
+ # Allow remote instance streams.
+ def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:" <> instance}
+ end
+
+ def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:media:" <> instance}
+ end
+
+ # Expand user streams.
+ def get_topic(
+ stream,
+ %User{id: user_id} = user,
+ %Token{user_id: token_user_id} = oauth_token,
+ _params
+ )
+ when stream in @user_streams and user_id == token_user_id do
+ # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
+ required_scopes =
+ if stream == "user:notification" do
+ ["read:notifications"]
+ else
+ ["read:statuses"]
+ end
+
+ if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
+ {:error, :unauthorized}
+ else
+ {:ok, stream <> ":" <> to_string(user.id)}
+ end
+ end
+
+ def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
+ {:error, :unauthorized}
+ end
+
+ # List streams.
+ def get_topic(
+ "list",
+ %User{id: user_id} = user,
+ %Token{user_id: token_user_id} = oauth_token,
+ %{"list" => id}
+ )
+ when user_id == token_user_id do
+ cond do
+ OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+ {:error, :unauthorized}
+
+ Pleroma.List.get(id, user) ->
+ {:ok, "list:" <> to_string(id)}
+
+ true ->
+ {:error, :bad_topic}
+ end
+ end
+
+ def get_topic("list", _user, _oauth_token, _params) do
+ {:error, :unauthorized}
+ end
+
+ def get_topic(_stream, _user, _oauth_token, _params) do
+ {:error, :bad_topic}
+ end
+
+ @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+ def add_socket(topic, user) do
+ if should_env_send?() do
+ auth? = if user, do: true
+ Registry.register(@registry, topic, auth?)
+ end
+
+ {:ok, topic}