~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to wsgitest.py

  • Committer: Michael Hudson
  • Date: 2008-06-17 00:01:47 UTC
  • mto: This revision was merged to the branch mainline in revision 164.
  • Revision ID: michael.hudson@canonical.com-20080617000147-d3wf2fdf6arrmfo2
extract some methods

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""Serve branches at urls that mimic a transport's file system layout."""
2
 
 
3
 
from bzrlib import branch, errors, lru_cache, urlutils
4
 
from bzrlib.transport import get_transport
5
 
from bzrlib.transport.http import wsgi
6
 
 
 
1
import cgi, os, tempfile
 
2
from bzrlib import branch, errors
 
3
from loggerhead.history import History
 
4
from loggerhead.wsgiapp import BranchWSGIApp, static_app
7
5
from paste.request import path_info_pop
 
6
from paste.wsgiwrappers import WSGIRequest, WSGIResponse
8
7
from paste import httpexceptions
9
 
from paste import urlparser
10
 
 
11
 
from loggerhead.apps.branch import BranchWSGIApp
12
 
from loggerhead.apps import favicon_app, static_app
13
 
from loggerhead.config import LoggerheadConfig
14
 
from loggerhead.controllers.directory_ui import DirectoryUI
15
 
 
16
 
 
17
 
class BranchesFromTransportServer(object):
18
 
 
19
 
    def __init__(self, transport, root, name=None):
20
 
        self.transport = transport
 
8
from paste import httpserver
 
9
from paste.httpexceptions import make_middleware
 
10
from paste.translogger import make_filter
 
11
from loggerhead.changecache import FileChangeCache
 
12
 
 
13
 
 
14
sql_dir = tempfile.mkdtemp()
 
15
 
 
16
class BranchesFromFileSystemServer(object):
 
17
    def __init__(self, folder, root):
 
18
        self.folder = folder
21
19
        self.root = root
22
 
        self.name = name
23
 
        self._config = root._config
24
 
 
25
 
    def app_for_branch(self, branch):
26
 
        if not self.name:
27
 
            name = branch._get_nick(local=True)
28
 
            is_root = True
29
 
        else:
30
 
            name = self.name
31
 
            is_root = False
32
 
        branch_app = BranchWSGIApp(
33
 
            branch, name,
34
 
            {'cachepath': self._config.SQL_DIR},
35
 
            self.root.graph_cache, is_root=is_root,
36
 
            use_cdn=self._config.get_option('use_cdn'))
37
 
        return branch_app.app
38
 
 
39
 
    def app_for_non_branch(self, environ):
40
 
        segment = path_info_pop(environ)
41
 
        if segment is None:
42
 
            raise httpexceptions.HTTPMovedPermanently(
43
 
                environ['SCRIPT_NAME'] + '/')
44
 
        elif segment == '':
45
 
            if self.name:
46
 
                name = self.name
47
 
            else:
48
 
                name = '/'
49
 
            return DirectoryUI(environ['loggerhead.static.url'],
50
 
                               self.transport,
51
 
                               name)
52
 
        else:
53
 
            new_transport = self.transport.clone(segment)
54
 
            if self.name:
55
 
                new_name = urlutils.join(self.name, segment)
56
 
            else:
57
 
                new_name = '/' + segment
58
 
            return BranchesFromTransportServer(new_transport, self.root, new_name)
 
20
 
 
21
    def directory_listing(self, path, environ, start_response):
 
22
        request = WSGIRequest(environ)
 
23
        response = WSGIResponse()
 
24
        listing = [d for d in os.listdir(path) if not d.startswith('.')]
 
25
        response.headers['Content-Type'] = 'text/html'
 
26
        print >> response, '<html><body>'
 
27
        for d in sorted(listing):
 
28
            if os.path.isdir(os.path.join(path, d)):
 
29
                d = cgi.escape(d)
 
30
                print >> response, '<li><a href="%s/">%s</a></li>' % (d, d)
 
31
        print >> response, '</body></html>'
 
32
        return response(environ, start_response)
 
33
 
 
34
    def app_for_branch(self, b, path):
 
35
        b.lock_read()
 
36
        try:
 
37
            _history = History.from_branch(b)
 
38
            _history.use_file_cache(FileChangeCache(_history, sql_dir))
 
39
            if not self.folder:
 
40
                name = os.path.basename(os.path.abspath(path))
 
41
            else:
 
42
                name = self.folder
 
43
            h = BranchWSGIApp(_history, name).app
 
44
            self.root.cache[path] = h
 
45
            return h
 
46
        finally:
 
47
            b.unlock()
59
48
 
60
49
    def __call__(self, environ, start_response):
 
50
        path = os.path.join(self.root.folder, self.folder)
 
51
        if not os.path.isdir(path):
 
52
            raise httpexceptions.HTTPNotFound()
 
53
        if path in self.root.cache:
 
54
            return self.root.cache[path](environ, start_response)
61
55
        try:
62
 
            b = branch.Branch.open_from_transport(self.transport)
 
