Merge remote-tracking branch 'pleroma/develop' into feature/disable-account
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 use GenServer
7 require Logger
8 alias Pleroma.Activity
9 alias Pleroma.Conversation.Participation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.User
13 alias Pleroma.Web.ActivityPub.ActivityPub
14 alias Pleroma.Web.ActivityPub.Visibility
15 alias Pleroma.Web.MastodonAPI.NotificationView
16
17 @keepalive_interval :timer.seconds(30)
18
19 def start_link do
20 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
21 end
22
23 def add_socket(topic, socket) do
24 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
25 end
26
27 def remove_socket(topic, socket) do
28 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
29 end
30
31 def stream(topic, item) do
32 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
33 end
34
35 def init(args) do
36 spawn(fn ->
37 # 30 seconds
38 Process.sleep(@keepalive_interval)
39 GenServer.cast(__MODULE__, %{action: :ping})
40 end)
41
42 {:ok, args}
43 end
44
45 def handle_cast(%{action: :ping}, topics) do
46 Map.values(topics)
47 |> List.flatten()
48 |> Enum.each(fn socket ->
49 Logger.debug("Sending keepalive ping")
50 send(socket.transport_pid, {:text, ""})
51 end)
52
53 spawn(fn ->
54 # 30 seconds
55 Process.sleep(@keepalive_interval)
56 GenServer.cast(__MODULE__, %{action: :ping})
57 end)
58
59 {:noreply, topics}
60 end
61
62 def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
63 recipient_topics =
64 User.get_recipients_from_activity(item)
65 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
66
67 Enum.each(recipient_topics || [], fn user_topic ->
68 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
69 push_to_socket(topics, user_topic, item)
70 end)
71
72 {:noreply, topics}
73 end
74
75 def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
76 user_topic = "direct:#{participation.user_id}"
77 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
78
79 push_to_socket(topics, user_topic, participation)
80
81 {:noreply, topics}
82 end
83
84 def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
85 # filter the recipient list if the activity is not public, see #270.
86 recipient_lists =
87 case Visibility.is_public?(item) do
88 true ->
89 Pleroma.List.get_lists_from_activity(item)
90
91 _ ->
92 Pleroma.List.get_lists_from_activity(item)
93 |> Enum.filter(fn list ->
94 owner = User.get_cached_by_id(list.user_id)
95
96 Visibility.visible_for_user?(item, owner)
97 end)
98 end
99
100 recipient_topics =
101 recipient_lists
102 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
103
104 Enum.each(recipient_topics || [], fn list_topic ->
105 Logger.debug("Trying to push message to #{list_topic}\n\n")
106 push_to_socket(topics, list_topic, item)
107 end)
108
109 {:noreply, topics}
110 end
111
112 def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
113 topic = "user:#{item.user_id}"
114
115 Enum.each(topics[topic] || [], fn socket ->
116 json =
117 %{
118 event: "notification",
119 payload:
120 NotificationView.render("show.json", %{
121 notification: item,
122 for: socket.assigns["user"]
123 })
124 |> Jason.encode!()
125 }
126 |> Jason.encode!()
127
128 send(socket.transport_pid, {:text, json})
129 end)
130
131 {:noreply, topics}
132 end
133
134 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
135 Logger.debug("Trying to push to users")
136
137 recipient_topics =
138 User.get_recipients_from_activity(item)
139 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
140
141 Enum.each(recipient_topics, fn topic ->
142 push_to_socket(topics, topic, item)
143 end)
144
145 {:noreply, topics}
146 end
147
148 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
149 Logger.debug("Trying to push to #{topic}")
150 Logger.debug("Pushing item to #{topic}")
151 push_to_socket(topics, topic, item)
152 {:noreply, topics}
153 end
154
155 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
156 topic = internal_topic(topic, socket)
157 sockets_for_topic = sockets[topic] || []
158 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
159 sockets = Map.put(sockets, topic, sockets_for_topic)
160 Logger.debug("Got new conn for #{topic}")
161 {:noreply, sockets}
162 end
163
164 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
165 topic = internal_topic(topic, socket)
166 sockets_for_topic = sockets[topic] || []
167 sockets_for_topic = List.delete(sockets_for_topic, socket)
168 sockets = Map.put(sockets, topic, sockets_for_topic)
169 Logger.debug("Removed conn for #{topic}")
170 {:noreply, sockets}
171 end
172
173 def handle_cast(m, state) do
174 Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
175 {:noreply, state}
176 end
177
178 defp represent_update(%Activity{} = activity, %User{} = user) do
179 %{
180 event: "update",
181 payload:
182 Pleroma.Web.MastodonAPI.StatusView.render(
183 "status.json",
184 activity: activity,
185 for: user
186 )
187 |> Jason.encode!()
188 }
189 |> Jason.encode!()
190 end
191
192 defp represent_update(%Activity{} = activity) do
193 %{
194 event: "update",
195 payload:
196 Pleroma.Web.MastodonAPI.StatusView.render(
197 "status.json",
198 activity: activity
199 )
200 |> Jason.encode!()
201 }
202 |> Jason.encode!()
203 end
204
205 def represent_conversation(%Participation{} = participation) do
206 %{
207 event: "conversation",
208 payload:
209 Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
210 participation: participation,
211 user: participation.user
212 })
213 |> Jason.encode!()
214 }
215 |> Jason.encode!()
216 end
217
218 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
219 Enum.each(topics[topic] || [], fn socket ->
220 # Get the current user so we have up-to-date blocks etc.
221 if socket.assigns[:user] do
222 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
223 blocks = user.info.blocks || []
224 mutes = user.info.mutes || []
225 reblog_mutes = user.info.muted_reblogs || []
226
227 parent = Object.normalize(item)
228
229 unless is_nil(parent) or item.actor in blocks or item.actor in mutes or
230 item.actor in reblog_mutes or not ActivityPub.contain_activity(item, user) or
231 parent.data["actor"] in blocks or parent.data["actor"] in mutes do
232 send(socket.transport_pid, {:text, represent_update(item, user)})
233 end
234 else
235 send(socket.transport_pid, {:text, represent_update(item)})
236 end
237 end)
238 end
239
240 def push_to_socket(topics, topic, %Participation{} = participation) do
241 Enum.each(topics[topic] || [], fn socket ->
242 send(socket.transport_pid, {:text, represent_conversation(participation)})
243 end)
244 end
245
246 def push_to_socket(topics, topic, %Activity{
247 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
248 }) do
249 Enum.each(topics[topic] || [], fn socket ->
250 send(
251 socket.transport_pid,
252 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
253 )
254 end)
255 end
256
257 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
258
259 def push_to_socket(topics, topic, item) do
260 Enum.each(topics[topic] || [], fn socket ->
261 # Get the current user so we have up-to-date blocks etc.
262 if socket.assigns[:user] do
263 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
264 blocks = user.info.blocks || []
265 mutes = user.info.mutes || []
266
267 unless item.actor in blocks or item.actor in mutes or
268 not ActivityPub.contain_activity(item, user) do
269 send(socket.transport_pid, {:text, represent_update(item, user)})
270 end
271 else
272 send(socket.transport_pid, {:text, represent_update(item)})
273 end
274 end)
275 end
276
277 defp internal_topic(topic, socket) when topic in ~w[user direct] do
278 "#{topic}:#{socket.assigns[:user].id}"
279 end
280
281 defp internal_topic(topic, _), do: topic
282 end