Cowboy handler for Mastodon WebSocket
authorhref <href@random.sh>
Mon, 17 Dec 2018 13:39:59 +0000 (14:39 +0100)
committerhref <href@random.sh>
Mon, 17 Dec 2018 16:06:18 +0000 (17:06 +0100)
config/config.exs
lib/pleroma/web/endpoint.ex
lib/pleroma/web/mastodon_api/mastodon_socket.ex [deleted file]
lib/pleroma/web/mastodon_api/websocket_handler.ex [new file with mode: 0644]
lib/pleroma/web/streamer.ex

index 1777a54c0918d0d8b2b1a158adb49160b364e841..df6ea09ae4c894dfa0d85fc5d43c5763a44b6716 100644 (file)
@@ -50,6 +50,15 @@ config :pleroma, :uri_schemes,
 # Configures the endpoint
 config :pleroma, Pleroma.Web.Endpoint,
   url: [host: "localhost"],
+  http: [
+    dispatch: [
+      {:_,
+       [
+         {"/api/v1/streaming", Elixir.Pleroma.Web.MastodonAPI.WebsocketHandler, []},
+         {:_, Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}}
+       ]}
+    ]
+  ],
   protocol: "https",
   secret_key_base: "aK4Abxf29xU9TTDKre9coZPUgevcVCFQJe/5xP/7Lt4BEif6idBIbjupVbOrbKxl",
   signing_salt: "CqaoopA2",
index aacc8819540035362c76b04444021b45d85bcd4d..e52667c72691245aabb10e1b06ae9788526478dc 100644 (file)
@@ -3,8 +3,6 @@ defmodule Pleroma.Web.Endpoint do
 
   socket("/socket", Pleroma.Web.UserSocket)
 
