Streamer: Add a chat message stream.
authorlain <lain@soykaf.club>
Fri, 29 May 2020 13:24:41 +0000 (15:24 +0200)
committerlain <lain@soykaf.club>
Fri, 29 May 2020 13:24:41 +0000 (15:24 +0200)
lib/pleroma/web/streamer/streamer.ex
lib/pleroma/web/views/streamer_view.ex
test/web/streamer/streamer_test.exs

index 49a400df7e3ec4d4aa3ab2d9848873156df39c20..331490a784f8754f5e721516bf311b28f0e07f76 100644 (file)
@@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
+  alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
@@ -22,7 +23,7 @@ defmodule Pleroma.Web.Streamer do
   def registry, do: @registry
 
   @public_streams ["public", "public:local", "public:media", "public:local:media"]
-  @user_streams ["user", "user:notification", "direct"]
+  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
 
   @doc "Expands and authorizes a stream, and registers the process for streaming."
   @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
@@ -200,6 +201,26 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
+  defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
+       when topic in ["user", "user:pleroma_chat"] do
+    recipients = [object.data["actor"] | object.data["to"]]
+
+    topics =
+      %{ap_id: recipients, local: true}
+      |> Pleroma.User.Query.build()
+      |> Repo.all()
+      |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
+
+    Enum.each(topics, fn {user, topic} ->
+      Registry.dispatch(@registry, topic, fn list ->
+        Enum.each(list, fn {pid, _auth} ->
+          text = StreamerView.render("chat_update.json", object, user, recipients)
+          send(pid, {:text, text})
+        end)
+      end)
+    end)
+  end
+
   defp do_stream("user", item) do
     Logger.debug("Trying to push to users")
 
index 237b29ded98ba4082ba6b49e22f7efb267400e42..949e2ed374076c2634553d6f63c083d867a15be0 100644 (file)
@@ -6,11 +6,30 @@ defmodule Pleroma.Web.StreamerView do
   use Pleroma.Web, :view
 
   alias Pleroma.Activity
+  alias Pleroma.Chat
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.User
   alias Pleroma.Web.MastodonAPI.NotificationView
 
+  def render("chat_update.json", object, user, recipients) do
+    chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
+
+    representation =
+      Pleroma.Web.PleromaAPI.ChatMessageView.render(
+        "show.json",
+        %{object: object, chat: chat}
+      )
+
+    %{
+      event: "pleroma:chat_update",
+      payload:
+        representation
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
   def render("update.json", %Activity{} = activity, %User{} = user) do
     %{
       event: "update",
index 115ba47032aa0c8e951796b410c7d895ecf789ff..ffbff35ca32046f6883b20a84b108c929353b27b 100644 (file)
@@ -9,9 +9,11 @@ defmodule Pleroma.Web.StreamerTest do
 
   alias Pleroma.Conversation.Participation
   alias Pleroma.List
+  alias Pleroma.Object
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
+  alias Pleroma.Web.StreamerView
 
   @moduletag needs_streamer: true, capture_log: true
 
@@ -126,6 +128,28 @@ defmodule Pleroma.Web.StreamerTest do
       refute Streamer.filtered_by_user?(user, notify)
     end
 
+    test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
+      other_user = insert(:user)
+
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+      object = Object.normalize(create_activity, false)
+      Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
+      Streamer.stream("user:pleroma_chat", object)
+      text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+      assert_receive {:text, ^text}
+    end
+
+    test "it sends chat messages to the 'user' stream", %{user: user} do
+      other_user = insert(:user)
+
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+      object = Object.normalize(create_activity, false)
+      Streamer.get_topic_and_add_socket("user", user)
+      Streamer.stream("user", object)
+      text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+      assert_receive {:text, ^text}
+    end
+
     test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do
       other_user = insert(:user)