~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-01-14 05:40:40 UTC
  • Revision ID: robey@lag.net-20070114054040-7i9lbhq992e612rq
fix up dev.cfg so that nobody will ever have to edit it, by letting the
important params be overridable in loggerhead.conf.

make start-loggerhead actually daemonize, write a pid file, and write logs
to normal log files, instead of requiring 'nohup' stuff.  ie act like a real
server.  added stop-loggerhead to do a clean shutdown.  changed the README
to clarify how it should work now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#
18
18
 
19
19
"""
20
 
a cache for chewed-up 'file change' data structures, which are basically just
21
 
a different way of storing a revision delta.  the cache improves lookup times
22
 
10x over bazaar's xml revision structure, though, so currently still worth
23
 
doing.
 
20
a cache for chewed-up "change" data structures, which are basically just a
 
21
different way of storing a revision.  the cache improves lookup times 10x
 
22
over bazaar's xml revision structure, though, so currently still worth doing.
24
23
 
25
24
once a revision is committed in bazaar, it never changes, so once we have
26
25
cached a change, it's good forever.
27
26
"""
28
27
 
29
 
import cPickle
30
 
import marshal
 
28
import logging
31
29
import os
32
 
import tempfile
33
 
import zlib
34
 
 
35
 
try:
36
 
    from sqlite3 import dbapi2
37
 
except ImportError:
38
 
    from pysqlite2 import dbapi2
39
 
 
40
 
# We take an optimistic approach to concurrency here: we might do work twice
41
 
# in the case of races, but not crash or corrupt data.
42
 
 
43
 
def safe_init_db(filename, init_sql):
44
 
    # To avoid races around creating the database, we create the db in
45
 
    # a temporary file and rename it into the ultimate location.
46
 
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
47
 
    os.close(fd)
48
 
    con = dbapi2.connect(temp_path)
49
 
    cur = con.cursor()
50
 
    cur.execute(init_sql)
51
 
    con.commit()
52
 
    con.close()
53
 
    os.rename(temp_path, filename)
54
 
 
55
 
class FakeShelf(object):
56
 
 
57
 
    def __init__(self, filename):
58
 
        create_table = not os.path.exists(filename)
59
 
        if create_table:
60
 
            safe_init_db(
61
 
                filename, "create table RevisionData "
62
 
                "(revid binary primary key, data binary)")
63
 
        self.connection = dbapi2.connect(filename)
64
 
        self.cursor = self.connection.cursor()
65
 
 
66
 
    def _create_table(self, filename):
67
 
        con = dbapi2.connect(filename)
68
 
        cur = con.cursor()
69
 
        cur.execute(
70
 
            "create table RevisionData "
71
 
            "(revid binary primary key, data binary)")
72
 
        con.commit()
73
 
        con.close()
74
 
 
75
 
    def _serialize(self, obj):
76
 
        return dbapi2.Binary(cPickle.dumps(obj, protocol=2))
77
 
 
78
 
    def _unserialize(self, data):
79
 
        return cPickle.loads(str(data))
80
 
 
81
 
    def get(self, revid):
82
 
        self.cursor.execute(
83
 
            "select data from revisiondata where revid = ?", (revid, ))
84
 
        filechange = self.cursor.fetchone()
85
 
        if filechange is None:
86
 
            return None
87
 
        else:
88
 
            return self._unserialize(filechange[0])
89
 
 
90
 
    def add(self, revid, object):
91
 
        try:
92
 
            self.cursor.execute(
93
 
                "insert into revisiondata (revid, data) values (?, ?)",
94
 
                (revid, self._serialize(object)))
95
 
            self.connection.commit()
96
 
        except dbapi2.IntegrityError:
97
 
            # If another thread or process attempted to set the same key, we
98
 
            # assume it set it to the same value and carry on with our day.
99
 
            pass
100
 
 
101
 
 
102
 
class FileChangeCache(object):
103
 
 
104
 
    def __init__(self, cache_path):
