X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fmastodon_api%2Fwebsocket_handler.ex;h=bd7c562432549c6e46258a8dfbd49e533f6a8078;hb=95e4018c1a17bd96331cdeb19d1c62a599061351;hp=b978167b6288f95b3e8b042fb0b9625c2429e37f;hpb=6eb7d69e60a96e577de92de232ed48e509f23cd4;p=akkoma diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index b978167b6..bd7c56243 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -32,8 +32,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil}, - %{idle_timeout: @timeout}} + {:cowboy_websocket, req, + %{ + user: user, + topic: topic, + count: 0, + timer: nil, + subscriptions: [], + oauth_token: oauth_token + }, %{idle_timeout: @timeout}} else {:error, :bad_topic} -> Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") @@ -52,7 +59,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}" ) - Streamer.add_socket(state.topic, state.user) + Streamer.add_socket(state.topic, state.oauth_token) {:ok, %{state | timer: timer()}} end @@ -65,16 +72,50 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do # We only receive pings for now def websocket_handle(:ping, state), do: {:ok, state} + def websocket_handle({:text, ping}, state) when ping in ~w[ping PING] do + if state.timer, do: Process.cancel_timer(state.timer) + {:reply, {:text, "pong"}, %{state | timer: timer()}} + end + + def websocket_handle({:text, text}, state) do + with {:ok, json} <- Jason.decode(text) do + websocket_handle({:json, json}, state) + else + _ -> + Logger.error("#{__MODULE__} received text frame: #{text}") + {:ok, state} + end + end + + def websocket_handle( + {:json, %{"type" => "subscribe", "stream" => stream_name}}, + %{user: user, oauth_token: token} = state + ) do + with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do + new_subscriptions = + [topic | Map.get(state, :subscriptions, [])] + |> Enum.uniq() + + {:ok, _topic} = Streamer.add_socket(topic, user) + + {:ok, Map.put(state, :subscriptions, new_subscriptions)} + else + _ -> + Logger.error("#{__MODULE__} received invalid topic: #{stream_name}") + {:ok, state} + end + end + def websocket_handle(frame, state) do Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} end - def websocket_info({:render_with_user, view, template, item}, state) do + def websocket_info({:render_with_user, view, template, item, topic}, state) do user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) unless Streamer.filtered_by_user?(user, item) do - websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) + websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user}) else {:ok, state} end @@ -98,6 +139,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate} end + def websocket_info(:close, state) do + {:stop, state} + end + # State can be `[]` only in case we terminate before switching to websocket, # we already log errors for these cases in `init/1`, so just do nothing here def terminate(_reason, _req, []), do: :ok