- @streams [
- "public",
- "public:local",
- "public:media",
- "public:local:media",
- "user",
- "direct",
- "list",
- "hashtag"
- ]
- @anonymous_streams ["public", "public:local", "hashtag"]
-
- def init(%{qs: qs} = req, _state) do
- with params <- :cow_qs.parse_qs(qs),
- access_token <- List.keyfind(params, "access_token", 0),
- {_, stream} <- List.keyfind(params, "stream", 0),
- {:ok, user} <- allow_request(stream, access_token),
- topic when is_binary(topic) <- expand_topic(stream, params) do
- {:cowboy_websocket, req, %{user: user, topic: topic}}
+ # Client ping period.
+ @tick :timer.seconds(30)
+ # Cowboy timeout period.
+ @timeout :timer.seconds(60)
+ # Hibernate every X messages
+ @hibernate_every 100
+
+ def init(%{qs: qs} = req, state) do
+ with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
+ sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
+ access_token <- Map.get(params, "access_token"),
+ {:ok, user} <- authenticate_request(access_token, sec_websocket),
+ {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do
+ req =
+ if sec_websocket do
+ :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
+ else
+ req
+ end
+
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
+ %{idle_timeout: @timeout}}