Streamer rework
[akkoma] / test / web / streamer / streamer_test.exs
index 3c0f240f54168daaa4e91e8567f0bdb00be04a13..ee530f4e94515db157efd0e5264b9b63e3abc3ca 100644 (file)
@@ -12,13 +12,9 @@ defmodule Pleroma.Web.StreamerTest do
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
-  alias Pleroma.Web.Streamer.StreamerSocket
-  alias Pleroma.Web.Streamer.Worker
 
   @moduletag needs_streamer: true, capture_log: true
 
 
   @moduletag needs_streamer: true, capture_log: true
 
-  @streamer_timeout 150
-  @streamer_start_wait 10
   setup do: clear_config([:instance, :skip_thread_containment])
 
   describe "user streams" do
   setup do: clear_config([:instance, :skip_thread_containment])
 
   describe "user streams" do
@@ -29,69 +25,35 @@ defmodule Pleroma.Web.StreamerTest do
     end
 
     test "it streams the user's post in the 'user' stream", %{user: user} do
     end
 
     test "it streams the user's post in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user", user)
       {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
       {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
-
-      Streamer.stream("user", activity)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
 
     test "it streams boosts of the user in the 'user' stream", %{user: user} do
     end
 
     test "it streams boosts of the user in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user", user)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
       {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
 
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
       {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
 
-      Streamer.stream("user", announce)
-      Task.await(task)
+      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+      refute Streamer.filtered_by_user?(user, announce)
     end
 
     test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
     end
 
     test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user", user)
       Streamer.stream("user", notify)
       Streamer.stream("user", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
     end
 
     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user:notification", user)
       Streamer.stream("user:notification", notify)
       Streamer.stream("user:notification", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
@@ -100,18 +62,12 @@ defmodule Pleroma.Web.StreamerTest do
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
 
       {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
 
       {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
-      {:ok, notif} = CommonAPI.favorite(blocked, activity.id)
+      {:ok, _} = CommonAPI.favorite(blocked, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
@@ -119,45 +75,50 @@ defmodule Pleroma.Web.StreamerTest do
     } do
       user2 = insert(:user)
 
     } do
       user2 = insert(:user)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+      {:ok, _} = CommonAPI.add_mute(user, activity)
 
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
 
 
-      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, activity} = CommonAPI.add_mute(user, activity)
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
     end
 
-    test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
+    test "it sends favorite to 'user:notification' stream'", %{
       user: user
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       user: user
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+      Streamer.add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
+
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == favorite_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
+    end
 
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+    test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
+      user: user
+    } do
+      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
       {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
       {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      Streamer.add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
       user: user
     } do
       user_url = user.ap_id
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
       user: user
     } do
       user_url = user.ap_id
+      user2 = insert(:user)
 
       body =
         File.read!("test/fixtures/users_mock/localhost.json")
 
       body =
         File.read!("test/fixtures/users_mock/localhost.json")
@@ -169,47 +130,24 @@ defmodule Pleroma.Web.StreamerTest do
           %Tesla.Env{status: 200, body: body}
       end)
 
           %Tesla.Env{status: 200, body: body}
       end)
 
-      user2 = insert(:user)
-      task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
-
-      Process.sleep(@streamer_start_wait)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
+      {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
 
 
-      {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
-
-      # We don't directly pipe the notification to the streamer as it's already
-      # generated as a side effect of CommonAPI.follow().
-      Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == follow_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
     end
   end
 
     end
   end
 
-  test "it sends to public" do
+  test "it sends to public authenticated" do
     user = insert(:user)
     other_user = insert(:user)
 
     user = insert(:user)
     other_user = insert(:user)
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, @streamer_timeout
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
-
-    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
+    Streamer.add_socket("public", other_user)
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", activity)
-
-    Task.await(task)
+    {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+    assert_receive {:render_with_user, _, _, ^activity}
+    refute Streamer.filtered_by_user?(user, activity)
   end
 
   test "works for deletions" do
   end
 
   test "works for deletions" do
@@ -217,37 +155,32 @@ defmodule Pleroma.Web.StreamerTest do
     other_user = insert(:user)
     {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
 
     other_user = insert(:user)
     {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
 
-    task =
-      Task.async(fn ->
-        expected_event =
-          %{
-            "event" => "delete",
-            "payload" => activity.id
-          }
-          |> Jason.encode!()
-
-        assert_receive {:text, received_event}, @streamer_timeout
-        assert received_event == expected_event
-      end)
+    Streamer.add_socket("public", user)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
+    {:ok, _} = CommonAPI.delete(activity.id, other_user)
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+  end
 
 
-    {:ok, activity} = CommonAPI.delete(activity.id, other_user)
+  test "it sends to public unauthenticated" do
+    user = insert(:user)
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    Streamer.add_socket("public", nil)
 
 
-    Worker.push_to_socket(topics, "public", activity)
+    {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
+    assert %{"id" => ^activity_id} = Jason.decode!(payload)
 
 
-    Task.await(task)
+    {:ok, _} = CommonAPI.delete(activity.id, user)
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
   end
 
   describe "thread_containment" do
   end
 
   describe "thread_containment" do
-    test "it doesn't send to user if recipients invalid and thread containment is enabled" do
+    test "it filters to user if recipients invalid and thread containment is enabled" do
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user)
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user)
@@ -262,12 +195,10 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is disabled" do
     end
 
     test "it sends message if recipients invalid and thread containment is disabled" do
@@ -285,12 +216,11 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
 
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
     end
 
     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
@@ -308,255 +238,168 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
 
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
   end
 
   describe "blocks" do
     end
   end
 
   describe "blocks" do
-    test "it doesn't send messages involving blocked users" do
+    test "it filters messages involving blocked users" do
       user = insert(:user)
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
       user = insert(:user)
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
+      Streamer.add_socket("public", user)
       {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
       {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
-
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: user
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
-
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
     end
 
-    test "it doesn't send messages transitively involving blocked users" do
+    test "it filters messages transitively involving blocked users" do
       blocker = insert(:user)
       blockee = insert(:user)
       friend = insert(:user)
 
       blocker = insert(:user)
       blockee = insert(:user)
       friend = insert(:user)
 
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: blocker
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
+      Streamer.add_socket("public", blocker)
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
       {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
 
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
       {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_one)
+      assert_receive {:render_with_user, _, _, ^activity_one}
+      assert Streamer.filtered_by_user?(blocker, activity_one)
 
       {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
 
 
       {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_two)
+      assert_receive {:render_with_user, _, _, ^activity_two}
+      assert Streamer.filtered_by_user?(blocker, activity_two)
 
       {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
 
 
       {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_three)
-
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity_three}
+      assert Streamer.filtered_by_user?(blocker, activity_three)
     end
   end
 
     end
   end
 
-  test "it doesn't send unwanted DMs to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
-    user_c = insert(:user)
-
-    {:ok, user_a} = User.follow(user_a, user_b)
-
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
-
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "@#{user_c.nickname} Test",
-        "visibility" => "direct"
-      })
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
-
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
-
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
-
-    Task.await(task)
-  end
-
-  test "it doesn't send unwanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
+  describe "lists" do
+    test "it doesn't send unwanted DMs to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
+      user_c = insert(:user)
 
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
+      Streamer.add_socket("list:#{list.id}", user_a)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "@#{user_c.nickname} Test",
+          "visibility" => "direct"
+        })
 
 
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
+      refute_receive _
+    end
 
 
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
+    test "it doesn't send unwanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
 
-    Task.await(task)
-  end
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-  test "it sends wanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
+      Streamer.add_socket("list:#{list.id}", user_a)
 
 
-    {:ok, user_a} = User.follow(user_a, user_b)
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "Test",
+          "visibility" => "private"
+        })
 
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      refute_receive _
+    end
 
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
+    test "it sends wanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-    Streamer.add_socket(
-      "list:#{list.id}",
-      fake_socket
-    )
+      Streamer.add_socket("list:#{list.id}", user_a)
 
 
-    Worker.handle_call({:stream, "list", activity}, self(), %{})
+      {:ok, activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "Test",
+          "visibility" => "private"
+        })
 
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user_a, activity)
+    end
   end
 
   end
 
-  test "it doesn't send muted reblogs" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
-
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
-
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", announce_activity)
+  describe "muted reblogs" do
+    test "it filters muted reblogs" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      user3 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    Task.await(task)
-  end
+      {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
 
 
-  test "it does send non-reblog notification for reblog-muted actors" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
+      Streamer.add_socket("user", user1)
+      {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
+      assert_receive {:render_with_user, _, _, ^announce_activity}
+      assert Streamer.filtered_by_user?(user1, announce_activity)
+    end
 
 
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
+    test "it filters reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+      Streamer.add_socket("user", user1)
+      {:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert Streamer.filtered_by_user?(user1, notif)
+    end
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    test "it send non-reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    Worker.push_to_socket(topics, "public", favorite_activity)
+      {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+      Streamer.add_socket("user", user1)
+      {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
 
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      refute Streamer.filtered_by_user?(user1, notif)
+    end
   end
 
   end
 
-  test "it doesn't send posts from muted threads" do
+  test "it filters posts from muted threads" do
     user = insert(:user)
     user2 = insert(:user)
     user = insert(:user)
     user2 = insert(:user)
+    Streamer.add_socket("user", user2)
     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
-
     {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
     {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-
-    {:ok, activity} = CommonAPI.add_mute(user2, activity)
-
-    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-    Streamer.add_socket(
-      "user",
-      %{transport_pid: task.pid, assigns: %{user: user2}}
-    )
-
-    Streamer.stream("user", activity)
-    Task.await(task)
+    {:ok, _} = CommonAPI.add_mute(user2, activity)
+    assert_receive {:render_with_user, _, _, ^activity}
+    assert Streamer.filtered_by_user?(user2, activity)
   end
 
   describe "direct streams" do
   end
 
   describe "direct streams" do
@@ -568,22 +411,7 @@ defmodule Pleroma.Web.StreamerTest do
       user = insert(:user)
       another_user = insert(:user)
 
       user = insert(:user)
       another_user = insert(:user)
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          [participation] = Participation.for_user(user)
-          assert last_status["pleroma"]["direct_conversation_id"] == participation.id
-        end)
-
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("direct", user)
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
@@ -591,42 +419,47 @@ defmodule Pleroma.Web.StreamerTest do
           "visibility" => "direct"
         })
 
           "visibility" => "direct"
         })
 
-      Task.await(task)
+      assert_receive {:text, received_event}
+
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
+
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      [participation] = Participation.for_user(user)
+      assert last_status["pleroma"]["direct_conversation_id"] == participation.id
     end
 
     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
       user = insert(:user)
       another_user = insert(:user)
 
     end
 
     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
       user = insert(:user)
       another_user = insert(:user)
 
+      Streamer.add_socket("direct", user)
+
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
           "status" => "hi @#{user.nickname}",
           "visibility" => "direct"
         })
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
           "status" => "hi @#{user.nickname}",
           "visibility" => "direct"
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+      create_activity_id = create_activity.id
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
 
-          refute_receive {:text, _}, @streamer_timeout
-        end)
+      {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
 
 
-      Process.sleep(@streamer_start_wait)
+      assert_receive {:text, received_event}
 
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert %{"event" => "delete", "payload" => ^create_activity_id} =
+               Jason.decode!(received_event)
 
 
-      {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
-
-      Task.await(task)
+      refute_receive _
     end
 
     test "it sends conversation update to the 'direct' stream when a message is deleted" do
       user = insert(:user)
       another_user = insert(:user)
     end
 
     test "it sends conversation update to the 'direct' stream when a message is deleted" do
       user = insert(:user)
       another_user = insert(:user)
+      Streamer.add_socket("direct", user)
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
@@ -636,35 +469,30 @@ defmodule Pleroma.Web.StreamerTest do
 
       {:ok, create_activity2} =
         CommonAPI.post(another_user, %{
 
       {:ok, create_activity2} =
         CommonAPI.post(another_user, %{
-          "status" => "hi @#{user.nickname}",
+          "status" => "hi @#{user.nickname} 2",
           "in_reply_to_status_id" => create_activity.id,
           "visibility" => "direct"
         })
 
           "in_reply_to_status_id" => create_activity.id,
           "visibility" => "direct"
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
-
-          assert_receive {:text, received_event}, @streamer_timeout
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:render_with_user, _, _, ^create_activity2}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
 
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          assert last_status["id"] == to_string(create_activity.id)
-        end)
+      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
 
 
-      Process.sleep(@streamer_start_wait)
+      assert_receive {:text, received_event}
+      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
 
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:text, received_event}
 
 
-      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
 
 
-      Task.await(task)
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      assert last_status["id"] == to_string(create_activity.id)
     end
   end
 end
     end
   end
 end