~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-05-21 06:19:12 UTC
  • Revision ID: robey@lag.net-20070521061912-doe5d89zirkvd22r
yes, of course, the first release was in december *2006* :)

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
cached a change, it's good forever.
26
26
"""
27
27
 
28
 
import cPickle
29
28
import logging
30
29
import os
 
30
import shelve
31
31
import threading
32
32
import time
33
33
 
37
37
 
38
38
 
39
39
# Decorator that serializes cache access through the instance's `_lock`
# lockfile (the cache directory may be shared between processes).
with_lock = util.with_lock('_lock', 'ChangeCache')

# Which sqlite binding to load; overridable through the environment.
SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')

if SQLITE_INTERFACE == 'pysqlite2':
    from pysqlite2 import dbapi2
    _param_marker = '?'
elif SQLITE_INTERFACE == 'sqlite':
    import sqlite as dbapi2
    _param_marker = '%s'
else:
    raise AssertionError("bad sqlite interface %r!?" % SQLITE_INTERFACE)

# Statements are written once with '?' placeholders and then rewritten to
# whatever parameter marker the selected binding expects.
_select_stmt = ("select data from revisiondata where revid = ?"
                ).replace('?', _param_marker)
_insert_stmt = ("insert into revisiondata (revid, data) "
                "values (?, ?)").replace('?', _param_marker)
_update_stmt = ("update revisiondata set data = ? where revid = ?"
                ).replace('?', _param_marker)
58
 
 
59
 
 
60
 
 
61
 
 
62
 
class FakeShelf(object):
    """A minimal shelve-like store backed by a sqlite table.

    Objects are pickled with protocol 2 and kept in the `revisiondata`
    table, keyed by revision id.  Unlike a real shelf, writes become
    durable only when the connection is committed.
    """

    def __init__(self, filename):
        # sqlite creates the database file as a side effect of connecting,
        # so decide whether the table needs creating *before* we connect.
        create_table = not os.path.exists(filename)
        self.connection = dbapi2.connect(filename)
        self.cursor = self.connection.cursor()
        if create_table:
            self._create_table()

    def _create_table(self):
        self.cursor.execute(
            "create table RevisionData "
            "(revid binary primary key, data binary)")
        self.connection.commit()

    def _serialize(self, obj):
        # Wrap the pickle in Binary so the driver stores it untouched.
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))

    def _unserialize(self, data):
        return cPickle.loads(str(data))

    def get(self, revid):
        """Return the cached object for revid, or None on a cache miss."""
        self.cursor.execute(_select_stmt, (revid,))
        filechange = self.cursor.fetchone()
        if filechange is None:
            return None
        return self._unserialize(filechange[0])

    def add(self, revid_obj_pairs, commit=True):
        """Insert (revid, obj) pairs; commits unless commit is False.

        Uses executemany so the driver runs one prepared statement for
        the whole batch instead of one execute() round trip per row.
        """
        self.cursor.executemany(
            _insert_stmt,
            [(r, self._serialize(d)) for (r, d) in revid_obj_pairs])
        if commit:
            self.connection.commit()

    def update(self, revid_obj_pairs, commit=True):
        """Overwrite existing rows with freshly pickled objects."""
        self.cursor.executemany(
            _update_stmt,
            [(self._serialize(d), r) for (r, d) in revid_obj_pairs])
        if commit:
            self.connection.commit()

    def count(self):
        """Return the number of cached revisions."""
        self.cursor.execute(
            "select count(*) from revisiondata")
        return self.cursor.fetchone()[0]

    def close(self, commit=False):
        if commit:
            self.connection.commit()
        self.connection.close()
 
40
        
104
41
 
105
42
class ChangeCache (object):

    def __init__(self, history, cache_path):
        """Create a change cache for `history`, stored under `cache_path`."""
        self.history = history
        self.log = history.log

        if not os.path.exists(cache_path):
            os.mkdir(cache_path)

        self._changes_filename = os.path.join(cache_path, 'changes.sql')

        # use a lockfile since the cache folder could be shared across different processes.
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
        self._closed = False

##         # this is fluff; don't slow down startup time with it.
##         # but it is racy in tests :(
##         def log_sizes():
##             self.log.info('Using change cache %s; %d entries.' % (cache_path, self.size()))
##         threading.Thread(target=log_sizes).start()

    def _cache(self):
        # A fresh FakeShelf (sqlite connection) per call; callers hold _lock.
        return FakeShelf(self._changes_filename)
128
 
 
 
58
        
 
59
        # this is fluff; don't slow down startup time with it.
 
60
        def log_sizes():
 
61
            s1, s2 = self.sizes()
 
62
            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
 
63
        threading.Thread(target=log_sizes).start()
 
64
    
129
65
    @with_lock
130
66
    def close(self):
131
67
        self.log.debug('Closing cache file.')
140
76
        pass
141
77
    
142
78
    @with_lock
143
 
    def get_changes(self, revid_list):
 
79
    def get_changes(self, revid_list, get_diffs=False):
144
80
        """
