~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to utilities/pgmassacre.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-10-06 01:24:37 UTC
  • mfrom: (14039.1.12 bug-759467)
  • Revision ID: launchpad@pqm.canonical.com-20111006012437-x4xn9cohnyp5ztlx
[r=gmb][bug=759467] [r=gmb][bug=759467] Store
 INCOMPLETE_WITH_RESPONSE and INCOMPLETE_WITHOUT_RESPONSE for BugTask status
 to make queries more efficient.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
2
#
3
 
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
 
3
# Copyright 2009 Canonical Ltd.  This software is licensed under the
4
4
# GNU Affero General Public License version 3 (see the file LICENSE).
5
5
 
6
 
# This file is mirrored into lp:losa-db-scripts, so please keep that
7
 
# version in sync with the master in the Launchpad tree.
8
 
 
9
6
"""
10
7
dropdb only more so.
11
8
 
12
 
Cut off access, slaughter connections and burn the database to the ground
13
 
(but do nothing that could put the system into recovery mode).
 
9
Cut off access, slaughter connections and burn the database to the ground.
14
10
"""
15
11
 
 
12
# Nothing but system installed libraries - this script sometimes
 
13
# gets installed standalone with no Launchpad tree available.
 
14
from distutils.version import LooseVersion
16
15
import sys
17
16
import time
18
17
import psycopg2
19
18
import psycopg2.extensions
 
19
from signal import SIGTERM, SIGQUIT, SIGKILL, SIGINT
20
20
from optparse import OptionParser
21
21
 
22
22
 
28
28
        return psycopg2.connect("dbname=%s" % dbname)
29
29
 
30
30
 
 
31
def send_signal(database, signal):
 
32
    con = connect()
 
33
    con.set_isolation_level(1) # READ COMMITTED. We rollback changes we make.
 
34
    cur = con.cursor()
 
35
 
 
36
    # Install PL/PythonU if it isn't already.
 
37
    cur.execute("SELECT TRUE FROM pg_language WHERE lanname = 'plpythonu'")
 
38
    if cur.fetchone() is None:
 
39
        cur.execute('CREATE LANGUAGE "plpythonu"')
 
40
 
 
41
    # Create a stored procedure to kill a backend process.
 
42
    qdatabase = str(psycopg2.extensions.QuotedString(database))
 
43
    cur.execute("""
 
44
        CREATE OR REPLACE FUNCTION _pgmassacre_killall(integer)
 
45
        RETURNS Boolean AS $$
 
46
        import os
 
47
 
 
48
        signal = args[0]
 
49
        for row in plpy.execute('''
 
50
            SELECT procpid FROM pg_stat_activity WHERE datname=%(qdatabase)s
 
51
                AND procpid != pg_backend_pid()
 
52
            '''):
 
53
            try:
 
54
                os.kill(row['procpid'], signal)
 
55
            except OSError:
 
56
                pass
 
57
        else:
 
58
            return False
 
59
 
 
60
        return True
 
61
        $$ LANGUAGE plpythonu
 
62
        """ % vars())
 
63
 
 
64
    cur.execute("SELECT _pgmassacre_killall(%(signal)s)", vars())
 
65
    con.rollback()
 
66
    con.close()
 
67
 
 
68
 
31
69
def rollback_prepared_transactions(database):
32
70
    """Rollback any prepared transactions.
33
71
 
35
73
    transactions.
36
74
    """
37
75
    con = connect(database)
38
 
    con.set_isolation_level(0)  # Autocommit so we can ROLLBACK PREPARED.
 
76
    con.set_isolation_level(0) # Autocommit so we can ROLLBACK PREPARED.
39
77
    cur = con.cursor()
40
78
 
41
79
    # Get a list of outstanding prepared transactions.
48
86
    con.close()
49
87
 
50
88
 
51
 
def still_open(database, max_wait=120):
 
89
def still_open(database, max_wait=10):
52
90
    """Return True if there are still open connections, apart from our own.
53
91
 
54
 
    Waits a while to ensure that connections shutting down have a chance
55
 
    to. This might take a while if there is a big transaction to
56
 
    rollback.
 
92
    Waits a while to ensure that connections shutting down have a chance to.
57
93
    """
58
94
    con = connect()
59
 
    con.set_isolation_level(0)  # Autocommit.
 
95
    con.set_isolation_level(0) # Autocommit.
60
96
    cur = con.cursor()
61
 
    # Keep checking until the timeout is reached, returning True if all
62
 
    # of the backends are gone.
 
97
    # Wait for up to 10 seconds, returning True if all backends are gone.
63
98
    start = time.time()
64
99
    while time.time() < start + max_wait:
65
100
        cur.execute("""
71
106
            """, vars())
72
107
        if cur.fetchone() is None:
73
108
            return False
