~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-01-14 05:40:40 UTC
  • Revision ID: robey@lag.net-20070114054040-7i9lbhq992e612rq
fix up dev.cfg so that nobody will ever have to edit it, by letting the
important params be overridable in loggerhead.conf.

make start-loggerhead actually daemonize, write a pid file, and write logs
to normal log files, instead of requiring 'nohup' stuff, i.e., act like a
real server.  added stop-loggerhead to do a clean shutdown.  changed the
README to clarify how it should work now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
cached a change, it's good forever.
26
26
"""
27
27
 
28
 
import cPickle
 
28
import logging
29
29
import os
 
30
import shelve
 
31
import threading
30
32
import time
31
33
 
32
34
from loggerhead import util
 
35
from loggerhead.util import decorator
33
36
from loggerhead.lockfile import LockFile
34
37
 
35
38
 
36
39
with_lock = util.with_lock('_lock', 'ChangeCache')
37
 
 
38
 
# which sqlite binding to use; chosen through the SQLITE_INTERFACE
# environment variable and defaulting to the older 'sqlite' module.
SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')

# the two bindings spell query parameters differently, so remember the
# marker that goes with whichever one got imported.
if SQLITE_INTERFACE == 'sqlite':
    import sqlite as dbapi2
    _param_marker = '%s'
elif SQLITE_INTERFACE == 'pysqlite2':
    from pysqlite2 import dbapi2
    _param_marker = '?'
else:
    raise AssertionError("bad sqlite interface %r!?" % (SQLITE_INTERFACE,))

# the statements are written once with '?' placeholders and rewritten here
# to use the marker of the selected binding.
_select_stmt = "select data from revisiondata where revid = ?".replace(
    '?', _param_marker)
_insert_stmt = "insert into revisiondata (revid, data) values (?, ?)".replace(
    '?', _param_marker)
_update_stmt = "update revisiondata set data = ? where revid = ?".replace(
    '?', _param_marker)
55
 
 
56
 
 
57
 
 
58
 
 
59
 
class FakeShelf(object):
    """
    a small shelf-like store: pickled objects keyed by revision id, kept in
    the RevisionData table of a sqlite database file.

    writes made through L{add} and L{update} are committed immediately
    unless the caller passes C{commit=False}; L{close} only commits when
    asked to.
    """

    def __init__(self, filename):
        """
        open the database at C{filename} and make sure the RevisionData
        table exists.

        @param filename: path of the sqlite database file.
        """
        self.connection = dbapi2.connect(filename)
        try:
            self.cursor = self.connection.cursor()
            # ask the database whether the table is there instead of
            # checking whether the file exists: a file can exist without
            # the table (for example when an earlier first run was
            # interrupted before the table was committed), and then every
            # later query would fail.
            if not self._table_exists():
                self._create_table()
        except Exception:
            # don't leave a half-initialised connection open.
            self.connection.close()
            raise

    def _table_exists(self):
        """
        return True if the RevisionData table is present in the database.
        """
        self.cursor.execute(
            "select name from sqlite_master "
            "where type = 'table' and name = 'RevisionData'")
        return self.cursor.fetchone() is not None

    def _create_table(self):
        """
        create the RevisionData table and commit it.
        """
        self.cursor.execute(
            "create table RevisionData "
            "(revid binary primary key, data binary)")
        self.connection.commit()

    def _serialize(self, obj):
        """
        pickle C{obj} (protocol 2) and wrap it as a database binary value.
        """
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))

    def _unserialize(self, data):
        """
        turn a stored binary value back into the object it was pickled from.
        """
        # NOTE: unpickling trusts the contents of the cache file; only
        # point this at files that untrusted users cannot write.
        return cPickle.loads(str(data))

    def get(self, revid):
        """
        return the object stored for C{revid}, or None if there is no row
        for it.
        """
        self.cursor.execute(_select_stmt, (revid,))
        row = self.cursor.fetchone()
        if row is None:
            return None
        return self._unserialize(row[0])

    def add(self, revid_obj_pairs, commit=True):
        """
        insert a new row for every (revid, object) pair.

        @param revid_obj_pairs: iterable of (revid, object) tuples; revid
            is the table's primary key, so each one may be inserted only
            once.
        @param commit: commit after the inserts when true.
        """
        for (revid, obj) in revid_obj_pairs:
            self.cursor.execute(_insert_stmt, (revid, self._serialize(obj)))
        if commit:
            self.connection.commit()

    def update(self, revid_obj_pairs, commit=True):
        """
        replace the stored object of every (revid, object) pair.

        @param revid_obj_pairs: iterable of (revid, object) tuples.
        @param commit: commit after the updates when true.
        """
        for (revid, obj) in revid_obj_pairs:
            self.cursor.execute(_update_stmt, (self._serialize(obj), revid))
        if commit:
            self.connection.commit()

    def count(self):
        """
        return the number of rows in the table.
        """
        self.cursor.execute(
            "select count(*) from revisiondata")
        return self.cursor.fetchone()[0]

    def close(self, commit=False):
        """
        close the database connection, committing first if C{commit} is
        true.
        """
        if commit:
            self.connection.commit()
        self.connection.close()
 
40
        
101
41
 
102
42
class ChangeCache (object):
103
 
 
 
43
    
104
44
    def __init__(self, history, cache_path):
105
45
        self.history = history
106
46
        self.log = history.log
107
 
 
 
47
        
108
48
        if not os.path.exists(cache_path):
109
49
            os.mkdir(cache_path)
110
50
 
111
 
        self._changes_filename = os.path.join(cache_path, 'changes.sql')
112
 
 
 
51
        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
 
52
        self._changes_filename = os.path.join(cache_path, 'changes')
 
53
        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
 
54
        
113
55
        # use a lockfile since the cache folder could be shared across different processes.
114
56
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
115
57
        self._closed = False
116
 
 
117
 
##         # this is fluff; don't slow down startup time with it.
118
 
##         # but it is racy in tests :(
119
 
##         def log_sizes():
120
 
##             self.log.info('Using change cache %s; %d entries.' % (cache_path, self.size()))
121
 
##         threading.Thread(target=log_sizes).start()
122
 
 
123
 
    def _cache(self):
124
 
        return FakeShelf(self._changes_filename)
125
 
 
 
58
        
 
59
        # this is fluff; don't slow down startup time with it.
 
60
        def log_sizes():
 
61
            s1, s2 = self.sizes()
 
62
            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
 
63
        threading.Thread(target=log_sizes).start()
 
64
    
126
65
    @with_lock
127
66
    def close(self):
128
67
        self.log.debug('Closing cache file.')
129
68
        self._closed = True
130
 
 
 
69
    
131
70
    @with_lock
132
71
    def closed(self):
133
72
        return self._closed
135
74
    @with_lock
136
75
    def flush(self):
137
76
        pass
138
 
 
 
77
    
139
78
    @with_lock
140
 
    def get_changes(self, revid_list):
 
79
    def get_changes(self, revid_list, get_diffs=False):
141
80
        """
