Merge branch 'develop' into issue/1276-2
[akkoma] / test / web / streamer / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 import Pleroma.Factory
9
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.List
12 alias Pleroma.User
13 alias Pleroma.Web.CommonAPI
14 alias Pleroma.Web.Streamer
15 alias Pleroma.Web.Streamer.StreamerSocket
16 alias Pleroma.Web.Streamer.Worker
17
@moduletag needs_streamer: true, capture_log: true

# Milliseconds a listener task waits in assert_receive/refute_receive for a
# `{:text, _}` frame.
@streamer_timeout 150

# Milliseconds slept (Process.sleep/1) between spawning the listener task and
# registering its socket, in the tests that need that pause.
@streamer_start_wait 10

# Several tests overwrite [:instance, :skip_thread_containment] via
# Pleroma.Config.put/2. clear_config/1 is defined in the test support code
# (not visible here) and presumably restores the original value — confirm.
setup do
  clear_config([:instance, :skip_thread_containment])
end
23
describe "user streams" do
  # Each test gets a fresh user and a notification addressed to that user.
  setup do
    user = insert(:user)
    notify = insert(:notification, user: user, activity: build(:note_activity))
    {:ok, %{user: user, notify: notify}}
  end

  # In every test below a Task plays the role of the socket's transport
  # process: its pid is registered as `transport_pid`, and the task body asserts
  # (or refutes) that a `{:text, _}` message arrives within @streamer_timeout ms.

  test "it streams the user's post in the 'user' stream", %{user: user} do
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})

    {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})

    Streamer.stream("user", activity)
    Task.await(task)
  end

  test "it streams boosts of the user in the 'user' stream", %{user: user} do
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})

    other_user = insert(:user)
    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
    {:ok, announce, _} = CommonAPI.repeat(activity.id, user)

    Streamer.stream("user", announce)
    Task.await(task)
  end

  test "it sends notify to the 'user' stream", %{user: user, notify: notify} do
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})

    Streamer.stream("user", notify)
    Task.await(task)
  end

  test "it sends notify to the 'user:notification' stream", %{user: user, notify: notify} do
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    Streamer.stream("user:notification", notify)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: user
  } do
    blocked = insert(:user)
    {:ok, _user_relationship} = User.block(user, blocked)

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # The favourite comes from the blocked account, so nothing may be delivered.
    {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
    {:ok, notif} = CommonAPI.favorite(blocked, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: user
  } do
    user2 = insert(:user)

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # The user mutes their own thread before user2 favourites it.
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, activity} = CommonAPI.add_mute(user, activity)
    {:ok, notif} = CommonAPI.favorite(user2, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a domain is blocked", %{
    user: user
  } do
    user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # user2's ap_id lives on the domain that gets blocked here.
    {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, notif} = CommonAPI.favorite(user2, activity.id)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it sends follow activities to the 'user:notification' stream", %{
    user: user
  } do
    user_url = user.ap_id

    # Fixture with the user's nickname substituted in, served by the HTTP mock
    # for GET requests to the user's ap_id.
    body =
      "test/fixtures/users_mock/localhost.json"
      |> File.read!()
      |> String.replace("{{nickname}}", user.nickname)
      |> Jason.encode!()

    Tesla.Mock.mock_global(fn
      %{method: :get, url: ^user_url} ->
        %Tesla.Env{status: 200, body: body}
    end)

    user2 = insert(:user)
    task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "user:notification",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)

    # We don't directly pipe the notification to the streamer as it's already
    # generated as a side effect of CommonAPI.follow().
    Task.await(task)
  end
end
189
test "it sends to public" do
  user = insert(:user)
  other_user = insert(:user)

  # Phase 1: a freshly created post must reach the "public" subscriber.
  create_listener = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
  create_socket = %StreamerSocket{transport_pid: create_listener.pid, user: user}

  {:ok, post} = CommonAPI.post(other_user, %{"status" => "Test"})

  Worker.push_to_socket(%{"public" => [create_socket]}, "public", post)
  Task.await(create_listener)

  # Phase 2: pushing the deletion must yield a "delete" event whose payload is
  # the id of the original post.
  delete_listener =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => post.id})

      assert_receive {:text, received_event}, @streamer_timeout
      assert received_event == expected_event
    end)

  delete_socket = %StreamerSocket{transport_pid: delete_listener.pid, user: user}

  {:ok, deletion} = CommonAPI.delete(post.id, other_user)

  Worker.push_to_socket(%{"public" => [delete_socket]}, "public", deletion)
  Task.await(delete_listener)
end
242
describe "thread_containment" do
  # All three tests push a note addressed only to the bogus recipient "TEST-FFF"
  # to a follower of the author, and vary the instance-level and user-level
  # skip_thread_containment settings.

  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    user = insert(:user)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    user = insert(:user)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    # The per-user flag is set even though the instance-level one is off.
    user = insert(:user, skip_thread_containment: true)
    User.follow(user, author, :follow_accept)

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}
    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end
