150
152
return urlutils.join(branch.base, url), self.check
155
class SingleSchemePolicy(BranchOpenPolicy):
156
"""Branch open policy that rejects URLs not on the given scheme."""
158
def __init__(self, allowed_scheme):
159
self.allowed_scheme = allowed_scheme
161
def shouldFollowReferences(self):
164
def transformFallbackLocation(self, branch, url):
165
return urlutils.join(branch.base, url), True
167
def checkOneURL(self, url):
168
"""Check that `url` is safe to open."""
169
if URI(url).scheme != self.allowed_scheme:
153
173
class SafeBranchOpener(object):
154
174
"""Safe branch opener.
176
All locations that are opened (stacked-on branches, references) are checked
177
against a policy object.
156
179
The policy object is expected to have the following methods:
158
181
* shouldFollowReferences
159
182
* transformFallbackLocation
185
_threading_data = threading.local()
162
187
def __init__(self, policy):
163
188
self.policy = policy
164
189
self._seen_urls = set()
192
def install_hook(cls):
193
"""Install the ``transformFallbackLocation`` hook.
195
This is done at module import time, but transformFallbackLocationHook
196
doesn't do anything unless the `_active_openers` threading.Local object
197
has a 'opener' attribute in this thread.
199
This is in a module-level function rather than performed at module level
200
so that it can be called in setUp for testing `SafeBranchOpener` as
201
bzrlib.tests.TestCase.setUp clears hooks.
203
Branch.hooks.install_named_hook(
204
'transform_fallback_location',
205
cls.transformFallbackLocationHook,
206
'SafeBranchOpener.transformFallbackLocationHook')
166
208
def checkAndFollowBranchReference(self, url):
167
209
"""Check URL (and possibly the referenced URL) for safety.
187
229
if not self.policy.shouldFollowReferences():
188
230
raise BranchReferenceForbidden(url)
190
def transformFallbackLocationHook(self, branch, url):
233
def transformFallbackLocationHook(cls, branch, url):
191
234
"""Installed as the 'transform_fallback_location' Branch hook.
193
236
This method calls `transformFallbackLocation` on the policy object and
194
237
either returns the url it provides or passes it back to
195
238
checkAndFollowBranchReference.
197
new_url, check = self.policy.transformFallbackLocation(branch, url)
241
opener = getattr(cls._threading_data, "opener")
242
except AttributeError:
244
new_url, check = opener.policy.transformFallbackLocation(branch, url)
199
return self.checkAndFollowBranchReference(new_url)
246
return opener.checkAndFollowBranchReference(new_url)
203
250
def runWithTransformFallbackLocationHookInstalled(
204
251
self, callable, *args, **kw):
205
Branch.hooks.install_named_hook(
206
'transform_fallback_location', self.transformFallbackLocationHook,
207
'SafeBranchOpener.transformFallbackLocationHook')
252
assert (self.transformFallbackLocationHook in
253
Branch.hooks['transform_fallback_location'])
254
self._threading_data.opener = self
209
256
return callable(*args, **kw)
211
# XXX 2008-11-24 MichaelHudson, bug=301472: This is the hacky way
212
# to remove a hook. The linked bug report asks for an API to do
214
Branch.hooks['transform_fallback_location'].remove(
215
self.transformFallbackLocationHook)
258
del self._threading_data.opener
216
259
# We reset _seen_urls here to avoid multiple calls to open giving
217
260
# spurious loop exceptions.
218
261
self._seen_urls = set()
236
279
Branch.open, url)
239
class URLChecker(BranchOpenPolicy):
240
"""Branch open policy that rejects URLs not on the given scheme."""
242
def __init__(self, allowed_scheme):
243
self.allowed_scheme = allowed_scheme
245
def shouldFollowReferences(self):
248
def transformFallbackLocation(self, branch, url):
249
return urlutils.join(branch.base, url), True
251
def checkOneURL(self, url):
252
"""Check that `url` is safe to open."""
253
if URI(url).scheme != self.allowed_scheme:
257
282
def safe_open(allowed_scheme, url):
258
283
"""Open the branch at `url`, only accessing URLs on `allowed_scheme`.
260
285
:raises BadUrl: An attempt was made to open a URL that was not on
261
286
`allowed_scheme`.
263
return SafeBranchOpener(URLChecker(allowed_scheme)).open(url)
288
return SafeBranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
291
SafeBranchOpener.install_hook()