1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.StreamerTest do
10 alias Pleroma.Conversation.Participation
13 alias Pleroma.Web.CommonAPI
14 alias Pleroma.Web.Streamer
15 alias Pleroma.Web.Streamer.StreamerSocket
16 alias Pleroma.Web.Streamer.Worker
18 @moduletag needs_streamer: true, capture_log: true
21 @streamer_start_wait 10
23 clear_config([:instance, :skip_thread_containment])
25 describe "user streams" do
28 notify = insert(:notification, user: user, activity: build(:note_activity))
29 {:ok, %{user: user, notify: notify}}
32 test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
35 assert_receive {:text, _}, @streamer_timeout
40 %{transport_pid: task.pid, assigns: %{user: user}}
43 Streamer.stream("user", notify)
47 test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
50 assert_receive {:text, _}, @streamer_timeout
55 %{transport_pid: task.pid, assigns: %{user: user}}
58 Streamer.stream("user:notification", notify)
62 test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
65 blocked = insert(:user)
66 {:ok, _user_relationship} = User.block(user, blocked)
68 {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
69 {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked)
71 task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
75 %{transport_pid: task.pid, assigns: %{user: user}}
78 Streamer.stream("user:notification", notif)
82 test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
87 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
88 {:ok, activity} = CommonAPI.add_mute(user, activity)
89 {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
91 task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
95 %{transport_pid: task.pid, assigns: %{user: user}}
98 Streamer.stream("user:notification", notif)
102 test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
105 user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
107 {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
108 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
109 {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
111 task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
115 %{transport_pid: task.pid, assigns: %{user: user}}
118 Streamer.stream("user:notification", notif)
122 test "it sends follow activities to the 'user:notification' stream", %{
125 user2 = insert(:user)
126 task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
128 Process.sleep(@streamer_start_wait)
132 %{transport_pid: task.pid, assigns: %{user: user}}
135 {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
137 # We don't directly pipe the notification to the streamer as it's already
138 # generated as a side effect of CommonAPI.follow().
143 test "it sends to public" do
145 other_user = insert(:user)
149 assert_receive {:text, _}, @streamer_timeout
152 fake_socket = %StreamerSocket{
153 transport_pid: task.pid,
157 {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
160 "public" => [fake_socket]
163 Worker.push_to_socket(topics, "public", activity)
172 "payload" => activity.id
176 assert_receive {:text, received_event}, @streamer_timeout
177 assert received_event == expected_event
180 fake_socket = %StreamerSocket{
181 transport_pid: task.pid,
185 {:ok, activity} = CommonAPI.delete(activity.id, other_user)
188 "public" => [fake_socket]
191 Worker.push_to_socket(topics, "public", activity)
196 describe "thread_containment" do
197 test "it doesn't send to user if recipients invalid and thread containment is enabled" do
198 Pleroma.Config.put([:instance, :skip_thread_containment], false)
199 author = insert(:user)
201 User.follow(user, author, "accept")
204 insert(:note_activity,
208 data: %{"to" => ["TEST-FFF"]}
212 task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
213 fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
214 topics = %{"public" => [fake_socket]}
215 Worker.push_to_socket(topics, "public", activity)
220 test "it sends message if recipients invalid and thread containment is disabled" do
221 Pleroma.Config.put([:instance, :skip_thread_containment], true)
222 author = insert(:user)
224 User.follow(user, author, "accept")
227 insert(:note_activity,
231 data: %{"to" => ["TEST-FFF"]}
235 task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
236 fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
237 topics = %{"public" => [fake_socket]}
238 Worker.push_to_socket(topics, "public", activity)
243 test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
244 Pleroma.Config.put([:instance, :skip_thread_containment], false)
245 author = insert(:user)
246 user = insert(:user, skip_thread_containment: true)
247 User.follow(user, author, "accept")
250 insert(:note_activity,
254 data: %{"to" => ["TEST-FFF"]}
258 task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
259 fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
260 topics = %{"public" => [fake_socket]}
261 Worker.push_to_socket(topics, "public", activity)
268 test "it doesn't send messages involving blocked users" do
270 blocked_user = insert(:user)
271 {:ok, _user_relationship} = User.block(user, blocked_user)
273 {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
277 refute_receive {:text, _}, 1_000
280 fake_socket = %StreamerSocket{
281 transport_pid: task.pid,
286 "public" => [fake_socket]
289 Worker.push_to_socket(topics, "public", activity)
294 test "it doesn't send messages transitively involving blocked users" do
295 blocker = insert(:user)
296 blockee = insert(:user)
297 friend = insert(:user)
301 refute_receive {:text, _}, 1_000
304 fake_socket = %StreamerSocket{
305 transport_pid: task.pid,
310 "public" => [fake_socket]
313 {:ok, _user_relationship} = User.block(blocker, blockee)
315 {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
317 Worker.push_to_socket(topics, "public", activity_one)
319 {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
321 Worker.push_to_socket(topics, "public", activity_two)
323 {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
325 Worker.push_to_socket(topics, "public", activity_three)
331 test "it doesn't send unwanted DMs to list" do
332 user_a = insert(:user)
333 user_b = insert(:user)
334 user_c = insert(:user)
336 {:ok, user_a} = User.follow(user_a, user_b)
338 {:ok, list} = List.create("Test", user_a)
339 {:ok, list} = List.follow(list, user_b)
342 CommonAPI.post(user_b, %{
343 "status" => "@#{user_c.nickname} Test",
344 "visibility" => "direct"
349 refute_receive {:text, _}, 1_000
352 fake_socket = %StreamerSocket{
353 transport_pid: task.pid,
358 "list:#{list.id}" => [fake_socket]
361 Worker.handle_call({:stream, "list", activity}, self(), topics)
366 test "it doesn't send unwanted private posts to list" do
367 user_a = insert(:user)
368 user_b = insert(:user)
370 {:ok, list} = List.create("Test", user_a)
371 {:ok, list} = List.follow(list, user_b)
374 CommonAPI.post(user_b, %{
376 "visibility" => "private"
381 refute_receive {:text, _}, 1_000
384 fake_socket = %StreamerSocket{
385 transport_pid: task.pid,
390 "list:#{list.id}" => [fake_socket]
393 Worker.handle_call({:stream, "list", activity}, self(), topics)
398 test "it sends wanted private posts to list" do
399 user_a = insert(:user)
400 user_b = insert(:user)
402 {:ok, user_a} = User.follow(user_a, user_b)
404 {:ok, list} = List.create("Test", user_a)
405 {:ok, list} = List.follow(list, user_b)
408 CommonAPI.post(user_b, %{
410 "visibility" => "private"
415 assert_receive {:text, _}, 1_000
418 fake_socket = %StreamerSocket{
419 transport_pid: task.pid,
428 Worker.handle_call({:stream, "list", activity}, self(), %{})
433 test "it doesn't send muted reblogs" do
434 user1 = insert(:user)
435 user2 = insert(:user)
436 user3 = insert(:user)
437 CommonAPI.hide_reblogs(user1, user2)
439 {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
440 {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
444 refute_receive {:text, _}, 1_000
447 fake_socket = %StreamerSocket{
448 transport_pid: task.pid,
453 "public" => [fake_socket]
456 Worker.push_to_socket(topics, "public", announce_activity)
461 test "it does send non-reblog notification for reblog-muted actors" do
462 user1 = insert(:user)
463 user2 = insert(:user)
464 user3 = insert(:user)
465 CommonAPI.hide_reblogs(user1, user2)
467 {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
468 {:ok, favorite_activity, _} = CommonAPI.favorite(create_activity.id, user2)
472 assert_receive {:text, _}, 1_000
475 fake_socket = %StreamerSocket{
476 transport_pid: task.pid,
481 "public" => [fake_socket]
484 Worker.push_to_socket(topics, "public", favorite_activity)
489 test "it doesn't send posts from muted threads" do
491 user2 = insert(:user)
492 {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
494 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
496 {:ok, activity} = CommonAPI.add_mute(user2, activity)
498 task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
502 %{transport_pid: task.pid, assigns: %{user: user2}}
505 Streamer.stream("user", activity)
509 describe "direct streams" do
514 test "it sends conversation update to the 'direct' stream", %{} do
516 another_user = insert(:user)
520 assert_receive {:text, received_event}, @streamer_timeout
522 assert %{"event" => "conversation", "payload" => received_payload} =
523 Jason.decode!(received_event)
525 assert %{"last_status" => last_status} = Jason.decode!(received_payload)
526 [participation] = Participation.for_user(user)
527 assert last_status["pleroma"]["direct_conversation_id"] == participation.id
532 %{transport_pid: task.pid, assigns: %{user: user}}
535 {:ok, _create_activity} =
536 CommonAPI.post(another_user, %{
537 "status" => "hey @#{user.nickname}",
538 "visibility" => "direct"
544 test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
546 another_user = insert(:user)
548 {:ok, create_activity} =
549 CommonAPI.post(another_user, %{
550 "status" => "hi @#{user.nickname}",
551 "visibility" => "direct"
556 assert_receive {:text, received_event}, @streamer_timeout
557 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
559 refute_receive {:text, _}, @streamer_timeout
562 Process.sleep(@streamer_start_wait)
566 %{transport_pid: task.pid, assigns: %{user: user}}
569 {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
574 test "it sends conversation update to the 'direct' stream when a message is deleted" do
576 another_user = insert(:user)
578 {:ok, create_activity} =
579 CommonAPI.post(another_user, %{
580 "status" => "hi @#{user.nickname}",
581 "visibility" => "direct"
584 {:ok, create_activity2} =
585 CommonAPI.post(another_user, %{
586 "status" => "hi @#{user.nickname}",
587 "in_reply_to_status_id" => create_activity.id,
588 "visibility" => "direct"
593 assert_receive {:text, received_event}, @streamer_timeout
594 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
596 assert_receive {:text, received_event}, @streamer_timeout
598 assert %{"event" => "conversation", "payload" => received_payload} =
599 Jason.decode!(received_event)
601 assert %{"last_status" => last_status} = Jason.decode!(received_payload)
602 assert last_status["id"] == to_string(create_activity.id)
605 Process.sleep(@streamer_start_wait)
609 %{transport_pid: task.pid, assigns: %{user: user}}
612 {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)