# d47b37efb9815ca9142da160d3eca94b7703978c
# [akkoma] / test / web / streamer_test.exs
# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6 use Pleroma.DataCase
7
8 alias Pleroma.List
9 alias Pleroma.User
10 alias Pleroma.Web.CommonAPI
11 alias Pleroma.Web.Streamer
12 import Pleroma.Factory
13
# Reads the instance-level `:skip_thread_containment` setting before each test
# and registers an exit callback that writes the same value back, so a test
# that changes the setting does not leave it changed for later tests.
setup do
  config_path = [:instance, :skip_thread_containment]
  original_value = Pleroma.Config.get(config_path)

  on_exit(fn -> Pleroma.Config.put(config_path, original_value) end)

  :ok
end
23
# Notification delivery on the "user" and "user:notification" topics, driven
# through a Streamer process registered under its module name.
#
# Every test spawns a Task that stands in for a socket's transport process:
# the Task's pid is handed to the Streamer as `transport_pid`, and the Task
# body asserts (or refutes) that a `{:text, _}` message shows up in its
# mailbox within 4 seconds.
describe "user streams" do
  setup do
    # The result is intentionally not matched, so this does not fail when a
    # process is already registered as `Streamer`.
    GenServer.start(Streamer, %{}, name: Streamer)

    on_exit(fn ->
      # Kill whatever process currently holds the `Streamer` name.
      if pid = Process.whereis(Streamer) do
        Process.exit(pid, :kill)
      end
    end)

    user = insert(:user)
    notify = insert(:notification, user: user, activity: build(:note_activity))

    {:ok, %{user: user, notify: notify}}
  end

  test "it sends notify to the 'user' stream", %{user: user, notify: notify} do
    task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: task.pid, assigns: %{user: user}}

    Streamer.add_socket("user", socket)
    Streamer.stream("user", notify)

    Task.await(task)
  end

  test "it sends notify to the 'user:notification' stream", %{user: user, notify: notify} do
    task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: task.pid, assigns: %{user: user}}

    Streamer.add_socket("user:notification", socket)
    Streamer.stream("user:notification", notify)

    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
    user: user
  } do
    blocked = insert(:user)
    {:ok, user} = User.block(user, blocked)

    task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: task.pid, assigns: %{user: user}}

    Streamer.add_socket("user:notification", socket)

    # The blocked account favorites a post written by `user`; the value
    # returned by the favorite call is what gets streamed.
    {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
    {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
    user: user
  } do
    user2 = insert(:user)

    task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: task.pid, assigns: %{user: user}}

    Streamer.add_socket("user:notification", socket)

    # `user` mutes their own post before `user2` favorites it.
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, activity} = CommonAPI.add_mute(user, activity)
    {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end

  test "it doesn't send notify to the 'user:notification' stream when a domain is blocked", %{
    user: user
  } do
    user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

    task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    socket = %{transport_pid: task.pid, assigns: %{user: user}}

    # The socket is registered with the user as it was before the domain block
    # below is applied (same ordering as the test has always used).
    Streamer.add_socket("user:notification", socket)

    {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)

    Streamer.stream("user:notification", notif)
    Task.await(task)
  end
end
126
# Pushes a newly created post, and afterwards the result of deleting it, to a
# fake socket listed under the "public" topic. The socket's transport process
# must receive a text frame both times; the second one has to equal the
# JSON-encoded "delete" event carrying the id of the created post.
test "it sends to public" do
  user = insert(:user)
  other_user = insert(:user)

  # Builds the fake socket map for a given transport pid.
  socket_for = fn pid -> %{transport_pid: pid, assigns: %{user: user}} end

  create_task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)

  {:ok, created} = CommonAPI.post(other_user, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [socket_for.(create_task.pid)]}, "public", created)
  Task.await(create_task)

  delete_task =
    Task.async(fn ->
      expected_event = Jason.encode!(%{"event" => "delete", "payload" => created.id})

      assert_receive {:text, received_event}, 4_000
      assert received_event == expected_event
    end)

  {:ok, deleted} = CommonAPI.delete(created.id, other_user)

  Streamer.push_to_socket(%{"public" => [socket_for.(delete_task.pid)]}, "public", deleted)
  Task.await(delete_task)
end
183
# Each test inserts a note by `author` whose "to" list is `["TEST-FFF"]`,
# pushes the wrapping activity to a fake "public" socket owned by a user who
# has `author.ap_id` in `following`, and checks whether the socket's transport
# process is sent a `{:text, _}` message within one second. What varies is the
# instance-level and per-user `skip_thread_containment` setting.
describe "thread_containment" do
  test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    user = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    receiver = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(receiver)
  end

  test "it sends message if recipients invalid and thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], true)
    author = insert(:user)
    user = insert(:user, following: [author.ap_id])

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    receiver = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(receiver)
  end

  test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    Pleroma.Config.put([:instance, :skip_thread_containment], false)
    author = insert(:user)
    user = insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

    note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
    activity = insert(:note_activity, note: note)

    receiver = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

    Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

    Task.await(receiver)
  end
end
251
# A post written by an account that `user` has blocked is pushed to a fake
# "public" socket owned by `user`; the socket's transport process must not be
# sent any `{:text, _}` message within one second.
test "it doesn't send to blocked users" do
  user = insert(:user)
  blocked_user = insert(:user)
  {:ok, user} = User.block(user, blocked_user)

  receiver = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user}}

  {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})

  Streamer.push_to_socket(%{"public" => [socket]}, "public", activity)

  Task.await(receiver)
