import Pleroma.Factory
+ alias Pleroma.Conversation.Participation
alias Pleroma.List
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.Streamer.Worker
- @moduletag needs_streamer: true
+ @moduletag needs_streamer: true, capture_log: true
clear_config_all([:instance, :skip_thread_containment])
describe "user streams" do
Streamer.stream("user:notification", notif)
Task.await(task)
end
+
+ test "it sends follow activities to the 'user:notification' stream", %{
+ user: user
+ } do
+ user2 = insert(:user)
+ task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
+
+ Streamer.add_socket(
+ "user:notification",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+
+ {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
+
+ # We don't directly pipe the notification to the streamer as it's already
+ # generated as a side effect of CommonAPI.follow().
+ Task.await(task)
+ end
end
test "it sends to public" do
test "it doesn't send to user if recipients invalid and thread containment is enabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user)
- user = insert(:user, following: [author.ap_id])
+ user = insert(:user)
+ User.follow(user, author, "accept")
activity =
insert(:note_activity,
test "it sends message if recipients invalid and thread containment is disabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], true)
author = insert(:user)
- user = insert(:user, following: [author.ap_id])
+ user = insert(:user)
+ User.follow(user, author, "accept")
activity =
insert(:note_activity,
test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user)
- user = insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})
+ user = insert(:user, skip_thread_containment: true)
+ User.follow(user, author, "accept")
activity =
insert(:note_activity,
task =
Task.async(fn ->
- assert_receive {:text, _received_event}, 4_000
+ assert_receive {:text, received_event}, 4_000
+
+ assert %{"event" => "conversation", "payload" => received_payload} =
+ Jason.decode!(received_event)
+
+ assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+ [participation] = Participation.for_user(user)
+ assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end)
Streamer.add_socket(
Task.await(task)
end
- test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
+ test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
user = insert(:user)
another_user = insert(:user)