1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
|
# Copyright 2009 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Common helpers for replication scripts."""
__metaclass__ = type
__all__ = []
import subprocess
import sys
from tempfile import NamedTemporaryFile
from textwrap import dedent
from canonical.config import config
from canonical.database.sqlbase import connect, sqlvalues
from canonical.database.postgresql import (
fqn, all_tables_in_schema, all_sequences_in_schema, ConnectionString
)
from canonical.launchpad.scripts.logger import log, DEBUG2
# The Slony-I clustername we use with Launchpad. Hardcoded because there
# is no point changing this, ever.
CLUSTERNAME = 'sl'
# The namespace in the database used to contain all the Slony-I tables.
CLUSTER_NAMESPACE = '_%s' % CLUSTERNAME
# Replication set id constants. Don't change these without DBA help.
LPMAIN_SET_ID = 1
HOLDING_SET_ID = 666
LPMIRROR_SET_ID = 4
# Seed tables for the lpmain replication set to be passed to
# calculate_replication_set().
LPMAIN_SEED = frozenset([
('public', 'account'),
('public', 'openidnonce'),
('public', 'openidassociation'),
('public', 'person'),
('public', 'launchpaddatabaserevision'),
('public', 'databasereplicationlag'),
('public', 'fticache'),
('public', 'nameblacklist'),
('public', 'openidconsumerassociation'),
('public', 'openidconsumernonce'),
('public', 'codeimportmachine'),
('public', 'scriptactivity'),
('public', 'standardshipitrequest'),
('public', 'bugtag'),
('public', 'launchpadstatistic'),
('public', 'parsedapachelog'),
('public', 'shipitsurvey'),
('public', 'databasereplicationlag'),
])
# Explicitly list tables that should not be replicated. This includes the
# session tables, as these might exist in developer databases but will not
# exist in the production launchpad database.
IGNORED_TABLES = set([
# Session tables that in some situations will exist in the main lp
# database.
'public.secret', 'public.sessiondata', 'public.sessionpkgdata',
# Mirror tables, per Bug #489078. These tables have their own private
# replication set that is setup manually.
'public.lp_account',
'public.lp_person',
'public.lp_personlocation',
'public.lp_teamparticipation',
# Database statistics
'public.databasetablestats',
'public.databasecpustats',
# Don't replicate OAuthNonce - too busy and no real gain.
'public.oauthnonce',
# Ubuntu SSO database. These tables where created manually by ISD
# and the Launchpad scripts should not mess with them. Eventually
# these tables will be in a totally separate database.
'public.auth_permission',
'public.auth_group',
'public.auth_user',
'public.auth_message',
'public.django_content_type',
'public.auth_permission',
'public.django_session',
'public.django_site',
'public.django_admin_log',
'public.ssoopenidrpconfig',
'public.auth_group_permissions',
'public.auth_user_groups',
'public.auth_user_user_permissions',
'public.oauth_nonce',
'public.oauth_consumer',
'public.oauth_token',
'public.api_user',
'public.oauth_consumer_id_seq',
'public.api_user_id_seq',
'public.oauth_nonce_id_seq',
])
# Calculate IGNORED_SEQUENCES
IGNORED_SEQUENCES = set('%s_id_seq' % table for table in IGNORED_TABLES)
def slony_installed(con):
"""Return True if the connected database is part of a Launchpad Slony-I
cluster.
"""
cur = con.cursor()
cur.execute("""
SELECT TRUE FROM pg_class,pg_namespace
WHERE
nspname = %s
AND relname = 'sl_table'
AND pg_class.relnamespace = pg_namespace.oid
""" % sqlvalues(CLUSTER_NAMESPACE))
return cur.fetchone() is not None
class TableReplicationInfo:
"""Internal table replication details."""
table_id = None
replication_set_id = None
master_node_id = None
def __init__(self, con, namespace, table_name):
cur = con.cursor()
cur.execute("""
SELECT tab_id, tab_set, set_origin
FROM %s.sl_table, %s.sl_set
WHERE tab_set = set_id
AND tab_nspname = %s
AND tab_relname = %s
""" % (
(CLUSTER_NAMESPACE, CLUSTER_NAMESPACE)
+ sqlvalues(namespace, table_name)))
row = cur.fetchone()
if row is None:
raise LookupError(fqn(namespace, table_name))
self.table_id, self.replication_set_id, self.master_node_id = row
def sync(timeout):
"""Generate a sync event and wait for it to complete on all nodes.
This means that all pending events have propagated and are in sync
to the point in time this method was called. This might take several
hours if there is a large backlog of work to replicate.
:param timeout: Number of seconds to wait for the sync. 0 to block
indefinitely.
"""
return execute_slonik("", sync=timeout)
def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
"""Use the slonik command line tool to run a slonik script.
:param script: The script as a string. Preamble should not be included.
:param sync: Number of seconds to wait for sync before failing. 0 to
block indefinitely.
:param exit_on_fail: If True, on failure of the slonik script
sys.exit is invoked using the slonik return code.
:param auto_preamble: If True, the generated preamble will be
automatically included.
:returns: True if the script completed successfully. False if
exit_on_fail is False and the script failed for any reason.
"""
# Add the preamble and optional sync to the script.
if auto_preamble:
script = preamble() + script
if sync is not None:
sync_script = dedent("""\
sync (id = @master_node);
wait for event (
origin = @master_node, confirmed = ALL,
wait on = @master_node, timeout = %d);
""" % sync)
script = script + sync_script
# Copy the script to a NamedTemporaryFile rather than just pumping it
# to slonik via stdin. This way it can be examined if slonik appears
# to hang.
script_on_disk = NamedTemporaryFile(prefix="slonik", suffix=".sk")
print >> script_on_disk, script
script_on_disk.flush()
# Run slonik
log.debug("Executing slonik script %s" % script_on_disk.name)
log.log(DEBUG2, 'Running script:\n%s' % script)
returncode = subprocess.call(['slonik', script_on_disk.name])
if returncode != 0:
log.error("slonik script failed")
if exit_on_fail:
sys.exit(1)
return returncode == 0
class Node:
"""Simple data structure for holding information about a Slony node."""
def __init__(self, node_id, nickname, connection_string, is_master):
self.node_id = node_id
self.nickname = nickname
self.connection_string = connection_string
self.is_master = is_master
def _get_nodes(con, query):
"""Return a list of Nodes."""
if not slony_installed(con):
return []
cur = con.cursor()
cur.execute(query)
nodes = []
for node_id, nickname, connection_string, is_master in cur.fetchall():
nodes.append(Node(node_id, nickname, connection_string, is_master))
return nodes
def get_master_node(con, set_id=1):
"""Return the master Node, or None if the cluster is still being setup."""
nodes = _get_nodes(con, """
SELECT DISTINCT
set_origin AS node_id,
'master',
pa_conninfo AS connection_string,
True
FROM _sl.sl_set
LEFT OUTER JOIN _sl.sl_path ON set_origin = pa_server
WHERE set_id = %d
""" % set_id)
if not nodes:
return None
assert len(nodes) == 1, "More than one master found for set %s" % set_id
return nodes[0]
def get_slave_nodes(con, set_id=1):
"""Return the list of slave Nodes."""
return _get_nodes(con, """
SELECT DISTINCT
pa_server AS node_id,
'slave' || pa_server,
pa_conninfo AS connection_string,
False
FROM _sl.sl_set
JOIN _sl.sl_subscribe ON set_id = sub_set
JOIN _sl.sl_path ON sub_receiver = pa_server
WHERE
set_id = %d
ORDER BY node_id
""" % set_id)
def get_nodes(con, set_id=1):
"""Return a list of all Nodes."""
master_node = get_master_node(con, set_id)
if master_node is None:
return []
else:
return [master_node] + get_slave_nodes(con, set_id)
def get_all_cluster_nodes(con):
"""Return a list of all Nodes in the cluster.
node.is_master will be None, as this boolean doesn't make sense
in the context of a cluster rather than a single replication set.
"""
if not slony_installed(con):
return []
nodes = _get_nodes(con, """
SELECT DISTINCT
pa_server AS node_id,
'node' || pa_server || '_node',
pa_conninfo AS connection_string,
NULL
FROM _sl.sl_path
ORDER BY node_id
""")
if not nodes:
# There are no subscriptions yet, so no paths. Generate the
# master Node.
cur = con.cursor()
cur.execute("SELECT no_id from _sl.sl_node")
node_ids = [row[0] for row in cur.fetchall()]
if len(node_ids) == 0:
return []
assert len(node_ids) == 1, "Multiple nodes but no paths."
master_node_id = node_ids[0]
master_connection_string = ConnectionString(
config.database.rw_main_master)
master_connection_string.user = 'slony'
return [Node(
master_node_id, 'node%d_node' % master_node_id,
master_connection_string, True)]
return nodes
def preamble(con=None):
"""Return the preable needed at the start of all slonik scripts."""
if con is None:
con = connect('slony')
master_node = get_master_node(con)
nodes = get_all_cluster_nodes(con)
if master_node is None and len(nodes) == 1:
master_node = nodes[0]
preamble = [dedent("""\
#
# Every slonik script must start with a clustername, which cannot
# be changed once the cluster is initialized.
#
cluster name = sl;
# Symbolic ids for replication sets.
define lpmain_set %d;
define holding_set %d;
define lpmirror_set %d;
""" % (LPMAIN_SET_ID, HOLDING_SET_ID, LPMIRROR_SET_ID))]
if master_node is not None:
preamble.append(dedent("""\
# Symbolic id for the main replication set master node.
define master_node %d;
define master_node_conninfo '%s';
""" % (master_node.node_id, master_node.connection_string)))
for node in nodes:
preamble.append(dedent("""\
define %s %d;
define %s_conninfo '%s';
node @%s admin conninfo = @%s_conninfo;
""" % (
node.nickname, node.node_id,
node.nickname, node.connection_string,
node.nickname, node.nickname)))
return '\n\n'.join(preamble)
def calculate_replication_set(cur, seeds):
"""Return the minimal set of tables and sequences needed in a
replication set containing the seed table.
A replication set must contain all tables linked by foreign key
reference to the given table, and sequences used to generate keys.
Tables and sequences can be added to the IGNORED_TABLES and
IGNORED_SEQUENCES lists for cases where we known can safely ignore
this restriction.
:param seeds: [(namespace, tablename), ...]
:returns: (tables, sequences)
"""
# Results
tables = set()
sequences = set()
# Our pending set to check
pending_tables = set(seeds)
# Generate the set of tables that reference the seed directly
# or indirectly via foreign key constraints, including the seed itself.
while pending_tables:
namespace, tablename = pending_tables.pop()
# Skip if the table doesn't exist - we might have seeds listed that
# have been removed or are yet to be created.
cur.execute("""
SELECT TRUE
FROM pg_class, pg_namespace
WHERE pg_class.relnamespace = pg_namespace.oid
AND pg_namespace.nspname = %s
AND pg_class.relname = %s
""" % sqlvalues(namespace, tablename))
if cur.fetchone() is None:
log.debug("Table %s.%s doesn't exist" % (namespace, tablename))
continue
tables.add((namespace, tablename))
# Find all tables that reference the current (seed) table
# and all tables that the seed table references.
cur.execute("""
SELECT ref_namespace.nspname, ref_class.relname
FROM
-- One of the seed tables
pg_class AS seed_class,
pg_namespace AS seed_namespace,
-- A table referencing the seed, or being referenced by
-- the seed.
pg_class AS ref_class,
pg_namespace AS ref_namespace,
pg_constraint
WHERE
seed_class.relnamespace = seed_namespace.oid
AND ref_class.relnamespace = ref_namespace.oid
AND seed_namespace.nspname = %s
AND seed_class.relname = %s
-- Foreign key constraints are all we care about.
AND pg_constraint.contype = 'f'
-- We want tables referenced by, or referred to, the
-- seed table.
AND ((pg_constraint.conrelid = ref_class.oid
AND pg_constraint.confrelid = seed_class.oid)
OR (pg_constraint.conrelid = seed_class.oid
AND pg_constraint.confrelid = ref_class.oid)
)
""" % sqlvalues(namespace, tablename))
for namespace, tablename in cur.fetchall():
key = (namespace, tablename)
if (key not in tables and key not in pending_tables
and '%s.%s' % (namespace, tablename) not in IGNORED_TABLES):
pending_tables.add(key)
# Generate the set of sequences that are linked to any of our set of
# tables. We assume these are all sequences created by creation of
# serial or bigserial columns, or other sequences OWNED BY a particular
# column.
for namespace, tablename in tables:
cur.execute("""
SELECT seq
FROM (
SELECT pg_get_serial_sequence(%s, attname) AS seq
FROM pg_namespace, pg_class, pg_attribute
WHERE pg_namespace.nspname = %s
AND pg_class.relnamespace = pg_namespace.oid
AND pg_class.relname = %s
AND pg_attribute.attrelid = pg_class.oid
AND pg_attribute.attisdropped IS FALSE
) AS whatever
WHERE seq IS NOT NULL;
""" % sqlvalues(fqn(namespace, tablename), namespace, tablename))
for sequence, in cur.fetchall():
if sequence not in IGNORED_SEQUENCES:
sequences.add(sequence)
# We can't easily convert the sequence name to (namespace, name) tuples,
# so we might as well convert the tables to dot notation for consistancy.
tables = set(fqn(namespace, tablename) for namespace, tablename in tables)
return tables, sequences
def discover_unreplicated(cur):
"""Inspect the database for tables and sequences in the public schema
that are not in a replication set.
:returns: (unreplicated_tables_set, unreplicated_sequences_set)
"""
all_tables = all_tables_in_schema(cur, 'public')
all_sequences = all_sequences_in_schema(cur, 'public')
cur.execute("""
SELECT tab_nspname, tab_relname FROM %s
WHERE tab_nspname = 'public'
""" % fqn(CLUSTER_NAMESPACE, "sl_table"))
replicated_tables = set(fqn(*row) for row in cur.fetchall())
cur.execute("""
SELECT seq_nspname, seq_relname FROM %s
WHERE seq_nspname = 'public'
""" % fqn(CLUSTER_NAMESPACE, "sl_sequence"))
replicated_sequences = set(fqn(*row) for row in cur.fetchall())
return (
all_tables - replicated_tables - IGNORED_TABLES,
all_sequences - replicated_sequences - IGNORED_SEQUENCES)
class ReplicationConfigError(Exception):
"""Exception raised by validate_replication_sets() when our replication
setup is misconfigured.
"""
def validate_replication(cur):
"""Raise a ReplicationSetupError if there is something wrong with
our replication sets.
This might include tables exist that are not in a replication set,
or tables that exist in multiple replication sets for example.
These is not necessarily limits with what Slony-I allows, but might
be due to policies we have made (eg. a table allowed in just one
replication set).
"""
unrepl_tables, unrepl_sequences = discover_unreplicated(cur)
if unrepl_tables:
raise ReplicationConfigError(
"Unreplicated tables: %s" % repr(unrepl_tables))
if unrepl_sequences:
raise ReplicationConfigError(
"Unreplicated sequences: %s" % repr(unrepl_sequences))
lpmain_tables, lpmain_sequences = calculate_replication_set(
cur, LPMAIN_SEED)
|