Merge remote-tracking branch 'origin/develop' into benchmark-finishing
[akkoma] / test / web / streamer / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 import Pleroma.Factory
9
10 alias Pleroma.List
11 alias Pleroma.User
12 alias Pleroma.Web.CommonAPI
13 alias Pleroma.Web.Streamer
14 alias Pleroma.Web.Streamer.StreamerSocket
15 alias Pleroma.Web.Streamer.Worker
16
17 @moduletag needs_streamer: true
18 clear_config_all([:instance, :skip_thread_containment])
19
describe "user streams" do
  # Each test starts with a fresh user plus a notification addressed to that user.
  setup do
    recipient = insert(:user)
    note_activity = build(:note_activity)
    notification = insert(:notification, user: recipient, activity: note_activity)

    {:ok, %{user: recipient, notify: notification}}
  end

  test "it sends notify to in the 'user' stream", %{user: recipient, notify: notification} do
    # The task stands in for the socket's transport process and must get a {:text, _} message.
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    Streamer.add_socket("user", socket)
    Streamer.stream("user", notification)

    Task.await(listener)
  end

  test "it sends notify to in the 'user:notification' stream", %{
    user: recipient,
    notify: notification
  } do
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    Streamer.add_socket("user:notification", socket)
    Streamer.stream("user:notification", notification)

    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: recipient
  } do
    blocked = insert(:user)
    {:ok, blocker} = User.block(recipient, blocked)

    # Nothing may reach the listener for the whole 4 second window.
    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}

    Streamer.add_socket("user:notification", socket)

    # The blocked user favourites the blocker's status; the result is streamed below.
    {:ok, status} = CommonAPI.post(blocker, %{"status" => ":("})
    {:ok, favorite, _} = CommonAPI.favorite(status.id, blocked)

    Streamer.stream("user:notification", favorite)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: recipient
  } do
    other_user = insert(:user)
    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    Streamer.add_socket("user:notification", socket)

    # The recipient mutes their own status before somebody else favourites it.
    {:ok, status} = CommonAPI.post(recipient, %{"status" => "super hot take"})
    {:ok, muted_status} = CommonAPI.add_mute(recipient, status)
    {:ok, favorite, _} = CommonAPI.favorite(muted_status.id, other_user)

    Streamer.stream("user:notification", favorite)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
    user: recipient
  } do
    remote_user = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    # The socket is registered with the user as it was before the domain block.
    Streamer.add_socket("user:notification", socket)

    {:ok, blocker} = User.block_domain(recipient, "hecking-lewd-place.com")
    {:ok, status} = CommonAPI.post(blocker, %{"status" => "super hot take"})
    {:ok, favorite, _} = CommonAPI.favorite(status.id, remote_user)

    Streamer.stream("user:notification", favorite)
    Task.await(listener)
  end
end
114
test "it sends to public" do
  viewer = insert(:user)
  poster = insert(:user)

  # Phase one: a freshly created status pushed to "public" must reach the socket.
  create_listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  create_socket = %StreamerSocket{transport_pid: create_listener.pid, user: viewer}

  {:ok, status} = CommonAPI.post(poster, %{"status" => "Test"})

  Worker.push_to_socket(%{"public" => [create_socket]}, "public", status)
  Task.await(create_listener)

  # Phase two: pushing the deletion must arrive as a JSON "delete" event whose
  # payload is the id of the status created above.
  delete_listener =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => status.id})

      assert_receive {:text, received_event}, 4_000
      assert received_event == expected_event
    end)

  delete_socket = %StreamerSocket{transport_pid: delete_listener.pid, user: viewer}

  {:ok, deletion} = CommonAPI.delete(status.id, poster)

  Worker.push_to_socket(%{"public" => [delete_socket]}, "public", deletion)
  Task.await(delete_listener)
end
167
describe "thread_containment" do
  # All three tests push a note addressed only to "TEST-FFF" to a follower of
  # the author; they differ in the instance-wide and per-user
  # skip_thread_containment settings.

  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    follower = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    follower = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)

    # The follower opts out of thread containment individually.
    follower =
      insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

    Worker.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end
end
235
describe "blocks" do
  test "it doesn't send messages involving blocked users" do
    viewer = insert(:user)
    blocked_user = insert(:user)
    {:ok, viewer} = User.block(viewer, blocked_user)

    # The listener must stay silent for a full second.
    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: viewer}

    {:ok, status} = CommonAPI.post(blocked_user, %{"status" => "Test"})

    Worker.push_to_socket(%{"public" => [socket]}, "public", status)

    Task.await(listener)
  end

  test "it doesn't send messages transitively involving blocked users" do
    blocker = insert(:user)
    blockee = insert(:user)
    friend = insert(:user)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)

    # The socket is built with the blocker as it was before the block is recorded.
    socket = %StreamerSocket{transport_pid: listener.pid, user: blocker}
    topics = %{"public" => [socket]}

    {:ok, blocker} = User.block(blocker, blockee)

    # A third party mentioning the blockee.
    {:ok, friend_mentions_blockee} =
      CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})

    Worker.push_to_socket(topics, "public", friend_mentions_blockee)

    # The blockee mentioning a third party.
    {:ok, blockee_mentions_friend} =
      CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})

    Worker.push_to_socket(topics, "public", blockee_mentions_friend)

    # The blockee mentioning the blocker directly.
    {:ok, blockee_mentions_blocker} =
      CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})

    Worker.push_to_socket(topics, "public", blockee_mentions_blocker)

    Task.await(listener)
  end
