From 115d08a7542b92c5e1d889da41c0ee6837a1235e Mon Sep 17 00:00:00 2001
From: lain <lain@soykaf.club>
Date: Fri, 5 Jun 2020 16:47:02 +0200
Subject: [PATCH] Pipeline: Add a side effects step after the transaction
 finishes

This is to run things like streaming notifications out, which will
sometimes need data that is created by the transaction, but is
streamed out asynchronously.
---
 lib/pleroma/notification.ex                  | 26 ++++--
 lib/pleroma/web/activity_pub/pipeline.ex     |  4 +
 lib/pleroma/web/activity_pub/side_effects.ex | 30 ++++++-
 test/web/activity_pub/pipeline_test.exs      |  9 +-
 test/web/activity_pub/side_effects_test.exs  | 86 +++++++++++++++++---
 test/web/common_api/common_api_test.exs      | 45 ++++++++--
 6 files changed, 170 insertions(+), 30 deletions(-)

diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index e5b880b10..49e27c05a 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -334,30 +334,34 @@ defmodule Pleroma.Notification do
     end
   end
 
-  def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
+  def create_notifications(activity, options \\ [])
+
+  def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
     object = Object.normalize(activity, false)
 
     if object && object.data["type"] == "Answer" do
       {:ok, []}
     else
-      do_create_notifications(activity)
+      do_create_notifications(activity, options)
     end
   end
 
-  def create_notifications(%Activity{data: %{"type" => type}} = activity)
+  def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
       when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
-    do_create_notifications(activity)
+    do_create_notifications(activity, options)
   end
 
-  def create_notifications(_), do: {:ok, []}
+  def create_notifications(_, _), do: {:ok, []}
+
+  defp do_create_notifications(%Activity{} = activity, options) do
+    do_send = Keyword.get(options, :do_send, true)
 
