MastoAPI: Websocket streaming for federated timeline.
authorRoger Braun <roger@rogerbraun.net>
Sat, 11 Nov 2017 19:00:11 +0000 (20:00 +0100)
committerRoger Braun <roger@rogerbraun.net>
Sat, 11 Nov 2017 19:00:11 +0000 (20:00 +0100)
lib/pleroma/web/mastodon_api/mastodon_socket.ex
lib/pleroma/web/streamer.ex

index c27d025c488f7348936275ab007558519634598e..f9c8cec321e6e4def2ef35d04d19118346550f71 100644 (file)
@@ -1,12 +1,18 @@
 defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
   use Phoenix.Socket
 
-  transport :streaming, Phoenix.Transports.WebSocket.Raw
+  transport :streaming, Phoenix.Transports.WebSocket.Raw,
+    timeout: :infinity # We never receive data.
 
   def connect(params, socket) do
-    IO.inspect(params)
-    Pleroma.Web.Streamer.add_socket(params["stream"], socket)
-    {:ok, socket}
+    if params["stream"] == "public" do
+      socket = socket
+      |> assign(:topic, params["stream"])
+      Pleroma.Web.Streamer.add_socket(params["stream"], socket)
+      {:ok, socket}
+    else
+      :error
+    end
   end
 
   def id(socket), do: nil
@@ -21,7 +27,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
     {:text, message}
   end
 
-  def handle(:closed, reason, _state) do
-    IO.inspect reason
+  def handle(:closed, reason, %{socket: socket}) do
+    topic = socket.assigns[:topic]
+    Pleroma.Web.Streamer.remove_socket(topic, socket)
   end
 end
index cc38058946980ac0aff19de8cabe5e6532e6ae45..3a7b9174322a72c8b68d720dd69830ac852e69ab 100644 (file)
@@ -4,6 +4,10 @@ defmodule Pleroma.Web.Streamer do
   import Plug.Conn
 
   def start_link do
+    spawn(fn ->
+      Process.sleep(1000 * 30) # 30 seconds
+      GenServer.cast(__MODULE__, %{action: :ping})
+    end)
     GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
   end
 
@@ -11,10 +15,28 @@ defmodule Pleroma.Web.Streamer do
     GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
   end
 
+  def remove_socket(topic, socket) do
+    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
+  end
+
   def stream(topic, item) do
     GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
   end
 
+  def handle_cast(%{action: :ping}, topics) do
+    Map.values(topics)
+    |> List.flatten
+    |> Enum.each(fn (socket) ->
+      Logger.debug("Sending keepalive ping")
+      send socket.transport_pid, {:text, ""}
+    end)
+    spawn(fn ->
+      Process.sleep(1000 * 30) # 30 seconds
+      GenServer.cast(__MODULE__, %{action: :ping})
+    end)
+    {:noreply, topics}
+  end
+
   def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
     Logger.debug("Trying to push to #{topic}")
     Logger.debug("Pushing item to #{topic}")
@@ -38,6 +60,15 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, sockets}
   end
 
+  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+    sockets_for_topic = sockets[topic] || []
+    sockets_for_topic = List.delete(sockets_for_topic, socket)
+    sockets = Map.put(sockets, topic, sockets_for_topic)
+    Logger.debug("Removed conn for #{topic}")
+    IO.inspect(sockets)
+    {:noreply, sockets}
+  end
+
   def handle_cast(m, state) do
     IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
     {:noreply, state}