Merge branch 'fix/bugfix-blocked-user-follow-reqs' into 'develop'
[akkoma] / test / web / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 alias Pleroma.List
9 alias Pleroma.User
10 alias Pleroma.Web.CommonAPI
11 alias Pleroma.Web.Streamer
12 import Pleroma.Factory
13
# Snapshots the instance-wide :skip_thread_containment setting before each
# test and writes the saved value back when the test exits, so tests that
# change it through Pleroma.Config.put/2 do not affect one another.
setup do
  config_key = [:instance, :skip_thread_containment]
  original_value = Pleroma.Config.get(config_key)

  on_exit(fn -> Pleroma.Config.put(config_key, original_value) end)

  :ok
end
23
24 describe "user streams" do
# Starts a Streamer process registered under the name `Streamer` for every
# test in this describe block, and builds a user together with a
# notification that belongs to that user.
#
# Adds `user` and `notify` to the test context.
setup do
  # The result is deliberately ignored: if the name is already registered,
  # GenServer.start/3 returns an error tuple and the running process is used.
  GenServer.start(Streamer, %{}, name: Streamer)

  on_exit(fn ->
    if pid = Process.whereis(Streamer) do
      # Process.exit/2 only sends the exit signal. Wait for the :DOWN message
      # so the registered name has been released before the next test tries
      # to start its own Streamer.
      ref = Process.monitor(pid)
      Process.exit(pid, :kill)

      receive do
        {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
      after
        1_000 -> :ok
      end
    end
  end)

  user = insert(:user)
  notify = insert(:notification, user: user, activity: build(:note_activity))
  {:ok, %{user: user, notify: notify}}
end
38
# A notification streamed on the "user" topic must reach a socket that was
# registered on that topic with the notification's user.
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
  # The task stands in for the socket's transport process.
  receiver = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

  Streamer.add_socket("user", socket)
  Streamer.stream("user", notify)

  Task.await(receiver)
end
53
# A notification streamed on the "user:notification" topic must reach a
# socket that was registered on that topic with the notification's user.
test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
  # The task stands in for the socket's transport process.
  receiver = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

  Streamer.add_socket("user:notification", socket)
  Streamer.stream("user:notification", notify)

  Task.await(receiver)
end
68
# Nothing may be pushed on "user:notification" when the item streamed is the
# result of a favourite made by a user the socket's user has blocked.
test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
  user: user
} do
  blocked_user = insert(:user)
  {:ok, blocker} = User.block(user, blocked_user)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}
  Streamer.add_socket("user:notification", socket)

  {:ok, post} = CommonAPI.post(blocker, %{"status" => ":("})
  {:ok, favorite, _} = CommonAPI.favorite(post.id, blocked_user)

  Streamer.stream("user:notification", favorite)
  Task.await(listener)
end
88
# Nothing may be pushed on "user:notification" for a favourite of a post
# whose thread the socket's user has muted.
test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
  user: user
} do
  other_user = insert(:user)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: user}}
  Streamer.add_socket("user:notification", socket)

  {:ok, post} = CommonAPI.post(user, %{"status" => "super hot take"})
  {:ok, muted_post} = CommonAPI.add_mute(user, post)
  {:ok, favorite, _} = CommonAPI.favorite(muted_post.id, other_user)

  Streamer.stream("user:notification", favorite)
  Task.await(listener)
end
106
# Nothing may be pushed on "user:notification" for a favourite made by an
# account whose ap_id is on a domain the user has blocked.
test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
  user: user
} do
  remote_user = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)

  # The socket is registered with the user as it was before the domain block.
  socket = %{transport_pid: listener.pid, assigns: %{user: user}}
  Streamer.add_socket("user:notification", socket)

  {:ok, blocking_user} = User.block_domain(user, "hecking-lewd-place.com")
  {:ok, post} = CommonAPI.post(blocking_user, %{"status" => "super hot take"})
  {:ok, favorite, _} = CommonAPI.favorite(post.id, remote_user)

  Streamer.stream("user:notification", favorite)
  Task.await(listener)
end
125 end
126
# Pushes a freshly created post and then its deletion to the "public" topic,
# checking that the socket gets a message each time and that the second one
# is exactly the JSON-encoded "delete" event for the original post's id.
test "it sends to public" do
  viewer = insert(:user)
  author = insert(:user)

  # Phase 1: the created post is delivered.
  create_listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  create_socket = %{transport_pid: create_listener.pid, assigns: %{user: viewer}}

  {:ok, created} = CommonAPI.post(author, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [create_socket]}, "public", created)
  Task.await(create_listener)

  # Phase 2: deleting the post produces a "delete" event carrying its id.
  delete_listener =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => created.id})

      assert_receive {:text, received_event}, 4_000
      assert received_event == expected_event
    end)

  delete_socket = %{transport_pid: delete_listener.pid, assigns: %{user: viewer}}

  {:ok, deleted} = CommonAPI.delete(created.id, author)

  Streamer.push_to_socket(%{"public" => [delete_socket]}, "public", deleted)
  Task.await(delete_listener)
