~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to database/schema/preflight.py

  • Committer: Steve Kowalik
  • Date: 2011-08-07 04:05:52 UTC
  • mto: This revision was merged to the branch mainline in revision 13626.
  • Revision ID: stevenk@ubuntu.com-20110807040552-mwnxo0flmhvl35e8
Correct the notification based on review comments, and remove request{,ed}
from the function names, switching to create{,d}.

Show diffs side-by-side

added added

removed removed

Lines of Context:
4
4
 
5
5
"""Confirm the database systems are ready to be patched as best we can."""
6
6
 
 
7
import _pythonpath
 
8
 
7
9
__all__ = [
8
10
    'DatabasePreflight',
9
11
    'KillConnectionsPreflight',
10
12
    'NoConnectionCheckPreflight',
11
13
    ]
12
14
 
13
 
import _pythonpath
14
15
 
15
16
from datetime import timedelta
16
17
from optparse import OptionParser
17
 
import os.path
18
 
import time
 
18
import sys
19
19
 
20
20
import psycopg2
21
21
 
22
 
from lp.services.database.sqlbase import (
 
22
from canonical.database.sqlbase import (
23
23
    connect,
24
24
    ISOLATION_LEVEL_AUTOCOMMIT,
25
25
    sqlvalues,
26
26
    )
27
 
from lp.services.scripts import (
 
27
from canonical.launchpad.scripts import (
28
28
    db_options,
29
29
    logger,
30
30
    logger_options,
31
31
    )
 
32
from canonical import lp
32
33
import replication.helpers
33
 
import upgrade
 
34
 
34
35
 
35
36
# Ignore connections by these users.
36
37
SYSTEM_USERS = frozenset(['postgres', 'slony', 'nagios', 'lagmon'])
40
41
# added here. The preflight check will fail if any of these users are
41
42
# connected, so these systems will need to be shut down manually before
42
43
# a database update.
43
 
FRAGILE_USERS = frozenset([
44
 
    'buildd_manager',
45
 
    # process_accepted is fragile, but also fast so we likely shouldn't
46
 
    # need to ever manually shut it down.
47
 
    'process_accepted',
48
 
    'process_upload',
49
 
    'publish_distro',
50
 
    'publish_ftpmaster',
51
 
    ])
52
 
 
53
 
# If these users have long running transactions, just kill 'em. Entries
54
 
# added here must come with a bug number, a if part of Launchpad holds
55
 
# open a long running transaction it is a bug we need to fix.
56
 
BAD_USERS = frozenset([
57
 
    'karma',  # Bug #863109
58
 
    'rosettaadmin',  # Bug #863122
59
 
    ])
 
44
FRAGILE_USERS = frozenset(['archivepublisher'])
60
45
 
61
46
# How lagged the cluster can be before failing the preflight check.
62
 
# If this is set too low, perfectly normal state will abort rollouts. If
63
 
# this is set too high, then we will have unacceptable downtime as
64
 
# replication needs to catch up before the database patches will apply.
65
47
MAX_LAG = timedelta(seconds=60)
66
48
 
67
49
 
68
50
class DatabasePreflight:
69
51
    def __init__(self, log):
70
 
        master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
 
52
        master_con = connect(lp.dbuser)
 
53
        master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
71
54
 
72
55
        self.log = log
73
56
        self.is_replicated = replication.helpers.slony_installed(master_con)
94
77
            self.lpmain_nodes = set(
95
78
                node for node in self.nodes
96
79
                if node.node_id in lpmain_node_ids)
97
 
 
98
 
            # Store a reference to the lpmain origin.
99
 
            lpmain_master_node_id = replication.helpers.get_master_node(
100
 
                master_con, 1).node_id
101
 
            self.lpmain_master_node = [
102
 
                node for node in self.lpmain_nodes
103
 
                    if node.node_id == lpmain_master_node_id][0]
104
80
        else:
105
81
            node = replication.helpers.Node(None, None, None, True)
106
82
            node.con = master_con
107
83
            self.nodes = set([node])
108
84
            self.lpmain_nodes = self.nodes
109
 
            self.lpmain_master_node = node
110
85
 
111
86
    def check_is_superuser(self):
112
87
        """Return True if all the node connections are as superusers."""
189
164
                % ', '.join(FRAGILE_USERS))
190
165
        return success
191
166
 
192
 
    def check_long_running_transactions(self, max_secs=60):
 
167
    def check_long_running_transactions(self, max_secs=10):
193
168
        """Return False if any nodes have long running transactions open.
194
169
 
195
170
        max_secs defines what is long running. For database rollouts,
196
171
        this will be short. Even if the transaction is benign like a
197
172
        autovacuum task, we should wait until things have settled down.
198
 
 
199
 
        We ignore transactions held open by BAD_USERS. These are bugs
200
 
        that need to be fixed, but we have determined that rudely aborting
201
 
        them is fine for now and there is no need to block a rollout on
202
 
        their behalf.
203
173
        """
204
174
        success = True
205
175
        for node in self.nodes:
214
184
                    AND datname=current_database()
215
185
                """ % max_secs)
216
186
            for datname, usename, age, current_query in cur.fetchall():
217
 
                if usename in BAD_USERS:
218
 
                    self.log.info(
219
 
                        "%s has transactions by %s open %s (ignoring)",
220
 
                        datname, usename, age)
221
 
                else:
222
 
                    self.log.fatal(
223
 
                        "%s has transaction by %s open %s",
224
 
                        datname, usename, age)
225
 
                    success = False
 
187
                self.log.fatal(
 
188
                    "%s has transaction by %s open %s",
 
189
                    datname, usename, age)
 
190
                success = False
226
191
        if success:
227
192
            self.log.info("No long running transactions detected.")
228
193
        return success
261
226
        cluster to be quiescent.
262
227
        """
263
228
        if self.is_replicated:
264
 
            success = replication.helpers.sync(30, exit_on_fail=False)
 
229
            success = replication.helpers.sync(30)
265
230
            if success:
266
231
                self.log.info(
267
232
                    "Replication events are being propagated.")
276
241
        else:
277
242
            return True
278
243
 
279
 
    def report_patches(self):
280
 
        """Report what patches are due to be applied from this tree."""
281
 
        con = self.lpmain_master_node.con
282
 
        upgrade.log = self.log
283
 
        for patch_num, patch_file in upgrade.get_patchlist(con):
284
 
            self.log.info("%s is pending", os.path.basename(patch_file))
285
 
 
286
244
    def check_all(self):
287
245
        """Run all checks.
288
246
 
293
251
            # to pg_stat_activity
294
252
            return False
295
253
 
296
 
        self.report_patches()
297
 
 
298
254
        success = True
299
255
        if not self.check_replication_lag():
300
256
            success = False
326
282
 
327
283
        System users are defined by SYSTEM_USERS.
328
284
        """
329
 
        # We keep trying to terminate connections every 0.5 seconds for
330
 
        # up to 10 seconds.
331
 
        num_tries = 20
332
 
        seconds_to_pause = 0.5
333
 
        for loop_count in range(num_tries):
334
 
            all_clear = True
335
 
            for node in self.lpmain_nodes:
336
 
                cur = node.con.cursor()
337
 
                cur.execute("""
338
 
                    SELECT
339
 
                        procpid, datname, usename,
340
 
                        pg_terminate_backend(procpid)
341
 
                    FROM pg_stat_activity
342
 
                    WHERE
343
 
                        datname=current_database()
344
 
                        AND procpid <> pg_backend_pid()
345
 
                        AND usename NOT IN %s
346
 
                    """ % sqlvalues(SYSTEM_USERS))
347
 
                for procpid, datname, usename, ignored in cur.fetchall():
348
 
                    all_clear = False
349
 
                    if loop_count == num_tries - 1:
350
 
                        self.log.fatal(
351
 
                            "Unable to kill %s [%s] on %s",
352
 
                            usename, procpid, datname)
353
 
                    elif usename in BAD_USERS:
354
 
                        self.log.info(
355
 
                            "Killed %s [%s] on %s", usename, procpid, datname)
356
 
                    else:
357
 
                        self.log.warning(
358
 
                            "Killed %s [%s] on %s", usename, procpid, datname)
359
 
            if all_clear:
360
 
                break
361
 
 
362
 
            # Wait a little for any terminated connections to actually
363
 
            # terminate.
364
 
            time.sleep(seconds_to_pause)
365
 
        return all_clear
 
285
        for node in self.lpmain_nodes:
 
286
            cur = node.con.cursor()
 
287
            cur.execute("""
 
288
                SELECT
 
289
                    procpid, datname, usename, pg_terminate_backend(procpid)
 
290
                FROM pg_stat_activity
 
291
                WHERE
 
292
                    datname=current_database()
 
293
                    AND procpid <> pg_backend_pid()
 
294
                    AND usename NOT IN %s
 
295
                """ % sqlvalues(SYSTEM_USERS))
 
296
            for procpid, datname, usename, ignored in cur.fetchall():
 
297
                self.log.warning(
 
298
                    "Killed %s [%s] on %s", usename, procpid, datname)
 
299
        return True
366
300
 
367
301
 
368
302
def main():
403
337
 
404
338
 
405
339
if __name__ == '__main__':
406
 
    raise SystemExit(main())
 
340
    sys.exit(main())