Streamer, SideEffects: Stream out ChatMessageReferences
authorlain <lain@soykaf.club>
Wed, 3 Jun 2020 14:45:04 +0000 (16:45 +0200)
committerlain <lain@soykaf.club>
Wed, 3 Jun 2020 14:45:04 +0000 (16:45 +0200)
Saves us a few calles to fetch things from the DB that we already
have.

lib/pleroma/web/activity_pub/side_effects.ex
lib/pleroma/web/streamer/streamer.ex
lib/pleroma/web/views/streamer_view.ex
test/web/activity_pub/side_effects_test.exs
test/web/streamer/streamer_test.exs

index 884d399d043e542545dfbd930d536c78f04b5638..0c5709356d595e3ef33744de86127f6b07ad418c 100644 (file)
@@ -140,11 +140,15 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
       |> Enum.each(fn [user, other_user] ->
         if user.local do
           {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
-          ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
+          {:ok, cm_ref} = ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
+
+          Streamer.stream(
+            ["user", "user:pleroma_chat"],
+            {user, %{cm_ref | chat: chat, object: object}}
+          )
         end
       end)
 
-      Streamer.stream(["user", "user:pleroma_chat"], object)
       {:ok, object, meta}
     end
   end
index 2201cbfef87690eaab47557021a47c247ad7bede..5e37e2cf2d80c2299cff77a75f2bcb1dd2e43b87 100644 (file)
@@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
+  alias Pleroma.ChatMessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
-  alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
@@ -201,22 +201,15 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
-  defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
+  defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
        when topic in ["user", "user:pleroma_chat"] do
-    recipients = [object.data["actor"] | object.data["to"]]
-
-    topics =
-      %{ap_id: recipients, local: true}
-      |> Pleroma.User.Query.build()
-      |> Repo.all()
-      |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
-
-    Enum.each(topics, fn {user, topic} ->
-      Registry.dispatch(@registry, topic, fn list ->
-        Enum.each(list, fn {pid, _auth} ->
-          text = StreamerView.render("chat_update.json", object, user, recipients)
-          send(pid, {:text, text})
-        end)
+    topic = "#{topic}:#{user.id}"
+
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
       end)
     end)
   end
index 616e0c4f24f8c5e1743cfa877f70361eacc105ee..a6efd01090d9fced2d3b0e8dd49fc358ef12bbf5 100644 (file)
@@ -6,36 +6,11 @@ defmodule Pleroma.Web.StreamerView do
   use Pleroma.Web, :view
 
   alias Pleroma.Activity
-  alias Pleroma.Chat
-  alias Pleroma.ChatMessageReference
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.User
   alias Pleroma.Web.MastodonAPI.NotificationView
 
-  def render("chat_update.json", object, user, recipients) do
-    chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
-
-    # Explicitly giving the cmr for the object here, so we don't accidentally
-    # send a later 'last_message' that was inserted between inserting this and
-    # streaming it out
-    cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
-
-    representation =
-      Pleroma.Web.PleromaAPI.ChatView.render(
-        "show.json",
-        %{last_message: cm_ref, chat: chat}
-      )
-
-    %{
-      event: "pleroma:chat_update",
-      payload:
-        representation
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
   def render("update.json", %Activity{} = activity, %User{} = user) do
     %{
       event: "update",
@@ -76,6 +51,27 @@ defmodule Pleroma.Web.StreamerView do
     |> Jason.encode!()
   end
 
+  def render("chat_update.json", %{chat_message_reference: cm_ref}) do
+    # Explicitly giving the cmr for the object here, so we don't accidentally
+    # send a later 'last_message' that was inserted between inserting this and
+    # streaming it out
+    Logger.debug("Trying to stream out #{inspect(cm_ref)}")
+
+    representation =
+      Pleroma.Web.PleromaAPI.ChatView.render(
+        "show.json",
+        %{last_message: cm_ref, chat: cm_ref.chat}
+      )
+
+    %{
+      event: "pleroma:chat_update",
+      payload:
+        representation
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
   def render("conversation.json", %Participation{} = participation) do
     %{
       event: "conversation",
index f2fa062b4f30bc07bca5f644c5888cadc8c4573d..92c266d846f1d1ff0015cf098dcf9cfb7cd1d0b9 100644 (file)
@@ -325,9 +325,8 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
         {:ok, _create_activity, _meta} =
           SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
-        object = Object.normalize(create_activity, false)
-
-        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], object))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_}))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_}))
       end
     end
 
index bcb05a02d11fa4b8740171a668ca0caca18f9a53..893ae5449e9149ed932702c517670930d4e9a398 100644 (file)
@@ -7,6 +7,8 @@ defmodule Pleroma.Web.StreamerTest do
 
   import Pleroma.Factory
 
+  alias Pleroma.Chat
+  alias Pleroma.ChatMessageReference
   alias Pleroma.Conversation.Participation
   alias Pleroma.List
   alias Pleroma.Object
@@ -150,22 +152,36 @@ defmodule Pleroma.Web.StreamerTest do
     test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
       other_user = insert(:user)
 
-      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
       object = Object.normalize(create_activity, false)
+      chat = Chat.get(user.id, other_user.ap_id)
+      cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
+      cm_ref = %{cm_ref | chat: chat, object: object}
+
       Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
-      Streamer.stream("user:pleroma_chat", object)
-      text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+      Streamer.stream("user:pleroma_chat", {user, cm_ref})
+
+      text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+      assert text =~ "hey cirno"
       assert_receive {:text, ^text}
     end
 
     test "it sends chat messages to the 'user' stream", %{user: user} do
       other_user = insert(:user)
 
-      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
       object = Object.normalize(create_activity, false)
+      chat = Chat.get(user.id, other_user.ap_id)
+      cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
+      cm_ref = %{cm_ref | chat: chat, object: object}
+
       Streamer.get_topic_and_add_socket("user", user)
-      Streamer.stream("user", object)
-      text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+      Streamer.stream("user", {user, cm_ref})
+
+      text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+      assert text =~ "hey cirno"
       assert_receive {:text, ^text}
     end