~loggerhead-team/loggerhead/trunk-rich

Viewing changes to loggerhead/changecache.py

ok i should've known those API calls wouldn't be consistent.

@@ -25,153 +25,91 @@
 cached a change, it's good forever.
 """
 
-import logging
+import cPickle
 import os
-import shelve
-import threading
-import time
 
 from loggerhead import util
-from loggerhead.util import decorator
 from loggerhead.lockfile import LockFile
 
-
 with_lock = util.with_lock('_lock', 'ChangeCache')
+
+SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite3')
+
+if SQLITE_INTERFACE == 'sqlite3':
+    from sqlite3 import dbapi2
+    _param_marker = '?'
+
+_select_stmt = ("select data from revisiondata where revid = ?"
+                ).replace('?', _param_marker)
+_insert_stmt = ("insert into revisiondata (revid, data) "
+                "values (?, ?)").replace('?', _param_marker)
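The _param_marker shuffle is presumably what the commit message is grumbling about: DB-API 2.0 modules all accept the same execute() calls, but each is free to use a different placeholder syntax, which it advertises in a module-level paramstyle attribute. A minimal sketch of that difference, separate from the diff (the marker and query names are invented here):

from sqlite3 import dbapi2

# sqlite3 declares 'qmark' style, i.e. '?' placeholders; other DB-API
# modules may declare 'format' style, i.e. '%s' placeholders.
if dbapi2.paramstyle == 'qmark':
    marker = '?'
elif dbapi2.paramstyle == 'format':
    marker = '%s'

query = "select data from revisiondata where revid = ?".replace('?', marker)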
+class FakeShelf(object):
+    def __init__(self, filename):
+        create_table = not os.path.exists(filename)
+        self.connection = dbapi2.connect(filename)
+        self.cursor = self.connection.cursor()
+        if create_table:
+            self._create_table()
+
+    def _create_table(self):
+        self.cursor.execute(
+            "create table RevisionData "
+            "(revid binary primary key, data binary)")
+        self.connection.commit()
+
+    def _serialize(self, obj):
+        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
+        return r
+
+    def _unserialize(self, data):
+        return cPickle.loads(str(data))
+
+    def get(self, revid):
+        self.cursor.execute(_select_stmt, (revid,))
+        filechange = self.cursor.fetchone()
+        if filechange is None:
+            return None
+        else:
+            return self._unserialize(filechange[0])
+
+    def add(self, revid_obj_pairs):
+        for (r, d) in revid_obj_pairs:
+            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
+        self.connection.commit()
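FakeShelf amounts to a shelve-style get/add interface on top of sqlite: values are pickled at protocol 2 and stored as blobs keyed by revid. A rough usage sketch (the path, key, and value are invented):

cache = FakeShelf('/tmp/filechanges.sql')
if cache.get('some-revid') is None:
    # add() is a bare insert, so only write keys that are not already present
    cache.add([('some-revid', ['hypothetical', 'change', 'list'])])
print cache.get('some-revid')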
-
-
-class ChangeCache (object):
-
+class FileChangeCache(object):
     def __init__(self, history, cache_path):
         self.history = history
-        self.log = history.log
-
+
         if not os.path.exists(cache_path):
             os.mkdir(cache_path)
 
+        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
+
+        # use a lockfile since the cache folder could be shared across
+        # different processes.
+        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))
+
+    @with_lock
+    def get_file_changes(self, entries):
+        out = []
+        missing_entries = []
+        missing_entry_indices = []
+        cache = FakeShelf(self._changes_filename)
+        for entry in entries:
+            changes = cache.get(entry.revid)
+            if changes is not None:
+                out.append(changes)
+            else:
+                missing_entries.append(entry)
+                missing_entry_indices.append(len(out))
+                out.append(None)
+        if missing_entries:
+            missing_changes = self.history.get_file_changes_uncached(missing_entries)
+            revid_changes_pairs = []
+            for i, entry, changes in zip(
+                missing_entry_indices, missing_entries, missing_changes):
+                revid_changes_pairs.append((entry.revid, changes))
+                out[i] = changes
+            cache.add(revid_changes_pairs)
+        return out
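get_file_changes is a read-through cache: probe FakeShelf for each entry, remember the positions of the misses, fetch all the misses in one batch from the history object, then backfill both the output list and the cache. The same pattern in miniature, with a hypothetical fetch_many callable standing in for History.get_file_changes_uncached:

def cached_lookup(keys, cache, fetch_many):
    out = []
    missing_keys = []
    missing_indices = []
    for key in keys:
        value = cache.get(key)
        if value is None:
            missing_keys.append(key)
            missing_indices.append(len(out))
        out.append(value)
    if missing_keys:
        # one batched fetch for everything the cache didn't have
        fetched = fetch_many(missing_keys)
        for i, value in zip(missing_indices, fetched):
            out[i] = value
        cache.add(zip(missing_keys, fetched))
    return out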
-        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
-        self._changes_filename = os.path.join(cache_path, 'changes')
-        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
-
-        # use a lockfile since the cache folder could be shared across different processes.
-        self._lock = LockFile(os.path.join(cache_path, 'lock'))
-        self._closed = False
-
-        # this is fluff; don't slow down startup time with it.
-        def log_sizes():
-            s1, s2 = self.sizes()
-            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
-        threading.Thread(target=log_sizes).start()
-
-    @with_lock
-    def close(self):
-        self.log.debug('Closing cache file.')
-        self._closed = True
-
-    @with_lock
-    def closed(self):
-        return self._closed
-
-    @with_lock
-    def flush(self):
-        pass
-
-    @with_lock
-    def get_changes(self, revid_list, get_diffs=False):
-        """
-        get a list of changes by their revision_ids.  any changes missing
-        from the cache are fetched by calling L{History.get_change_uncached}
-        and inserted into the cache before returning.
-        """
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-
-        try:
-            out = []
-            fetch_list = []
-            sfetch_list = []
-            for revid in revid_list:
-                # if the revid is in unicode, use the utf-8 encoding as the key
-                srevid = util.to_utf8(revid)
-
-                if srevid in cache:
-                    out.append(cache[srevid])
-                else:
-                    #self.log.debug('Entry cache miss: %r' % (revid,))
-                    out.append(None)
-                    fetch_list.append(revid)
-                    sfetch_list.append(srevid)
-
-            if len(fetch_list) > 0:
-                # some revisions weren't in the cache; fetch them
-                changes = self.history.get_changes_uncached(fetch_list, get_diffs)
-                if changes is None:
-                    return changes
-                for i in xrange(len(revid_list)):
-                    if out[i] is None:
-                        cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
-            return out
-        finally:
-            cache.close()
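Two details of the removed code are worth noting: Python 2's shelve accepts only plain byte-string keys, which is why every revid goes through util.to_utf8() before touching the cache, and every call pays for a full shelve.open()/close() around the whole operation. The removed usage in miniature (path, key, and value invented):

import shelve

cache = shelve.open('/tmp/changes', 'c', protocol=2)
try:
    cache['some-revid'] = {'message': 'hello'}  # keys must be byte strings
    print cache.get('some-revid')
finally:
    cache.close()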
-    @with_lock
-    def full(self, get_diffs=False):
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        try:
-            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
-        finally:
-            cache.close()
-
-    @with_lock
-    def sizes(self):
-        cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        s1 = len(cache)
-        cache.close()
-        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        s2 = len(cache)
-        cache.close()
-        return s1, s2
-
-    def check_rebuild(self, max_time=3600):
-        """
-        check if we need to fill in any missing pieces of the cache.  pull in
-        any missing changes, but don't work any longer than C{max_time}
-        seconds.
-        """
-        if self.closed() or self.full():
-            return
-
-        self.log.info('Building revision cache...')
-        start_time = time.time()
-        last_update = time.time()
-        count = 0
-
-        work = list(self.history.get_revision_history())
-        jump = 100
-        for i in xrange(0, len(work), jump):
-            r = work[i:i + jump]
-            # must call into history so we grab the branch lock (otherwise, lock inversion)
-            self.history.get_changes(r)
-            if self.closed():
-                self.flush()
-                return
-            count += jump
-            now = time.time()
-            if now - start_time > max_time:
-                self.log.info('Cache rebuilding will pause for now.')
-                self.flush()
-                return
-            if now - last_update > 60:
-                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
-                last_update = time.time()
-                self.flush()
-            # give someone else a chance at the lock
-            time.sleep(1)
-        self.log.info('Revision cache rebuild completed.')
-        self.flush()
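check_rebuild was the batch-warming half of the old design: walk the entire revision history 100 revids at a time, flush and log periodically, and bail out early when the cache is closed or the time budget runs out. Its control flow reduced to a skeleton, with a hypothetical process_batch callable in place of history.get_changes:

import time

def rebuild(work, process_batch, max_time=3600, jump=100):
    start_time = time.time()
    for i in xrange(0, len(work), jump):
        process_batch(work[i:i + jump])
        if time.time() - start_time > max_time:
            # over budget; a later call starts over, and cache hits
            # make the re-walk cheap
            return False
        time.sleep(1)  # briefly yield the lock to other processes
    return True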