~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

[rs=mwhudson][release-critical=Rinchen] update to loggerhead trunk,
        mainly to get the code to not hold branches open the whole time but
        also getting some other improvements

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
 
28
28
import cPickle
29
29
import os
30
 
import time
31
30
 
32
31
from loggerhead import util
33
32
from loggerhead.lockfile import LockFile
34
33
 
35
 
 
36
34
with_lock = util.with_lock('_lock', 'ChangeCache')
37
35
 
38
36
SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')
43
41
elif SQLITE_INTERFACE == 'sqlite':
44
42
    import sqlite as dbapi2
45
43
    _param_marker = '%s'
46
 
else:
47
 
    raise AssertionError("bad sqlite interface %r!?"%SQLITE_INTERFACE)
 
44
 
48
45
 
49
46
_select_stmt = ("select data from revisiondata where revid = ?"
50
47
                ).replace('?', _param_marker)
51
48
_insert_stmt = ("insert into revisiondata (revid, data) "
52
49
                "values (?, ?)").replace('?', _param_marker)
53
 
_update_stmt = ("update revisiondata set data = ? where revid = ?"
54
 
                ).replace('?', _param_marker)
55
50
 
56
51
 
57
52
 
80
75
            return None
81
76
        else:
82
77
            return self._unserialize(filechange[0])
83
 
    def add(self, revid_obj_pairs, commit=True):
 
78
    def add(self, revid_obj_pairs):
84
79
        for  (r, d) in revid_obj_pairs:
85
80
            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
86
 
        if commit:
87
 
            self.connection.commit()
88
 
    def update(self, revid_obj_pairs, commit=True):
89
 
        for  (r, d) in revid_obj_pairs:
90
 
            self.cursor.execute(_update_stmt, (self._serialize(d), r))
91
 
        if commit:
92
 
            self.connection.commit()
93
 
    def count(self):
94
 
        self.cursor.execute(
95
 
            "select count(*) from revisiondata")
96
 
        return self.cursor.fetchone()[0]
97
 
    def close(self, commit=False):
98
 
        if commit:
99
 
            self.connection.commit()
100
 
        self.connection.close()
101
 
 
102
 
class ChangeCache (object):
103
 
 
104
 
    def __init__(self, history, cache_path):
105
 
        self.history = history
106
 
        self.log = history.log
107
 
 
108
 
        if not os.path.exists(cache_path):
109
 
            os.mkdir(cache_path)
110
 
 
111
 
        self._changes_filename = os.path.join(cache_path, 'changes.sql')
112
 
 
113
 
        # use a lockfile since the cache folder could be shared across different processes.
114
 
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
115
 
        self._closed = False
116
 
 
117
 
##         # this is fluff; don't slow down startup time with it.
118
 
##         # but it is racy in tests :(
119
 
##         def log_sizes():
120
 
##             self.log.info('Using change cache %s; %d entries.' % (cache_path, self.size()))
121
 
##         threading.Thread(target=log_sizes).start()
122
 
 
123
 
    def _cache(self):
124
 
        return FakeShelf(self._changes_filename)
125
 
 
126
 
    @with_lock
127
 
    def close(self):
128
 
        self.log.debug('Closing cache file.')
129
 
        self._closed = True
130
 
 
131
 
    @with_lock
132
 
    def closed(self):
133
 
        return self._closed
134
 
 
135
 
    @with_lock
136
 
    def flush(self):
137
 
        pass
138
 
 
139
 
    @with_lock
140
 
    def get_changes(self, revid_list):
141
 
        """
142
 
        get a list of changes by their revision_ids.  any changes missing
143
 
        from the cache are fetched by calling L{History.get_change_uncached}
144
 
        and inserted into the cache before returning.
145
 
        """
146
 
        out = []
147
 
        missing_revids = []
148
 
        missing_revid_indices = []
149
 
        cache = self._cache()
150
 
        for revid in revid_list:
151
 
            entry = cache.get(revid)
152
 
            if entry is not None:
153
 
                out.append(entry)
154
 
            else:
155
 
                missing_revids.append(revid)
156
 
                missing_revid_indices.append(len(out))
157
 
                out.append(None)
158
 
        if missing_revids:
159
 
            missing_entries = self.history.get_changes_uncached(missing_revids)
160
 
            missing_entry_dict = {}
161
 
            for entry in missing_entries:
162
 
                missing_entry_dict[entry.revid] = entry
163
 
            revid_entry_pairs = []
164
 
            for i, revid in zip(missing_revid_indices, missing_revids):
165
 
                out[i] = entry = missing_entry_dict.get(revid)
166
 
                if entry is not None:
167
 
                    revid_entry_pairs.append((revid, entry))
168
 
            cache.add(revid_entry_pairs)
169
 
        return filter(None, out)
170
 
 
171
 
    @with_lock
172
 
    def full(self):
173
 
        cache = self._cache()
174
 
        last_revid = util.to_utf8(self.history.last_revid)
175
 
        revision_history = self.history.get_revision_history()
176
 
        return (cache.count() >= len(revision_history)
177
 
                and cache.get(last_revid) is not None)
178
 
 
179
 
    @with_lock
180
 
    def size(self):
181
 
        return self._cache().count()
182
 
 
183
 
    def check_rebuild(self, max_time=3600):
184
 
        """
185
 
        check if we need to fill in any missing pieces of the cache.  pull in
186
 
        any missing changes, but don't work any longer than C{max_time}
187
 
        seconds.
188
 
        """
189
 
        if self.closed() or self.full():
190
 
            return
191
 
 
192
 
        self.log.info('Building revision cache...')
193
 
        start_time = time.time()
194
 
        last_update = time.time()
195
 
        count = 0
196
 
 
197
 
        work = list(self.history.get_revision_history())
198
 
        jump = 100
199
 
        for i in xrange(0, len(work), jump):
200
 
            r = work[i:i + jump]
201
 
            # must call into history so we grab the branch lock (otherwise, lock inversion)
202
 
            self.history.get_changes(r)
203
 
            if self.closed():
204
 
                self.flush()
205
 
                return
206
 
            count += jump
207
 
            now = time.time()
208
 
            if now - start_time > max_time:
209
 
                self.log.info('Cache rebuilding will pause for now.')
210
 
                self.flush()
211
 
                return
212
 
            if now - last_update > 60:
213
 
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
214
 
                last_update = time.time()
215
 
                self.flush()
216
 
            # give someone else a chance at the lock
217
 
            time.sleep(1)
218
 
        self.log.info('Revision cache rebuild completed.')
219
 
        self.flush()
 
81
        self.connection.commit()
 
82
 
220
83
 
221
84
class FileChangeCache(object):
222
85
    def __init__(self, history, cache_path):