105
 
 
106
 
        if not os.path.exists(cache_path):
107
 
            os.mkdir(cache_path)
108
 
 
109
 
        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')
110
 
 
111
 
    def get_file_changes(self, entry):
112
 
        cache = FakeShelf(self._changes_filename)
113
 
        changes = cache.get(entry.revid)
114
 
        if changes is None:
115
 
            changes = self.history.get_file_changes_uncached(entry)
116
 
            cache.add(entry.revid, changes)
117
 
        return changes
118
 
 
119
 
 
120
 
class RevInfoDiskCache(object):
121
 
    """Like `RevInfoMemoryCache` but backed in a sqlite DB."""
122
 
 
123
 
    def __init__(self, cache_path):
124
 
        if not os.path.exists(cache_path):
125
 
            os.mkdir(cache_path)
126
 
        filename = os.path.join(cache_path, 'revinfo.sql')
127
 
        create_table = not os.path.exists(filename)
128
 
        if create_table:
129
 
            safe_init_db(
130
 
                filename, "create table Data "
131
 
                "(key binary primary key, revid binary, data binary)")
132
 
        self.connection = dbapi2.connect(filename)
133
 
        self.cursor = self.connection.cursor()
134
 
 
135
 
    def get(self, key, revid):
136
 
        self.cursor.execute(
137
 
            "select revid, data from data where key = ?", (dbapi2.Binary(key),))
138
 
        row = self.cursor.fetchone()
139
 
        if row is None:
140
 
            return None
141
 
        elif str(row[0]) != revid:
142
 
            return None
143
 
        else:
144
 
            return marshal.loads(zlib.decompress(row[1]))
145
 
 
146
 
    def set(self, key, revid, data):
147
 
        try:
148
 
            self.cursor.execute(
149
 
                'delete from data where key = ?', (dbapi2.Binary(key), ))
150
 
            blob = zlib.compress(marshal.dumps(data))
151
 
            self.cursor.execute(
152
 
                "insert into data (key, revid, data) values (?, ?, ?)",
153
 
                map(dbapi2.Binary, [key, revid, blob]))
154
 
            self.connection.commit()
155
 
        except dbapi2.IntegrityError:
156
 
            # If another thread or process attempted to set the same key, we
157
 
            # don't care too much -- it's only a cache after all!
158
 
            pass
 
30
import shelve
 
31
import threading
 
32
import time
 
33
 
 
34
from loggerhead import util
 
35
from loggerhead.util import decorator
 
36
from loggerhead.lockfile import LockFile
 
37
 
 
38
 
 
39
with_lock = util.with_lock('_lock', 'ChangeCache')
 
40
        
 
41
 
 
42
class ChangeCache (object):
 
43
    
 
44
    def __init__(self, history, cache_path):
 
45
        self.history = history
 
46
        self.log = history.log
 
47
        
 
48
        if not os.path.exists(cache_path):
 
49
            os.mkdir(cache_path)
 
50
 
 
51
        # keep a separate cache for the diffs, because they're very time-consuming to fetch.
 
52
        self._changes_filename = os.path.join(cache_path, 'changes')
 
53
        self._changes_diffs_filename = os.path.join(cache_path, 'changes-diffs')
 
54
        
 
55
        # use a lockfile since the cache folder could be shared across different processes.
 
56
        self._lock = LockFile(os.path.join(cache_path, 'lock'))
 
57
        self._closed = False
 
58
        
 
59
        # this is fluff; don't slow down startup time with it.
 
60
        def log_sizes():
 
61
            s1, s2 = self.sizes()
 
62
            self.log.info('Using change cache %s; %d/%d entries.' % (cache_path, s1, s2))
 
63
        threading.Thread(target=log_sizes).start()
 
64
    
 
65
    @with_lock
 
66
    def close(self):
 
67
        self.log.debug('Closing cache file.')
 
68
        self._closed = True
 
69
    
 
70
    @with_lock
 
