~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Matt Nordhoff
  • Date: 2009-10-17 06:19:40 UTC
  • mto: (329.2.2 trailing-whitespace)
  • mto: This revision was merged to the branch mainline in revision 392.
  • Revision ID: mnordhoff@mattnordhoff.com-20091017061940-w9tcvy0xs1irno3y
Some random PEP 8 and otehr stylistic changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#
18
18
 
19
19
"""
20
 
a cache for chewed-up "change" data structures, which are basically just a
21
 
different way of storing a revision.  the cache improves lookup times 10x
22
 
over bazaar's xml revision structure, though, so currently still worth doing.
 
20
a cache for chewed-up 'file change' data structures, which are basically just
 
21
a different way of storing a revision delta.  the cache improves lookup times
 
22
10x over bazaar's xml revision structure, though, so currently still worth
 
23
doing.
23
24
 
24
25
once a revision is committed in bazaar, it never changes, so once we have
25
26
cached a change, it's good forever.
26
27
"""
27
28
 
28
29
import cPickle
 
30
import marshal
29
31
import os
30
 
import time
31
 
 
32
 
from loggerhead import util
33
 
from loggerhead.lockfile import LockFile
34
 
 
35
 
 
36
 
with_lock = util.with_lock('_lock', 'ChangeCache')
37
 
 
38
 
SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')
39
 
 
40
 
if SQLITE_INTERFACE == 'pysqlite2':
 
32
import tempfile
 
33
import zlib
 
34
 
 
35
try:
 
36
    from sqlite3 import dbapi2
 
37
except ImportError:
41
38
    from pysqlite2 import dbapi2
42
 
    _param_marker = '?'
43
 
elif SQLITE_INTERFACE == 'sqlite':
44
 
    import sqlite as dbapi2
45
 
    _param_marker = '%s'
46
 
else:
47
 
    raise AssertionError("bad sqlite interface %r!?"%SQLITE_INTERFACE)
48
 
 
49
 
_select_stmt = ("select data from revisiondata where revid = ?"
50
 
                ).replace('?', _param_marker)
51
 
_insert_stmt = ("insert into revisiondata (revid, data) "
52
 
                "values (?, ?)").replace('?', _param_marker)
53
 
_update_stmt = ("update revisiondata set data = ? where revid = ?"
54
 
                ).replace('?', _param_marker)
55
 
 
56
 
 
57
 
 
 
39
 
 
40
# We take an optimistic approach to concurrency here: we might do work twice
 
41
# in the case of races, but not crash or corrupt data.
 
42
 
 
43
def safe_init_db(filename, init_sql):
 
44
    # To avoid races around creating the database, we create the db in
 
45
    # a temporary file and rename it into the ultimate location.
 
46
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
 
47
    os.close(fd)
 
48
    con = dbapi2.connect(temp_path)
 
49
    cur = con.cursor()
 
50
    cur.execute(init_sql)
 
51
    con.commit()
 
52
    con.close()
 
53
    os.rename(temp_path, filename)
58
54
 
59
55
class FakeShelf(object):
 
56
 
60
57
    def __init__(self, filename):
61
58
        create_table = not os.path.exists(filename)
 
59
        if create_table:
 
60
            safe_init_db(
 
61
                filename, "create table RevisionData "
 
62
                "(revid binary primary key, data binary)")
62
63
        self.connection = dbapi2.connect(filename)
63
64
        self.cursor = self.connection.cursor()
64
 
        if create_table:
65
 
            self._create_table()
66
 
    def _create_table(self):
67
 
        self.cursor.execute(
 
65
 
 
66
    def _create_table(self, filename):
 
67
        con = dbapi2.connect(filename)
 
68
        cur = con.cursor()
 
69
        cur.execute(
68
70
            "create table RevisionData "
69
71
            "(revid binary primary key, data binary)")
70
 
        self.connection.commit()
 
72
        con.commit()
 
73
        con.close()
 
74
 
71
75
    def _serialize(self, obj):
72
 
        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
73
 
        return r
 
76
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))
 
77
 
74
78
    def _unserialize(self, data):
75
79
        return cPickle.loads(str(data))
 
80
 
76
81
    def get(self, revid):
77
 
        self.cursor.execute(_select_stmt, (revid,))
 
82
        self.cursor.execute(
 
83
            "select data from revisiondata where revid = ?", (revid, ))
78
84
        filechange = self.cursor.fetchone()
79
85
        if filechange is None:
80
86
            return None
81
87
        else:
82
88
            return self._unserialize(filechange[0])
83
 
    def add(self, revid_obj_pairs, commit=True):
