10693.2.4
by Jonathan Lange
Parametrize the keys in the factory |
1 |
# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
|
8687.15.17
by Karl Fogel
Add the copyright header block to the rest of the files under lib/lp/. |
2 |
# GNU Affero General Public License version 3 (see the file LICENSE).
|
3 |
||
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
4 |
import os |
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
5 |
|
6 |
from testtools.deferredruntest import ( |
|
7 |
assert_fails_with, |
|
11705.2.29
by Jonathan Lange
One flushLoggedErrors call missed |
8 |
flush_logged_errors, |
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
9 |
AsynchronousDeferredRunTest, |
10 |
)
|
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
11 |
|
12 |
from twisted.conch.checkers import SSHPublicKeyDatabase |
|
13 |
from twisted.conch.error import ConchError |
|
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
14 |
from twisted.conch.ssh import userauth |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
15 |
from twisted.conch.ssh.common import ( |
16 |
getNS, |
|
17 |
NS, |
|
18 |
)
|
|
19 |
from twisted.conch.ssh.keys import ( |
|
20 |
BadKeyError, |
|
21 |
Key, |
|
22 |
)
|
|
23 |
from twisted.conch.ssh.transport import ( |
|
24 |
SSHCiphers, |
|
25 |
SSHServerTransport, |
|
26 |
)
|
|
27 |
from twisted.cred.error import UnauthorizedLogin |
|
28 |
from twisted.cred.portal import ( |
|
29 |
IRealm, |
|
30 |
Portal, |
|
31 |
)
|
|
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
32 |
from twisted.internet import defer |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
33 |
from twisted.python import failure |
34 |
from twisted.python.util import sibpath |
|
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
35 |
from zope.interface import implements |
6789.6.34
by Michael Hudson
remove the final uses of the custom authserver client and the client itself |
36 |
|
8137.6.6
by Michael Hudson
use the new interface in the checker, removes one call per key presented at least |
37 |
from canonical.launchpad.xmlrpc import faults |
10693.5.5
by Jonathan Lange
Move auth logic to services |
38 |
from lp.services.sshserver import auth |
10548.1.1
by Jonathan Lange
Move twistedsupport to lp.services |
39 |
from lp.services.twistedsupport import suppress_stderr |
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
40 |
from lp.testing import TestCase |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
41 |
|
7483.1.11
by Jonathan Lange
Get rid of compatibility guff. |
42 |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
43 |
class MockRealm: |
44 |
"""A mock realm for testing userauth.SSHUserAuthServer.
|
|
45 |
||
46 |
This realm is not actually used in the course of testing, so calls to
|
|
47 |
requestAvatar will raise an exception.
|
|
48 |
"""
|
|
49 |
||
50 |
implements(IRealm) |
|
51 |
||
10693.3.29
by Jonathan Lange
Clean up a bunch of coding standard violations. |
52 |
def requestAvatar(self, avatar_id, mind, *interfaces): |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
53 |
user_dict = { |
10693.3.29
by Jonathan Lange
Clean up a bunch of coding standard violations. |
54 |
'id': avatar_id, 'name': avatar_id, 'teams': [], |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
55 |
'initialBranches': []} |
56 |
return ( |
|
10693.3.25
by Jonathan Lange
Move codehosting-specific stuff out of LaunchpadAvatar |
57 |
interfaces[0], auth.LaunchpadAvatar(user_dict), lambda: None) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
58 |
|
59 |
||
60 |
class MockSSHTransport(SSHServerTransport): |
|
61 |
"""A mock SSH transport for testing userauth.SSHUserAuthServer.
|
|
62 |
||
63 |
SSHUserAuthServer expects an SSH transport which has a factory attribute
|
|
64 |
which in turn has a portal attribute. Because the portal is important for
|
|
65 |
testing authentication, we need to be able to provide an interesting portal
|
|
66 |
object to the SSHUserAuthServer.
|
|
67 |
||
68 |
In addition, we want to be able to capture any packets sent over the
|
|
69 |
transport.
|
|
70 |
"""
|
|
71 |
||
72 |
class Factory: |
|
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
73 |
def getService(self, transport, nextService): |
74 |
return lambda: None |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
75 |
|
76 |
def __init__(self, portal): |
|
5990.2.7
by Jonathan Lange
Add a comment about the new dummy variable. |
77 |
# In Twisted 8.0.1, Conch's transport starts referring to
|
78 |
# currentEncryptions where it didn't before. Provide a dummy value for
|
|
79 |
# it.
|
|
5990.2.4
by Jonathan Lange
Fix up our customized banner support and make the ssh server tests work with Twisted. |
80 |
self.currentEncryptions = SSHCiphers('none', 'none', 'none', 'none') |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
81 |
self.packets = [] |
82 |
self.factory = self.Factory() |
|
83 |
self.factory.portal = portal |
|
84 |
||
85 |
def sendPacket(self, messageType, payload): |
|
86 |
self.packets.append((messageType, payload)) |
|
87 |
||
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
88 |
def setService(self, service): |
89 |
pass
|
|
90 |
||
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
91 |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
92 |
class UserAuthServerMixin(object): |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
93 |
def setUp(self): |
94 |
self.portal = Portal(MockRealm()) |
|
95 |
self.transport = MockSSHTransport(self.portal) |
|
7483.1.22
by Jonathan Lange
All that's left in the server module is authentication, so call it auth. |
96 |
self.user_auth = auth.SSHUserAuthServer(self.transport) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
97 |
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
98 |
def _getMessageName(self, message_type): |
99 |
"""Get the name of the message for the given message type constant."""
|
|
8757.2.5
by Michael Hudson
twisted cleans up after itself better now |
100 |
return userauth.SSHUserAuthServer.protocolMessages[message_type] |
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
101 |
|
102 |
def assertMessageOrder(self, message_types): |
|
5977.5.3
by Jonathan Lange
Clean up lint. |
103 |
"""Assert that SSH messages were sent in the given order."""
|
8757.2.5
by Michael Hudson
twisted cleans up after itself better now |
104 |
messages = userauth.SSHUserAuthServer.protocolMessages |
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
105 |
self.assertEqual( |
8757.2.5
by Michael Hudson
twisted cleans up after itself better now |
106 |
[messages[msg_type] for msg_type in message_types], |
107 |
[messages[packet_type] |
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
108 |
for packet_type, contents in self.transport.packets]) |
109 |
||
4690.2.1
by Jonathan Lange
Extract assertion about a banner being sent into a special assert method. |
110 |
def assertBannerSent(self, banner_message, expected_language='en'): |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
111 |
"""Assert that 'banner_message' was sent as an SSH banner."""
|
4690.2.1
by Jonathan Lange
Extract assertion about a banner being sent into a special assert method. |
112 |
# Check that we received a BANNER, then a FAILURE.
|
113 |
for packet_type, packet_content in self.transport.packets: |
|
114 |
if packet_type == userauth.MSG_USERAUTH_BANNER: |
|
115 |
bytes, language, empty = getNS(packet_content, 2) |
|
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
116 |
self.assertEqual(banner_message, bytes.decode('UTF8')) |
4690.2.1
by Jonathan Lange
Extract assertion about a banner being sent into a special assert method. |
117 |
self.assertEqual(expected_language, language) |
118 |
self.assertEqual('', empty) |
|
119 |
break
|
|
120 |
else: |
|
121 |
self.fail("No banner logged.") |
|
4785.3.7
by Jeroen Vermeulen
Removed whitespace at ends of lines |
122 |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
123 |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
124 |
class TestUserAuthServer(TestCase, UserAuthServerMixin): |
125 |
||
126 |
def setUp(self): |
|
127 |
TestCase.setUp(self) |
|
128 |
UserAuthServerMixin.setUp(self) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
129 |
|
130 |
def test_sendBanner(self): |
|
131 |
# sendBanner should send an SSH 'packet' with type MSG_USERAUTH_BANNER
|
|
5977.5.3
by Jonathan Lange
Clean up lint. |
132 |
# and two fields. The first field is the message itself, and the
|
133 |
# second is the language tag.
|
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
134 |
#
|
5977.5.3
by Jonathan Lange
Clean up lint. |
135 |
# sendBanner automatically adds a trailing newline, because openssh
|
136 |
# and Twisted don't add one when displaying the banner.
|
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
137 |
#
|
138 |
# See RFC 4252, Section 5.4.
|
|
139 |
message = u"test message" |
|
140 |
self.user_auth.sendBanner(message, language='en-US') |
|
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
141 |
self.assertBannerSent(message + '\r\n', 'en-US') |
4690.2.1
by Jonathan Lange
Extract assertion about a banner being sent into a special assert method. |
142 |
self.assertEqual( |
143 |
1, len(self.transport.packets), |
|
144 |
"More than just banner was sent: %r" % self.transport.packets) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
145 |
|
146 |
def test_sendBannerUsesCRLF(self): |
|
147 |
# sendBanner should make sure that any line breaks in the message are
|
|
148 |
# sent as CR LF pairs.
|
|
149 |
#
|
|
150 |
# See RFC 4252, Section 5.4.
|
|
151 |
self.user_auth.sendBanner(u"test\nmessage") |
|
152 |
[(messageType, payload)] = self.transport.packets |
|
153 |
bytes, language, empty = getNS(payload, 2) |
|
154 |
self.assertEqual(bytes.decode('UTF8'), u"test\r\nmessage\r\n") |
|
155 |
||
156 |
def test_requestRaisesConchError(self): |
|
157 |
# ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
|
|
158 |
# None. Added to catch a bug noticed by pyflakes.
|
|
159 |
# Whitebox test.
|
|
160 |
def mock_try_auth(kind, user, data): |
|
161 |
return None |
|
162 |
def mock_eb_bad_auth(reason): |
|
163 |
reason.trap(ConchError) |
|
5977.5.3
by Jonathan Lange
Clean up lint. |
164 |
tryAuth = self.user_auth.tryAuth |
165 |
self.user_auth.tryAuth = mock_try_auth |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
166 |
_ebBadAuth, self.user_auth._ebBadAuth = (self.user_auth._ebBadAuth, |
167 |
mock_eb_bad_auth) |
|
168 |
self.user_auth.serviceStarted() |
|
169 |
try: |
|
170 |
packet = NS('jml') + NS('foo') + NS('public_key') + NS('data') |
|
171 |
self.user_auth.ssh_USERAUTH_REQUEST(packet) |
|
172 |
finally: |
|
173 |
self.user_auth.serviceStopped() |
|
174 |
self.user_auth.tryAuth = tryAuth |
|
175 |
self.user_auth._ebBadAuth = _ebBadAuth |
|
176 |
||
177 |
||
178 |
class MockChecker(SSHPublicKeyDatabase): |
|
179 |
"""A very simple public key checker which rejects all offered credentials.
|
|
180 |
||
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
181 |
Used by TestAuthenticationBannerDisplay to test that errors raised by
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
182 |
checkers are sent to SSH clients.
|
183 |
"""
|
|
184 |
||
185 |
error_message = u'error message' |
|
186 |
||
187 |
def requestAvatarId(self, credentials): |
|
4690.2.4
by Jonathan Lange
Change the mock checker so that its response depends entirely on the username. |
188 |
if credentials.username == 'success': |
189 |
return credentials.username |
|
190 |
else: |
|
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
191 |
return failure.Failure( |
7483.1.22
by Jonathan Lange
All that's left in the server module is authentication, so call it auth. |
192 |
auth.UserDisplayedUnauthorizedLogin(self.error_message)) |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
193 |
|
194 |
||
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
195 |
class TestAuthenticationBannerDisplay(UserAuthServerMixin, TestCase): |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
196 |
"""Check that auth error information is passed through to the client.
|
197 |
||
198 |
Normally, SSH servers provide minimal information on failed authentication.
|
|
199 |
With Launchpad, much more user information is public, so it is helpful and
|
|
200 |
not insecure to tell users why they failed to authenticate.
|
|
201 |
||
202 |
SSH doesn't provide a standard way of doing this, but the
|
|
203 |
MSG_USERAUTH_BANNER message is allowed and seems appropriate. See RFC 4252,
|
|
204 |
Section 5.4 for more information.
|
|
205 |
"""
|
|
206 |
||
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
207 |
run_tests_with = AsynchronousDeferredRunTest |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
208 |
|
209 |
def setUp(self): |
|
210 |
UserAuthServerMixin.setUp(self) |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
211 |
TestCase.setUp(self) |
4690.2.4
by Jonathan Lange
Change the mock checker so that its response depends entirely on the username. |
212 |
self.portal.registerChecker(MockChecker()) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
213 |
self.user_auth.serviceStarted() |
214 |
self.key_data = self._makeKey() |
|
215 |
||
216 |
def tearDown(self): |
|
217 |
self.user_auth.serviceStopped() |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
218 |
TestCase.tearDown(self) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
219 |
|
220 |
def _makeKey(self): |
|
221 |
keydir = sibpath(__file__, 'keys') |
|
7483.1.11
by Jonathan Lange
Get rid of compatibility guff. |
222 |
public_key = Key.fromString( |
223 |
open(os.path.join(keydir, 'ssh_host_key_rsa.pub'), 'rb').read()) |
|
5990.2.4
by Jonathan Lange
Fix up our customized banner support and make the ssh server tests work with Twisted. |
224 |
if isinstance(public_key, str): |
225 |
return chr(0) + NS('rsa') + NS(public_key) |
|
226 |
else: |
|
227 |
return chr(0) + NS('rsa') + NS(public_key.blob()) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
228 |
|
4690.2.5
by Jonathan Lange
Extract out auth request types into helper methods. Clarity ftw! |
229 |
def requestFailedAuthentication(self): |
230 |
return self.user_auth.ssh_USERAUTH_REQUEST( |
|
231 |
NS('failure') + NS('') + NS('publickey') + self.key_data) |
|
232 |
||
233 |
def requestSuccessfulAuthentication(self): |
|
234 |
return self.user_auth.ssh_USERAUTH_REQUEST( |
|
235 |
NS('success') + NS('') + NS('publickey') + self.key_data) |
|
236 |
||
237 |
def requestUnsupportedAuthentication(self): |
|
238 |
# Note that it doesn't matter how the checker responds -- the server
|
|
239 |
# doesn't get that far.
|
|
240 |
return self.user_auth.ssh_USERAUTH_REQUEST( |
|
241 |
NS('success') + NS('') + NS('none') + NS('')) |
|
242 |
||
4690.2.3
by Jonathan Lange
Send the banner if its configured. Still got bugs, but we'll fix it in later |
243 |
def test_bannerNotSentOnSuccess(self): |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
244 |
# No banner is printed when the user authenticates successfully.
|
10693.5.11
by Jonathan Lange
Clean up some references to codehosting. |
245 |
self.user_auth._banner = None |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
246 |
|
4690.2.5
by Jonathan Lange
Extract out auth request types into helper methods. Clarity ftw! |
247 |
d = self.requestSuccessfulAuthentication() |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
248 |
def check(ignored): |
249 |
# Check that no banner was sent to the user.
|
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
250 |
self.assertMessageOrder([userauth.MSG_USERAUTH_SUCCESS]) |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
251 |
return d.addCallback(check) |
252 |
||
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
253 |
def test_defaultBannerSentOnSuccess(self): |
254 |
# If a banner was passed to the user auth agent then we send it to the
|
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
255 |
# user when they log in.
|
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
256 |
self.user_auth._banner = "Boogedy boo" |
4690.2.5
by Jonathan Lange
Extract out auth request types into helper methods. Clarity ftw! |
257 |
d = self.requestSuccessfulAuthentication() |
4690.2.3
by Jonathan Lange
Send the banner if its configured. Still got bugs, but we'll fix it in later |
258 |
def check(ignored): |
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
259 |
self.assertMessageOrder( |
4690.2.3
by Jonathan Lange
Send the banner if its configured. Still got bugs, but we'll fix it in later |
260 |
[userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_SUCCESS]) |
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
261 |
self.assertBannerSent(self.user_auth._banner + '\r\n') |
262 |
return d.addCallback(check) |
|
4785.3.7
by Jeroen Vermeulen
Removed whitespace at ends of lines |
263 |
|
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
264 |
def test_defaultBannerSentOnlyOnce(self): |
4690.2.6
by Jonathan Lange
Only send the banner once. |
265 |
# We don't send the banner on each authentication attempt, just on the
|
266 |
# first one. It is usual for there to be many authentication attempts
|
|
267 |
# per SSH session.
|
|
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
268 |
self.user_auth._banner = "Boogedy boo" |
4690.2.6
by Jonathan Lange
Only send the banner once. |
269 |
|
270 |
d = self.requestUnsupportedAuthentication() |
|
271 |
d.addCallback(lambda ignored: self.requestSuccessfulAuthentication()) |
|
4785.3.7
by Jeroen Vermeulen
Removed whitespace at ends of lines |
272 |
|
4690.2.6
by Jonathan Lange
Only send the banner once. |
273 |
def check(ignored): |
274 |
# Check that no banner was sent to the user.
|
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
275 |
self.assertMessageOrder( |
276 |
[userauth.MSG_USERAUTH_FAILURE, userauth.MSG_USERAUTH_BANNER, |
|
4690.2.6
by Jonathan Lange
Only send the banner once. |
277 |
userauth.MSG_USERAUTH_SUCCESS]) |
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
278 |
self.assertBannerSent(self.user_auth._banner + '\r\n') |
279 |
||
280 |
return d.addCallback(check) |
|
281 |
||
282 |
def test_defaultBannerNotSentOnFailure(self): |
|
283 |
# Failed authentication attempts do not get the default banner
|
|
5977.5.3
by Jonathan Lange
Clean up lint. |
284 |
# sent.
|
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
285 |
self.user_auth._banner = "You come away two hundred quid down" |
4690.2.8
by Jonathan Lange
Add a test to show that the banner isn't sent on failed auth attempts. |
286 |
|
287 |
d = self.requestFailedAuthentication() |
|
288 |
||
289 |
def check(ignored): |
|
290 |
self.assertMessageOrder( |
|
291 |
[userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_FAILURE]) |
|
292 |
self.assertBannerSent(MockChecker.error_message + '\r\n') |
|
293 |
||
10693.2.5
by Jonathan Lange
Parametrize the banner in the UserAuth agent |
294 |
return d.addCallback(check) |
4690.2.8
by Jonathan Lange
Add a test to show that the banner isn't sent on failed auth attempts. |
295 |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
296 |
def test_loggedToBanner(self): |
297 |
# When there's an authentication failure, we display an informative
|
|
298 |
# error message through the SSH authentication protocol 'banner'.
|
|
4690.2.5
by Jonathan Lange
Extract out auth request types into helper methods. Clarity ftw! |
299 |
d = self.requestFailedAuthentication() |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
300 |
def check(ignored): |
301 |
# Check that we received a BANNER, then a FAILURE.
|
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
302 |
self.assertMessageOrder( |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
303 |
[userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_FAILURE]) |
4690.2.2
by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully. |
304 |
self.assertBannerSent(MockChecker.error_message + '\r\n') |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
305 |
return d.addCallback(check) |
306 |
||
307 |
def test_unsupportedAuthMethodNotLogged(self): |
|
308 |
# Trying various authentication methods is a part of the normal
|
|
309 |
# operation of the SSH authentication protocol. We should not spam the
|
|
5977.5.3
by Jonathan Lange
Clean up lint. |
310 |
# client with warnings about this, as whenever it becomes a problem,
|
311 |
# we can rely on the SSH client itself to report it to the user.
|
|
4690.2.5
by Jonathan Lange
Extract out auth request types into helper methods. Clarity ftw! |
312 |
d = self.requestUnsupportedAuthentication() |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
313 |
def check(ignored): |
314 |
# Check that we received only a FAILRE.
|
|
4690.2.7
by Jonathan Lange
Extract message order assertions into an assert method. |
315 |
self.assertMessageOrder([userauth.MSG_USERAUTH_FAILURE]) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
316 |
return d.addCallback(check) |
317 |
||
318 |
||
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
319 |
class TestPublicKeyFromLaunchpadChecker(TestCase): |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
320 |
"""Tests for the SSH server authentication mechanism.
|
321 |
||
322 |
PublicKeyFromLaunchpadChecker accepts the SSH authentication information
|
|
323 |
and contacts the authserver to determine if the given details are valid.
|
|
324 |
||
325 |
Any authentication errors are displayed back to the user via an SSH
|
|
326 |
MSG_USERAUTH_BANNER message.
|
|
327 |
"""
|
|
328 |
||
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
329 |
run_tests_with = AsynchronousDeferredRunTest |
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
330 |
|
331 |
class FakeAuthenticationEndpoint: |
|
7024.1.3
by Jonathan Lange
Move the authentication endpoint interface to the launchpad/interfaces |
332 |
"""A fake client for enough of `IAuthServer` for this test.
|
6789.12.13
by Michael Hudson
review comments |
333 |
"""
|
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
334 |
|
335 |
valid_user = 'valid_user' |
|
336 |
no_key_user = 'no_key_user' |
|
337 |
valid_key = 'valid_key' |
|
338 |
||
8137.6.13
by Michael Hudson
better tests, documentation |
339 |
def __init__(self): |
340 |
self.calls = [] |
|
341 |
||
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
342 |
def callRemote(self, function_name, *args, **kwargs): |
343 |
return getattr( |
|
344 |
self, 'xmlrpc_%s' % function_name)(*args, **kwargs) |
|
345 |
||
8137.6.6
by Michael Hudson
use the new interface in the checker, removes one call per key presented at least |
346 |
def xmlrpc_getUserAndSSHKeys(self, username): |
8137.6.13
by Michael Hudson
better tests, documentation |
347 |
self.calls.append(username) |
8137.6.6
by Michael Hudson
use the new interface in the checker, removes one call per key presented at least |
348 |
if username == self.valid_user: |
349 |
return defer.succeed({ |
|
350 |
'name': username, |
|
351 |
'keys': [('DSA', self.valid_key.encode('base64'))], |
|
352 |
})
|
|
353 |
elif username == self.no_key_user: |
|
354 |
return defer.succeed({ |
|
355 |
'name': username, |
|
356 |
'keys': [], |
|
357 |
})
|
|
358 |
else: |
|
359 |
try: |
|
360 |
raise faults.NoSuchPersonWithName(username) |
|
361 |
except faults.NoSuchPersonWithName: |
|
362 |
return defer.fail() |
|
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
363 |
|
8137.6.13
by Michael Hudson
better tests, documentation |
364 |
def makeCredentials(self, username, public_key, mind=None): |
365 |
if mind is None: |
|
366 |
mind = auth.UserDetailsMind() |
|
8137.6.9
by Michael Hudson
fix tests |
367 |
return auth.SSHPrivateKeyWithMind( |
368 |
username, 'ssh-dss', public_key, '', None, mind) |
|
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
369 |
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
370 |
def makeChecker(self, do_signature_checking=False): |
6789.11.5
by Michael Hudson
increase readability of tests. |
371 |
"""Construct a PublicKeyFromLaunchpadChecker.
|
372 |
||
373 |
:param do_signature_checking: if False, as is the default, monkeypatch
|
|
374 |
the returned instance to not verify the signatures of the keys.
|
|
375 |
"""
|
|
7483.1.22
by Jonathan Lange
All that's left in the server module is authentication, so call it auth. |
376 |
checker = auth.PublicKeyFromLaunchpadChecker(self.authserver) |
6789.11.5
by Michael Hudson
increase readability of tests. |
377 |
if not do_signature_checking: |
378 |
checker._cbRequestAvatarId = self._cbRequestAvatarId |
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
379 |
return checker |
380 |
||
6789.11.2
by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication |
381 |
def _cbRequestAvatarId(self, is_key_valid, credentials): |
382 |
if is_key_valid: |
|
383 |
return credentials.username |
|
384 |
return failure.Failure(UnauthorizedLogin()) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
385 |
|
386 |
def setUp(self): |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
387 |
TestCase.setUp(self) |
6789.11.4
by Michael Hudson
move checker creation to helper method |
388 |
self.authserver = self.FakeAuthenticationEndpoint() |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
389 |
|
390 |
def test_successful(self): |
|
6789.11.5
by Michael Hudson
increase readability of tests. |
391 |
# Attempting to log in with a username and key known to the
|
392 |
# authentication end-point succeeds.
|
|
393 |
creds = self.makeCredentials( |
|
394 |
self.authserver.valid_user, self.authserver.valid_key) |
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
395 |
checker = self.makeChecker() |
396 |
d = checker.requestAvatarId(creds) |
|
6789.11.5
by Michael Hudson
increase readability of tests. |
397 |
return d.addCallback(self.assertEqual, self.authserver.valid_user) |
398 |
||
399 |
@suppress_stderr
|
|
400 |
def test_invalid_signature(self): |
|
401 |
# The checker requests attempts to authenticate if the requests have
|
|
402 |
# an invalid signature.
|
|
403 |
creds = self.makeCredentials( |
|
404 |
self.authserver.valid_user, self.authserver.valid_key) |
|
405 |
creds.signature = 'a' |
|
406 |
checker = self.makeChecker(True) |
|
407 |
d = checker.requestAvatarId(creds) |
|
408 |
def flush_errback(f): |
|
11705.2.29
by Jonathan Lange
One flushLoggedErrors call missed |
409 |
flush_logged_errors(BadKeyError) |
6789.11.5
by Michael Hudson
increase readability of tests. |
410 |
return f |
411 |
d.addErrback(flush_errback) |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
412 |
return assert_fails_with(d, UnauthorizedLogin) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
413 |
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
414 |
def assertLoginError(self, checker, creds, error_message): |
6789.11.5
by Michael Hudson
increase readability of tests. |
415 |
"""Logging in with 'creds' against 'checker' fails with 'message'.
|
416 |
||
417 |
In particular, this tests that the login attempt fails in a way that
|
|
418 |
is sent to the client.
|
|
419 |
||
420 |
:param checker: The `ICredentialsChecker` used.
|
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
421 |
:param creds: SSHPrivateKey credentials.
|
422 |
:param error_message: String excepted to match the exception's message.
|
|
423 |
:return: Deferred. You must return this from your test.
|
|
424 |
"""
|
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
425 |
d = assert_fails_with( |
6789.11.4
by Michael Hudson
move checker creation to helper method |
426 |
checker.requestAvatarId(creds), |
7483.1.22
by Jonathan Lange
All that's left in the server module is authentication, so call it auth. |
427 |
auth.UserDisplayedUnauthorizedLogin) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
428 |
d.addCallback( |
429 |
lambda exception: self.assertEqual(str(exception), error_message)) |
|
430 |
return d |
|
431 |
||
432 |
def test_noSuchUser(self): |
|
433 |
# When someone signs in with a non-existent user, they should be told
|
|
434 |
# that. The usual security issues don't apply here because the list of
|
|
435 |
# Launchpad user names is public.
|
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
436 |
checker = self.makeChecker() |
6789.11.5
by Michael Hudson
increase readability of tests. |
437 |
creds = self.makeCredentials( |
438 |
'no-such-user', self.authserver.valid_key) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
439 |
return self.assertLoginError( |
6789.11.4
by Michael Hudson
move checker creation to helper method |
440 |
checker, creds, 'No such Launchpad account: no-such-user') |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
441 |
|
442 |
def test_noKeys(self): |
|
443 |
# When you sign into an existing account with no SSH keys, the SSH
|
|
6789.11.5
by Michael Hudson
increase readability of tests. |
444 |
# server informs you that the account has no keys.
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
445 |
checker = self.makeChecker() |
6789.11.5
by Michael Hudson
increase readability of tests. |
446 |
creds = self.makeCredentials( |
447 |
self.authserver.no_key_user, self.authserver.valid_key) |
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
448 |
return self.assertLoginError( |
6789.11.4
by Michael Hudson
move checker creation to helper method |
449 |
checker, creds, |
5977.5.3
by Jonathan Lange
Clean up lint. |
450 |
"Launchpad user %r doesn't have a registered SSH key" |
6789.11.5
by Michael Hudson
increase readability of tests. |
451 |
% self.authserver.no_key_user) |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
452 |
|
453 |
def test_wrongKey(self): |
|
454 |
# When you sign into an existing account using the wrong key, you
|
|
6789.11.5
by Michael Hudson
increase readability of tests. |
455 |
# are *not* informed of the wrong key. This is because SSH often
|
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
456 |
# tries several keys as part of normal operation.
|
6789.11.4
by Michael Hudson
move checker creation to helper method |
457 |
checker = self.makeChecker() |
6789.11.5
by Michael Hudson
increase readability of tests. |
458 |
creds = self.makeCredentials( |
459 |
self.authserver.valid_user, 'invalid key') |
|
460 |
# We cannot use assertLoginError because we are checking that we fail
|
|
461 |
# with UnauthorizedLogin and not its subclass
|
|
462 |
# UserDisplayedUnauthorizedLogin.
|
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
463 |
d = assert_fails_with( |
6789.11.4
by Michael Hudson
move checker creation to helper method |
464 |
checker.requestAvatarId(creds), |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
465 |
UnauthorizedLogin) |
466 |
d.addCallback( |
|
467 |
lambda exception: |
|
468 |
self.failIf(isinstance(exception, |
|
7483.1.22
by Jonathan Lange
All that's left in the server module is authentication, so call it auth. |
469 |
auth.UserDisplayedUnauthorizedLogin), |
4292.1.32
by Jonathan Lange
Split the SSH tests into a separate module. |
470 |
"Should not be a UserDisplayedUnauthorizedLogin")) |
471 |
return d |
|
472 |
||
8137.6.13
by Michael Hudson
better tests, documentation |
473 |
def test_successful_with_second_key_calls_authserver_once(self): |
474 |
# It is normal in SSH authentication to be presented with a number of
|
|
475 |
# keys. When the valid key is presented after some invalid ones (a)
|
|
476 |
# the login succeeds and (b) only one call is made to the authserver
|
|
477 |
# to retrieve the user's details.
|
|
478 |
checker = self.makeChecker() |
|
479 |
mind = auth.UserDetailsMind() |
|
480 |
wrong_key_creds = self.makeCredentials( |
|
481 |
self.authserver.valid_user, 'invalid key', mind) |
|
482 |
right_key_creds = self.makeCredentials( |
|
483 |
self.authserver.valid_user, self.authserver.valid_key, mind) |
|
484 |
d = checker.requestAvatarId(wrong_key_creds) |
|
485 |
def try_second_key(failure): |
|
486 |
failure.trap(UnauthorizedLogin) |
|
487 |
return checker.requestAvatarId(right_key_creds) |
|
488 |
d.addErrback(try_second_key) |
|
489 |
d.addCallback(self.assertEqual, self.authserver.valid_user) |
|
490 |
def check_one_call(r): |
|
491 |
self.assertEqual( |
|
492 |
[self.authserver.valid_user], self.authserver.calls) |
|
493 |
return r |
|
494 |
d.addCallback(check_one_call) |
|
495 |
return d |
|
496 |
||
8137.6.15
by Michael Hudson
failing test |
497 |
def test_noSuchUser_with_two_keys_calls_authserver_once(self): |
498 |
# When more than one key is presented for a username that does not
|
|
499 |
# exist, only one call is made to the authserver.
|
|
500 |
checker = self.makeChecker() |
|
501 |
mind = auth.UserDetailsMind() |
|
502 |
creds_1 = self.makeCredentials( |
|
503 |
'invalid-user', 'invalid key 1', mind) |
|
504 |
creds_2 = self.makeCredentials( |
|
505 |
'invalid-user', 'invalid key 2', mind) |
|
506 |
d = checker.requestAvatarId(creds_1) |
|
507 |
def try_second_key(failure): |
|
11705.2.1
by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial. |
508 |
return assert_fails_with( |
8137.6.15
by Michael Hudson
failing test |
509 |
checker.requestAvatarId(creds_2), |
510 |
UnauthorizedLogin) |
|
511 |
d.addErrback(try_second_key) |
|
512 |
def check_one_call(r): |
|
513 |
self.assertEqual( |
|
514 |
['invalid-user'], self.authserver.calls) |
|
515 |
return r |
|
516 |
d.addCallback(check_one_call) |
|
517 |
return d |