87
91
message::Transaction transaction;
88
92
uint32_t segment_id= 1;
90
while (getMessage(*(session.get()), transaction, trx_id, segment_id++))
94
while (getMessage(*(session.get()), transaction, commit_id,
95
trx_id, segment_id++))
91
97
convertToSQL(transaction, aggregate_sql, segmented_sql);
101
* The last message in a transaction should always have a commit_id
102
* value larger than 0, though other messages of the same transaction
103
* will have commit_id = 0.
105
assert((not commit_id.empty()) && (commit_id != "0"));
93
107
assert(segmented_sql.empty());
95
109
if (not aggregate_sql.empty())
96
executeSQL(*(session.get()), aggregate_sql);
111
executeSQL(*(session.get()), aggregate_sql, commit_id);
98
114
deleteFromQueue(*(session.get()), trx_id);
131
bool QueueManager::createApplierSchemaAndTables(Session &session)
135
sql.push_back("COMMIT");
136
sql.push_back("CREATE SCHEMA IF NOT EXISTS replication");
138
if (not executeSQL(session, sql))
142
sql.push_back("COMMIT");
143
sql.push_back("CREATE TABLE IF NOT EXISTS replication.applier_state"
144
" (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY)");
147
SELECT COUNT(*) FROM replication.applier_state
149
INSERT INTO replication.applier_state VALUES (0)
152
if (not executeSQL(session, sql))
115
159
bool QueueManager::getMessage(Session &session,
116
160
message::Transaction &transaction,
118
163
uint32_t segment_id)
120
string sql("SELECT msg FROM ");
165
string sql("SELECT msg, commit_order FROM ");
121
166
sql.append(getSchema());
123
168
sql.append(getTable());
126
171
sql.append(" AND seg_id = ");
127
172
sql.append(boost::lexical_cast<std::string>(segment_id));
129
printf("%s\n", sql.c_str()); fflush(stdout);
130
sql::ResultSet result_set(1);
174
//printf("%s\n", sql.c_str()); fflush(stdout);
175
sql::ResultSet result_set(2);
131
176
Execute execute(session, true);
133
178
execute.run(sql, &result_set);
135
assert(result_set.getMetaData().getColumnCount() == 1);
180
assert(result_set.getMetaData().getColumnCount() == 2);
137
182
/* Really should only be 1 returned row */
139
183
uint32_t found_rows= 0;
141
184
while (result_set.next())
143
string value= result_set.getString(0);
145
if ((value == "") || (found_rows == 1))
186
string msg= result_set.getString(0);
187
string com_id= result_set.getString(1);
189
if ((msg == "") || (found_rows == 1))
192
/* Neither column should be NULL */
148
193
assert(result_set.isNull(0) == false);
149
transaction.ParseFromArray(value.c_str(), value.length());
194
assert(result_set.isNull(1) == false);
196
//transaction.ParseFromString(value);
197
google::protobuf::TextFormat::ParseFromString(msg, &transaction);
223
if (statement.type() == message::Statement::ROLLBACK_STATEMENT)
272
switch (statement.type())
225
segmented_sql.clear();
274
/* DDL cannot be in a transaction, so precede with a COMMIT */
275
case message::Statement::TRUNCATE_TABLE:
276
case message::Statement::CREATE_SCHEMA:
277
case message::Statement::ALTER_SCHEMA:
278
case message::Statement::DROP_SCHEMA:
279
case message::Statement::CREATE_TABLE:
280
case message::Statement::ALTER_TABLE:
281
case message::Statement::DROP_TABLE:
283
segmented_sql.push_back("COMMIT");
287
/* Cancel any ongoing statement */
288
case message::Statement::ROLLBACK_STATEMENT:
290
segmented_sql.clear();
229
300
if (message::transformStatementToSql(statement, segmented_sql,
294
bool QueueManager::executeSQL(Session &session, vector<string> &sql)
364
bool QueueManager::executeSQL(Session &session,
366
const string &commit_id)
368
string tmp("UPDATE replication.applier_state"
369
" SET last_applied_commit_id = ");
370
tmp.append(commit_id);
373
return executeSQL(session, sql);
377
bool QueueManager::executeSQL(Session &session,
296
380
string combined_sql;