~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/tests/test_controllers.py

  • Committer: Michael Hudson-Doyle
  • Date: 2012-01-08 23:01:10 UTC
  • mto: This revision was merged to the branch mainline in revision 463.
  • Revision ID: michael.hudson@linaro.org-20120108230110-a249w8yrifkakp1q
indicate changes to the executable bit on the revision page

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd.
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import logging
 
19
import tarfile
 
20
import tempfile
 
21
 
 
22
from paste.fixture import (
 
23
    AppError,
 
24
    )
 
25
from paste.httpexceptions import HTTPServerError
 
26
 
 
27
from testtools.matchers import (
 
28
    Matcher,
 
29
    Mismatch,
 
30
    )
 
31
 
 
32
from bzrlib import errors
 
33
import simplejson
 
34
 
1
35
from loggerhead.apps.branch import BranchWSGIApp
2
36
from loggerhead.controllers.annotate_ui import AnnotateUI
3
37
from loggerhead.controllers.inventory_ui import InventoryUI
4
 
from loggerhead.controllers.revision_ui import RevisionUI
5
 
from loggerhead.tests.test_simple import BasicTests
6
 
from loggerhead import util
 
38
from loggerhead.tests.test_simple import (
 
39
    BasicTests,
 
40
    consume_app,
 
41
    TestWithSimpleTree,
 
42
    )
7
43
 
8
44
 
9
45
class TestInventoryUI(BasicTests):
10
46
 
 
47
    def make_bzrbranch_for_tree_shape(self, shape):
 
48
        tree = self.make_branch_and_tree('.')
 
49
        self.build_tree(shape)
 
50
        tree.smart_add([])
 
51
        tree.commit('')
 
52
        self.addCleanup(tree.branch.lock_read().unlock)
 
53
        return tree.branch
 
54
 
11
55
    def make_bzrbranch_and_inventory_ui_for_tree_shape(self, shape):
12
 
        tree = self.make_branch_and_tree('.')
13
 
        self.build_tree(shape)
14
 
        tree.smart_add([])
15
 
        tree.commit('')
16
 
        tree.branch.lock_read()
17
 
        self.addCleanup(tree.branch.unlock)
18
 
        branch_app = BranchWSGIApp(tree.branch)
19
 
        return tree.branch, InventoryUI(branch_app, branch_app.get_history)
 
56
        branch = self.make_bzrbranch_for_tree_shape(shape)
 
57
        branch_app = self.make_branch_app(branch)
 
58
        return branch, InventoryUI(branch_app, branch_app.get_history)
20
59
 
21
60
    def test_get_filelist(self):
22
61
        bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
23
62
            ['filename'])
24
63
        inv = bzrbranch.repository.get_inventory(bzrbranch.last_revision())
25
 
        self.assertEqual(1, len(inv_ui.get_filelist(inv, '', 'filename')))
 
64
        self.assertEqual(1, len(inv_ui.get_filelist(inv, '', 'filename', 'head')))
 
65
 
 
66
    def test_smoke(self):
 
67
        bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
 
68
            ['filename'])
 
69
        start, content = consume_app(inv_ui,
 
70
            {'SCRIPT_NAME': '/files', 'PATH_INFO': ''})
 
71
        self.assertEqual(('200 OK', [('Content-Type', 'text/html')], None),
 
72
                         start)
 
73
        self.assertContainsRe(content, 'filename')
 
74
 
 
75
    def test_no_content_for_HEAD(self):
 
76
        bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
 
77
            ['filename'])
 
78
        start, content = consume_app(inv_ui,
 
79
            {'SCRIPT_NAME': '/files', 'PATH_INFO': '',
 
80
             'REQUEST_METHOD': 'HEAD'})
 
81
        self.assertEqual(('200 OK', [('Content-Type', 'text/html')], None),
 
82
                         start)
 
83
        self.assertEqual('', content)
 
84
 
 
85
    def test_get_values_smoke(self):
 
86
        branch = self.make_bzrbranch_for_tree_shape(['a-file'])
 
