~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Matt Nordhoff
  • Date: 2010-01-14 00:20:24 UTC
  • mfrom: (399.1.3 loggerhead)
  • Revision ID: mnordhoff@mattnordhoff.com-20100114002024-w9wojo0hu8tg1j2q
Don't build a branch's revision graph cache more than once at the same time. (Max Kanat-Alexander)

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#
18
18
 
19
19
"""
20
 
a cache for chewed-up "change" data structures, which are basically just a
21
 
different way of storing a revision.  the cache improves lookup times 10x
22
 
over bazaar's xml revision structure, though, so currently still worth doing.
 
20
a cache for chewed-up 'file change' data structures, which are basically just
 
21
a different way of storing a revision delta.  the cache improves lookup times
 
22
10x over bazaar's xml revision structure, though, so currently still worth
 
23
doing.
23
24
 
24
25
once a revision is committed in bazaar, it never changes, so once we have
25
26
cached a change, it's good forever.
26
27
"""
27
28
 
28
 
import logging
 
29
import cPickle
 
30
import marshal
29
31
import os
30
 
import shelve
31
 
import threading
32
 
import time
33
 
 
34
 
from loggerhead import util
35
 
from loggerhead.util import decorator
36
 
 
37
 
 
38
 
with_lock = util.with_lock('_lock', 'ChangeCache')
39
 
 
40
 
 
41
 
class ChangeCache (object):
42
 
    
43
 
    def __init__(self, history, cache_path):
44
 
        self.history = history
45
 
        self.log = history.log
46
 
        
47
 
        if not os.path.exists(cache_path):
48
 
            os.mkdir(cache_path)
49
 
 
50
 
        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
51
 
        changes_filename = os.path.join(cache_path, 'changes')
52
 
        changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
53
 
        
54
 
        self._cache = shelve.open(changes_filename, 'c', protocol=2)
55
 
        self._cache_diffs = shelve.open(changes_diffs_filename, 'c', protocol=2)
56
 
        
57
 
        self._lock = threading.RLock()
58
 
        self._closed = False
59
 
        
60
 
        # once we process a change (revision), it should be the same forever.
61
 
        self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, len(self._cache), len(self._cache_diffs)))
62
 
    
63
 
    @with_lock
64
 
    def close(self):
65
 
        self._cache.close()
66
 
        self._cache_diffs.close()
67
 
        self._closed = True
68
 
    
69
 
    @with_lock
70
 
    def closed(self):
71
 
        return self._closed
72
 
 
73
 
    @with_lock
74
 
    def flush(self):
75
 
        self._cache.sync()
76
 
        self._cache_diffs.sync()
77
 
    
78
 
    @with_lock
79
 
    def get_changes(self, revid_list, get_diffs=False):
80
 
        """
81
 
        get a list of changes by their revision_ids.  any changes missing
82
 
        from the cache are fetched by calling L{History.get_change_uncached}
83
 
        and inserted into the cache before returning.
84
 
        """
85
 
        if get_diffs:
86
 
            cache = self._cache_diffs
87
 
        else:
88
 
            cache = self._cache
89
 
 
90
 
        out = []
91
 
        fetch_list = []
92
 
        sfetch_list = []
93
 
        for revid in revid_list:
94
 
            # if the revid is in unicode, use the utf-8 encoding as the key
95
 
            srevid = util.to_utf8(revid)
96
 
            
97
 
            if srevid in cache:
98
 
                out.append(cache[srevid])
99
 
            else:
100
 
                #self.log.debug('Entry cache miss: %r' % (revid,))
101
 
                out.append(None)
102
 
                fetch_list.append(revid)
103
 
                sfetch_list.append(srevid)
104
 
        
105
 
        if len(fetch_list) > 0:
106
 
            # some revisions weren't in the cache; fetch them
107
 
            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
108
 
            if changes is None:
109
 
                return changes
110
 
            for i in xrange(len(revid_list)):
111
 
                if out[i] is None:
112
 
                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
113
 
        
114
 
        return out
115
 
    
116
 
    @with_lock
117
 
    def full(self, get_diffs=False):
118
 
        if get_diffs:
119
 
            cache = self._cache_diffs
120
 
        else:
121
 
            cache = self._cache
122
 
        return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
123
 
 
124
 
    def check_rebuild(self, max_time=3600):
125
 
        """
126
 
        check if we need to fill in any missing pieces of the cache.  pull in
127
 
        any missing changes, but don't work any longer than C{max_time}
128
 
        seconds.
129
 
        """
130
 
        if self.full():
131
 
            return
132
 
        
133
 
        self.log.info('Building revision cache...')
134
 
        start_time = time.time()
135
 
        last_update = time.time()
136
 
        count = 0
137
 
 
138
 
        work = list(self.history.get_revision_history())
139
 
        jump = 100
140
 
        for i in xrange(0, len(work), jump):
141
 
            r = work[i:i + jump]
142
 
            # must call into history so we grab the branch lock (otherwise, lock inversion)
143
 
            self.history.get_changes(r)
144
 
            if self.closed():
145
 
                return
146
 
            count += jump
147
 
            now = time.time()
148
 
            if now - start_time > max_time:
149
 
                self.log.info('Cache rebuilding will pause for now.')
