From bd5bdc4c247e2ebb239215540a51b69c356da65c Mon Sep 17 00:00:00 2001 From: Roger Braun Date: Sat, 11 Nov 2017 14:59:25 +0100 Subject: [PATCH] MastoAPI: Basic streaming. --- lib/pleroma/application.ex | 3 +- lib/pleroma/web/activity_pub/activity_pub.ex | 3 + lib/pleroma/web/endpoint.ex | 1 + .../web/mastodon_api/mastodon_socket.ex | 27 +++++++ lib/pleroma/web/streamer.ex | 45 +++++++++++ lib/transports.ex | 77 +++++++++++++++++++ 6 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 lib/pleroma/web/mastodon_api/mastodon_socket.ex create mode 100644 lib/pleroma/web/streamer.ex create mode 100644 lib/transports.ex diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1f0a05568..5422cbc28 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -19,7 +19,8 @@ defmodule Pleroma.Application do ttl_interval: 1000, limit: 2500 ]]), - worker(Pleroma.Web.Federator, []) + worker(Pleroma.Web.Federator, []), + worker(Pleroma.Web.Streamer, []) ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 1624c6545..35536a1e4 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -22,6 +22,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do with create_data <- make_create_data(%{to: to, actor: actor, published: published, context: context, object: object}, additional), {:ok, activity} <- insert(create_data, local), :ok <- maybe_federate(activity) do + if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do + Pleroma.Web.Streamer.stream("public", activity) + end {:ok, activity} end end diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex index a1b4108cd..dc1ba2a05 100644 --- a/lib/pleroma/web/endpoint.ex +++ b/lib/pleroma/web/endpoint.ex @@ -2,6 +2,7 @@ defmodule Pleroma.Web.Endpoint do use Phoenix.Endpoint, otp_app: :pleroma socket "/socket", Pleroma.Web.UserSocket + socket "/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket # Serve at "/" the static files from "priv/static" directory. # diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex new file mode 100644 index 000000000..c27d025c4 --- /dev/null +++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex @@ -0,0 +1,27 @@ +defmodule Pleroma.Web.MastodonAPI.MastodonSocket do + use Phoenix.Socket + + transport :streaming, Phoenix.Transports.WebSocket.Raw + + def connect(params, socket) do + IO.inspect(params) + Pleroma.Web.Streamer.add_socket(params["stream"], socket) + {:ok, socket} + end + + def id(socket), do: nil + + def handle(:text, message, state) do + IO.inspect message + #| :ok + #| state + #| {:text, message} + #| {:text, message, state} + #| {:close, "Goodbye!"} + {:text, message} + end + + def handle(:closed, reason, _state) do + IO.inspect reason + end +end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex new file mode 100644 index 000000000..cc3805894 --- /dev/null +++ b/lib/pleroma/web/streamer.ex @@ -0,0 +1,45 @@ +defmodule Pleroma.Web.Streamer do + use GenServer + require Logger + import Plug.Conn + + def start_link do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) + end + + def stream(topic, item) do + GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) + end + + def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + Enum.each(topics[topic] || [], fn (socket) -> + json = %{ + event: "update", + payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode! + } |> Poison.encode! + + send socket.transport_pid, {:text, json} + end) + {:noreply, topics} + end + + def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + IO.inspect(sockets) + {:noreply, sockets} + end + + def handle_cast(m, state) do + IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") + {:noreply, state} + end +end diff --git a/lib/transports.ex b/lib/transports.ex new file mode 100644 index 000000000..5600a4fdd --- /dev/null +++ b/lib/transports.ex @@ -0,0 +1,77 @@ +defmodule Phoenix.Transports.WebSocket.Raw do + import Plug.Conn, only: [ + fetch_query_params: 1, + send_resp: 3 + ] + alias Phoenix.Socket.Transport + + def default_config do + [ + timeout: 60_000, + transport_log: false, + cowboy: Phoenix.Endpoint.CowboyWebSocket + ] + end + + def init(%Plug.Conn{method: "GET"} = conn, {endpoint, handler, transport}) do + {_, opts} = handler.__transport__(transport) + + conn = conn + |> fetch_query_params + |> Transport.transport_log(opts[:transport_log]) + |> Transport.force_ssl(handler, endpoint, opts) + |> Transport.check_origin(handler, endpoint, opts) + + case conn do + %{halted: false} = conn -> + case Transport.connect(endpoint, handler, transport, __MODULE__, nil, conn.params) do + {:ok, socket} -> + {:ok, conn, {__MODULE__, {socket, opts}}} + :error -> + send_resp(conn, :forbidden, "") + {:error, conn} + end + _ -> + {:error, conn} + end + end + + def init(conn, _) do + send_resp(conn, :bad_request, "") + {:error, conn} + end + + def ws_init({socket, config}) do + Process.flag(:trap_exit, true) + {:ok, %{socket: socket}, config[:timeout]} + end + + def ws_handle(op, data, state) do + state.socket.handler + |> apply(:handle, [op, data, state]) + |> case do + {op, data} -> + {:reply, {op, data}, state} + {op, data, state} -> + {:reply, {op, data}, state} + %{} = state -> + {:ok, state} + _ -> + {:ok, state} + end + end + + def ws_info({op, data} = tuple, state) do + {:reply, tuple, state} + end + + def ws_info(_tuple, state), do: {:ok, state} + + def ws_close(state) do + ws_handle(:closed, :normal, state) + end + + def ws_terminate(reason, state) do + ws_handle(:closed, reason, state) + end +end -- 2.45.2