Merge branch 'bugfix/public-mastoapi-websocket' into 'develop'
[akkoma] / lib / pleroma / web / streamer.ex
1 defmodule Pleroma.Web.Streamer do
2 use GenServer
3 require Logger
4 alias Pleroma.{User, Notification, Activity, Object, Repo}
5 alias Pleroma.Web.ActivityPub.ActivityPub
6
# GenServer callback: the argument passed to GenServer.start_link/3 becomes
# the initial state unchanged. The handlers in this module treat that state as
# a map from topic name to the list of sockets subscribed to it.
def init(initial_state), do: {:ok, initial_state}
10
@doc """
Starts the streamer as a process registered under this module's name, with an
empty topic map as its state, and arms the first keepalive ping.

Returns whatever `GenServer.start_link/3` returns.
"""
def start_link do
  case GenServer.start_link(__MODULE__, %{}, name: __MODULE__) do
    {:ok, pid} = started ->
      # Arm the keepalive only once a server actually exists, and address it by
      # pid: a failed start (for example when the name is already registered)
      # must not inject an additional ping loop into the running server.
      spawn(fn ->
        # 30 seconds
        Process.sleep(1000 * 30)
        GenServer.cast(pid, %{action: :ping})
      end)

      started

    other ->
      other
  end
end
20
@doc """
Asynchronously asks the streamer process to subscribe `socket` to `topic`.
"""
def add_socket(topic, socket) do
  request = %{action: :add, socket: socket, topic: topic}
  GenServer.cast(__MODULE__, request)
end
24
@doc """
Asynchronously asks the streamer process to unsubscribe `socket` from `topic`.
"""
def remove_socket(topic, socket) do
  request = %{action: :remove, socket: socket, topic: topic}
  GenServer.cast(__MODULE__, request)
end
28
@doc """
Asynchronously hands `item` to the streamer process for delivery on `topic`.
"""
def stream(topic, item) do
  request = %{action: :stream, topic: topic, item: item}
  GenServer.cast(__MODULE__, request)
end
32
# Keepalive tick: sends an empty text frame to the transport process of every
# subscribed socket, then schedules the next tick 30 seconds later.
def handle_cast(%{action: :ping}, topics) do
  topics
  |> Map.values()
  |> List.flatten()
  |> Enum.each(fn socket ->
    Logger.debug("Sending keepalive ping")
    send(socket.transport_pid, {:text, ""})
  end)

  # Address the next tick to this process's pid rather than to the registered
  # name. If this server dies, the pending tick is then dropped instead of
  # landing on a restarted server, whose own start already begins a ping loop;
  # going through the name would add one more concurrent loop per restart.
  server = self()

  spawn(fn ->
    # 30 seconds
    Process.sleep(1000 * 30)
    GenServer.cast(server, %{action: :ping})
  end)

  {:noreply, topics}
end
49
# Streams a direct message: pushes `item` to the "direct:<id>" topic of every
# recipient returned by User.get_recipients_from_activity/1.
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
  recipients = User.get_recipients_from_activity(item)
  direct_topics = Enum.map(recipients, fn %{id: id} -> "direct:#{id}" end)

  Enum.each(direct_topics, fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(topics, user_topic, item)
  end)

  {:noreply, topics}
end
62
# Streams `item` to the list timelines it belongs to, i.e. the "list:<id>"
# topics of the lists returned by Pleroma.List.get_lists_from_activity/1.
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
  lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    case ActivityPub.is_public?(item) do
      true ->
        lists

      _ ->
        # The author is only needed for the follower check, so look it up here.
        author = User.get_cached_by_ap_id(item.data["actor"])

        Enum.filter(lists, fn list ->
          # A missing author or list owner excludes the list instead of raising:
          # an exception here would take down the streamer process and with it
          # every registered socket.
          with %{follower_address: address} <- author,
               %{following: following} when is_list(following) <-
                 Repo.get(User, list.user_id) do
            address in following
          else
            _ -> false
          end
        end)
    end

  recipient_topics = Enum.map(recipient_lists, fn %{id: id} -> "list:#{id}" end)

  Enum.each(recipient_topics, fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(topics, list_topic, item)
  end)

  {:noreply, topics}
end
91
# Streams a notification to every socket subscribed to the "user:<id>" topic
# of the notification's user, as a JSON "notification" event whose payload is
# itself a JSON-encoded string.
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
  topic = "user:#{item.user_id}"

  Enum.each(topics[topic] || [], fn socket ->
    # The rest of this module reads the socket's user from assigns under the
    # atom key :user; the string key "user" used here before was inconsistent
    # with that and presumably always yielded nil for the rendering user.
    payload =
      socket.assigns[:user]
      |> Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(item)
      |> Jason.encode!()

    json = Jason.encode!(%{event: "notification", payload: payload})

    send(socket.transport_pid, {:text, json})
  end)

  {:noreply, topics}