74
 
        time.sleep(0.6)  # Stats only updated every 500ms.
 
109
        time.sleep(0.6) # Stats only updated every 500ms.
75
110
    con.close()
76
111
    return True
77
112
 
78
113
 
79
114
def massacre(database):
80
115
    con = connect()
81
 
    con.set_isolation_level(0)  # Autocommit
 
116
    con.set_isolation_level(0) # Autocommit
82
117
    cur = con.cursor()
83
118
 
84
119
    # Allow connections to the doomed database if something turned this off,
96
131
            "UPDATE pg_database SET datallowconn=FALSE WHERE datname=%s",
97
132
            [database])
98
133
 
99
 
        # New connections are disabled, but pg_stat_activity is only
100
 
        # updated every 500ms. Ensure that pg_stat_activity has
101
 
        # been refreshed to catch any connections that opened
102
 
        # immediately before setting datallowconn.
103
 
        time.sleep(1)
104
 
 
105
 
        # Terminate open connections.
106
 
        cur.execute("""
107
 
            SELECT procpid, pg_terminate_backend(procpid)
108
 
            FROM pg_stat_activity
109
 
            WHERE datname=%s AND procpid <> pg_backend_pid()
110
 
            """, [database])
111
 
        for procpid, success in cur.fetchall():
112
 
            if not success:
113
 
                print >> sys.stderr, (
114
 
                    "pg_terminate_backend(%s) failed" % procpid)
115
134
        con.close()
116
135
 
 
136
        # Terminate current statements.
 
137
        send_signal(database, SIGINT)
 
138
 
 
139
        # Shutdown current connections normally.
 
140
        if still_open(database, 1):
 
141
            send_signal(database, SIGTERM)
 
142
 
 
143
        # Shutdown current connections immediately.
 
144
        if still_open(database):
 
145
            send_signal(database, SIGQUIT)
 
146
 
 
147
        # Shutdown current connections nastily.
 
148
        if still_open(database):
 
149
            send_signal(database, SIGKILL)
 
150
 
117
151
        if still_open(database):
118
152
            print >> sys.stderr, (
119
153
                    "Unable to kill all backends! Database not destroyed.")
124
158
        # AUTOCOMMIT required to execute commands like DROP DATABASE.
125
159
        con.set_isolation_level(0)
126
160
        cur = con.cursor()
127
 
        cur.execute("DROP DATABASE %s" % database)  # Not quoted.
 
161
        cur.execute("DROP DATABASE %s" % database) # Not quoted.
128
162
        con.close()
129
163
        return 0
130
164
    finally:
150
184
    now = start
151
185
    error_msg = None
152
186
    con = connect()
153
 
    con.set_isolation_level(0)  # Autocommit required for CREATE DATABASE.
 
187
    con.set_isolation_level(0) # Autocommit required for CREATE DATABASE.
154
188
    create_db_cmd = """
155
189
        CREATE DATABASE %s WITH ENCODING='UTF8' TEMPLATE=%s
156
190
        """ % (database, template)
159
193
    # We make use of this feature so we don't have to care what locale
160
194
    # was used to create the database cluster rather than requiring it
161
195
    # to be rebuilt in the C locale.
162
 
    if template == "template0":
 
196
    if pg_version >= LooseVersion("8.4.0") and template == "template0":
163
197
        create_db_cmd += "LC_COLLATE='C' LC_CTYPE='C'"
164
198
    while now < start + 20:
165
199
        cur = con.cursor()
169
203
            return 0
170
204
        except psycopg2.Error, exception:
171
205
            error_msg = str(exception)
172
 
        time.sleep(0.6)  # Stats only updated every 500ms.
 
206
        time.sleep(0.6) # Stats only updated every 500ms.
173
207
        now = time.time()
174
208
    con.close()
175
209
 
194
228
 
195
229
 
196
230
options = None
 
231
pg_version = None # LooseVersion - Initialized in main()
197
232
 
198
233
 
199
234
def main():
201
236
    parser.add_option("-U", "--user", dest="user", default=None,
202
237
        help="Connect as USER", metavar="USER")
203
238
    parser.add_option("-t", "--template", dest="template", default=None,
204
 
        help="Recreate database using DBNAME as a template database."
205
 
            " If template0, database will be created in the C locale.",
 
239
        help="Recreate database using DBNAME as a template database.",
206
240
        metavar="DBNAME")
207
241
    global options
208
242
    (options, args) = parser.parse_args()
220
254
    con = connect()
221
255
    cur = con.cursor()
222
256
 
 
257
    # Store the database version for version specific code.
 
258
    global pg_version
 
259
    cur.execute("show server_version")
 
260
    pg_version = LooseVersion(cur.fetchone()[0])
 
261
 
223
262
    # Ensure the template database exists.
224
263
    if options.template is not None:
225
264
        cur.execute(