~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/changecache.py

  • Committer: Robey Pointer
  • Date: 2007-01-14 05:40:40 UTC
  • Revision ID: robey@lag.net-20070114054040-7i9lbhq992e612rq
fix up dev.cfg so that nobody will ever have to edit it, by letting the
important params be overridable in loggerhead.conf.

make start-loggerhead actually daemonize, write a pid file, and write logs
to normal log files instead of requiring 'nohup' tricks — i.e., act like a real
server.  Added stop-loggerhead to do a clean shutdown, and changed the README
to clarify how it should work now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
cached a change, it's good forever.
26
26
"""
27
27
 
28
 
import cPickle
 
28
import logging
29
29
import os
 
30
import shelve
 
31
import threading
 
32
import time
30
33
 
31
34
from loggerhead import util
 
35
from loggerhead.util import decorator
32
36
from loggerhead.lockfile import LockFile
33
37
 
 
38
 
34
39
with_lock = util.with_lock('_lock', 'ChangeCache')
35
 
 
36
 
try:
37
 
    from sqlite3 import dbapi2
38
 
except ImportError:
39
 
    from pysqlite2 import dbapi2
40
 
 
41
 
 
42
 
class FakeShelf(object):
    """Minimal shelve-like cache backed by a sqlite database.

    Maps revision ids to pickled python objects in a single
    ``RevisionData`` table, providing only the tiny subset of the
    ``shelve`` API (get / batch add) that the cache code needs.
    """

    def __init__(self, filename):
        # only create the schema the first time this cache file is used
        create_table = not os.path.exists(filename)
        self.connection = dbapi2.connect(filename)
        self.cursor = self.connection.cursor()
        if create_table:
            self._create_table()

    def _create_table(self):
        self.cursor.execute(
            "create table RevisionData "
            "(revid binary primary key, data binary)")
        self.connection.commit()

    def _serialize(self, obj):
        # protocol 2 is the most compact pickle protocol available here
        r = dbapi2.Binary(cPickle.dumps(obj, protocol=2))
        return r

    def _unserialize(self, data):
        return cPickle.loads(str(data))

    def get(self, revid):
        """Return the cached object for revid, or None on a cache miss."""
        self.cursor.execute(
            "select data from revisiondata where revid = ?", (revid, ))
        filechange = self.cursor.fetchone()
        if filechange is None:
            return None
        else:
            return self._unserialize(filechange[0])

    def add(self, revid_obj_pairs):
        """Insert (revid, object) pairs, committing them as one batch."""
        for (r, d) in revid_obj_pairs:
            self.cursor.execute(
                "insert into revisiondata (revid, data) values (?, ?)",
                (r, self._serialize(d)))
        self.connection.commit()
79
 
 
80
 
 
81
 
class FileChangeCache(object):
    """Cache of per-revision file-change lists, stored in sqlite.

    File changes are expensive to compute, but once computed for a
    revision they never change, so they can be cached forever in a
    small sqlite database under ``cache_path``.
    """

    def __init__(self, history, cache_path):
        self.history = history

        if not os.path.exists(cache_path):
            os.mkdir(cache_path)

        self._changes_filename = os.path.join(cache_path, 'filechanges.sql')

        # use a lockfile since the cache folder could be shared across
        # different processes.
        self._lock = LockFile(os.path.join(cache_path, 'filechange-lock'))

    @with_lock
    def get_file_changes(self, entries):
        """Return the file changes for each entry, consulting the cache.

        Entries missing from the cache are fetched from history via
        ``get_file_changes_uncached`` and stored in the cache before
        returning, so the result list matches ``entries`` one-for-one,
        in order.
        """
        out = []
        missing_entries = []
        missing_entry_indices = []
        cache = FakeShelf(self._changes_filename)
        for entry in entries:
            changes = cache.get(entry.revid)
            if changes is not None:
                out.append(changes)
            else:
                # remember where the placeholder went so the fetched
                # result can be patched in afterwards
                missing_entries.append(entry)
                missing_entry_indices.append(len(out))
                out.append(None)
        if missing_entries:
            missing_changes = self.history.get_file_changes_uncached(
                                  missing_entries)
            revid_changes_pairs = []
            for i, entry, changes in zip(
                missing_entry_indices, missing_entries, missing_changes):
                revid_changes_pairs.append((entry.revid, changes))
                out[i] = changes
            cache.add(revid_changes_pairs)
        return out