SideEffects: Move streaming of chats to after the transaction.
authorlain <lain@soykaf.club>
Sun, 7 Jun 2020 12:52:56 +0000 (14:52 +0200)
committerlain <lain@soykaf.club>
Sun, 7 Jun 2020 12:52:56 +0000 (14:52 +0200)
lib/pleroma/web/activity_pub/side_effects.ex
lib/pleroma/web/pleroma_api/views/chat_view.ex
test/web/activity_pub/side_effects_test.exs
test/web/common_api/common_api_test.exs
test/web/pleroma_api/views/chat_view_test.exs

index 1e9d6c2fca5c59cde018936cb008deac6c754422..1a1cc675cc3e5b0909c44e95157702c2666e4041 100644 (file)
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Rollback if we couldn't create it
   # - Set up notifications
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
-    with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
+    with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
       {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
 
       meta =
@@ -142,24 +142,24 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
       actor = User.get_cached_by_ap_id(object.data["actor"])
       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
 
-      [[actor, recipient], [recipient, actor]]
-      |> Enum.each(fn [user, other_user] ->
-        if user.local do
-          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
-          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
-
-          # We add a cache of the unread value here so that it
-          # doesn't change when being streamed out
-          chat =
-            chat
-            |> Map.put(:unread, MessageReference.unread_count_for_chat(chat))
-
-          Streamer.stream(
-            ["user", "user:pleroma_chat"],
-            {user, %{cm_ref | chat: chat, object: object}}
-          )
-        end
-      end)
+      streamables =
+        [[actor, recipient], [recipient, actor]]
+        |> Enum.map(fn [user, other_user] ->
+          if user.local do
+            {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
+            {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+
+            {
+              ["user", "user:pleroma_chat"],
+              {user, %{cm_ref | chat: chat, object: object}}
+            }
+          end
+        end)
+        |> Enum.filter(& &1)
+
+      meta =
+        meta
+        |> add_streamables(streamables)
 
       {:ok, object, meta}
     end
@@ -208,7 +208,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
 
   defp send_notifications(meta) do
-    Keyword.get(meta, :created_notifications, [])
+    Keyword.get(meta, :notifications, [])
     |> Enum.each(fn notification ->
       Streamer.stream(["user", "user:notification"], notification)
       Push.send(notification)
@@ -217,15 +217,32 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     meta
   end
 
+  defp send_streamables(meta) do
+    Keyword.get(meta, :streamables, [])
+    |> Enum.each(fn {topics, items} ->
+      Streamer.stream(topics, items)
+    end)
+
+    meta
+  end
+
+  defp add_streamables(meta, streamables) do
+    existing = Keyword.get(meta, :streamables, [])
+
+    meta
+    |> Keyword.put(:streamables, streamables ++ existing)
+  end
+
   defp add_notifications(meta, notifications) do
-    existing = Keyword.get(meta, :created_notifications, [])
+    existing = Keyword.get(meta, :notifications, [])
 
     meta
-    |> Keyword.put(:created_notifications, notifications ++ existing)
+    |> Keyword.put(:notifications, notifications ++ existing)
   end
 
   def handle_after_transaction(meta) do
     meta
     |> send_notifications()
+    |> send_streamables()
   end
 end
index d4c10977f31d3a608ddda95c668fa74065a9d311..1c996da119474501a145a75e766f8cf7044a0ecf 100644 (file)
@@ -14,13 +14,12 @@ defmodule Pleroma.Web.PleromaAPI.ChatView do
 
   def render("show.json", %{chat: %Chat{} = chat} = opts) do
     recipient = User.get_cached_by_ap_id(chat.recipient)
-
     last_message = opts[:last_message] || MessageReference.last_message_for_chat(chat)
 
     %{
       id: chat.id |> to_string(),
       account: AccountView.render("show.json", Map.put(opts, :user, recipient)),
-      unread: Map.get(chat, :unread) || MessageReference.unread_count_for_chat(chat),
+      unread: MessageReference.unread_count_for_chat(chat),
       last_message:
         last_message &&
           MessageReferenceView.render("show.json", chat_message_reference: last_message),
index b1afa6a2e976da771436ee7e933acc4e51f9f407..6bbbaae87c620aa0e9bb66e7e858087cb9f492a4 100644 (file)
@@ -23,7 +23,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
   import Mock
 
   describe "handle_after_transaction" do
-    test "it streams out notifications" do
+    test "it streams out notifications and streams" do
       author = insert(:user, local: true)
       recipient = insert(:user, local: true)
 
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
       {:ok, _create_activity, meta} =
         SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
-      assert [notification] = meta[:created_notifications]
+      assert [notification] = meta[:notifications]
 
       with_mocks([
         {
@@ -58,6 +58,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
         SideEffects.handle_after_transaction(meta)
 
         assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
         assert called(Pleroma.Web.Push.send(notification))
       end
     end
@@ -362,33 +363,10 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
 
       {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
 
-      with_mock Pleroma.Web.Streamer, [],
-        stream: fn _, payload ->
-          case payload do
-            {^author, cm_ref} ->
-              assert cm_ref.unread == false
-
-            {^recipient, cm_ref} ->
-              assert cm_ref.unread == true
-
-              view =
-                Pleroma.Web.PleromaAPI.ChatView.render("show.json",
-                  last_message: cm_ref,
-                  chat: cm_ref.chat
-                )
-
-              assert view.unread == 1
-
-            _ ->
-              nil
-          end
-        end do
-        {:ok, _create_activity, _meta} =
-          SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+      {:ok, _create_activity, meta} =
+        SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
-        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_}))
-        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_}))
-      end
+      assert [_, _] = meta[:streamables]
     end
 
     test "it creates a Chat and MessageReferences for the local users and bumps the unread count, except for the author" do
@@ -422,13 +400,18 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
           SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
         # The notification gets created
-        assert [notification] = meta[:created_notifications]
+        assert [notification] = meta[:notifications]
         assert notification.activity_id == create_activity.id
 
         # But it is not sent out
         refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
         refute called(Pleroma.Web.Push.send(notification))
 
+        # Same for the user chat stream
+        assert [{topics, _}, _] = meta[:streamables]
+        assert topics == ["user", "user:pleroma_chat"]
+        refute called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
+
         chat = Chat.get(author.id, recipient.ap_id)
 
         [cm_ref] = MessageReference.for_chat_query(chat) |> Repo.all()
index 63b59820e979df4dfe2d0aee84122bf1924f7857..6bd26050ef003b04cf763f1f6e599de6b900942e 100644 (file)
@@ -72,6 +72,7 @@ defmodule Pleroma.Web.CommonAPITest do
 
         assert called(Pleroma.Web.Push.send(notification))
         assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
 
         assert activity
       end
index f7af5d4e024bafddf3da94fcc96857e5e134ae0d..14eecb1bdcdb626a1372b650862a80020e2c088a 100644 (file)
@@ -16,21 +16,6 @@ defmodule Pleroma.Web.PleromaAPI.ChatViewTest do
 
   import Pleroma.Factory
 
-  test "giving a chat with an 'unread' field, it uses that" do
-    user = insert(:user)
-    recipient = insert(:user)
-
-    {:ok, chat} = Chat.get_or_create(user.id, recipient.ap_id)
-
-    chat =
-      chat
-      |> Map.put(:unread, 5)
-
-    represented_chat = ChatView.render("show.json", chat: chat)
-
-    assert represented_chat[:unread] == 5
-  end
-
   test "it represents a chat" do
     user = insert(:user)
     recipient = insert(:user)