~launchpad-pqm/launchpad/devel

10693.2.4 by Jonathan Lange
Parametrize the keys in the factory
1
# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
8687.15.17 by Karl Fogel
Add the copyright header block to the rest of the files under lib/lp/.
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
4
import os
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
5
6
from testtools.deferredruntest import (
7
    assert_fails_with,
11705.2.29 by Jonathan Lange
One flushLoggedErrors call missed
8
    flush_logged_errors,
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
9
    AsynchronousDeferredRunTest,
10
    )
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
11
12
from twisted.conch.checkers import SSHPublicKeyDatabase
13
from twisted.conch.error import ConchError
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
14
from twisted.conch.ssh import userauth
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
15
from twisted.conch.ssh.common import (
16
    getNS,
17
    NS,
18
    )
19
from twisted.conch.ssh.keys import (
20
    BadKeyError,
21
    Key,
22
    )
23
from twisted.conch.ssh.transport import (
24
    SSHCiphers,
25
    SSHServerTransport,
26
    )
27
from twisted.cred.error import UnauthorizedLogin
28
from twisted.cred.portal import (
29
    IRealm,
30
    Portal,
31
    )
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
32
from twisted.internet import defer
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
33
from twisted.python import failure
34
from twisted.python.util import sibpath
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
35
from zope.interface import implements
6789.6.34 by Michael Hudson
remove the final uses of the custom authserver client and the client itself
36
8137.6.6 by Michael Hudson
use the new interface in the checker, removes one call per key presented at least
37
from canonical.launchpad.xmlrpc import faults
10693.5.5 by Jonathan Lange
Move auth logic to services
38
from lp.services.sshserver import auth
10548.1.1 by Jonathan Lange
Move twistedsupport to lp.services
39
from lp.services.twistedsupport import suppress_stderr
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
40
from lp.testing import TestCase
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
41
7483.1.11 by Jonathan Lange
Get rid of compatibility guff.
42
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
43
class MockRealm:
    """A minimal `IRealm` for testing userauth.SSHUserAuthServer.

    Every avatar request is answered with a `LaunchpadAvatar` built from the
    requested avatar id, so any id that a checker accepts can log in.
    """

    implements(IRealm)

    def requestAvatar(self, avatar_id, mind, *interfaces):
        """Return an (interface, avatar, logout) triple for `avatar_id`.

        :param avatar_id: Used as both the id and the name of the avatar.
        :param mind: Ignored.
        :param interfaces: The first requested interface is echoed back as
            the interface the avatar is claimed to provide.
        """
        details = {}
        details['id'] = avatar_id
        details['name'] = avatar_id
        details['teams'] = []
        details['initialBranches'] = []
        granted_interface = interfaces[0]
        avatar = auth.LaunchpadAvatar(details)

        def logout():
            # Nothing to clean up when the fake avatar logs out.
            return None

        return (granted_interface, avatar, logout)
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
58
59
60
class MockSSHTransport(SSHServerTransport):
    """A packet-recording SSH transport for testing SSHUserAuthServer.

    SSHUserAuthServer looks up `transport.factory.portal`, so this transport
    carries a stub factory to which the portal under test is attached.

    Nothing is written to a real connection: each packet handed to
    `sendPacket` is appended to `self.packets` as a
    (messageType, payload) tuple so that tests can inspect what was sent.
    """

    class Factory:
        """Stub factory; the portal is attached by the transport."""

        def getService(self, transport, nextService):
            # Hand back a callable that produces no service at all.
            def make_no_service():
                return None
            return make_no_service

    def __init__(self, portal):
        stub_factory = self.Factory()
        stub_factory.portal = portal
        self.factory = stub_factory
        self.packets = []
        # In Twisted 8.0.1, Conch's transport starts referring to
        # currentEncryptions where it didn't before. Provide a dummy value for
        # it.
        no_cipher = 'none'
        self.currentEncryptions = SSHCiphers(
            no_cipher, no_cipher, no_cipher, no_cipher)

    def sendPacket(self, messageType, payload):
        """Record the packet instead of sending it."""
        packet = (messageType, payload)
        self.packets.append(packet)

    def setService(self, service):
        """Ignore requests to switch service."""
        return None
