~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to database/schema/preflight.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python2.6 -S
 
2
# Copyright 2011 Canonical Ltd.  This software is licensed under the
 
3
# GNU Affero General Public License version 3 (see the file LICENSE).
 
4
 
 
5
"""Confirm the database systems are ready to be patched as best we can."""
 
6
 
 
7
import _pythonpath
 
8
 
 
9
from datetime import timedelta
 
10
from optparse import OptionParser
 
11
import sys
 
12
 
 
13
import psycopg2
 
14
 
 
15
from canonical.database.sqlbase import (
 
16
    connect,
 
17
    ISOLATION_LEVEL_AUTOCOMMIT,
 
18
    )
 
19
from canonical.launchpad.scripts import (
 
20
    db_options,
 
21
    logger,
 
22
    logger_options,
 
23
    )
 
24
from canonical import lp
 
25
import replication.helpers
 
26
 
 
27
 
 
28
# Ignore connections by these users.
 
29
SYSTEM_USERS = frozenset(['postgres', 'slony', 'nagios'])
 
30
 
 
31
# How lagged the cluster can be before failing the preflight check.
 
32
MAX_LAG = timedelta(seconds=45)
 
33
 
 
34
 
 
35
class DatabasePreflight:
 
36
    def __init__(self, log, master_con):
 
37
        self.log = log
 
38
        self.is_replicated = replication.helpers.slony_installed(master_con)
 
39
        if self.is_replicated:
 
40
            self.nodes = replication.helpers.get_all_cluster_nodes(master_con)
 
41
            for node in self.nodes:
 
42
                node.con = psycopg2.connect(node.connection_string)
 
43
                node.con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
44
        else:
 
45
            node = replication.helpers.Node(None, None, None, True)
 
46
            node.con = master_con
 
47
            self.nodes = [node]
 
48
 
 
49
    def check_is_superuser(self):
 
50
        """Return True if all the node connections are as superusers."""
 
51
        success = True
 
52
        for node in self.nodes:
 
53
            cur = node.con.cursor()
 
54
            cur.execute("""
 
55
                SELECT current_database(), pg_user.usesuper
 
56
                FROM pg_user
 
57
                WHERE usename = current_user
 
58
                """)
 
59
            dbname, is_super = cur.fetchone()
 
60
            if is_super:
 
61
                self.log.debug("Connected to %s as a superuser.", dbname)
 
62
            else:
 
63
                self.log.fatal("Not connected to %s as a superuser.", dbname)
 
64
                success = False
 
65
        return success
 
66
 
 
67
    def check_open_connections(self):
 
68
        """Return False if any nodes have connections from non-system users.
 
69
 
 
70
        System users are defined by SYSTEM_USERS.
 
71
        """
 
72
        success = True
 
73
        for node in self.nodes:
 
74
            cur = node.con.cursor()
 
75
            cur.execute("""
 
76
                SELECT datname, usename, COUNT(*) AS num_connections
 
77
                FROM pg_stat_activity
 
78
                WHERE
 
79
                    datname=current_database()
 
80
                    AND procpid <> pg_backend_pid()
 
81
                GROUP BY datname, usename
 
82
                """)
 
83
            for datname, usename, num_connections in cur.fetchall():
 
84
                if usename in SYSTEM_USERS:
 
85
                    self.log.debug(
 
86
                        "%s has %d connections by %s",
 
87
                        datname, num_connections, usename)
 
88
                else:
 
89
                    self.log.fatal(
 
90
                        "%s has %d connections by %s",
 
91
                        datname, num_connections, usename)
 
92
                    success = False
 
93
        if success:
 
94
            self.log.info("Only system users connected to the cluster")
 
95
        return success
 
96
 
 
97
    def check_long_running_transactions(self, max_secs=10):
 
98
        """Return False if any nodes have long running transactions open.
 
99
 
 
100
        max_secs defines what is long running. For database rollouts,
 
101
        this will be short. Even if the transaction is benign like a
 
102
        autovacuum task, we should wait until things have settled down.
 
103
        """
 
104
        success = True
 
105
        for node in self.nodes:
 
106
            cur = node.con.cursor()
 
