From 2cdaac433035d8df3890eae098b55380b9e1c9fc Mon Sep 17 00:00:00 2001 From: lain Date: Sun, 7 Jun 2020 14:52:56 +0200 Subject: [PATCH] SideEffects: Move streaming of chats to after the transaction. --- lib/pleroma/web/activity_pub/side_effects.ex | 61 ++++++++++++------- .../web/pleroma_api/views/chat_view.ex | 3 +- test/web/activity_pub/side_effects_test.exs | 41 ++++--------- test/web/common_api/common_api_test.exs | 1 + test/web/pleroma_api/views/chat_view_test.exs | 15 ----- 5 files changed, 53 insertions(+), 68 deletions(-) diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 1e9d6c2fc..1a1cc675c 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -37,7 +37,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do # - Rollback if we couldn't create it # - Set up notifications def handle(%{data: %{"type" => "Create"}} = activity, meta) do - with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do + with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do {:ok, notifications} = Notification.create_notifications(activity, do_send: false) meta = @@ -142,24 +142,24 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do actor = User.get_cached_by_ap_id(object.data["actor"]) recipient = User.get_cached_by_ap_id(hd(object.data["to"])) - [[actor, recipient], [recipient, actor]] - |> Enum.each(fn [user, other_user] -> - if user.local do - {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id) - {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id) - - # We add a cache of the unread value here so that it - # doesn't change when being streamed out - chat = - chat - |> Map.put(:unread, MessageReference.unread_count_for_chat(chat)) - - Streamer.stream( - ["user", "user:pleroma_chat"], - {user, %{cm_ref | chat: chat, object: object}} - ) - end - end) + streamables = + [[actor, recipient], [recipient, actor]] + |> Enum.map(fn [user, other_user] -> + if user.local do + {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id) + {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id) + + { + ["user", "user:pleroma_chat"], + {user, %{cm_ref | chat: chat, object: object}} + } + end + end) + |> Enum.filter(& &1) + + meta = + meta + |> add_streamables(streamables) {:ok, object, meta} end @@ -208,7 +208,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do def handle_undoing(object), do: {:error, ["don't know how to handle", object]} defp send_notifications(meta) do - Keyword.get(meta, :created_notifications, []) + Keyword.get(meta, :notifications, []) |> Enum.each(fn notification -> Streamer.stream(["user", "user:notification"], notification) Push.send(notification) @@ -217,15 +217,32 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do meta end + defp send_streamables(meta) do + Keyword.get(meta, :streamables, []) + |> Enum.each(fn {topics, items} -> + Streamer.stream(topics, items) + end) + + meta + end + + defp add_streamables(meta, streamables) do + existing = Keyword.get(meta, :streamables, []) + + meta + |> Keyword.put(:streamables, streamables ++ existing) + end + defp add_notifications(meta, notifications) do - existing = Keyword.get(meta, :created_notifications, []) + existing = Keyword.get(meta, :notifications, []) meta - |> Keyword.put(:created_notifications, notifications ++ existing) + |> Keyword.put(:notifications, notifications ++ existing) end def handle_after_transaction(meta) do meta |> send_notifications() + |> send_streamables() end end diff --git a/lib/pleroma/web/pleroma_api/views/chat_view.ex b/lib/pleroma/web/pleroma_api/views/chat_view.ex index d4c10977f..1c996da11 100644 --- a/lib/pleroma/web/pleroma_api/views/chat_view.ex +++ b/lib/pleroma/web/pleroma_api/views/chat_view.ex @@ -14,13 +14,12 @@ defmodule Pleroma.Web.PleromaAPI.ChatView do def render("show.json", %{chat: %Chat{} = chat} = opts) do recipient = User.get_cached_by_ap_id(chat.recipient) - last_message = opts[:last_message] || MessageReference.last_message_for_chat(chat) %{ id: chat.id |> to_string(), account: AccountView.render("show.json", Map.put(opts, :user, recipient)), - unread: Map.get(chat, :unread) || MessageReference.unread_count_for_chat(chat), + unread: MessageReference.unread_count_for_chat(chat), last_message: last_message && MessageReferenceView.render("show.json", chat_message_reference: last_message), diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs index b1afa6a2e..6bbbaae87 100644 --- a/test/web/activity_pub/side_effects_test.exs +++ b/test/web/activity_pub/side_effects_test.exs @@ -23,7 +23,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do import Mock describe "handle_after_transaction" do - test "it streams out notifications" do + test "it streams out notifications and streams" do author = insert(:user, local: true) recipient = insert(:user, local: true) @@ -37,7 +37,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do {:ok, _create_activity, meta} = SideEffects.handle(create_activity, local: false, object_data: chat_message_data) - assert [notification] = meta[:created_notifications] + assert [notification] = meta[:notifications] with_mocks([ { @@ -58,6 +58,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do SideEffects.handle_after_transaction(meta) assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_)) assert called(Pleroma.Web.Push.send(notification)) end end @@ -362,33 +363,10 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false) - with_mock Pleroma.Web.Streamer, [], - stream: fn _, payload -> - case payload do - {^author, cm_ref} -> - assert cm_ref.unread == false - - {^recipient, cm_ref} -> - assert cm_ref.unread == true - - view = - Pleroma.Web.PleromaAPI.ChatView.render("show.json", - last_message: cm_ref, - chat: cm_ref.chat - ) - - assert view.unread == 1 - - _ -> - nil - end - end do - {:ok, _create_activity, _meta} = - SideEffects.handle(create_activity, local: false, object_data: chat_message_data) + {:ok, _create_activity, meta} = + SideEffects.handle(create_activity, local: false, object_data: chat_message_data) - assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_})) - assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_})) - end + assert [_, _] = meta[:streamables] end test "it creates a Chat and MessageReferences for the local users and bumps the unread count, except for the author" do @@ -422,13 +400,18 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do SideEffects.handle(create_activity, local: false, object_data: chat_message_data) # The notification gets created - assert [notification] = meta[:created_notifications] + assert [notification] = meta[:notifications] assert notification.activity_id == create_activity.id # But it is not sent out refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) refute called(Pleroma.Web.Push.send(notification)) + # Same for the user chat stream + assert [{topics, _}, _] = meta[:streamables] + assert topics == ["user", "user:pleroma_chat"] + refute called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_)) + chat = Chat.get(author.id, recipient.ap_id) [cm_ref] = MessageReference.for_chat_query(chat) |> Repo.all() diff --git a/test/web/common_api/common_api_test.exs b/test/web/common_api/common_api_test.exs index 63b59820e..6bd26050e 100644 --- a/test/web/common_api/common_api_test.exs +++ b/test/web/common_api/common_api_test.exs @@ -72,6 +72,7 @@ defmodule Pleroma.Web.CommonAPITest do assert called(Pleroma.Web.Push.send(notification)) assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_)) assert activity end diff --git a/test/web/pleroma_api/views/chat_view_test.exs b/test/web/pleroma_api/views/chat_view_test.exs index f7af5d4e0..14eecb1bd 100644 --- a/test/web/pleroma_api/views/chat_view_test.exs +++ b/test/web/pleroma_api/views/chat_view_test.exs @@ -16,21 +16,6 @@ defmodule Pleroma.Web.PleromaAPI.ChatViewTest do import Pleroma.Factory - test "giving a chat with an 'unread' field, it uses that" do - user = insert(:user) - recipient = insert(:user) - - {:ok, chat} = Chat.get_or_create(user.id, recipient.ap_id) - - chat = - chat - |> Map.put(:unread, 5) - - represented_chat = ChatView.render("show.json", chat: chat) - - assert represented_chat[:unread] == 5 - end - test "it represents a chat" do user = insert(:user) recipient = insert(:user) -- 2.45.2