87
        branch_app = self.make_branch_app(branch)
 
88
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/files'}
 
89
        inv_ui = branch_app.lookup_app(env)
 
90
        inv_ui.parse_args(env)
 
91
        values = inv_ui.get_values('', {}, {})
 
92
        self.assertEqual('a-file', values['filelist'][0].filename)
 
93
 
 
94
    def test_json_render_smoke(self):
 
95
        branch = self.make_bzrbranch_for_tree_shape(['a-file'])
 
96
        branch_app = self.make_branch_app(branch)
 
97
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/+json/files'}
 
98
        inv_ui = branch_app.lookup_app(env)
 
99
        self.assertOkJsonResponse(inv_ui, env)
26
100
 
27
101
 
28
102
class TestRevisionUI(BasicTests):
29
103
 
30
 
    def make_bzrbranch_and_revision_ui_for_tree_shapes(self, shape1, shape2):
 
104
    def make_branch_app_for_revision_ui(self, shape1, shape2):
31
105
        tree = self.make_branch_and_tree('.')
32
106
        self.build_tree_contents(shape1)
33
107
        tree.smart_add([])
34
 
        tree.commit('')
 
108
        tree.commit('msg 1', rev_id='rev-1')
35
109
        self.build_tree_contents(shape2)
36
110
        tree.smart_add([])
37
 
        tree.commit('')
38
 
        tree.branch.lock_read()
39
 
        self.addCleanup(tree.branch.unlock)
40
 
        branch_app = BranchWSGIApp(tree.branch)
41
 
        branch_app._environ = {
42
 
            'wsgi.url_scheme':'',
43
 
            'SERVER_NAME':'',
44
 
            'SERVER_PORT':'80',
45
 
            }
46
 
        branch_app._url_base = ''
47
 
        branch_app.friendly_name = ''
48
 
        return tree.branch, RevisionUI(branch_app, branch_app.get_history)
 
111
        tree.commit('msg 2', rev_id='rev-2')
 
112
        branch = tree.branch
 
113
        self.addCleanup(branch.lock_read().unlock)
 
114
        return self.make_branch_app(branch)
49
115
 
50
116
    def test_get_values(self):
51
 
        branch, rev_ui = self.make_bzrbranch_and_revision_ui_for_tree_shapes(
52
 
            [], [])
53
 
        rev_ui.args = ['2']
54
 
        util.set_context({})
55
 
        self.assertIsInstance(
56
 
            rev_ui.get_values('', {}, []),
57
 
            dict)
 
117
        branch_app = self.make_branch_app_for_revision_ui([], [])
 
118
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/revision/2'}
 
119
        rev_ui = branch_app.lookup_app(env)
 
120
        rev_ui.parse_args(env)
 
121
        self.assertIsInstance(rev_ui.get_values('', {}, []), dict)
 
122
 
 
123
    def test_add_template_values(self):
 
124
        branch_app = self.make_branch_app_for_revision_ui(
 
125
                [('file', 'content\n')], [('file', 'new content\n')])
 
126
        env = {'SCRIPT_NAME': '/',
 
127
               'PATH_INFO': '/revision/1/non-existent-file',
 
128
               'QUERY_STRING':'start_revid=1' }
 
129
        revision_ui = branch_app.lookup_app(env)
 
130
        path = revision_ui.parse_args(env)
 
131
        values = revision_ui.get_values(path, revision_ui.kwargs, {})
 
132
        revision_ui.add_template_values(values)
 
133
        self.assertIs(values['diff_chunks'], None)
 
134
 
 
135
    def test_get_values_smoke(self):
 
136
        branch_app = self.make_branch_app_for_revision_ui(
 
137
                [('file', 'content\n'), ('other-file', 'other\n')],
 
138
                [('file', 'new content\n')])
 
139
        env = {'SCRIPT_NAME': '/',
 
140
               'PATH_INFO': '/revision/head:'}
 
141
        revision_ui = branch_app.lookup_app(env)
 
