Merge branch 'issue/209' into 'develop'
[akkoma] / test / web / streamer / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 import Pleroma.Factory
9
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.List
12 alias Pleroma.User
13 alias Pleroma.Web.CommonAPI
14 alias Pleroma.Web.Streamer
15 alias Pleroma.Web.Streamer.StreamerSocket
16 alias Pleroma.Web.Streamer.Worker
17
# Tags applied to every test in this module.
@moduletag capture_log: true, needs_streamer: true

# Timeout handed to `assert_receive`/`refute_receive` in most streaming tests.
@streamer_timeout 150
# Duration handed to `Process.sleep/1` before some tests register their socket.
@streamer_start_wait 10

# Runs before every test in the module.
setup do
  clear_config([:instance, :skip_thread_containment])
end
23
# Tests for the "user" and "user:notification" streams. Each test starts a
# task that waits for (or refutes) a `{:text, _}` message, registers that
# task's pid as the transport of a socket, and then streams an item.
describe "user streams" do
  # Provides a user and a notification addressed to that user.
  setup do
    user = insert(:user)
    notify = insert(:notification, user: user, activity: build(:note_activity))
    {:ok, %{user: user, notify: notify}}
  end

  test "it streams the user's post in the 'user' stream", %{user: user} do
    task =
      Task.async(fn ->
        assert_receive {:text, _}, @streamer_timeout
      end)

    Streamer.add_socket(
      "user",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})

    Streamer.stream("user", activity)
    Task.await(task)
  end

  test "it streams boosts of the user in the 'user' stream", %{user: user} do
    task =
      Task.async(fn ->
        assert_receive {:text, _}, @streamer_timeout
      end)

    Streamer.add_socket(
      "user",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    other_user = insert(:user)
    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
    {:ok, announce, _} = CommonAPI.repeat(activity.id, user)

    Streamer.stream("user", announce)
    Task.await(task)
  end

  test "it sends notify to the 'user' stream", %{user: user, notify: notify} do
    task =
      Task.async(fn ->
        assert_receive {:text, _}, @streamer_timeout
      end)

    Streamer.add_socket(
      "user",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    Streamer.stream("user", notify)
    Task.await(task)
  end

  test "it sends notify to the 'user:notification' stream", %{user: user, notify: notify} do
    task =
      Task.async(fn ->
        assert_receive {:text, _}, @streamer_timeout
      end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    Streamer.stream("user:notification", notify)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: user
  } do
    blocked = insert(:user)
    {:ok, _user_relationship} = User.block(user, blocked)

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # The blocked user favorites a post made by the blocking user.
    {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
    {:ok, notif} = CommonAPI.favorite(blocked, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: user
  } do
    user2 = insert(:user)

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # The author mutes their own post before another user favorites it.
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, activity} = CommonAPI.add_mute(user, activity)
    {:ok, notif} = CommonAPI.favorite(user2, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a domain is blocked", %{
    user: user
  } do
    user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # The blocked domain is the host part of `user2`'s ap_id above.
    {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, notif} = CommonAPI.favorite(user2, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it sends follow activities to the 'user:notification' stream", %{
    user: user
  } do
    user_url = user.ap_id

    # NOTE(review): the fixture text is passed through `Jason.encode!/1`, which
    # yields a JSON string literal rather than the raw file contents — confirm
    # this is what the mocked endpoint is meant to return.
    body =
      "test/fixtures/users_mock/localhost.json"
      |> File.read!()
      |> String.replace("{{nickname}}", user.nickname)
      |> Jason.encode!()

    # Only GET requests for the user's own ap_id are mocked.
    Tesla.Mock.mock_global(fn
      %{method: :get, url: ^user_url} ->
        %Tesla.Env{status: 200, body: body}
    end)

    user2 = insert(:user)
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)

    # We don't directly pipe the notification to the streamer as it's already
    # generated as a side effect of CommonAPI.follow().
    Task.await(task)
  end
end
189
# Pushes another user's post through the worker to a fake "public"
# subscriber and expects a `{:text, _}` message to reach it.
test "it sends to public" do
  user = insert(:user)
  other_user = insert(:user)

  listener = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user}

  {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})

  Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
214
# Pushes the result of deleting a post to a fake "public" subscriber and
# checks that the exact JSON "delete" event is received.
test "works for deletions" do
  user = insert(:user)
  other_user = insert(:user)
  {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})

  # Built from the id of the original post, before it is deleted below.
  expected_event = Jason.encode!(%{"event" => "delete", "payload" => activity.id})

  listener =
    Task.async(fn ->
      assert_receive {:text, received_event}, @streamer_timeout
      assert received_event == expected_event
    end)

  socket = %StreamerSocket{transport_pid: listener.pid, user: user}

  {:ok, deletion} = CommonAPI.delete(activity.id, other_user)

  Worker.push_to_socket(%{"public" => [socket]}, "public", deletion)

  Task.await(listener)
end
248
# Each test pushes a note addressed only to "TEST-FFF" to a follower of the
# author, varying the instance-level and user-level
# `skip_thread_containment` settings.
describe "thread_containment" do
  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    user = insert(:user)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    user = insert(:user)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    # Only the receiving user opts out of thread containment here.
    user = insert(:user, skip_thread_containment: true)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end
