Broadcast conversation update when DM is deleted
authorEugenij <eugenijm@protonmail.com>
Mon, 24 Jun 2019 07:14:04 +0000 (07:14 +0000)
committerkaniini <nenolod@gmail.com>
Mon, 24 Jun 2019 07:14:04 +0000 (07:14 +0000)
lib/pleroma/web/activity_pub/activity_pub.ex
test/web/streamer_test.exs

index c0e3d1478794622ccd9d633bc04807fac9faed02..55315d66ed454e320615f153e8f884d132a00502 100644 (file)
@@ -189,6 +189,22 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end)
   end
 
+  def stream_out_participations(%Object{data: %{"context" => context}}, user) do
+    with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
+         conversation = Repo.preload(conversation, :participations),
+         last_activity_id =
+           fetch_latest_activity_id_for_context(conversation.ap_id, %{
+             "user" => user,
+             "blocking_user" => user
+           }) do
+      if last_activity_id do
+        stream_out_participations(conversation.participations)
+      end
+    end
+  end
+
+  def stream_out_participations(_, _), do: :noop
+
   def stream_out(activity) do
     public = "https://www.w3.org/ns/activitystreams#Public"
 
@@ -401,7 +417,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
            "to" => to,
            "deleted_activity_id" => activity && activity.id
          },
-         {:ok, activity} <- insert(data, local),
+         {:ok, activity} <- insert(data, local, false),
+         stream_out_participations(object, user),
          _ <- decrease_replies_count_if_reply(object),
          # Changing note count prior to enqueuing federation task in order to avoid
          # race conditions on updating user.info
index 648e2871278bd6c757ccf7f4d809eba17f970e24..4633d7765cd583c5e597b56f170126dd7ad0176c 100644 (file)
@@ -356,4 +356,110 @@ defmodule Pleroma.Web.StreamerTest do
 
     Task.await(task)
   end
+
+  describe "direct streams" do
+    setup do
+      GenServer.start(Streamer, %{}, name: Streamer)
+
+      on_exit(fn ->
+        if pid = Process.whereis(Streamer) do
+          Process.exit(pid, :kill)
+        end
+      end)
+
+      :ok
+    end
+
+    test "it sends conversation update to the 'direct' stream", %{} do
+      user = insert(:user)
+      another_user = insert(:user)
+
+      task =
+        Task.async(fn ->
+          assert_receive {:text, _received_event}, 4_000
+        end)
+
+      Streamer.add_socket(
+        "direct",
+        %{transport_pid: task.pid, assigns: %{user: user}}
+      )
+
+      {:ok, _create_activity} =
+        CommonAPI.post(another_user, %{
+          "status" => "hey @#{user.nickname}",
+          "visibility" => "direct"
+        })
+
+      Task.await(task)
+    end
+
+    test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
+      user = insert(:user)
+      another_user = insert(:user)
+
+      {:ok, create_activity} =
+        CommonAPI.post(another_user, %{
+          "status" => "hi @#{user.nickname}",
+          "visibility" => "direct"
+        })
+
+      task =
+        Task.async(fn ->
+          assert_receive {:text, received_event}, 4_000
+          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+
+          refute_receive {:text, _}, 4_000
+        end)
+
+      Streamer.add_socket(
+        "direct",
+        %{transport_pid: task.pid, assigns: %{user: user}}
+      )
+
+      {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
+
+      Task.await(task)
+    end
+
+    test "it sends conversation update to the 'direct' stream when a message is deleted" do
+      user = insert(:user)
+      another_user = insert(:user)
+
+      {:ok, create_activity} =
+        CommonAPI.post(another_user, %{
+          "status" => "hi @#{user.nickname}",
+          "visibility" => "direct"
+        })
+
+      {:ok, create_activity2} =
+        CommonAPI.post(another_user, %{
+          "status" => "hi @#{user.nickname}",
+          "in_reply_to_status_id" => create_activity.id,
+          "visibility" => "direct"
+        })
+
+      task =
+        Task.async(fn ->
+          assert_receive {:text, received_event}, 4_000
+          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+
+          assert_receive {:text, received_event}, 4_000
+
+          assert %{"event" => "conversation", "payload" => received_payload} =
+                   Jason.decode!(received_event)
+
+          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+          assert last_status["id"] == to_string(create_activity.id)
+        end)
+
+      Streamer.add_socket(
+        "direct",
+        %{transport_pid: task.pid, assigns: %{user: user}}
+      )
+
+      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
+
+      Task.await(task)
+    end
+  end
 end