Revert "Merge branch 'streamer-refactoring' into 'develop'"
[akkoma] / test / web / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 alias Pleroma.List
9 alias Pleroma.User
10 alias Pleroma.Web.CommonAPI
11 alias Pleroma.Web.Streamer
12 import Pleroma.Factory
13
14 clear_config_all([:instance, :skip_thread_containment])
15
describe "user streams" do
  # Before each test: attempt to start a Streamer registered under its module
  # name (the start result is intentionally not checked) and, once the test
  # exits, kill whatever process is registered under that name. The context
  # gets a user plus a notification inserted for that user.
  setup do
    GenServer.start(Streamer, %{}, name: Streamer)

    on_exit(fn ->
      case Process.whereis(Streamer) do
        nil -> nil
        streamer_pid -> Process.exit(streamer_pid, :kill)
      end
    end)

    recipient = insert(:user)
    note_activity = build(:note_activity)
    notification = insert(:notification, user: recipient, activity: note_activity)

    {:ok, %{user: recipient, notify: notification}}
  end

  test "it sends notify to in the 'user' stream", %{user: recipient, notify: notification} do
    # The task stands in for the socket's transport process: the test passes
    # once a `{:text, _}` message lands in its mailbox.
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    Streamer.add_socket("user", socket)
    Streamer.stream("user", notification)

    Task.await(listener)
  end

  test "it sends notify to in the 'user:notification' stream", %{
    user: recipient,
    notify: notification
  } do
    listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

    Streamer.add_socket("user:notification", socket)
    Streamer.stream("user:notification", notification)

    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: recipient
  } do
    blocked = insert(:user)
    {:ok, recipient} = User.block(recipient, blocked)

    # Nothing may arrive at the listener for the whole 4 second window.
    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}
    Streamer.add_socket("user:notification", socket)

    # The blocked account favourites the recipient's status; the item returned
    # by that favourite is what gets streamed.
    {:ok, status} = CommonAPI.post(recipient, %{"status" => ":("})
    {:ok, streamed_item, _} = CommonAPI.favorite(status.id, blocked)

    Streamer.stream("user:notification", streamed_item)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: recipient
  } do
    favoriter = insert(:user)

    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}
    Streamer.add_socket("user:notification", socket)

    # The recipient mutes their own status before someone else favourites it.
    {:ok, status} = CommonAPI.post(recipient, %{"status" => "super hot take"})
    {:ok, status} = CommonAPI.add_mute(recipient, status)
    {:ok, streamed_item, _} = CommonAPI.favorite(status.id, favoriter)

    Streamer.stream("user:notification", streamed_item)
    Task.await(listener)
  end

  test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
    user: recipient
  } do
    remote_user = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

    # The socket is registered with the user as it was before the domain block.
    listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}
    Streamer.add_socket("user:notification", socket)

    {:ok, recipient} = User.block_domain(recipient, "hecking-lewd-place.com")
    {:ok, status} = CommonAPI.post(recipient, %{"status" => "super hot take"})
    {:ok, streamed_item, _} = CommonAPI.favorite(status.id, remote_user)

    Streamer.stream("user:notification", streamed_item)
    Task.await(listener)
  end
end
118
# Pushes a newly posted status and then its deletion straight through
# `Streamer.push_to_socket/3`, using a hand-built topics map instead of a
# running Streamer process.
test "it sends to public" do
  viewer = insert(:user)
  author = insert(:user)

  # Phase 1: the posted status must produce a `{:text, _}` message.
  create_listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
  create_socket = %{transport_pid: create_listener.pid, assigns: %{user: viewer}}

  {:ok, created} = CommonAPI.post(author, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [create_socket]}, "public", created)
  Task.await(create_listener)

  # Phase 2: the deletion must arrive as a JSON "delete" event whose payload
  # is the id of the activity created in phase 1.
  delete_listener =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => created.id})

      assert_receive {:text, received_event}, 4_000
      assert received_event == expected_event
    end)

  delete_socket = %{transport_pid: delete_listener.pid, assigns: %{user: viewer}}

  {:ok, deletion} = CommonAPI.delete(created.id, author)

  Streamer.push_to_socket(%{"public" => [delete_socket]}, "public", deletion)
  Task.await(delete_listener)
end
175
describe "thread_containment" do
  # Each test inserts a note addressed only to "TEST-FFF", pushes its activity
  # to a follower's socket on the "public" topic, and checks whether a
  # `{:text, _}` message shows up for the given combination of the instance
  # setting and the user's own `skip_thread_containment` flag.

  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    follower = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    follower = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)

    # Instance-wide containment stays on; only this user opts out.
    follower =
      insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(listener)
  end