145
81
        get a list of changes by their revision_ids.  any changes missing
146
82
        from the cache are fetched by calling L{History.get_change_uncached}
147
83
        and inserted into the cache before returning.
148
84
        """
 
85
        if get_diffs:
 
86
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
87
        else:
 
88
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
89
 
149
90
        out = []
150
 
        missing_revids = []
151
 
        missing_revid_indices = []
152
 
        cache = self._cache()
 
91
        fetch_list = []
 
92
        sfetch_list = []
153
93
        for revid in revid_list:
154
 
            entry = cache.get(revid)
155
 
            if entry is not None:
156
 
                out.append(entry)
 
94
            # if the revid is in unicode, use the utf-8 encoding as the key
 
95
            srevid = util.to_utf8(revid)
 
96
            
 
97
            if srevid in cache:
 
98
                out.append(cache[srevid])
157
99
            else:
158
 
                missing_revids.append(revid)
159
 
                missing_revid_indices.append(len(out))
 
100
                #self.log.debug('Entry cache miss: %r' % (revid,))
160
101
                out.append(None)
161
 
        if missing_revids:
162
 
            missing_entries = self.history.get_changes_uncached(missing_revids)
163
 
            missing_entry_dict = {}
164
 
            for entry in missing_entries:
165
 
                missing_entry_dict[entry.revid] = entry
166
 
            revid_entry_pairs = []
167
 
            for i, revid in zip(missing_revid_indices, missing_revids):
168
 
                out[i] = entry = missing_entry_dict.get(revid)
169
 
                if entry is not None:
170
 
                    revid_entry_pairs.append((revid, entry))
171
 
            cache.add(revid_entry_pairs)
172
 
        return filter(None, out)
173
 
 
174
 
    @with_lock
175
 
    def full(self):
176
 
        cache = self._cache()
177
 
        last_revid = util.to_utf8(self.history.last_revid)
178
 
        revision_history = self.history.get_revision_history()
179
 
        return (cache.count() >= len(revision_history)
180
 
                and cache.get(last_revid) is not None)
181
 
 
182
 
    @with_lock
183
 
    def size(self):
184
 
        return self._cache().count()
185
 
 
 
102
                fetch_list.append(revid)
 
103
                sfetch_list.append(srevid)
 
104
        
 
105
        if len(fetch_list) > 0:
 
106
            # some revisions weren't in the cache; fetch them
 
107
            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
 
108
            if changes is None:
 
109
                return changes
 
110
            for i in xrange(len(revid_list)):
 
111
                if out[i] is None:
 
112
                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
 
113
        
 
114
        cache.close()
 
115
        return out
 
116
    
 
117
    @with_lock
 
118
    def full(self, get_diffs=False):
 
119
        if get_diffs:
 
120
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
121
        else:
 
122
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
123
        try:
 
124
            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
 
125
        finally:
 
126
            cache.close()
 
127
 
 
128
    @with_lock
 
129
    def sizes(self):
 
130
        cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
131
        s1 = len(cache)
 
132
        cache.close()
 
133
        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
134
        s2 = len(cache)
 
135
        cache.close()
 
136
        return s1, s2
 
137
        
186
138
    def check_rebuild(self, max_time=3600):
187
139
        """
188
140
        check if we need to fill in any missing pieces of the cache.  pull in
221
173
        self.log.info('Revision cache rebuild completed.')
222
174
        self.flush()
223
175
 
224
 
class FileChangeCache(object):
    """Cache of per-revision file-change lists, backed by a FakeShelf."""

    def __init__(self, history, cache_path):
        self.history = history

        if not os.path.exists(cache_path):
            os.mkdir(cache_path)

        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')

        # use a lockfile since the cache folder could be shared across
        # different processes.
        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))

    @with_lock
    def get_file_changes(self, entries):
        """Return the file-change list for each entry, cache first.

        Misses are fetched in one batch via
        history.get_file_changes_uncached and written back to the cache.
        """
        results = []
        misses = []        # entries whose changes were not cached
        miss_slots = []    # positions in `results` reserved for misses
        cache = FakeShelf(self._changes_filename)
        for entry in entries:
            cached = cache.get(entry.revid)
            if cached is None:
                misses.append(entry)
                miss_slots.append(len(results))
                results.append(None)
            else:
                results.append(cached)
        if misses:
            fetched = self.history.get_file_changes_uncached(misses)
            new_pairs = []
            for slot, entry, changes in zip(miss_slots, misses, fetched):
                results[slot] = changes
                new_pairs.append((entry.revid, changes))
            cache.add(new_pairs)
        return results
 
176