~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to database/replication/generate_migration.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-10-12 11:56:15 UTC
  • mfrom: (9795.4.34 replication)
  • Revision ID: launchpad@pqm.canonical.com-20111012115615-me2z8tpaal17v113
[no-qa] [r=stub] Slony-I 2.0 compatibility and migration script for
        staging tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python -S
 
2
# Copyright 2011 Canonical Ltd.  This software is licensed under the
 
3
# GNU Affero General Public License version 3 (see the file LICENSE).
 
4
 
 
5
"""Generate slonik scripts for Slony 1.2 to 2.0 migration.
 
6
 
 
7
Remove this script after migration is complete.
 
8
"""
 
9
 
 
10
__metaclass__ = type
 
11
__all__ = []
 
12
 
 
13
import _pythonpath
 
14
 
 
15
from optparse import OptionParser
 
16
import os.path
 
17
from textwrap import dedent
 
18
 
 
19
from canonical.config import config
 
20
from canonical.database.sqlbase import connect
 
21
from canonical.launchpad import scripts
 
22
import replication.helpers
 
23
from replication.helpers import (
 
24
    LPMAIN_SET_ID,
 
25
    LPMIRROR_SET_ID,
 
26
    SSO_SET_ID,
 
27
    get_all_cluster_nodes,
 
28
    get_master_node,
 
29
    )
 
30
 
 
31
 
 
32
con = None
 
33
options = None
 
34
 
 
35
sets = {
 
36
    LPMAIN_SET_ID: 'lpmain_set',
 
37
    SSO_SET_ID: 'sso_set',
 
38
    LPMIRROR_SET_ID: 'lpmirror_set',
 
39
    }
 
40
 
 
41
 
 
42
def outpath(filename):
 
43
    return os.path.join(options.outdir, filename)
 
44
 
 
45
 
 
46
def message(outf, msg):
 
47
    assert "'" not in msg
 
48
    print >> outf, "echo '%s';" % msg
 
49
 
 
50
 
 
51
def generate_preamble():
 
52
    outf = open(outpath('mig_preamble.sk'), 'w')
 
53
    print >> outf, replication.helpers.preamble(con)
 
54
 
 
55
    cur = con.cursor()
 
56
 
 
57
    for set_id, set_name in list(sets.items()):
 
58
        cur.execute(
 
59
            "SELECT set_origin FROM _sl.sl_set WHERE set_id=%s", [set_id])
 
60
        result = cur.fetchone()
 
61
        if result:
 
62
            origin = result[0]
 
63
            print >> outf, "define %s_origin %d;" % (set_name, origin)
 
64
        else:
 
65
            del sets[set_id] # For testing. Production will have 3 sets.
 
66
    outf.close()
 
67
 
 
68
 
 
69
def generate_uninstall():
 
70
    outf = open(outpath('mig_uninstall.sk'), 'w')
 
71
    print >> outf, "# Uninstall Slony-I 1.2 from all nodes"
 
72
    print >> outf, "include <mig_preamble.sk>;"
 
73
 
 
74
    nodes = get_all_cluster_nodes(con)
 
75
 
 
76
    # Ensure everything is really, really synced since we will be
 
77
    # resubscribing with 'omit copy'
 
78
    for node in nodes:
 
79
        print >> outf, dedent("""\
 
80
                sync (id=%d);
 
81
                wait for event (origin=%d, confirmed=all, wait on=%d);
 
82
                """).strip() % (node.node_id, node.node_id, node.node_id)
 
83
 
 
84
    for node in nodes:
 
85
        message(outf, "Uninstall node %d" % node.node_id)
 
86
        print >> outf, "uninstall node (id=%d);" % node.node_id
 
87
    outf.close()
 
88
 
 
89
 
 
90
def generate_sync():
 
91
    outf = open(outpath('mig_sync.sk'), 'w')
 
92
    message(outf, "Waiting for sync")
 
93
    print >> outf, "sync (id=@master_node);"
 
94
    print >> outf, dedent("""\
 
95
        wait for event (
 
96
                origin=@master_node, confirmed=all, wait on=@master_node);
 
97
            """).strip()
 
98
    outf.close()
 
99
 
 
100
 
 
101
def generate_rebuild():
 
102
    outf = open(outpath('mig_rebuild.sk'), 'w')
 
103
    print >> outf, "# Rebuild the replication cluster with Slony-I 2.0"
 
104
    print >> outf, "include <mig_preamble.sk>;"
 
105
 
 
106
    nodes = get_all_cluster_nodes(con)
 
107
    first_node = nodes[0]
 
108
    remaining_nodes = nodes[1:]
 
109
 
 
110
    # Initialize the cluster
 
111
    message(outf, "Initializing cluster (node %d)" % first_node.node_id)
 
112
    print >> outf, "init cluster (id=%d);" % first_node.node_id
 
113
 
 
114
    # Create all the other nodes
 
115
    for node in remaining_nodes:
 
116
        message(outf, "Initializing node %d" % node.node_id)
 
117
        print >> outf, "store node (id=%d, event node=%d);" % (
 
118
            node.node_id, first_node.node_id)
 
