Merge branch 'develop' into activation-meta
[akkoma] / test / web / streamer / streamer_test.exs
index 8b8d8af6c51521dac9046715814c22d24fcc6731..245f6e63f53f7bccab9782c236cea453ecf0f943 100644 (file)
@@ -7,20 +7,90 @@ defmodule Pleroma.Web.StreamerTest do
 
   import Pleroma.Factory
 
 
   import Pleroma.Factory
 
+  alias Pleroma.Chat
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Conversation.Participation
   alias Pleroma.List
   alias Pleroma.Conversation.Participation
   alias Pleroma.List
+  alias Pleroma.Object
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
-  alias Pleroma.Web.Streamer.StreamerSocket
-  alias Pleroma.Web.Streamer.Worker
+  alias Pleroma.Web.StreamerView
 
   @moduletag needs_streamer: true, capture_log: true
 
 
   @moduletag needs_streamer: true, capture_log: true
 
-  @streamer_timeout 150
-  @streamer_start_wait 10
   setup do: clear_config([:instance, :skip_thread_containment])
 
   setup do: clear_config([:instance, :skip_thread_containment])
 
+  describe "get_topic without an user" do
+    test "allows public" do
+      assert {:ok, "public"} = Streamer.get_topic("public", nil)
+      assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil)
+      assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil)
+      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil)
+    end
+
+    test "allows hashtag streams" do
+      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"})
+    end
+
+    test "disallows user streams" do
+      assert {:error, _} = Streamer.get_topic("user", nil)
+      assert {:error, _} = Streamer.get_topic("user:notification", nil)
+      assert {:error, _} = Streamer.get_topic("direct", nil)
+    end
+
+    test "disallows list streams" do
+      assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42})
+    end
+  end
+
+  describe "get_topic with an user" do
+    setup do
+      user = insert(:user)
+      {:ok, %{user: user}}
+    end
+
+    test "allows public streams", %{user: user} do
+      assert {:ok, "public"} = Streamer.get_topic("public", user)
+      assert {:ok, "public:local"} = Streamer.get_topic("public:local", user)
+      assert {:ok, "public:media"} = Streamer.get_topic("public:media", user)
+      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user)
+    end
+
+    test "allows user streams", %{user: user} do
+      expected_user_topic = "user:#{user.id}"
+      expected_notif_topic = "user:notification:#{user.id}"
+      expected_direct_topic = "direct:#{user.id}"
+      assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user)
+      assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user)
+      assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user)
+    end
+
+    test "allows hashtag streams", %{user: user} do
+      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"})
+    end
+
+    test "disallows registering to an user stream", %{user: user} do
+      another_user = insert(:user)
+      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user)
+      assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user)
+      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user)
+    end
+
+    test "allows list stream that are owned by the user", %{user: user} do
+      {:ok, list} = List.create("Test", user)
+      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
+      assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id})
+    end
+
+    test "disallows list stream that are not owned by the user", %{user: user} do
+      another_user = insert(:user)
+      {:ok, list} = List.create("Test", another_user)
+      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
+      assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id})
+    end
+  end
+
   describe "user streams" do
     setup do
       user = insert(:user)
   describe "user streams" do
     setup do
       user = insert(:user)
@@ -29,69 +99,105 @@ defmodule Pleroma.Web.StreamerTest do
     end
 
     test "it streams the user's post in the 'user' stream", %{user: user} do
     end
 
     test "it streams the user's post in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
