Pipeline: Add a side effects step after the transaction finishes
authorlain <lain@soykaf.club>
Fri, 5 Jun 2020 14:47:02 +0000 (16:47 +0200)
committerlain <lain@soykaf.club>
Fri, 5 Jun 2020 14:47:02 +0000 (16:47 +0200)
This is to run things like streaming notifications out, which will
sometimes need data that is created by the transaction, but is
streamed out asynchronously.

lib/pleroma/notification.ex
lib/pleroma/web/activity_pub/pipeline.ex
lib/pleroma/web/activity_pub/side_effects.ex
test/web/activity_pub/pipeline_test.exs
test/web/activity_pub/side_effects_test.exs
test/web/common_api/common_api_test.exs

index e5b880b105f8ba40cbfd298fd5c0a166a7612e95..49e27c05a4a255034c4a271e03230ed760c202c0 100644 (file)
@@ -334,30 +334,34 @@ defmodule Pleroma.Notification do
     end
   end
 
-  def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
+  def create_notifications(activity, options \\ [])
+
+  def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
     object = Object.normalize(activity, false)
 
     if object && object.data["type"] == "Answer" do
       {:ok, []}
     else
-      do_create_notifications(activity)
+      do_create_notifications(activity, options)
     end
   end
 
-  def create_notifications(%Activity{data: %{"type" => type}} = activity)
+  def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
       when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
-    do_create_notifications(activity)
+    do_create_notifications(activity, options)
   end
 
-  def create_notifications(_), do: {:ok, []}
+  def create_notifications(_, _), do: {:ok, []}
+
+  defp do_create_notifications(%Activity{} = activity, options) do
+    do_send = Keyword.get(options, :do_send, true)
 
