Remerge of hashtag following (#341)
[akkoma] / lib / pleroma / web / streamer.ex
index 7d4a1304a9b381e3f25bfec72acd38398adcc070..f009fbd9e1853e01001209207e6cd83270ca1a89 100644 (file)
@@ -1,12 +1,11 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
-  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
@@ -18,6 +17,7 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.Web.OAuth.Token
   alias Pleroma.Web.Plugs.OAuthScopesPlug
   alias Pleroma.Web.StreamerView
+  require Pleroma.Constants
 
   @mix_env Mix.env()
   @registry Pleroma.Web.StreamerRegistry
@@ -25,7 +25,7 @@ defmodule Pleroma.Web.Streamer do
   def registry, do: @registry
 
   @public_streams ["public", "public:local", "public:media", "public:local:media"]
-  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+  @user_streams ["user", "user:notification", "direct"]
 
   @doc "Expands and authorizes a stream, and registers the process for streaming."
   @spec get_topic_and_add_socket(
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.Streamer do
           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
   def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
     with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
-      add_socket(topic, user)
+      add_socket(topic, oauth_token)
     end
   end
 
@@ -115,15 +115,20 @@ defmodule Pleroma.Web.Streamer do
     {:error, :unauthorized}
   end
 
+  # mastodon multi-topic WS
+  def get_topic(nil, _user, _oauth_token, _params) do
+    {:ok, :multi}
+  end
+
   def get_topic(_stream, _user, _oauth_token, _params) do
     {:error, :bad_topic}
   end
 
   @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
-  def add_socket(topic, user) do
+  def add_socket(topic, oauth_token) do
     if should_env_send?() do
-      auth? = if user, do: true
-      Registry.register(@registry, topic, auth?)
+      oauth_token_id = if oauth_token, do: oauth_token.id, else: false
+      Registry.register(@registry, topic, oauth_token_id)
     end
 
     {:ok, topic}
@@ -151,7 +156,7 @@ defmodule Pleroma.Web.Streamer do
     recipients = MapSet.new(item.recipients)
     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
 
-    with parent <- Object.normalize(item) || item,
+    with parent <- Object.normalize(item, fetch: false) || item,
          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
          true <-
@@ -187,8 +192,8 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp do_stream("follow_relationship", item) do
-    text = StreamerView.render("follow_relationships_update.json", item)
     user_topic = "user:#{item.follower.id}"
+    text = StreamerView.render("follow_relationships_update.json", item, user_topic)
 
     Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
 
@@ -236,20 +241,7 @@ defmodule Pleroma.Web.Streamer do
        when topic in ["user", "user:notification"] do
     Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
       Enum.each(list, fn {pid, _auth} ->
-        send(pid, {:render_with_user, StreamerView, "notification.json", item})
-      end)
-    end)
-  end
-
-  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
-       when topic in ["user", "user:pleroma_chat"] do
-    topic = "#{topic}:#{user.id}"
-
-    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
-
-    Registry.dispatch(@registry, topic, fn list ->
-      Enum.each(list, fn {pid, _auth} ->
-        send(pid, {:text, text})
+        send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
       end)
     end)
   end
@@ -261,7 +253,17 @@ defmodule Pleroma.Web.Streamer do
       User.get_recipients_from_activity(item)
       |> Enum.map(fn %{id: id} -> "user:#{id}" end)
 
-    Enum.each(recipient_topics, fn topic ->
+    hashtag_recipients =
+      if Pleroma.Constants.as_public() in item.recipients do
+        Pleroma.Hashtag.get_recipients_for_activity(item)
+        |> Enum.map(fn id -> "user:#{id}" end)
+      else
+        []
+      end
+
+    all_recipients = Enum.uniq(recipient_topics ++ hashtag_recipients)
+
+    Enum.each(all_recipients, fn topic ->
       push_to_socket(topic, item)
     end)
   end
@@ -273,7 +275,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp push_to_socket(topic, %Participation{} = participation) do
-    rendered = StreamerView.render("conversation.json", participation)
+    rendered = StreamerView.render("conversation.json", participation, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, _} ->
@@ -296,13 +298,34 @@ defmodule Pleroma.Web.Streamer do
 
   defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
 
+  defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
+    create_activity =
+      Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
+      |> Map.put(:object, item.object)
+
+    anon_render = StreamerView.render("status_update.json", create_activity, topic)
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, auth?} ->
+        if auth? do
+          send(
+            pid,
+            {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
+          )
+        else
+          send(pid, {:text, anon_render})
+        end
+      end)
+    end)
+  end
+
   defp push_to_socket(topic, item) do
-    anon_render = StreamerView.render("update.json", item)
+    anon_render = StreamerView.render("update.json", item, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, auth?} ->
         if auth? do
-          send(pid, {:render_with_user, StreamerView, "update.json", item})
+          send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
         else
           send(pid, {:text, anon_render})
         end
@@ -320,6 +343,22 @@ defmodule Pleroma.Web.Streamer do
     end
   end
 
+  def close_streams_by_oauth_token(oauth_token) do
+    if should_env_send?() do
+      Registry.select(
+        @registry,
+        [
+          {
+            {:"$1", :"$2", :"$3"},
+            [{:==, :"$3", oauth_token.id}],
+            [:"$2"]
+          }
+        ]
+      )
+      |> Enum.each(fn pid -> send(pid, :close) end)
+    end
+  end
+
   # In test environement, only return true if the registry is started.
   # In benchmark environment, returns false.
   # In any other environment, always returns true.