Add tests for LDAP authorization
[akkoma] / test / web / oauth / ldap_authorization_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.OAuth.LDAPAuthorizationTest do
  # Exercises the OAuth password grant (`POST /oauth/token`) while
  # `Pleroma.Web.Auth.LDAPAuthenticator` is configured as the authenticator.
  # The Erlang `:eldap` module is stubbed with `Mock` in every test, so the
  # assertions only depend on the return values the stubs provide.
  use Pleroma.Web.ConnCase
  alias Pleroma.Repo
  alias Pleroma.Web.OAuth.Token
  import Pleroma.Factory
  import ExUnit.CaptureLog
  import Mock

  setup_all do
    # Remember the values that were configured before this module ran so they
    # can be put back by the `on_exit` callback below.
    ldap_authenticator = Pleroma.Config.get([Pleroma.Web.Auth.Authenticator])
    ldap_enabled = Pleroma.Config.get([:ldap, :enabled])

    on_exit(fn ->
      Pleroma.Config.put([Pleroma.Web.Auth.Authenticator], ldap_authenticator)
      Pleroma.Config.put([:ldap, :enabled], ldap_enabled)
    end)

    # Switch to the LDAP authenticator and enable LDAP for the tests.
    Pleroma.Config.put([Pleroma.Web.Auth.Authenticator], Pleroma.Web.Auth.LDAPAuthenticator)
    Pleroma.Config.put([:ldap, :enabled], true)

    :ok
  end

  test "authorizes the existing user using LDAP credentials" do
    password = "testpassword"
    user = insert(:user, password_hash: Comeonin.Pbkdf2.hashpwsalt(password))
    app = insert(:oauth_app, scopes: ["read", "write"])

    {host, port} = ldap_endpoint()

    with_mocks [
      {:eldap, [],
       [
         # The pins make the stubs match only the configured host/port and the
         # password submitted by the test.
         open: fn [^host], [{:port, ^port}, {:ssl, false} | _] -> {:ok, self()} end,
         simple_bind: fn _connection, _dn, ^password -> :ok end,
         # Message the calling process so the test can assert that `close` ran.
         close: fn _connection ->
           send(self(), :close_connection)
           :ok
         end
       ]}
    ] do
      conn = request_token(user.nickname, password, app)

      assert %{"access_token" => token} = json_response(conn, 200)

      token = Repo.get_by(Token, token: token)

      assert token.user_id == user.id
      assert_received :close_connection
    end
  end

  test "creates a new user after successful LDAP authorization" do
    password = "testpassword"
    # `build/1` is used instead of `insert/1`: the user is not persisted by the
    # test itself, only its nickname and email are reused below.
    user = build(:user)
    app = insert(:oauth_app, scopes: ["read", "write"])

    {host, port} = ldap_endpoint()

    with_mocks [
      {:eldap, [],
       [
         open: fn [^host], [{:port, ^port}, {:ssl, false} | _] -> {:ok, self()} end,
         simple_bind: fn _connection, _dn, ^password -> :ok end,
         equalityMatch: fn _type, _value -> :ok end,
         wholeSubtree: fn -> :ok end,
         # The stubbed search returns a single entry whose `mail` attribute is
         # the built user's email.
         search: fn _connection, _options ->
           {:ok,
            {:eldap_search_result,
             [{:eldap_entry, ~c"", [{~c"mail", [to_charlist(user.email)]}]}], []}}
         end,
         close: fn _connection ->
           send(self(), :close_connection)
           :ok
         end
       ]}
    ] do
      conn = request_token(user.nickname, password, app)

      assert %{"access_token" => token} = json_response(conn, 200)

      token = Repo.get_by(Token, token: token) |> Repo.preload(:user)

      assert token.user.nickname == user.nickname
      assert_received :close_connection
    end
  end

  test "falls back to the default authorization when LDAP is unavailable" do
    password = "testpassword"
    user = insert(:user, password_hash: Comeonin.Pbkdf2.hashpwsalt(password))
    app = insert(:oauth_app, scopes: ["read", "write"])

    {host, port} = ldap_endpoint()

    with_mocks [
      {:eldap, [],
       [
         # Opening the connection fails in this scenario.
         open: fn [^host], [{:port, ^port}, {:ssl, false} | _] ->
           {:error, ~c"connect failed"}
         end,
         simple_bind: fn _connection, _dn, ^password -> :ok end,
         close: fn _connection ->
           send(self(), :close_connection)
           :ok
         end
       ]}
    ] do
      log =
        capture_log(fn ->
          conn = request_token(user.nickname, password, app)

          assert %{"access_token" => token} = json_response(conn, 200)

          token = Repo.get_by(Token, token: token)

          assert token.user_id == user.id
        end)

      # NOTE(review): the quoted reason below presumably mirrors how the
      # authenticator formats the charlist error - confirm when upgrading Elixir.
      assert log =~ "Could not open LDAP connection: 'connect failed'"
      # No connection was opened, so `close` must not have been called.
      refute_received :close_connection
    end
  end

  test "disallow authorization for wrong LDAP credentials" do
    password = "testpassword"
    user = insert(:user, password_hash: Comeonin.Pbkdf2.hashpwsalt(password))
    app = insert(:oauth_app, scopes: ["read", "write"])

    {host, port} = ldap_endpoint()

    with_mocks [
      {:eldap, [],
       [
         open: fn [^host], [{:port, ^port}, {:ssl, false} | _] -> {:ok, self()} end,
         # The bind is rejected even though the locally stored hash matches.
         simple_bind: fn _connection, _dn, ^password -> {:error, :invalidCredentials} end,
         close: fn _connection ->
           send(self(), :close_connection)
           :ok
         end
       ]}
    ] do
      conn = request_token(user.nickname, password, app)

      assert %{"error" => "Invalid credentials"} = json_response(conn, 400)
      assert_received :close_connection
    end
  end

  # Returns `{host, port}` read from the `:ldap` configuration. The host is
  # converted to a charlist so it can be pinned against the argument the
  # `:eldap.open` stub receives.
  defp ldap_endpoint do
    host = to_charlist(Pleroma.Config.get([:ldap, :host]))
    port = Pleroma.Config.get([:ldap, :port])

    {host, port}
  end

  # Sends the OAuth password-grant request for `nickname`/`password` using the
  # client credentials of `app` and returns the resulting conn.
  defp request_token(nickname, password, app) do
    post(build_conn(), "/oauth/token", %{
      "grant_type" => "password",
      "username" => nickname,
      "password" => password,
      "client_id" => app.client_id,
      "client_secret" => app.client_secret
    })
  end
end