119
 
 
120
    # Create paths so they can communicate.
 
121
    message(outf, "Storing %d paths" % pow(len(nodes),2))
 
122
    for client_node in nodes:
 
123
        for server_node in nodes:
 
124
            print >> outf, (
 
125
                "store path (server=%d, client=%d, "
 
126
                "conninfo=@node%d_node_conninfo);" % (
 
127
                    server_node.node_id, client_node.node_id,
 
128
                    server_node.node_id))
 
129
 
 
130
    # sync to ensure replication is happening.
 
131
    print >> outf, "include <mig_sync.sk>;"
 
132
 
 
133
    # Create replication sets.
 
134
    for set_id, set_name in sets.items():
 
135
        generate_initialize_set(set_id, set_name, outf)
 
136
    print >> outf, "include <mig_sync.sk>;"
 
137
 
 
138
    # Subscribe slave nodes to replication sets.
 
139
    for set_id, set_name in sets.items():
 
140
        generate_subscribe_set(set_id, set_name, outf)
 
141
 
 
142
    outf.close()
 
143
 
 
144
 
 
145
def generate_initialize_set(set_id, set_name, outf):
 
146
    origin_node = get_master_node(con, set_id)
 
147
    message(outf, "Creating %s origin %d" % (set_name, origin_node.node_id))
 
148
    print >> outf, "create set (id=%d, origin=@%s_origin, comment='%s');" % (
 
149
        set_id, set_name, set_name)
 
150
    cur = con.cursor()
 
151
    cur.execute("""
 
152
        SELECT tab_id, tab_nspname, tab_relname, tab_comment
 
153
        FROM _sl.sl_table WHERE tab_set=%s
 
154
        """, (set_id,))
 
155
    results = cur.fetchall()
 
156
    message(outf, "Adding %d tables to %s" % (len(results), set_name))
 
157
    for tab_id, tab_nspname, tab_relname, tab_comment in results:
 
158
        if not tab_comment:
 
159
            tab_comment=''
 
160
        print >> outf, dedent("""\
 
161
                set add table (
 
162
                    set id=@%s, origin=@%s_origin, id=%d,
 
163
                    fully qualified name='%s.%s',
 
164
                    comment='%s');
 
165
                """).strip() % (
 
166
                    set_name, set_name, tab_id,
 
167
                    tab_nspname, tab_relname, tab_comment)
 
168
    cur.execute("""
 
169
        SELECT seq_id, seq_nspname, seq_relname, seq_comment
 
170
        FROM _sl.sl_sequence WHERE seq_set=%s
 
171
        """, (set_id,))
 
172
    results = cur.fetchall()
 
173
    message(outf, "Adding %d sequences to %s" % (len(results), set_name))
 
174
    for seq_id, seq_nspname, seq_relname, seq_comment in results:
 
175
        if not seq_comment:
 
176
            seq_comment=''
 
177
        print >> outf, dedent("""\
 
178
                set add sequence (
 
179
                    set id=@%s, origin=@%s_origin, id=%d,
 
180
                    fully qualified name='%s.%s',
 
181
                    comment='%s');
 
182
                """).strip() % (
 
183
                    set_name, set_name, seq_id,
 
184
                    seq_nspname, seq_relname, seq_comment)
 
185
 
 
186
 
 
187
def generate_subscribe_set(set_id, set_name, outf):
 
188
    origin_node = get_master_node(con, set_id)
 
189
    cur = con.cursor()
 
190
    cur.execute("""
 
191
        SELECT sub_receiver FROM _sl.sl_subscribe
 
192
        WHERE sub_set=%s and sub_active is true
 
193
        """, (set_id,))
 
194
    for receiver_id, in cur.fetchall():
 
195
        message(outf, "Subscribing node %d to %s" % (receiver_id, set_name))
 
196
        print >> outf, dedent("""\
 
197
                subscribe set (
 
198
                    id=%d, provider=@%s_origin, receiver=%d,
 
199
                    forward=true, omit copy=true);
 
200
                wait for event (
 
201
                    origin=@%s_origin, confirmed=all, wait on=@%s_origin);
 
202
                """).strip() % (
 
203
                    set_id, set_name, receiver_id,
 
204
                    set_name, set_name)
 
205
        print >> outf, "include <mig_sync.sk>;"
 
206
 
 
207
 
 
208
def main():
 
209
    parser = OptionParser()
 
210
    scripts.db_options(parser)
 
211
    parser.add_option(
 
212
        "-o", "--output-dir", dest='outdir', default=".",
 
213
        help="Write slonik scripts to DIR", metavar="DIR")
 
214
    global options
 
215
    options, args = parser.parse_args()
 
216
    if args:
 
217
        parser.error("Too many arguments")
 
218
    scripts.execute_zcml_for_scripts(use_web_security=False)
 
219
 
 
220
    global con
 
221
    con = connect()
 
222
 
 
223
    generate_preamble()
 
224
    generate_sync()
 
225
    generate_uninstall()
 
226
    generate_rebuild()
 
227
 
 
228
    return 0
 
229
 
 
230
 
 
231
if __name__ == '__main__':
 
232
    raise SystemExit(main())