SideEffects: Move streaming of chats to after the transaction.
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
index 1e9d6c2fca5c59cde018936cb008deac6c754422..1a1cc675cc3e5b0909c44e95157702c2666e4041 100644 (file)
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Rollback if we couldn't create it
   # - Set up notifications
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
-    with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
+    with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
       {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
 
       meta =
@@ -142,24 +142,24 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
       actor = User.get_cached_by_ap_id(object.data["actor"])
       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
 
-      [[actor, recipient], [recipient, actor]]
-      |> Enum.each(fn [user, other_user] ->
-        if user.local do
-          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
-          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
-
-          # We add a cache of the unread value here so that it
-          # doesn't change when being streamed out
-          chat =
-            chat
-            |> Map.put(:unread, MessageReference.unread_count_for_chat(chat))
-
-          Streamer.stream(
-            ["user", "user:pleroma_chat"],
-            {user, %{cm_ref | chat: chat, object: object}}
-          )
-        end
-      end)
+      streamables =
+        [[actor, recipient], [recipient, actor]]
+        |> Enum.map(fn [user, other_user] ->
+          if user.local do
+            {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
+            {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+
+            {
+              ["user", "user:pleroma_chat"],
+              {user, %{cm_ref | chat: chat, object: object}}
+            }
+          end
+        end)
+        |> Enum.filter(& &1)
+
+      meta =
+        meta
+        |> add_streamables(streamables)
 
       {:ok, object, meta}
     end
@@ -208,7 +208,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
 
   defp send_notifications(meta) do
-    Keyword.get(meta, :created_notifications, [])
+    Keyword.get(meta, :notifications, [])
     |> Enum.each(fn notification ->
       Streamer.stream(["user", "user:notification"], notification)
       Push.send(notification)
@@ -217,15 +217,32 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     meta
   end
 
+  defp send_streamables(meta) do
+    Keyword.get(meta, :streamables, [])
+    |> Enum.each(fn {topics, items} ->
+      Streamer.stream(topics, items)
+    end)
+
+    meta
+  end
+
+  defp add_streamables(meta, streamables) do
+    existing = Keyword.get(meta, :streamables, [])
+
+    meta
+    |> Keyword.put(:streamables, streamables ++ existing)
+  end
+
   defp add_notifications(meta, notifications) do
-    existing = Keyword.get(meta, :created_notifications, [])
+    existing = Keyword.get(meta, :notifications, [])
 
     meta
-    |> Keyword.put(:created_notifications, notifications ++ existing)
+    |> Keyword.put(:notifications, notifications ++ existing)
   end
 
   def handle_after_transaction(meta) do
     meta
     |> send_notifications()
+    |> send_streamables()
   end
 end