~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-03-26 06:09:27 UTC
  • Revision ID: robey@lag.net-20070326060927-1o991yjbbxkqpf3d
fix some lame setup.py errors

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#
18
18
 
19
19
"""
20
 
a cache for chewed-up 'file change' data structures, which are basically just
21
 
a different way of storing a revision delta.  the cache improves lookup times
22
 
10x over bazaar's xml revision structure, though, so currently still worth
23
 
doing.
 
20
a cache for chewed-up "change" data structures, which are basically just a
 
21
different way of storing a revision.  the cache improves lookup times 10x
 
22
over bazaar's xml revision structure, though, so currently still worth doing.
24
23
 
25
24
once a revision is committed in bazaar, it never changes, so once we have
26
25
cached a change, it's good forever.
27
26
"""
28
27
 
29
 
import cPickle
30
 
import marshal
 
28
import logging
31
29
import os
32
 
import tempfile
33
 
import zlib
34
 
 
35
 
try:
36
 
    from sqlite3 import dbapi2
37
 
except ImportError:
38
 
    from pysqlite2 import dbapi2
39
 
 
40
 
# We take an optimistic approach to concurrency here: we might do work twice
41
 
# in the case of races, but not crash or corrupt data.
42
 
 
43
 
def safe_init_db(filename, init_sql):
44
 
    # To avoid races around creating the database, we create the db in
45
 
    # a temporary file and rename it into the ultimate location.
46
 
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
47
 
    os.close(fd)
48
 
    con = dbapi2.connect(temp_path)
49
 
    cur = con.cursor()
50
 
    cur.execute(init_sql)
51
 
    con.commit()
52
 
    con.close()
53
 
    os.rename(temp_path, filename)
54
 
 
55
 
class FakeShelf(object):
56
 
 
57
 
    def __init__(self, filename):
58
 
        create_table = not os.path.exists(filename)
59
 
        if create_table:
60
 
            safe_init_db(
61
 
                filename, "create table RevisionData "
62
 
                "(revid binary primary key, data binary)")
63
 
        self.connection = dbapi2.connect(filename)
64
 
        self.cursor = self.connection.cursor()
65
 
 
66
 
    def _create_table(self, filename):
67
 
        con = dbapi2.connect(filename)
68
 
        cur = con.cursor()
69
 
        cur.execute(
70
 
            "create table RevisionData "
71
 
            "(revid binary primary key, data binary)")
72
 
        con.commit()
73
 
        con.close()
74
 
 
75
 
    def _serialize(self, obj):
76
 
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))
77
 
 
78
 
    def _unserialize(self, data):
79
 
        return cPickle.loads(str(data))
80
 
 
81
 
    def get(self, revid):
82
 
        self.cursor.execute(
83
 
            "select data from revisiondata where revid = ?", (revid, ))
84
 
        filechange = self.cursor.fetchone()
85
 
        if filechange is None:
86
 
            return None
87
 
        else:
88
 
            return self._unserialize(filechange[0])
89
 
 
90
 
    def add(self, revid, object):
91
 
        try:
92
 
            self.cursor.execute(
93
 
                "insert into revisiondata (revid, data) values (?, ?)",
94
 
                (revid, self._serialize(object)))
95
 
            self.connection.commit()
96
 
        except dbapi2.IntegrityError:
97
 
            # If another thread or process attempted to set the same key, we
98
 
            # assume it set it to the same value and carry on with our day.
99
 
            pass
100
 
 
101
 
 
102
 
class RevInfoDiskCache(object):
103
 
    """Like `RevInfoMemoryCache` but backed in a sqlite DB."""
104
 
 
105
 
    def __init__(self, cache_path):
 
30
import shelve
 
31
import threading
 
32
import time
 
33
 
 
34
from loggerhead import util
 
35
from loggerhead.util import decorator
 
36
from loggerhead.lockfile import LockFile
 
37
 
 
38
 
 
39
with_lock = util.with_lock('_lock', 'ChangeCache')
 
40
        
 
41
 
 
42
class ChangeCache (object):
 
43
    
 
44
    def __init__(self, history, cache_path):
 
45
        self.history = history
 
46
        self.log = history.log
 
47
        
106
48
        if not os.path.exists(cache_path):
107
49
            os.mkdir(cache_path)
108
 
        filename = os.path.join(cache_path, 'revinfo.sql')
109
 
        create_table = not os.path.exists(filename)
110
 
        if create_table:
111
 
            safe_init_db(
112
 
                filename, "create table Data "
113
 
                "(key binary primary key, revid binary, data binary)")
114
 
        self.connection = dbapi2.connect(filename)
115
 
        self.cursor = self.connection.cursor()
116
 
 
117
 
    def get(self, key, revid):
118
 
        self.cursor.execute(
119
 
            "select revid, data from data where key = ?", (dbapi2.Binary(key),))
120
 
        row = self.cursor.fetchone()
121
 
        if row is None:
