~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/codehosting/safe_open.py

  • Committer: Graham Binns
  • Date: 2011-09-07 15:59:13 UTC
  • mto: This revision was merged to the branch mainline in revision 13914.
  • Revision ID: graham@canonical.com-20110907155913-p97tx2e34ysbcgp3
Added an XXX.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
__metaclass__ = type
7
7
 
 
8
import threading
 
9
 
8
10
from bzrlib import urlutils
9
11
from bzrlib.branch import Branch
10
12
from bzrlib.bzrdir import BzrDir
11
 
 
12
13
from lazr.uri import URI
13
14
 
 
15
 
14
16
__all__ = [
15
17
    'AcceptAnythingPolicy',
16
18
    'BadUrl',
150
152
        return urlutils.join(branch.base, url), self.check
151
153
 
152
154
 
 
155
class SingleSchemePolicy(BranchOpenPolicy):
 
156
    """Branch open policy that rejects URLs not on the given scheme."""
 
157
 
 
158
    def __init__(self, allowed_scheme):
 
159
        self.allowed_scheme = allowed_scheme
 
160
 
 
161
    def shouldFollowReferences(self):
 
162
        return True
 
163
 
 
164
    def transformFallbackLocation(self, branch, url):
 
165
        return urlutils.join(branch.base, url), True
 
166
 
 
167
    def checkOneURL(self, url):
 
168
        """Check that `url` is safe to open."""
 
169
        if URI(url).scheme != self.allowed_scheme:
 
170
            raise BadUrl(url)
 
171
 
 
172
 
153
173
class SafeBranchOpener(object):
154
174
    """Safe branch opener.
155
175
 
 
176
    All locations that are opened (stacked-on branches, references) are
 
177
    checked against a policy object.
 
178
 
156
179
    The policy object is expected to have the following methods:
157
180
    * checkOneURL
158
181
    * shouldFollowReferences
159
182
    * transformFallbackLocation
160
183
    """
161
184
 
 
185
    _threading_data = threading.local()
 
186
 
162
187
    def __init__(self, policy):
163
188
        self.policy = policy
164
189
        self._seen_urls = set()
165
190
 
166
 
    def checkAndFollowBranchReference(self, url):
 
191
    @classmethod
 
192
    def install_hook(cls):
 
193
        """Install the ``transformFallbackLocation`` hook.
 
194
 
 
195
        This is done at module import time, but transformFallbackLocationHook
 
196
        doesn't do anything unless the `_active_openers` threading.Local
 
197
        object has a 'opener' attribute in this thread.
 
198
 
 
199
        This is in a module-level function rather than performed at module
 
200
        level so that it can be called in setUp for testing `SafeBranchOpener`
 
201
        as bzrlib.tests.TestCase.setUp clears hooks.
 
202
        """
 
203
        Branch.hooks.install_named_hook(
 
204
            'transform_fallback_location',
 
205
            cls.transformFallbackLocationHook,
 
206
            'SafeBranchOpener.transformFallbackLocationHook')
 
207
 
 
208
    def checkAndFollowBranchReference(self, url, open_dir=None):
167
209
        """Check URL (and possibly the referenced URL) for safety.
168
210
 
169
211
        This method checks that `url` passes the policy's `checkOneURL`
171
213
        references are allowed and whether the reference's URL passes muster
172
214
        also -- recursively, until a real branch is found.
173
215
 
 
216
        :param url: URL to check
 
217
        :param open_dir: Optional function to use for opening control
 
218
            directories (defaults to BzrDir.open)
174
219
        :raise BranchLoopError: If the branch references form a loop.
175
220
        :raise BranchReferenceForbidden: If this opener forbids branch
176
221
            references.
180
225
                raise BranchLoopError()
181
226
            self._seen_urls.add(url)
182
227
            self.policy.checkOneURL(url)
183
 
            next_url = self.followReference(url)
 
228
            next_url = self.followReference(url, open_dir=open_dir)
184
229
            if next_url is None:
185
230
                return url
186
231
            url = next_url
187
232
            if not self.policy.shouldFollowReferences():
188
233
                raise BranchReferenceForbidden(url)
189
234
 
190
 
    def transformFallbackLocationHook(self, branch, url):
 
235
    @classmethod
 
236
    def transformFallbackLocationHook(cls, branch, url):
191
237
        """Installed as the 'transform_fallback_location' Branch hook.
192
238
 
193
239
        This method calls `transformFallbackLocation` on the policy object and
194
240
        either returns the url it provides or passes it back to
195
241
        checkAndFollowBranchReference.
196
242
        """
197
 
        new_url, check = self.policy.transformFallbackLocation(branch, url)
 
243
        try:
 
244
            opener = getattr(cls._threading_data, "opener")
 
245
        except AttributeError:
 
246
            return url
 
247
        new_url, check = opener.policy.transformFallbackLocation(branch, url)
198
248
        if check:
199
 
            return self.checkAndFollowBranchReference(new_url)
 
249
            return opener.checkAndFollowBranchReference(new_url,
 
250
                getattr(cls._threading_data, "open_dir"))
200
251
        else:
201
252
            return new_url
202
253
 
203
254
    def runWithTransformFallbackLocationHookInstalled(
204
 
            self, callable, *args, **kw):
205
 
        Branch.hooks.install_named_hook(
206
 
            'transform_fallback_location', self.transformFallbackLocationHook,
207
 
            'SafeBranchOpener.transformFallbackLocationHook')
 
255
            self, open_dir, callable, *args, **kw):
 
256
        assert (self.transformFallbackLocationHook in
 
257
                Branch.hooks['transform_fallback_location'])
 
258
        self._threading_data.opener = self
 
259
        self._threading_data.open_dir = open_dir
208
260
        try:
209
261
            return callable(*args, **kw)
210
262
        finally:
211
 
            # XXX 2008-11-24 MichaelHudson, bug=301472: This is the hacky way
212
 
            # to remove a hook.  The linked bug report asks for an API to do
213
 
            # it.
214
 
            Branch.hooks['transform_fallback_location'].remove(
215
 
                self.transformFallbackLocationHook)
 
263
            del self._threading_data.open_dir
 
264
            del self._threading_data.opener
216
265
            # We reset _seen_urls here to avoid multiple calls to open giving
217
266
            # spurious loop exceptions.
218
267
            self._seen_urls = set()
219
268
 
220
 
    def followReference(self, url):
 
269
    def followReference(self, url, open_dir=None):
221
270
        """Get the branch-reference value at the specified url.
222
271
 
223
272
        This exists as a separate method only to be overriden in unit tests.
224
273
        """
225
 
        bzrdir = BzrDir.open(url)
 
274
        if open_dir is None:
 
275
            open_dir = BzrDir.open
 
276
        bzrdir = open_dir(url)
226
277
        return bzrdir.get_branch_reference()
227
278
 
228
 
    def open(self, url):
 
279
    def open(self, url, open_dir=None):
229
280
        """Open the Bazaar branch at url, first checking for safety.
230
281
 
231
282
        What safety means is defined by a subclasses `followReference` and
232
283
        `checkOneURL` methods.
 
284
 
 
285
        :param open_dir: Optional function to use for opening control
 
286
            directories (defaults to BzrDir.open)
233
287
        """
234
 
        url = self.checkAndFollowBranchReference(url)
 
288
        url = self.checkAndFollowBranchReference(url, open_dir=open_dir)
 
289
        if open_dir is None:
 
290
            open_dir = BzrDir.open
 
291
 
 
292
        def open_branch(url):
 
293
            dir = open_dir(url)
 
294
            return dir.open_branch()
235
295
        return self.runWithTransformFallbackLocationHookInstalled(
236
 
            Branch.open, url)
237
 
 
238
 
 
239
 
class URLChecker(BranchOpenPolicy):
240
 
    """Branch open policy that rejects URLs not on the given scheme."""
241
 
 
242
 
    def __init__(self, allowed_scheme):
243
 
        self.allowed_scheme = allowed_scheme
244
 
 
245
 
    def shouldFollowReferences(self):
246
 
        return True
247
 
 
248
 
    def transformFallbackLocation(self, branch, url):
249
 
        return urlutils.join(branch.base, url), True
250
 
 
251
 
    def checkOneURL(self, url):
252
 
        """Check that `url` is safe to open."""
253
 
        if URI(url).scheme != self.allowed_scheme:
254
 
            raise BadUrl(url)
 
296
            open_dir, open_branch, url)
255
297
 
256
298
 
257
299
def safe_open(allowed_scheme, url):
260
302
    :raises BadUrl: An attempt was made to open a URL that was not on
261
303
        `allowed_scheme`.
262
304
    """
263
 
    return SafeBranchOpener(URLChecker(allowed_scheme)).open(url)
 
305
    return SafeBranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
 
306
 
 
307
 
 
308
SafeBranchOpener.install_hook()