90
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
91
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
92
class UserAuthServerMixin(object):
    """Shared fixture and assertions for SSHUserAuthServer tests.

    `setUp` provides `self.portal`, `self.transport` (a packet-recording
    `MockSSHTransport`) and `self.user_auth` (the server under test).
    """

    def setUp(self):
        portal = Portal(MockRealm())
        self.portal = portal
        transport = MockSSHTransport(portal)
        self.transport = transport
        self.user_auth = auth.SSHUserAuthServer(transport)

    def _getMessageName(self, message_type):
        """Get the name of the message for the given message type constant."""
        names_by_type = userauth.SSHUserAuthServer.protocolMessages
        return names_by_type[message_type]

    def assertMessageOrder(self, message_types):
        """Assert that SSH messages were sent in the given order.

        :param message_types: The expected message type constants, in the
            order the corresponding packets should have been sent.
        """
        # Compare by name rather than by number so that failures are
        # readable.
        expected_names = [
            self._getMessageName(message_type)
            for message_type in message_types]
        sent_names = [
            self._getMessageName(packet_type)
            for packet_type, _payload in self.transport.packets]
        self.assertEqual(expected_names, sent_names)

    def assertBannerSent(self, banner_message, expected_language='en'):
        """Assert that 'banner_message' was sent as an SSH banner.

        Only the first MSG_USERAUTH_BANNER packet is inspected. The test
        fails if no banner packet was sent at all.
        """
        banner_payloads = [
            payload for packet_type, payload in self.transport.packets
            if packet_type == userauth.MSG_USERAUTH_BANNER]
        if not banner_payloads:
            self.fail("No banner logged.")
        # A banner payload is two length-prefixed strings (message, then
        # language tag); anything left over should be empty.
        text, language, leftover = getNS(banner_payloads[0], 2)
        self.assertEqual(banner_message, text.decode('UTF8'))
        self.assertEqual(expected_language, language)
        self.assertEqual('', leftover)
4785.3.7 by Jeroen Vermeulen
Removed whitespace at ends of lines
122
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
123
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
124
class TestUserAuthServer(TestCase, UserAuthServerMixin):
    """Tests for banner sending and request handling in the auth server."""

    def setUp(self):
        # Each base is set up explicitly: the test case first, then the
        # mixin that builds the portal, transport and auth server.
        TestCase.setUp(self)
        UserAuthServerMixin.setUp(self)

    def test_sendBanner(self):
        # sendBanner should send exactly one SSH 'packet', of type
        # MSG_USERAUTH_BANNER, carrying two fields: the message itself and
        # then the language tag.
        #
        # A trailing newline is added by sendBanner, because openssh and
        # Twisted don't add one when displaying the banner.
        #
        # See RFC 4252, Section 5.4.
        banner_text = u"test message"
        self.user_auth.sendBanner(banner_text, language='en-US')
        self.assertBannerSent(banner_text + '\r\n', 'en-US')
        sent_packets = self.transport.packets
        self.assertEqual(
            1, len(sent_packets),
            "More than just banner was sent: %r" % sent_packets)

    def test_sendBannerUsesCRLF(self):
        # Any line break in the message must go over the wire as a CR LF
        # pair.
        #
        # See RFC 4252, Section 5.4.
        self.user_auth.sendBanner(u"test\nmessage")
        # Exactly one packet is expected; unpacking fails otherwise.
        (only_packet,) = self.transport.packets
        message_type, payload = only_packet
        text, language, leftover = getNS(payload, 2)
        self.assertEqual(text.decode('UTF8'), u"test\r\nmessage\r\n")

    def test_requestRaisesConchError(self):
        # ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
        # None. Added to catch a bug noticed by pyflakes.
        # Whitebox test.
        def fake_try_auth(kind, user, data):
            return None

        def trap_conch_error(reason):
            reason.trap(ConchError)

        # Swap in the fakes, remembering the real methods so that they can
        # be restored whatever happens.
        real_try_auth = self.user_auth.tryAuth
        self.user_auth.tryAuth = fake_try_auth
        real_eb_bad_auth = self.user_auth._ebBadAuth
        self.user_auth._ebBadAuth = trap_conch_error
        self.user_auth.serviceStarted()
        try:
            request = NS('jml') + NS('foo') + NS('public_key') + NS('data')
            self.user_auth.ssh_USERAUTH_REQUEST(request)
        finally:
            self.user_auth.serviceStopped()
            self.user_auth.tryAuth = real_try_auth
            self.user_auth._ebBadAuth = real_eb_bad_auth
