+ describe "user streams" do
+ setup do
+ GenServer.start(Streamer, %{}, name: Streamer)
+
+ on_exit(fn ->
+ if pid = Process.whereis(Streamer) do
+ Process.exit(pid, :kill)
+ end
+ end)
+
+ user = insert(:user)
+ notify = insert(:notification, user: user, activity: build(:note_activity))
+ {:ok, %{user: user, notify: notify}}
+ end
+
+ test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, 4_000
+ end)
+
+ Streamer.add_socket(
+ "user",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+
+ Streamer.stream("user", notify)
+ Task.await(task)
+ end
+
+ test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, 4_000
+ end)
+
+ Streamer.add_socket(
+ "user:notification",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+
+ Streamer.stream("user:notification", notify)
+ Task.await(task)
+ end
+ end
+