150
152
return urlutils.join(branch.base, url), self.check
155
class SingleSchemePolicy(BranchOpenPolicy):
156
"""Branch open policy that rejects URLs not on the given scheme."""
158
def __init__(self, allowed_scheme):
159
self.allowed_scheme = allowed_scheme
161
def shouldFollowReferences(self):
164
def transformFallbackLocation(self, branch, url):
165
return urlutils.join(branch.base, url), True
167
def checkOneURL(self, url):
168
"""Check that `url` is safe to open."""
169
if URI(url).scheme != self.allowed_scheme:
153
173
class SafeBranchOpener(object):
154
174
"""Safe branch opener.
176
All locations that are opened (stacked-on branches, references) are
177
checked against a policy object.
156
179
The policy object is expected to have the following methods:
158
181
* shouldFollowReferences
159
182
* transformFallbackLocation
185
_threading_data = threading.local()
162
187
def __init__(self, policy):
163
188
self.policy = policy
164
189
self._seen_urls = set()
166
def checkAndFollowBranchReference(self, url):
192
def install_hook(cls):
193
"""Install the ``transformFallbackLocation`` hook.
195
This is done at module import time, but transformFallbackLocationHook
196
doesn't do anything unless the `_threading_data` threading.local
197
object has an 'opener' attribute in this thread.
199
This is in a class method rather than performed at module
200
level so that it can be called in setUp for testing `SafeBranchOpener`
201
as bzrlib.tests.TestCase.setUp clears hooks.
203
Branch.hooks.install_named_hook(
204
'transform_fallback_location',
205
cls.transformFallbackLocationHook,
206
'SafeBranchOpener.transformFallbackLocationHook')
208
def checkAndFollowBranchReference(self, url, open_dir=None):
167
209
"""Check URL (and possibly the referenced URL) for safety.
169
211
This method checks that `url` passes the policy's `checkOneURL`
180
225
raise BranchLoopError()
181
226
self._seen_urls.add(url)
182
227
self.policy.checkOneURL(url)
183
next_url = self.followReference(url)
228
next_url = self.followReference(url, open_dir=open_dir)
184
229
if next_url is None:
187
232
if not self.policy.shouldFollowReferences():
188
233
raise BranchReferenceForbidden(url)
190
def transformFallbackLocationHook(self, branch, url):
236
def transformFallbackLocationHook(cls, branch, url):
191
237
"""Installed as the 'transform_fallback_location' Branch hook.
193
239
This method calls `transformFallbackLocation` on the policy object and
194
240
either returns the url it provides or passes it back to
195
241
checkAndFollowBranchReference.
197
new_url, check = self.policy.transformFallbackLocation(branch, url)
244
opener = getattr(cls._threading_data, "opener")
245
except AttributeError:
247
new_url, check = opener.policy.transformFallbackLocation(branch, url)
199
return self.checkAndFollowBranchReference(new_url)
249
return opener.checkAndFollowBranchReference(new_url,
250
getattr(cls._threading_data, "open_dir"))
203
254
def runWithTransformFallbackLocationHookInstalled(
204
self, callable, *args, **kw):
205
Branch.hooks.install_named_hook(
206
'transform_fallback_location', self.transformFallbackLocationHook,
207
'SafeBranchOpener.transformFallbackLocationHook')
255
self, open_dir, callable, *args, **kw):
256
assert (self.transformFallbackLocationHook in
257
Branch.hooks['transform_fallback_location'])
258
self._threading_data.opener = self
259
self._threading_data.open_dir = open_dir
209
261
return callable(*args, **kw)
211
# XXX 2008-11-24 MichaelHudson, bug=301472: This is the hacky way
212
# to remove a hook. The linked bug report asks for an API to do
214
Branch.hooks['transform_fallback_location'].remove(
215
self.transformFallbackLocationHook)
263
del self._threading_data.open_dir
264
del self._threading_data.opener
216
265
# We reset _seen_urls here to avoid multiple calls to open giving
217
266
# spurious loop exceptions.
218
267
self._seen_urls = set()
220
def followReference(self, url):
269
def followReference(self, url, open_dir=None):
221
270
"""Get the branch-reference value at the specified url.
223
272
This exists as a separate method only to be overridden in unit tests.
225
bzrdir = BzrDir.open(url)
275
open_dir = BzrDir.open
276
bzrdir = open_dir(url)
226
277
return bzrdir.get_branch_reference()
279
def open(self, url, open_dir=None):
229
280
"""Open the Bazaar branch at url, first checking for safety.
231
282
What safety means is defined by a subclass's `followReference` and
232
283
`checkOneURL` methods.
285
:param open_dir: Optional function to use for opening control
286
directories (defaults to BzrDir.open)
234
url = self.checkAndFollowBranchReference(url)
288
url = self.checkAndFollowBranchReference(url, open_dir=open_dir)
290
open_dir = BzrDir.open
292
def open_branch(url):
294
return dir.open_branch()
235
295
return self.runWithTransformFallbackLocationHookInstalled(
239
class URLChecker(BranchOpenPolicy):
240
"""Branch open policy that rejects URLs not on the given scheme."""
242
def __init__(self, allowed_scheme):
243
self.allowed_scheme = allowed_scheme
245
def shouldFollowReferences(self):
248
def transformFallbackLocation(self, branch, url):
249
return urlutils.join(branch.base, url), True
251
def checkOneURL(self, url):
252
"""Check that `url` is safe to open."""
253
if URI(url).scheme != self.allowed_scheme:
296
open_dir, open_branch, url)
257
299
def safe_open(allowed_scheme, url):