59
59
boost::this_thread::at_thread_exit(&internal::my_thread_end);
61
61
/* setup a Session object */
62
Session::shared_ptr session= Session::make_shared(plugin::Listen::getNullClient(),
62
_session= Session::make_shared(plugin::Listen::getNullClient(),
64
64
identifier::User::shared_ptr user= identifier::User::make_shared();
65
65
user->setUser("slave");
66
session->setUser(user);
67
session->set_db(getSchema());
66
_session->setUser(user);
67
_session->set_db(getSchema());
69
createApplierSchemaAndTables(*(session.get()));
69
if (not createApplierSchemaAndTables())
71
72
uint64_t trx_id= 0;
79
80
TrxIdList completedTransactionIds;
81
getListOfCompletedTransactions(*(session.get()), completedTransactionIds);
82
getListOfCompletedTransactions(completedTransactionIds);
83
84
for (size_t x= 0; x < completedTransactionIds.size(); x++)
91
92
message::Transaction transaction;
92
93
uint32_t segment_id= 1;
94
while (getMessage(*(session.get()), transaction, commit_id,
95
trx_id, segment_id++))
95
while (getMessage(transaction, commit_id, trx_id, segment_id++))
97
97
convertToSQL(transaction, aggregate_sql, segmented_sql);
103
103
* will have commit_id = 0.
105
105
assert((not commit_id.empty()) && (commit_id != "0"));
107
106
assert(segmented_sql.empty());
109
108
if (not aggregate_sql.empty())
111
executeSQL(*(session.get()), aggregate_sql, commit_id);
110
if (not executeSQL(aggregate_sql, commit_id))
112
/* TODO: Handle errors better. For now, just shutdown the slave. */
114
deleteFromQueue(*(session.get()), trx_id);
117
deleteFromQueue(trx_id);
131
bool QueueManager::createApplierSchemaAndTables(Session &session)
134
bool QueueManager::createApplierSchemaAndTables()
133
136
vector<string> sql;
135
138
sql.push_back("COMMIT");
136
139
sql.push_back("CREATE SCHEMA IF NOT EXISTS replication");
138
if (not executeSQL(session, sql))
141
if (not executeSQL(sql))
142
145
sql.push_back("COMMIT");
143
146
sql.push_back("CREATE TABLE IF NOT EXISTS replication.applier_state"
144
" (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY)");
147
SELECT COUNT(*) FROM replication.applier_state
149
INSERT INTO replication.applier_state VALUES (0)
152
if (not executeSQL(session, sql))
147
" (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY,"
148
" status VARCHAR(20) NOT NULL,"
149
" error_msg VARCHAR(250))");
151
if (not executeSQL(sql))
155
sql.push_back("SELECT COUNT(*) FROM replication.applier_state");
157
sql::ResultSet result_set(1);
158
Execute execute(*(_session.get()), true);
159
execute.run(sql[0], result_set);
161
string count= result_set.getString(0);
166
sql.push_back("INSERT INTO replication.applier_state"
167
" (last_applied_commit_id, status)"
168
" VALUES (0, 'RUNNING')");
169
if (not executeSQL(sql))
159
bool QueueManager::getMessage(Session &session,
160
message::Transaction &transaction,
177
bool QueueManager::getMessage(message::Transaction &transaction,
161
178
string &commit_id,
163
180
uint32_t segment_id)
174
191
//printf("%s\n", sql.c_str()); fflush(stdout);
175
192
sql::ResultSet result_set(2);
176
Execute execute(session, true);
193
Execute execute(*(_session.get()), true);
178
execute.run(sql, &result_set);
195
execute.run(sql, result_set);
180
197
assert(result_set.getMetaData().getColumnCount() == 2);
209
bool QueueManager::getListOfCompletedTransactions(Session &session,
226
bool QueueManager::getListOfCompletedTransactions(TrxIdList &list)
212
Execute execute(session, true);
228
Execute execute(*(_session.get()), true);
214
230
string sql("SELECT trx_id FROM ");
215
231
sql.append(getSchema());
220
236
/* ResultSet size must match column count */
221
237
sql::ResultSet result_set(1);
223
execute.run(sql, &result_set);
239
execute.run(sql, result_set);
225
241
assert(result_set.getMetaData().getColumnCount() == 1);
364
bool QueueManager::executeSQL(Session &session,
381
void QueueManager::setApplierState(const string &err_msg, bool status)
383
vector<string> statements;
389
sql= "UPDATE replication.applier_state SET status = 'STOPPED'";
393
sql= "UPDATE replication.applier_state SET status = 'RUNNING'";
396
sql.append(", error_msg = '");
398
/* Escape embedded quotes and statement terminators */
400
for (it= msg.begin(); it != msg.end(); ++it)
404
it= msg.insert(it, '\'');
405
++it; /* advance back to the quote */
409
it= msg.insert(it, '\\');
410
++it; /* advance back to the semicolon */
417
statements.push_back(sql);
418
executeSQL(statements);
422
bool QueueManager::executeSQL(vector<string> &sql,
366
423
const string &commit_id)
368
425
string tmp("UPDATE replication.applier_state"
369
426
" SET last_applied_commit_id = ");
370
427
tmp.append(commit_id);
371
428
sql.push_back(tmp);
373
return executeSQL(session, sql);
430
return executeSQL(sql);
377
bool QueueManager::executeSQL(Session &session,
434
bool QueueManager::executeSQL(vector<string> &sql)
380
436
string combined_sql;
382
Execute execute(session, true);
438
Execute execute(*(_session.get()), true);
384
440
vector<string>::iterator iter= sql.begin();
393
449
printf("execute: %s\n", combined_sql.c_str()); fflush(stdout);
451
sql::ResultSet result_set(1);
395
453
/* Execute wraps the SQL to run within a transaction */
396
execute.run(combined_sql);
454
execute.run(combined_sql, result_set);
456
sql::Exception exception= result_set.getException();
458
drizzled::error_t err= exception.getErrorCode();
460
if ((err != drizzled::EE_OK) && (err != drizzled::ER_EMPTY_QUERY))
462
/* avoid recursive errors from setApplierState() */
466
_in_error_state= true;
467
string err_msg("(SQLSTATE ");
468
err_msg.append(exception.getSQLState());
469
err_msg.append(") ");
470
err_msg.append(exception.getErrorMessage());
472
std::cerr << err_msg << std::endl;
473
std::cerr << "Slave failed while executing:\n";
474
for (size_t y= 0; y < sql.size(); y++)
475
std::cerr << sql[y] << std::endl;
477
setApplierState(err_msg, false);
402
bool QueueManager::deleteFromQueue(Session &session, uint64_t trx_id)
485
bool QueueManager::deleteFromQueue(uint64_t trx_id)
404
487
string sql("DELETE FROM ");
405
488
sql.append(getSchema());