~loggerhead-team/loggerhead/trunk-rich

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-03-26 06:09:27 UTC
  • Revision ID: robey@lag.net-20070326060927-1o991yjbbxkqpf3d
fix some lame setup.py errors

--- loggerhead/changecache.py
+++ loggerhead/changecache.py
@@ -25,15 +25,25 @@
 cached a change, it's good forever.
 """
 
-import logging
+import cPickle
 import os
-import shelve
-import threading
-import time
 
 from loggerhead import util
-from loggerhead.util import decorator
 from loggerhead.lockfile import LockFile
 
-
 with_lock = util.with_lock('_lock', 'ChangeCache')
+
+SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')
+
+if SQLITE_INTERFACE == 'pysqlite2':
+    from pysqlite2 import dbapi2
+    _param_marker = '?'
+elif SQLITE_INTERFACE == 'sqlite':
+    import sqlite as dbapi2
+    _param_marker = '%s'
+
+
+_select_stmt = ("select data from revisiondata where revid = ?"
+                ).replace('?', _param_marker)
+_insert_stmt = ("insert into revisiondata (revid, data) "
+                "values (?, ?)").replace('?', _param_marker)
@@ -39,0 +50,32 @@
+
+
+
+
+class FakeShelf(object):
+    def __init__(self, filename):
+        create_table = not os.path.exists(filename)
+        self.connection = dbapi2.connect(filename)
+        self.cursor = self.connection.cursor()
+        if create_table:
+            self._create_table()
+    def _create_table(self):
+        self.cursor.execute(
+            "create table RevisionData "
+            "(revid binary primary key, data binary)")
+        self.connection.commit()
+    def _serialize(self, obj):
+        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
+        return r
+    def _unserialize(self, data):
+        return cPickle.loads(str(data))
+    def get(self, revid):
+        self.cursor.execute(_select_stmt, (revid,))
+        filechange = self.cursor.fetchone()
+        if filechange is None:
+            return None
+        else:
+            return self._unserialize(filechange[0])
+    def add(self, revid_obj_pairs):
+        for (r, d) in revid_obj_pairs:
+            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
+        self.connection.commit()
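
FakeShelf wraps a single sqlite table in the small cache interface the new code needs: get() returns the unpickled object for a revision id (or None on a miss), and add() batch-inserts pickled objects under one commit. A rough self-contained equivalent using the stdlib sqlite3 and pickle modules (the class name and table schema here are illustrative, not the project's code):

    import pickle
    import sqlite3

    class PickleShelf(object):
        """Store pickled objects in a sqlite table, keyed by revision id."""
        def __init__(self, filename):
            self.connection = sqlite3.connect(filename)
            self.cursor = self.connection.cursor()
            self.cursor.execute(
                "create table if not exists revisiondata "
                "(revid text primary key, data blob)")
            self.connection.commit()

        def get(self, revid):
            self.cursor.execute(
                "select data from revisiondata where revid = ?", (revid,))
            row = self.cursor.fetchone()
            # protocol=2 keeps the pickles compact, as in the original
            return None if row is None else pickle.loads(bytes(row[0]))

        def add(self, revid_obj_pairs):
            # batch all inserts under a single commit, like FakeShelf.add
            for revid, obj in revid_obj_pairs:
                self.cursor.execute(
                    "insert into revisiondata (revid, data) values (?, ?)",
                    (revid, sqlite3.Binary(pickle.dumps(obj, protocol=2))))
            self.connection.commit()
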
@@ -40,11 +82,17 @@
-
-
-class ChangeCache (object):
-
+
+
+class FileChangeCache(object):
     def __init__(self, history, cache_path):
         self.history = history
-        self.log = history.log
-
+
         if not os.path.exists(cache_path):
             os.mkdir(cache_path)
 
+        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
+
+        # use a lockfile since the cache folder could be shared across
+        # different processes.
+        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))
+
+    @with_lock
+    def get_file_changes(self, entries):
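
Two layers of locking appear here: the on-disk LockFile serializes access across processes that share the cache directory, while the module-level with_lock decorator serializes the methods of one instance. util.with_lock itself is not part of this diff; a plausible sketch of its shape (a guess for readability, not loggerhead's implementation) is:

    import functools

    def with_lock(lock_attr, debug_name=None):
        """Build a decorator that holds getattr(self, lock_attr) for the call."""
        def decorate(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                lock = getattr(self, lock_attr)
                lock.acquire()
                try:
                    return func(self, *args, **kwargs)
                finally:
                    lock.release()
            return wrapper
        return decorate
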
 
@@ -51,65 +99,21 @@
-        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
-        self._changes_filename = os.path.join(cache_path, 'changes')
-        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
-
-        # use a lockfile since the cache folder could be shared across different processes.
-        self._lock = LockFile(os.path.join(cache_path, 'lock'))
-        self._closed = False
-
-        # this is fluff; don't slow down startup time with it.
-        def log_sizes():
-            s1, s2 = self.sizes()
-            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
-        threading.Thread(target=log_sizes).start()
-
-    @with_lock
-    def close(self):
-        self.log.debug('Closing cache file.')
-        self._closed = True
-
-    @with_lock
-    def closed(self):
-        return self._closed
-
-    @with_lock
-    def flush(self):
-        pass
-
-    @with_lock
-    def get_changes(self, revid_list, get_diffs=False):
-        """
-        get a list of changes by their revision_ids.  any changes missing
-        from the cache are fetched by calling L{History.get_change_uncached}
-        and inserted into the cache before returning.
-        """
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-
         out = []
-        fetch_list = []
-        sfetch_list = []
-        for revid in revid_list:
-            # if the revid is in unicode, use the utf-8 encoding as the key
-            srevid = util.to_utf8(revid)
-
-            if srevid in cache:
-                out.append(cache[srevid])
+        missing_entries = []
+        missing_entry_indices = []
+        cache = FakeShelf(self._changes_filename)
+        for entry in entries:
+            changes = cache.get(entry.revid)
+            if changes is not None:
+                out.append(changes)
             else:
-                #self.log.debug('Entry cache miss: %r' % (revid,))
+                missing_entries.append(entry)
+                missing_entry_indices.append(len(out))
                 out.append(None)
-                fetch_list.append(revid)
-                sfetch_list.append(srevid)
-
-        if len(fetch_list) > 0:
-            # some revisions weren't in the cache; fetch them
-            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
-            if changes is None:
-                return changes
-            for i in xrange(len(revid_list)):
-                if out[i] is None:
-                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
-
-        cache.close()
+        if missing_entries:
+            missing_changes = self.history.get_file_changes_uncached(missing_entries)
+            revid_changes_pairs = []
+            for i, entry, changes in zip(
+                missing_entry_indices, missing_entries, missing_changes):
+                revid_changes_pairs.append((entry.revid, changes))
+                out[i] = changes
+            cache.add(revid_changes_pairs)
         return out
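
The new get_file_changes body replaces the old unicode-key bookkeeping (fetch_list/sfetch_list) with a simpler miss-batching pattern: record where each cache miss lands in the output list, fetch all misses in one uncached call, then patch the results back into place and write them to the cache. The pattern in isolation (the two callables stand in for FakeShelf.get and History.get_file_changes_uncached):

    def batched_lookup(keys, cache_get, fetch_many):
        """Look up keys via a cache, fetching all misses in one batch."""
        out, missing, missing_idx = [], [], []
        for key in keys:
            value = cache_get(key)
            if value is not None:
                out.append(value)
            else:
                missing.append(key)
                missing_idx.append(len(out))
                out.append(None)    # placeholder, patched below
        if missing:
            for i, value in zip(missing_idx, fetch_many(missing)):
                out[i] = value
        return out
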
 
@@ -116,61 +119,0 @@
-
-    @with_lock
-    def full(self, get_diffs=False):
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        try:
-            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
-        finally:
-            cache.close()
-
-    @with_lock
-    def sizes(self):
-        cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        s1 = len(cache)
-        cache.close()
-        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        s2 = len(cache)
-        cache.close()
-        return s1, s2
-
-    def check_rebuild(self, max_time=3600):
-        """
-        check if we need to fill in any missing pieces of the cache.  pull in
-        any missing changes, but don't work any longer than C{max_time}
-        seconds.
-        """
-        if self.closed() or self.full():
-            return
-
-        self.log.info('Building revision cache...')
-        start_time = time.time()
-        last_update = time.time()
-        count = 0
-
-        work = list(self.history.get_revision_history())
-        jump = 100
-        for i in xrange(0, len(work), jump):
-            r = work[i:i + jump]
-            # must call into history so we grab the branch lock (otherwise, lock inversion)
-            self.history.get_changes(r)
-            if self.closed():
-                self.flush()
-                return
-            count += jump
-            now = time.time()
-            if now - start_time > max_time:
-                self.log.info('Cache rebuilding will pause for now.')
-                self.flush()
-                return
-            if now - last_update > 60:
-                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
-                last_update = time.time()
-                self.flush()
-            # give someone else a chance at the lock
-            time.sleep(1)
-        self.log.info('Revision cache rebuild completed.')
-        self.flush()
-
-
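
The deleted check_rebuild was a time-budgeted batch loop: walk the revision history in slices of 100, stop (after flushing) once max_time has elapsed or the cache is closed, log progress once a minute, and sleep between slices so other lock holders get a turn. Stripped of the cache plumbing, the skeleton looks roughly like this (the names are placeholders, not loggerhead APIs):

    import time

    def rebuild(items, process_batch, max_time=3600, batch_size=100):
        """Process items in batches, stopping once max_time is spent."""
        start = time.time()
        for i in range(0, len(items), batch_size):
            process_batch(items[i:i + batch_size])
            if time.time() - start > max_time:
                return False    # budget exhausted; resume on a later call
            time.sleep(1)       # give other lock holders a chance
        return True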