end
319
# Checks that no `{:text, _}` message reaches a subscriber for activities
# involving a user they have blocked.
describe "blocks" do
  test "it doesn't send messages involving blocked users" do
    user = insert(:user)
    blocked_user = insert(:user)
    {:ok, _user_relationship} = User.block(user, blocked_user)

    {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}

    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it doesn't send messages transitively involving blocked users" do
    blocker = insert(:user)
    blockee = insert(:user)
    friend = insert(:user)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: blocker}
    subscriptions = %{"public" => [socket]}

    {:ok, _user_relationship} = User.block(blocker, blockee)

    # A third party mentions the blocked user.
    {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_one)

    # The blocked user mentions a third party.
    {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_two)

    # The blocked user mentions the blocker directly.
    {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_three)

    Task.await(listener)
  end
end
383
# A direct message from a list member to a third user must not reach the
# list owner's list subscriber.
test "it doesn't send unwanted DMs to list" do
  user_a = insert(:user)
  user_b = insert(:user)
  user_c = insert(:user)

  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  dm_params = %{"status" => "@#{user_c.nickname} Test", "visibility" => "direct"}
  {:ok, activity} = CommonAPI.post(user_b, dm_params)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}
  subscriptions = %{"list:#{list.id}" => [socket]}

  Worker.handle_call({:stream, "list", activity}, self(), subscriptions)

  Task.await(listener)
end
418
# The list owner does not follow the list member in this test, so the
# member's private post must not reach the owner's list subscriber.
test "it doesn't send unwanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  private_params = %{"status" => "Test", "visibility" => "private"}
  {:ok, activity} = CommonAPI.post(user_b, private_params)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}
  subscriptions = %{"list:#{list.id}" => [socket]}

  Worker.handle_call({:stream, "list", activity}, self(), subscriptions)

  Task.await(listener)
end
450
# The list owner follows the list member here, so the member's private
# post is expected to reach the owner's list subscriber.
test "it sends wanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  private_params = %{"status" => "Test", "visibility" => "private"}
  {:ok, activity} = CommonAPI.post(user_b, private_params)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}

  # Unlike the other list tests, the socket is registered through
  # `Streamer.add_socket/2` and the worker call receives an empty map.
  Streamer.add_socket("list:#{list.id}", socket)

  Worker.handle_call({:stream, "list", activity}, self(), %{})

  Task.await(listener)
end
485
# After `viewer` hides reblogs from `booster`, a repeat made by `booster`
# must not reach `viewer`'s subscriber.
test "it doesn't send muted reblogs" do
  viewer = insert(:user)
  booster = insert(:user)
  author = insert(:user)
  CommonAPI.hide_reblogs(viewer, booster)

  {:ok, create_activity} = CommonAPI.post(author, %{"status" => "I'm kawen"})
  {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, booster)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: viewer}

  Worker.push_to_socket(%{"public" => [socket]}, "public", announce_activity)

  Task.await(listener)
end
513
# Hiding reblogs from `booster` must not suppress other activity of theirs:
# a favorite made by `booster` is still expected to reach `viewer`.
test "it does send non-reblog notification for reblog-muted actors" do
  viewer = insert(:user)
  booster = insert(:user)
  author = insert(:user)
  CommonAPI.hide_reblogs(viewer, booster)

  {:ok, create_activity} = CommonAPI.post(author, %{"status" => "I'm kawen"})
  {:ok, favorite_activity} = CommonAPI.favorite(booster, create_activity.id)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: viewer}

  Worker.push_to_socket(%{"public" => [socket]}, "public", favorite_activity)

  Task.await(listener)
end
541
# A follower who has muted a post must not receive it on their "user"
# stream.
test "it doesn't send posts from muted threads" do
  author = insert(:user)
  follower = insert(:user)
  {:ok, follower, author, _activity} = CommonAPI.follow(follower, author)

  {:ok, post} = CommonAPI.post(author, %{"status" => "super hot take"})
  {:ok, muted_post} = CommonAPI.add_mute(follower, post)

  listener = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

  Streamer.add_socket("user", %{transport_pid: listener.pid, assigns: %{user: follower}})

  Streamer.stream("user", muted_post)
  Task.await(listener)
end
561
# Tests for the "direct" stream. Nothing is streamed explicitly here: the
# events under test are produced by `CommonAPI.post/2` and
# `CommonAPI.delete/2` after the socket has been registered.
describe "direct streams" do
  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    task =
      Task.async(fn ->
        assert_receive {:text, received_event}, @streamer_timeout

        # The payload is itself a JSON string nested inside the event JSON.
        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(received_event)

        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        [participation] = Participation.for_user(user)
        assert last_status["pleroma"]["direct_conversation_id"] == participation.id
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    Task.await(task)
  end

  test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # Exactly one "delete" event is expected, with nothing after it.
        assert_receive {:text, received_event}, @streamer_timeout
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        refute_receive {:text, _}, @streamer_timeout
      end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    Task.await(task)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # First the "delete" event for the removed reply ...
        assert_receive {:text, delete_event}, @streamer_timeout
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        # ... then a "conversation" event whose last status is the first post.
        assert_receive {:text, conversation_event}, @streamer_timeout

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(conversation_event)

        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    Task.await(task)
  end
end
670 end