# Copyright (C) 2011 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Direct tests of the loggerhead/history.py module"""
20
from datetime import datetime

from bzrlib import tag, tests
from bzrlib.foreign import (
    ForeignRevision,
    ForeignVcs,
    VcsMapping,
    )

from loggerhead import history as _mod_history
32
class TestCaseWithExamples(tests.TestCaseWithMemoryTransport):
    """Base test case offering small example ancestries wrapped in History.

    Each make_* helper builds a branch with a branch builder and returns a
    loggerhead History for it.
    """

    def _history_from_builder(self, builder):
        """Return a History for the branch produced by builder.

        The branch is read-locked here and the unlock is registered with
        addCleanup.
        """
        b = builder.get_branch()
        self.addCleanup(b.lock_read().unlock)
        return _mod_history.History(b, {})

    def make_linear_ancestry(self):
        """Build a three revision branch with no merges.

        The ancestry is rev-1 <- rev-2 <- rev-3.

        :return: a History for the branch
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot('rev-1', None, [
            ('add', ('', 'root-id', 'directory', None))])
        builder.build_snapshot('rev-2', ['rev-1'], [])
        builder.build_snapshot('rev-3', ['rev-2'], [])
        builder.finish_series()
        return self._history_from_builder(builder)

    def make_long_linear_ancestry(self):
        """Build a branch with the 26 revisions 'A' through 'Z'.

        'B' through 'Z' are built with parent_ids=None, i.e. without naming
        their parents explicitly.

        :return: a History for the branch
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot('A', None, [
            ('add', ('', 'root-id', 'directory', None))])
        for r in "BCDEFGHIJKLMNOPQRSTUVWXYZ":
            builder.build_snapshot(r, None, [])
        builder.finish_series()
        return self._history_from_builder(builder)

    def make_merged_ancestry(self):
        """Build a branch whose tip merges a side revision.

        rev-2 has the single parent rev-1; rev-3 has the parents
        [rev-1, rev-2].

        :return: a History for the branch
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot('rev-1', None, [
            ('add', ('', 'root-id', 'directory', None))])
        builder.build_snapshot('rev-2', ['rev-1'], [])
        builder.build_snapshot('rev-3', ['rev-1', 'rev-2'], [])
        builder.finish_series()
        return self._history_from_builder(builder)

    def make_deep_merged_ancestry(self):
        """Build a branch containing a merge of a merge.

        Parents, as passed to build_snapshot:
            B: [A]
            C: [A]
            D: [C]
            E: [C, D]
            F: [B, E]

        :return: a History for the branch
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        builder.build_snapshot('A', None, [
            ('add', ('', 'root-id', 'directory', None))])
        builder.build_snapshot('B', ['A'], [])
        builder.build_snapshot('C', ['A'], [])
        builder.build_snapshot('D', ['C'], [])
        builder.build_snapshot('E', ['C', 'D'], [])
        builder.build_snapshot('F', ['B', 'E'], [])
        builder.finish_series()
        return self._history_from_builder(builder)

    def assertRevidsFrom(self, expected, history, search_revs, tip_rev):
        """Assert history.get_revids_from(search_revs, tip_rev) == expected.

        The result of get_revids_from is fully consumed into a list before
        it is compared.
        """
        self.assertEqual(expected,
                         list(history.get_revids_from(search_revs, tip_rev)))
112
class _DictProxy(object):
114
def __init__(self, d):
116
self._accessed = set()
117
self.__setitem__ = d.__setitem__
119
def __getitem__(self, name):
120
self._accessed.add(name)
127
def track_rev_info_accesses(h):
    """Track __getitem__ access to History._rev_info,

    h._rev_info is replaced with a _DictProxy wrapping its previous value.

    :param h: the object whose _rev_info attribute should be tracked
    :return: set of items accessed
    """
    proxy = _DictProxy(h._rev_info)
    h._rev_info = proxy
    # The proxy's own set is handed back (not a copy), so callers observe
    # later accesses through the same object.
    return proxy._accessed