176
177
178
class MockChecker(SSHPublicKeyDatabase):
    """A trivial public key checker whose verdict depends on the username.

    The username 'success' is accepted; every other username is rejected
    with a `UserDisplayedUnauthorizedLogin` carrying `error_message`.

    Used by TestAuthenticationBannerDisplay to test that errors raised by
    checkers are sent to SSH clients.
    """

    error_message = u'error message'

    def requestAvatarId(self, credentials):
        """Return the username for 'success', otherwise a login Failure."""
        if credentials.username != 'success':
            rejection = auth.UserDisplayedUnauthorizedLogin(
                self.error_message)
            return failure.Failure(rejection)
        return credentials.username
4690.2.2 by Jonathan Lange
Add a test to show that no banner is sent when the user logs in successfully.
193
194
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
195
class TestAuthenticationBannerDisplay(UserAuthServerMixin, TestCase):
    """Check that auth error information is passed through to the client.

    Normally, SSH servers provide minimal information on failed authentication.
    With Launchpad, much more user information is public, so it is helpful and
    not insecure to tell users why they failed to authenticate.

    SSH doesn't provide a standard way of doing this, but the
    MSG_USERAUTH_BANNER message is allowed and seems appropriate. See RFC 4252,
    Section 5.4 for more information.
    """

    run_tests_with = AsynchronousDeferredRunTest

    def setUp(self):
        UserAuthServerMixin.setUp(self)
        TestCase.setUp(self)
        # MockChecker accepts the username 'success' and rejects every other
        # username with a user-displayable error.
        self.portal.registerChecker(MockChecker())
        self.user_auth.serviceStarted()
        self.key_data = self._makeKey()

    def tearDown(self):
        self.user_auth.serviceStopped()
        TestCase.tearDown(self)

    def _makeKey(self):
        """Return the public-key portion of a 'publickey' auth request.

        The key is read from 'ssh_host_key_rsa.pub' in the 'keys' directory
        next to this module.
        """
        keydir = sibpath(__file__, 'keys')
        key_file = open(os.path.join(keydir, 'ssh_host_key_rsa.pub'), 'rb')
        # Close the file explicitly rather than leaking the handle until
        # garbage collection.
        try:
            key_text = key_file.read()
        finally:
            key_file.close()
        public_key = Key.fromString(key_text)
        # Key.fromString may give back either a plain string blob or a key
        # object, so handle both.
        if isinstance(public_key, str):
            return chr(0) + NS('rsa') + NS(public_key)
        else:
            return chr(0) + NS('rsa') + NS(public_key.blob())

    def requestFailedAuthentication(self):
        """Send a publickey request for a user that MockChecker rejects."""
        return self.user_auth.ssh_USERAUTH_REQUEST(
            NS('failure') + NS('') + NS('publickey') + self.key_data)

    def requestSuccessfulAuthentication(self):
        """Send a publickey request for the user that MockChecker accepts."""
        return self.user_auth.ssh_USERAUTH_REQUEST(
            NS('success') + NS('') + NS('publickey') + self.key_data)

    def requestUnsupportedAuthentication(self):
        """Send a request that uses the 'none' authentication method."""
        # Note that it doesn't matter how the checker responds -- the server
        # doesn't get that far.
        return self.user_auth.ssh_USERAUTH_REQUEST(
            NS('success') + NS('') + NS('none') + NS(''))

    def test_bannerNotSentOnSuccess(self):
        # No banner is printed when the user authenticates successfully.
        self.user_auth._banner = None

        d = self.requestSuccessfulAuthentication()

        def check(ignored):
            # Check that no banner was sent to the user.
            self.assertMessageOrder([userauth.MSG_USERAUTH_SUCCESS])
        return d.addCallback(check)

    def test_defaultBannerSentOnSuccess(self):
        # If a banner was passed to the user auth agent then we send it to the
        # user when they log in.
        self.user_auth._banner = "Boogedy boo"
        d = self.requestSuccessfulAuthentication()

        def check(ignored):
            self.assertMessageOrder(
                [userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_SUCCESS])
            self.assertBannerSent(self.user_auth._banner + '\r\n')
        return d.addCallback(check)

    def test_defaultBannerSentOnlyOnce(self):
        # We don't send the banner on each authentication attempt, just on the
        # first one. It is usual for there to be many authentication attempts
        # per SSH session.
        self.user_auth._banner = "Boogedy boo"

        d = self.requestUnsupportedAuthentication()
        d.addCallback(lambda ignored: self.requestSuccessfulAuthentication())

        def check(ignored):
            # Check that the banner was sent exactly once, with the
            # successful attempt and not with the unsupported one before it.
            self.assertMessageOrder(
                [userauth.MSG_USERAUTH_FAILURE, userauth.MSG_USERAUTH_BANNER,
                 userauth.MSG_USERAUTH_SUCCESS])
            self.assertBannerSent(self.user_auth._banner + '\r\n')

        return d.addCallback(check)

    def test_defaultBannerNotSentOnFailure(self):
        # Failed authentication attempts do not get the default banner
        # sent.
        self.user_auth._banner = "You come away two hundred quid down"

        d = self.requestFailedAuthentication()

        def check(ignored):
            # A banner is still sent, but it carries the checker's error
            # message rather than the default banner.
            self.assertMessageOrder(
                [userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_FAILURE])
            self.assertBannerSent(MockChecker.error_message + '\r\n')

        return d.addCallback(check)

    def test_loggedToBanner(self):
        # When there's an authentication failure, we display an informative
        # error message through the SSH authentication protocol 'banner'.
        d = self.requestFailedAuthentication()

        def check(ignored):
            # Check that we received a BANNER, then a FAILURE.
            self.assertMessageOrder(
                [userauth.MSG_USERAUTH_BANNER, userauth.MSG_USERAUTH_FAILURE])
            self.assertBannerSent(MockChecker.error_message + '\r\n')
        return d.addCallback(check)

    def test_unsupportedAuthMethodNotLogged(self):
        # Trying various authentication methods is a part of the normal
        # operation of the SSH authentication protocol. We should not spam the
        # client with warnings about this, as whenever it becomes a problem,
        # we can rely on the SSH client itself to report it to the user.
        d = self.requestUnsupportedAuthentication()

        def check(ignored):
            # Check that we received only a FAILURE.
            self.assertMessageOrder([userauth.MSG_USERAUTH_FAILURE])
        return d.addCallback(check)