150
 
                self.flush()
151
 
                return
152
 
            if now - last_update > 60:
153
 
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
154
 
                last_update = time.time()
155
 
                self.flush()
156
 
        self.log.info('Revision cache rebuild completed.')
157
 
        self.flush()
158
 
 
159
 
 
 
32
import tempfile
 
33
import zlib
 
34
 
 
35
try:
 
36
    from sqlite3 import dbapi2
 
37
except ImportError:
 
38
    from pysqlite2 import dbapi2
 
39
 
 
40
# We take an optimistic approach to concurrency here: we might do work twice
 
41
# in the case of races, but not crash or corrupt data.
 
42
 
 
43
def safe_init_db(filename, init_sql):
 
44
    # To avoid races around creating the database, we create the db in
 
45
    # a temporary file and rename it into the ultimate location.
 
46
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
 
47
    os.close(fd)
 
48
    con = dbapi2.connect(temp_path)
 
49
    cur = con.cursor()
 
50
    cur.execute(init_sql)
 
51
    con.commit()
 
52
    con.close()
 
53
    os.rename(temp_path, filename)
 
54
 
 
55
class FakeShelf(object):
 
56
 
 
57
    def __init__(self, filename):
 
58
        create_table = not os.path.exists(filename)
 
59
        if create_table:
 
60
            safe_init_db(
 
61
                filename, "create table RevisionData "
 
62
                "(revid binary primary key, data binary)")
 
63
        self.connection = dbapi2.connect(filename)
 
64
        self.cursor = self.connection.cursor()
 
65
 
 
66
    def _create_table(self, filename):
 
67
        con = dbapi2.connect(filename)
 
68
        cur = con.cursor()
 
69
        cur.execute(
 
70
            "create table RevisionData "
 
71
            "(revid binary primary key, data binary)")
 
72
        con.commit()
 
73
        con.close()
 
74
 
 
75
    def _serialize(self, obj):
 
76
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))
 
77
 
 
78
    def _unserialize(self, data):
 
79
        return cPickle.loads(str(data))
 
80
 
 
81
    def get(self, revid):
 
82
        self.cursor.execute(
 
83
            "select data from revisiondata where revid = ?", (revid, ))
 
84
        filechange = self.cursor.fetchone()
 
85
        if filechange is None:
 
86
            return None
 
87
        else:
 
88
            return self._unserialize(filechange[0])
 
89
 
 
90
    def add(self, revid, object):
 
91
        try:
 
92
            self.cursor.execute(
 
93
                "insert into revisiondata (revid, data) values (?, ?)",
 
94
                (revid, self._serialize(object)))
 
95
            self.connection.commit()
 
96
        except dbapi2.IntegrityError:
 
97
            # If another thread or process attempted to set the same key, we
 
98
            # assume it set it to the same value and carry on with our day.
 
99
            pass
 
100
 
 
101
 
 
102
class FileChangeCache(object):
 
103
 
 
104
    def __init__(self, cache_path):
 
105
 
 
106
        if not os.path.exists(cache_path):
 
107
            os.mkdir(cache_path)
 
108
 
 
109
        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
 
110
 
 
111
    def get_file_changes(self, entry):
 
112
        cache = FakeShelf(self._changes_filename)
 
113
        changes = cache.get(entry.revid)
 
114
        if changes is None:
 
115
            changes = self.history.get_file_changes_uncached(entry)
 
116
            cache.add(entry.revid, changes)
 
117
        return changes
 
118
 
 
119
 
 
120
class RevInfoDiskCache(object):
 
121
    """Like `RevInfoMemoryCache` but backed in a sqlite DB."""
 
122
 
 
123
    def __init__(self, cache_path):
 
124
        if not os.path.exists(cache_path):
 
125
            os.mkdir(cache_path)
 
126
        filename = os.path.join(cache_path, 'revinfo.sql')
 
127
        create_table = not os.path.exists(filename)
 
128
        if create_table:
 
129
            safe_init_db(
 
130
                filename, "create table Data "
 
131
                "(key binary primary key, revid binary, data binary)")
 
132
        self.connection = dbapi2.connect(filename)
 
133
        self.cursor = self.connection.cursor()
 
134
 
 
135
    def get(self, key, revid):
 
136
        self.cursor.execute(
 
137
            "select revid, data from data where key = ?", (dbapi2.Binary(key),))
 
138
        row = self.cursor.fetchone()
 
139
        if row is None:
 
140
            return None
 
141
        elif str(row[0]) != revid:
 
142
            return None
 
143
        else:
 
144
            return marshal.loads(zlib.decompress(row[1]))
 
145
 
 
146
    def set(self, key, revid, data):
 
147
        try:
 
148
            self.cursor.execute(
 
149
                'delete from data where key = ?', (dbapi2.Binary(key), ))
 
150
            blob = zlib.compress(marshal.dumps(data))
 
151
            self.cursor.execute(
 
152
                "insert into data (key, revid, data) values (?, ?, ?)",
 
153
                map(dbapi2.Binary, [key, revid, blob]))
 
154
            self.connection.commit()
 
155
        except dbapi2.IntegrityError:
 
156
            # If another thread or process attempted to set the same key, we
 
157
            # don't care too much -- it's only a cache after all!
 
158
            pass