84
 
        for  (r, d) in revid_obj_pairs:
85
 
            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
86
 
        if commit:
87
 
            self.connection.commit()
88
 
    def update(self, revid_obj_pairs, commit=True):
89
 
        for  (r, d) in revid_obj_pairs:
90
 
            self.cursor.execute(_update_stmt, (self._serialize(d), r))
91
 
        if commit:
92
 
            self.connection.commit()
93
 
    def count(self):
94
 
        self.cursor.execute(
95
 
            "select count(*) from revisiondata")
96
 
        return self.cursor.fetchone()[0]
97
 
    def close(self, commit=False):
98
 
        if commit:
99
 
            self.connection.commit()
100
 
        self.connection.close()
101
 
 
102
 
class ChangeCache (object):
103
 
 
104
 
    def __init__(self, history, cache_path):
105
 
        self.history = history
106
 
        self.log = history.log
107
 
 
108
 
        if not os.path.exists(cache_path):
109
 
            os.mkdir(cache_path)
110
 
 
111
 
        self._changes_filename = os.path.join(cache_path, 'changes.sql')
112
 
 
113
 
        # use a lockfile since the cache folder could be shared across different processes.
114
 
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
115
 
        self._closed = False
116
 
 
117
 
##         # this is fluff; don't slow down startup time with it.
118
 
##         # but it is racy in tests :(
119
 
##         def log_sizes():
120
 
##             self.log.info('Using change cache %s; %d entries.' % (cache_path, self.size()))
121
 
##         threading.Thread(target=log_sizes).start()
122
 
 
123
 
    def _cache(self):
124
 
        return FakeShelf(self._changes_filename)
125
 
 
126
 
    @with_lock
127
 
    def close(self):
128
 
        self.log.debug('Closing cache file.')
129
 
        self._closed = True
130
 
 
131
 
    @with_lock
132
 
    def closed(self):
133
 
        return self._closed
134
 
 
135
 
    @with_lock
136
 
    def flush(self):
137
 
        pass
138
 
 
139
 
    @with_lock
140
 
    def get_changes(self, revid_list):
141
 
        """
142
 
        get a list of changes by their revision_ids.  any changes missing
143
 
        from the cache are fetched by calling L{History.get_change_uncached}
144
 
        and inserted into the cache before returning.
145
 
        """
146
 
        out = []
147
 
        missing_revids = []
148
 
        missing_revid_indices = []
149
 
        cache = self._cache()
150
 
        for revid in revid_list:
151
 
            entry = cache.get(revid)
152
 
            if entry is not None:
153
 
                out.append(entry)
154
 
            else:
155
 
                missing_revids.append(revid)
156
 
                missing_revid_indices.append(len(out))
157
 
                out.append(None)
158
 
        if missing_revids:
159
 
            missing_entries = self.history.get_changes_uncached(missing_revids)
160
 
            missing_entry_dict = {}
161
 
            for entry in missing_entries:
162
 
                missing_entry_dict[entry.revid] = entry
163
 
            revid_entry_pairs = []
164
 
            for i, revid in zip(missing_revid_indices, missing_revids):
165
 
                out[i] = entry = missing_entry_dict.get(revid)
166
 
                if entry is not None:
167
 
                    revid_entry_pairs.append((revid, entry))
168
 
            cache.add(revid_entry_pairs)
169
 
        return filter(None, out)
170
 
 
171
 
    @with_lock
172
 
    def full(self):
173
 
        cache = self._cache()
174
 
        last_revid = util.to_utf8(self.history.last_revid)
175
 
        revision_history = self.history.get_revision_history()
176
 
        return (cache.count() >= len(revision_history)
177
 
                and cache.get(last_revid) is not None)
178
 
 
179
 
    @with_lock
180
 
    def size(self):
181
 
        return self._cache().count()
182
 
 
183
 
    def check_rebuild(self, max_time=3600):
184
 
        """
185
 
        check if we need to fill in any missing pieces of the cache.  pull in
186
 
        any missing changes, but don't work any longer than C{max_time}
187
 
        seconds.
188
 
        """
189
 
        if self.closed() or self.full():
190
 
            return
191
 
 
192
 
        self.log.info('Building revision cache...')
193
 
        start_time = time.time()
194
 
        last_update = time.time()
195
 
        count = 0
196
 
 
197
 
        work = list(self.history.get_revision_history())
198
 
        jump = 100
199
 
        for i in xrange(0, len(work), jump):
200
 
            r = work[i:i + jump]
201
 
            # must call into history so we grab the branch lock (otherwise, lock inversion)
202
 
            self.history.get_changes(r)
