~loggerhead-team/loggerhead/trunk-rich

Viewing changes to loggerhead/changecache.py

  • Committer: Martin Albisetti
  • Date: 2008-07-25 01:32:19 UTC
  • mto: (157.1.3 loggerhead)
  • mto: This revision was merged to the branch mainline in revision 187.
  • Revision ID: argentina@gmail.com-20080725013219-a2f9vntlapuki522
 * Add javascript library to order stuff in tables
 * Make the fileview order-able

=== modified file 'loggerhead/changecache.py'
@@ -25,151 +25,95 @@
 cached a change, it's good forever.
 """
 
-import logging
+import cPickle
 import os
-import shelve
-import threading
-import time
 
 from loggerhead import util
-from loggerhead.util import decorator
 from loggerhead.lockfile import LockFile
 
-
 with_lock = util.with_lock('_lock', 'ChangeCache')
-
-
-class ChangeCache (object):
-
+
+SQLITE_INTERFACE = os.environ.get('SQLITE_INTERFACE', 'sqlite')
+
+if SQLITE_INTERFACE == 'pysqlite2':
+    from pysqlite2 import dbapi2
+    _param_marker = '?'
+elif SQLITE_INTERFACE == 'sqlite':
+    import sqlite as dbapi2
+    _param_marker = '%s'
+
+
+_select_stmt = ("select data from revisiondata where revid = ?"
+                ).replace('?', _param_marker)
+_insert_stmt = ("insert into revisiondata (revid, data) "
+                "values (?, ?)").replace('?', _param_marker)
+
+
+
+
+class FakeShelf(object):
+    def __init__(self, filename):
+        create_table = not os.path.exists(filename)
+        self.connection = dbapi2.connect(filename)
+        self.cursor = self.connection.cursor()
+        if create_table:
+            self._create_table()
+    def _create_table(self):
+        self.cursor.execute(
+            "create table RevisionData "
+            "(revid binary primary key, data binary)")
+        self.connection.commit()
+    def _serialize(self, obj):
+        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
+        return r
+    def _unserialize(self, data):
+        return cPickle.loads(str(data))
+    def get(self, revid):
+        self.cursor.execute(_select_stmt, (revid,))
+        filechange = self.cursor.fetchone()
+        if filechange is None:
+            return None
+        else:
+            return self._unserialize(filechange[0])
+    def add(self, revid_obj_pairs):
+        for (r, d) in revid_obj_pairs:
+            self.cursor.execute(_insert_stmt, (r, self._serialize(d)))
+        self.connection.commit()
+
+
+class FileChangeCache(object):
     def __init__(self, history, cache_path):
         self.history = history
-        self.log = history.log
-
+
         if not os.path.exists(cache_path):
             os.mkdir(cache_path)
 
-        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
-        self._changes_filename = os.path.join(cache_path, 'changes')
-        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
-
-        # use a lockfile since the cache folder could be shared across different processes.
-        self._lock = LockFile(os.path.join(cache_path, 'lock'))
-        self._closed = False
-
-        # this is fluff; don't slow down startup time with it.
-        def log_sizes():
-            s1, s2 = self.sizes()
-            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
-        threading.Thread(target=log_sizes).start()
-
-    @with_lock
-    def close(self):
-        self.log.debug('Closing cache file.')
-        self._closed = True
-
-    @with_lock
-    def closed(self):
-        return self._closed
-
-    @with_lock
-    def flush(self):
-        pass
-
-    @with_lock
-    def get_changes(self, revid_list, get_diffs=False):
-        """
-        get a list of changes by their revision_ids.  any changes missing
-        from the cache are fetched by calling L{History.get_change_uncached}
-        and inserted into the cache before returning.
-        """
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-
+        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
+
+        # use a lockfile since the cache folder could be shared across
+        # different processes.
+        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))
+
+    @with_lock
+    def get_file_changes(self, entries):
         out = []
-        fetch_list = []
-        sfetch_list = []
-        for revid in revid_list:
-            # if the revid is in unicode, use the utf-8 encoding as the key
-            srevid = util.to_utf8(revid)
-
-            if srevid in cache:
-                out.append(cache[srevid])
+        missing_entries = []
+        missing_entry_indices = []
+        cache = FakeShelf(self._changes_filename)
+        for entry in entries:
+            changes = cache.get(entry.revid)
+            if changes is not None:
+                out.append(changes)
             else:
-                #self.log.debug('Entry cache miss: %r' % (revid,))
+                missing_entries.append(entry)
+                missing_entry_indices.append(len(out))
                 out.append(None)
-                fetch_list.append(revid)
-                sfetch_list.append(srevid)
-
-        if len(fetch_list) > 0:
-            # some revisions weren't in the cache; fetch them
-            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
-            if changes is None:
-                return changes
-            for i in xrange(len(revid_list)):
-                if out[i] is None:
-                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
-
-        cache.close()
+        if missing_entries:
+            missing_changes = self.history.get_file_changes_uncached(missing_entries)
+            revid_changes_pairs = []
+            for i, entry, changes in zip(
+                missing_entry_indices, missing_entries, missing_changes):
+                revid_changes_pairs.append((entry.revid, changes))
+                out[i] = changes
+            cache.add(revid_changes_pairs)
         return out
-
-    @with_lock
-    def full(self, get_diffs=False):
-        if get_diffs:
-            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        else:
-            cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        try:
-            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
-        finally:
-            cache.close()
-
-    @with_lock
-    def sizes(self):
-        cache = shelve.open(self._changes_filename, 'c', protocol=2)
-        s1 = len(cache)
-        cache.close()
-        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
-        s2 = len(cache)
-        cache.close()
-        return s1, s2
-
-    def check_rebuild(self, max_time=3600):
-        """
-        check if we need to fill in any missing pieces of the cache.  pull in
-        any missing changes, but don't work any longer than C{max_time}
-        seconds.
-        """
-        if self.closed() or self.full():
-            return
-
-        self.log.info('Building revision cache...')
-        start_time = time.time()
-        last_update = time.time()
-        count = 0
-
-        work = list(self.history.get_revision_history())
-        jump = 100
-        for i in xrange(0, len(work), jump):
-            r = work[i:i + jump]
-            # must call into history so we grab the branch lock (otherwise, lock inversion)
-            self.history.get_changes(r)
-            if self.closed():
-                self.flush()
-                return
-            count += jump
-            now = time.time()
-            if now - start_time > max_time:
-                self.log.info('Cache rebuilding will pause for now.')
-                self.flush()
-                return
-            if now - last_update > 60:
-                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
-                last_update = time.time()
-                self.flush()
-            # give someone else a chance at the lock
-            time.sleep(1)
-        self.log.info('Revision cache rebuild completed.')
-        self.flush()
-
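A note for anyone reading this change today: the sqlite and pysqlite2 bindings selected through SQLITE_INTERFACE are Python 2-era modules. The technique FakeShelf implements, pickled values keyed by revision id in a single SQLite table, carries over directly to the standard library's sqlite3. Below is a minimal sketch of that pattern under modern Python; the class name PickleShelf, the table layout, and the temp-file path are this note's illustrative choices, not loggerhead's API.

# A minimal modern sketch of the FakeShelf pattern: pickled objects keyed
# by revision id in one SQLite table. Illustrative only -- the name and
# the use of the stdlib sqlite3 module are assumptions, not loggerhead's.
import os
import pickle
import sqlite3
import tempfile


class PickleShelf(object):
    def __init__(self, filename):
        # Check before connecting: sqlite3.connect() creates the file,
        # so the table is created exactly once per cache file.
        create_table = not os.path.exists(filename)
        self.connection = sqlite3.connect(filename)
        self.cursor = self.connection.cursor()
        if create_table:
            self.cursor.execute(
                "create table revisiondata "
                "(revid blob primary key, data blob)")
            self.connection.commit()

    def get(self, revid):
        self.cursor.execute(
            "select data from revisiondata where revid = ?", (revid,))
        row = self.cursor.fetchone()
        return None if row is None else pickle.loads(row[0])

    def add(self, revid_obj_pairs):
        for revid, obj in revid_obj_pairs:
            self.cursor.execute(
                "insert into revisiondata (revid, data) values (?, ?)",
                (revid, pickle.dumps(obj, protocol=2)))
        self.connection.commit()


shelf = PickleShelf(os.path.join(tempfile.mkdtemp(), 'filechanges.sql'))
shelf.add([(b'rev-1', {'path': 'README', 'kind': 'modified'})])
assert shelf.get(b'rev-1') == {'path': 'README', 'kind': 'modified'}
assert shelf.get(b'rev-2') is None

As with FakeShelf, inserting the same key twice raises an integrity error, so callers are expected to add() only revids that get() just reported missing.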
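The new get_file_changes() is a read-through batch cache: it records the output position of every cache miss, fetches all misses in one call to History.get_file_changes_uncached, then back-fills both the result list and the cache. The same shape reduced to generic callables, where get_cached_many, cache_get, compute_many, and cache_add are hypothetical names for illustration:

# Read-through batch caching as in get_file_changes(), shown generically.
# cache_get/compute_many/cache_add are stand-ins, not loggerhead functions.
def get_cached_many(keys, cache_get, compute_many, cache_add):
    out = []
    missing_keys = []
    missing_indices = []
    for key in keys:
        value = cache_get(key)
        # Like the original, this treats None as "not cached".
        if value is not None:
            out.append(value)
        else:
            missing_keys.append(key)
            missing_indices.append(len(out))  # remember the slot to fill
            out.append(None)
    if missing_keys:
        computed = compute_many(missing_keys)  # one batched fetch for all misses
        cache_add(zip(missing_keys, computed))
        for i, value in zip(missing_indices, computed):
            out[i] = value
    return out


# Example with a plain dict as the cache and squaring as the expensive step.
cache = {}
assert get_cached_many(
    [2, 3, 2], cache.get,
    lambda ks: [k * k for k in ks],
    lambda pairs: cache.update(pairs)) == [4, 9, 4]
assert cache == {2: 4, 3: 9}

Recording len(out) at miss time is what lets the batched results drop back into request order without a second lookup pass.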