-  defp do_create_notifications(%Activity{} = activity) do
     {enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
     potential_receivers = enabled_receivers ++ disabled_receivers
 
     notifications =
       Enum.map(potential_receivers, fn user ->
-        do_send = user in enabled_receivers
+        do_send = do_send && user in enabled_receivers
         create_notification(activity, user, do_send)
       end)
 
@@ -623,4 +627,12 @@ defmodule Pleroma.Notification do
   end
 
   def skip?(_, _, _), do: false
+
+  def for_user_and_activity(user, activity) do
+    from(n in __MODULE__,
+      where: n.user_id == ^user.id,
+      where: n.activity_id == ^activity.id
+    )
+    |> Repo.one()
+  end
 end
diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex
index 0c54c4b23..6875c47f6 100644
--- a/lib/pleroma/web/activity_pub/pipeline.ex
+++ b/lib/pleroma/web/activity_pub/pipeline.ex
@@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
           {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
   def common_pipeline(object, meta) do
     case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
+      {:ok, {:ok, activity, meta}} ->
+        SideEffects.handle_after_transaction(meta)
+        {:ok, activity, meta}
+
       {:ok, value} ->
         value
 
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index b3aacff40..10136789a 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   alias Pleroma.Web.ActivityPub.Pipeline
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Streamer
+  alias Pleroma.Web.Push
 
   def handle(object, meta \\ [])
 
@@ -37,7 +38,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Set up notifications
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
     with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
-      Notification.create_notifications(activity)
+      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+
+      meta =
+        meta
+        |> add_notifications(notifications)
+
       {:ok, activity, meta}
     else
       e -> Repo.rollback(e)
@@ -200,4 +206,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   end
 
   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+  defp send_notifications(meta) do
+    Keyword.get(meta, :created_notifications, [])
+    |> Enum.each(fn notification ->
+      Streamer.stream(["user", "user:notification"], notification)
+      Push.send(notification)
+    end)
+
+    meta
+  end
+
+  defp add_notifications(meta, notifications) do
+    existing = Keyword.get(meta, :created_notifications, [])
+
+    meta
+    |> Keyword.put(:created_notifications, notifications ++ existing)
+  end
+
+  def handle_after_transaction(meta) do
+    meta
+    |> send_notifications()
+  end
 end
diff --git a/test/web/activity_pub/pipeline_test.exs b/test/web/activity_pub/pipeline_test.exs
index 26557720b..8deb64501 100644
--- a/test/web/activity_pub/pipeline_test.exs
+++ b/test/web/activity_pub/pipeline_test.exs
@@ -33,7 +33,10 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [
+            handle: fn o, m -> {:ok, o, m} end,
+            handle_after_transaction: fn m -> m end
+          ]
         },
         {
           Pleroma.Web.Federator,
@@ -71,7 +74,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
         },
         {
           Pleroma.Web.Federator,
@@ -110,7 +113,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
         },
         {
           Pleroma.Web.Federator,
diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs
index 40df664eb..43ffe1337 100644
--- a/test/web/activity_pub/side_effects_test.exs
+++ b/test/web/activity_pub/side_effects_test.exs
@@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
   import Pleroma.Factory
   import Mock
 
+  describe "handle_after_transaction" do
+    test "it streams out notifications" do
+      author = insert(:user, local: true)
+      recipient = insert(:user, local: true)
+
+      {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey")
+
+      {:ok, create_activity_data, _meta} =
+        Builder.create(author, chat_message_data["id"], [recipient.ap_id])
+
+      {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
+
+      {:ok, _create_activity, meta} =
+        SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+
+      assert [notification] = meta[:created_notifications]
+
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ -> nil end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        SideEffects.handle_after_transaction(meta)
+
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        assert called(Pleroma.Web.Push.send(notification))
+      end
+    end
+  end
+
   describe "delete objects" do
     setup do
       user = insert(:user)
@@ -361,22 +402,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
 
       {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
 
-      {:ok, _create_activity, _meta} =
-        SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ -> nil end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        {:ok, _create_activity, meta} =
+          SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
-      chat = Chat.get(author.id, recipient.ap_id)
+        # The notification gets created
+        assert [notification] = meta[:created_notifications]
+        assert notification.activity_id == create_activity.id
 
-      [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+        # But it is not sent out
+        refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        refute called(Pleroma.Web.Push.send(notification))
 
-      assert cm_ref.object.data["content"] == "hey"
-      assert cm_ref.unread == false
+        chat = Chat.get(author.id, recipient.ap_id)
 
-      chat = Chat.get(recipient.id, author.ap_id)
+        [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
 
-      [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+        assert cm_ref.object.data["content"] == "hey"
+        assert cm_ref.unread == false
 
-      assert cm_ref.object.data["content"] == "hey"
-      assert cm_ref.unread == true
+        chat = Chat.get(recipient.id, author.ap_id)
+
+        [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+
+        assert cm_ref.object.data["content"] == "hey"
+        assert cm_ref.unread == true
+      end
     end
 
     test "it creates a Chat for the local users and bumps the unread count" do
diff --git a/test/web/common_api/common_api_test.exs b/test/web/common_api/common_api_test.exs
index 611a9ae66..63b59820e 100644
--- a/test/web/common_api/common_api_test.exs
+++ b/test/web/common_api/common_api_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do
   alias Pleroma.Activity
   alias Pleroma.Chat
   alias Pleroma.Conversation.Participation
+  alias Pleroma.Notification
   alias Pleroma.Object
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
@@ -39,15 +40,41 @@ defmodule Pleroma.Web.CommonAPITest do
 
       {:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
 
-      {:ok, activity} =
-        CommonAPI.post_chat_message(
-          author,
-          recipient,
-          nil,
-          media_id: upload.id
-        )
-
-      assert activity
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ ->
+              nil
+            end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        {:ok, activity} =
+          CommonAPI.post_chat_message(
+            author,
+            recipient,
+            nil,
+            media_id: upload.id
+          )
+
+        notification =
+          Notification.for_user_and_activity(recipient, activity)
+          |> Repo.preload(:activity)
+
+        assert called(Pleroma.Web.Push.send(notification))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+
+        assert activity
+      end
     end
 
     test "it adds html newlines" do
-- 
2.49.0