honour domain blocks on streaming notifications
[akkoma] / test / web / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 alias Pleroma.List
9 alias Pleroma.User
10 alias Pleroma.Web.CommonAPI
11 alias Pleroma.Web.Streamer
12 import Pleroma.Factory
13
# Remembers the instance-wide :skip_thread_containment setting before each
# test and writes it back on exit, so tests that change it do not leak the
# change into later tests.
setup do
  config_path = [:instance, :skip_thread_containment]
  original_value = Pleroma.Config.get(config_path)

  on_exit(fn -> Pleroma.Config.put(config_path, original_value) end)

  :ok
end
23
# Exercises the "user" and "user:notification" topics through a Streamer
# process that is started under the registered name `Streamer` for each test.
describe "user streams" do
  setup do
    # The return value of GenServer.start/3 is not matched.
    GenServer.start(Streamer, %{}, name: Streamer)

    # Kill whatever process is registered as Streamer once the test is done.
    on_exit(fn ->
      case Process.whereis(Streamer) do
        nil -> nil
        streamer_pid -> Process.exit(streamer_pid, :kill)
      end
    end)

    user = insert(:user)
    note_activity = build(:note_activity)
    notify = insert(:notification, user: user, activity: note_activity)

    {:ok, user: user, notify: notify}
  end

  test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
    # The task plays the role of the socket's transport process.
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: user}}

    Streamer.add_socket("user", socket)
    Streamer.stream("user", notify)

    Task.await(listener)
  end

  test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: user}}

    Streamer.add_socket("user:notification", socket)
    Streamer.stream("user:notification", notify)

    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: user
  } do
    blocked_user = insert(:user)
    {:ok, blocker} = User.block(user, blocked_user)

    # refute_receive holds the task open for the full 4 seconds.
    listener =
      Task.async(fn ->
        refute_receive {:text, _}, 4_000
      end)

    socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}
    Streamer.add_socket("user:notification", socket)

    {:ok, activity} = CommonAPI.post(blocker, %{"status" => ":("})
    {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked_user)

    Streamer.stream("user:notification", notif)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: user
  } do
    user2 = insert(:user)

    listener =
      Task.async(fn ->
        refute_receive {:text, _}, 4_000
      end)

    socket = %{transport_pid: listener.pid, assigns: %{user: user}}
    Streamer.add_socket("user:notification", socket)

    {:ok, posted_activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, muted_activity} = CommonAPI.add_mute(user, posted_activity)
    {:ok, notif, _} = CommonAPI.favorite(muted_activity.id, user2)

    Streamer.stream("user:notification", notif)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
    user: user
  } do
    user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

    listener =
      Task.async(fn ->
        refute_receive {:text, _}, 4_000
      end)

    # The socket is registered with the user as it was before the domain block.
    socket = %{transport_pid: listener.pid, assigns: %{user: user}}
    Streamer.add_socket("user:notification", socket)

    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, _user} = User.block_domain(user, "hecking-lewd-place.com")
    {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)

    Streamer.stream("user:notification", notif)
    Task.await(listener)
  end
end
125
# Pushes a freshly created activity, and then its deletion, to a fake socket
# subscribed to "public"; expects one text frame for each, the second being
# the JSON-encoded "delete" event carrying the original activity id.
test "it sends to public" do
  user = insert(:user)
  other_user = insert(:user)

  # Phase 1: the create activity must reach the socket.
  create_listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  create_socket = %{transport_pid: create_listener.pid, assigns: %{user: user}}

  {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [create_socket]}, "public", activity)

  Task.await(create_listener)

  # Phase 2: the deletion must arrive as a "delete" event for that activity.
  delete_listener =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => activity.id})

      assert_receive {:text, received_event}, 4_000
      assert received_event == expected_event
    end)

  delete_socket = %{transport_pid: delete_listener.pid, assigns: %{user: user}}

  {:ok, delete_activity} = CommonAPI.delete(activity.id, other_user)

  Streamer.push_to_socket(%{"public" => [delete_socket]}, "public", delete_activity)

  Task.await(delete_listener)
end
182
# Each test pushes a note addressed only to "TEST-FFF" to a follower's fake
# socket on "public", varying the instance-level and per-user
# skip_thread_containment settings.
describe "thread_containment" do
  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    user = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener =
      Task.async(fn ->
        refute_receive {:text, _}, 1_000
      end)

    socket = %{transport_pid: listener.pid, assigns: %{user: user}}
    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    user = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener =
      Task.async(fn ->
        assert_receive {:text, _}, 1_000
      end)

    socket = %{transport_pid: listener.pid, assigns: %{user: user}}
    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)

    # Only this user opts out of containment; the instance setting stays off.
    user = insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener =
      Task.async(fn ->
        assert_receive {:text, _}, 1_000
      end)

    socket = %{transport_pid: listener.pid, assigns: %{user: user}}
    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end
