1 defmodule Pleroma.Web.Streamer do
8 Process.sleep(1000 * 30) # 30 seconds
9 GenServer.cast(__MODULE__, %{action: :ping})
11 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
@doc """
Asks the streamer to register `socket` under `topic`.

Sends an asynchronous `:add` cast to the process registered under this
module's name; the call returns without waiting for the server to handle it.
"""
def add_socket(topic, socket) do
  request = %{action: :add, topic: topic, socket: socket}
  GenServer.cast(__MODULE__, request)
end
@doc """
Asks the streamer to drop `socket` from `topic`.

Sends an asynchronous `:remove` cast to the process registered under this
module's name; the call returns without waiting for the server to handle it.
"""
def remove_socket(topic, socket) do
  request = %{action: :remove, topic: topic, socket: socket}
  GenServer.cast(__MODULE__, request)
end
@doc """
Asks the streamer to push `item` to the sockets registered under `topic`.

Sends an asynchronous `:stream` cast to the process registered under this
module's name; delivery to the sockets happens later, inside the server.
"""
def stream(topic, item) do
  request = %{action: :stream, item: item, topic: topic}
  GenServer.cast(__MODULE__, request)
end
# Handles the :ping cast (keepalive). `topics` is the server state.
# NOTE(review): this excerpt omits several lines of this clause (the expression piped
# into Enum.each, the closers, anything wrapping the sleep/cast pair, and the return
# value), so only the visible statements are described here.
26 def handle_cast(%{action: :ping}, topics) do
# For every socket produced by the (not visible) upstream expression: log, then send
# the message {:text, ""} to that socket's transport process.
29 |> Enum.each(fn (socket) ->
30 Logger.debug("Sending keepalive ping")
31 send socket.transport_pid, {:text, ""}
# Wait 30 seconds, then cast :ping to this server again, which re-enters this clause
# and so repeats the keepalive.
# NOTE(review): if this pair runs directly in the server process rather than in a
# separately spawned one, the sleep would stall every other cast for 30 s — confirm
# against the full file. Process.send_after/3 plus a handle_info clause would avoid
# the sleep entirely.
34 Process.sleep(1000 * 30) # 30 seconds
35 GenServer.cast(__MODULE__, %{action: :ping})
# Handles the :stream cast: delivers `item` to every socket stored under `topic`.
# `topics` is the server state, indexed by topic.
# NOTE(review): some lines of this clause (the start and end of the `json` expression,
# the closers and the return value) are not visible in this excerpt.
40 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
41 Logger.debug("Trying to push to #{topic}")
42 Logger.debug("Pushing item to #{topic}")
# A topic with no entry in the state falls back to an empty list, so nothing is sent.
43 Enum.each(topics[topic] || [], fn (socket) ->
# The payload entry is the item rendered through StatusView as "status.json" and then
# JSON-encoded with Poison, i.e. it is carried as a string.
# NOTE(review): the surrounding structure that `payload:` belongs to is not visible.
46 payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode!
# Hand the encoded message to the socket's transport process as a {:text, json} tuple.
49 send socket.transport_pid, {:text, json}
# Handles the :add cast: records `socket` as a subscriber of `topic`.
#
# The state (`sockets`) is indexed by topic and holds a list of sockets per topic.
# The new socket is prepended and the list de-duplicated, so adding the same
# socket twice keeps a single entry.
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
  subscribers =
    case sockets[topic] do
      # No list stored for this topic yet: start one with just this socket.
      missing when missing in [nil, false] -> [socket]
      current -> Enum.uniq([socket | current])
    end

  updated = Map.put(sockets, topic, subscribers)
  Logger.debug("Got new conn for #{topic}")

  # NOTE(review): the closing lines of this clause are not visible in the excerpt;
  # the standard GenServer reply carrying the updated state is assumed — confirm.
  {:noreply, updated}
end
# Handles the :remove cast: unsubscribes `socket` from `topic`.
#
# The state (`sockets`) is indexed by topic and holds a list of sockets per topic.
# Removing a socket that is not registered, or from an unknown topic, is a no-op.
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
  sockets =
    case List.delete(sockets[topic] || [], socket) do
      # Drop the key once the last subscriber leaves. Keeping `topic => []` around
      # would make the state grow with every topic ever seen; readers already
      # treat a missing key as an empty list, so this is transparent to them.
      [] -> Map.delete(sockets, topic)
      remaining -> Map.put(sockets, topic, remaining)
    end

  Logger.debug("Removed conn for #{topic}")

  # NOTE(review): the closing lines of this clause are not visible in the excerpt;
  # the standard GenServer reply carrying the updated state is assumed — confirm.
  {:noreply, sockets}
end
# Catch-all clause for casts that none of the specific clauses matched.
#
# Reports the unexpected message together with the current state and leaves the
# state untouched. Reporting goes through Logger, like the other clauses in this
# module, so it respects the configured log level and backends; calling
# IO.inspect/1 on an already-built string would print a quoted, escaped literal
# straight to stdout instead.
def handle_cast(m, state) do
  Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")

  # NOTE(review): the closing lines of this clause are not visible in the excerpt;
  # the standard GenServer reply with the unchanged state is assumed — confirm.
  {:noreply, state}
end