317
318
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
319
class TestPublicKeyFromLaunchpadChecker(TestCase):
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
320
    """Tests for the SSH server authentication mechanism.
321
322
    PublicKeyFromLaunchpadChecker accepts the SSH authentication information
323
    and contacts the authserver to determine if the given details are valid.
324
325
    Any authentication errors are displayed back to the user via an SSH
326
    MSG_USERAUTH_BANNER message.
327
    """
328
11705.2.1 by Jonathan Lange
Convert tests to use testtools and the new deferred support rather than trial.
329
    run_tests_with = AsynchronousDeferredRunTest
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
330
331
    class FakeAuthenticationEndpoint:
        """An in-memory stand-in for just enough of `IAuthServer`.

        It knows two users: `valid_user`, who has one registered key
        (`valid_key`), and `no_key_user`, who has none. Every username that
        is looked up is appended to `self.calls`.
        """

        valid_user = 'valid_user'
        no_key_user = 'no_key_user'
        valid_key = 'valid_key'

        def __init__(self):
            self.calls = []

        def callRemote(self, function_name, *args, **kwargs):
            """Dispatch to the local `xmlrpc_<function_name>` method."""
            method_name = 'xmlrpc_%s' % function_name
            method = getattr(self, method_name)
            return method(*args, **kwargs)

        def xmlrpc_getUserAndSSHKeys(self, username):
            """Return a Deferred with the name and keys of `username`.

            For unknown users the Deferred has already failed with
            `faults.NoSuchPersonWithName`.
            """
            self.calls.append(username)
            if username == self.valid_user:
                keys = [('DSA', self.valid_key.encode('base64'))]
            elif username == self.no_key_user:
                keys = []
            else:
                # The fault is raised and caught so that the argument-less
                # defer.fail() wraps the exception currently being handled.
                try:
                    raise faults.NoSuchPersonWithName(username)
                except faults.NoSuchPersonWithName:
                    return defer.fail()
            user_details = {
                'name': username,
                'keys': keys,
                }
            return defer.succeed(user_details)
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
363
8137.6.13 by Michael Hudson
better tests, documentation
364
    def makeCredentials(self, username, public_key, mind=None):
        """Build 'ssh-dss' `SSHPrivateKeyWithMind` credentials for a test.

        :param username: The user attempting to log in.
        :param public_key: The key blob offered by that user.
        :param mind: Optional mind; a fresh `auth.UserDetailsMind` is
            created when none is supplied.
        """
        if mind is not None:
            chosen_mind = mind
        else:
            chosen_mind = auth.UserDetailsMind()
        credentials = auth.SSHPrivateKeyWithMind(
            username, 'ssh-dss', public_key, '', None, chosen_mind)
        return credentials
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
369
6789.11.4 by Michael Hudson
move checker creation to helper method
370
    def makeChecker(self, do_signature_checking=False):
        """Construct a PublicKeyFromLaunchpadChecker.

        The checker talks to `self.authserver`.

        :param do_signature_checking: when True the checker is returned
            untouched. When False (the default), its `_cbRequestAvatarId`
            is replaced by this test case's version, which does not verify
            the signatures of the keys.
        """
        checker = auth.PublicKeyFromLaunchpadChecker(self.authserver)
        if do_signature_checking:
            return checker
        checker._cbRequestAvatarId = self._cbRequestAvatarId
        return checker