end
299
test "it doesn't send unwanted DMs to list" do
  list_owner = insert(:user)
  list_member = insert(:user)
  dm_target = insert(:user)

  {:ok, list_owner} = User.follow(list_owner, list_member)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: list_owner}

  # A direct message from the list member to somebody other than the list owner.
  {:ok, direct_message} =
    CommonAPI.post(list_member, %{
      "status" => "@#{dm_target.nickname} Test",
      "visibility" => "direct"
    })

  topics = %{"list:#{list.id}" => [socket]}

  # The worker callback is invoked directly, with the topics map as its state argument.
  Worker.handle_call({:stream, "list", direct_message}, self(), topics)

  Task.await(listener)
end
334
test "it doesn't send unwanted private posts to list" do
  list_owner = insert(:user)
  list_member = insert(:user)

  # The owner lists the member here without following them via User.follow.
  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: list_owner}

  {:ok, private_post} =
    CommonAPI.post(list_member, %{"status" => "Test", "visibility" => "private"})

  topics = %{"list:#{list.id}" => [socket]}

  Worker.handle_call({:stream, "list", private_post}, self(), topics)

  Task.await(listener)
end
366
test "it sends wanted private posts to list" do
  list_owner = insert(:user)
  list_member = insert(:user)

  # Here the owner follows the member before listing them.
  {:ok, list_owner} = User.follow(list_owner, list_member)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: list_owner}

  {:ok, private_post} =
    CommonAPI.post(list_member, %{"status" => "Test", "visibility" => "private"})

  # The socket is registered through the Streamer API; the worker callback
  # receives an empty map as its state.
  Streamer.add_socket("list:#{list.id}", socket)

  Worker.handle_call({:stream, "list", private_post}, self(), %{})

  Task.await(listener)
end
401
test "it doesn't send muted reblogs" do
  viewer = insert(:user)
  reblogger = insert(:user)
  author = insert(:user)

  # The viewer hides reblogs coming from the reblogger.
  CommonAPI.hide_reblogs(viewer, reblogger)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %StreamerSocket{transport_pid: listener.pid, user: viewer}

  {:ok, status} = CommonAPI.post(author, %{"status" => "I'm kawen"})
  {:ok, reblog, _} = CommonAPI.repeat(status.id, reblogger)

  Worker.push_to_socket(%{"public" => [socket]}, "public", reblog)

  Task.await(listener)
end
429
test "it doesn't send posts from muted threads" do
  author = insert(:user)
  follower = insert(:user)
  {:ok, follower, author, _activity} = CommonAPI.follow(follower, author)

  {:ok, status} = CommonAPI.post(author, %{"status" => "super hot take"})

  # The follower mutes the thread before the status is streamed.
  {:ok, muted_status} = CommonAPI.add_mute(follower, status)

  listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)

  # The refute window opens as soon as the task is spawned, so the socket is
  # registered and the status streamed right away. Sleeping here first would
  # let the window lapse before anything could be delivered, and the test
  # could never fail.
  Streamer.add_socket(
    "user",
    %{transport_pid: listener.pid, assigns: %{user: follower}}
  )

  Streamer.stream("user", muted_status)
  Task.await(listener)
end
451
describe "direct streams" do
  # These tests register a socket on the "direct" topic and let CommonAPI calls
  # trigger the events; nothing is pushed to the streamer by hand.

  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    listener = Task.async(fn -> assert_receive {:text, _received_event}, 4_000 end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: listener.pid, assigns: %{user: user}}
    )

    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    Task.await(listener)
  end

  test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    listener =
      Task.async(fn ->
        # Exactly one event is expected: the "delete" itself, then silence.
        assert_receive {:text, received_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        refute_receive {:text, _}, 4_000
      end)

    # NOTE(review): presumably gives events triggered by the post above time to
    # settle before the socket is registered — confirm.
    Process.sleep(1000)

    Streamer.add_socket(
      "direct",
      %{transport_pid: listener.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    Task.await(listener)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    # A reply in the same conversation; this is the message that gets deleted.
    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    listener =
      Task.async(fn ->
        assert_receive {:text, delete_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        assert_receive {:text, conversation_event}, 4_000

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(conversation_event)

        # The payload is itself JSON; its last status must be the first message.
        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    # NOTE(review): presumably gives events triggered by the posts above time to
    # settle before the socket is registered — confirm.
    Process.sleep(1000)

    Streamer.add_socket(
      "direct",
      %{transport_pid: listener.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    Task.await(listener)
  end
end
553 end