Merge branch 'develop' into feature/hide-follows-remote
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 use GenServer
7 require Logger
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.MastodonAPI.NotificationView
18
19 @keepalive_interval :timer.seconds(30)
20
# Starts the streamer as a GenServer registered under this module's name.
# The initial state is an empty map; the :add/:remove handlers fill it with
# topic => [socket] entries.
def start_link do
  initial_topics = %{}
  GenServer.start_link(__MODULE__, initial_topics, name: __MODULE__)
end
24
# Asynchronously asks the streamer to register `socket` under `topic`.
def add_socket(topic, socket) do
  request = %{action: :add, socket: socket, topic: topic}
  GenServer.cast(__MODULE__, request)
end
28
# Asynchronously asks the streamer to unregister `socket` from `topic`.
def remove_socket(topic, socket) do
  request = %{action: :remove, socket: socket, topic: topic}
  GenServer.cast(__MODULE__, request)
end
32
# Asynchronously hands `item` to the streamer for delivery on `topic`.
# The matching handle_cast clause decides which sockets receive it.
def stream(topic, item) do
  request = %{action: :stream, topic: topic, item: item}
  GenServer.cast(__MODULE__, request)
end
36
# GenServer callback: arms the first keepalive ping and adopts `args` as the
# initial state.
def init(args) do
  # A short-lived helper process waits one keepalive interval and then casts
  # :ping back to the streamer, so the server itself never sleeps.
  first_ping = fn ->
    Process.sleep(@keepalive_interval)
    GenServer.cast(__MODULE__, %{action: :ping})
  end

  spawn(first_ping)

  {:ok, args}
end
46
# Keepalive tick: sends an empty text frame to every socket held in the state
# (a socket listed under several topics gets one frame per listing), then
# schedules the next tick.
def handle_cast(%{action: :ping}, topics) do
  registered_sockets =
    topics
    |> Map.values()
    |> List.flatten()

  for socket <- registered_sockets do
    Logger.debug("Sending keepalive ping")
    send(socket.transport_pid, {:text, ""})
  end

  # Re-arm the timer: a helper process sleeps for the interval and casts the
  # next :ping back to this server.
  next_ping = fn ->
    Process.sleep(@keepalive_interval)
    GenServer.cast(__MODULE__, %{action: :ping})
  end

  spawn(next_ping)

  {:noreply, topics}
end
63
# Streams a direct message: every recipient of the activity gets the item on
# their own "direct:<user id>" topic.
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
  direct_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: recipient_id} -> "direct:#{recipient_id}" end)

  # Enum.map/2 always yields a list, so no nil fallback is required here.
  Enum.each(direct_topics, fn direct_topic ->
    Logger.debug("Trying to push direct message to #{direct_topic}\n\n")
    push_to_socket(topics, direct_topic, item)
  end)

  {:noreply, topics}
end
76
# Streams a conversation participation to the "direct:<user id>" topic of the
# user the participation belongs to.
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
  owner_topic = "direct:#{participation.user_id}"

  Logger.debug("Trying to push a conversation participation to #{owner_topic}\n\n")
  push_to_socket(topics, owner_topic, participation)

  {:noreply, topics}
end
85
# Streams an activity to the "list:<id>" topic of every list that should see it.
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
  # For anything that is not public, only keep lists whose owner is allowed to
  # see the activity (see #270).
  owner_can_see? = fn list ->
    list_owner = User.get_cached_by_id(list.user_id)
    Visibility.visible_for_user?(item, list_owner)
  end

  target_lists =
    if Visibility.is_public?(item) == true do
      Pleroma.List.get_lists_from_activity(item)
    else
      item
      |> Pleroma.List.get_lists_from_activity()
      |> Enum.filter(owner_can_see?)
    end

  list_topics = Enum.map(target_lists, fn %{id: list_id} -> "list:#{list_id}" end)

  Enum.each(list_topics, fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(topics, list_topic, item)
  end)

  {:noreply, topics}
