Disconnect streaming sessions when token is revoked
[akkoma] / test / pleroma / integration / mastodon_websocket_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Integration.MastodonWebsocketTest do
6 # Needs a streamer, needs to stay synchronous
7 use Pleroma.DataCase
8
9 import ExUnit.CaptureLog
10 import Pleroma.Factory
11
12 alias Pleroma.Integration.WebsocketClient
13 alias Pleroma.Web.CommonAPI
14 alias Pleroma.Web.OAuth
15
16 @moduletag needs_streamer: true, capture_log: true
17
18 @path Pleroma.Web.Endpoint.url()
19 |> URI.parse()
20 |> Map.put(:scheme, "ws")
21 |> Map.put(:path, "/api/v1/streaming")
22 |> URI.to_string()
23
24 def start_socket(qs \\ nil, headers \\ []) do
25 path =
26 case qs do
27 nil -> @path
28 qs -> @path <> qs
29 end
30
31 WebsocketClient.start_link(self(), path, headers)
32 end
33
34 test "allows multi-streams" do
35 capture_log(fn ->
36 assert {:ok, _} = start_socket()
37
38 assert {:error, %WebSockex.RequestError{code: 404, message: "Not Found"}} =
39 start_socket("?stream=ncjdk")
40
41 Process.sleep(30)
42 end)
43 end
44
45 test "requires authentication and a valid token for protected streams" do
46 capture_log(fn ->
47 assert {:error, %WebSockex.RequestError{code: 401}} =
48 start_socket("?stream=user&access_token=aaaaaaaaaaaa")
49
50 assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
51 Process.sleep(30)
52 end)
53 end
54
55 test "allows public streams without authentication" do
56 assert {:ok, _} = start_socket("?stream=public")
57 assert {:ok, _} = start_socket("?stream=public:local")
58 assert {:ok, _} = start_socket("?stream=public:remote&instance=lain.com")
59 assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
60 end
61
62 test "receives well formatted events" do
63 user = insert(:user)
64 {:ok, _} = start_socket("?stream=public")
65 {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
66
67 assert_receive {:text, raw_json}, 1_000
68 assert {:ok, json} = Jason.decode(raw_json)
69
70 assert "update" == json["event"]
71 assert json["payload"]
72 assert {:ok, json} = Jason.decode(json["payload"])
73
74 view_json =
75 Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
76 |> Jason.encode!()
77 |> Jason.decode!()
78
79 assert json == view_json
80 end
81
82 describe "with a valid user token" do
83 setup do
84 {:ok, app} =
85 Pleroma.Repo.insert(
86 OAuth.App.register_changeset(%OAuth.App{}, %{
87 client_name: "client",
88 scopes: ["read"],
89 redirect_uris: "url"
90 })
91 )
92
93 user = insert(:user)
94
95 {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
96
97 {:ok, token} = OAuth.Token.exchange_token(app, auth)
98
99 %{app: app, user: user, token: token}
100 end
101
102 test "accepts valid tokens", state do
103 assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
104 end
105
106 test "accepts the 'user' stream", %{token: token} = _state do
107 assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
108
109 capture_log(fn ->
110 assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
111 Process.sleep(30)
112 end)
113 end
114
115 test "accepts the 'user:notification' stream", %{token: token} = _state do
116 assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
117
118 capture_log(fn ->
119 assert {:error, %WebSockex.RequestError{code: 401}} =
120 start_socket("?stream=user:notification")
121
122 Process.sleep(30)
123 end)
124 end
125
126 test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
127 assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
128
129 capture_log(fn ->
130 assert {:error, %WebSockex.RequestError{code: 401}} =
131 start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
132
133 Process.sleep(30)
134 end)
135 end
136
137 test "disconnect when token is revoked", %{app: app, user: user, token: token} do
138 assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
139 assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
140
141 {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
142
143 {:ok, token2} = OAuth.Token.exchange_token(app, auth)
144 assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")
145
146 OAuth.Token.Strategy.Revoke.revoke(token)
147
148 assert_receive {:close, _}
149 assert_receive {:close, _}
150 refute_receive {:close, _}
151 end
152 end
153 end