142
81
        get a list of changes by their revision_ids.  any changes missing
143
82
        from the cache are fetched by calling L{History.get_change_uncached}
144
83
        and inserted into the cache before returning.
145
84
        """
 
85
        if get_diffs:
 
86
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
87
        else:
 
88
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
89
 
146
90
        out = []
147
 
        missing_revids = []
148
 
        missing_revid_indices = []
149
 
        cache = self._cache()
 
91
        fetch_list = []
 
92
        sfetch_list = []
150
93
        for revid in revid_list:
151
 
            entry = cache.get(revid)
152
 
            if entry is not None:
153
 
                out.append(entry)
 
94
            # if the revid is in unicode, use the utf-8 encoding as the key
 
95
            srevid = util.to_utf8(revid)
 
96
            
 
97
            if srevid in cache:
 
98
                out.append(cache[srevid])
154
99
            else:
155
 
                missing_revids.append(revid)
156
 
                missing_revid_indices.append(len(out))
 
100
                #self.log.debug('Entry cache miss: %r' % (revid,))
157
101
                out.append(None)
158
 
        if missing_revids:
159
 
            missing_entries = self.history.get_changes_uncached(missing_revids)
160
 
            missing_entry_dict = {}
161
 
            for entry in missing_entries:
162
 
                missing_entry_dict[entry.revid] = entry
163
 
            revid_entry_pairs = []
164
 
            for i, revid in zip(missing_revid_indices, missing_revids):
165
 
                out[i] = entry = missing_entry_dict.get(revid)
166
 
                if entry is not None:
167
 
                    revid_entry_pairs.append((revid, entry))
168
 
            cache.add(revid_entry_pairs)
169
 
        return filter(None, out)
170
 
 
171
 
    @with_lock
172
 
    def full(self):
173
 
        cache = self._cache()
174
 
        last_revid = util.to_utf8(self.history.last_revid)
175
 
        revision_history = self.history.get_revision_history()
176
 
        return (cache.count() >= len(revision_history)
177
 
                and cache.get(last_revid) is not None)
178
 
 
179
 
    @with_lock
180
 
    def size(self):
181
 
        return self._cache().count()
182
 
 
 
102
                fetch_list.append(revid)
 
103
                sfetch_list.append(srevid)
 
104
        
 
105
        if len(fetch_list) > 0:
 
106
            # some revisions weren't in the cache; fetch them
 
107
            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
 
108
            if changes is None:
 
109
                return changes
 
110
            for i in xrange(len(revid_list)):
 
111
                if out[i] is None:
 
112
                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
 
113
        
 
114
        cache.close()
 
115
        return out
 
116
    
 
117
    @with_lock
 
118
    def full(self, get_diffs=False):
 
119
        if get_diffs:
 
120
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
121
        else:
 
122
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
123
        try:
 
124
            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
 
125
        finally:
 
126
            cache.close()
 
127
 
 
128
    @with_lock
 
129
    def sizes(self):
 
130
        cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
131
        s1 = len(cache)
 
132
        cache.close()
 
133
        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
134
        s2 = len(cache)
 
135
        cache.close()
 
136
        return s1, s2
 
137
        
183
138
    def check_rebuild(self, max_time=3600):
184
139
        """
