1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.StreamerTest do
10 alias Pleroma.Web.CommonAPI
11 alias Pleroma.Web.Streamer
12 import Pleroma.Factory
14 clear_config_all([:instance, :skip_thread_containment])
16 describe "user streams" do
18 GenServer.start(Streamer, %{}, name: Streamer)
21 if pid = Process.whereis(Streamer) do
22 Process.exit(pid, :kill)
27 notify = insert(:notification, user: user, activity: build(:note_activity))
28 {:ok, %{user: user, notify: notify}}
31 test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
34 assert_receive {:text, _}, 4_000
39 %{transport_pid: task.pid, assigns: %{user: user}}
42 Streamer.stream("user", notify)
46 test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
49 assert_receive {:text, _}, 4_000
54 %{transport_pid: task.pid, assigns: %{user: user}}
57 Streamer.stream("user:notification", notify)
61 test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
64 blocked = insert(:user)
65 {:ok, user} = User.block(user, blocked)
67 task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
71 %{transport_pid: task.pid, assigns: %{user: user}}
74 {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
75 {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked)
77 Streamer.stream("user:notification", notif)
81 test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
85 task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
89 %{transport_pid: task.pid, assigns: %{user: user}}
92 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
93 {:ok, activity} = CommonAPI.add_mute(user, activity)
94 {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
95 Streamer.stream("user:notification", notif)
99 test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
102 user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
103 task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
107 %{transport_pid: task.pid, assigns: %{user: user}}
110 {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
111 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
112 {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
114 Streamer.stream("user:notification", notif)
119 test "it sends to public" do
121 other_user = insert(:user)
125 assert_receive {:text, _}, 4_000
129 transport_pid: task.pid,
135 {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
138 "public" => [fake_socket]
141 Streamer.push_to_socket(topics, "public", activity)
150 "payload" => activity.id
154 assert_receive {:text, received_event}, 4_000
155 assert received_event == expected_event
159 transport_pid: task.pid,
165 {:ok, activity} = CommonAPI.delete(activity.id, other_user)
168 "public" => [fake_socket]
171 Streamer.push_to_socket(topics, "public", activity)
176 describe "thread_containment" do
177 test "it doesn't send to user if recipients invalid and thread containment is enabled" do
178 Pleroma.Config.put([:instance, :skip_thread_containment], false)
179 author = insert(:user)
180 user = insert(:user, following: [author.ap_id])
183 insert(:note_activity,
187 data: %{"to" => ["TEST-FFF"]}
191 task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
192 fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
193 topics = %{"public" => [fake_socket]}
194 Streamer.push_to_socket(topics, "public", activity)
199 test "it sends message if recipients invalid and thread containment is disabled" do
200 Pleroma.Config.put([:instance, :skip_thread_containment], true)
201 author = insert(:user)
202 user = insert(:user, following: [author.ap_id])
205 insert(:note_activity,
209 data: %{"to" => ["TEST-FFF"]}
213 task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
214 fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
215 topics = %{"public" => [fake_socket]}
216 Streamer.push_to_socket(topics, "public", activity)
221 test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
222 Pleroma.Config.put([:instance, :skip_thread_containment], false)
223 author = insert(:user)
224 user = insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})
227 insert(:note_activity,
231 data: %{"to" => ["TEST-FFF"]}
235 task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
236 fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
237 topics = %{"public" => [fake_socket]}
238 Streamer.push_to_socket(topics, "public", activity)
244 test "it doesn't send to blocked users" do
246 blocked_user = insert(:user)
247 {:ok, user} = User.block(user, blocked_user)
251 refute_receive {:text, _}, 1_000
255 transport_pid: task.pid,
261 {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
264 "public" => [fake_socket]
267 Streamer.push_to_socket(topics, "public", activity)
272 test "it doesn't send unwanted DMs to list" do
273 user_a = insert(:user)
274 user_b = insert(:user)
275 user_c = insert(:user)
277 {:ok, user_a} = User.follow(user_a, user_b)
279 {:ok, list} = List.create("Test", user_a)
280 {:ok, list} = List.follow(list, user_b)
284 refute_receive {:text, _}, 1_000
288 transport_pid: task.pid,
295 CommonAPI.post(user_b, %{
296 "status" => "@#{user_c.nickname} Test",
297 "visibility" => "direct"
301 "list:#{list.id}" => [fake_socket]
304 Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
309 test "it doesn't send unwanted private posts to list" do
310 user_a = insert(:user)
311 user_b = insert(:user)
313 {:ok, list} = List.create("Test", user_a)
314 {:ok, list} = List.follow(list, user_b)
318 refute_receive {:text, _}, 1_000
322 transport_pid: task.pid,
329 CommonAPI.post(user_b, %{
331 "visibility" => "private"
335 "list:#{list.id}" => [fake_socket]
338 Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
343 test "it send wanted private posts to list" do
344 user_a = insert(:user)
345 user_b = insert(:user)
347 {:ok, user_a} = User.follow(user_a, user_b)
349 {:ok, list} = List.create("Test", user_a)
350 {:ok, list} = List.follow(list, user_b)
354 assert_receive {:text, _}, 1_000
358 transport_pid: task.pid,
365 CommonAPI.post(user_b, %{
367 "visibility" => "private"
371 "list:#{list.id}" => [fake_socket]
374 Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
379 test "it doesn't send muted reblogs" do
380 user1 = insert(:user)
381 user2 = insert(:user)
382 user3 = insert(:user)
383 CommonAPI.hide_reblogs(user1, user2)
387 refute_receive {:text, _}, 1_000
391 transport_pid: task.pid,
397 {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
398 {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
401 "public" => [fake_socket]
404 Streamer.push_to_socket(topics, "public", announce_activity)
409 test "it doesn't send posts from muted threads" do
411 user2 = insert(:user)
412 {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
414 {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
416 {:ok, activity} = CommonAPI.add_mute(user2, activity)
418 task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
422 %{transport_pid: task.pid, assigns: %{user: user2}}
425 Streamer.stream("user", activity)
429 describe "direct streams" do
431 GenServer.start(Streamer, %{}, name: Streamer)
434 if pid = Process.whereis(Streamer) do
435 Process.exit(pid, :kill)
442 test "it sends conversation update to the 'direct' stream", %{} do
444 another_user = insert(:user)
448 assert_receive {:text, _received_event}, 4_000
453 %{transport_pid: task.pid, assigns: %{user: user}}
456 {:ok, _create_activity} =
457 CommonAPI.post(another_user, %{
458 "status" => "hey @#{user.nickname}",
459 "visibility" => "direct"
465 test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
467 another_user = insert(:user)
469 {:ok, create_activity} =
470 CommonAPI.post(another_user, %{
471 "status" => "hi @#{user.nickname}",
472 "visibility" => "direct"
477 assert_receive {:text, received_event}, 4_000
478 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
480 refute_receive {:text, _}, 4_000
485 %{transport_pid: task.pid, assigns: %{user: user}}
488 {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
493 test "it sends conversation update to the 'direct' stream when a message is deleted" do
495 another_user = insert(:user)
497 {:ok, create_activity} =
498 CommonAPI.post(another_user, %{
499 "status" => "hi @#{user.nickname}",
500 "visibility" => "direct"
503 {:ok, create_activity2} =
504 CommonAPI.post(another_user, %{
505 "status" => "hi @#{user.nickname}",
506 "in_reply_to_status_id" => create_activity.id,
507 "visibility" => "direct"
512 assert_receive {:text, received_event}, 4_000
513 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
515 assert_receive {:text, received_event}, 4_000
517 assert %{"event" => "conversation", "payload" => received_payload} =
518 Jason.decode!(received_event)
520 assert %{"last_status" => last_status} = Jason.decode!(received_payload)
521 assert last_status["id"] == to_string(create_activity.id)
526 %{transport_pid: task.pid, assigns: %{user: user}}
529 {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)