~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

 * Removed passing revid to revision info and using change instead

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
 
28
28
import cPickle
29
29
import os
 
30
import time
30
31
 
31
32
from loggerhead import util
32
33
from loggerhead.lockfile import LockFile
33
34
 
 
35
 
34
36
with_lock = util.with_lock('_lock', 'ChangeCache')
35
37
 
36
38
SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')
41
43
elif SQLITE_INTERFACE == 'sqlite':
42
44
    import sqlite as dbapi2
43
45
    _param_marker = '%s'
44
 
 
 
46
else:
 
47
    raise AssertionError("bad sqlite interface %r!?"%SQLITE_INTERFACE)
45
48
 
46
49
_select_stmt = ("select data from revisiondata where revid = ?"
47
50
                ).replace('?', _param_marker)
48
51
_insert_stmt = ("insert into revisiondata (revid, data) "
49
52
                "values (?, ?)").replace('?', _param_marker)
 
53
_update_stmt = ("update revisiondata set data = ? where revid = ?"
 
54
                ).replace('?', _param_marker)
50
55
 
51
56
 
52
57
 
75
80
            return None
76
81
        else:
77
82
            return self._unserialize(filechange[0])
78
 
    def add(self, revid_obj_pairs):
 
83
    def add(self, revid_obj_pairs, commit=True):
79
84
        for  (r, d) in revid_obj_pairs:
80
85
            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
81
 
        self.connection.commit()
82
 
 
 
86
        if commit:
 
87
            self.connection.commit()
 
88
    def update(self, revid_obj_pairs, commit=True):
 
89
        for  (r, d) in revid_obj_pairs:
 
90
            self.cursor.execute(_update_stmt, (self._serialize(d), r))
 
91
        if commit:
 
92
            self.connection.commit()
 
93
    def count(self):
 
94
        self.cursor.execute(
 
95
            "select count(*) from revisiondata")
 
96
        return self.cursor.fetchone()[0]
 
97
    def close(self, commit=False):
 
98
        if commit:
 
99
            self.connection.commit()
 
100
        self.connection.close()
 
101
 
 
102
class ChangeCache (object):
 
103
 
 
104
    def __init__(self, history, cache_path):
 
105
        self.history = history
 
106
        self.log = history.log
 
107
 
 
108
        if not os.path.exists(cache_path):
 
109
            os.mkdir(cache_path)
 
110
 
 
111
        self._changes_filename = os.path.join(cache_path, 'changes.sql')
 
112
 
 
113
        # use a lockfile since the cache folder could be shared across different processes.
 
114
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
 
115
        self._closed = False
 
116
 
 
117
##         # this is fluff; don't slow down startup time with it.
 
118
##         # but it is racy in tests :(
 
119
##         def log_sizes():
 
120
##             self.log.info('Using change cache %s; %d entries.' % (cache_path, self.size()))
 
121
##         threading.Thread(target=log_sizes).start()
 
122
 
 
123
    def _cache(self):
 
124
        return FakeShelf(self._changes_filename)
 
125
 
 
126
    @with_lock
 
127
    def close(self):
 
128
        self.log.debug('Closing cache file.')
 
129
        self._closed = True
 
130
 
 
131
    @with_lock
 
132
    def closed(self):
 
133
        return self._closed
 
134
 
 
135
    @with_lock
 
136
    def flush(self):
 
137
        pass
 
138
 
 
139
    @with_lock
 
140
    def get_changes(self, revid_list):
 
141
        """
 
142
        get a list of changes by their revision_ids.  any changes missing
 
143
        from the cache are fetched by calling L{History.get_change_uncached}
 
144
        and inserted into the cache before returning.
 
145
        """
 
146
        out = []
 
147
        missing_revids = []
 
148
        missing_revid_indices = []
 
149
        cache = self._cache()
 
150
        for revid in revid_list:
 
151
            entry = cache.get(revid)
 
152
            if entry is not None:
 
153
                out.append(entry)
 
154
            else:
 
155
                missing_revids.append(revid)
 
156
                missing_revid_indices.append(len(out))
 
157
                out.append(None)
 
158
        if missing_revids:
 
159
            missing_entries = self.history.get_changes_uncached(missing_revids)
 
160
            missing_entry_dict = {}
 
161
            for entry in missing_entries:
 
162
                missing_entry_dict[entry.revid] = entry
 
163
            revid_entry_pairs = []
 
164
            for i, revid in zip(missing_revid_indices, missing_revids):
 
165
                out[i] = entry = missing_entry_dict.get(revid)
 
166
                if entry is not None:
 
167
                    revid_entry_pairs.append((revid, entry))
 
168
            cache.add(revid_entry_pairs)
 
169
        return filter(None, out)
 
170
 
 
171
    @with_lock
 
172
    def full(self):
 
173
        cache = self._cache()
 
174
        last_revid = util.to_utf8(self.history.last_revid)
 
175
        revision_history = self.history.get_revision_history()
 
176
        return (cache.count() >= len(revision_history)
 
177
                and cache.get(last_revid) is not None)
 
178
 
 
179
    @with_lock
 
180
    def size(self):
 
181
        return self._cache().count()
 
182
 
 
183
    def check_rebuild(self, max_time=3600):
 
184
        """
 
185
        check if we need to fill in any missing pieces of the cache.  pull in
 
186
        any missing changes, but don't work any longer than C{max_time}
 
187
        seconds.
 
188
        """
 
189
        if self.closed() or self.full():
 
190
            return
 
191
 
 
192
        self.log.info('Building revision cache...')
 
193
        start_time = time.time()
 
194
        last_update = time.time()
 
195
        count = 0
 
196
 
 
197
        work = list(self.history.get_revision_history())
 
198
        jump = 100
 
199
        for i in xrange(0, len(work), jump):
 
200
            r = work[i:i + jump]
 
201
            # must call into history so we grab the branch lock (otherwise, lock inversion)
 
202
            self.history.get_changes(r)
 
203
            if self.closed():
 
204
                self.flush()
 
205
                return
 
206
            count += jump
 
207
            now = time.time()
 
208
            if now - start_time > max_time:
 
209
                self.log.info('Cache rebuilding will pause for now.')
 
210
                self.flush()
 
211
                return
 
212
            if now - last_update > 60:
 
213
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
 
214
                last_update = time.time()
 
215
                self.flush()
 
216
            # give someone else a chance at the lock
 
217
            time.sleep(1)
 
218
        self.log.info('Revision cache rebuild completed.')
 
219
        self.flush()
83
220
 
84
221
class FileChangeCache(object):
85
222
    def __init__(self, history, cache_path):