X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;ds=sidebyside;f=lib%2Fpleroma%2Fweb%2Fmastodon_api%2Fwebsocket_handler.ex;h=bd7c562432549c6e46258a8dfbd49e533f6a8078;hb=95e4018c1a17bd96331cdeb19d1c62a599061351;hp=e2ffd02d0a606081d2f2269a921d41b2a113af3e;hpb=570940a3fd8d5a2fb600656432dfc01304161221;p=akkoma diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index e2ffd02d0..bd7c56243 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors +# Copyright © 2017-2021 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do @@ -12,31 +12,19 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do @behaviour :cowboy_websocket + # Client ping period. + @tick :timer.seconds(30) # Cowboy timeout period. - @timeout :timer.seconds(30) + @timeout :timer.seconds(60) # Hibernate every X messages @hibernate_every 100 - @streams [ - "public", - "public:local", - "public:media", - "public:local:media", - "user", - "user:notification", - "direct", - "list", - "hashtag" - ] - @anonymous_streams ["public", "public:local", "hashtag"] - def init(%{qs: qs} = req, state) do - with params <- :cow_qs.parse_qs(qs), + with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), - access_token <- List.keyfind(params, "access_token", 0), - {_, stream} <- List.keyfind(params, "stream", 0), - {:ok, user} <- allow_request(stream, [access_token, sec_websocket]), - topic when is_binary(topic) <- expand_topic(stream, params) do + access_token <- Map.get(params, "access_token"), + {:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket), + {:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do req = if sec_websocket do :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -44,41 +32,90 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}} + {:cowboy_websocket, req, + %{ + user: user, + topic: topic, + count: 0, + timer: nil, + subscriptions: [], + oauth_token: oauth_token + }, %{idle_timeout: @timeout}} else - {:error, code} -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(code, req) + {:error, :bad_topic} -> + Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") + req = :cowboy_req.reply(404, req) {:ok, req, state} - error -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(400, req) + {:error, :unauthorized} -> + Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}") + req = :cowboy_req.reply(401, req) {:ok, req, state} end end def websocket_init(state) do Logger.debug( - "#{__MODULE__} accepted websocket connection for user #{ - (state.user || %{id: "anonymous"}).id - }, topic #{state.topic}" + "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}" ) - Streamer.add_socket(state.topic, state.user) - {:ok, state} + Streamer.add_socket(state.topic, state.oauth_token) + {:ok, %{state | timer: timer()}} + end + + # Client's Pong frame. + def websocket_handle(:pong, state) do + if state.timer, do: Process.cancel_timer(state.timer) + {:ok, %{state | timer: timer()}} end - # We never receive messages. - def websocket_handle(_frame, state) do + # We only receive pings for now + def websocket_handle(:ping, state), do: {:ok, state} + + def websocket_handle({:text, ping}, state) when ping in ~w[ping PING] do + if state.timer, do: Process.cancel_timer(state.timer) + {:reply, {:text, "pong"}, %{state | timer: timer()}} + end + + def websocket_handle({:text, text}, state) do + with {:ok, json} <- Jason.decode(text) do + websocket_handle({:json, json}, state) + else + _ -> + Logger.error("#{__MODULE__} received text frame: #{text}") + {:ok, state} + end + end + + def websocket_handle( + {:json, %{"type" => "subscribe", "stream" => stream_name}}, + %{user: user, oauth_token: token} = state + ) do + with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do + new_subscriptions = + [topic | Map.get(state, :subscriptions, [])] + |> Enum.uniq() + + {:ok, _topic} = Streamer.add_socket(topic, user) + + {:ok, Map.put(state, :subscriptions, new_subscriptions)} + else + _ -> + Logger.error("#{__MODULE__} received invalid topic: #{stream_name}") + {:ok, state} + end + end + + def websocket_handle(frame, state) do + Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} end - def websocket_info({:render_with_user, view, template, item}, state) do + def websocket_info({:render_with_user, view, template, item, topic}, state) do user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) unless Streamer.filtered_by_user?(user, item) do - websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) + websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user}) else {:ok, state} end @@ -94,11 +131,25 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end end + # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received. + # As we hibernate there, reset the count to 0. + # If the client misses :pong, Cowboy will automatically timeout the connection after + # `@idle_timeout`. + def websocket_info(:tick, state) do + {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate} + end + + def websocket_info(:close, state) do + {:stop, state} + end + + # State can be `[]` only in case we terminate before switching to websocket, + # we already log errors for these cases in `init/1`, so just do nothing here + def terminate(_reason, _req, []), do: :ok + def terminate(reason, _req, state) do Logger.debug( - "#{__MODULE__} terminating websocket connection for user #{ - (state.user || %{id: "anonymous"}).id - }, topic #{state.topic || "?"}: #{inspect(reason)}" + "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}" ) Streamer.remove_socket(state.topic) @@ -106,47 +157,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end # Public streams without authentication. - defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do - {:ok, nil} + defp authenticate_request(nil, nil) do + {:ok, nil, nil} end # Authenticated streams. - defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do - token = - with {"access_token", token} <- access_token do - token - else - _ -> sec_websocket - end + defp authenticate_request(access_token, sec_websocket) do + token = access_token || sec_websocket with true <- is_bitstring(token), - %Token{user_id: user_id} <- Repo.get_by(Token, token: token), + oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token), user = %User{} <- User.get_cached_by_id(user_id) do - {:ok, user} + {:ok, user, oauth_token} else - _ -> {:error, 403} + _ -> {:error, :unauthorized} end end - # Not authenticated. - defp allow_request(stream, _) when stream in @streams, do: {:error, 403} - - # No matching stream. - defp allow_request(_, _), do: {:error, 404} - - defp expand_topic("hashtag", params) do - case List.keyfind(params, "tag", 0) do - {_, tag} -> "hashtag:#{tag}" - _ -> nil - end + defp timer do + Process.send_after(self(), :tick, @tick) end - - defp expand_topic("list", params) do - case List.keyfind(params, "list", 0) do - {_, list} -> "list:#{list}" - _ -> nil - end - end - - defp expand_topic(topic, _), do: topic end