ChatMessageReferences: Change seen -> unread
[akkoma] / lib / pleroma / web / streamer / streamer.ex
index 49a400df7e3ec4d4aa3ab2d9848873156df39c20..5e37e2cf2d80c2299cff77a75f2bcb1dd2e43b87 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
+  alias Pleroma.ChatMessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
@@ -22,7 +23,7 @@ defmodule Pleroma.Web.Streamer do
   def registry, do: @registry
 
   @public_streams ["public", "public:local", "public:media", "public:local:media"]
-  @user_streams ["user", "user:notification", "direct"]
+  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
 
   @doc "Expands and authorizes a stream, and registers the process for streaming."
   @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
@@ -136,7 +137,7 @@ defmodule Pleroma.Web.Streamer do
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
          true <- thread_containment(item, user),
-         false <- CommonAPI.thread_muted?(user, item) do
+         false <- CommonAPI.thread_muted?(user, parent) do
       false
     else
       _ -> true
@@ -200,6 +201,19 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
+  defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
+       when topic in ["user", "user:pleroma_chat"] do
+    topic = "#{topic}:#{user.id}"
+
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
+      end)
+    end)
+  end
+
   defp do_stream("user", item) do
     Logger.debug("Trying to push to users")