MastoAPI: Basic streaming.
authorRoger Braun <roger@rogerbraun.net>
Sat, 11 Nov 2017 13:59:25 +0000 (14:59 +0100)
committerRoger Braun <roger@rogerbraun.net>
Sat, 11 Nov 2017 13:59:25 +0000 (14:59 +0100)
lib/pleroma/application.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/endpoint.ex
lib/pleroma/web/mastodon_api/mastodon_socket.ex [new file with mode: 0644]
lib/pleroma/web/streamer.ex [new file with mode: 0644]
lib/transports.ex [new file with mode: 0644]

index 1f0a055681cab7698c8f390d95cfd27ea44fb981..5422cbc28c0ca16cfc81cd0c848693cc524888b9 100644 (file)
@@ -19,7 +19,8 @@ defmodule Pleroma.Application do
                          ttl_interval: 1000,
                          limit: 2500
                        ]]),
-      worker(Pleroma.Web.Federator, [])
+      worker(Pleroma.Web.Federator, []),
+      worker(Pleroma.Web.Streamer, [])
     ]
 
     # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
index 1624c6545b31e0af5fd1a7683431a3c1eb13a751..35536a1e41cb5530886a10fbb8a6ced31c873c97 100644 (file)
@@ -22,6 +22,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     with create_data <- make_create_data(%{to: to, actor: actor, published: published, context: context, object: object}, additional),
          {:ok, activity} <- insert(create_data, local),
          :ok <- maybe_federate(activity) do
+      if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
+        Pleroma.Web.Streamer.stream("public", activity)
+      end
       {:ok, activity}
     end
   end
index a1b4108cd6e6f99fc2a3cf8abffc70889b83babc..dc1ba2a05c165fa5524321aa97c20b52bd0a992e 100644 (file)
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Endpoint do
   use Phoenix.Endpoint, otp_app: :pleroma
 
   socket "/socket", Pleroma.Web.UserSocket
+  socket "/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket
 
   # Serve at "/" the static files from "priv/static" directory.
   #
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
new file mode 100644 (file)
index 0000000..c27d025
--- /dev/null
@@ -0,0 +1,27 @@
+defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
+  use Phoenix.Socket
+
+  transport :streaming, Phoenix.Transports.WebSocket.Raw
+
+  def connect(params, socket) do
+    IO.inspect(params)
+    Pleroma.Web.Streamer.add_socket(params["stream"], socket)
+    {:ok, socket}
+  end
+
+  def id(socket), do: nil
+
+  def handle(:text, message, state) do
+    IO.inspect message
+    #| :ok
+    #| state
+    #| {:text, message}
+    #| {:text, message, state}
+    #| {:close, "Goodbye!"}
+    {:text, message}
+  end
+
+  def handle(:closed, reason, _state) do
+    IO.inspect reason
+  end
+end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
new file mode 100644 (file)
index 0000000..cc38058
--- /dev/null
@@ -0,0 +1,45 @@
+defmodule Pleroma.Web.Streamer do
+  use GenServer
+  require Logger
+  import Plug.Conn
+
+  def start_link do
+    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+  end
+
+  def add_socket(topic, socket) do
+    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+  end
+
+  def stream(topic, item) do
+    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+  end
+
+  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    Enum.each(topics[topic] || [], fn (socket) ->
+      json = %{
+        event: "update",
+        payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode!
+      } |> Poison.encode!
+
+      send socket.transport_pid, {:text, json}
+    end)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+    sockets_for_topic = sockets[topic] || []
+    sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
+    sockets = Map.put(sockets, topic, sockets_for_topic)
+    Logger.debug("Got new conn for #{topic}")
+    IO.inspect(sockets)
+    {:noreply, sockets}
+  end
+
+  def handle_cast(m, state) do
+    IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
+    {:noreply, state}
+  end
+end
diff --git a/lib/transports.ex b/lib/transports.ex
new file mode 100644 (file)
index 0000000..5600a4f
--- /dev/null
@@ -0,0 +1,77 @@
+defmodule Phoenix.Transports.WebSocket.Raw do
+  import Plug.Conn, only: [
+    fetch_query_params: 1,
+    send_resp: 3
+  ]
+  alias Phoenix.Socket.Transport
+
+  def default_config do
+    [
+      timeout: 60_000,
+      transport_log: false,
+      cowboy: Phoenix.Endpoint.CowboyWebSocket
+    ]
+  end
+
+  def init(%Plug.Conn{method: "GET"} = conn, {endpoint, handler, transport}) do
+    {_, opts} = handler.__transport__(transport)
+
+    conn = conn
+    |> fetch_query_params
+    |> Transport.transport_log(opts[:transport_log])
+    |> Transport.force_ssl(handler, endpoint, opts)
+    |> Transport.check_origin(handler, endpoint, opts)
+
+    case conn do
+      %{halted: false} = conn ->
+        case Transport.connect(endpoint, handler, transport, __MODULE__, nil, conn.params) do
+          {:ok, socket} ->
+            {:ok, conn, {__MODULE__, {socket, opts}}}
+          :error ->
+            send_resp(conn, :forbidden, "")
+            {:error, conn}
+        end
+      _ ->
+        {:error, conn}
+    end
+  end
+
+  def init(conn, _) do
+    send_resp(conn, :bad_request, "")
+    {:error, conn}
+  end
+
+  def ws_init({socket, config}) do
+    Process.flag(:trap_exit, true)
+    {:ok, %{socket: socket}, config[:timeout]}
+  end
+
+  def ws_handle(op, data, state) do
+    state.socket.handler
+    |> apply(:handle, [op, data, state])
+    |> case do
+      {op, data} ->
+        {:reply, {op, data}, state}
+      {op, data, state} ->
+        {:reply, {op, data}, state}
+      %{} = state ->
+        {:ok, state}
+      _ ->
+        {:ok, state}
+    end
+  end
+
+  def ws_info({op, data} = tuple, state) do
+    {:reply, tuple, state}
+  end
+
+  def ws_info(_tuple, state), do: {:ok, state}
+
+  def ws_close(state) do
+    ws_handle(:closed, :normal, state)
+  end
+
+  def ws_terminate(reason, state) do
+    ws_handle(:closed, reason, state)
+  end
+end