MastoAPI: Implement all streaming functions.
[akkoma] / lib / pleroma / web / streamer.ex
1 defmodule Pleroma.Web.Streamer do
2 use GenServer
3 require Logger
4 import Plug.Conn
5 alias Pleroma.{User, Notification}
6
7 def start_link do
8 spawn(fn ->
9 Process.sleep(1000 * 30) # 30 seconds
10 GenServer.cast(__MODULE__, %{action: :ping})
11 end)
12 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
13 end
14
15 def add_socket(topic, socket) do
16 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
17 end
18
19 def remove_socket(topic, socket) do
20 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
21 end
22
23 def stream(topic, item) do
24 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
25 end
26
27 def handle_cast(%{action: :ping}, topics) do
28 Map.values(topics)
29 |> List.flatten
30 |> Enum.each(fn (socket) ->
31 Logger.debug("Sending keepalive ping")
32 send socket.transport_pid, {:text, ""}
33 end)
34 spawn(fn ->
35 Process.sleep(1000 * 30) # 30 seconds
36 GenServer.cast(__MODULE__, %{action: :ping})
37 end)
38 {:noreply, topics}
39 end
40
41 def push_to_socket(topics, topic, item) do
42 Enum.each(topics[topic] || [], fn (socket) ->
43 json = %{
44 event: "update",
45 payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item, for: socket.assigns[:user]) |> Poison.encode!
46 } |> Poison.encode!
47
48 send socket.transport_pid, {:text, json}
49 end)
50 end
51
52 def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
53 topic = "user:#{item.user_id}"
54 Enum.each(topics[topic] || [], fn (socket) ->
55 json = %{
56 event: "notification",
57 payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
58 } |> Poison.encode!
59
60 send socket.transport_pid, {:text, json}
61 end)
62 {:noreply, topics}
63 end
64
65 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
66 Logger.debug("Trying to push to users")
67 recipient_topics = User.get_recipients_from_activity(item)
68 |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
69
70 Enum.each(recipient_topics, fn (topic) ->
71 push_to_socket(topics, topic, item)
72 end)
73 {:noreply, topics}
74 end
75
76 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
77 Logger.debug("Trying to push to #{topic}")
78 Logger.debug("Pushing item to #{topic}")
79 push_to_socket(topics, topic, item)
80 {:noreply, topics}
81 end
82
83 defp internal_topic("user", socket) do
84 "user:#{socket.assigns[:user].id}"
85 end
86 defp internal_topic(topic, socket), do: topic
87
88 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
89 topic = internal_topic(topic, socket)
90 sockets_for_topic = sockets[topic] || []
91 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
92 sockets = Map.put(sockets, topic, sockets_for_topic)
93 Logger.debug("Got new conn for #{topic}")
94 IO.inspect(sockets)
95 {:noreply, sockets}
96 end
97
98 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
99 topic = internal_topic(topic, socket)
100 sockets_for_topic = sockets[topic] || []
101 sockets_for_topic = List.delete(sockets_for_topic, socket)
102 sockets = Map.put(sockets, topic, sockets_for_topic)
103 Logger.debug("Removed conn for #{topic}")
104 IO.inspect(sockets)
105 {:noreply, sockets}
106 end
107
108 def handle_cast(m, state) do
109 IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
110 {:noreply, state}
111 end
112 end