end
113
# Streams a notification to the sockets its owner has open on the "user" or
# "user:notification" topic. The user is re-read from the cache for every
# socket so the should_send?/thread-mute checks use current data; a failing
# check simply skips that socket.
def handle_cast(
      %{action: :stream, topic: topic, item: %Notification{} = item},
      topics
    )
    when topic in ["user", "user:notification"] do
  subscribers = Map.get(topics, "#{topic}:#{item.user_id}", [])

  Enum.each(subscribers, fn socket ->
    session_user = socket.assigns[:user]

    with %User{} = current_user <- User.get_cached_by_ap_id(session_user.ap_id),
         true <- should_send?(current_user, item),
         false <- CommonAPI.thread_muted?(current_user, item.activity) do
      frame = {:text, represent_notification(session_user, item)}
      send(socket.transport_pid, frame)
    end
  end)

  {:noreply, topics}
end
134
# Streams an item to the "user:<id>" topic of every recipient of the activity.
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
  Logger.debug("Trying to push to users")

  user_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: recipient_id} -> "user:#{recipient_id}" end)

  Enum.each(user_topics, &push_to_socket(topics, &1, item))

  {:noreply, topics}
end
148
# Catch-all stream handler: any topic without a dedicated clause is pushed
# as-is to the sockets registered under exactly that topic name.
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topics, topic, item)

  {:noreply, topics}
end
155
# Registers `socket` under the (possibly user-scoped) topic. The socket list
# of a topic never contains duplicates.
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
  scoped_topic = internal_topic(topic, socket)
  already_registered = sockets[scoped_topic] || []
  updated_sockets = Map.put(sockets, scoped_topic, Enum.uniq([socket | already_registered]))

  Logger.debug("Got new conn for #{scoped_topic}")
  {:noreply, updated_sockets}
end
164
# Unregisters `socket` from the (possibly user-scoped) topic.
#
# When the last socket of a topic goes away the topic key itself is dropped
# from the state. Keeping an empty list behind would make the state map grow
# with every topic that was ever subscribed to (per-user, per-list and
# hashtag topics are unbounded). Every reader of the state already treats a
# missing key like an empty list, so dropping the key is transparent.
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
  topic = internal_topic(topic, socket)

  sockets =
    case List.delete(sockets[topic] || [], socket) do
      [] -> Map.delete(sockets, topic)
      remaining -> Map.put(sockets, topic, remaining)
    end

  Logger.debug("Removed conn for #{topic}")
  {:noreply, sockets}
end
173
# Fallback for casts no other clause understands: log the message together
# with the current state and carry on unchanged.
def handle_cast(unknown_message, state) do
  Logger.info("Unknown: #{inspect(unknown_message)}, #{inspect(state)}")

  {:noreply, state}
end
178
# Builds the JSON text frame for an "update" event, rendering the status for
# `user`. The payload is itself a JSON-encoded string, so the status ends up
# encoded twice.
defp represent_update(%Activity{} = activity, %User{} = user) do
  rendered_status =
    Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: activity, for: user)

  Jason.encode!(%{event: "update", payload: Jason.encode!(rendered_status)})
end
192
# Builds the JSON text frame for an "update" event when no user is given.
# The payload is itself a JSON-encoded string, so the status ends up encoded
# twice.
defp represent_update(%Activity{} = activity) do
  rendered_status =
    Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: activity)

  Jason.encode!(%{event: "update", payload: Jason.encode!(rendered_status)})
end
205
# Builds the JSON text frame for a "conversation" event. The participation is
# rendered for its own user and embedded as a JSON-encoded string payload.
def represent_conversation(%Participation{} = participation) do
  view_assigns = %{participation: participation, user: participation.user}

  rendered_conversation =
    Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", view_assigns)

  Jason.encode!(%{event: "conversation", payload: Jason.encode!(rendered_conversation)})
end
218
# Builds the JSON text frame for a "notification" event, rendered for `user`.
# The payload is embedded as a JSON-encoded string.
@spec represent_notification(User.t(), Notification.t()) :: binary()
defp represent_notification(%User{} = user, %Notification{} = notify) do
  rendered_notification =
    NotificationView.render("show.json", %{notification: notify, for: user})

  Jason.encode!(%{event: "notification", payload: Jason.encode!(rendered_notification)})
