"""
a cache for chewed-up "change" data structures, which are basically just a
different way of storing a revision. the cache improves lookup times 10x
over bazaar's xml revision structure, though, so currently still worth doing.

once a revision is committed in bazaar, it never changes, so once we have
cached a change, it's good forever.
"""

import cPickle
import marshal
import os
import shelve
import tempfile
import threading
import time
import zlib

try:
    from sqlite3 import dbapi2
except ImportError:
    from pysqlite2 import dbapi2

# We take an optimistic approach to concurrency here: we might do work twice
# in the case of races, but not crash or corrupt data.

def safe_init_db(filename, init_sql):
    # To avoid races around creating the database, we create the db in
    # a temporary file and rename it into the ultimate location.
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    os.close(fd)
    con = dbapi2.connect(temp_path)
    cur = con.cursor()
    cur.execute(init_sql)
    con.commit()
    con.close()
    os.rename(temp_path, filename)
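
# Illustration only (never called): if two processes race through the
# "does the database exist yet?" check in the callers below, both may run
# safe_init_db; the loser's rename simply replaces an identically-initialised
# file, so neither ever sees a half-written database. The path and schema
# here are hypothetical.
def _safe_init_db_example():
    safe_init_db('/tmp/example-cache.sql',
                 "create table Demo (k binary primary key, v binary)")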


class FakeShelf(object):

    def __init__(self, filename):
        create_table = not os.path.exists(filename)
        if create_table:
            safe_init_db(
                filename, "create table RevisionData "
                "(revid binary primary key, data binary)")
        self.connection = dbapi2.connect(filename)
        self.cursor = self.connection.cursor()

    def _create_table(self, filename):
        con = dbapi2.connect(filename)
        cur = con.cursor()
        cur.execute(
            "create table RevisionData "
            "(revid binary primary key, data binary)")
        con.commit()
        con.close()

    def _serialize(self, obj):
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))

    def _unserialize(self, data):
        return cPickle.loads(str(data))

    def get(self, revid):
        self.cursor.execute(
            "select data from revisiondata where revid = ?", (revid, ))
        filechange = self.cursor.fetchone()
        if filechange is None:
            return None
        return self._unserialize(filechange[0])

    def add(self, revid, object):
        try:
            self.cursor.execute(
                "insert into revisiondata (revid, data) values (?, ?)",
                (revid, self._serialize(object)))
            self.connection.commit()
        except dbapi2.IntegrityError:
            # If another thread or process attempted to set the same key, we
            # assume it set it to the same value and carry on with our day.
            pass
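
# A minimal usage sketch (illustration only, never called). FakeShelf gives a
# dict-like get/add over sqlite; the path and the cached object here are
# hypothetical, where real callers store pickled "change" structures keyed by
# revid.
def _fakeshelf_example():
    shelf = FakeShelf('/tmp/example-changes.sql')
    if shelf.get('revid-1') is None:
        shelf.add('revid-1', {'message': 'first commit'})
    # adding the same key again is harmless: the IntegrityError is swallowed,
    # per the optimistic-concurrency note above.
    shelf.add('revid-1', {'message': 'first commit'})
    assert shelf.get('revid-1') == {'message': 'first commit'}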


class RevInfoDiskCache(object):
    """Like `RevInfoMemoryCache` but backed in a sqlite DB."""

    def __init__(self, cache_path):
        if not os.path.exists(cache_path):
            os.mkdir(cache_path)
        filename = os.path.join(cache_path, 'revinfo.sql')
        create_table = not os.path.exists(filename)
        if create_table:
            safe_init_db(
                filename, "create table Data "
                "(key binary primary key, revid binary, data binary)")
        self.connection = dbapi2.connect(filename)
        self.cursor = self.connection.cursor()

    def get(self, key, revid):
        self.cursor.execute(
            "select revid, data from data where key = ?",
            (dbapi2.Binary(key),))
        row = self.cursor.fetchone()
        if row is None:
            return None
        elif str(row[0]) != revid:
            return None
        return marshal.loads(zlib.decompress(row[1]))

    def set(self, key, revid, data):
        try:
            self.cursor.execute(
                'delete from data where key = ?', (dbapi2.Binary(key), ))
            blob = zlib.compress(marshal.dumps(data))
            self.cursor.execute(
                "insert into data (key, revid, data) values (?, ?, ?)",
                map(dbapi2.Binary, [key, revid, blob]))
            self.connection.commit()
        except dbapi2.IntegrityError:
            # If another thread or process attempted to set the same key, we
            # don't care too much -- it's only a cache after all!
            pass
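
# Illustration only (never called): the get/set contract, with a hypothetical
# cache directory. set() stores data under a key together with the revid it
# was computed for; get() treats a revid mismatch as a miss, so stale entries
# read as None rather than wrong data.
def _revinfo_cache_example():
    cache = RevInfoDiskCache('/tmp/example-revinfo')
    cache.set('some-key', 'revid-1', ['any', 'marshalable', 'data'])
    assert cache.get('some-key', 'revid-1') == ['any', 'marshalable', 'data']
    assert cache.get('some-key', 'revid-2') is None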


from loggerhead import util
from loggerhead.util import decorator
from loggerhead.lockfile import LockFile

with_lock = util.with_lock('_lock', 'ChangeCache')


class ChangeCache (object):

    def __init__(self, history, cache_path):
        self.history = history
        self.log = history.log

        if not os.path.exists(cache_path):
            os.mkdir(cache_path)

        # keep a separate cache for the diffs, because they're very
        # time-consuming to fetch.
        self._changes_filename = os.path.join(cache_path, 'changes')
        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')

        # use a lockfile since the cache folder could be shared across
        # different processes.
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
        self._closed = False

        # this is fluff; don't slow down startup time with it.
        def log_sizes():
            s1, s2 = self.sizes()
            self.log.info('Using change cache %s; %d/%d entries.'
                          % (cache_path, s1, s2))
        threading.Thread(target=log_sizes).start()

    @with_lock
    def close(self):
        self.log.debug('Closing cache file.')
        self._closed = True

    @with_lock
    def closed(self):
        return self._closed

    @with_lock
    def get_changes(self, revid_list, get_diffs=False):
        """
        get a list of changes by their revision_ids. any changes missing
        from the cache are fetched by calling L{History.get_changes_uncached}
        and inserted into the cache before returning.
        """
        if get_diffs:
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
        else:
            cache = shelve.open(self._changes_filename, 'c', protocol=2)

        try:
            out = []
            fetch_list = []
            sfetch_list = []
            for revid in revid_list:
                # if the revid is in unicode, use the utf-8 encoding as the key
                srevid = util.to_utf8(revid)
                if srevid in cache:
                    out.append(cache[srevid])
                else:
                    #self.log.debug('Entry cache miss: %r' % (revid,))
                    out.append(None)
                    fetch_list.append(revid)
                    sfetch_list.append(srevid)

            if len(fetch_list) > 0:
                # some revisions weren't in the cache; fetch them
                changes = self.history.get_changes_uncached(fetch_list, get_diffs)
                for i in xrange(len(revid_list)):
                    if out[i] is None:
                        cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
        finally:
            cache.close()
        return out
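
    # Worked example (illustration only, hypothetical revids): if revid_list
    # is ['a', 'b', 'c'] and only 'b' is already cached, the scan leaves
    # out == [None, change_b, None] and fetch_list == ['a', 'c']; the final
    # loop then pops each fetched change into the next None slot, caching it
    # under its utf-8 key as it goes.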

    @with_lock
    def full(self, get_diffs=False):
        if get_diffs:
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
        else:
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
        try:
            return (len(cache) >= len(self.history.get_revision_history())
                    and util.to_utf8(self.history.last_revid) in cache)
        finally:
            cache.close()

    @with_lock
    def sizes(self):
        cache = shelve.open(self._changes_filename, 'c', protocol=2)
        s1 = len(cache)
        cache.close()
        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
        s2 = len(cache)
        cache.close()
        return s1, s2

    def check_rebuild(self, max_time=3600):
        """
        check if we need to fill in any missing pieces of the cache. pull in
        any missing changes, but don't work any longer than C{max_time}
        seconds.
        """
        if self.closed() or self.full():
            return

        self.log.info('Building revision cache...')
        start_time = time.time()
        last_update = time.time()
        count = 0

        work = list(self.history.get_revision_history())
        jump = 100
        for i in xrange(0, len(work), jump):
            r = work[i:i + jump]
            # must call into history so we grab the branch lock
            # (otherwise, lock inversion)
            self.history.get_changes(r)
            count += jump
            now = time.time()
            if now - start_time > max_time:
                self.log.info('Cache rebuilding will pause for now.')
                return
            if now - last_update > 60:
                self.log.info('Revision cache rebuilding continues: %d/%d'
                              % (min(count, len(work)), len(work)))
                last_update = time.time()
            # give someone else a chance at the lock
            time.sleep(1)
        self.log.info('Revision cache rebuild completed.')
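

# Illustration only (never called): how the pieces fit together. `history` is
# assumed to be a loggerhead History object (providing log, get_changes,
# get_changes_uncached, get_revision_history and last_revid); the cache
# directory is hypothetical.
def _change_cache_example(history):
    cache = ChangeCache(history, '/tmp/example-change-cache')
    # fill in missing entries for up to an hour, then serve a batch of changes
    cache.check_rebuild(max_time=3600)
    changes = cache.get_changes(history.get_revision_history()[:10])
    cache.close()
    return changes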