47
47
* Create our IO thread state information table if we need to.
52
* Version 1.0: Initial definition
53
* Version 1.1: Added master_id and PK on master_id
51
57
sql.push_back("COMMIT");
52
58
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
59
" `master_id` BIGINT NOT NULL,"
53
60
" `status` VARCHAR(20) NOT NULL,"
54
" `error_msg` VARCHAR(250))"
55
" COMMENT = 'VERSION 1.0'");
61
" `error_msg` VARCHAR(250),"
62
" PRIMARY KEY(`master_id`))"
63
" COMMENT = 'VERSION 1.1'");
57
65
if (not executeSQL(sql))
61
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
64
sql::ResultSet result_set(1);
65
Execute execute(*(_session.get()), true);
66
execute.run(sql[0], result_set);
68
string count= result_set.getString(0);
70
/* There must always be at least one row in the table */
74
sql.push_back("INSERT INTO `sys_replication`.`io_state` (`status`)"
75
" VALUES ('STOPPED')");
76
if (not executeSQL(sql))
82
69
* Create our applier thread state information table if we need to.
73
* Table: applier_state
74
* Version 1.0: Initial definition
75
* Version 1.1: Added master_id and changed PK to master_id
86
79
sql.push_back("COMMIT");
87
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
88
" (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
80
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state` ("
81
" `master_id` BIGINT NOT NULL,"
82
" `last_applied_commit_id` BIGINT NOT NULL,"
89
83
" `status` VARCHAR(20) NOT NULL,"
90
" `error_msg` VARCHAR(250))"
91
" COMMENT = 'VERSION 1.0'");
84
" `error_msg` VARCHAR(250),"
85
" PRIMARY KEY(`master_id`))"
86
" COMMENT = 'VERSION 1.1'");
93
88
if (not executeSQL(sql))
97
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
100
sql::ResultSet result_set(1);
101
Execute execute(*(_session.get()), true);
102
execute.run(sql[0], result_set);
104
string count= result_set.getString(0);
106
/* There must always be at least one row in the table */
110
sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
111
" (`last_applied_commit_id`, `status`)"
112
" VALUES (0, 'STOPPED')");
113
if (not executeSQL(sql))
119
92
* Create our message queue table if we need to.
93
* Version 1.0: Initial definition
94
* Version 1.1: Added master_id and changed PK to (master_id, trx_id, seg_id)
123
98
sql.push_back("COMMIT");
124
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
125
" (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
126
" `commit_order` BIGINT, `msg` BLOB,"
127
" PRIMARY KEY(`trx_id`, `seg_id`))"
128
" COMMENT = 'VERSION 1.0'");
99
sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue` ("
100
" `master_id` BIGINT NOT NULL,"
101
" `trx_id` BIGINT NOT NULL,"
102
" `seg_id` INT NOT NULL,"
103
" `commit_order` BIGINT,"
105
" PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
106
" COMMENT = 'VERSION 1.1'");
129
107
if (not executeSQL(sql))
135
bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
113
bool ReplicationSchema::createInitialIORow(uint32_t master_id)
117
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
119
sql::ResultSet result_set(1);
120
Execute execute(*(_session.get()), true);
121
execute.run(sql[0], result_set);
123
string count= result_set.getString(0);
128
sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
129
+ boost::lexical_cast<string>(master_id)
131
if (not executeSQL(sql))
138
bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
142
sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
144
sql::ResultSet result_set(1);
145
Execute execute(*(_session.get()), true);
146
execute.run(sql[0], result_set);
148
string count= result_set.getString(0);
153
sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
154
" (`master_id`, `last_applied_commit_id`, `status`) VALUES ("
155
+ boost::lexical_cast<string>(master_id)
156
+ ",0 , 'STOPPED')");
157
if (not executeSQL(sql))
164
bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
137
166
vector<string> sql;
139
168
sql.push_back("UPDATE `sys_replication`.`applier_state`"
140
169
" SET `last_applied_commit_id` = "
141
+ lexical_cast<string>(value));
170
+ lexical_cast<string>(value)
171
+ " WHERE `master_id` = "
172
+ lexical_cast<string>(master_id));
143
174
return executeSQL(sql);