107
            cur.execute("""
 
108
                SELECT
 
109
                    datname, usename,
 
110
                    age(current_timestamp, xact_start) AS age, current_query
 
111
                FROM pg_stat_activity
 
112
                WHERE
 
113
                    age(current_timestamp, xact_start) > interval '%d secs'
 
114
                    AND datname=current_database()
 
115
                """ % max_secs)
 
116
            for datname, usename, age, current_query in cur.fetchall():
 
117
                self.log.fatal(
 
118
                    "%s has transaction by %s open %s",
 
119
                    datname, usename, age)
 
120
                success = False
 
121
        if success:
 
122
            self.log.info("No long running transactions detected.")
 
123
        return success
 
124
 
 
125
    def check_replication_lag(self):
 
126
        """Return False if the replication cluster is badly lagged."""
 
127
        if not self.is_replicated:
 
128
            self.log.debug("Not replicated - no replication lag.")
 
129
            return True
 
130
 
 
131
        # Check replication lag on every node just in case there are
 
132
        # disagreements.
 
133
        max_lag = timedelta(seconds=-1)
 
134
        max_lag_node = None
 
135
        for node in self.nodes:
 
136
            cur = node.con.cursor()
 
137
            cur.execute("""
 
138
                SELECT current_database(),
 
139
                max(st_lag_time) AS lag FROM _sl.sl_status
 
140
            """)
 
141
            dbname, lag = cur.fetchone()
 
142
            if lag > max_lag:
 
143
                max_lag = lag
 
144
                max_lag_node = node
 
145
            self.log.debug(
 
146
                "%s reports database lag of %s.", dbname, lag)
 
147
        if max_lag <= MAX_LAG:
 
148
            self.log.info("Database cluster lag is ok (%s)", max_lag)
 
149
            return True
 
150
        else:
 
151
            self.log.fatal("Database cluster lag is high (%s)", max_lag)
 
152
            return False
 
153
 
 
154
    def check_can_sync(self):
 
155
        """Return True if a sync event is acknowledged by all nodes.
 
156
 
 
157
        We only wait 30 seconds for the sync, because we require the
 
158
        cluster to be quiescent.
 
159
        """
 
160
        if self.is_replicated:
 
161
            success = replication.helpers.sync(30)
 
162
            if success:
 
163
                self.log.info(
 
164
                    "Replication events are being propagated.")
 
165
            else:
 
166
                self.log.fatal(
 
167
                    "Replication events are not being propagated.")
 
168
                self.log.fatal(
 
169
                    "One or more replication daemons may be down.")
 
170
                self.log.fatal(
 
171
                    "Bounce the replication daemons and check the logs.")
 
172
            return success
 
173
        else:
 
174
            return True
 
175
 
 
176
    def check_all(self):
 
177
        """Run all checks.
 
178
 
 
179
        If any failed, return False. Otherwise return True.
 
180
        """
 
181
        if not self.check_is_superuser():
 
182
            # No point continuing - results will be bogus without access
 
183
            # to pg_stat_activity
 
184
            return False
 
185
 
 
186
        success = True
 
187
        if not self.check_open_connections():
 
188
            success = False
 
189
        if not self.check_long_running_transactions():
 
190
            success = False
 
191
        if not self.check_replication_lag():
 
192
            success = False
 
193
        if not self.check_can_sync():
 
194
            success = False
 
195
        return success
 
196
 
 
197
 
 
198
def main():
 
199
    parser = OptionParser()
 
200
    db_options(parser)
 
201
    logger_options(parser)
 
202
    (options, args) = parser.parse_args()
 
203
    if args:
 
204
        parser.error("Too many arguments")
 
205
 
 
206
    log = logger(options)
 
207
 
 
208
    master_con = connect(lp.dbuser)
 
209
    master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
210
 
 
211
    preflight_check = DatabasePreflight(log, master_con)
 
212
 
 
213
    if preflight_check.check_all():
 
214
        log.info('Preflight check succeeded. Good to go.')
 
215
        return 0
 
216
    else:
 
217
        log.error('Preflight check failed.')
 
218
        return 1
 
219
 
 
220
 
 
221
if __name__ == '__main__':
 
222
    sys.exit(main())