API compatibility with fedibird, frontend config (#163)
[akkoma] / lib / pleroma / web / streamer.ex
index d618dfe54a93b74bf6fd9bb81b7151893b591500..d5b1d067895446a0aaa6604ebb2ef08577fcd417 100644 (file)
@@ -1,12 +1,11 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
-  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
@@ -25,7 +24,7 @@ defmodule Pleroma.Web.Streamer do
   def registry, do: @registry
 
   @public_streams ["public", "public:local", "public:media", "public:local:media"]
-  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+  @user_streams ["user", "user:notification", "direct"]
 
   @doc "Expands and authorizes a stream, and registers the process for streaming."
   @spec get_topic_and_add_socket(
@@ -36,9 +35,8 @@ defmodule Pleroma.Web.Streamer do
         ) ::
           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
   def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
-    case get_topic(stream, user, oauth_token, params) do
-      {:ok, topic} -> add_socket(topic, user)
-      error -> error
+    with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
+      add_socket(topic, user)
     end
   end
 
@@ -57,14 +55,23 @@ defmodule Pleroma.Web.Streamer do
     {:ok, "hashtag:" <> tag}
   end
 
+  # Allow remote instance streams.
+  def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
+    {:ok, "public:remote:" <> instance}
+  end
+
+  def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
+    {:ok, "public:remote:media:" <> instance}
+  end
+
   # Expand user streams.
   def get_topic(
         stream,
         %User{id: user_id} = user,
-        %Token{user_id: token_user_id} = oauth_token,
+        %Token{user_id: user_id} = oauth_token,
         _params
       )
-      when stream in @user_streams and user_id == token_user_id do
+      when stream in @user_streams do
     # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
     required_scopes =
       if stream == "user:notification" do
@@ -88,10 +95,9 @@ defmodule Pleroma.Web.Streamer do
   def get_topic(
         "list",
         %User{id: user_id} = user,
-        %Token{user_id: token_user_id} = oauth_token,
+        %Token{user_id: user_id} = oauth_token,
         %{"list" => id}
-      )
-      when user_id == token_user_id do
+      ) do
     cond do
       OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
         {:error, :unauthorized}
@@ -108,6 +114,11 @@ defmodule Pleroma.Web.Streamer do
     {:error, :unauthorized}
   end
 
+  # mastodon multi-topic WS
+  def get_topic(nil, _user, _oauth_token, _params) do
+    {:ok, :multi}
+  end
+
   def get_topic(_stream, _user, _oauth_token, _params) do
     {:error, :bad_topic}
   end
@@ -128,16 +139,10 @@ defmodule Pleroma.Web.Streamer do
 
   def stream(topics, items) do
     if should_env_send?() do
-      List.wrap(topics)
-      |> Enum.each(fn topic ->
-        List.wrap(items)
-        |> Enum.each(fn item ->
-          spawn(fn -> do_stream(topic, item) end)
-        end)
-      end)
+      for topic <- List.wrap(topics), item <- List.wrap(items) do
+        spawn(fn -> do_stream(topic, item) end)
+      end
     end
-
-    :ok
   end
 
   def filtered_by_user?(user, item, streamed_type \\ :activity)
@@ -150,9 +155,8 @@ defmodule Pleroma.Web.Streamer do
     recipients = MapSet.new(item.recipients)
     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
 
-    with parent <- Object.normalize(item) || item,
-         true <-
-           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+    with parent <- Object.normalize(item, fetch: false) || item,
+         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
          true <-
            !(streamed_type == :activity && item.data["type"] == "Announce" &&
@@ -186,6 +190,19 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
+  defp do_stream("follow_relationship", item) do
+    user_topic = "user:#{item.follower.id}"
+    text = StreamerView.render("follow_relationships_update.json", item, user_topic)
+
+    Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
+
+    Registry.dispatch(@registry, user_topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
+      end)
+    end)
+  end
+
   defp do_stream("participation", participation) do
     user_topic = "direct:#{participation.user_id}"
     Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
@@ -223,20 +240,7 @@ defmodule Pleroma.Web.Streamer do
        when topic in ["user", "user:notification"] do
     Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
       Enum.each(list, fn {pid, _auth} ->
-        send(pid, {:render_with_user, StreamerView, "notification.json", item})
-      end)
-    end)
-  end
-
-  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
-       when topic in ["user", "user:pleroma_chat"] do
-    topic = "#{topic}:#{user.id}"
-
-    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
-
-    Registry.dispatch(@registry, topic, fn list ->
-      Enum.each(list, fn {pid, _auth} ->
-        send(pid, {:text, text})
+        send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
       end)
     end)
   end
@@ -260,7 +264,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp push_to_socket(topic, %Participation{} = participation) do
-    rendered = StreamerView.render("conversation.json", participation)
+    rendered = StreamerView.render("conversation.json", participation, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, _} ->
@@ -284,12 +288,12 @@ defmodule Pleroma.Web.Streamer do
   defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
 
   defp push_to_socket(topic, item) do
-    anon_render = StreamerView.render("update.json", item)
+    anon_render = StreamerView.render("update.json", item, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, auth?} ->
         if auth? do
-          send(pid, {:render_with_user, StreamerView, "update.json", item})
+          send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
         else
           send(pid, {:text, anon_render})
         end