Fix user topic streaming.
[akkoma] / lib / pleroma / web / streamer.ex
defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Fans out activities and notifications to connected streaming sockets.

  The server state is a map of `topic => [socket]`. Sockets register
  themselves with `add_socket/2` and unregister with `remove_socket/2`;
  producers publish with `stream/2`. Every message is delivered by sending
  `{:text, json}` to the socket's `transport_pid`.

  The `"user"` and `"direct"` topics are stored per user, as
  `"user:<id>"` / `"direct:<id>"`, keyed by the id of the user assigned to
  the socket.

  An empty text frame is sent to every registered socket every
  30 seconds as a keepalive.
  """

  use GenServer
  require Logger
  alias Pleroma.{User, Notification}

  # Interval between keepalive frames, in milliseconds.
  @keepalive_interval 30_000

  @doc """
  Initialises the server state and schedules the first keepalive.

  The timer is owned by the server process itself, so a restarted server
  always has exactly one keepalive loop running.
  """
  def init(args) do
    schedule_keepalive()
    {:ok, args}
  end

  @doc """
  Starts the streamer, registered under the module name, with an empty
  topic map.
  """
  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Registers `socket` as a listener on `topic`.

  For the `"user"` and `"direct"` topics the socket is filed under the
  per-user topic derived from `socket.assigns[:user].id`.
  """
  def add_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
  end

  @doc """
  Unregisters `socket` from `topic` (same topic mapping as `add_socket/2`).
  """
  def remove_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
  end

  @doc """
  Publishes `item` on `topic`.

  On `"user"` a `%Pleroma.Notification{}` is delivered to the notified
  user's sockets as a `"notification"` event; any other item on `"user"` or
  `"direct"` is delivered to each recipient returned by
  `User.get_recipients_from_activity/1`. Every other topic is delivered to
  the sockets registered under that exact topic name.
  """
  def stream(topic, item) do
    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
  end

  # Periodic keepalive tick scheduled by `schedule_keepalive/0`.
  def handle_info(:keepalive, topics) do
    send_keepalive(topics)
    schedule_keepalive()
    {:noreply, topics}
  end

  # Anything else is not part of this server's protocol; ignore it rather
  # than crash and lose every registered socket.
  def handle_info(message, state) do
    Logger.debug("Streamer ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Manual keepalive request. Kept so existing `%{action: :ping}` casts still
  # work; it does not reschedule, the timer loop lives in `handle_info/2`.
  def handle_cast(%{action: :ping}, topics) do
    send_keepalive(topics)
    {:noreply, topics}
  end

  # Direct messages: push to the "direct:<id>" topic of every recipient.
  def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
    item
    |> recipient_topics("direct")
    |> Enum.each(fn user_topic ->
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
      push_to_socket(topics, user_topic, item)
    end)

    {:noreply, topics}
  end

  # Notifications: rendered per socket for that socket's user and sent as a
  # "notification" event. This clause must stay above the generic "user"
  # clause below, which would otherwise match notifications too.
  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
    topic = "user:#{item.user_id}"

    Enum.each(topics[topic] || [], fn socket ->
      # The payload is itself a JSON string nested inside the JSON envelope.
      payload =
        socket.assigns[:user]
        |> Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(item)
        |> Jason.encode!()

      json = Jason.encode!(%{event: "notification", payload: payload})

      send(socket.transport_pid, {:text, json})
    end)

    {:noreply, topics}
  end

  # Home timeline: push to the "user:<id>" topic of every recipient.
  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
    Logger.debug("Trying to push to users")

    item
    |> recipient_topics("user")
    |> Enum.each(fn topic -> push_to_socket(topics, topic, item) end)

    {:noreply, topics}
  end

  # Any other topic is pushed to the sockets registered under its own name.
  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")
    push_to_socket(topics, topic, item)
    {:noreply, topics}
  end

  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
    topic = internal_topic(topic, socket)
    # Enum.uniq/1 keeps a socket that registers twice from receiving duplicates.
    sockets_for_topic = Enum.uniq([socket | sockets[topic] || []])
    sockets = Map.put(sockets, topic, sockets_for_topic)
    Logger.debug("Got new conn for #{topic}")
    {:noreply, sockets}
  end

  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
    topic = internal_topic(topic, socket)

    # Drop the key once the last socket is gone, so per-user topics do not
    # pile up as empty lists for the lifetime of the server.
    sockets =
      case List.delete(sockets[topic] || [], socket) do
        [] -> Map.delete(sockets, topic)
        remaining -> Map.put(sockets, topic, remaining)
      end

    Logger.debug("Removed conn for #{topic}")
    {:noreply, sockets}
  end

  def handle_cast(m, state) do
    Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
    {:noreply, state}
  end

  @doc """
  Sends `item` as an `"update"` event to every socket registered under
  `topic` in `topics`.

  The socket's user is re-read through `User.get_cached_by_ap_id/1` so the
  block list is current; sockets whose user blocks `item.actor` are skipped.
  Does nothing when no socket is registered for `topic`.
  """
  def push_to_socket(topics, topic, item) do
    Enum.each(topics[topic] || [], fn socket ->
      # Get the current user so we have up-to-date blocks etc.
      user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
      blocks = user.info["blocks"] || []

      unless item.actor in blocks do
        # The payload is itself a JSON string nested inside the JSON envelope.
        payload =
          Pleroma.Web.MastodonAPI.StatusView.render(
            "status.json",
            activity: item,
            for: user
          )
          |> Jason.encode!()

        json = Jason.encode!(%{event: "update", payload: payload})

        send(socket.transport_pid, {:text, json})
      end
    end)
  end

  # Arms the next keepalive tick for this process.
  defp schedule_keepalive do
    Process.send_after(self(), :keepalive, @keepalive_interval)
  end

  # Sends an empty text frame to every registered socket.
  defp send_keepalive(topics) do
    topics
    |> Map.values()
    |> List.flatten()
    |> Enum.each(fn socket ->
      Logger.debug("Sending keepalive ping")
      send(socket.transport_pid, {:text, ""})
    end)
  end

  # Builds the "<prefix>:<id>" topic for every recipient of `item`.
  defp recipient_topics(item, prefix) do
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "#{prefix}:#{id}" end)
  end

  # "user" and "direct" are stored per user; every other topic is shared.
  defp internal_topic(topic, socket) when topic in ~w[user direct] do
    "#{topic}:#{socket.assigns[:user].id}"
  end

  defp internal_topic(topic, _), do: topic
end