end
243
# A status posted by an account the socket's user has blocked must not
# produce any `{:text, _}` message on the "public" topic.
test "it doesn't send to blocked users" do
  blocker = insert(:user)
  blocked_user = insert(:user)
  {:ok, blocker} = User.block(blocker, blocked_user)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}

  {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(listener)
end
271
# A direct message from a list member that mentions only a third account
# must not be delivered to the list owner's list socket.
test "it doesn't send unwanted DMs to list" do
  list_owner = insert(:user)
  list_member = insert(:user)
  dm_target = insert(:user)

  {:ok, list_owner} = User.follow(list_owner, list_member)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, activity} =
    CommonAPI.post(list_member, %{
      "status" => "@#{dm_target.nickname} Test",
      "visibility" => "direct"
    })

  # The callback is invoked directly, with the topics map acting as state.
  topics = %{"list:#{list.id}" => [socket]}
  cast = %{action: :stream, topic: "list", item: activity}

  Streamer.handle_cast(cast, topics)

  Task.await(listener)
end
308
# The list owner has the author on a list but does not follow them, so the
# author's private post must not reach the list socket.
test "it doesn't send unwanted private posts to list" do
  list_owner = insert(:user)
  list_member = insert(:user)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, activity} =
    CommonAPI.post(list_member, %{
      "status" => "Test",
      "visibility" => "private"
    })

  # The callback is invoked directly, with the topics map acting as state.
  topics = %{"list:#{list.id}" => [socket]}
  cast = %{action: :stream, topic: "list", item: activity}

  Streamer.handle_cast(cast, topics)

  Task.await(listener)
end
342
# Here the list owner also follows the list member, and the member's private
# post is expected to arrive on the list socket.
test "it send wanted private posts to list" do
  list_owner = insert(:user)
  list_member = insert(:user)

  {:ok, list_owner} = User.follow(list_owner, list_member)

  {:ok, list} = List.create("Test", list_owner)
  {:ok, list} = List.follow(list, list_member)

  listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: list_owner}}

  {:ok, activity} =
    CommonAPI.post(list_member, %{
      "status" => "Test",
      "visibility" => "private"
    })

  # The callback is invoked directly, with the topics map acting as state.
  topics = %{"list:#{list.id}" => [socket]}
  cast = %{action: :stream, topic: "list", item: activity}

  Streamer.handle_cast(cast, topics)

  Task.await(listener)
end
378
# After the viewer hides the reblogger's reblogs, an announce made by the
# reblogger must not produce a `{:text, _}` message on the "public" topic.
test "it doesn't send muted reblogs" do
  viewer = insert(:user)
  reblogger = insert(:user)
  original_poster = insert(:user)
  CommonAPI.hide_reblogs(viewer, reblogger)

  listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: viewer}}

  {:ok, create_activity} = CommonAPI.post(original_poster, %{"status" => "I'm kawen"})
  {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, reblogger)

  Streamer.push_to_socket(%{"public" => [socket]}, "public", announce_activity)

  Task.await(listener)
end
408
# The follower mutes the author's status; streaming that status on the
# "user" topic must then leave the follower's socket silent.
test "it doesn't send posts from muted threads" do
  author = insert(:user)
  follower = insert(:user)
  {:ok, follower, author, _activity} = CommonAPI.follow(follower, author)

  {:ok, status} = CommonAPI.post(author, %{"status" => "super hot take"})
  {:ok, status} = CommonAPI.add_mute(follower, status)

  listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
  socket = %{transport_pid: listener.pid, assigns: %{user: follower}}

  Streamer.add_socket("user", socket)
  Streamer.stream("user", status)

  Task.await(listener)
end
428
describe "direct streams" do
  # Before each test: attempt to start a Streamer registered under its module
  # name (the start result is intentionally not checked) and, once the test
  # exits, kill whatever process is registered under that name.
  setup do
    GenServer.start(Streamer, %{}, name: Streamer)

    on_exit(fn ->
      if pid = Process.whereis(Streamer) do
        Process.exit(pid, :kill)
      end
    end)

    :ok
  end

  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    # The task stands in for the socket's transport process.
    task =
      Task.async(fn ->
        assert_receive {:text, _received_event}, 4_000
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    Task.await(task)
  end

  test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # First the delete event itself must arrive ...
        assert_receive {:text, received_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        # ... and nothing else may follow it.
        refute_receive {:text, _}, 4_000
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    # The task may legitimately run for up to 8 s (4 s assert + 4 s refute),
    # which is longer than Task.await's default 5 s timeout, so wait explicitly.
    Task.await(task, 10_000)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        assert_receive {:text, delete_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

        assert_receive {:text, conversation_event}, 4_000

        # The conversation payload is itself a JSON-encoded string.
        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(conversation_event)

        # After the reply is deleted, the first message is the last status again.
        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    Streamer.add_socket(
      "direct",
      %{transport_pid: task.pid, assigns: %{user: user}}
    )

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    # Two 4 s receive windows can add up to more than Task.await's default
    # 5 s timeout; give the task room to report its own assertion result.
    Task.await(task, 10_000)
  end
end
534 end