cc38058946980ac0aff19de8cabe5e6532e6ae45
[akkoma] / lib / pleroma / web / streamer.ex
defmodule Pleroma.Web.Streamer do
  @moduledoc """
  GenServer that fans rendered statuses out to subscribed sockets.

  The server state is a map of `topic => [socket]`. Sockets are registered with
  `add_socket/2`. `stream/2` renders an item once and sends `{:text, json}` to
  the `transport_pid` of every socket subscribed to the topic.

  Each socket's transport process is monitored, and its sockets are dropped from
  every topic once that process goes down, so the state does not accumulate
  dead subscribers.
  """

  use GenServer
  require Logger
  # NOTE(review): nothing in this module appears to call into Plug.Conn; the
  # import is kept as-is and could probably be removed after confirming.
  import Plug.Conn

  ## Client API

  @doc """
  Starts the streamer with an empty topic map, registered under the module name.
  """
  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Subscribes `socket` to `topic`.

  Asynchronous (a cast to the registered process). Adding the same socket to
  the same topic more than once has no further effect.
  """
  def add_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
  end

  @doc """
  Pushes `item` to every socket currently subscribed to `topic`.

  Asynchronous (a cast to the registered process). Nothing is rendered or sent
  when the topic has no subscribers.
  """
  def stream(topic, item) do
    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
  end

  ## Server callbacks

  @impl true
  def init(topics), do: {:ok, topics}

  @impl true
  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
    Logger.debug("Trying to push to #{topic}")

    case Map.get(topics, topic, []) do
      [] ->
        :ok

      sockets ->
        Logger.debug("Pushing item to #{topic}")

        # The message is identical for every subscriber, so build it once
        # instead of rendering and encoding per socket.
        message = {:text, encode_update(item)}
        Enum.each(sockets, fn socket -> send(socket.transport_pid, message) end)
    end

    {:noreply, topics}
  end

  def handle_cast(%{action: :add, topic: topic, socket: socket}, topics) do
    subscribed = Map.get(topics, topic, [])

    topics =
      if socket in subscribed do
        topics
      else
        # Monitor only on first subscription to this topic so repeated adds do
        # not pile up monitors for the same transport process.
        monitor_transport(socket)
        Map.put(topics, topic, [socket | subscribed])
      end

    Logger.debug("Got new conn for #{topic}")
    {:noreply, topics}
  end

  def handle_cast(message, state) do
    # Logged through Logger (not stdout) and without dumping the state, which
    # holds every subscribed socket. `Logger.error/1` is used because it is
    # available across Logger versions.
    Logger.error("Unknown: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, topics) do
    # A transport process died: forget every socket it owned and drop topics
    # that end up without subscribers.
    topics =
      topics
      |> Enum.map(fn {topic, sockets} ->
        {topic, Enum.reject(sockets, &owned_by?(&1, pid))}
      end)
      |> Enum.reject(fn {_topic, sockets} -> sockets == [] end)
      |> Map.new()

    {:noreply, topics}
  end

  def handle_info(message, topics) do
    # Defining handle_info/2 replaces the default injected by `use GenServer`,
    # so unexpected messages need an explicit catch-all.
    Logger.debug("Streamer ignoring message: #{inspect(message)}")
    {:noreply, topics}
  end

  ## Helpers

  # Builds the JSON text sent to subscribers. The rendered status is encoded to
  # a JSON string first and then embedded in the envelope, so the payload ends
  # up as a string inside the outer JSON object (presumably what streaming
  # clients expect -- confirm against the client protocol).
  defp encode_update(item) do
    payload =
      "status.json"
      |> Pleroma.Web.MastodonAPI.StatusView.render(activity: item)
      |> Poison.encode!()

    Poison.encode!(%{event: "update", payload: payload})
  end

  # Starts monitoring the socket's transport process when it has one.
  defp monitor_transport(%{transport_pid: pid}) when is_pid(pid) do
    Process.monitor(pid)
    :ok
  end

  defp monitor_transport(_socket), do: :ok

  # True when `socket` belongs to the transport process `pid`.
  defp owned_by?(%{transport_pid: pid}, pid), do: true
  defp owned_by?(_socket, _pid), do: false
end