47
* Create our IO thread state information table if we need to.
47
Create our IO thread state information table if we need to.
52
Version 1.0: Initial definition
53
Version 1.1: Added master_id and PK on master_id
51
58
sql.push_back("COMMIT");
52
59
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
60
" `master_id` BIGINT NOT NULL,"
53
61
" `status` VARCHAR(20) NOT NULL,"
54
62
" `error_msg` VARCHAR(250))"
55
" COMMENT = 'VERSION 1.0'");
63
" COMMENT = 'VERSION 1.1'");
57
65
if (not executeSQL(sql))
61
70
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
86
96
* Table: applier_state
87
97
* Version 1.0: Initial definition
88
98
* Version 1.1: Added originating_server_uuid and originating_commit_id
99
* Version 1.2: Added master_id and changed PK to master_id
92
103
sql.push_back("COMMIT");
93
104
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
94
105
" (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
106
" `master_id` BIGINT NOT NULL,"
95
107
" `originating_server_uuid` VARCHAR(36) NOT NULL,"
96
108
" `originating_commit_id` BIGINT NOT NULL,"
97
109
" `status` VARCHAR(20) NOT NULL,"
98
110
" `error_msg` VARCHAR(250))"
99
" COMMENT = 'VERSION 1.1'");
111
" COMMENT = 'VERSION 1.2'");
101
113
if (not executeSQL(sql))
105
118
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
128
142
* Create our message queue table if we need to.
129
143
* Version 1.0: Initial definition
130
144
* Version 1.1: Added originating_server_uuid and originating_commit_id
145
* Version 1.2: Added master_id and changed PK to (master_id, trx_id, seg_id)
138
153
" `originating_server_uuid` VARCHAR(36) NOT NULL,"
139
154
" `originating_commit_id` BIGINT NOT NULL,"
156
" `master_id` BIGINT NOT NULL,"
141
157
" PRIMARY KEY(`trx_id`, `seg_id`))"
142
" COMMENT = 'VERSION 1.1'");
158
" COMMENT = 'VERSION 1.2'");
143
159
if (not executeSQL(sql))
149
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
165
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
169
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
171
sql::ResultSet result_set(1);
172
Execute execute(*(_session.get()), true);
173
execute.run(sql[0], result_set);
175
string count= result_set.getString(0);
180
sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
181
+ boost::lexical_cast<string>(master_id)
183
if (not executeSQL(sql))
190
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
194
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
196
sql::ResultSet result_set(1);
197
Execute execute(*(_session.get()), true);
198
execute.run(sql[0], result_set);
200
string count= result_set.getString(0);
205
sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
206
" (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
207
+ boost::lexical_cast<string>(master_id)
208
+ ",0 , 'STOPPED')");
209
if (not executeSQL(sql))
217
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
151
219
vector<string> sql;
153
221
sql.push_back("UPDATE `sys_replication`.`applier_state`"
154
222
" SET `last_applied_commit_id` = "
155
+ lexical_cast<string>(value));
223
+ lexical_cast<string>(value)
224
+ " WHERE `master_id` = "
225
+ lexical_cast<string>(master_id));
157
227
return executeSQL(sql);