Merge branch 'develop' into activation-meta
[akkoma] / lib / pleroma / web / streamer / streamer.ex
index 2201cbfef87690eaab47557021a47c247ad7bede..d1d2c9b9c5f85bfa53bd48168a5796a8523c80b1 100644 (file)
@@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
-  alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
@@ -90,29 +90,15 @@ defmodule Pleroma.Web.Streamer do
     if should_env_send?(), do: Registry.unregister(@registry, topic)
   end
 
-  def stream(topics, item) when is_list(topics) do
+  def stream(topics, items) do
     if should_env_send?() do
-      Enum.each(topics, fn t ->
-        spawn(fn -> do_stream(t, item) end)
-      end)
-    end
-
-    :ok
-  end
-
-  def stream(topic, items) when is_list(items) do
-    if should_env_send?() do
-      Enum.each(items, fn i ->
-        spawn(fn -> do_stream(topic, i) end)
+      List.wrap(topics)
+      |> Enum.each(fn topic ->
+        List.wrap(items)
+        |> Enum.each(fn item ->
+          spawn(fn -> do_stream(topic, item) end)
+        end)
       end)
-
-      :ok
-    end
-  end
-
-  def stream(topic, item) do
-    if should_env_send?() do
-      spawn(fn -> do_stream(topic, item) end)
     end
 
     :ok
@@ -201,22 +187,15 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
-  defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
+  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
        when topic in ["user", "user:pleroma_chat"] do
-    recipients = [object.data["actor"] | object.data["to"]]
-
-    topics =
-      %{ap_id: recipients, local: true}
-      |> Pleroma.User.Query.build()
-      |> Repo.all()
-      |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
-
-    Enum.each(topics, fn {user, topic} ->
-      Registry.dispatch(@registry, topic, fn list ->
-        Enum.each(list, fn {pid, _auth} ->
-          text = StreamerView.render("chat_update.json", object, user, recipients)
-          send(pid, {:text, text})
-        end)
+    topic = "#{topic}:#{user.id}"
+
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
       end)
     end)
   end