~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/codehosting/safe_open.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-15 10:09:59 UTC
  • mto: This revision was merged to the branch mainline in revision 14522.
  • Revision ID: jelmer@canonical.com-20111215100959-1y8wjq8g3pv7rw0m
Re-merge always loading of foreign plugins.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
 
8
8
import threading
9
9
 
10
 
from bzrlib import urlutils
 
10
from bzrlib import (
 
11
    trace,
 
12
    errors,
 
13
    urlutils,
 
14
    )
11
15
from bzrlib.branch import Branch
12
 
from bzrlib.bzrdir import BzrDir
 
16
from bzrlib.bzrdir import (
 
17
    BzrProber,
 
18
    RemoteBzrProber,
 
19
    )
13
20
from lazr.uri import URI
 
21
from bzrlib.transport import (
 
22
    do_catching_redirections,
 
23
    get_transport,
 
24
    )
14
25
 
15
26
 
16
27
__all__ = [
29
40
# TODO JelmerVernooij 2011-08-06: This module is generic enough to be
30
41
# in bzrlib, and may be of use to others. bug=850843
31
42
 
 
43
# These are the default probers that SafeBranchOpener will try to use,
 
44
# unless a different set was specified.
 
45
 
 
46
DEFAULT_PROBERS = [BzrProber, RemoteBzrProber]
32
47
 
33
48
class BadUrl(Exception):
34
49
    """Tried to access a branch from a bad URL."""
184
199
 
185
200
    _threading_data = threading.local()
186
201
 
187
 
    def __init__(self, policy):
 
202
    def __init__(self, policy, probers=None):
 
203
        """Create a new SafeBranchOpener.
 
204
 
 
205
        :param policy: The opener policy to use.
 
206
        :param probers: Optional list of probers to allow.
 
207
            Defaults to local and remote bzr probers.
 
208
        """
188
209
        self.policy = policy
189
210
        self._seen_urls = set()
 
211
        if probers is None:
 
212
            self.probers = list(DEFAULT_PROBERS)
 
213
        else:
 
214
            self.probers = probers
190
215
 
191
216
    @classmethod
192
217
    def install_hook(cls):
205
230
            cls.transformFallbackLocationHook,
206
231
            'SafeBranchOpener.transformFallbackLocationHook')
207
232
 
208
 
    def checkAndFollowBranchReference(self, url, open_dir=None):
 
233
    def checkAndFollowBranchReference(self, url):
209
234
        """Check URL (and possibly the referenced URL) for safety.
210
235
 
211
236
        This method checks that `url` passes the policy's `checkOneURL`
214
239
        also -- recursively, until a real branch is found.
215
240
 
216
241
        :param url: URL to check
217
 
        :param open_dir: Optional function to use for opening control
218
 
            directories (defaults to BzrDir.open)
219
242
        :raise BranchLoopError: If the branch references form a loop.
220
243
        :raise BranchReferenceForbidden: If this opener forbids branch
221
244
            references.
225
248
                raise BranchLoopError()
226
249
            self._seen_urls.add(url)
227
250
            self.policy.checkOneURL(url)
228
 
            next_url = self.followReference(url, open_dir=open_dir)
 
251
            next_url = self.followReference(url)
229
252
            if next_url is None:
230
253
                return url
231
254
            url = next_url
246
269
            return url
247
270
        new_url, check = opener.policy.transformFallbackLocation(branch, url)
248
271
        if check:
249
 
            return opener.checkAndFollowBranchReference(new_url,
250
 
                getattr(cls._threading_data, "open_dir"))
 
272
            return opener.checkAndFollowBranchReference(new_url)
251
273
        else:
252
274
            return new_url
253
275
 
254
276
    def runWithTransformFallbackLocationHookInstalled(
255
 
            self, open_dir, callable, *args, **kw):
 
277
            self, callable, *args, **kw):
256
278
        assert (self.transformFallbackLocationHook in
257
279
                Branch.hooks['transform_fallback_location'])
258
280
        self._threading_data.opener = self
259
 
        self._threading_data.open_dir = open_dir
260
281
        try:
261
282
            return callable(*args, **kw)
262
283
        finally:
263
 
            del self._threading_data.open_dir
264
284
            del self._threading_data.opener
265
285
            # We reset _seen_urls here to avoid multiple calls to open giving
266
286
            # spurious loop exceptions.
267
287
            self._seen_urls = set()
268
288
 
269
 
    def followReference(self, url, open_dir=None):
 
289
    def followReference(self, url):
270
290
        """Get the branch-reference value at the specified url.
271
291
 
272
292
        This exists as a separate method only to be overridden in unit tests.
273
293
        """
274
 
        if open_dir is None:
275
 
            open_dir = BzrDir.open
276
 
        bzrdir = open_dir(url)
 
294
        bzrdir = self._open_dir(url)
277
295
        return bzrdir.get_branch_reference()
278
296
 
279
 
    def open(self, url, open_dir=None):
 
297
    def _open_dir(self, url):
 
298
        """Simple BzrDir.open clone that only uses specific probers.
 
299
 
 
300
        :param url: URL to open
 
301
        :return: ControlDir instance
 
302
        """
 
303
        def redirected(transport, e, redirection_notice):
 
304
            self.policy.checkOneURL(e.target)
 
305
            redirected_transport = transport._redirected_to(
 
306
                e.source, e.target)
 
307
            if redirected_transport is None:
 
308
                raise errors.NotBranchError(e.source)
 
309
            trace.note('%s is%s redirected to %s',
 
310
                 transport.base, e.permanently, redirected_transport.base)
 
311
            return redirected_transport
 
312
 
 
313
        def find_format(transport):
 
314
            last_error = errors.NotBranchError(transport.base)
 
315
            for prober_kls in self.probers:
 
316
                prober = prober_kls()
 
317
                try:
 
318
                    return transport, prober.probe_transport(transport)
 
319
                except errors.NotBranchError, e:
 
320
                    last_error = e
 
321
            else:
 
322
                raise last_error
 
323
        transport = get_transport(url)
 
324
        transport, format = do_catching_redirections(find_format, transport,
 
325
            redirected)
 
326
        return format.open(transport)
 
327
 
 
328
    def open(self, url):
280
329
        """Open the Bazaar branch at url, first checking for safety.
281
330
 
282
331
        What safety means is defined by a subclass's `followReference` and
283
332
        `checkOneURL` methods.
284
 
 
285
 
        :param open_dir: Optional function to use for opening control
286
 
            directories (defaults to BzrDir.open)
287
333
        """
288
 
        url = self.checkAndFollowBranchReference(url, open_dir=open_dir)
289
 
        if open_dir is None:
290
 
            open_dir = BzrDir.open
 
334
        url = self.checkAndFollowBranchReference(url)
291
335
 
292
336
        def open_branch(url):
293
 
            dir = open_dir(url)
 
337
            dir = self._open_dir(url)
294
338
            return dir.open_branch()
295
339
        return self.runWithTransformFallbackLocationHookInstalled(
296
 
            open_dir, open_branch, url)
 
340
            open_branch, url)
297
341
 
298
342
 
299
343
def safe_open(allowed_scheme, url):