1
"""The WSGI application for serving a Bazaar branch."""
7
import bzrlib.lru_cache
9
from paste import request
10
from paste import httpexceptions
12
from loggerhead.apps import static_app
13
from loggerhead.controllers.changelog_ui import ChangeLogUI
14
from loggerhead.controllers.inventory_ui import InventoryUI
15
from loggerhead.controllers.annotate_ui import AnnotateUI
16
from loggerhead.controllers.revision_ui import RevisionUI
17
from loggerhead.controllers.atom_ui import AtomUI
18
from loggerhead.controllers.download_ui import DownloadUI
19
from loggerhead.controllers.search_ui import SearchUI
20
from loggerhead.history import History
21
from loggerhead import util
24
class BranchWSGIApp(object):
26
# Initialise the application object: record the display name and branch
# link, create a logger named 'loggerhead.<friendly_name>', and keep a graph
# cache (a fresh bzrlib.lru_cache.LRUCache when the caller supplies none).
#
# NOTE(review): the bare integers interleaved with the code look like
# line-number residue from the listing this text was captured from.  The
# numbering jumps from 27 to 30, so two lines appear to be missing right
# after the signature.  `branch` and `config` are not used in the lines
# shown, yet other methods read `self.branch` and `self._config` --
# presumably the missing lines assign them; confirm against the pristine file.
# NOTE(review): `config={}` is a mutable default shared between calls; that
# is only safe if nothing ever mutates it -- verify.
def __init__(self, branch, friendly_name=None, config={},
27
graph_cache=None, branch_link=None):
30
self.friendly_name = friendly_name
31
self.branch_link = branch_link # Currently only used in Launchpad
32
# The logger name embeds friendly_name, which may be None (its default).
self.log = logging.getLogger('loggerhead.%s' % (friendly_name,))
33
# Create a private cache only when the caller did not pass one in.
if graph_cache is None:
34
graph_cache = bzrlib.lru_cache.LRUCache()
35
self.graph_cache = graph_cache
37
# Build a History for self.branch backed by self.graph_cache.  When the
# 'cachepath' config option is set, FileChangeCache is imported on demand
# and handed to _history.use_file_cache().
#
# NOTE(review): listing lines 43, 45, 48 and everything after 50 are absent
# from this text.  The debug message below suggests the import is wrapped in
# a try/except with the cache attached only on success, and the method
# presumably ends by returning _history -- TODO confirm against the
# pristine file.
def get_history(self):
38
_history = History(self.branch, self.graph_cache)
39
# None (the default) means no on-disk change cache is configured.
cache_path = self._config.get('cachepath', None)
40
if cache_path is not None:
41
# Only import the cache if we're going to use it.
42
# This makes sqlite optional
44
from loggerhead.changecache import FileChangeCache
46
self.log.debug("Couldn't load python-sqlite,"
47
" continuing without using a cache")
49
_history.use_file_cache(
50
FileChangeCache(_history, cache_path))
53
# Build a URL with request.construct_url() from the stored request environ,
# using self._url_base as the script name and the positional arguments
# joined with '/' as the path info.  Keyword arguments are formatted as
# 'key=value' strings with urllib.quote() applied to the value.
#
# NOTE(review): listing lines 55-56, 58, 60 and 64 onward are absent from
# this text: the body of the isinstance() branch is missing, `qs` is
# appended to but never initialised in the lines shown, and the
# construct_url( call is never closed.  How `qs` reaches construct_url
# cannot be determined from here -- confirm against the pristine file.
# NOTE(review): kw.iteritems() and urllib.quote() are Python 2 APIs.
def url(self, *args, **kw):
54
# args[0] is inspected unconditionally, so at least one positional
# argument is required by the visible code.
if isinstance(args[0], list):
57
for k, v in kw.iteritems():
59
qs.append('%s=%s'%(k, urllib.quote(v)))
61
return request.construct_url(
62
self._environ, script_name=self._url_base,
63
path_info='/'.join(args),
66
def context_url(self, *args, **kw):
    """Build a URL after passing the keyword arguments through the context.

    The keyword arguments are first handed to ``util.get_context`` and the
    mapping it returns replaces them; the positional arguments and that
    mapping are then forwarded unchanged to :meth:`url`.

    :param args: positional arguments forwarded to :meth:`url`.
    :param kw: keyword arguments handed to ``util.get_context``.
    :return: whatever :meth:`url` returns for the resulting arguments.
    """
    kw = util.get_context(**kw)
    return self.url(*args, **kw)
70
def static_url(self, path):
    """Return the URL of a static resource.

    The result is ``self._static_url_base`` (assigned by :meth:`app` for
    the current request) with ``path`` appended verbatim.  No separator is
    inserted, so ``path`` must carry its own leading ``/`` if one is needed.

    :param path: string appended to the static URL base.
    :return: the concatenated URL string.
    """
    return self._static_url_base + path
74
# Entries pairing a string key with a controller class.
# NOTE(review): presumably these belong to the `controllers_dict` mapping
# that app() consults via self.controllers_dict.get(path), which would make
# each key the first URL path segment handled by that class.  The opening
# and closing lines of the literal and some entries (listing lines 73, 76
# and 79 onward) are absent from this text -- confirm against the pristine
# file.
'annotate': AnnotateUI,
75
'changes': ChangeLogUI,
77
'revision': RevisionUI,
78
'download': DownloadUI,
83
# Look up the change record for the history's last revision id.
#
# NOTE(review): listing line 86 is absent from this text.  `change` is
# computed but not used in the lines shown, so the method presumably
# returns something derived from it -- TODO confirm against the pristine
# file.
def last_updated(self):
84
h = self.get_history()
85
# get_changes() is given a one-element list; only its first result is kept.
change = h.get_changes([ h.last_revid ])[0]
89
return self.branch.get_config().get_user_option('public_branch')
91
# WSGI entry point.  Stores the request's URL bases and environ on the
# instance, pops the first path segment from the environ, looks up a
# controller class for it in self.controllers_dict, and calls an instance
# of that class with (environ, start_response).
#
# NOTE(review): listing lines 98, 101, 104 and 107 are absent from this
# text, so the conditions guarding the redirect to '/changes', the
# static_app hand-off and the HTTPNotFound are not visible; presumably they
# test for an empty path, a static path and a failed lookup respectively.
# NOTE(review): self.branch.lock_read() has no matching unlock in the lines
# shown.  The gap at listing line 107 and the text ending at 109 suggest a
# try/finally that releases the lock -- confirm it exists in the pristine
# file.
def app(self, environ, start_response):
92
self._url_base = environ['SCRIPT_NAME']
93
self._static_url_base = environ.get('loggerhead.static.url')
94
# Fall back to the application's own base URL when the environ carries no
# 'loggerhead.static.url' entry.
if self._static_url_base is None:
95
self._static_url_base = self._url_base
96
self._environ = environ
97
path = request.path_info_pop(environ)
99
raise httpexceptions.HTTPMovedPermanently(
100
self._url_base + '/changes')
102
return static_app(environ, start_response)
103
cls = self.controllers_dict.get(path)
105
raise httpexceptions.HTTPNotFound()
106
self.branch.lock_read()
108
# The controller receives this application object and a History from
# get_history().
c = cls(self, self.get_history())
109
return c(environ, start_response)