ttl_interval: 1000,
limit: 2500
]]),
- worker(Pleroma.Web.Federator, [])
+ worker(Pleroma.Web.Federator, []),
+ worker(Pleroma.Web.Streamer, [])
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
with create_data <- make_create_data(%{to: to, actor: actor, published: published, context: context, object: object}, additional),
{:ok, activity} <- insert(create_data, local),
:ok <- maybe_federate(activity) do
+ if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
+ Pleroma.Web.Streamer.stream("public", activity)
+ end
{:ok, activity}
end
end
use Phoenix.Endpoint, otp_app: :pleroma
socket "/socket", Pleroma.Web.UserSocket
+ socket "/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket
# Serve at "/" the static files from "priv/static" directory.
#
--- /dev/null
+defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
+ use Phoenix.Socket
+
+ transport :streaming, Phoenix.Transports.WebSocket.Raw
+
+ def connect(params, socket) do
+ IO.inspect(params)
+ Pleroma.Web.Streamer.add_socket(params["stream"], socket)
+ {:ok, socket}
+ end
+
+ def id(socket), do: nil
+
+ def handle(:text, message, state) do
+ IO.inspect message
+ #| :ok
+ #| state
+ #| {:text, message}
+ #| {:text, message, state}
+ #| {:close, "Goodbye!"}
+ {:text, message}
+ end
+
+ def handle(:closed, reason, _state) do
+ IO.inspect reason
+ end
+end
--- /dev/null
+defmodule Pleroma.Web.Streamer do
+ use GenServer
+ require Logger
+ import Plug.Conn
+
+ def start_link do
+ GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+ end
+
+ def stream(topic, item) do
+ GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+ end
+
+ def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ Enum.each(topics[topic] || [], fn (socket) ->
+ json = %{
+ event: "update",
+ payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode!
+ } |> Poison.encode!
+
+ send socket.transport_pid, {:text, json}
+ end)
+ {:noreply, topics}
+ end
+
+ def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+ sockets_for_topic = sockets[topic] || []
+ sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
+ sockets = Map.put(sockets, topic, sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ IO.inspect(sockets)
+ {:noreply, sockets}
+ end
+
+ def handle_cast(m, state) do
+ IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
+ {:noreply, state}
+ end
+end
--- /dev/null
+defmodule Phoenix.Transports.WebSocket.Raw do
+ import Plug.Conn, only: [
+ fetch_query_params: 1,
+ send_resp: 3
+ ]
+ alias Phoenix.Socket.Transport
+
+ def default_config do
+ [
+ timeout: 60_000,
+ transport_log: false,
+ cowboy: Phoenix.Endpoint.CowboyWebSocket
+ ]
+ end
+
+ def init(%Plug.Conn{method: "GET"} = conn, {endpoint, handler, transport}) do
+ {_, opts} = handler.__transport__(transport)
+
+ conn = conn
+ |> fetch_query_params
+ |> Transport.transport_log(opts[:transport_log])
+ |> Transport.force_ssl(handler, endpoint, opts)
+ |> Transport.check_origin(handler, endpoint, opts)
+
+ case conn do
+ %{halted: false} = conn ->
+ case Transport.connect(endpoint, handler, transport, __MODULE__, nil, conn.params) do
+ {:ok, socket} ->
+ {:ok, conn, {__MODULE__, {socket, opts}}}
+ :error ->
+ send_resp(conn, :forbidden, "")
+ {:error, conn}
+ end
+ _ ->
+ {:error, conn}
+ end
+ end
+
+ def init(conn, _) do
+ send_resp(conn, :bad_request, "")
+ {:error, conn}
+ end
+
+ def ws_init({socket, config}) do
+ Process.flag(:trap_exit, true)
+ {:ok, %{socket: socket}, config[:timeout]}
+ end
+
+ def ws_handle(op, data, state) do
+ state.socket.handler
+ |> apply(:handle, [op, data, state])
+ |> case do
+ {op, data} ->
+ {:reply, {op, data}, state}
+ {op, data, state} ->
+ {:reply, {op, data}, state}
+ %{} = state ->
+ {:ok, state}
+ _ ->
+ {:ok, state}
+ end
+ end
+
+ def ws_info({op, data} = tuple, state) do
+ {:reply, tuple, state}
+ end
+
+ def ws_info(_tuple, state), do: {:ok, state}
+
+ def ws_close(state) do
+ ws_handle(:closed, :normal, state)
+ end
+
+ def ws_terminate(reason, state) do
+ ws_handle(:closed, reason, state)
+ end
+end