end
183
184 describe "thread_containment" do
# With :skip_thread_containment set to false, an activity whose note is
# addressed only to "TEST-FFF" must not be pushed to a follower's socket.
test "it doesn't send to user if recipients invalid and thread containment is enabled" do
  Pleroma.Config.put([:instance, :skip_thread_containment], false)

  author = insert(:user)
  follower = insert(:user, following: [author.ap_id])

  note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
  activity = insert(:note_activity, note: note)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
206
# With :skip_thread_containment set to true, the same oddly addressed
# activity (note "to" is only "TEST-FFF") is pushed to the follower's socket.
test "it sends message if recipients invalid and thread containment is disabled" do
  Pleroma.Config.put([:instance, :skip_thread_containment], true)

  author = insert(:user)
  follower = insert(:user, following: [author.ap_id])

  note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
  activity = insert(:note_activity, note: note)

  # The task stands in for the socket's transport process.
  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
228
# The instance setting is false, but the receiving user was inserted with
# `info: %{skip_thread_containment: true}`, so the message is still expected.
test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
  Pleroma.Config.put([:instance, :skip_thread_containment], false)

  author = insert(:user)

  follower =
    insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

  note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
  activity = insert(:note_activity, note: note)

  # The task stands in for the socket's transport process.
  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
250 end
251
# A public post written by a user the socket's user has blocked must not be
# pushed to that socket.
test "it doesn't send to blocked users" do
  viewer = insert(:user)
  blocked_author = insert(:user)
  {:ok, viewer} = User.block(viewer, blocked_author)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: viewer}}

  {:ok, post} = CommonAPI.post(blocked_author, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [socket]}, "public", post)

  Task.await(listener)
end
279
# A direct message from a list member to a third user must not show up on
# the list owner's list stream, even though the owner follows the author.
test "it doesn't send unwanted DMs to list" do
  list_owner = insert(:user)
  author = insert(:user)
  dm_recipient = insert(:user)

  {:ok, list_owner} = User.follow(list_owner, author)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, author)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, direct_message} =
    CommonAPI.post(author, %{
      "status" => "@#{dm_recipient.nickname} Test",
      "visibility" => "direct"
    })

  topics = %{"list:#{list.id}" => [socket]}

  # The GenServer callback is invoked directly with a hand-built topics map.
  Streamer.handle_cast(%{action: :stream, topic: "list", item: direct_message}, topics)

  Task.await(listener)
end
316
# A private post by a list member must not reach the list owner's list
# stream when the owner has not followed the author (no User.follow here).
test "it doesn't send unwanted private posts to list" do
  list_owner = insert(:user)
  author = insert(:user)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, author)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, private_post} =
    CommonAPI.post(author, %{"status" => "Test", "visibility" => "private"})

  topics = %{"list:#{list.id}" => [socket]}

  # The GenServer callback is invoked directly with a hand-built topics map.
  Streamer.handle_cast(%{action: :stream, topic: "list", item: private_post}, topics)

  Task.await(listener)
end
350
# A private post by a list member does reach the list owner's list stream
# when the owner follows the author.
test "it send wanted private posts to list" do
  list_owner = insert(:user)
  author = insert(:user)

  {:ok, list_owner} = User.follow(list_owner, author)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, author)

  # The task stands in for the socket's transport process.
  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, private_post} =
    CommonAPI.post(author, %{"status" => "Test", "visibility" => "private"})

  topics = %{"list:#{list.id}" => [socket]}

  # The GenServer callback is invoked directly with a hand-built topics map.
  Streamer.handle_cast(%{action: :stream, topic: "list", item: private_post}, topics)

  Task.await(listener)
end
386
# After the viewer hides reblogs from another user, a repeat made by that
# user must not be pushed to the viewer's socket.
test "it doesn't send muted reblogs" do
  viewer = insert(:user)
  reblogger = insert(:user)
  author = insert(:user)

  # The return value is not used; the socket below carries `viewer` as built.
  CommonAPI.hide_reblogs(viewer, reblogger)

  # The task stands in for the socket's transport process and must stay silent.
  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: viewer}}

  {:ok, post} = CommonAPI.post(author, %{"status" => "I'm kawen"})
  {:ok, announce, _} = CommonAPI.repeat(post.id, reblogger)

  Streamer.push_to_socket(%{"public" => [socket]}, "public", announce)

  Task.await(listener)