142
        revision_ui.parse_args(env)
 
143
        values = revision_ui.get_values('', {}, {})
 
144
 
 
145
        self.assertEqual(values['revid'], 'rev-2')
 
146
        self.assertEqual(values['change'].comment, 'msg 2')
 
147
        self.assertEqual(values['file_changes'].modified[0].filename, 'file')
 
148
        self.assertEqual(values['merged_in'], None)
 
149
 
 
150
    def test_json_render_smoke(self):
 
151
        branch_app = self.make_branch_app_for_revision_ui(
 
152
                [('file', 'content\n'), ('other-file', 'other\n')],
 
153
                [('file', 'new content\n')])
 
154
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/+json/revision/head:'}
 
155
        revision_ui = branch_app.lookup_app(env)
 
156
        self.assertOkJsonResponse(revision_ui, env)
 
157
 
58
158
 
59
159
 
60
160
class TestAnnotateUI(BasicTests):
61
161
 
62
162
    def make_annotate_ui_for_file_history(self, file_id, rev_ids_texts):
63
163
        tree = self.make_branch_and_tree('.')
64
 
        open('filename', 'w').write('')
 
164
        self.build_tree_contents([('filename', '')])
65
165
        tree.add(['filename'], [file_id])
66
 
        for rev_id, text in rev_ids_texts:
67
 
            open('filename', 'w').write(text)
68
 
            tree.commit(rev_id=rev_id, message='.')
 
166
        for rev_id, text, message in rev_ids_texts:
 
167
            self.build_tree_contents([('filename', text)])
 
168
            tree.commit(rev_id=rev_id, message=message)
69
169
        tree.branch.lock_read()
70
170
        self.addCleanup(tree.branch.unlock)
71
 
        branch_app = BranchWSGIApp(tree.branch)
 
171
        branch_app = BranchWSGIApp(tree.branch, friendly_name='test_name')
72
172
        return AnnotateUI(branch_app, branch_app.get_history)
73
173
 
74
174
    def test_annotate_file(self):
75
 
        history = [('rev1', 'old\nold\n'), ('rev2', 'new\nold\n')]
 
175
        history = [('rev1', 'old\nold\n', '.'), ('rev2', 'new\nold\n', '.')]
76
176
        ann_ui = self.make_annotate_ui_for_file_history('file_id', history)
77
 
        annotated = list(ann_ui.annotate_file('file_id', 'rev2'))
 
177
        # A lot of this state is set up by __call__, but we'll do it directly
 
178
        # here.
 
179
        ann_ui.args = ['rev2']
 
180
        annotate_info = ann_ui.get_values('filename',
 
181
            kwargs={'file_id': 'file_id'}, headers={})
 
182
        annotated = annotate_info['annotated']
78
183
        self.assertEqual(2, len(annotated))
79
 
        self.assertEqual('2', annotated[0].change.revno)
80
 
        self.assertEqual('1', annotated[1].change.revno)
 
184
        self.assertEqual('2', annotated[1].change.revno)
 
185
        self.assertEqual('1', annotated[2].change.revno)
 
186
 
 
187
    def test_annotate_empty_comment(self):
 
188
        # Testing empty comment handling without breaking
 
189
        history = [('rev1', 'old\nold\n', '.'), ('rev2', 'new\nold\n', '')]
 
190
        ann_ui = self.make_annotate_ui_for_file_history('file_id', history)
 
191
        ann_ui.args = ['rev2']
 
192
        annotate_info = ann_ui.get_values('filename',
 
193
            kwargs={'file_id': 'file_id'}, headers={})
 
194
 
 
195
    def test_annotate_file_zero_sized(self):
 
196
        # Test against a zero-sized file without breaking. No annotation must be present.
 
197
        history = [('rev1', '' , '.')]
 
198
        ann_ui = self.make_annotate_ui_for_file_history('file_id', history)
 
199
        ann_ui.args = ['rev1']
 