380
6789.11.2 by Michael Hudson
rewrite TestPublicKeyFromLaunchpadChecker to use a fake authentication
381
    def _cbRequestAvatarId(self, is_key_valid, credentials):
        """Signature-free replacement for the checker's callback.

        Returns the username when the key was judged valid, and an
        `UnauthorizedLogin` Failure otherwise.
        """
        if not is_key_valid:
            return failure.Failure(UnauthorizedLogin())
        return credentials.username
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
385
386
    def setUp(self):
        TestCase.setUp(self)
        # Each test gets a fresh fake endpoint, so `calls` starts out empty.
        fake_endpoint = self.FakeAuthenticationEndpoint()
        self.authserver = fake_endpoint
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
389
390
    def test_successful(self):
        # Logging in with a username and key that the authentication
        # end-point knows about succeeds, and the avatar id is the username.
        endpoint = self.authserver
        credentials = self.makeCredentials(
            endpoint.valid_user, endpoint.valid_key)
        checker = self.makeChecker()
        login = checker.requestAvatarId(credentials)
        # The Deferred's result is passed to assertEqual as its first
        # argument.
        return login.addCallback(self.assertEqual, endpoint.valid_user)
398
399
    @suppress_stderr
    def test_invalid_signature(self):
        # The checker rejects attempts to authenticate if the request has an
        # invalid signature.
        credentials = self.makeCredentials(
            self.authserver.valid_user, self.authserver.valid_key)
        credentials.signature = 'a'
        # Signature checking is left switched on for this test.
        checker = self.makeChecker(do_signature_checking=True)

        def flush_bad_key_errors(reason):
            # Flush the logged BadKeyError, then pass the failure along
            # unchanged.
            flush_logged_errors(BadKeyError)
            return reason

        login = checker.requestAvatarId(credentials)
        login.addErrback(flush_bad_key_errors)
        return assert_fails_with(login, UnauthorizedLogin)
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
413
6789.11.4 by Michael Hudson
move checker creation to helper method
414
    def assertLoginError(self, checker, creds, error_message):
        """Logging in with 'creds' against 'checker' fails with 'error_message'.

        In particular, this tests that the login attempt fails in a way that
        is sent to the client, i.e. with `UserDisplayedUnauthorizedLogin`.

        :param checker: The `ICredentialsChecker` used.
        :param creds: SSHPrivateKey credentials.
        :param error_message: String expected to match the exception's
            message.
        :return: Deferred. You must return this from your test.
        """
        def check_message(exception):
            return self.assertEqual(str(exception), error_message)

        login = checker.requestAvatarId(creds)
        failed = assert_fails_with(
            login, auth.UserDisplayedUnauthorizedLogin)
        failed.addCallback(check_message)
        return failed
431
432
    def test_noSuchUser(self):
        # Signing in as a user that does not exist tells the client so. The
        # usual security concerns don't apply here, because the list of
        # Launchpad user names is public.
        checker = self.makeChecker()
        unknown_user = 'no-such-user'
        credentials = self.makeCredentials(
            unknown_user, self.authserver.valid_key)
        expected_message = 'No such Launchpad account: no-such-user'
        return self.assertLoginError(checker, credentials, expected_message)
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
441
442
    def test_noKeys(self):