end
113
# Streams a non-notification item on the "user" topic: it is pushed to the
# "user:<id>" topic of every recipient of the activity.
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
  Logger.debug("Trying to push to users")

  to_user_topic = fn %{id: id} -> "user:#{id}" end

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(to_user_topic)
  |> Enum.each(&push_to_socket(topics, &1, item))

  {:noreply, topics}
end
127
# Fallback for every other stream topic: the item goes to the sockets
# registered under exactly that topic name.
def handle_cast(%{action: :stream, topic: topic, item: item}, subscriptions) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(subscriptions, topic, item)

  {:noreply, subscriptions}
end
134
# Registers `socket` under the internal form of `topic`. The socket is
# prepended to the topic's list and duplicates are dropped, so registering the
# same socket twice keeps a single entry.
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
  key = internal_topic(topic, socket)
  subscribers = Enum.uniq([socket | (sockets[key] || [])])

  Logger.debug("Got new conn for #{key}")

  {:noreply, Map.put(sockets, key, subscribers)}
end
143
# Unregisters `socket` from the internal form of `topic`.
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
  topic = internal_topic(topic, socket)

  # Drop the topic key once its last socket is gone. Keeping `topic => []`
  # entries would let the state grow with every topic name ever seen; all
  # readers in this module already treat a missing key as an empty list.
  sockets =
    case List.delete(sockets[topic] || [], socket) do
      [] -> Map.delete(sockets, topic)
      remaining -> Map.put(sockets, topic, remaining)
    end

  Logger.debug("Removed conn for #{topic}")
  {:noreply, sockets}
end
152
# Catch-all for casts no other clause matches: the message and the current
# state are logged at info level and the state is left untouched.
def handle_cast(unknown, state) do
  Logger.info("Unknown: #{inspect(unknown)}, #{inspect(state)}")

  {:noreply, state}
end
157
# Builds the JSON text of an "update" event for `activity`, rendered through
# StatusView's "status.json" template on behalf of `user`. The payload is a
# JSON-encoded string nested inside the JSON-encoded event.
defp represent_update(%Activity{} = activity, %User{} = user) do
  status =
    Pleroma.Web.MastodonAPI.StatusView.render(
      "status.json",
      activity: activity,
      for: user
    )

  Jason.encode!(%{event: "update", payload: Jason.encode!(status)})
end
171
# Builds the JSON text of an "update" event for `activity`, rendered through
# StatusView's "status.json" template without a viewing user. The payload is a
# JSON-encoded string nested inside the JSON-encoded event.
defp represent_update(%Activity{} = activity) do
  status =
    Pleroma.Web.MastodonAPI.StatusView.render(
      "status.json",
      activity: activity
    )

  Jason.encode!(%{event: "update", payload: Jason.encode!(status)})
end
184
# Delivers an Announce activity to every socket registered under `topic`.
# For a socket with an assigned user, the item is skipped when the announced
# object cannot be normalized, or when the announcing actor or the object's
# actor is in that user's block list. Sockets without a user always receive it.
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn socket ->
    session_user = socket.assigns[:user]

    if session_user do
      # Get the current user so we have up-to-date blocks etc.
      user = User.get_cached_by_ap_id(session_user.ap_id)
      blocks = user.info["blocks"] || []

      parent = Object.normalize(item.data["object"])

      deliverable? =
        not is_nil(parent) and item.actor not in blocks and
          parent.data["actor"] not in blocks

      if deliverable? do
        send(socket.transport_pid, {:text, represent_update(item, user)})
      end
    else
      send(socket.transport_pid, {:text, represent_update(item)})
    end
  end)
end
202
# Delivers `item` to every socket registered under `topic`. For a socket with
# an assigned user, the item is skipped when its actor is in that user's block
# list; sockets without a user always receive it.
def push_to_socket(topics, topic, item) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn socket ->
    session_user = socket.assigns[:user]

    if session_user do
      # Get the current user so we have up-to-date blocks etc.
      user = User.get_cached_by_ap_id(session_user.ap_id)
      blocks = user.info["blocks"] || []

      if item.actor not in blocks do
        send(socket.transport_pid, {:text, represent_update(item, user)})
      end
    else
      send(socket.transport_pid, {:text, represent_update(item)})
    end
  end)
end
218
# Maps a client-facing topic name to the key used in the streamer state.
# "user" and "direct" become "<topic>:<id>", using the id of the user assigned
# to the socket; every other topic is used as-is.
defp internal_topic(topic, socket) when topic == "user" or topic == "direct" do
  topic <> ":" <> to_string(socket.assigns[:user].id)
end

defp internal_topic(topic, _socket), do: topic
224 end