~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright 2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Tests for the web resources of the testkeyserver."""

__metaclass__ = type

import os
import shutil

from testtools.deferredruntest import AsynchronousDeferredRunTest

from twisted.internet.endpoints import serverFromString
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.server import Site

from lp.testing import TestCase
from lp.testing.keyserver.harness import KEYS_DIR
from lp.testing.keyserver.web import KeyServerResource
from lp.testing.matchers import DocTestMatches


class RegularCallbackExecuted(Exception):
    """Raised if a regular Twisted callback is called when the request
    is supposed to return an HTTP error."""


class TestWebResources(TestCase):

    run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=2)

    def setUpKeysDirectory(self):
        path = self.makeTemporaryDirectory()
        path = os.path.join(path, 'keys')
        shutil.copytree(KEYS_DIR, path)
        return path

    def makeService(self):
        """Run a test key server on whatever port we have available."""
        from twisted.internet import reactor
        resource = KeyServerResource(self.setUpKeysDirectory())
        site = Site(resource)
        endpoint = serverFromString(reactor, 'tcp:0')
        return endpoint.listen(site)

    def fetchResource(self, listening_port, path):
        """GET the content at 'path' from the web server at 'listening_port'.
        """
        url = 'http://localhost:%s/%s' % (
            listening_port.getHost().port,
            path.lstrip('/'))
        return getPage(url)

    def getURL(self, path):
        """Start a test key server and get the content at 'path'."""
        d = self.makeService()

        def service_started(port):
            self.addCleanup(port.stopListening)
            return self.fetchResource(port, path)
        return d.addCallback(service_started)

    def assertContentMatches(self, path, content):
        """Assert that the key server content at 'path' matches 'content'."""
        d = self.getURL(path)
        return d.addCallback(self.assertThat, DocTestMatches(content))

    def assertRaises500ErrorForKeyNotFound(self, path):
        """Assert that the test server returns a 500 response
        for attempts to retrieve an unknown key.
        ."""
        d = self.getURL(path)

        def regular_execution_callback(content):
            # A really Twisted(tm) error check:
            #
            # This callback should _not_ be called, because setting
            # the HTTP status code to 500 in Lookup.processRequest()
            # prevents this. On the other hand, if the status code is
            # _not_ set, the callback check_error_details below is
            # is not executed, and the test would simply pass, while
            # it shouldn't.
            #
            # So we should assert that this callback is not executed.
            # But raising an exception here leads again to the
            # execution of check_error_details() -- for this exception.
            # So we can't simply call self.fail(), but we can raise
            # a custom exception and we can check in the error
            # callback that this exception was _not_ raised.
            raise RegularCallbackExecuted

        def check_error_details(failure):
            if isinstance(failure.value, RegularCallbackExecuted):
                self.fail('Response was not an HTTP error response.')
            if not isinstance(failure, Failure):
                raise failure
            self.assertEqual('500', failure.value.status)
            self.assertEqual(
                '<html><head><title>Error handling request</title></head>\n'
                '<body><h1>Error handling request</h1>'
                'Error handling request: No keys found</body></html>',
                failure.value.response)

        d.addCallback(regular_execution_callback)
        return d.addErrback(check_error_details)

    def test_index_lookup(self):
        # A key index lookup form via GET.
        return self.assertContentMatches(
            '/pks/lookup?op=index&search=0xDFD20543',
            '''\
<html>
...
<title>Results for Key 0xDFD20543</title>
...
pub  1024D/DFD20543 2005-04-13 Sample Person (revoked) &lt;sample.revoked@canonical.com&gt;
...
''')

    def test_content_lookup(self):
        # A key content lookup form via GET.
        return self.assertContentMatches(
            '/pks/lookup?op=get&'
            'search=0xA419AE861E88BC9E04B9C26FBA2B9389DFD20543',
            '''\
<html>
...
<title>Results for Key 0xA419AE861E88BC9E04B9C26FBA2B9389DFD20543</title>
...
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.9 (GNU/Linux)
<BLANKLINE>
mQGiBEJdmOcRBADkNJPTBuCIefBdRAhvWyD9SSVHh8GHQWS7l9sRLEsirQkKz1yB
...
''')

    def test_lookup_key_id(self):
        # We can also request a key ID instead of a fingerprint, and it will
        # glob for the fingerprint.
        return self.assertContentMatches(
            '/pks/lookup?op=get&search=0xDFD20543',
            '''\
<html>
...
<title>Results for Key 0xDFD20543</title>
...
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.9 (GNU/Linux)
<BLANKLINE>
mQGiBEJdmOcRBADkNJPTBuCIefBdRAhvWyD9SSVHh8GHQWS7l9sRLEsirQkKz1yB
...
''')

    def test_nonexistent_key(self):
        # If we request a nonexistent key, we get a nice error.
        return self.assertRaises500ErrorForKeyNotFound(
            '/pks/lookup?op=get&search=0xDFD20544')

    def test_add_key(self):
        # A key submit form via POST (see doc/gpghandler.txt for more
        # information).
        return self.assertContentMatches(
            '/pks/add',
            '''\
<html>
...
<title>Submit a key</title>
...
''')