47
47
* Create our IO thread state information table if we need to.
52
* Version 1.0: Initial definition
53
* Version 1.1: Added master_id and PK on master_id
57
51
sql.push_back("COMMIT");
58
52
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
59
" `master_id` BIGINT NOT NULL,"
60
53
" `status` VARCHAR(20) NOT NULL,"
61
" `error_msg` VARCHAR(250),"
62
" PRIMARY KEY(`master_id`))"
63
" COMMENT = 'VERSION 1.1'");
54
" `error_msg` VARCHAR(250))"
55
" COMMENT = 'VERSION 1.0'");
65
57
if (not executeSQL(sql))
61
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
64
sql::ResultSet result_set(1);
65
Execute execute(*(_session.get()), true);
66
execute.run(sql[0], result_set);
68
string count= result_set.getString(0);
70
/* Must always be at least one row in the table */
74
sql.push_back("INSERT INTO `sys_replication`.`io_state` (`status`)"
75
" VALUES ('STOPPED')");
76
if (not executeSQL(sql))
69
82
* Create our applier thread state information table if we need to.
73
* Table: applier_state
74
* Version 1.0: Initial definition
75
* Version 1.1: Added master_id and changed PK to master_id
79
86
sql.push_back("COMMIT");
80
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
81
" `master_id` BIGINT NOT NULL,"
82
" `last_applied_commit_id` BIGINT NOT NULL,"
87
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
88
" (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
83
89
" `status` VARCHAR(20) NOT NULL,"
84
" `error_msg` VARCHAR(250),"
85
" PRIMARY KEY(`master_id`))"
86
" COMMENT = 'VERSION 1.1'");
90
" `error_msg` VARCHAR(250))"
91
" COMMENT = 'VERSION 1.0'");
88
93
if (not executeSQL(sql))
97
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
100
sql::ResultSet result_set(1);
101
Execute execute(*(_session.get()), true);
102
execute.run(sql[0], result_set);
104
string count= result_set.getString(0);
106
/* Must always be at least one row in the table */
110
sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
111
" (`last_applied_commit_id`, `status`)"
112
" VALUES (0, 'STOPPED')");
113
if (not executeSQL(sql))
92
119
* Create our message queue table if we need to.
93
* Version 1.0: Initial definition
94
* Version 1.1: Added master_id and changed PK to (master_id, trx_id, seg_id)
98
123
sql.push_back("COMMIT");
99
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue` ("
100
" `master_id` BIGINT NOT NULL,"
101
" `trx_id` BIGINT NOT NULL,"
102
" `seg_id` INT NOT NULL,"
103
" `commit_order` BIGINT,"
105
" PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
106
" COMMENT = 'VERSION 1.1'");
124
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
125
" (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
126
" `commit_order` BIGINT, `msg` BLOB,"
127
" PRIMARY KEY(`trx_id`, `seg_id`))"
128
" COMMENT = 'VERSION 1.0'");
107
129
if (not executeSQL(sql))
113
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
117
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
119
sql::ResultSet result_set(1);
120
Execute execute(*(_session.get()), true);
121
execute.run(sql[0], result_set);
123
string count= result_set.getString(0);
128
sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
129
+ boost::lexical_cast<string>(master_id)
131
if (not executeSQL(sql))
138
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
142
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
144
sql::ResultSet result_set(1);
145
Execute execute(*(_session.get()), true);
146
execute.run(sql[0], result_set);
148
string count= result_set.getString(0);
153
sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
154
" (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
155
+ boost::lexical_cast<string>(master_id)
156
+ ",0 , 'STOPPED')");
157
if (not executeSQL(sql))
164
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
135
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
166
137
vector<string> sql;
168
139
sql.push_back("UPDATE `sys_replication`.`applier_state`"
169
140
" SET `last_applied_commit_id` = "
170
+ lexical_cast<string>(value)
171
+ " WHERE `master_id` = "
172
+ lexical_cast<string>(master_id));
141
+ lexical_cast<string>(value));
174
143
return executeSQL(sql);