136
class TestHistoryGetRevidsFrom(TestCaseWithExamples):
    """Tests for History.get_revids_from."""

    def test_get_revids_from_simple_mainline(self):
        """With no search revisions, a linear branch yields every revision."""
        history = self.make_linear_ancestry()
        self.assertRevidsFrom(['rev-3', 'rev-2', 'rev-1'],
                              history, None, 'rev-3')

    def test_get_revids_from_merged_mainline(self):
        """With no search revisions, the merged rev-2 is not yielded."""
        history = self.make_merged_ancestry()
        self.assertRevidsFrom(['rev-3', 'rev-1'],
                              history, None, 'rev-3')

    def test_get_revids_given_one_rev(self):
        """Searching for a merged revision yields the revision merging it."""
        history = self.make_merged_ancestry()
        # rev-3 was the first mainline revision to see rev-2.
        self.assertRevidsFrom(['rev-3'], history, ['rev-2'], 'rev-3')

    def test_get_revids_deep_ancestry(self):
        """Each searched revision maps to one revision of the deep graph."""
        history = self.make_deep_merged_ancestry()
        self.assertRevidsFrom(['F'], history, ['F'], 'F')
        self.assertRevidsFrom(['F'], history, ['E'], 'F')
        self.assertRevidsFrom(['F'], history, ['D'], 'F')
        self.assertRevidsFrom(['F'], history, ['C'], 'F')
        self.assertRevidsFrom(['B'], history, ['B'], 'F')
        self.assertRevidsFrom(['A'], history, ['A'], 'F')

    def test_get_revids_doesnt_over_produce_simple_mainline(self):
        """Only the _rev_info entries needed so far are read."""
        # get_revids_from shouldn't walk the whole ancestry just to get the
        # answers for the first few revisions.
        history = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)
        result = history.get_revids_from(None, 'Z')
        self.assertEqual(set(), accessed)
        self.assertEqual('Z', result.next())
        # We already know 'Z' because we passed it in.
        self.assertEqual(set(), accessed)
        self.assertEqual('Y', result.next())
        self.assertEqual(set([history._rev_indices['Z']]), accessed)

    def test_get_revids_doesnt_over_produce_for_merges(self):
        """Searching for specific revisions reads only a bounded prefix."""
        # get_revids_from shouldn't walk the whole ancestry just to get the
        # answers for the first few revisions.
        history = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)
        result = history.get_revids_from(['X', 'V'], 'Z')
        self.assertEqual(set(), accessed)
        self.assertEqual('X', result.next())
        # We access 'W' because we are checking that W wasn't merged into X.
        # The important bit is that we aren't getting the whole ancestry.
        self.assertEqual(set([history._rev_indices[x] for x in 'ZYXW']),
                         accessed)
        self.assertEqual('V', result.next())
        self.assertEqual(set([history._rev_indices[x] for x in 'ZYXWVU']),
                         accessed)
        self.assertRaises(StopIteration, result.next)
        # Exhausting the iterator must not have read any further entries.
        self.assertEqual(set([history._rev_indices[x] for x in 'ZYXWVU']),
                         accessed)
196
class TestHistoryChangeFromRevision(tests.TestCaseWithTransport):
    """Tests for History._change_from_revision using real commits."""

    def make_single_commit(self):
        """Commit once to a new tree named 'test'.

        The branch is write-locked and the unlock is registered with
        addCleanup.

        :return: (history, rev) - a History for the branch and the
            Revision object of the commit
        """
        tree = self.make_branch_and_tree('test')
        # NOTE(review): the final keyword argument of this call is a
        # reconstruction; an empty ``revprops`` mirrors the call shape used
        # in test_committer_vs_authors - confirm against the upstream file.
        rev_id = tree.commit('Commit Message', timestamp=1299838474.317,
            timezone=3600, committer='Joe Example <joe@example.com>',
            revprops={})
        self.addCleanup(tree.branch.lock_write().unlock)
        rev = tree.branch.repository.get_revision(rev_id)
        history = _mod_history.History(tree.branch, {})
        return history, rev

    def test_simple(self):
        """A plain commit is converted into a change with matching fields."""
        history, rev = self.make_single_commit()
        change = history._change_from_revision(rev)
        self.assertEqual(rev.revision_id, change.revid)
        # NOTE(review): the attribute names compared in the next three
        # assertions (date, utc_date, authors) are reconstructions - confirm
        # against loggerhead's change object.
        self.assertEqual(datetime.fromtimestamp(1299838474.317),
                         change.date)
        self.assertEqual(datetime.utcfromtimestamp(1299838474.317),
                         change.utc_date)
        self.assertEqual(['Joe Example <joe@example.com>'],
                         change.authors)
        self.assertEqual('test', change.branch_nick)
        self.assertEqual('Commit Message', change.short_comment)
        self.assertEqual('Commit Message', change.comment)
        self.assertEqual(['Commit Message'], change.comment_clean)
        self.assertEqual([], change.parents)
        self.assertEqual([], change.bugs)
        self.assertEqual(None, change.tags)

    def test_tags(self):
        """Tags pointing at the revision are reported as one joined string."""
        history, rev = self.make_single_commit()
        # NOTE(review): ``history._branch`` is a reconstruction of how the
        # branch is reached here - confirm against History.__init__.
        b = history._branch
        b.tags.set_tag('tag-1', rev.revision_id)
        b.tags.set_tag('tag-2', rev.revision_id)
        b.tags.set_tag('Tag-10', rev.revision_id)
        change = history._change_from_revision(rev)
        # If available, tags are 'naturally' sorted. (sorting numbers in order,
        # and ignoring case, etc.)
        if getattr(tag, 'sort_natural', None) is not None:
            self.assertEqual('tag-1, tag-2, Tag-10', change.tags)
        else:
            self.assertEqual('Tag-10, tag-1, tag-2', change.tags)

    def test_committer_vs_authors(self):
        """The committer and the 'authors' revprop are reported separately."""
        tree = self.make_branch_and_tree('test')
        rev_id = tree.commit('Commit Message', timestamp=1299838474.317,
            timezone=3600, committer='Joe Example <joe@example.com>',
            revprops={'authors': u'A Author <aauthor@example.com>\n'
                                 u'B Author <bauthor@example.com>'})
        self.addCleanup(tree.branch.lock_write().unlock)
        rev = tree.branch.repository.get_revision(rev_id)
        history = _mod_history.History(tree.branch, {})
        change = history._change_from_revision(rev)
        # NOTE(review): the attribute names compared below (committer,
        # authors) are reconstructions - confirm against loggerhead's change
        # object.
        self.assertEqual(u'Joe Example <joe@example.com>',
                         change.committer)
        self.assertEqual([u'A Author <aauthor@example.com>',
                          u'B Author <bauthor@example.com>'],
                         change.authors)