-  socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket, websocket: [path: "/streaming"])
-
   # Serve at "/" the static files from "priv/static" directory.
   #
   # You should set gzip to true if you are running phoenix.digest
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
deleted file mode 100644 (file)
index 1b75897..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
-  use Phoenix.Socket
-
-  alias Pleroma.Web.OAuth.Token
-  alias Pleroma.{User, Repo}
-
-  @spec connect(params :: map(), Phoenix.Socket.t()) :: {:ok, Phoenix.Socket.t()} | :error
-  def connect(%{"access_token" => token} = params, socket) do
-    with %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
-         %User{} = user <- Repo.get(User, user_id),
-         stream
-         when stream in [
-                "public",
-                "public:local",
-                "public:media",
-                "public:local:media",
-                "user",
-                "direct",
-                "list",
-                "hashtag"
-              ] <- params["stream"] do
-      topic =
-        case stream do
-          "hashtag" -> "hashtag:#{params["tag"]}"
-          "list" -> "list:#{params["list"]}"
-          _ -> stream
-        end
-
-      socket =
-        socket
-        |> assign(:topic, topic)
-        |> assign(:user, user)
-
-      Pleroma.Web.Streamer.add_socket(topic, socket)
-      {:ok, socket}
-    else
-      _e -> :error
-    end
-  end
-
-  def connect(%{"stream" => stream} = params, socket)
-      when stream in ["public", "public:local", "hashtag"] do
-    topic =
-      case stream do
-        "hashtag" -> "hashtag:#{params["tag"]}"
-        _ -> stream
-      end
-
-    socket =
-      socket
-      |> assign(:topic, topic)
-
-    Pleroma.Web.Streamer.add_socket(topic, socket)
-    {:ok, socket}
-  end
-
-  def connect(_params, _socket), do: :error
-
-  def id(_), do: nil
-
-  def handle(:text, message, _state) do
-    # | :ok
-    # | state
-    # | {:text, message}
-    # | {:text, message, state}
-    # | {:close, "Goodbye!"}
-    {:text, message}
-  end
-
-  def handle(:closed, _, %{socket: socket}) do
-    topic = socket.assigns[:topic]
-    Pleroma.Web.Streamer.remove_socket(topic, socket)
-  end
-end
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
new file mode 100644 (file)
index 0000000..11e0e16
--- /dev/null
@@ -0,0 +1,120 @@
+defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
+  require Logger
+
+  alias Pleroma.Web.OAuth.Token
+  alias Pleroma.{User, Repo}
+
+  @behaviour :cowboy_websocket_handler
+
+  @streams [
+    "public",
+    "public:local",
+    "public:media",
+    "public:local:media",
+    "user",
+    "direct",
+    "list",
+    "hashtag"
+  ]
+  @anonymous_streams ["public", "public:local", "hashtag"]
+
+  # Handled by periodic keepalive in Pleroma.Web.Streamer.
+  @timeout :infinity
+
+  def init(_type, _req, _opts) do
+    {:upgrade, :protocol, :cowboy_websocket}
+  end
+
+  def websocket_init(_type, req, _opts) do
+    with {qs, req} <- :cowboy_req.qs(req),
+         params <- :cow_qs.parse_qs(qs),
+         access_token <- List.keyfind(params, "access_token", 0),
+         {_, stream} <- List.keyfind(params, "stream", 0),
+         {:ok, user} <- allow_request(stream, access_token),
+         topic when is_binary(topic) <- expand_topic(stream, params) do
+      send(self(), :subscribe)
+      {:ok, req, %{user: user, topic: topic}, @timeout}
+    else
+      {:error, code} ->
+        Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
+        {:ok, req} = :cowboy_req.reply(code, req)
+        {:shutdown, req}
+
+      error ->
+        Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}")
+        {:shutdown, req}
+    end
+  end
+
+  # We never receive messages.
+  def websocket_handle(_frame, req, state) do
+    {:ok, req, state}
+  end
+
+  def websocket_info(:subscribe, req, state) do
+    Logger.debug(
+      "#{__MODULE__} accepted websocket connection for user #{
+        (state.user || %{id: "anonymous"}).id
+      }, topic #{state.topic}"
+    )
+
+    Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
+    {:ok, req, state}
+  end
+
+  def websocket_info({:text, message}, req, state) do
+    {:reply, {:text, message}, req, state}
+  end
+
+  def websocket_terminate(reason, _req, state) do
+    Logger.debug(
+      "#{__MODULE__} terminating websocket connection for user #{
+        (state.user || %{id: "anonymous"}).id
+      }, topic #{state.topic || "?"}: #{inspect(reason)}"
+    )
+
+    Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
+    :ok
+  end
+
+  # Public streams without authentication.
+  defp allow_request(stream, nil) when stream in @anonymous_streams do
+    {:ok, nil}
+  end
+
+  # Authenticated streams.
+  defp allow_request(stream, {"access_token", access_token}) when stream in @streams do
+    with %Token{user_id: user_id} <- Repo.get_by(Token, token: access_token),
+         user = %User{} <- Repo.get(User, user_id) do
+      {:ok, user}
+    else
+      _ -> {:error, 403}
+    end
+  end
+
+  # Not authenticated.
+  defp allow_request(stream, _) when stream in @streams, do: {:error, 403}
+
+  # No matching stream.
+  defp allow_request(_, _), do: {:error, 404}
+
+  defp expand_topic("hashtag", params) do
+    case List.keyfind(params, "tag", 0) do
+      {_, tag} -> "hashtag:#{tag}"
+      _ -> nil
+    end
+  end
+
+  defp expand_topic("list", params) do
+    case List.keyfind(params, "list", 0) do
+      {_, list} -> "list:#{list}"
+      _ -> nil
+    end
+  end
+
+  defp expand_topic(topic, _), do: topic
+
+  defp streamer_socket(state) do
+    %{transport_pid: self(), assigns: state}
+  end
+end
index 29c44e9d5e5f021357af71a3b2ecc390288fdeca..e1eecba4d297547347b7eb0ec1bafc162792cc27 100644 (file)
@@ -4,17 +4,9 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.{User, Notification, Activity, Object, Repo}
   alias Pleroma.Web.ActivityPub.ActivityPub
 
-  def init(args) do
-    {:ok, args}
-  end
+  @keepalive_interval :timer.seconds(30)
 
   def start_link do
-    spawn(fn ->
-      # 30 seconds
-      Process.sleep(1000 * 30)
-      GenServer.cast(__MODULE__, %{action: :ping})
-    end)
-
     GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
   end
 
@@ -30,6 +22,16 @@ defmodule Pleroma.Web.Streamer do
     GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
   end
 
+  def init(args) do
+    spawn(fn ->
+      # 30 seconds
+      Process.sleep(@keepalive_interval)
+      GenServer.cast(__MODULE__, %{action: :ping})
+    end)
+
+    {:ok, args}
+  end
+
   def handle_cast(%{action: :ping}, topics) do
     Map.values(topics)
     |> List.flatten()
@@ -40,7 +42,7 @@ defmodule Pleroma.Web.Streamer do
 
     spawn(fn ->
       # 30 seconds
-      Process.sleep(1000 * 30)
+      Process.sleep(@keepalive_interval)
       GenServer.cast(__MODULE__, %{action: :ping})
     end)