14
14
from bzrlib.bzrdir import (
19
from bzrlib.controldir import ControlDirFormat
20
from bzrlib.errors import NotBranchError
18
21
from bzrlib.repofmt.knitpack_repo import RepositoryFormatKnitPack1
19
22
from bzrlib.tests import TestCaseWithTransport
20
23
from bzrlib.transport import chroot
55
58
self._reference_values[references[i]] = references[i + 1]
56
59
self.follow_reference_calls = []
58
def followReference(self, url):
    """Record that `url` was followed and return its canned target.

    The URL is appended to ``self.follow_reference_calls`` before the
    lookup, so a failed lookup is still recorded as a call.

    :param url: The URL whose branch reference is being followed.
    :return: Whatever ``self._reference_values`` maps `url` to.
    :raises KeyError: presumably, if `url` has no canned value
        (assumes ``_reference_values`` is a plain dict -- confirm
        against the constructor).
    """
    self.follow_reference_calls.append(url)
    return self._reference_values[url]
68
71
def testCheckInitialURL(self):
    # checkAndFollowBranchReference must raise BadUrl for the very first
    # URL it is given when that URL is not allowed.
    # NOTE(review): the third positional argument is presumably the set
    # of unsafe URLs (another test passes it as unsafe_urls=) -- confirm
    # against the class-level makeBranchOpener helper.
    opener = self.makeBranchOpener(None, [], set(['a']))
    self.assertRaises(
        BadUrl, opener.checkAndFollowBranchReference, 'a')
73
77
def testNotReference(self):
    # When branch references are forbidden, checkAndFollowBranchReference
    # does not raise on non-references: it hands back the URL it was
    # given.
    opener = self.makeBranchOpener(False, ['a', None])
    self.assertEquals(
        'a', opener.checkAndFollowBranchReference('a'))
    # The URL was still probed exactly once to find out whether it is a
    # reference.
    self.assertEquals(['a'], opener.follow_reference_calls)
80
85
def testBranchReferenceForbidden(self):
92
97
# is allowed and the source URL points to a branch reference to a
93
98
# permitted location.
94
99
opener = self.makeBranchOpener(True, ['a', 'b', None])
95
self.assertEquals('b', opener.checkAndFollowBranchReference('a'))
101
'b', opener.checkAndFollowBranchReference('a'))
96
102
self.assertEquals(['a', 'b'], opener.follow_reference_calls)
98
104
def testCheckReferencedURLs(self):
101
107
opener = self.makeBranchOpener(
102
108
True, ['a', 'b', None], unsafe_urls=set('b'))
103
self.assertRaises(BadUrl, opener.checkAndFollowBranchReference, 'a')
110
BadUrl, opener.checkAndFollowBranchReference, 'a')
104
111
self.assertEquals(['a'], opener.follow_reference_calls)
106
113
def testSelfReferencingBranch(self):
123
130
self.assertEquals(['a', 'b'], opener.follow_reference_calls)
133
class TrackingProber(BzrProber):
134
"""Subclass of BzrProber which tracks URLs it has been asked to open."""
139
def probe_transport(klass, transport):
140
klass.seen_urls.append(transport.base)
141
return BzrProber.probe_transport(transport)
126
144
class TestSafeBranchOpenerStacking(TestCaseWithTransport):
129
147
super(TestSafeBranchOpenerStacking, self).setUp()
130
148
SafeBranchOpener.install_hook()
132
def makeBranchOpener(self, allowed_urls, probers=None):
    """Build a SafeBranchOpener restricted to `allowed_urls`.

    :param allowed_urls: URLs handed to the WhitelistPolicy as its
        whitelist.
    :param probers: Optional value passed straight through as the second
        argument of SafeBranchOpener; None is forwarded unchanged when
        the caller does not supply one.
    :return: A SafeBranchOpener wrapping the whitelist policy.
    """
    # NOTE(review): the meaning of the two True flags is not visible from
    # here -- check WhitelistPolicy's signature before changing them.
    policy = WhitelistPolicy(True, allowed_urls, True)
    return SafeBranchOpener(policy, probers)
136
154
def makeBranch(self, path, branch_format, repository_format):
137
155
"""Make a Bazaar branch at 'path' with the given formats."""
141
159
repository_format.initialize(bzrdir)
142
160
return bzrdir.create_branch()
162
def testProbers(self):
    # Only the specified probers should be used.
    branch = self.make_branch('branch')
    branch_url = branch.base
    # With an empty prober list the opener cannot recognise the location
    # as a branch, even though one exists there.
    opener = self.makeBranchOpener([branch_url], probers=[])
    self.assertRaises(NotBranchError, opener.open, branch_url)
    # With BzrProber supplied, the same location opens successfully.
    opener = self.makeBranchOpener([branch_url], probers=[BzrProber])
    self.assertEquals(branch_url, opener.open(branch_url).base)
170
def testDefaultProbers(self):
    # If no probers are specified to the constructor
    # of SafeBranchOpener, then a safe set will be used,
    # rather than all probers registered in bzr.

    # Schedule the unregistration first so the prober is removed again
    # even if a later step of this test fails.
    self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)
    ControlDirFormat.register_prober(TrackingProber)

    # Open a location without any branches, so that all probers are
    # tried.
    # First, check that the TrackingProber tracks correctly.
    TrackingProber.seen_urls = []
    opener = self.makeBranchOpener(["."], probers=[TrackingProber])
    self.assertRaises(NotBranchError, opener.open, ".")
    self.assertEquals(1, len(TrackingProber.seen_urls))

    # And make sure it's registered in such a way that BzrDir.open would
    # use it.
    TrackingProber.seen_urls = []
    self.assertRaises(NotBranchError, BzrDir.open, ".")
    self.assertEquals(1, len(TrackingProber.seen_urls))

    # Make sure that SafeBranchOpener doesn't use it if no
    # probers were specified.
    TrackingProber.seen_urls = []
    opener = self.makeBranchOpener(["."])
    self.assertRaises(NotBranchError, opener.open, ".")
    self.assertEquals(0, len(TrackingProber.seen_urls))
144
195
def testAllowedURL(self):
145
196
# checkSource does not raise an exception for branches stacked on
146
197
# branches with allowed URLs.
235
286
a = self.make_branch('a', format='2a')
236
287
b = self.make_branch('b', format='2a')
237
288
b.set_stacked_on_url(a.base)
242
return BzrDir.open(url)
244
opener = self.makeBranchOpener([a.base, b.base])
245
opener.open(b.base, open_dir=open_dir)
246
self.assertEquals(seen_urls, set([b.base, a.base]))
290
TrackingProber.seen_urls = []
291
opener = self.makeBranchOpener(
292
[a.base, b.base], probers=[TrackingProber])
295
set(TrackingProber.seen_urls), set([b.base, a.base]))
248
297
def testCustomOpenerWithBranchReference(self):
249
298
# A custom function for opening a control dir can be specified.
250
299
a = self.make_branch('a', format='2a')
251
300
b_dir = self.make_bzrdir('b')
252
301
b = BranchReferenceFormat().initialize(b_dir, target_branch=a)
257
return BzrDir.open(url)
259
opener = self.makeBranchOpener([a.base, b.base])
260
opener.open(b.base, open_dir=open_dir)
261
self.assertEquals(seen_urls, set([b.base, a.base]))
302
TrackingProber.seen_urls = []
303
opener = self.makeBranchOpener(
304
[a.base, b.base], probers=[TrackingProber])
307
set(TrackingProber.seen_urls), set([b.base, a.base]))
264
310
class TestSafeOpen(TestCaseWithTransport):