~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/codehosting/safe_open.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-15 10:01:15 UTC
  • mfrom: (14521 devel)
  • mto: This revision was merged to the branch mainline in revision 14522.
  • Revision ID: jelmer@canonical.com-20111215100115-y6bg928fwdqd16ej
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
 
8
8
import threading
9
9
 
10
 
from bzrlib import (
11
 
    trace,
12
 
    errors,
13
 
    urlutils,
14
 
    )
 
10
from bzrlib import urlutils
15
11
from bzrlib.branch import Branch
16
 
from bzrlib.bzrdir import (
17
 
    BzrProber,
18
 
    RemoteBzrProber,
19
 
    )
 
12
from bzrlib.bzrdir import BzrDir
20
13
from lazr.uri import URI
21
 
from bzrlib.transport import (
22
 
    do_catching_redirections,
23
 
    get_transport,
24
 
    )
25
14
 
26
15
 
27
16
__all__ = [
40
29
# TODO JelmerVernooij 2011-08-06: This module is generic enough to be
41
30
# in bzrlib, and may be of use to others. bug=850843
42
31
 
43
 
# These are the default probers that SafeBranchOpener will try to use,
44
 
# unless a different set was specified.
45
 
 
46
 
DEFAULT_PROBERS = [BzrProber, RemoteBzrProber]
47
32
 
48
33
class BadUrl(Exception):
49
34
    """Tried to access a branch from a bad URL."""
199
184
 
200
185
    _threading_data = threading.local()
201
186
 
202
 
    def __init__(self, policy, probers=None):
203
 
        """Create a new SafeBranchOpener.
204
 
 
205
 
        :param policy: The opener policy to use.
206
 
        :param probers: Optional list of probers to allow.
207
 
            Defaults to local and remote bzr probers.
208
 
        """
 
187
    def __init__(self, policy):
209
188
        self.policy = policy
210
189
        self._seen_urls = set()
211
 
        if probers is None:
212
 
            self.probers = list(DEFAULT_PROBERS)
213
 
        else:
214
 
            self.probers = probers
215
190
 
216
191
    @classmethod
217
192
    def install_hook(cls):
230
205
            cls.transformFallbackLocationHook,
231
206
            'SafeBranchOpener.transformFallbackLocationHook')
232
207
 
233
 
    def checkAndFollowBranchReference(self, url):
 
208
    def checkAndFollowBranchReference(self, url, open_dir=None):
234
209
        """Check URL (and possibly the referenced URL) for safety.
235
210
 
236
211
        This method checks that `url` passes the policy's `checkOneURL`
239
214
        also -- recursively, until a real branch is found.
240
215
 
241
216
        :param url: URL to check
 
217
        :param open_dir: Optional function to use for opening control
 
218
            directories (defaults to BzrDir.open)
242
219
        :raise BranchLoopError: If the branch references form a loop.
243
220
        :raise BranchReferenceForbidden: If this opener forbids branch
244
221
            references.
248
225
                raise BranchLoopError()
249
226
            self._seen_urls.add(url)
250
227
            self.policy.checkOneURL(url)
251
 
            next_url = self.followReference(url)
 
228
            next_url = self.followReference(url, open_dir=open_dir)
252
229
            if next_url is None:
253
230
                return url
254
231
            url = next_url
269
246
            return url
270
247
        new_url, check = opener.policy.transformFallbackLocation(branch, url)
271
248
        if check:
272
 
            return opener.checkAndFollowBranchReference(new_url)
 
249
            return opener.checkAndFollowBranchReference(new_url,
 
250
                getattr(cls._threading_data, "open_dir"))
273
251
        else:
274
252
            return new_url
275
253
 
276
254
    def runWithTransformFallbackLocationHookInstalled(
277
 
            self, callable, *args, **kw):
 
255
            self, open_dir, callable, *args, **kw):
278
256
        assert (self.transformFallbackLocationHook in
279
257
                Branch.hooks['transform_fallback_location'])
280
258
        self._threading_data.opener = self
 
259
        self._threading_data.open_dir = open_dir
281
260
        try:
282
261
            return callable(*args, **kw)
283
262
        finally:
 
263
            del self._threading_data.open_dir
284
264
            del self._threading_data.opener
285
265
            # We reset _seen_urls here to avoid multiple calls to open giving
286
266
            # spurious loop exceptions.
287
267
            self._seen_urls = set()
288
268
 
289
 
    def followReference(self, url):
 
269
    def followReference(self, url, open_dir=None):
290
270
        """Get the branch-reference value at the specified url.
291
271
 
292
272
        This exists as a separate method only to be overridden in unit tests.
293
273
        """
294
 
        bzrdir = self._open_dir(url)
 
274
        if open_dir is None:
 
275
            open_dir = BzrDir.open
 
276
        bzrdir = open_dir(url)
295
277
        return bzrdir.get_branch_reference()
296
278
 
297
 
    def _open_dir(self, url):
298
 
        """Simple BzrDir.open clone that only uses specific probers.
299
 
 
300
 
        :param url: URL to open
301
 
        :return: ControlDir instance
302
 
        """
303
 
        def redirected(transport, e, redirection_notice):
304
 
            self.policy.checkOneURL(e.target)
305
 
            redirected_transport = transport._redirected_to(
306
 
                e.source, e.target)
307
 
            if redirected_transport is None:
308
 
                raise errors.NotBranchError(e.source)
309
 
            trace.note('%s is%s redirected to %s',
310
 
                 transport.base, e.permanently, redirected_transport.base)
311
 
            return redirected_transport
312
 
 
313
 
        def find_format(transport):
314
 
            last_error = errors.NotBranchError(transport.base)
315
 
            for prober_kls in self.probers:
316
 
                prober = prober_kls()
317
 
                try:
318
 
                    return transport, prober.probe_transport(transport)
319
 
                except errors.NotBranchError, e:
320
 
                    last_error = e
321
 
            else:
322
 
                raise last_error
323
 
        transport = get_transport(url)
324
 
        transport, format = do_catching_redirections(find_format, transport,
325
 
            redirected)
326
 
        return format.open(transport)
327
 
 
328
 
    def open(self, url):
 
279
    def open(self, url, open_dir=None):
329
280
        """Open the Bazaar branch at url, first checking for safety.
330
281
 
331
282
        What safety means is defined by a subclass's `followReference` and
332
283
        `checkOneURL` methods.
 
284
 
 
285
        :param open_dir: Optional function to use for opening control
 
286
            directories (defaults to BzrDir.open)
333
287
        """
334
 
        url = self.checkAndFollowBranchReference(url)
 
288
        url = self.checkAndFollowBranchReference(url, open_dir=open_dir)
 
289
        if open_dir is None:
 
290
            open_dir = BzrDir.open
335
291
 
336
292
        def open_branch(url):
337
 
            dir = self._open_dir(url)
 
293
            dir = open_dir(url)
338
294
            return dir.open_branch()
339
295
        return self.runWithTransformFallbackLocationHookInstalled(
340
 
            open_branch, url)
 
296
            open_dir, open_branch, url)
341
297
 
342
298
 
343
299
def safe_open(allowed_scheme, url):