Merge branch 'feld-warnings' into 'develop'
[akkoma] / lib / pleroma / web / streamer.ex
1 defmodule Pleroma.Web.Streamer do
2 use GenServer
3 require Logger
4 alias Pleroma.{User, Notification}
5
6 def start_link do
7 spawn(fn ->
8 # 30 seconds
9 Process.sleep(1000 * 30)
10 GenServer.cast(__MODULE__, %{action: :ping})
11 end)
12
13 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
14 end
15
16 def add_socket(topic, socket) do
17 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
18 end
19
20 def remove_socket(topic, socket) do
21 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
22 end
23
24 def stream(topic, item) do
25 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
26 end
27
28 def handle_cast(%{action: :ping}, topics) do
29 Map.values(topics)
30 |> List.flatten()
31 |> Enum.each(fn socket ->
32 Logger.debug("Sending keepalive ping")
33 send(socket.transport_pid, {:text, ""})
34 end)
35
36 spawn(fn ->
37 # 30 seconds
38 Process.sleep(1000 * 30)
39 GenServer.cast(__MODULE__, %{action: :ping})
40 end)
41
42 {:noreply, topics}
43 end
44
45 def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
46 topic = "user:#{item.user_id}"
47
48 Enum.each(topics[topic] || [], fn socket ->
49 json =
50 %{
51 event: "notification",
52 payload:
53 Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
54 socket.assigns["user"],
55 item
56 )
57 |> Jason.encode!()
58 }
59 |> Jason.encode!()
60
61 send(socket.transport_pid, {:text, json})
62 end)
63
64 {:noreply, topics}
65 end
66
67 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
68 Logger.debug("Trying to push to users")
69
70 recipient_topics =
71 User.get_recipients_from_activity(item)
72 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
73
74 Enum.each(recipient_topics, fn topic ->
75 push_to_socket(topics, topic, item)
76 end)
77
78 {:noreply, topics}
79 end
80
81 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
82 Logger.debug("Trying to push to #{topic}")
83 Logger.debug("Pushing item to #{topic}")
84 push_to_socket(topics, topic, item)
85 {:noreply, topics}
86 end
87
88 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
89 topic = internal_topic(topic, socket)
90 sockets_for_topic = sockets[topic] || []
91 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
92 sockets = Map.put(sockets, topic, sockets_for_topic)
93 Logger.debug("Got new conn for #{topic}")
94 {:noreply, sockets}
95 end
96
97 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
98 topic = internal_topic(topic, socket)
99 sockets_for_topic = sockets[topic] || []
100 sockets_for_topic = List.delete(sockets_for_topic, socket)
101 sockets = Map.put(sockets, topic, sockets_for_topic)
102 Logger.debug("Removed conn for #{topic}")
103 {:noreply, sockets}
104 end
105
106 def handle_cast(m, state) do
107 Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
108 {:noreply, state}
109 end
110
111 def push_to_socket(topics, topic, item) do
112 Enum.each(topics[topic] || [], fn socket ->
113 # Get the current user so we have up-to-date blocks etc.
114 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
115 blocks = user.info["blocks"] || []
116
117 unless item.actor in blocks do
118 json =
119 %{
120 event: "update",
121 payload:
122 Pleroma.Web.MastodonAPI.StatusView.render(
123 "status.json",
124 activity: item,
125 for: user
126 )
127 |> Jason.encode!()
128 }
129 |> Jason.encode!()
130
131 send(socket.transport_pid, {:text, json})
132 end
133 end)
134 end
135
136 defp internal_topic("user", socket) do
137 "user:#{socket.assigns[:user].id}"
138 end
139
140 defp internal_topic(topic, _), do: topic
141 end