Merge branch 'develop' into feature/activitypub
[akkoma] / lib / pleroma / web / streamer.ex
defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Keeps track of streaming sockets per topic and pushes items to them.

  The server state is a map of `topic => [socket]`. Sockets subscribing to the
  `"user"` topic are stored under a per-user key of the form `"user:<id>"`.
  Every message is delivered by sending `{:text, json}` to the socket's
  `transport_pid`. An empty text frame is sent to every registered socket each
  `@keepalive_interval` milliseconds as a keepalive.
  """

  use GenServer
  require Logger
  alias Pleroma.{User, Notification}

  # Delay between keepalive pings, in milliseconds (30 seconds).
  @keepalive_interval 30_000

  ## Client API

  @doc """
  Starts the streamer and registers it under the module name.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Registers `socket` as a subscriber of `topic`. Returns `:ok` immediately.
  """
  @spec add_socket(term, term) :: :ok
  def add_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
  end

  @doc """
  Unregisters `socket` from `topic`. Returns `:ok` immediately.
  """
  @spec remove_socket(term, term) :: :ok
  def remove_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
  end

  @doc """
  Asynchronously pushes `item` to the subscribers of `topic`.

  For the `"user"` topic the actual recipients are derived from `item`: a
  `%Notification{}` goes to the sockets of `item.user_id`, anything else goes
  to the sockets of every user returned by
  `User.get_recipients_from_activity/1`.
  """
  @spec stream(term, term) :: :ok
  def stream(topic, item) do
    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
  end

  ## GenServer callbacks

  def init(state) do
    # The timer is owned by the server process itself, so it disappears with
    # the process and a restart begins exactly one fresh keepalive cycle.
    schedule_ping()
    {:ok, state}
  end

  def handle_info(:ping, topics) do
    ping_sockets(topics)
    schedule_ping()
    {:noreply, topics}
  end

  def handle_info(msg, topics) do
    Logger.debug("Streamer ignoring unexpected message: #{inspect(msg)}")
    {:noreply, topics}
  end

  # Pings on demand. Periodic pings are driven by the `:ping` info message, so
  # this clause does not schedule anything.
  def handle_cast(%{action: :ping}, topics) do
    ping_sockets(topics)
    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
    topic = "user:#{item.user_id}"

    Enum.each(topics[topic] || [], fn socket ->
      # The user is read from the :user assign, the same key `push_to_socket/3`
      # and `internal_topic/2` use.
      payload =
        socket.assigns[:user]
        |> Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(item)
        |> Poison.encode!()

      send_event(socket, "notification", payload)
    end)

    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
    Logger.debug("Trying to push to users")

    item
    |> User.get_recipients_from_activity()
    |> Enum.each(fn %{id: id} -> push_to_socket(topics, "user:#{id}", item) end)

    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
    Logger.debug("Trying to push to #{topic}")
    push_to_socket(topics, topic, item)
    {:noreply, topics}
  end

  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
    topic = internal_topic(topic, socket)
    # Enum.uniq/1 keeps a socket from being registered twice on one topic.
    sockets = Map.update(sockets, topic, [socket], &Enum.uniq([socket | &1]))
    Logger.debug("Got new conn for #{topic}")
    {:noreply, sockets}
  end

  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
    topic = internal_topic(topic, socket)

    sockets =
      case List.delete(sockets[topic] || [], socket) do
        # Drop the key with its last socket so per-user topics do not pile up
        # in the state after users disconnect.
        [] -> Map.delete(sockets, topic)
        remaining -> Map.put(sockets, topic, remaining)
      end

    Logger.debug("Removed conn for #{topic}")
    {:noreply, sockets}
  end

  def handle_cast(m, state) do
    # The state is deliberately not logged: it holds every connected socket.
    Logger.debug("Unknown: #{inspect(m)}")
    {:noreply, state}
  end

  @doc """
  Renders `item` as a status and sends it as an `"update"` event to every
  socket registered under `topic` in `topics`.

  The status is rendered once per socket, for that socket's `:user` assign.
  Does nothing when no socket is registered for `topic`.
  """
  def push_to_socket(topics, topic, item) do
    Enum.each(topics[topic] || [], fn socket ->
      payload =
        "status.json"
        |> Pleroma.Web.MastodonAPI.StatusView.render(activity: item, for: socket.assigns[:user])
        |> Poison.encode!()

      send_event(socket, "update", payload)
    end)
  end

  ## Helpers

  # Arms the timer for the next keepalive round.
  defp schedule_ping do
    Process.send_after(self(), :ping, @keepalive_interval)
  end

  # Sends an empty text frame to every socket registered on any topic.
  defp ping_sockets(topics) do
    topics
    |> Map.values()
    |> Enum.concat()
    |> Enum.each(fn socket ->
      Logger.debug("Sending keepalive ping")
      send(socket.transport_pid, {:text, ""})
    end)
  end

  # Wraps an already JSON-encoded payload in the event envelope and delivers it.
  # The payload is embedded as a string, so it ends up JSON-encoded twice.
  defp send_event(socket, event, payload) do
    json = Poison.encode!(%{event: event, payload: payload})
    send(socket.transport_pid, {:text, json})
  end

  # Maps the public "user" topic to the per-user key used in the state.
  defp internal_topic("user", socket) do
    "user:#{socket.assigns[:user].id}"
  end

  defp internal_topic(topic, _), do: topic
end