end
313
describe "blocks" do
  test "it doesn't send messages involving blocked users" do
    user = insert(:user)
    blocked_user = insert(:user)
    {:ok, _user_relationship} = User.block(user, blocked_user)

    {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})

    # The listener must stay silent for the whole second.
    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: user}

    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it doesn't send messages transitively involving blocked users" do
    blocker = insert(:user)
    blockee = insert(:user)
    friend = insert(:user)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: blocker}
    subscriptions = %{"public" => [socket]}

    {:ok, _user_relationship} = User.block(blocker, blockee)

    # A third party mentioning the blocked user.
    {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_one)

    # The blocked user mentioning the third party.
    {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_two)

    # The blocked user mentioning the blocker directly.
    {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
    Worker.push_to_socket(subscriptions, "public", activity_three)

    Task.await(listener)
  end
end
377
test "it doesn't send unwanted DMs to list" do
  user_a = insert(:user)
  user_b = insert(:user)
  user_c = insert(:user)

  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  # A direct message from a list member to user_c, not to the list owner.
  dm_params = %{"status" => "@#{user_c.nickname} Test", "visibility" => "direct"}
  {:ok, activity} = CommonAPI.post(user_b, dm_params)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}
  subscriptions = %{"list:#{list.id}" => [socket]}

  # The worker callback is invoked directly with a hand-built topics map.
  Worker.handle_call({:stream, "list", activity}, self(), subscriptions)

  Task.await(listener)
end
412
test "it doesn't send unwanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  # user_b is on user_a's list, but no follow relationship is created here.
  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  private_params = %{"status" => "Test", "visibility" => "private"}
  {:ok, activity} = CommonAPI.post(user_b, private_params)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}
  subscriptions = %{"list:#{list.id}" => [socket]}

  Worker.handle_call({:stream, "list", activity}, self(), subscriptions)

  Task.await(listener)
end
444
test "it sends wanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  # Here the list owner does follow the list member.
  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  private_params = %{"status" => "Test", "visibility" => "private"}
  {:ok, activity} = CommonAPI.post(user_b, private_params)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user_a}

  # The socket is registered through the Streamer API, while the worker
  # callback below is handed an empty topics map.
  Streamer.add_socket("list:#{list.id}", socket)

  Worker.handle_call({:stream, "list", activity}, self(), %{})

  Task.await(listener)
end
479
test "it doesn't send muted reblogs" do
  user1 = insert(:user)
  user2 = insert(:user)
  user3 = insert(:user)
  # user1 hides user2's reblogs.
  CommonAPI.hide_reblogs(user1, user2)

  {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
  {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user1}

  Worker.push_to_socket(%{"public" => [socket]}, "public", announce_activity)

  Task.await(listener)
end
507
test "it does send non-reblog notification for reblog-muted actors" do
  user1 = insert(:user)
  user2 = insert(:user)
  user3 = insert(:user)
  # Only user2's reblogs are hidden; the favourite below is not a reblog.
  CommonAPI.hide_reblogs(user1, user2)

  {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
  {:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: user1}

  Worker.push_to_socket(%{"public" => [socket]}, "public", favorite_activity)

  Task.await(listener)
end
535
test "it doesn't send posts from muted threads" do
  user = insert(:user)
  user2 = insert(:user)
  {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)

  {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})

  # The follower mutes the thread before it is streamed to them.
  {:ok, activity} = CommonAPI.add_mute(user2, activity)

  listener = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
  socket = %{transport_pid: listener.pid, assigns: %{user: user2}}

  Streamer.add_socket("user", socket)

  Streamer.stream("user", activity)
  Task.await(listener)
end
555
describe "direct streams" do
  # In each test a Task stands in for the socket's transport process and checks
  # the `{:text, json}` frames it receives on the "direct" stream.

  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    task =
      Task.async(fn ->
        assert_receive {:text, received_event}, @streamer_timeout

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(received_event)

        # The payload is itself a JSON string and is decoded a second time.
        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        [participation] = Participation.for_user(user)
        assert last_status["pleroma"]["direct_conversation_id"] == participation.id
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    # Nothing is streamed explicitly: the frame is expected to arrive as a
    # consequence of posting the direct message.
    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    Task.await(task)
  end

  test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        assert_receive {:text, received_event}, @streamer_timeout
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        # After the delete event no further frame may follow.
        refute_receive {:text, _}, @streamer_timeout
      end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    Task.await(task)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # First frame: the delete event for the removed reply.
        assert_receive {:text, delete_event}, @streamer_timeout
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        # Second frame: a conversation update whose last status is the
        # remaining first message.
        assert_receive {:text, conversation_event}, @streamer_timeout

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(conversation_event)

        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    Process.sleep(@streamer_start_wait)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    Task.await(task)
  end
end
664 end