Merge branch 'develop' into feature/bulk-confirmation
[akkoma] / lib / pleroma / web / streamer / streamer.ex
index 5ad4aa9367a8c588f6b3c1f4440f48375967b4c6..5475f18a6d1e7b2878262a3fbb5d2b4285dad069 100644 (file)
@@ -6,14 +6,17 @@ defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
+  alias Pleroma.Plugs.OAuthScopesPlug
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Web.CommonAPI
+  alias Pleroma.Web.OAuth.Token
   alias Pleroma.Web.StreamerView
 
   @mix_env Mix.env()
@@ -21,47 +24,125 @@ defmodule Pleroma.Web.Streamer do
 
   def registry, do: @registry
 
-  def add_socket(topic, %User{} = user) do
-    if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
+  @public_streams ["public", "public:local", "public:media", "public:local:media"]
+  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+
+  @doc "Expands and authorizes a stream, and registers the process for streaming."
+  @spec get_topic_and_add_socket(
+          stream :: String.t(),
+          User.t() | nil,
+          Token.t() | nil,
+          Map.t() | nil
+        ) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
+  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
+    case get_topic(stream, user, oauth_token, params) do
+      {:ok, topic} -> add_socket(topic, user)
+      error -> error
+    end
   end
 
-  def add_socket(topic, _) do
-    if should_env_send?(), do: Registry.register(@registry, topic, false)
+  @doc "Expand and authorizes a stream"
+  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic}
+  def get_topic(stream, user, oauth_token, params \\ %{})
+
+  # Allow all public steams.
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
+    {:ok, stream}
   end
 
-  def remove_socket(topic) do
-    if should_env_send?(), do: Registry.unregister(@registry, topic)
+  # Allow all hashtags streams.
+  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
+    {:ok, "hashtag:" <> tag}
   end
 
-  def stream(topics, item) when is_list(topics) do
-    if should_env_send?() do
-      Enum.each(topics, fn t ->
-        spawn(fn -> do_stream(t, item) end)
-      end)
+  # Expand user streams.
+  def get_topic(
+        stream,
+        %User{id: user_id} = user,
+        %Token{user_id: token_user_id} = oauth_token,
+        _params
+      )
+      when stream in @user_streams and user_id == token_user_id do
+    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
+    required_scopes =
+      if stream == "user:notification" do
+        ["read:notifications"]
+      else
+        ["read:statuses"]
+      end
+
+    if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
+      {:error, :unauthorized}
+    else
+      {:ok, stream <> ":" <> to_string(user.id)}
+    end
+  end
+
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
+    {:error, :unauthorized}
+  end
+
+  # List streams.
+  def get_topic(
+        "list",
+        %User{id: user_id} = user,
+        %Token{user_id: token_user_id} = oauth_token,
+        %{"list" => id}
+      )
+      when user_id == token_user_id do
+    cond do
+      OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+        {:error, :unauthorized}
+
+      Pleroma.List.get(id, user) ->
+        {:ok, "list:" <> to_string(id)}
+
+      true ->
+        {:error, :bad_topic}
     end
+  end
 
-    :ok
+  def get_topic("list", _user, _oauth_token, _params) do
+    {:error, :unauthorized}
   end
 
-  def stream(topic, items) when is_list(items) do
-    if should_env_send?() do
-      Enum.each(items, fn i ->
-        spawn(fn -> do_stream(topic, i) end)
-      end)
+  def get_topic(_stream, _user, _oauth_token, _params) do
+    {:error, :bad_topic}
+  end
 
-      :ok
+  @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+  def add_socket(topic, user) do
+    if should_env_send?() do
+      auth? = if user, do: true
+      Registry.register(@registry, topic, auth?)
     end
+
+    {:ok, topic}
+  end
+
+  def remove_socket(topic) do
+    if should_env_send?(), do: Registry.unregister(@registry, topic)
   end
 
-  def stream(topic, item) do
+  def stream(topics, items) do
     if should_env_send?() do
-      spawn(fn -> do_stream(topic, item) end)
+      List.wrap(topics)
+      |> Enum.each(fn topic ->
+        List.wrap(items)
+        |> Enum.each(fn item ->
+          spawn(fn -> do_stream(topic, item) end)
+        end)
+      end)
     end
 
     :ok
   end
 
-  def filtered_by_user?(%User{} = user, %Activity{} = item) do
+  def filtered_by_user?(user, item, streamed_type \\ :activity)
+
+  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
     %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
       User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
 
@@ -73,6 +154,9 @@ defmodule Pleroma.Web.Streamer do
          true <-
            Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+         true <-
+           !(streamed_type == :activity && item.data["type"] == "Announce" &&
+               parent.data["actor"] == user.ap_id),
          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
          true <- MapSet.disjoint?(recipients, recipient_blocks),
          %{host: item_host} <- URI.parse(item.actor),
@@ -80,15 +164,15 @@ defmodule Pleroma.Web.Streamer do
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
          true <- thread_containment(item, user),
-         false <- CommonAPI.thread_muted?(user, item) do
+         false <- CommonAPI.thread_muted?(user, parent) do
       false
     else
       _ -> true
     end
   end
 
-  def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
-    filtered_by_user?(user, activity)
+  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
+    filtered_by_user?(user, activity, :notification)
   end
 
   defp do_stream("direct", item) do
@@ -144,6 +228,19 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
+  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
+       when topic in ["user", "user:pleroma_chat"] do
+    topic = "#{topic}:#{user.id}"
+
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
+      end)
+    end)
+  end
+
   defp do_stream("user", item) do
     Logger.debug("Trying to push to users")
 
@@ -231,13 +328,4 @@ defmodule Pleroma.Web.Streamer do
     true ->
       def should_env_send?, do: true
   end
-
-  defp user_topic(topic, user)
-       when topic in ~w[user user:notification direct] do
-    "#{topic}:#{user.id}"
-  end
-
-  defp user_topic(topic, _) do
-    topic
-  end
 end