X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;ds=inline;f=lib%2Fpleroma%2Fweb%2Fmastodon_api%2Fwebsocket_handler.ex;h=bd7c562432549c6e46258a8dfbd49e533f6a8078;hb=a079ec3a3cdfd42d2cbd51c7698c2c87828e5778;hp=439cdd716d459a5a5124dab90d00e97fce731e27;hpb=a9938611f7f3771dc9cf10fb0d0d9f78c4e2f80d;p=akkoma
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 439cdd716..bd7c56243 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
+# Copyright © 2017-2021 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@@ -32,8 +32,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
- %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req,
+ %{
+ user: user,
+ topic: topic,
+ count: 0,
+ timer: nil,
+ subscriptions: [],
+ oauth_token: oauth_token
+ }, %{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
@@ -49,12 +56,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def websocket_init(state) do
Logger.debug(
- "#{__MODULE__} accepted websocket connection for user #{
- (state.user || %{id: "anonymous"}).id
- }, topic #{state.topic}"
+ "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}"
)
- Streamer.add_socket(state.topic, state.user)
+ Streamer.add_socket(state.topic, state.oauth_token)
{:ok, %{state | timer: timer()}}
end
@@ -67,16 +72,50 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state}
+ def websocket_handle({:text, ping}, state) when ping in ~w[ping PING] do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:reply, {:text, "pong"}, %{state | timer: timer()}}
+ end
+
+ def websocket_handle({:text, text}, state) do
+ with {:ok, json} <- Jason.decode(text) do
+ websocket_handle({:json, json}, state)
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received text frame: #{text}")
+ {:ok, state}
+ end
+ end
+
+ def websocket_handle(
+ {:json, %{"type" => "subscribe", "stream" => stream_name}},
+ %{user: user, oauth_token: token} = state
+ ) do
+ with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do
+ new_subscriptions =
+ [topic | Map.get(state, :subscriptions, [])]
+ |> Enum.uniq()
+
+ {:ok, _topic} = Streamer.add_socket(topic, user)
+
+ {:ok, Map.put(state, :subscriptions, new_subscriptions)}
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received invalid topic: #{stream_name}")
+ {:ok, state}
+ end
+ end
+
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
- def websocket_info({:render_with_user, view, template, item}, state) do
+ def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do
- websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else
{:ok, state}
end
@@ -100,15 +139,17 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
{:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
+ def websocket_info(:close, state) do
+ {:stop, state}
+ end
+
# State can be `[]` only in case we terminate before switching to websocket,
# we already log errors for these cases in `init/1`, so just do nothing here
def terminate(_reason, _req, []), do: :ok
def terminate(reason, _req, state) do
Logger.debug(
- "#{__MODULE__} terminating websocket connection for user #{
- (state.user || %{id: "anonymous"}).id
- }, topic #{state.topic || "?"}: #{inspect(reason)}"
+ "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
Streamer.remove_socket(state.topic)