X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=test%2Fpleroma%2Fweb%2Fstreamer_test.exs;h=b07c16faa1aba84525e6d1daf690e77ca7cbde2f;hb=d55de5debf3645d97ec792e669f1249e62e07191;hp=b788a913861b56cceac3d10604b4f8cf9ed557f3;hpb=762be6ce10d2145e8e31d42c5d1a0bab93dbe7b0;p=akkoma diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index b788a9138..b07c16faa 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -3,19 +3,15 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.StreamerTest do - use Pleroma.DataCase + use Pleroma.DataCase, async: false import Pleroma.Factory - alias Pleroma.Chat - alias Pleroma.Chat.MessageReference alias Pleroma.Conversation.Participation alias Pleroma.List - alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Web.Streamer - alias Pleroma.Web.StreamerView @moduletag needs_streamer: true, capture_log: true @@ -80,20 +76,16 @@ defmodule Pleroma.Web.StreamerTest do expected_user_topic = "user:#{user.id}" expected_notification_topic = "user:notification:#{user.id}" expected_direct_topic = "direct:#{user.id}" - expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}" for valid_user_token <- [read_oauth_token, read_statuses_token] do assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token) assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user, valid_user_token) - - assert {:ok, ^expected_pleroma_chat_topic} = - Streamer.get_topic("user:pleroma_chat", user, valid_user_token) end for invalid_user_token <- [read_notifications_token, badly_scoped_token], - user_topic <- ["user", "direct", "user:pleroma_chat"] do + user_topic <- ["user", "direct"] do assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token) end @@ -165,7 +157,8 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user", user, oauth_token) {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) - assert_receive {:render_with_user, _, _, ^activity} + stream_name = "user:#{user.id}" + assert_receive {:render_with_user, _, _, ^activity, ^stream_name} refute Streamer.filtered_by_user?(user, activity) end @@ -176,7 +169,11 @@ defmodule Pleroma.Web.StreamerTest do {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, announce} = CommonAPI.repeat(activity.id, user) - assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} + stream_name = "user:#{user.id}" + + assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, + ^stream_name} + refute Streamer.filtered_by_user?(user, announce) end @@ -229,7 +226,11 @@ defmodule Pleroma.Web.StreamerTest do {:ok, %Pleroma.Activity{data: _data, local: false} = announce} = Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data) - assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} + stream_name = "user:#{user.id}" + + assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, + ^stream_name} + refute Streamer.filtered_by_user?(user, announce) end @@ -241,7 +242,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", notify) - assert_receive {:render_with_user, _, _, ^notify} + assert_receive {:render_with_user, _, _, ^notify, "user"} refute Streamer.filtered_by_user?(user, notify) end @@ -253,67 +254,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.stream("user:notification", notify) - assert_receive {:render_with_user, _, _, ^notify} - refute Streamer.filtered_by_user?(user, notify) - end - - test "it sends chat messages to the 'user:pleroma_chat' stream", %{ - user: user, - token: oauth_token - } do - other_user = insert(:user) - - {:ok, create_activity} = - CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123") - - object = Object.normalize(create_activity, fetch: false) - chat = Chat.get(user.id, other_user.ap_id) - cm_ref = MessageReference.for_chat_and_object(chat, object) - cm_ref = %{cm_ref | chat: chat, object: object} - - Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token) - Streamer.stream("user:pleroma_chat", {user, cm_ref}) - - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) - - assert text =~ "hey cirno" - assert_receive {:text, ^text} - end - - test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do - other_user = insert(:user) - - {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") - object = Object.normalize(create_activity, fetch: false) - chat = Chat.get(user.id, other_user.ap_id) - cm_ref = MessageReference.for_chat_and_object(chat, object) - cm_ref = %{cm_ref | chat: chat, object: object} - - Streamer.get_topic_and_add_socket("user", user, oauth_token) - Streamer.stream("user", {user, cm_ref}) - - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) - - assert text =~ "hey cirno" - assert_receive {:text, ^text} - end - - test "it sends chat message notifications to the 'user:notification' stream", %{ - user: user, - token: oauth_token - } do - other_user = insert(:user) - - {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") - - notify = - Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id) - |> Repo.preload(:activity) - - Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) - Streamer.stream("user:notification", notify) - - assert_receive {:render_with_user, _, _, ^notify} + assert_receive {:render_with_user, _, _, ^notify, "user:notification"} refute Streamer.filtered_by_user?(user, notify) end @@ -359,7 +300,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, "user:notification"} assert notif.activity.id == favorite_activity.id refute Streamer.filtered_by_user?(user, notif) end @@ -388,7 +329,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, "user:notification"} assert notif.activity.id == follow_activity.id refute Streamer.filtered_by_user?(user, notif) end @@ -442,6 +383,63 @@ defmodule Pleroma.Web.StreamerTest do "state" => "follow_accept" } = Jason.decode!(payload) end + + test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do + sender = insert(:user) + {:ok, _, _, _} = CommonAPI.follow(user, sender) + + {:ok, activity} = CommonAPI.post(sender, %{status: "hey"}) + + Streamer.get_topic_and_add_socket("user", user, oauth_token) + {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"}) + create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) + + stream = "user:#{user.id}" + assert_receive {:render_with_user, _, "status_update.json", ^create, ^stream} + refute Streamer.filtered_by_user?(user, edited) + end + + test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do + {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) + + Streamer.get_topic_and_add_socket("user", user, oauth_token) + {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"}) + create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) + + stream = "user:#{user.id}" + assert_receive {:render_with_user, _, "status_update.json", ^create, ^stream} + refute Streamer.filtered_by_user?(user, edited) + end + + test "it streams posts containing followed hashtags on the 'user' stream", %{ + user: user, + token: oauth_token + } do + hashtag = insert(:hashtag, %{name: "tenshi"}) + other_user = insert(:user) + {:ok, user} = User.follow_hashtag(user, hashtag) + + Streamer.get_topic_and_add_socket("user", user, oauth_token) + {:ok, activity} = CommonAPI.post(other_user, %{status: "hey #tenshi"}) + + assert_receive {:render_with_user, _, "update.json", ^activity, _} + end + + test "should not stream private posts containing followed hashtags on the 'user' stream", %{ + user: user, + token: oauth_token + } do + hashtag = insert(:hashtag, %{name: "tenshi"}) + other_user = insert(:user) + {:ok, user} = User.follow_hashtag(user, hashtag) + + Streamer.get_topic_and_add_socket("user", user, oauth_token) + + {:ok, activity} = + CommonAPI.post(other_user, %{status: "hey #tenshi", visibility: "private"}) + + refute_receive {:render_with_user, _, "update.json", ^activity, _} + end end describe "public streams" do @@ -452,7 +450,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("public", user, oauth_token) {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, "public"} refute Streamer.filtered_by_user?(other_user, activity) end @@ -484,6 +482,54 @@ defmodule Pleroma.Web.StreamerTest do assert_receive {:text, event} assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) end + + test "it streams edits in the 'public' stream" do + sender = insert(:user) + + Streamer.get_topic_and_add_socket("public", nil, nil) + {:ok, activity} = CommonAPI.post(sender, %{status: "hey"}) + assert_receive {:text, _} + + {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"}) + + edited = Pleroma.Activity.normalize(edited) + + %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"]) + + assert_receive {:text, event} + assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event) + assert %{"id" => ^activity_id} = Jason.decode!(payload) + refute Streamer.filtered_by_user?(sender, edited) + end + + test "it streams multiple edits in the 'public' stream correctly" do + sender = insert(:user) + + Streamer.get_topic_and_add_socket("public", nil, nil) + {:ok, activity} = CommonAPI.post(sender, %{status: "hey"}) + assert_receive {:text, _} + + {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"}) + + edited = Pleroma.Activity.normalize(edited) + + %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"]) + + assert_receive {:text, event} + assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event) + assert %{"id" => ^activity_id} = Jason.decode!(payload) + refute Streamer.filtered_by_user?(sender, edited) + + {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew 2"}) + + edited = Pleroma.Activity.normalize(edited) + + %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"]) + assert_receive {:text, event} + assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event) + assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload) + refute Streamer.filtered_by_user?(sender, edited) + end end describe "thread_containment/2" do @@ -504,7 +550,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, "public"} assert Streamer.filtered_by_user?(user, activity) end @@ -526,7 +572,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, "public"} refute Streamer.filtered_by_user?(user, activity) end @@ -549,7 +595,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, "public"} refute Streamer.filtered_by_user?(user, activity) end end @@ -563,7 +609,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("public", user, oauth_token) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, "public"} assert Streamer.filtered_by_user?(user, activity) end @@ -580,17 +626,17 @@ defmodule Pleroma.Web.StreamerTest do {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_one} + assert_receive {:render_with_user, _, _, ^activity_one, "public"} assert Streamer.filtered_by_user?(blocker, activity_one) {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_two} + assert_receive {:render_with_user, _, _, ^activity_two, "public"} assert Streamer.filtered_by_user?(blocker, activity_two) {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_three} + assert_receive {:render_with_user, _, _, ^activity_three, "public"} assert Streamer.filtered_by_user?(blocker, activity_three) end end @@ -651,7 +697,8 @@ defmodule Pleroma.Web.StreamerTest do visibility: "private" }) - assert_receive {:render_with_user, _, _, ^activity} + stream_name = "list:#{list.id}" + assert_receive {:render_with_user, _, _, ^activity, ^stream_name} refute Streamer.filtered_by_user?(user_a, activity) end end @@ -669,7 +716,8 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) - assert_receive {:render_with_user, _, _, ^announce_activity} + stream_name = "user:#{user1.id}" + assert_receive {:render_with_user, _, _, ^announce_activity, ^stream_name} assert Streamer.filtered_by_user?(user1, announce_activity) end @@ -685,7 +733,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, "user"} assert Streamer.filtered_by_user?(user1, notif) end @@ -701,7 +749,7 @@ defmodule Pleroma.Web.StreamerTest do Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, "user"} refute Streamer.filtered_by_user?(user1, notif) end end @@ -716,7 +764,8 @@ defmodule Pleroma.Web.StreamerTest do {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, _} = CommonAPI.add_mute(user2, activity) - assert_receive {:render_with_user, _, _, ^activity} + stream_name = "user:#{user2.id}" + assert_receive {:render_with_user, _, _, ^activity, ^stream_name} assert Streamer.filtered_by_user?(user2, activity) end end @@ -758,7 +807,8 @@ defmodule Pleroma.Web.StreamerTest do }) create_activity_id = create_activity.id - assert_receive {:render_with_user, _, _, ^create_activity} + stream_name = "direct:#{user.id}" + assert_receive {:render_with_user, _, _, ^create_activity, ^stream_name} assert_receive {:text, received_conversation1} assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) @@ -772,6 +822,7 @@ defmodule Pleroma.Web.StreamerTest do refute_receive _ end + @tag :erratic test "it sends conversation update to the 'direct' stream when a message is deleted", %{ user: user, token: oauth_token @@ -792,8 +843,9 @@ defmodule Pleroma.Web.StreamerTest do visibility: "direct" }) - assert_receive {:render_with_user, _, _, ^create_activity} - assert_receive {:render_with_user, _, _, ^create_activity2} + stream_name = "direct:#{user.id}" + assert_receive {:render_with_user, _, _, ^create_activity, ^stream_name} + assert_receive {:render_with_user, _, _, ^create_activity2, ^stream_name} assert_receive {:text, received_conversation1} assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) assert_receive {:text, received_conversation1} @@ -813,4 +865,105 @@ defmodule Pleroma.Web.StreamerTest do assert last_status["id"] == to_string(create_activity.id) end end + + describe "stop streaming if token got revoked" do + setup do + child_proc = fn start, finalize -> + fn -> + start.() + + receive do + {StreamerTest, :ready} -> + assert_receive {:render_with_user, _, "update.json", _, _} + + receive do + {StreamerTest, :revoked} -> finalize.() + end + end + end + end + + starter = fn user, token -> + fn -> Streamer.get_topic_and_add_socket("user", user, token) end + end + + hit = fn -> assert_receive :close end + miss = fn -> refute_receive :close end + + send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end + + %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } + end + + test "do not revoke other tokens", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } do + %{user: user, token: token} = oauth_access(["read"]) + %{token: token2} = oauth_access(["read"], user: user) + %{user: user2, token: user2_token} = oauth_access(["read"]) + + post_user = insert(:user) + CommonAPI.follow(user, post_user) + CommonAPI.follow(user2, post_user) + + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token2), miss)), + Task.async(child_proc.(starter.(user2, user2_token), miss)) + ] + + {:ok, _} = + CommonAPI.post(post_user, %{ + status: "hi" + }) + + send_all.(tasks, {StreamerTest, :ready}) + + Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) + + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) + end + + test "revoke all streams for this token", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + send_all: send_all + } do + %{user: user, token: token} = oauth_access(["read"]) + + post_user = insert(:user) + CommonAPI.follow(user, post_user) + + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token), hit)) + ] + + {:ok, _} = + CommonAPI.post(post_user, %{ + status: "hi" + }) + + send_all.(tasks, {StreamerTest, :ready}) + + Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) + + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) + end + end end