~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to wsgitest.py

  • Committer: Michael Hudson
  • Date: 2008-06-16 10:56:29 UTC
  • mto: This revision was merged to the branch mainline in revision 164.
  • Revision ID: michael.hudson@canonical.com-20080616105629-ea9dahymo4rl9cad
improvements borne out of the fires of contact with reality &c

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import cgi
2
1
import os
3
 
import tempfile
4
 
 
5
2
from bzrlib import branch, errors
6
 
 
 
3
from loggerhead.history import History
 
4
from loggerhead.wsgiapp import BranchWSGIApp, static_app
7
5
from paste.request import path_info_pop
8
 
from paste.wsgiwrappers import WSGIRequest, WSGIResponse
9
6
from paste import httpexceptions
10
 
 
11
 
from loggerhead.apps.branch import BranchWSGIApp
12
 
from loggerhead.apps import favicon_app, static_app
13
 
 
14
 
 
15
 
sql_dir = tempfile.mkdtemp()
 
7
from paste import httpserver
 
8
from paste.httpexceptions import make_middleware
 
9
from paste.translogger import make_filter
 
10
from loggerhead.changecache import FileChangeCache
 
11
 
 
12
 
16
13
 
17
14
class BranchesFromFileSystemServer(object):
18
15
    def __init__(self, folder, root):
19
16
        self.folder = folder
20
17
        self.root = root
21
18
 
22
 
    def directory_listing(self, path, environ, start_response):
23
 
        request = WSGIRequest(environ)
24
 
        response = WSGIResponse()
25
 
        listing = [d for d in os.listdir(path) if not d.startswith('.')]
26
 
        response.headers['Content-Type'] = 'text/html'
27
 
        print >> response, '<html><body>'
28
 
        for d in sorted(listing):
29
 
            if os.path.isdir(os.path.join(path, d)):
30
 
                d = cgi.escape(d)
31
 
                print >> response, '<li><a href="%s/">%s</a></li>' % (d, d)
32
 
        print >> response, '</body></html>'
33
 
        return response(environ, start_response)
34
 
 
35
 
    def app_for_branch(self, b, path):
36
 
        if not self.folder:
37
 
            name = os.path.basename(os.path.abspath(path))
38
 
        else:
39
 
            name = self.folder
40
 
        h = BranchWSGIApp(path, name, {'cachepath': sql_dir})
41
 
        self.root.cache[path] = h
42
 
        return h.app
43
 
 
44
19
    def __call__(self, environ, start_response):
45
 
        path = os.path.join(self.root.folder, self.folder)
46
 
        if not os.path.isdir(path):
47
 
            raise httpexceptions.HTTPNotFound()
48
 
        cached = self.root.cache.get(path)
49
 
        if cached is not None:
50
 
            return cached.app(environ, start_response)
 
20
        segment = path_info_pop(environ)
 
21
        if segment is None:
 
22
            raise httpexceptions.HTTPNotFound()
 
23
        relpath = os.path.join(self.folder, segment)
 
24
        f = os.path.join(self.root.folder, relpath)
 
25
        if not os.path.isdir(f):
 
26
            raise httpexceptions.HTTPNotFound()
 
27
        if f in self.root.cache:
 
28
            return self.root.cache[f](environ, start_response)
51
29
        try:
52
 
            b = branch.Branch.open(path)
 
30
            b = branch.Branch.open(f)
53
31
        except errors.NotBranchError:
54
 
            segment = path_info_pop(environ)
55
 
            if segment is None:
56
 
                raise httpexceptions.HTTPMovedPermanently(
57
 
                    environ['SCRIPT_NAME'] + '/')
58
 
            elif segment == '':
59
 
                return self.directory_listing(path, environ, start_response)
60
 
            else:
61
 
                relpath = os.path.join(self.folder, segment)
62
 
                return BranchesFromFileSystemServer(relpath, self.root)(
63
 
                    environ, start_response)
 
32
            return BranchesFromFileSystemServer(relpath, self.root)(environ, start_response)
64
33
        else:
65
 
            return self.app_for_branch(b, path)(environ, start_response)
66
 
 
 
34
            b.lock_read()
 
35
            try:
 
36
                _history = History.from_branch(b)
 
37
                _history.use_file_cache(FileChangeCache(_history, 'sql'))
 
38
                h = BranchWSGIApp(_history, relpath).app
 
39
                self.root.cache[f] = h
 
40
                return h(environ, start_response)
 
41
            finally:
 
42
                b.unlock()
67
43
 
68
44
class BranchesFromFileSystemRoot(object):
69
45
    def __init__(self, folder):
71
47
        self.folder = folder
72
48
    def __call__(self, environ, start_response):
73
49
        environ['loggerhead.static.url'] = environ['SCRIPT_NAME']
74
 
        if environ['PATH_INFO'].startswith('/static/'):
75
 
            segment = path_info_pop(environ)
76
 
            assert segment == 'static'
 
50
        segment = path_info_pop(environ)
 
51
        if segment == 'static':
77
52
            return static_app(environ, start_response)
78
 
        elif environ['PATH_INFO'] == '/favicon.ico':
79
 
            return favicon_app(environ, start_response)
80
53
        else:
81
54
            return BranchesFromFileSystemServer(
82
 
                '', self)(environ, start_response)
 
55
                segment, self)(environ, start_response)
 
56
 
 
57
app = BranchesFromFileSystemRoot('../..')
 
58
 
 
59
app = app
 
60
app = make_middleware(app)
 
61
app = make_filter(app, None)
 
62
 
 
63
from paste.evalexception.middleware import EvalException
 
64
 
 
65
httpserver.serve(EvalException(app), host='127.0.0.1', port='9876')
 
66