Add `account_activation_required` to /api/v1/instance
[akkoma] / lib / pleroma / web / streamer / streamer.ex
index 5ad4aa9367a8c588f6b3c1f4440f48375967b4c6..0cf41189b2f7b75e06dad3d1135b46f7a38f9562 100644 (file)
@@ -21,12 +21,68 @@ defmodule Pleroma.Web.Streamer do
 
   def registry, do: @registry
 
-  def add_socket(topic, %User{} = user) do
-    if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
+  @public_streams ["public", "public:local", "public:media", "public:local:media"]
+  @user_streams ["user", "user:notification", "direct"]
+
+  @doc "Expands and authorizes a stream, and registers the process for streaming."
+  @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
+  def get_topic_and_add_socket(stream, user, params \\ %{}) do
+    case get_topic(stream, user, params) do
+      {:ok, topic} -> add_socket(topic, user)
+      error -> error
+    end
+  end
+
+  @doc "Expand and authorizes a stream"
+  @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic}
+  def get_topic(stream, user, params \\ %{})
+
+  # Allow all public steams.
+  def get_topic(stream, _, _) when stream in @public_streams do
+    {:ok, stream}
   end
 
-  def add_socket(topic, _) do
-    if should_env_send?(), do: Registry.register(@registry, topic, false)
+  # Allow all hashtags streams.
+  def get_topic("hashtag", _, %{"tag" => tag}) do
+    {:ok, "hashtag:" <> tag}
+  end
+
+  # Expand user streams.
+  def get_topic(stream, %User{} = user, _) when stream in @user_streams do
+    {:ok, stream <> ":" <> to_string(user.id)}
+  end
+
+  def get_topic(stream, _, _) when stream in @user_streams do
+    {:error, :unauthorized}
+  end
+
+  # List streams.
+  def get_topic("list", %User{} = user, %{"list" => id}) do
+    if Pleroma.List.get(id, user) do
+      {:ok, "list:" <> to_string(id)}
+    else
+      {:error, :bad_topic}
+    end
+  end
+
+  def get_topic("list", _, _) do
+    {:error, :unauthorized}
+  end
+
+  def get_topic(_, _, _) do
+    {:error, :bad_topic}
+  end
+
+  @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+  def add_socket(topic, user) do
+    if should_env_send?() do
+      auth? = if user, do: true
+      Registry.register(@registry, topic, auth?)
+    end
+
+    {:ok, topic}
   end
 
   def remove_socket(topic) do
@@ -80,7 +136,7 @@ defmodule Pleroma.Web.Streamer do
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
          true <- thread_containment(item, user),
-         false <- CommonAPI.thread_muted?(user, item) do
+         false <- CommonAPI.thread_muted?(user, parent) do
       false
     else
       _ -> true
@@ -231,13 +287,4 @@ defmodule Pleroma.Web.Streamer do
     true ->
       def should_env_send?, do: true
   end
-
-  defp user_topic(topic, user)
-       when topic in ~w[user user:notification direct] do
-    "#{topic}:#{user.id}"
-  end
-
-  defp user_topic(topic, _) do
-    topic
-  end
 end