end
279
# `user_a` follows `user_b` and has `user_b` on a list. A direct post from
# `user_b` that mentions only `user_c` is run through the Streamer's "list"
# cast handler; the socket subscribed to that list's topic must stay silent
# for one second.
test "it doesn't send unwanted DMs to list" do
  user_a = insert(:user)
  user_b = insert(:user)
  user_c = insert(:user)

  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, empty_list} = List.create("Test", user_a)
  {:ok, list} = List.follow(empty_list, user_b)

  receiver = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user_a}}

  dm_params = %{"status" => "@#{user_c.nickname} Test", "visibility" => "direct"}
  {:ok, activity} = CommonAPI.post(user_b, dm_params)

  # The callback is invoked directly, with the topic map passed in as the state.
  list_topics = %{"list:#{list.id}" => [socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, list_topics)

  Task.await(receiver)
end
316
# `user_a` has `user_b` on a list but no `User.follow/2` call is made in this
# test. A "private" post from `user_b` is run through the Streamer's "list"
# cast handler; the socket subscribed to that list's topic must stay silent
# for one second.
test "it doesn't send unwanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, empty_list} = List.create("Test", user_a)
  {:ok, list} = List.follow(empty_list, user_b)

  receiver = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user_a}}

  post_params = %{"status" => "Test", "visibility" => "private"}
  {:ok, activity} = CommonAPI.post(user_b, post_params)

  # The callback is invoked directly, with the topic map passed in as the state.
  list_topics = %{"list:#{list.id}" => [socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, list_topics)

  Task.await(receiver)
end
350
# `user_a` follows `user_b` and has `user_b` on a list. A "private" post from
# `user_b` is run through the Streamer's "list" cast handler; the socket
# subscribed to that list's topic must be sent a `{:text, _}` message within
# one second.
test "it sends wanted private posts to list" do
  user_a = insert(:user)
  user_b = insert(:user)

  {:ok, user_a} = User.follow(user_a, user_b)

  {:ok, list} = List.create("Test", user_a)
  {:ok, list} = List.follow(list, user_b)

  task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
  fake_socket = %{transport_pid: task.pid, assigns: %{user: user_a}}

  {:ok, activity} =
    CommonAPI.post(user_b, %{
      "status" => "Test",
      "visibility" => "private"
    })

  # The callback is invoked directly, with the topic map passed in as the state.
  topics = %{"list:#{list.id}" => [fake_socket]}
  Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)

  Task.await(task)
end
386
# After `CommonAPI.hide_reblogs(user1, user2)`, `user2` repeats a post written
# by `user3`. The resulting announce activity is pushed to a fake "public"
# socket owned by `user1`, whose transport process must not be sent any
# `{:text, _}` message within one second.
test "it doesn't send muted reblogs" do
  user1 = insert(:user)
  user2 = insert(:user)
  user3 = insert(:user)
  CommonAPI.hide_reblogs(user1, user2)

  receiver = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
  socket = %{transport_pid: receiver.pid, assigns: %{user: user1}}

  {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
  {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)

  Streamer.push_to_socket(%{"public" => [socket]}, "public", announce_activity)

  Task.await(receiver)
end
416
# Conversation events on the "direct" topic, driven through a Streamer process
# registered under its module name.
#
# Every test spawns a Task that stands in for a socket's transport process and
# asserts on the `{:text, _}` messages that land in the Task's mailbox.
describe "direct streams" do
  setup do
    # The result is intentionally not matched, so this does not fail when a
    # process is already registered as `Streamer`.
    GenServer.start(Streamer, %{}, name: Streamer)

    on_exit(fn ->
      # Kill whatever process currently holds the `Streamer` name.
      if pid = Process.whereis(Streamer) do
        Process.exit(pid, :kill)
      end
    end)

    :ok
  end

  test "it sends conversation update to the 'direct' stream" do
    user = insert(:user)
    another_user = insert(:user)

    task = Task.async(fn -> assert_receive {:text, _received_event}, 4_000 end)

    Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

    {:ok, _create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hey @#{user.nickname}",
        "visibility" => "direct"
      })

    Task.await(task)
  end

  test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # First a "delete" event must arrive ...
        assert_receive {:text, received_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        # ... and nothing else may follow it for another 4 seconds.
        refute_receive {:text, _}, 4_000
      end)

    Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

    {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

    # The task can legitimately run for up to 8 seconds (two 4-second receive
    # windows), which exceeds Task.await's default 5-second timeout; wait long
    # enough that the task's own assertions decide the outcome.
    Task.await(task, 10_000)
  end

  test "it sends conversation update to the 'direct' stream when a message is deleted" do
    user = insert(:user)
    another_user = insert(:user)

    {:ok, create_activity} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "visibility" => "direct"
      })

    {:ok, create_activity2} =
      CommonAPI.post(another_user, %{
        "status" => "hi @#{user.nickname}",
        "in_reply_to_status_id" => create_activity.id,
        "visibility" => "direct"
      })

    task =
      Task.async(fn ->
        # The "delete" event is expected first ...
        assert_receive {:text, received_event}, 4_000
        assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

        # ... followed by a "conversation" event whose payload is itself a JSON
        # string; its last status must be the first (surviving) message.
        assert_receive {:text, received_event}, 4_000

        assert %{"event" => "conversation", "payload" => received_payload} =
                 Jason.decode!(received_event)

        assert %{"last_status" => last_status} = Jason.decode!(received_payload)
        assert last_status["id"] == to_string(create_activity.id)
      end)

    Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

    {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

    # Two 4-second receive windows can add up to more than Task.await's
    # default 5-second timeout, so allow for both of them explicitly.
    Task.await(task, 10_000)
  end
end
522 end