203
 
            if self.closed():
204
 
                self.flush()
205
 
                return
206
 
            count += jump
207
 
            now = time.time()
208
 
            if now - start_time > max_time:
209
 
                self.log.info('Cache rebuilding will pause for now.')
210
 
                self.flush()
211
 
                return
212
 
            if now - last_update > 60:
213
 
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
214
 
                last_update = time.time()
215
 
                self.flush()
216
 
            # give someone else a chance at the lock
217
 
            time.sleep(1)
218
 
        self.log.info('Revision cache rebuild completed.')
219
 
        self.flush()
 
89
 
 
90
    def add(self, revid, object):
 
91
        try:
 
92
            self.cursor.execute(
 
93
                "insert into revisiondata (revid, data) values (?, ?)",
 
94
                (revid, self._serialize(object)))
 
95
            self.connection.commit()
 
96
        except dbapi2.IntegrityError:
 
97
            # If another thread or process attempted to set the same key, we
 
98
            # assume it set it to the same value and carry on with our day.
 
99
            pass
 
100
 
220
101
 
221
102
class FileChangeCache(object):
222
 
    def __init__(self, history, cache_path):
223
 
        self.history = history
 
103
 
 
104
    def __init__(self, cache_path):
224
105
 
225
106
        if not os.path.exists(cache_path):
226
107
            os.mkdir(cache_path)
227
108
 
228
109
        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
229
110
 
230
 
        # use a lockfile since the cache folder could be shared across
231
 
        # different processes.
232
 
        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))
233
 
 
234
 
    @with_lock
235
 
    def get_file_changes(self, entries):
236
 
        out = []
237
 
        missing_entries = []
238
 
        missing_entry_indices = []
 
111
    def get_file_changes(self, entry):
239
112
        cache = FakeShelf(self._changes_filename)
240
 
        for entry in entries:
241
 
            changes = cache.get(entry.revid)
242
 
            if changes is not None:
243
 
                out.append(changes)
244
 
            else:
245
 
                missing_entries.append(entry)
246
 
                missing_entry_indices.append(len(out))
247
 
                out.append(None)
248
 
        if missing_entries:
249
 
            missing_changes = self.history.get_file_changes_uncached(missing_entries)
250
 
            revid_changes_pairs = []
251
 
            for i, entry, changes in zip(
252
 
                missing_entry_indices, missing_entries, missing_changes):
253
 
                revid_changes_pairs.append((entry.revid, changes))
254
 
                out[i] = changes
255
 
            cache.add(revid_changes_pairs)
256
 
        return out
 
113
        changes = cache.get(entry.revid)
 
114
        if changes is None:
 
115
            changes = self.history.get_file_changes_uncached(entry)
 
116
            cache.add(entry.revid, changes)
 
117
        return changes
 
118
 
 
119
 
 
120
class RevInfoDiskCache(object):
 
121
    """Like `RevInfoMemoryCache` but backed in a sqlite DB."""
 
122
 
 
123
    def __init__(self, cache_path):
 
124
        if not os.path.exists(cache_path):
 
125
            os.mkdir(cache_path)
 
126
        filename = os.path.join(cache_path, 'revinfo.sql')
 
127
        create_table = not os.path.exists(filename)
 
128
        if create_table:
 
129
            safe_init_db(
 
130
                filename, "create table Data "
 
131
                "(key binary primary key, revid binary, data binary)")
 
132
        self.connection = dbapi2.connect(filename)
 
133
        self.cursor = self.connection.cursor()
 
134
 
 
135
    def get(self, key, revid):
 
136
        self.cursor.execute(
 
137
            "select revid, data from data where key = ?", (dbapi2.Binary(key),))
 
138
        row = self.cursor.fetchone()
 
139
        if row is None:
 
140
            return None
 
141
        elif str(row[0]) != revid:
 
142
            return None
 
143
        else:
 
144
            return marshal.loads(zlib.decompress(row[1]))
 
145
 
 
146
    def set(self, key, revid, data):
 
147
        try:
 
148
            self.cursor.execute(
 
149
                'delete from data where key = ?', (dbapi2.Binary(key), ))
 
150
            blob = zlib.compress(marshal.dumps(data))
 
151
            self.cursor.execute(
 
152
                "insert into data (key, revid, data) values (?, ?, ?)",
 
153
                map(dbapi2.Binary, [key, revid, blob]))
 
154
            self.connection.commit()
 
155
        except dbapi2.IntegrityError:
 
156
            # If another thread or process attempted to set the same key, we
 
157
            # don't care too much -- it's only a cache after all!
 
158
            pass