443
        # When you sign into an existing account with no SSH keys, the SSH
6789.11.5 by Michael Hudson
increase readability of tests.
444
        # server informs you that the account has no keys.
6789.11.4 by Michael Hudson
move checker creation to helper method
445
        checker = self.makeChecker()
6789.11.5 by Michael Hudson
increase readability of tests.
446
        creds = self.makeCredentials(
447
            self.authserver.no_key_user, self.authserver.valid_key)
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
448
        return self.assertLoginError(
6789.11.4 by Michael Hudson
move checker creation to helper method
449
            checker, creds,
5977.5.3 by Jonathan Lange
Clean up lint.
450
            "Launchpad user %r doesn't have a registered SSH key"
6789.11.5 by Michael Hudson
increase readability of tests.
451
            % self.authserver.no_key_user)
4292.1.32 by Jonathan Lange
Split the SSH tests into a separate module.
452
453
    def test_wrongKey(self):
        # When you sign into an existing account using the wrong key, you
        # are *not* informed of the wrong key. This is because SSH often
        # tries several keys as part of normal operation.
        checker = self.makeChecker()
        creds = self.makeCredentials(
            self.authserver.valid_user, 'invalid key')
        # We cannot use assertLoginError because we are checking that we fail
        # with UnauthorizedLogin and not its subclass
        # UserDisplayedUnauthorizedLogin.
        d = assert_fails_with(
            checker.requestAvatarId(creds),
            UnauthorizedLogin)

        def check_not_user_displayed(exception):
            # assertFalse rather than the deprecated failIf alias.
            self.assertFalse(
                isinstance(exception, auth.UserDisplayedUnauthorizedLogin),
                "Should not be a UserDisplayedUnauthorizedLogin")

        d.addCallback(check_not_user_displayed)
        return d
472
8137.6.13 by Michael Hudson
better tests, documentation
473
    def test_successful_with_second_key_calls_authserver_once(self):
        # It is normal in SSH authentication to be presented with a number of
        # keys.  When the valid key is presented after some invalid ones (a)
        # the login succeeds and (b) only one call is made to the authserver
        # to retrieve the user's details.
        checker = self.makeChecker()
        mind = auth.UserDetailsMind()
        wrong_key_creds = self.makeCredentials(
            self.authserver.valid_user, 'invalid key', mind)
        right_key_creds = self.makeCredentials(
            self.authserver.valid_user, self.authserver.valid_key, mind)
        # Require the first attempt to fail with UnauthorizedLogin. If the
        # second attempt were only attached as an errback, an unexpectedly
        # accepted wrong key would skip it and the test would pass without
        # ever presenting the valid key.
        d = assert_fails_with(
            checker.requestAvatarId(wrong_key_creds),
            UnauthorizedLogin)

        def try_second_key(ignored):
            return checker.requestAvatarId(right_key_creds)

        d.addCallback(try_second_key)
        # The second attempt must yield the valid user's name as avatar id.
        d.addCallback(self.assertEqual, self.authserver.valid_user)

        def check_one_call(r):
            self.assertEqual(
                [self.authserver.valid_user], self.authserver.calls)
            return r

        d.addCallback(check_one_call)
        return d
496
8137.6.15 by Michael Hudson
failing test
497
    def test_noSuchUser_with_two_keys_calls_authserver_once(self):
        # When more than one key is presented for a username that does not
        # exist, only one call is made to the authserver.
        checker = self.makeChecker()
        mind = auth.UserDetailsMind()
        creds_1 = self.makeCredentials(
            'invalid-user', 'invalid key 1', mind)
        creds_2 = self.makeCredentials(
            'invalid-user', 'invalid key 2', mind)
        # Require the first attempt to fail with UnauthorizedLogin. Chaining
        # the second attempt as a bare errback would let an unexpected
        # success skip it, and would hide any unrelated error type.
        d = assert_fails_with(
            checker.requestAvatarId(creds_1),
            UnauthorizedLogin)

        def try_second_key(ignored):
            return assert_fails_with(
                checker.requestAvatarId(creds_2),
                UnauthorizedLogin)

        d.addCallback(try_second_key)

        def check_one_call(r):
            self.assertEqual(
                ['invalid-user'], self.authserver.calls)
            return r

        d.addCallback(check_one_call)
        return d