200
        annotate_info = ann_ui.get_values('filename',
 
201
            kwargs={'file_id': 'file_id'}, headers={})
 
202
        annotated = annotate_info['annotated']
 
203
        self.assertEqual(0, len(annotated))
 
204
 
 
205
 
 
206
class TestFileDiffUI(BasicTests):
 
207
 
 
208
    def make_branch_app_for_filediff_ui(self):
 
209
        builder = self.make_branch_builder('branch')
 
210
        builder.start_series()
 
211
        builder.build_snapshot('rev-1-id', None, [
 
212
            ('add', ('', 'root-id', 'directory', '')),
 
213
            ('add', ('filename', 'f-id', 'file', 'content\n'))],
 
214
            message="First commit.")
 
215
        builder.build_snapshot('rev-2-id', None, [
 
216
            ('modify', ('f-id', 'new content\n'))])
 
217
        builder.finish_series()
 
218
        branch = builder.get_branch()
 
219
        self.addCleanup(branch.lock_read().unlock)
 
220
        return self.make_branch_app(branch)
 
221
 
 
222
    def test_get_values_smoke(self):
 
223
        branch_app = self.make_branch_app_for_filediff_ui()
 
224
        env = {'SCRIPT_NAME': '/',
 
225
               'PATH_INFO': '/+filediff/rev-2-id/rev-1-id/f-id'}
 
226
        filediff_ui = branch_app.lookup_app(env)
 
227
        filediff_ui.parse_args(env)
 
228
        values = filediff_ui.get_values('', {}, {})
 
229
        chunks = values['chunks']
 
230
        self.assertEqual('insert', chunks[0].diff[1].type)
 
231
        self.assertEqual('new content', chunks[0].diff[1].line)
 
232
 
 
233
    def test_json_render_smoke(self):
 
234
        branch_app = self.make_branch_app_for_filediff_ui()
 
235
        env = {'SCRIPT_NAME': '/',
 
236
               'PATH_INFO': '/+json/+filediff/rev-2-id/rev-1-id/f-id'}
 
237
        filediff_ui = branch_app.lookup_app(env)
 
238
        self.assertOkJsonResponse(filediff_ui, env)
 
239
 
 
240
 
 
241
class TestRevLogUI(BasicTests):
 
242
 
 
243
    def make_branch_app_for_revlog_ui(self):
 
244
        builder = self.make_branch_builder('branch')
 
245
        builder.start_series()
 
246
        builder.build_snapshot('rev-id', None, [
 
247
            ('add', ('', 'root-id', 'directory', '')),
 
248
            ('add', ('filename', 'f-id', 'file', 'content\n'))],
 
249
            message="First commit.")
 
250
        builder.finish_series()
 
251
        branch = builder.get_branch()
 
252
        self.addCleanup(branch.lock_read().unlock)
 
253
        return self.make_branch_app(branch)
 
254
 
 
255
    def test_get_values_smoke(self):
 
256
        branch_app = self.make_branch_app_for_revlog_ui()
 
257
        env = {'SCRIPT_NAME': '/',
 
258
               'PATH_INFO': '/+revlog/rev-id'}
 
259
        revlog_ui = branch_app.lookup_app(env)
 
260
        revlog_ui.parse_args(env)
 
261
        values = revlog_ui.get_values('', {}, {})
 
262
        self.assertEqual(values['file_changes'].added[1].filename, 'filename')
 
263
        self.assertEqual(values['entry'].comment, "First commit.")
 
264
 
 
265
    def test_json_render_smoke(self):
 
266
        branch_app = self.make_branch_app_for_revlog_ui()
 
267
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/+json/+revlog/rev-id'}
 
268
        revlog_ui = branch_app.lookup_app(env)
 
269
        self.assertOkJsonResponse(revlog_ui, env)
 
270
 
 
271
 
 
272
class TestControllerHooks(BasicTests):
 
273
 
 
274
    def test_dummy_hook(self):
 
275
        return
 
276
        # A hook that returns None doesn't influence the searching for
 
277
        # a controller.
 