+      Streamer.get_topic_and_add_socket("user", user)
+      {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
+    end
 
 
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+    test "it streams boosts of the user in the 'user' stream", %{user: user} do
+      Streamer.get_topic_and_add_socket("user", user)
 
 
-      {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
+      other_user = insert(:user)
+      {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
+      {:ok, announce} = CommonAPI.repeat(activity.id, user)
 
 
-      Streamer.stream("user", activity)
-      Task.await(task)
+      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+      refute Streamer.filtered_by_user?(user, announce)
     end
 
     end
 
-    test "it streams boosts of the user in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+    test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do
+      Streamer.get_topic_and_add_socket("user", user)
 
       other_user = insert(:user)
 
       other_user = insert(:user)
-      {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
-      {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
+      {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
 
 
-      Streamer.stream("user", announce)
-      Task.await(task)
-    end
+      data =
+        File.read!("test/fixtures/mastodon-announce.json")
+        |> Poison.decode!()
+        |> Map.put("object", activity.data["object"])
+        |> Map.put("actor", user.ap_id)
 
 
-    test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
+      {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
+        Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
 
 
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+      refute Streamer.filtered_by_user?(user, announce)
+    end
 
 
+    test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
+      Streamer.get_topic_and_add_socket("user", user)
       Streamer.stream("user", notify)
       Streamer.stream("user", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
     end
 
     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
+      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.stream("user:notification", notify)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
+    end
+
+    test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
+      other_user = insert(:user)
+
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
+      object = Object.normalize(create_activity, false)
+      chat = Chat.get(user.id, other_user.ap_id)
+      cm_ref = MessageReference.for_chat_and_object(chat, object)
+      cm_ref = %{cm_ref | chat: chat, object: object}
+
+      Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
+      Streamer.stream("user:pleroma_chat", {user, cm_ref})
+
+      text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+      assert text =~ "hey cirno"
+      assert_receive {:text, ^text}
+    end
+
+    test "it sends chat messages to the 'user' stream", %{user: user} do
+      other_user = insert(:user)
+
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
+      object = Object.normalize(create_activity, false)
+      chat = Chat.get(user.id, other_user.ap_id)
+      cm_ref = MessageReference.for_chat_and_object(chat, object)
+      cm_ref = %{cm_ref | chat: chat, object: object}
+
+      Streamer.get_topic_and_add_socket("user", user)
+      Streamer.stream("user", {user, cm_ref})
 
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+      assert text =~ "hey cirno"
+      assert_receive {:text, ^text}
+    end
+
+    test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do
+      other_user = insert(:user)
 
 
+      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+
+      notify =
+        Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
+        |> Repo.preload(:activity)
+
+      Streamer.get_topic_and_add_socket("user:notification", user)
       Streamer.stream("user:notification", notify)
       Streamer.stream("user:notification", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
@@ -100,18 +206,12 @@ defmodule Pleroma.Web.StreamerTest do
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.get_topic_and_add_socket("user:notification", user)
 
 
-      {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
-      {:ok, notif} = CommonAPI.favorite(blocked, activity.id)
+      {:ok, activity} = CommonAPI.post(user, %{status: ":("})
+      {:ok, _} = CommonAPI.favorite(blocked, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
@@ -119,45 +219,50 @@ defmodule Pleroma.Web.StreamerTest do
     } do
       user2 = insert(:user)
 
     } do
       user2 = insert(:user)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
+      {:ok, _} = CommonAPI.add_mute(user, activity)
 
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.get_topic_and_add_socket("user:notification", user)
 
 
-      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, activity} = CommonAPI.add_mute(user, activity)
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
     end
 
-    test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
+    test "it sends favorite to 'user:notification' stream'", %{
       user: user
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       user: user
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
+      Streamer.get_topic_and_add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == favorite_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
+    end
+
+    test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
+      user: user
+    } do
+      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
-      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
+      Streamer.get_topic_and_add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
       user: user
     } do
       user_url = user.ap_id
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
       user: user
     } do
       user_url = user.ap_id
+      user2 = insert(:user)
 
       body =
         File.read!("test/fixtures/users_mock/localhost.json")
 
       body =
         File.read!("test/fixtures/users_mock/localhost.json")
@@ -169,79 +274,57 @@ defmodule Pleroma.Web.StreamerTest do
           %Tesla.Env{status: 200, body: body}
       end)
 
           %Tesla.Env{status: 200, body: body}
       end)
 
-      user2 = insert(:user)
-      task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
-
-      Process.sleep(@streamer_start_wait)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
-      {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
+      Streamer.get_topic_and_add_socket("user:notification", user)
+      {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
 
 
-      # We don't directly pipe the notification to the streamer as it's already
-      # generated as a side effect of CommonAPI.follow().
-      Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == follow_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
     end
   end
 
     end
   end
 
-  test "it sends to public" do
+  test "it sends to public authenticated" do
     user = insert(:user)
     other_user = insert(:user)
 
     user = insert(:user)
     other_user = insert(:user)
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, @streamer_timeout
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
-
-    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
+    Streamer.get_topic_and_add_socket("public", other_user)
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", activity)
+    {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
+    assert_receive {:render_with_user, _, _, ^activity}
+    refute Streamer.filtered_by_user?(user, activity)
+  end
 
 
-    Task.await(task)
+  test "works for deletions" do
+    user = insert(:user)
+    other_user = insert(:user)
+    {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
 
 
-    task =
-      Task.async(fn ->
-        expected_event =
-          %{
-            "event" => "delete",
-            "payload" => activity.id
-          }
-          |> Jason.encode!()
+    Streamer.get_topic_and_add_socket("public", user)
 
 
-        assert_receive {:text, received_event}, @streamer_timeout
-        assert received_event == expected_event
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
+    {:ok, _} = CommonAPI.delete(activity.id, other_user)
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+  end
 
 
-    {:ok, activity} = CommonAPI.delete(activity.id, other_user)
+  test "it sends to public unauthenticated" do
+    user = insert(:user)
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    Streamer.get_topic_and_add_socket("public", nil)
 
 
-    Worker.push_to_socket(topics, "public", activity)
+    {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
+    assert %{"id" => ^activity_id} = Jason.decode!(payload)
 
 
-    Task.await(task)
+    {:ok, _} = CommonAPI.delete(activity.id, user)
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
   end
 
   describe "thread_containment" do
   end
 
   describe "thread_containment" do
-    test "it doesn't send to user if recipients invalid and thread containment is enabled" do
+    test "it filters to user if recipients invalid and thread containment is enabled" do
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user)
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user)
@@ -256,12 +339,10 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.stream("public", activity)
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is disabled" do
     end
 
     test "it sends message if recipients invalid and thread containment is disabled" do
@@ -279,12 +360,11 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.stream("public", activity)
 
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
     end
 
     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
@@ -302,255 +382,168 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.stream("public", activity)
 
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
   end
 
   describe "blocks" do
     end
   end
 
   describe "blocks" do
-    test "it doesn't send messages involving blocked users" do
+    test "it filters messages involving blocked users" do
       user = insert(:user)
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
       user = insert(:user)
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
-      {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
-
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: user
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
-
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      Streamer.get_topic_and_add_socket("public", user)
+      {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
     end
 
-    test "it doesn't send messages transitively involving blocked users" do
+    test "it filters messages transitively involving blocked users" do
       blocker = insert(:user)
       blockee = insert(:user)
       friend = insert(:user)
 
       blocker = insert(:user)
       blockee = insert(:user)
       friend = insert(:user)
 
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: blocker
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
+      Streamer.get_topic_and_add_socket("public", blocker)
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
-      {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
-
-      Worker.push_to_socket(topics, "public", activity_one)
+      {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
 
 
-      {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
+      assert_receive {:render_with_user, _, _, ^activity_one}
+      assert Streamer.filtered_by_user?(blocker, activity_one)
 
 
-      Worker.push_to_socket(topics, "public", activity_two)
+      {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
 
 
-      {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
+      assert_receive {:render_with_user, _, _, ^activity_two}
+      assert Streamer.filtered_by_user?(blocker, activity_two)
 
 
-      Worker.push_to_socket(topics, "public", activity_three)
+      {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
 
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity_three}
+      assert Streamer.filtered_by_user?(blocker, activity_three)
     end
   end
 
     end
   end
 
-  test "it doesn't send unwanted DMs to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
-    user_c = insert(:user)
+  describe "lists" do
+    test "it doesn't send unwanted DMs to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
+      user_c = insert(:user)
 
 
-    {:ok, user_a} = User.follow(user_a, user_b)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "@#{user_c.nickname} Test",
-        "visibility" => "direct"
-      })
+      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
 
 
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
-
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
-
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
-
-    Task.await(task)
-  end
-
-  test "it doesn't send unwanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
-
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
-
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          status: "@#{user_c.nickname} Test",
+          visibility: "direct"
+        })
 
 
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
+      refute_receive _
+    end
 
 
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
+    test "it doesn't send unwanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
 
-    Task.await(task)
-  end
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-  test "it sends wanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
+      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
 
 
-    {:ok, user_a} = User.follow(user_a, user_b)
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          status: "Test",
+          visibility: "private"
+        })
 
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      refute_receive _
+    end
 
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
+    test "it sends wanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
 
-    Streamer.add_socket(
-      "list:#{list.id}",
-      fake_socket
-    )
+      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
 
 
-    Worker.handle_call({:stream, "list", activity}, self(), %{})
+      {:ok, activity} =
+        CommonAPI.post(user_b, %{
+          status: "Test",
+          visibility: "private"
+        })
 
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user_a, activity)
+    end
   end
 
   end
 
-  test "it doesn't send muted reblogs" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
-
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
-
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", announce_activity)
+  describe "muted reblogs" do
+    test "it filters muted reblogs" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      user3 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    Task.await(task)
-  end
+      {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
 
 
-  test "it does send non-reblog notification for reblog-muted actors" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
+      Streamer.get_topic_and_add_socket("user", user1)
+      {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
+      assert_receive {:render_with_user, _, _, ^announce_activity}
+      assert Streamer.filtered_by_user?(user1, announce_activity)
+    end
 
 
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
+    test "it filters reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
+      Streamer.get_topic_and_add_socket("user", user1)
+      {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
 
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert Streamer.filtered_by_user?(user1, notif)
+    end
 
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    test "it send non-reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
 
-    Worker.push_to_socket(topics, "public", favorite_activity)
+      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
+      Streamer.get_topic_and_add_socket("user", user1)
+      {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
 
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      refute Streamer.filtered_by_user?(user1, notif)
+    end
   end
 
   end
 
-  test "it doesn't send posts from muted threads" do
+  test "it filters posts from muted threads" do
     user = insert(:user)
     user2 = insert(:user)
     user = insert(:user)
     user2 = insert(:user)
+    Streamer.get_topic_and_add_socket("user", user2)
     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
-
-    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-
-    {:ok, activity} = CommonAPI.add_mute(user2, activity)
-
-    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-    Streamer.add_socket(
-      "user",
-      %{transport_pid: task.pid, assigns: %{user: user2}}
-    )
-
-    Streamer.stream("user", activity)
-    Task.await(task)
+    {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
+    {:ok, _} = CommonAPI.add_mute(user2, activity)
+    assert_receive {:render_with_user, _, _, ^activity}
+    assert Streamer.filtered_by_user?(user2, activity)
   end
 
   describe "direct streams" do
   end
 
   describe "direct streams" do
@@ -562,103 +555,88 @@ defmodule Pleroma.Web.StreamerTest do
       user = insert(:user)
       another_user = insert(:user)
 
       user = insert(:user)
       another_user = insert(:user)
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          [participation] = Participation.for_user(user)
-          assert last_status["pleroma"]["direct_conversation_id"] == participation.id
-        end)
-
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.get_topic_and_add_socket("direct", user)
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
-          "status" => "hey @#{user.nickname}",
-          "visibility" => "direct"
+          status: "hey @#{user.nickname}",
+          visibility: "direct"
         })
 
         })
 
-      Task.await(task)
+      assert_receive {:text, received_event}
+
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
+
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      [participation] = Participation.for_user(user)
+      assert last_status["pleroma"]["direct_conversation_id"] == participation.id
     end
 
     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
       user = insert(:user)
       another_user = insert(:user)
 
     end
 
     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
       user = insert(:user)
       another_user = insert(:user)
 
+      Streamer.get_topic_and_add_socket("direct", user)
+
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
-          "status" => "hi @#{user.nickname}",
-          "visibility" => "direct"
+          status: "hi @#{user.nickname}",
+          visibility: "direct"
         })
 
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
-
-          refute_receive {:text, _}, @streamer_timeout
-        end)
+      create_activity_id = create_activity.id
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
 
-      Process.sleep(@streamer_start_wait)
+      {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
 
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:text, received_event}
 
 
-      {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
+      assert %{"event" => "delete", "payload" => ^create_activity_id} =
+               Jason.decode!(received_event)
 
 
-      Task.await(task)
+      refute_receive _
     end
 
     test "it sends conversation update to the 'direct' stream when a message is deleted" do
       user = insert(:user)
       another_user = insert(:user)
     end
 
     test "it sends conversation update to the 'direct' stream when a message is deleted" do
       user = insert(:user)
       another_user = insert(:user)
+      Streamer.get_topic_and_add_socket("direct", user)
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
-          "status" => "hi @#{user.nickname}",
-          "visibility" => "direct"
+          status: "hi @#{user.nickname}",
+          visibility: "direct"
         })
 
       {:ok, create_activity2} =
         CommonAPI.post(another_user, %{
         })
 
       {:ok, create_activity2} =
         CommonAPI.post(another_user, %{
-          "status" => "hi @#{user.nickname}",
-          "in_reply_to_status_id" => create_activity.id,
-          "visibility" => "direct"
+          status: "hi @#{user.nickname} 2",
+          in_reply_to_status_id: create_activity.id,
+          visibility: "direct"
         })
 
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:render_with_user, _, _, ^create_activity2}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
 
-          assert_receive {:text, received_event}, @streamer_timeout
-
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          assert last_status["id"] == to_string(create_activity.id)
-        end)
+      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
 
 
-      Process.sleep(@streamer_start_wait)
+      assert_receive {:text, received_event}
+      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
 
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:text, received_event}
 
 
-      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
 
 
-      Task.await(task)
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      assert last_status["id"] == to_string(create_activity.id)
     end
   end
 end
     end
   end
 end