257
class TestHistory_IterateSufficiently(tests.TestCase):
    """Tests for History._iterate_sufficiently."""

    def assertIterate(self, expected, iterable, stop_at, extra_rev_count):
        """Assert _iterate_sufficiently returns expected for the arguments.

        The function is called on the History class itself, without an
        instance.
        """
        self.assertEqual(expected, _mod_history.History._iterate_sufficiently(
            iterable, stop_at, extra_rev_count))

    def test_iter_no_extra(self):
        """With zero extra revisions the result ends at stop_at."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(['a', 'b', 'c'], iter(lst), 'c', 0)
        self.assertIterate(['a', 'b', 'c', 'd'], iter(lst), 'd', 0)

    def test_iter_not_found(self):
        """A stop_at value that never appears yields the whole input."""
        # If the key in question isn't found, we just exhaust the list
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'not-there', 0)

    def test_iter_with_extra(self):
        """extra_rev_count items are returned after stop_at."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(['a', 'b', 'c'], iter(lst), 'b', 1)
        self.assertIterate(['a', 'b', 'c', 'd', 'e'], iter(lst), 'c', 2)

    def test_iter_with_too_many_extra(self):
        """Asking for more extras than remain just returns everything."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'y', 10)
        self.assertIterate(lst, iter(lst), 'z', 10)

    def test_iter_with_extra_None(self):
        """extra_rev_count=None is accepted when stop_at is the last item."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'z', None)
289
class TestHistoryGetView(TestCaseWithExamples):
    """Tests for History.get_view."""

    def test_get_view_limited_history(self):
        """get_view reads only the _rev_info entries it has to."""
        # get_view should only load enough history to serve the result, not all
        # history.
        history = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)
        # NOTE(review): the trailing argument of this call is a
        # reconstruction; ``extra_rev_count=5`` matches the six revisions
        # expected below ('Z' plus five more) - confirm against the upstream
        # file and get_view's signature.
        revid, start_revid, revid_list = history.get_view('Z', 'Z', None,
                                                          extra_rev_count=5)
        self.assertEqual(['Z', 'Y', 'X', 'W', 'V', 'U'], revid_list)
        self.assertEqual('Z', revid)
        self.assertEqual('Z', start_revid)
        self.assertEqual(set([history._rev_indices[x] for x in 'ZYXWVU']),
                         accessed)
305
class TestHistoryGetChangedUncached(TestCaseWithExamples):
    """Tests for change objects built for native and foreign revisions."""

    def test_native(self):
        """Changes for native revisions carry no foreign attributes."""
        history = self.make_linear_ancestry()
        changes = history.get_changes_uncached(['rev-1', 'rev-2'])
        self.assertEqual(2, len(changes))
        self.assertEqual('rev-1', changes[0].revid)
        self.assertEqual('rev-2', changes[1].revid)
        # getattr with a default: the attributes may be absent or None.
        self.assertIs(None, getattr(changes[0], 'foreign_vcs', None))
        self.assertIs(None, getattr(changes[0], 'foreign_revid', None))

    def test_foreign(self):
        """A foreign revision exposes its VCS name and formatted revid."""
        # Test with a mocked foreign revision, as it's not possible
        # to rely on any foreign plugins being installed.
        history = self.make_linear_ancestry()
        foreign_vcs = ForeignVcs(None, "vcs")
        # repr() of the foreign revid tuple is what the assertion on
        # change.foreign_revid below expects.
        foreign_vcs.show_foreign_revid = repr
        foreign_rev = ForeignRevision(("uuid", 1234), VcsMapping(foreign_vcs),
                                      "revid-in-bzr", message="message",
                                      timestamp=234423423.3)
        change = history._change_from_revision(foreign_rev)
        self.assertEqual('revid-in-bzr', change.revid)
        self.assertEqual("('uuid', 1234)", change.foreign_revid)
        self.assertEqual("vcs", change.foreign_vcs)