+ Streamer.add_socket(state.topic, state.user)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # Client's Pong frame.
+ def websocket_handle(:pong, state) do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # We only receive pings for now
+ def websocket_handle(:ping, state), do: {:ok, state}
+
+ def websocket_handle(frame, state) do
+ Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
+ {:ok, state}
+ end
+
+ def websocket_info({:render_with_user, view, template, item}, state) do
+ user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
+
+ unless Streamer.filtered_by_user?(user, item) do
+ websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ else
+ {:ok, state}
+ end
+ end
+
+ def websocket_info({:text, message}, state) do
+ # If the websocket processed X messages, force an hibernate/GC.
+ # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+ if state.count > @hibernate_every do
+ {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+ else
+ {:reply, {:text, message}, %{state | count: state.count + 1}}
+ end