122
 
            return None
123
 
        elif str(row[0]) != revid:
124
 
            return None
125
 
        else:
126
 
            return marshal.loads(zlib.decompress(row[1]))
127
 
 
128
 
    def set(self, key, revid, data):
 
50
 
 
51
        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
 
52
        self._changes_filename = os.path.join(cache_path, 'changes')
 
53
        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
 
54
        
 
55
        # use a lockfile since the cache folder could be shared across different processes.
 
56
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
 
57
        self._closed = False
 
58
        
 
59
        # this is fluff; don't slow down startup time with it.
 
60
        def log_sizes():
 
61
            s1, s2 = self.sizes()
 
62
            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
 
63
        threading.Thread(target=log_sizes).start()
 
64
    
 
65
    @with_lock
 
66
    def close(self):
 
67
        self.log.debug('Closing cache file.')
 
68
        self._closed = True
 
69
    
 
70
    @with_lock
 
71
    def closed(self):
 
72
        return self._closed
 
73
 
 
74
    @with_lock
 
75
    def flush(self):
 
76
        pass
 
77
    
 
78
    @with_lock
 
79
    def get_changes(self, revid_list, get_diffs=False):
 
80
        """
 
81
        get a list of changes by their revision_ids.  any changes missing
 
82
        from the cache are fetched by calling L{History.get_change_uncached}
 
83
        and inserted into the cache before returning.
 
84
        """
 
85
        if get_diffs:
 
86
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
87
        else:
 
88
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
89
 
 
90
        out = []
 
91
        fetch_list = []
 
92
        sfetch_list = []
 
93
        for revid in revid_list:
 
94
            # if the revid is in unicode, use the utf-8 encoding as the key
 
95
            srevid = util.to_utf8(revid)
 
96
            
 
97
            if srevid in cache:
 
98
                out.append(cache[srevid])
 
99
            else:
 
100
                #self.log.debug('Entry cache miss: %r' % (revid,))
 
101
                out.append(None)
 
102
                fetch_list.append(revid)
 
103
                sfetch_list.append(srevid)
 
104
        
 
105
        if len(fetch_list) > 0:
 
106
            # some revisions weren't in the cache; fetch them
 
107
            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
 
108
            if changes is None:
 
109
                return changes
 
110
            for i in xrange(len(revid_list)):
 
111
                if out[i] is None:
 
112
                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
 
113
        
 
114
        cache.close()
 
115
        return out
 
116
    
 
117
    @with_lock
 
118
    def full(self, get_diffs=False):
 
119
        if get_diffs:
 
120
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
121
        else:
 
122
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
129
123
        try:
130
 
            self.cursor.execute(
131
 
                'delete from data where key = ?', (dbapi2.Binary(key), ))
132
 
            blob = zlib.compress(marshal.dumps(data))
133
 
            self.cursor.execute(
134
 
                "insert into data (key, revid, data) values (?, ?, ?)",
135
 
                map(dbapi2.Binary, [key, revid, blob]))
136
 
            self.connection.commit()
137
 
        except dbapi2.IntegrityError:
138
 
            # If another thread or process attempted to set the same key, we
139
 
            # don't care too much -- it's only a cache after all!
140
 
            pass
 
124
            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
 
125
        finally:
 
126
            cache.close()
 
127
 
 
128
    @with_lock
 
129
    def sizes(self):
 
130
        cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
131
        s1 = len(cache)
 
132
        cache.close()
 
133
        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
134
        s2 = len(cache)
 
135
        cache.close()
 
136
        return s1, s2
 
137
        
 
138
    def check_rebuild(self, max_time=3600):
 
139
        """
 
140
        check if we need to fill in any missing pieces of the cache.  pull in
 
141
        any missing changes, but don't work any longer than C{max_time}
 
142
        seconds.
 
143
        """
 
144
        if self.closed() or self.full():
 
145
            return
 
146
        
 
147
        self.log.info('Building revision cache...')
 
148
        start_time = time.time()
 
149
        last_update = time.time()
 
150
        count = 0
 
151
 
 
152
        work = list(self.history.get_revision_history())
 
153
        jump = 100
 
154
        for i in xrange(0, len(work), jump):
 
155
            r = work[i:i + jump]
 
156
            # must call into history so we grab the branch lock (otherwise, lock inversion)
 
157
            self.history.get_changes(r)
 
158
            if self.closed():
 
159
                self.flush()
 
160
                return
 
161
            count += jump
 
162
            now = time.time()
 
163
            if now - start_time > max_time:
 
164
                self.log.info('Cache rebuilding will pause for now.')
 
165
                self.flush()
 
166
                return
 
167
            if now - last_update > 60:
 
168
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
 
169
                last_update = time.time()
 
170
                self.flush()
 
171
            # give someone else a chance at the lock
 
172
            time.sleep(1)
 
173
        self.log.info('Revision cache rebuild completed.')
 
174
        self.flush()
 
175
 
 
176