278
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/custom'}
 
279
        myhook = lambda app, environ: None
 
280
        branch = self.make_branch('.')
 
281
        self.addCleanup(branch.lock_read().unlock)
 
282
        app = self.make_branch_app(branch)
 
283
        self.addCleanup(BranchWSGIApp.hooks.uninstall_named_hook, 'controller',
 
284
            'captain hook')
 
285
        BranchWSGIApp.hooks.install_named_hook('controller', myhook, "captain hook")
 
286
        self.assertRaises(KeyError, app.lookup_app, env)
 
287
 
 
288
    def test_working_hook(self):
 
289
        # A hook can provide an app to use for a particular request.
 
290
        env = {'SCRIPT_NAME': '', 'PATH_INFO': '/custom'}
 
291
        myhook = lambda app, environ: "I am hooked"
 
292
        branch = self.make_branch('.')
 
293
        self.addCleanup(branch.lock_read().unlock)
 
294
        app = self.make_branch_app(branch)
 
295
        self.addCleanup(BranchWSGIApp.hooks.uninstall_named_hook, 'controller',
 
296
            'captain hook')
 
297
        BranchWSGIApp.hooks.install_named_hook('controller', myhook, "captain hook")
 
298
        self.assertEquals("I am hooked", app.lookup_app(env))
 
299
 
 
300
 
 
301
class IsTarfile(Matcher):
 
302
 
 
303
    def __init__(self, compression):
 
304
        self.compression = compression
 
305
 
 
306
    def match(self, content_bytes):
 
307
        f = tempfile.NamedTemporaryFile()
 
308
        try:
 
309
            f.write(content_bytes)
 
310
            f.flush()
 
311
            tarfile.open(f.name, mode='r|' + self.compression)
 
312
        finally:
 
313
            f.close()
 
314
 
 
315
 
 
316
class MatchesTarballHeaders(Matcher):
 
317
 
 
318
    def __init__(self, expect_filename):
 
319
        self.expect_filename = expect_filename
 
320
 
 
321
    def match(self, response):
 
322
        # Maybe the c-t should be more specific, but this is probably good for
 
323
        # making sure it gets saved without the client trying to decompress it
 
324
        # or anything.
 
325
        if (response.header('Content-Type') == 'application/octet-stream'
 
326
            and response.header('Content-Disposition') ==
 
327
            "attachment; filename*=utf-8''" + self.expect_filename):
 
328
            pass
 
329
        else:
 
330
            return Mismatch("wrong response headers: %r"
 
331
                % response.headers)
 
332
 
 
333
 
 
334
class TestDownloadTarballUI(TestWithSimpleTree):
 
335
 
 
336
    def setUp(self):
 
337
        super(TestDownloadTarballUI, self).setUp()
 
338
 
 
339
    def test_download_tarball(self):
 
340
        # Tarball downloads are enabled by default.
 
341
        app = self.setUpLoggerhead()
 
342
        response = app.get('/tarball')
 
343
        self.assertThat(
 
344
            response.body,
 
345
            IsTarfile('gz'))
 
346
        self.assertThat(
 
347
            response,
 
348
            MatchesTarballHeaders('branch.tgz'))
 
349
 
 
350
    def test_download_tarball_of_version(self):
 
351
        app = self.setUpLoggerhead()
 
352
        response = app.get('/tarball/1')
 
353
        self.assertThat(
 
354
            response.body,
 
355
            IsTarfile('gz'))
 
356
        self.assertThat(
 
357
            response,
 
358
            MatchesTarballHeaders('branch-r1.tgz'))
 
359
 
 
360
    def test_download_tarball_forbidden(self):
 
361
        app = self.setUpLoggerhead(export_tarballs=False)
 
362
        e = self.assertRaises(
 
363
            AppError, 
 
364
            app.get,
 
365
            '/tarball')
 
366
        self.assertContainsRe(
 
367
            str(e),
 
368
            '(?s).*403 Forbidden'
 
369
            '.*Tarball downloads are not allowed')