+ # List streams.
+ def get_topic(
+ "list",
+ %User{id: user_id} = user,
+ %Token{user_id: user_id} = oauth_token,
+ %{"list" => id}
+ ) do
+ cond do
+ OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+ {:error, :unauthorized}
+
+ Pleroma.List.get(id, user) ->
+ {:ok, "list:" <> to_string(id)}
+
+ true ->
+ {:error, :bad_topic}
+ end
+ end
+
+ def get_topic("list", _user, _oauth_token, _params) do
+ {:error, :unauthorized}
+ end
+
+ # mastodon multi-topic WS
+ def get_topic(nil, _user, _oauth_token, _params) do
+ {:ok, :multi}
+ end
+
+ def get_topic(_stream, _user, _oauth_token, _params) do
+ {:error, :bad_topic}
+ end
+
+ @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+ def add_socket(topic, oauth_token) do
+ if should_env_send?() do
+ oauth_token_id = if oauth_token, do: oauth_token.id, else: false
+ Registry.register(@registry, topic, oauth_token_id)
+ end
+
+ {:ok, topic}
+ end
+
+ def remove_socket(topic) do
+ if should_env_send?(), do: Registry.unregister(@registry, topic)
+ end
+
+ def stream(topics, items) do
+ if should_env_send?() do
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
+ end
+ end
+
+ def filtered_by_user?(user, item, streamed_type \\ :activity)