+ # We only receive pings for now
+ def websocket_handle(:ping, state), do: {:ok, state}
+
+ def websocket_handle({:text, ping}, state) when ping in ~w[ping PING] do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:reply, {:text, "pong"}, %{state | timer: timer()}}
+ end
+
+ def websocket_handle({:text, text}, state) do
+ with {:ok, json} <- Jason.decode(text) do
+ websocket_handle({:json, json}, state)
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received text frame: #{text}")
+ {:ok, state}
+ end
+ end
+
+ def websocket_handle(
+ {:json, %{"type" => "subscribe", "stream" => stream_name}},
+ %{user: user, oauth_token: token} = state
+ ) do
+ with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do
+ new_subscriptions =
+ [topic | Map.get(state, :subscriptions, [])]
+ |> Enum.uniq()
+
+ {:ok, _topic} = Streamer.add_socket(topic, user)
+
+ {:ok, Map.put(state, :subscriptions, new_subscriptions)}
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received invalid topic: #{stream_name}")
+ {:ok, state}
+ end
+ end
+
+ def websocket_handle(frame, state) do
+ Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")