X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fmastodon_api%2Fwebsocket_handler.ex;h=94e4595d84a932c8faf08a3382763c884fe465b6;hb=d1e1057e22c484a9ad3e3e28ad65b14088903019;hp=b1aebe01420918cf23ac4681baf144615df68b59;hpb=a5ccb5b0b1032b102c54d4df2e17c61423089e73;p=akkoma diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index b1aebe014..94e4595d8 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -12,8 +12,12 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do @behaviour :cowboy_websocket - # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. - @timeout :infinity + # Client ping period. + @tick :timer.seconds(30) + # Cowboy timeout period. + @timeout :timer.seconds(60) + # Hibernate every X messages + @hibernate_every 100 def init(%{qs: qs} = req, state) do with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), @@ -28,7 +32,8 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}} + {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil}, + %{idle_timeout: @timeout}} else {:error, :bad_topic} -> Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") @@ -43,28 +48,54 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end def websocket_init(state) do - send(self(), :subscribe) - {:ok, state} - end - - # We never receive messages. - def websocket_handle(_frame, state) do - {:ok, state} - end - - def websocket_info(:subscribe, state) do Logger.debug( "#{__MODULE__} accepted websocket connection for user #{ (state.user || %{id: "anonymous"}).id }, topic #{state.topic}" ) - Streamer.add_socket(state.topic, streamer_socket(state)) + Streamer.add_socket(state.topic, state.user) + {:ok, %{state | timer: timer()}} + end + + # Client's Pong frame. + def websocket_handle(:pong, state) do + if state.timer, do: Process.cancel_timer(state.timer) + {:ok, %{state | timer: timer()}} + end + + # We never receive messages. + def websocket_handle(frame, state) do + Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} end + def websocket_info({:render_with_user, view, template, item}, state) do + user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) + + unless Streamer.filtered_by_user?(user, item) do + websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) + else + {:ok, state} + end + end + def websocket_info({:text, message}, state) do - {:reply, {:text, message}, state} + # If the websocket processed X messages, force an hibernate/GC. + # We don't hibernate at every message to balance CPU usage/latency with RAM usage. + if state.count > @hibernate_every do + {:reply, {:text, message}, %{state | count: 0}, :hibernate} + else + {:reply, {:text, message}, %{state | count: state.count + 1}} + end + end + + # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received. + # As we hibernate there, reset the count to 0. + # If the client misses :pong, Cowboy will automatically timeout the connection after + # `@idle_timeout`. + def websocket_info(:tick, state) do + {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate} end def terminate(reason, _req, state) do @@ -74,7 +105,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Streamer.remove_socket(state.topic, streamer_socket(state)) + Streamer.remove_socket(state.topic) :ok end @@ -96,7 +127,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end end - defp streamer_socket(state) do - %{transport_pid: self(), assigns: state} + defp timer do + Process.send_after(self(), :tick, @tick) end end