end
250
# A post by a user that the socket's owner has blocked is pushed to "public";
# the fake socket must not receive a text frame within one second.
test "it doesn't send to blocked users" do
  user = insert(:user)
  blocked_user = insert(:user)
  {:ok, blocker} = User.block(user, blocked_user)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}

  {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
278
# user_a follows user_b and has user_b on a list. A direct message from
# user_b that mentions only user_c is streamed on the "list" topic; user_a's
# list socket must not receive a text frame within one second.
test "it doesn't send unwanted DMs to list" do
  user_a = insert(:user)
  user_b = insert(:user)
  user_c = insert(:user)

  {:ok, follower} = User.follow(user_a, user_b)

  {:ok, empty_list} = List.create("Test", follower)
  {:ok, list} = List.follow(empty_list, user_b)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  dm_params = %{
    "status" => "@#{user_c.nickname} Test",
    "visibility" => "direct"
  }

  {:ok, activity} = CommonAPI.post(user_b, dm_params)

  # The topics map is passed as the GenServer state to handle_cast/2.
  state = %{"list:#{list.id}" => [socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, state)

  Task.await(listener)
end
315
# user_a has user_b on a list but does not follow them. A "private" post by
# user_b is streamed on the "list" topic; user_a's list socket must not
# receive a text frame within one second.
test "it doesn't send unwanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, empty_list} = List.create("Test", user_a)
  {:ok, list} = List.follow(empty_list, user_b)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: user_a}}

  post_params = %{
    "status" => "Test",
    "visibility" => "private"
  }

  {:ok, activity} = CommonAPI.post(user_b, post_params)

  # The topics map is passed as the GenServer state to handle_cast/2.
  state = %{"list:#{list.id}" => [socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, state)

  Task.await(listener)
end
349
# user_a follows user_b and has user_b on a list. A "private" post by user_b
# is streamed on the "list" topic; user_a's list socket must receive a text
# frame within one second.
test "it send wanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, follower} = User.follow(user_a, user_b)

  {:ok, empty_list} = List.create("Test", follower)
  {:ok, list} = List.follow(empty_list, user_b)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  post_params = %{
    "status" => "Test",
    "visibility" => "private"
  }

  {:ok, activity} = CommonAPI.post(user_b, post_params)

  # The topics map is passed as the GenServer state to handle_cast/2.
  state = %{"list:#{list.id}" => [socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, state)

  Task.await(listener)
end
385
# user1 hides reblogs from user2; user2 then repeats a post by user3. The
# announce activity is pushed to "public" and user1's fake socket must not
# receive a text frame within one second.
test "it doesn't send muted reblogs" do
  user1 = insert(:user)
  user2 = insert(:user)
  user3 = insert(:user)

  # The result is not captured, so the socket below carries user1 as inserted.
  CommonAPI.hide_reblogs(user1, user2)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: user1}}

  {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
  {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)

  Streamer.push_to_socket(%{"public" => [socket]}, "public", announce_activity)

  Task.await(listener)
end
415
# Conversation updates on the "direct" topic, delivered through a Streamer
# process that is started under the registered name `Streamer` for each test.
describe "direct streams" do
  setup do
    # The return value of GenServer.start/3 is not matched.
    GenServer.start(Streamer, %{}, name: Streamer)

    # Kill whatever process is registered as Streamer once the test is done.
    on_exit(fn ->
      if pid = Process.whereis(Streamer) do
        Process.exit(pid, :kill)
      end
    end)

    :ok
  end

  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    # The task plays the role of the socket's transport process.
    task =
      Task.async(fn ->
        assert_receive {:text, _received_event}, 4_000
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    # A single 4 s receive window fits inside Task.await/1's default 5 s.
    Task.await(task)
  end

  test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # First the delete event itself must arrive...
        assert_receive {:text, delete_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        # ...and nothing else may follow it for the next 4 seconds.
        refute_receive {:text, _}, 4_000
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    # The task may run for up to 8 s (4 s receive window plus the full 4 s
    # refute window, which is always spent on the passing path). The default
    # 5 s Task.await/1 timeout leaves under a second of slack, so the await
    # timeout is set above the task's worst case.
    Task.await(task, 10_000)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        assert_receive {:text, delete_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        assert_receive {:text, conversation_event}, 4_000

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(conversation_event)

        # After the reply is deleted, the first message is the last status again.
        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    # Two consecutive 4 s receive windows can add up to 8 s, which exceeds
    # Task.await/1's default 5 s, so the await timeout is set above that.
    Task.await(task, 10_000)
  end
end
521 end