+ # List streams.
+ def get_topic(
+ "list",
+ %User{id: user_id} = user,
+ %Token{user_id: user_id} = oauth_token,
+ %{"list" => id}
+ ) do
+ cond do
+ OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+ {:error, :unauthorized}
+
+ Pleroma.List.get(id, user) ->
+ {:ok, "list:" <> to_string(id)}
+
+ true ->
+ {:error, :bad_topic}
+ end
+ end
+
+ def get_topic("list", _user, _oauth_token, _params) do
+ {:error, :unauthorized}
+ end
+
+ def get_topic(_stream, _user, _oauth_token, _params) do
+ {:error, :bad_topic}
+ end
+
+ @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+ def add_socket(topic, user) do
+ if should_env_send?() do
+ auth? = if user, do: true
+ Registry.register(@registry, topic, auth?)
+ end
+
+ {:ok, topic}
+ end
+
+ def remove_socket(topic) do
+ if should_env_send?(), do: Registry.unregister(@registry, topic)
+ end
+
+ def stream(topics, items) do
+ if should_env_send?() do
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
+ end
+ end
+
+ def filtered_by_user?(user, item, streamed_type \\ :activity)
+
+ def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
+ %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+ User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+ recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+ recipients = MapSet.new(item.recipients)
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)