end
232
# Decides whether `item` may be delivered to `user`. Delivery is refused when
#   * the activity's object cannot be resolved,
#   * the activity's actor is blocked, muted or reblog-muted by the user,
#   * the object's actor is blocked or muted by the user,
#   * either actor's host matches one of the user's domain blocks, or
#   * the thread containment check fails.
defp should_send?(%User{} = user, %Activity{} = item) do
  blocked = user.info.blocks || []
  muted = user.info.mutes || []
  reblog_muted = user.info.muted_reblogs || []
  blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)

  with parent when not is_nil(parent) <- Object.normalize(item),
       false <- Enum.any?([blocked, muted, reblog_muted], &(item.actor in &1)),
       false <- Enum.any?([blocked, muted], &(parent.data["actor"] in &1)),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user) do
    true
  else
    _ -> false
  end
end
252
# A notification is deliverable exactly when the activity it wraps is.
defp should_send?(%User{} = user, %Notification{activity: wrapped_activity}),
  do: should_send?(user, wrapped_activity)
256
# Pushes an Announce activity to every socket registered under `topic`.
#
# Sockets with a user get the update rendered for that user, and only when
# should_send?/2 allows it. Sockets without a user always get the update
# without a user.
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
  Enum.each(topics[topic] || [], fn socket ->
    if socket.assigns[:user] do
      # Get the current user so we have up-to-date blocks etc.
      # The cache lookup may come back empty; matching on %User{} skips this
      # socket in that case instead of raising inside the streamer process
      # and losing every registered socket.
      with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
           true <- should_send?(user, item) do
        send(socket.transport_pid, {:text, represent_update(item, user)})
      end
    else
      send(socket.transport_pid, {:text, represent_update(item)})
    end
  end)
end
271
# Pushes a conversation participation, as a "conversation" frame, to every
# socket registered under `topic`.
def push_to_socket(topics, topic, %Participation{} = participation) do
  for socket <- topics[topic] || [] do
    send(socket.transport_pid, {:text, represent_conversation(participation)})
  end

  :ok
end
277
# Pushes a "delete" event carrying the id of the deleted activity (as a
# string) to every socket registered under `topic`.
def push_to_socket(topics, topic, %Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
    }) do
  for socket <- topics[topic] || [] do
    delete_frame = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
    send(socket.transport_pid, {:text, delete_frame})
  end

  :ok
end
288
# A Delete activity that carries no "deleted_activity_id" is not streamed.
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
290
# Pushes any other item to every socket registered under `topic`.
#
# Sockets with a user get the update rendered for that user, and only when
# the item's actor is neither blocked nor muted by them and the thread
# containment check passes. Sockets without a user always get the update
# without a user.
def push_to_socket(topics, topic, item) do
  Enum.each(topics[topic] || [], fn socket ->
    if socket.assigns[:user] do
      # Get the current user so we have up-to-date blocks etc.
      # The cache lookup may come back empty; matching on %User{} skips this
      # socket in that case instead of raising inside the streamer process
      # and losing every registered socket.
      with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
           blocks = user.info.blocks || [],
           mutes = user.info.mutes || [],
           true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
           true <- thread_containment(item, user) do
        send(socket.transport_pid, {:text, represent_update(item, user)})
      end
    else
      send(socket.transport_pid, {:text, represent_update(item)})
    end
  end)
end
308
# Maps a client-facing topic to the key used in the streamer state: the
# "user", "user:notification" and "direct" topics are scoped to the socket's
# user id, every other topic is used verbatim.
defp internal_topic(topic, socket) when topic in ["user", "user:notification", "direct"] do
  owner_id = socket.assigns[:user].id
  "#{topic}:#{owner_id}"
end

defp internal_topic(topic, _socket) do
  topic
end
314
# Thread containment check. It always passes when the user has
# skip_thread_containment set, or when the instance-wide
# [:instance, :skip_thread_containment] setting is truthy; otherwise the
# decision is delegated to ActivityPub.contain_activity/2.
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}) do
  true
end

defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _enabled -> true
  end
end
325 end