185
140
        check if we need to fill in any missing pieces of the cache.  pull in
188
143
        """
189
144
        if self.closed() or self.full():
190
145
            return
191
 
 
 
146
        
192
147
        self.log.info('Building revision cache...')
193
148
        start_time = time.time()
194
149
        last_update = time.time()
218
173
        self.log.info('Revision cache rebuild completed.')
219
174
        self.flush()
220
175
 
221
 
class FileChangeCache(object):
    """
    cache of per-revision file changes, stored through L{FakeShelf} in
    'filechanges.sql' inside the cache folder.
    """

    def __init__(self, history, cache_path):
        """
        @param history: object providing C{get_file_changes_uncached}, used
            to compute whatever is not in the cache yet.
        @param cache_path: folder holding the cache; created if missing.
        """
        self.history = history

        if not os.path.exists(cache_path):
            os.mkdir(cache_path)

        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')

        # use a lockfile since the cache folder could be shared across
        # different processes.
        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))

    @with_lock
    def get_file_changes(self, entries):
        """
        return the file changes for each of C{entries}, in the same order.

        entries whose C{revid} is already cached are answered from the
        cache; the rest are computed with one call to
        C{history.get_file_changes_uncached} and stored before returning.

        @param entries: sequence of objects with a C{revid} attribute.
        @return: list with one item per entry; an item stays None if the
            uncached lookup returned fewer results than were asked for.
        """
        out = []
        missing_entries = []
        missing_entry_indices = []
        cache = FakeShelf(self._changes_filename)
        try:
            for entry in entries:
                changes = cache.get(entry.revid)
                if changes is not None:
                    out.append(changes)
                else:
                    # leave a placeholder and remember where to fill it in.
                    missing_entries.append(entry)
                    missing_entry_indices.append(len(out))
                    out.append(None)
            if missing_entries:
                missing_changes = self.history.get_file_changes_uncached(
                    missing_entries)
                revid_changes_pairs = []
                stored_revids = set()
                for i, entry, changes in zip(
                    missing_entry_indices, missing_entries, missing_changes):
                    out[i] = changes
                    # revid is the primary key of the table, so a revid
                    # that shows up more than once in 'entries' must only
                    # be inserted once.
                    if entry.revid not in stored_revids:
                        stored_revids.add(entry.revid)
                        revid_changes_pairs.append((entry.revid, changes))
                cache.add(revid_changes_pairs)
        finally:
            # always release the database connection, even when a lookup
            # or the insert fails.
            cache.close()
        return out
 
176