1
# Copyright (C) 2008-2011 Canonical Ltd.
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from cStringIO import StringIO
22
from paste.fixture import (
25
from paste.httpexceptions import HTTPServerError
27
from testtools.matchers import (
32
from bzrlib import errors
1
35
from loggerhead.apps.branch import BranchWSGIApp
2
36
from loggerhead.controllers.annotate_ui import AnnotateUI
3
37
from loggerhead.controllers.inventory_ui import InventoryUI
4
from loggerhead.controllers.revision_ui import RevisionUI
5
from loggerhead.tests.test_simple import BasicTests
6
from loggerhead import util
38
from loggerhead.tests.test_simple import (
9
45
class TestInventoryUI(BasicTests):
47
def make_bzrbranch_for_tree_shape(self, shape):
48
tree = self.make_branch_and_tree('.')
49
self.build_tree(shape)
52
self.addCleanup(tree.branch.lock_read().unlock)
11
55
def make_bzrbranch_and_inventory_ui_for_tree_shape(self, shape):
    """Build a branch for ``shape`` and an ``InventoryUI`` serving it.

    The branch comes from ``make_bzrbranch_for_tree_shape`` so the tree
    set-up lives in exactly one place; the WSGI app comes from
    ``self.make_branch_app`` rather than a hand-built ``BranchWSGIApp``.

    :param shape: tree shape, passed straight through to
        ``make_bzrbranch_for_tree_shape``.
    :return: a ``(branch, inventory_ui)`` tuple.
    """
    branch = self.make_bzrbranch_for_tree_shape(shape)
    branch_app = self.make_branch_app(branch)
    return branch, InventoryUI(branch_app, branch_app.get_history)
21
60
def test_get_filelist(self):
22
61
bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
24
63
inv = bzrbranch.repository.get_inventory(bzrbranch.last_revision())
25
self.assertEqual(1, len(inv_ui.get_filelist(inv, '', 'filename')))
64
self.assertEqual(1, len(inv_ui.get_filelist(inv, '', 'filename', 'head')))
67
bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
69
start, content = consume_app(inv_ui,
70
{'SCRIPT_NAME': '/files', 'PATH_INFO': ''})
71
self.assertEqual(('200 OK', [('Content-Type', 'text/html')], None),
73
self.assertContainsRe(content, 'filename')
75
def test_no_content_for_HEAD(self):
76
bzrbranch, inv_ui = self.make_bzrbranch_and_inventory_ui_for_tree_shape(
78
start, content = consume_app(inv_ui,
79
{'SCRIPT_NAME': '/files', 'PATH_INFO': '',
80
'REQUEST_METHOD': 'HEAD'})
81
self.assertEqual(('200 OK', [('Content-Type', 'text/html')], None),
83
self.assertEqual('', content)
85
def test_get_values_smoke(self):
    """get_values() for '/files' lists the one file in the tree shape."""
    environ = {
        'SCRIPT_NAME': '',
        'PATH_INFO': '/files',
    }
    app = self.make_branch_app(
        self.make_bzrbranch_for_tree_shape(['a-file']))
    controller = app.lookup_app(environ)
    controller.parse_args(environ)
    first_entry = controller.get_values('', {}, {})['filelist'][0]
    self.assertEqual('a-file', first_entry.filename)
94
def test_json_render_smoke(self):
    """The '+json' flavour of the files view renders an OK JSON response."""
    environ = {
        'SCRIPT_NAME': '',
        'PATH_INFO': '/+json/files',
    }
    app = self.make_branch_app(
        self.make_bzrbranch_for_tree_shape(['a-file']))
    controller = app.lookup_app(environ)
    self.assertOkJsonResponse(controller, environ)
28
102
class TestRevisionUI(BasicTests):
30
def make_bzrbranch_and_revision_ui_for_tree_shapes(self, shape1, shape2):
104
def make_branch_app_for_revision_ui(self, shape1, shape2):
31
105
tree = self.make_branch_and_tree('.')
32
106
self.build_tree_contents(shape1)
33
107
tree.smart_add([])
108
tree.commit('msg 1', rev_id='rev-1')
35
109
self.build_tree_contents(shape2)
36
110
tree.smart_add([])
38
tree.branch.lock_read()
39
self.addCleanup(tree.branch.unlock)
40
branch_app = BranchWSGIApp(tree.branch)
41
branch_app._environ = {
46
branch_app._url_base = ''
47
branch_app.friendly_name = ''
48
return tree.branch, RevisionUI(branch_app, branch_app.get_history)
111
tree.commit('msg 2', rev_id='rev-2')
113
self.addCleanup(branch.lock_read().unlock)
114
return self.make_branch_app(branch)
50
116
def test_get_values(self):
    """get_values() on the revision controller returns a dict.

    The controller is obtained the same way a request would reach it:
    looked up from the branch app by PATH_INFO, then given the parsed
    arguments, instead of being constructed by hand.
    """
    branch_app = self.make_branch_app_for_revision_ui([], [])
    env = {'SCRIPT_NAME': '', 'PATH_INFO': '/revision/2'}
    rev_ui = branch_app.lookup_app(env)
    rev_ui.parse_args(env)
    self.assertIsInstance(rev_ui.get_values('', {}, []), dict)
123
def test_add_template_values(self):
    """Requesting a path not present in the tree gives no diff chunks."""
    app = self.make_branch_app_for_revision_ui(
        [('file', 'content\n')],
        [('file', 'new content\n')])
    environ = {
        'SCRIPT_NAME': '/',
        'PATH_INFO': '/revision/1/non-existent-file',
        'QUERY_STRING': 'start_revid=1',
    }
    controller = app.lookup_app(environ)
    requested_path = controller.parse_args(environ)
    template_values = controller.get_values(
        requested_path, controller.kwargs, {})
    controller.add_template_values(template_values)
    self.assertIs(template_values['diff_chunks'], None)
135
def test_get_values_smoke(self):
    """get_values() for 'head:' describes the second commit."""
    first_shape = [('file', 'content\n'), ('other-file', 'other\n')]
    second_shape = [('file', 'new content\n')]
    app = self.make_branch_app_for_revision_ui(first_shape, second_shape)
    environ = {
        'SCRIPT_NAME': '/',
        'PATH_INFO': '/revision/head:',
    }
    controller = app.lookup_app(environ)
    controller.parse_args(environ)
    result = controller.get_values('', {}, {})

    # Only 'file' differs between the two shapes, so it is the first
    # (index 0) entry checked in the modified list.
    self.assertEqual(result['revid'], 'rev-2')
    self.assertEqual(result['change'].comment, 'msg 2')
    self.assertEqual(result['file_changes'].modified[0].filename, 'file')
    self.assertEqual(result['merged_in'], None)
150
def test_json_render_smoke(self):
    """The '+json' flavour of the revision view renders an OK response."""
    first_shape = [('file', 'content\n'), ('other-file', 'other\n')]
    second_shape = [('file', 'new content\n')]
    app = self.make_branch_app_for_revision_ui(first_shape, second_shape)
    environ = {
        'SCRIPT_NAME': '',
        'PATH_INFO': '/+json/revision/head:',
    }
    controller = app.lookup_app(environ)
    self.assertOkJsonResponse(controller, environ)
60
160
class TestAnnotateUI(BasicTests):
62
162
def make_annotate_ui_for_file_history(self, file_id, rev_ids_texts):
    """Return an ``AnnotateUI`` over a branch holding one file's history.

    A file called 'filename' is created empty, added under ``file_id``,
    and then rewritten and committed once per history entry.

    :param file_id: file id given to 'filename' when it is added.
    :param rev_ids_texts: iterable of ``(rev_id, text, message)`` tuples,
        committed in order; ``text`` becomes the file content and
        ``message`` the commit message of revision ``rev_id``.
    :return: an ``AnnotateUI`` for a ``BranchWSGIApp`` whose friendly
        name is 'test_name'.
    """
    tree = self.make_branch_and_tree('.')
    # build_tree_contents is used instead of open(...).write(...) so that
    # no file handle is left for the garbage collector to close.
    self.build_tree_contents([('filename', '')])
    tree.add(['filename'], [file_id])
    for rev_id, text, message in rev_ids_texts:
        self.build_tree_contents([('filename', text)])
        tree.commit(rev_id=rev_id, message=message)
    # Hold a read lock for the rest of the test; released on cleanup.
    tree.branch.lock_read()
    self.addCleanup(tree.branch.unlock)
    branch_app = BranchWSGIApp(tree.branch, friendly_name='test_name')
    return AnnotateUI(branch_app, branch_app.get_history)
74
174
def test_annotate_file(self):
    """Each line is attributed to the revision that last changed it.

    'rev2' rewrites only the first line of the file, so the first line
    must be annotated with revno '2' and the second with revno '1'.
    """
    history = [('rev1', 'old\nold\n', '.'), ('rev2', 'new\nold\n', '.')]
    ann_ui = self.make_annotate_ui_for_file_history('file_id', history)
    # A lot of this state is set up by __call__, but we'll do it directly
    # here.
    ann_ui.args = ['rev2']
    annotate_info = ann_ui.get_values('filename',
        kwargs={'file_id': 'file_id'}, headers={})
    annotated = annotate_info['annotated']
    self.assertEqual(2, len(annotated))
    # NOTE(review): entries are looked up as 1 and 2, so 'annotated' is
    # presumably keyed by 1-based line number -- confirm against
    # AnnotateUI.get_values.
    self.assertEqual('2', annotated[1].change.revno)
    self.assertEqual('1', annotated[2].change.revno)
187
def test_annotate_empty_comment(self):
    """get_values() copes with a revision whose commit message is empty."""
    # 'rev2' is committed with '' as its message; the test passes as long
    # as building the annotation values does not raise.
    revisions = [
        ('rev1', 'old\nold\n', '.'),
        ('rev2', 'new\nold\n', ''),
    ]
    controller = self.make_annotate_ui_for_file_history('file_id', revisions)
    controller.args = ['rev2']
    controller.get_values(
        'filename', kwargs={'file_id': 'file_id'}, headers={})
195
def test_annotate_file_zero_sized(self):
    """Annotating a zero-sized file succeeds and yields no annotations."""
    revisions = [('rev1', '', '.')]
    controller = self.make_annotate_ui_for_file_history('file_id', revisions)
    controller.args = ['rev1']
    values = controller.get_values(
        'filename', kwargs={'file_id': 'file_id'}, headers={})
    self.assertEqual(0, len(values['annotated']))
206
class TestFileDiffUI(BasicTests):
    """Smoke tests for the controller reached through '+filediff'."""

    def make_branch_app_for_filediff_ui(self):
        """Return a branch app over a branch with two revisions.

        'rev-1-id' adds the root directory and 'filename' (file id
        'f-id'); 'rev-2-id' changes the content of that file.
        """
        first_revision = [
            ('add', ('', 'root-id', 'directory', '')),
            ('add', ('filename', 'f-id', 'file', 'content\n')),
        ]
        second_revision = [
            ('modify', ('f-id', 'new content\n')),
        ]
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot(
            'rev-1-id', None, first_revision, message="First commit.")
        builder.build_snapshot('rev-2-id', None, second_revision)
        builder.finish_series()
        branch = builder.get_branch()
        # Keep the branch read-locked until the test is torn down.
        read_lock = branch.lock_read()
        self.addCleanup(read_lock.unlock)
        return self.make_branch_app(branch)

    def test_get_values_smoke(self):
        """The diff between the two revisions reports the new content."""
        environ = {
            'SCRIPT_NAME': '/',
            'PATH_INFO': '/+filediff/rev-2-id/rev-1-id/f-id',
        }
        controller = self.make_branch_app_for_filediff_ui().lookup_app(
            environ)
        controller.parse_args(environ)
        result = controller.get_values('', {}, {})
        diff_chunks = result['chunks']
        self.assertEqual('insert', diff_chunks[0].diff[1].type)
        self.assertEqual('new content', diff_chunks[0].diff[1].line)

    def test_json_render_smoke(self):
        """The '+json' flavour of the file diff renders an OK response."""
        environ = {
            'SCRIPT_NAME': '/',
            'PATH_INFO': '/+json/+filediff/rev-2-id/rev-1-id/f-id',
        }
        controller = self.make_branch_app_for_filediff_ui().lookup_app(
            environ)
        self.assertOkJsonResponse(controller, environ)
241
class TestRevLogUI(BasicTests):
    """Smoke tests for the controller reached through '+revlog'."""

    def make_branch_app_for_revlog_ui(self):
        """Return a branch app over a branch with the single revision 'rev-id'.

        That revision adds the root directory and 'filename' (file id
        'f-id') with the commit message "First commit.".
        """
        only_revision = [
            ('add', ('', 'root-id', 'directory', '')),
            ('add', ('filename', 'f-id', 'file', 'content\n')),
        ]
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot(
            'rev-id', None, only_revision, message="First commit.")
        builder.finish_series()
        branch = builder.get_branch()
        # Keep the branch read-locked until the test is torn down.
        read_lock = branch.lock_read()
        self.addCleanup(read_lock.unlock)
        return self.make_branch_app(branch)

    def test_get_values_smoke(self):
        """get_values() exposes the added file and the commit message."""
        environ = {
            'SCRIPT_NAME': '/',
            'PATH_INFO': '/+revlog/rev-id',
        }
        controller = self.make_branch_app_for_revlog_ui().lookup_app(environ)
        controller.parse_args(environ)
        result = controller.get_values('', {}, {})
        # 'filename' is checked at index 1 of the added entries; the root
        # directory is added in the same revision.
        self.assertEqual(result['file_changes'].added[1].filename, 'filename')
        self.assertEqual(result['entry'].comment, "First commit.")

    def test_json_render_smoke(self):
        """The '+json' flavour of the revlog view renders an OK response."""
        environ = {
            'SCRIPT_NAME': '',
            'PATH_INFO': '/+json/+revlog/rev-id',
        }
        controller = self.make_branch_app_for_revlog_ui().lookup_app(environ)
        self.assertOkJsonResponse(controller, environ)
272
class TestControllerHooks(BasicTests):
274
def test_dummy_hook(self):
276
# A hook that returns None doesn't influence the searching for
278
env = {'SCRIPT_NAME': '', 'PATH_INFO': '/custom'}
279
myhook = lambda app, environ: None
280
branch = self.make_branch('.')
281
self.addCleanup(branch.lock_read().unlock)
282
app = self.make_branch_app(branch)
283
self.addCleanup(BranchWSGIApp.hooks.uninstall_named_hook, 'controller',
285
BranchWSGIApp.hooks.install_named_hook('controller', myhook, "captain hook")
286
self.assertRaises(KeyError, app.lookup_app, env)
288
def test_working_hook(self):
289
# A hook can provide an app to use for a particular request.
290
env = {'SCRIPT_NAME': '', 'PATH_INFO': '/custom'}
291
myhook = lambda app, environ: "I am hooked"
292
branch = self.make_branch('.')
293
self.addCleanup(branch.lock_read().unlock)
294
app = self.make_branch_app(branch)
295
self.addCleanup(BranchWSGIApp.hooks.uninstall_named_hook, 'controller',
297
BranchWSGIApp.hooks.install_named_hook('controller', myhook, "captain hook")
298
self.assertEquals("I am hooked", app.lookup_app(env))
301
class IsTarfile(Matcher):
303
def __init__(self, compression):
304
self.compression = compression
306
def match(self, content_bytes):
307
f = tempfile.NamedTemporaryFile()
309
f.write(content_bytes)
311
tarfile.open(f.name, mode='r|' + self.compression)
316
class MatchesTarballHeaders(Matcher):
318
def __init__(self, expect_filename):
319
self.expect_filename = expect_filename
321
def match(self, response):
322
# Maybe the c-t should be more specific, but this is probably good for
323
# making sure it gets saved without the client trying to decompress it
325
if (response.header('Content-Type') == 'application/octet-stream'
326
and response.header('Content-Disposition') ==
327
"attachment; filename*=utf-8''" + self.expect_filename):
330
return Mismatch("wrong response headers: %r"
334
class TestDownloadTarballUI(TestWithSimpleTree):
337
super(TestDownloadTarballUI, self).setUp()
339
def test_download_tarball(self):
340
# Tarball downloads are enabled by default.
341
app = self.setUpLoggerhead()
342
response = app.get('/tarball')
348
MatchesTarballHeaders('branch.tgz'))
350
def test_download_tarball_of_version(self):
351
app = self.setUpLoggerhead()
352
response = app.get('/tarball/1')
358
MatchesTarballHeaders('branch-r1.tgz'))
360
def test_download_tarball_forbidden(self):
361
app = self.setUpLoggerhead(export_tarballs=False)
362
e = self.assertRaises(
366
self.assertContainsRe(
368
'(?s).*403 Forbidden'
369
'.*Tarball downloads are not allowed')