71
    def closed(self):
 
72
        return self._closed
 
73
 
 
74
    @with_lock
 
75
    def flush(self):
 
76
        pass
 
77
    
 
78
    @with_lock
 
79
    def get_changes(self, revid_list, get_diffs=False):
 
80
        """
 
81
        get a list of changes by their revision_ids.  any changes missing
 
82
        from the cache are fetched by calling L{History.get_change_uncached}
 
83
        and inserted into the cache before returning.
 
84
        """
 
85
        if get_diffs:
 
86
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
87
        else:
 
88
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
89
 
 
90
        out = []
 
91
        fetch_list = []
 
92
        sfetch_list = []
 
93
        for revid in revid_list:
 
94
            # if the revid is in unicode, use the utf-8 encoding as the key
 
95
            srevid = util.to_utf8(revid)
 
96
            
 
97
            if srevid in cache:
 
98
                out.append(cache[srevid])
 
99
            else:
 
100
                #self.log.debug('Entry cache miss: %r' % (revid,))
 
101
                out.append(None)
 
102
                fetch_list.append(revid)
 
103
                sfetch_list.append(srevid)
 
104
        
 
105
        if len(fetch_list) > 0:
 
106
            # some revisions weren't in the cache; fetch them
 
107
            changes = self.history.get_changes_uncached(fetch_list, get_diffs)
 
108
            if changes is None:
 
109
                return changes
 
110
            for i in xrange(len(revid_list)):
 
111
                if out[i] is None:
 
112
                    cache[sfetch_list.pop(0)] = out[i] = changes.pop(0)
 
113
        
 
114
        cache.close()
 
115
        return out
 
116
    
 
117
    @with_lock
 
118
    def full(self, get_diffs=False):
 
119
        if get_diffs:
 
120
            cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
121
        else:
 
122
            cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
123
        try:
 
124
            return (len(cache) >= len(self.history.get_revision_history())) and (util.to_utf8(self.history.last_revid) in cache)
 
125
        finally:
 
126
            cache.close()
 
127
 
 
128
    @with_lock
 
129
    def sizes(self):
 
130
        cache = shelve.open(self._changes_filename, 'c', protocol=2)
 
131
        s1 = len(cache)
 
132
        cache.close()
 
133
        cache = shelve.open(self._changes_diffs_filename, 'c', protocol=2)
 
134
        s2 = len(cache)
 
135
        cache.close()
 
136
        return s1, s2
 
137
        
 
138
    def check_rebuild(self, max_time=3600):
 
139
        """
 
140
        check if we need to fill in any missing pieces of the cache.  pull in
 
141
        any missing changes, but don't work any longer than C{max_time}
 
142
        seconds.
 
143
        """
 
144
        if self.closed() or self.full():
 
145
            return
 
146
        
 
147
        self.log.info('Building revision cache...')
 
148
        start_time = time.time()
 
149
        last_update = time.time()
 
150
        count = 0
 
151
 
 
152
        work = list(self.history.get_revision_history())
 
153
        jump = 100
 
154
        for i in xrange(0, len(work), jump):
 
155
            r = work[i:i + jump]
 
156
            # must call into history so we grab the branch lock (otherwise, lock inversion)
 
157
            self.history.get_changes(r)
 
158
            if self.closed():
 
159
                self.flush()
 
160
                return
 
161
            count += jump
 
162
            now = time.time()
 
163
            if now - start_time > max_time:
 
164
                self.log.info('Cache rebuilding will pause for now.')
 
165
                self.flush()
 
166
                return
 
167
            if now - last_update > 60:
 
168
                self.log.info('Revision cache rebuilding continues: %d/%d' % (min(count, len(work)), len(work)))
 
169
                last_update = time.time()
 
170
                self.flush()
 
171
            # give someone else a chance at the lock
 
172
            time.sleep(1)
 
173
        self.log.info('Revision cache rebuild completed.')
 
174
        self.flush()
 
175
 
 
176