-  defp do_create_notifications(%Activity{} = activity) do
     {enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
     potential_receivers = enabled_receivers ++ disabled_receivers
 
     notifications =
       Enum.map(potential_receivers, fn user ->
-        do_send = user in enabled_receivers
+        do_send = do_send && user in enabled_receivers
         create_notification(activity, user, do_send)
       end)
 
@@ -623,4 +627,12 @@ defmodule Pleroma.Notification do
   end
 
   def skip?(_, _, _), do: false
+
+  def for_user_and_activity(user, activity) do
+    from(n in __MODULE__,
+      where: n.user_id == ^user.id,
+      where: n.activity_id == ^activity.id
+    )
+    |> Repo.one()
+  end
 end
index 0c54c4b234d75c15e24c3bb59ffe620dd9747634..6875c47f67e230e412b1f1f050bbc1e170326256 100644 (file)
@@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
           {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
   def common_pipeline(object, meta) do
     case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
+      {:ok, {:ok, activity, meta}} ->
+        SideEffects.handle_after_transaction(meta)
+        {:ok, activity, meta}
+
       {:ok, value} ->
         value
 
index b3aacff4065235682ae8497b8886f72085a8d93d..10136789a5aea37475baa4fc40571d30444dcfe3 100644 (file)
@@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   alias Pleroma.Web.ActivityPub.Pipeline
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Streamer
+  alias Pleroma.Web.Push
 
   def handle(object, meta \\ [])
 
@@ -37,7 +38,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Set up notifications
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
     with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
-      Notification.create_notifications(activity)
+      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+
+      meta =
+        meta
+        |> add_notifications(notifications)
+
       {:ok, activity, meta}
     else
       e -> Repo.rollback(e)
@@ -200,4 +206,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   end
 
   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+  defp send_notifications(meta) do
+    Keyword.get(meta, :created_notifications, [])
+    |> Enum.each(fn notification ->
+      Streamer.stream(["user", "user:notification"], notification)
+      Push.send(notification)
+    end)
+
+    meta
+  end
+
+  defp add_notifications(meta, notifications) do
+    existing = Keyword.get(meta, :created_notifications, [])
+
+    meta
+    |> Keyword.put(:created_notifications, notifications ++ existing)
+  end
+
+  def handle_after_transaction(meta) do
+    meta
+    |> send_notifications()
+  end
 end
index 26557720b5606c57e65bb81d8734d614761fa8cb..8deb64501380858e083e6757bd5d03a2e6391469 100644 (file)
@@ -33,7 +33,10 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [
+            handle: fn o, m -> {:ok, o, m} end,
+            handle_after_transaction: fn m -> m end
+          ]
         },
         {
           Pleroma.Web.Federator,
@@ -71,7 +74,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
         },
         {
           Pleroma.Web.Federator,
@@ -110,7 +113,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
         {
           Pleroma.Web.ActivityPub.SideEffects,
           [],
-          [handle: fn o, m -> {:ok, o, m} end]
+          [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
         },
         {
           Pleroma.Web.Federator,
index 40df664eb5f0c85424652a65ffb478aaaf55879f..43ffe13375a98f69cdbbcfd679d5b9fd963fbc2f 100644 (file)
@@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
   import Pleroma.Factory
   import Mock
 
+  describe "handle_after_transaction" do
+    test "it streams out notifications" do
+      author = insert(:user, local: true)
+      recipient = insert(:user, local: true)
+
+      {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey")
+
+      {:ok, create_activity_data, _meta} =
+        Builder.create(author, chat_message_data["id"], [recipient.ap_id])
+
+      {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
+
+      {:ok, _create_activity, meta} =
+        SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+
+      assert [notification] = meta[:created_notifications]
+
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ -> nil end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        SideEffects.handle_after_transaction(meta)
+
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        assert called(Pleroma.Web.Push.send(notification))
+      end
+    end
+  end
+
   describe "delete objects" do
     setup do
       user = insert(:user)
@@ -361,22 +402,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
 
       {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
 
-      {:ok, _create_activity, _meta} =
-        SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ -> nil end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        {:ok, _create_activity, meta} =
+          SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
 
-      chat = Chat.get(author.id, recipient.ap_id)
+        # The notification gets created
+        assert [notification] = meta[:created_notifications]
+        assert notification.activity_id == create_activity.id
 
-      [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+        # But it is not sent out
+        refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+        refute called(Pleroma.Web.Push.send(notification))
 
-      assert cm_ref.object.data["content"] == "hey"
-      assert cm_ref.unread == false
+        chat = Chat.get(author.id, recipient.ap_id)
 
-      chat = Chat.get(recipient.id, author.ap_id)
+        [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
 
-      [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+        assert cm_ref.object.data["content"] == "hey"
+        assert cm_ref.unread == false
 
-      assert cm_ref.object.data["content"] == "hey"
-      assert cm_ref.unread == true
+        chat = Chat.get(recipient.id, author.ap_id)
+
+        [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+
+        assert cm_ref.object.data["content"] == "hey"
+        assert cm_ref.unread == true
+      end
     end
 
     test "it creates a Chat for the local users and bumps the unread count" do
index 611a9ae66cf5afb6fe7c5cc46380718db7625666..63b59820e979df4dfe2d0aee84122bf1924f7857 100644 (file)
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do
   alias Pleroma.Activity
   alias Pleroma.Chat
   alias Pleroma.Conversation.Participation
+  alias Pleroma.Notification
   alias Pleroma.Object
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
@@ -39,15 +40,41 @@ defmodule Pleroma.Web.CommonAPITest do
 
       {:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
 
-      {:ok, activity} =
-        CommonAPI.post_chat_message(
-          author,
-          recipient,
-          nil,
-          media_id: upload.id
-        )
-
-      assert activity
+      with_mocks([
+        {
+          Pleroma.Web.Streamer,
+          [],
+          [
+            stream: fn _, _ ->
+              nil
+            end
+          ]
+        },
+        {
+          Pleroma.Web.Push,
+          [],
+          [
+            send: fn _ -> nil end
+          ]
+        }
+      ]) do
+        {:ok, activity} =
+          CommonAPI.post_chat_message(
+            author,
+            recipient,
+            nil,
+            media_id: upload.id
+          )
+
+        notification =
+          Notification.for_user_and_activity(recipient, activity)
+          |> Repo.preload(:activity)
+
+        assert called(Pleroma.Web.Push.send(notification))
+        assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+
+        assert activity
+      end
     end
 
     test "it adds html newlines" do