end
416
# A post from a followed user must not be pushed on the follower's "user"
# stream once the follower has muted that post's thread.
test "it doesn't send posts from muted threads" do
  # This test goes through Streamer.add_socket/2 and Streamer.stream/2 but
  # lives outside the describe blocks whose setup starts the Streamer. With no
  # process registered under that name the refute_receive below could not
  # fail, so start one here in the same way those setups do.
  GenServer.start(Streamer, %{}, name: Streamer)

  on_exit(fn ->
    if pid = Process.whereis(Streamer) do
      # Wait for the process to be gone so the name is free for later tests.
      ref = Process.monitor(pid)
      Process.exit(pid, :kill)

      receive do
        {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
      after
        1_000 -> :ok
      end
    end
  end)

  user = insert(:user)
  user2 = insert(:user)
  {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)

  {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})

  {:ok, activity} = CommonAPI.add_mute(user2, activity)

  # The task stands in for the socket's transport process and must stay silent.
  task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)

  Streamer.add_socket(
    "user",
    %{transport_pid: task.pid, assigns: %{user: user2}}
  )

  Streamer.stream("user", activity)
  Task.await(task)
end
436
437 describe "direct streams" do
# Starts a Streamer process registered under the name `Streamer` for every
# test in this describe block and stops it again when the test exits.
setup do
  # The result is deliberately ignored: if the name is already registered,
  # GenServer.start/3 returns an error tuple and the running process is used.
  GenServer.start(Streamer, %{}, name: Streamer)

  on_exit(fn ->
    if pid = Process.whereis(Streamer) do
      # Process.exit/2 only sends the exit signal. Wait for the :DOWN message
      # so the registered name has been released before the next test tries
      # to start its own Streamer.
      ref = Process.monitor(pid)
      Process.exit(pid, :kill)

      receive do
        {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
      after
        1_000 -> :ok
      end
    end
  end)

  :ok
end
449
# Posting a direct message that mentions a user must produce a message on
# that user's "direct" stream. Nothing is streamed explicitly here; the
# message is expected as a consequence of CommonAPI.post/2.
test "it sends conversation update to the 'direct' stream", %{} do
  recipient = insert(:user)
  sender = insert(:user)

  # The task stands in for the socket's transport process.
  listener = Task.async(fn -> assert_receive {:text, _received_event}, 4_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

  Streamer.add_socket("direct", socket)

  {:ok, _create_activity} =
    CommonAPI.post(sender, %{
      "status" => "hey @#{recipient.nickname}",
      "visibility" => "direct"
    })

  Task.await(listener)
end
472
# Deleting the only message of a direct conversation must produce a "delete"
# event on the recipient's "direct" stream and nothing after it.
test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
  user = insert(:user)
  another_user = insert(:user)

  {:ok, create_activity} =
    CommonAPI.post(another_user, %{
      "status" => "hi @#{user.nickname}",
      "visibility" => "direct"
    })

  # The task stands in for the socket's transport process.
  task =
    Task.async(fn ->
      assert_receive {:text, received_event}, 4_000
      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

      # Nothing else may follow the delete event.
      refute_receive {:text, _}, 4_000
    end)

  Streamer.add_socket(
    "direct",
    %{transport_pid: task.pid, assigns: %{user: user}}
  )

  {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

  # The task waits up to 4 seconds for the delete event and then always a
  # further 4 seconds in refute_receive. Task.await/2 defaults to 5 seconds,
  # which could expire while the task is still running correctly, so give it
  # room for the full 8 seconds.
  Task.await(task, 10_000)
end
500
# Deleting the latest of two messages in a direct conversation must produce
# a "delete" event followed by a "conversation" event whose `last_status`
# is the first (remaining) message.
test "it sends conversation update to the 'direct' stream when a message is deleted" do
  user = insert(:user)
  another_user = insert(:user)

  {:ok, create_activity} =
    CommonAPI.post(another_user, %{
      "status" => "hi @#{user.nickname}",
      "visibility" => "direct"
    })

  {:ok, create_activity2} =
    CommonAPI.post(another_user, %{
      "status" => "hi @#{user.nickname}",
      "in_reply_to_status_id" => create_activity.id,
      "visibility" => "direct"
    })

  # The task stands in for the socket's transport process.
  task =
    Task.async(fn ->
      assert_receive {:text, received_event}, 4_000
      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

      assert_receive {:text, received_event}, 4_000

      assert %{"event" => "conversation", "payload" => received_payload} =
               Jason.decode!(received_event)

      # The payload is itself a JSON string and has to be decoded separately.
      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
      assert last_status["id"] == to_string(create_activity.id)
    end)

  Streamer.add_socket(
    "direct",
    %{transport_pid: task.pid, assigns: %{user: user}}
  )

  {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

  # The two assert_receive calls may together wait up to 8 seconds. With the
  # 5-second default of Task.await/2 a late second event would show up as an
  # await timeout instead of the assertion failure, so allow the full window.
  Task.await(task, 10_000)
end
541 end
542 end