56
            b = branch.Branch.open(path)
63
57
        except errors.NotBranchError:
64
 
            if not self.transport.listable() or not self.transport.has('.'):
65
 
                raise httpexceptions.HTTPNotFound()
66
 
            return self.app_for_non_branch(environ)(environ, start_response)
 
58
            segment = path_info_pop(environ)
 
59
            if segment is None:
 
60
                raise httpexceptions.HTTPMovedPermanently(environ['SCRIPT_NAME'] + '/')
 
61
            elif segment == '':
 
62
                return self.directory_listing(path, environ, start_response)
 
63
            else:
 
64
                relpath = os.path.join(self.folder, segment)
 
65
                return BranchesFromFileSystemServer(relpath, self.root)(
 
66
                    environ, start_response)
67
67
        else:
68
 
            return self.app_for_branch(b)(environ, start_response)
69
 
 
70
 
 
71
 
class BranchesFromTransportRoot(object):
72
 
 
73
 
    def __init__(self, transport, config):
74
 
        self.graph_cache = lru_cache.LRUCache(10)
75
 
        self.transport = transport
76
 
        self._config = config
77
 
 
 
68
            return self.app_for_branch(b, path)(environ, start_response)
 
69
 
 
70
 
 
71
class BranchesFromFileSystemRoot(object):
 
72
    def __init__(self, folder):
 
73
        self.cache = {}
 
74
        self.folder = folder
78
75
    def __call__(self, environ, start_response):
79
76
        environ['loggerhead.static.url'] = environ['SCRIPT_NAME']
80
77
        if environ['PATH_INFO'].startswith('/static/'):
81
78
            segment = path_info_pop(environ)
82
79
            assert segment == 'static'
83
80
            return static_app(environ, start_response)
84
 
        elif environ['PATH_INFO'] == '/favicon.ico':
85
 
            return favicon_app(environ, start_response)
86
 
        elif environ['PATH_INFO'].endswith("/.bzr/smart"):
87
 
            # Only do readonly for now
88
 
            transport = get_transport("readonly+" + self.transport.base)
89
 
            wsgi_app = wsgi.SmartWSGIApp(self.transport)
90
 
            wsgi_app = wsgi.RelpathSetter(wsgi_app, '', 'PATH_INFO')
91
 
            return wsgi_app(environ, start_response)
92
 
        elif '/.bzr/' in environ['PATH_INFO']:
93
 
            # TODO: Use something here that uses the transport API 
94
 
            # rather than relying on the local filesystem API.
95
 
            try:
96
 
                path = urlutils.local_path_from_url(self.transport.base)
97
 
            except errors.InvalidURL:
98
 
                raise httpexceptions.HTTPNotFound()
99
 
            else:
100
 
                app = urlparser.make_static(None, path)
101
 
                return app(environ, start_response)
102
 
        else:
103
 
            return BranchesFromTransportServer(
104
 
                self.transport, self)(environ, start_response)
105
 
 
106
 
 
107
 
class UserBranchesFromTransportRoot(object):
    """Root WSGI app that separates ``~user`` branches from trunk.

    A request whose path starts with ``/~`` is served from the
    sub-transport named by the first segment with the ``~`` removed; any
    other request is served from the sub-transport named by the
    ``trunk_dir`` configuration option.
    """

    def __init__(self, transport, config):
        """Store the transport and config and read ``trunk_dir``."""
        self.graph_cache = lru_cache.LRUCache(10)
        self.transport = transport
        self._config = config
        self.trunk_dir = config.get_option('trunk_dir')

    def __call__(self, environ, start_response):
        """WSGI entry point: dispatch the request on ``PATH_INFO``."""
        environ['loggerhead.static.url'] = environ['SCRIPT_NAME']
        path_info = environ['PATH_INFO']

        if path_info.startswith('/static/'):
            popped = path_info_pop(environ)
            assert popped == 'static'
            return static_app(environ, start_response)

        if path_info == '/favicon.ico':
            return favicon_app(environ, start_response)

        if path_info.startswith('/~'):
            # segments starting with ~ are user branches; the transport
            # is cloned without the leading '~', but the server keeps
            # the full segment as its name.
            user_segment = path_info_pop(environ)
            user_transport = self.transport.clone(user_segment[1:])
            server = BranchesFromTransportServer(
                user_transport, self, user_segment)
        else:
            trunk_transport = self.transport.clone(self.trunk_dir)
            server = BranchesFromTransportServer(trunk_transport, self)
        return server(environ, start_response)
 
81
        else:
 
82
            return BranchesFromFileSystemServer(
 
83
                '', self)(environ, start_response)
 
84
 
 
85
# Serve every branch found below the current working directory.
app = BranchesFromFileSystemRoot('.')
# Wrap with paste.httpexceptions' middleware, then with
# paste.translogger's filter.
app = make_middleware(app)
app = make_filter(app, None)

httpserver.serve(app, host='127.0.0.1', port='9876')
 
93