1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include <plugin/slave/queue_consumer.h>
23
#include <drizzled/errmsg_print.h>
24
#include <drizzled/execute.h>
25
#include <drizzled/message/transaction.pb.h>
26
#include <drizzled/message/statement_transform.h>
27
#include <drizzled/sql/result_set.h>
30
#include <boost/thread.hpp>
31
#include <boost/lexical_cast.hpp>
32
#include <google/protobuf/text_format.h>
35
using namespace drizzled;
40
/*
 * Start-up hook of the queue consumer.
 *
 * The only statement visible here calls setApplierState() with an empty
 * error message and a status flag of true.
 *
 * NOTE(review): in this view the function has no braces and no return
 * statement although it is declared bool, and it contains a bare integer
 * line that looks like line-number residue from a text extraction. Confirm
 * the return value against the pristine file.
 */
bool QueueConsumer::init()
42
setApplierState("", true);
47
void QueueConsumer::shutdown()
49
setApplierState(getErrorMessage(), false);
53
/*
 * Runs one processing pass over every configured master.
 *
 * Walks _master_ids by index, renders each entry as a string with
 * boost::lexical_cast and passes that string to processSingleMaster().
 *
 * NOTE(review): the statement guarded by the final if, the loop braces,
 * the end of the inner comment and every return statement are not visible
 * in this view, and the bare integer lines look like line-number residue
 * from a text extraction. What is returned when processSingleMaster()
 * reports failure must be confirmed against the pristine file.
 */
bool QueueConsumer::process()
55
for (size_t index= 0; index < _master_ids.size(); index++)
57
/* We go ahead and get the string version of the master ID
58
* so we don't have to keep converting it from int to string.
60
const string master_id= boost::lexical_cast<string>(_master_ids[index]);
62
if (not processSingleMaster(master_id))
69
/*
 * Applies the queued transactions of a single master.
 *
 * master_id  identifier of the master, already in string form
 *
 * Visible steps:
 *   1. getListOfCompletedTransactions() fills a TrxIdList for this master.
 *   2. For each transaction id, getMessage() is called repeatedly with a
 *      segment id that starts at 1 and is post-incremented on every call;
 *      each message read is passed to convertToSQL(), which works on
 *      aggregate_sql and segmented_sql.
 *   3. Two asserts require commit_id to be non-empty and different from
 *      the string 0, and segmented_sql to be empty.
 *   4. When aggregate_sql is not empty, every statement is scanned
 *      character by character and backslashes are inserted in front of
 *      selected characters.
 *   5. executeSQLWithCommitId() is called with the collected statements.
 *   6. deleteFromQueue() is called for the transaction.
 *
 * NOTE(review): this view is incomplete. The declaration of commit_id, all
 * braces, most of the conditions that select which characters get escaped,
 * the last argument of executeSQLWithCommitId(), the control flow around
 * the fallback UPDATE statement and every return statement are not
 * visible. The bare integer lines look like line-number residue from a
 * text extraction. Confirm the details against the pristine file.
 */
bool QueueConsumer::processSingleMaster(const string &master_id)
71
TrxIdList completedTransactionIds;
73
getListOfCompletedTransactions(master_id, completedTransactionIds);
75
for (size_t x= 0; x < completedTransactionIds.size(); x++)
78
string originating_server_uuid;
79
uint64_t originating_commit_id= 0;
80
uint64_t trx_id= completedTransactionIds[x];
82
vector<string> aggregate_sql; /* final SQL to execute */
83
vector<string> segmented_sql; /* carryover from segmented statements */
85
message::Transaction transaction;
86
uint32_t segment_id= 1;
88
/* Read segment after segment until getMessage() evaluates to false. */
while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
89
originating_commit_id, segment_id++))
91
convertToSQL(transaction, aggregate_sql, segmented_sql);
96
* The last message in a transaction should always have a commit_id
97
* value larger than 0, though other messages of the same transaction
98
* will have commit_id = 0.
100
assert((not commit_id.empty()) && (commit_id != "0"));
101
assert(segmented_sql.empty());
103
if (not aggregate_sql.empty())
106
* Execution using drizzled::Execute requires some special escaping.
108
vector<string>::iterator agg_iter;
109
for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
111
string &sql= *agg_iter;
112
string::iterator si= sql.begin();
113
for (; si != sql.end(); ++si)
117
/* insert() returns an iterator to the new backslash, which is stored back into si. */
si= sql.insert(si, '\\');
120
else if (*si == '\\')
122
si= sql.insert(si, '\\');
124
si= sql.insert(si, '\\');
126
si= sql.insert(si, '\\');
131
si= sql.insert(si, '\\');
132
++si; /* advance back to the semicolon */
138
if (not executeSQLWithCommitId(aggregate_sql, commit_id,
139
originating_server_uuid,
140
originating_commit_id,
147
/* Still need to record that we handled this trx */
149
string tmp("UPDATE `sys_replication`.`applier_state`"
150
" SET `last_applied_commit_id` = ");
151
tmp.append(commit_id);
152
tmp.append(" WHERE `master_id` = ");
153
tmp.append(master_id);
163
if (not deleteFromQueue(master_id, trx_id))
173
/*
 * Loads one segment of a queued transaction message.
 *
 * transaction              receives the message parsed from the msg column
 * master_id                master identifier, spliced into the query text
 * originating_server_uuid  receives the originating_server_uuid column
 * originating_commit_id    receives the originating_commit_id column,
 *                          converted with boost::lexical_cast
 *
 * The body selects msg, commit_order, originating_server_uuid and
 * originating_commit_id from sys_replication.queue for the given trx_id,
 * seg_id and master_id, runs the statement through an Execute object built
 * on _session, asserts that the result has four columns and that none of
 * them is NULL, and parses the msg text with
 * google::protobuf::TextFormat::ParseFromString().
 *
 * NOTE(review): this view is incomplete. The parameters that carry
 * commit_id, trx_id and segment_id, all braces, the statement guarded by
 * the test on msg and found_rows, any use of com_id, any update of
 * found_rows and every return statement are not visible. The bare integer
 * lines look like line-number residue from a text extraction.
 * NOTE(review): the result of ParseFromString() is not checked in the
 * visible code.
 */
bool QueueConsumer::getMessage(message::Transaction &transaction,
175
const string &master_id,
177
string &originating_server_uuid,
178
uint64_t &originating_commit_id,
181
string sql("SELECT `msg`, `commit_order`, `originating_server_uuid`, "
182
"`originating_commit_id` FROM `sys_replication`.`queue`"
183
" WHERE `trx_id` = ");
184
sql.append(boost::lexical_cast<string>(trx_id));
185
/* The explicit length 16 equals the character count of the literal. */
sql.append(" AND `seg_id` = ", 16);
186
sql.append(boost::lexical_cast<string>(segment_id));
187
/*
 * NOTE(review): the next line ends with a comma, not a semicolon, so the
 * two append calls form one comma expression. Both still run in order;
 * this looks like a typo.
 */
sql.append(" AND `master_id` = ", 19),
188
sql.append(master_id);
190
sql::ResultSet result_set(4);
191
Execute execute(*(_session.get()), true);
193
execute.run(sql, result_set);
195
assert(result_set.getMetaData().getColumnCount() == 4);
197
/* Really should only be 1 returned row */
198
uint32_t found_rows= 0;
199
while (result_set.next())
201
string msg= result_set.getString(0);
202
string com_id= result_set.getString(1);
203
string orig_server_uuid= result_set.getString(2);
204
string orig_commit_id= result_set.getString(3);
206
if ((msg == "") || (found_rows == 1))
209
/* No columns should be NULL */
210
assert(result_set.isNull(0) == false);
211
assert(result_set.isNull(1) == false);
212
assert(result_set.isNull(2) == false);
213
assert(result_set.isNull(3) == false);
216
google::protobuf::TextFormat::ParseFromString(msg, &transaction);
219
originating_server_uuid= orig_server_uuid;
220
originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
230
/*
 * Collects the ids of the queued transactions of one master that carry a
 * commit order.
 *
 * master_id  master identifier, already in string form
 *
 * The query selects trx_id from sys_replication.queue where commit_order
 * is not NULL and greater than 0, ordered by commit_order ascending. The
 * statement runs through an Execute object built on _session; the result
 * is asserted to have one column and no NULL value, and each row is
 * converted to uint64_t with boost::lexical_cast and appended to list.
 *
 * NOTE(review): this view is incomplete. The second parameter (presumably
 * the container named list), the operand that follows the master_id
 * literal in the query, all braces, the test on the fetched value and
 * every return statement are not visible. The bare integer lines look like
 * line-number residue from a text extraction.
 */
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
233
Execute execute(*(_session.get()), true);
235
string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
236
" WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
237
" AND `master_id` = "
239
+ " ORDER BY `commit_order` ASC");
241
/* ResultSet size must match column count */
242
sql::ResultSet result_set(1);
244
execute.run(sql, result_set);
246
assert(result_set.getMetaData().getColumnCount() == 1);
248
while (result_set.next())
250
assert(result_set.isNull(0) == false);
251
string value= result_set.getString(0);
253
/* empty string returned when no more results */
256
list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
264
/*
 * Turns the Statement messages of one Transaction message into SQL text.
 *
 * transaction    message whose statements are converted
 * aggregate_sql  receives the SQL of statements that are complete
 * segmented_sql  working list for the statement currently being built;
 *                its content is appended to aggregate_sql and then
 *                cleared once isEndStatement() holds for a statement
 *
 * Visible behaviour per statement:
 *   - a ROLLBACK statement is asserted to be the last one and clears both
 *     lists;
 *   - the DDL statement types and RAW_SQL push the text COMMIT onto
 *     segmented_sql first;
 *   - ROLLBACK_STATEMENT clears segmented_sql;
 *   - message::transformStatementToSql() is called with segmented_sql as
 *     its second argument.
 *
 * NOTE(review): this view is incomplete. The statement guarded by the
 * has_event() test, all braces, the break statements of the switch, the
 * statement guarded by the transformStatementToSql() test and every return
 * statement are not visible. The bare integer lines look like line-number
 * residue from a text extraction.
 */
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
265
vector<string> &aggregate_sql,
266
vector<string> &segmented_sql)
268
if (transaction.has_event())
271
size_t num_statements= transaction.statement_size();
274
* Loop through all Statement messages within this Transaction and
275
* convert each to equivalent SQL statements. Complete Statements will
276
* be appended to aggregate_sql, while segmented Statements will remain
277
* in segmented_sql to be appended to until completed, or rolled back.
280
for (size_t idx= 0; idx < num_statements; idx++)
282
const message::Statement &statement= transaction.statement(idx);
284
/* We won't bother with executing a rolled back transaction */
285
if (statement.type() == message::Statement::ROLLBACK)
287
assert(idx == (num_statements - 1)); /* should be the final Statement */
288
aggregate_sql.clear();
289
segmented_sql.clear();
293
switch (statement.type())
295
/* DDL cannot be in a transaction, so precede with a COMMIT */
296
case message::Statement::TRUNCATE_TABLE:
297
case message::Statement::CREATE_SCHEMA:
298
case message::Statement::ALTER_SCHEMA:
299
case message::Statement::DROP_SCHEMA:
300
case message::Statement::CREATE_TABLE:
301
case message::Statement::ALTER_TABLE:
302
case message::Statement::DROP_TABLE:
303
case message::Statement::RAW_SQL: /* currently ALTER TABLE or RENAME */
305
segmented_sql.push_back("COMMIT");
309
/* Cancel any ongoing statement */
310
case message::Statement::ROLLBACK_STATEMENT:
312
segmented_sql.clear();
322
if (message::transformStatementToSql(statement, segmented_sql,
323
message::DRIZZLE, true))
328
/* A finished statement moves from the working list to the final list. */
if (isEndStatement(statement))
330
aggregate_sql.insert(aggregate_sql.end(),
331
segmented_sql.begin(),
332
segmented_sql.end());
333
segmented_sql.clear();
341
/*
 * Inspects whether a Statement message closes its statement.
 *
 * statement  message to inspect
 *
 * For INSERT, UPDATE and DELETE statements the visible code fetches the
 * matching data sub-message (insert_data, update_data, delete_data) and
 * tests its end_segment() flag.
 *
 * NOTE(review): this view is incomplete. All braces, the statements
 * guarded by the three end_segment() tests, any default branch and every
 * return statement are not visible, so the value returned in each case
 * cannot be stated from here. The bare integer lines look like line-number
 * residue from a text extraction.
 */
bool QueueConsumer::isEndStatement(const message::Statement &statement)
343
switch (statement.type())
345
case (message::Statement::INSERT):
347
const message::InsertData &data= statement.insert_data();
348
if (not data.end_segment())
352
case (message::Statement::UPDATE):
354
const message::UpdateData &data= statement.update_data();
355
if (not data.end_segment())
359
case (message::Statement::DELETE):
361
const message::DeleteData &data= statement.delete_data();
362
if (not data.end_segment())
374
* TODO: This currently updates every row in the applier_state table.
375
* This used to be a single row. With multi-master support, we now need
376
* a row for every master so we can track the last applied commit ID
377
* value for each. Eventually, we may want multiple consumer threads,
378
* so then we'd need to update each row independently.
380
/*
 * Writes the applier status and an error message to
 * sys_replication.applier_state.
 *
 * err_msg  error text to store
 * status   flag that presumably selects between the RUNNING and STOPPED
 *          status texts; the selecting condition is not visible here
 *
 * The visible code assigns an UPDATE statement that sets status to either
 * STOPPED or RUNNING, appends the start of the error_msg assignment (the
 * explicit length 17 equals the character count of that literal), walks
 * msg while inserting a single quote or a backslash in front of selected
 * characters, and finally submits the statement as a one-element batch
 * through executeSQL(). The statement has no WHERE clause in the visible
 * text.
 *
 * NOTE(review): this view is incomplete. The declarations of sql, msg and
 * it, all braces, the conditions that choose the status text and the
 * characters to escape, and the code that appends msg and closes the
 * quoted value are not visible. The bare integer lines look like
 * line-number residue from a text extraction.
 */
void QueueConsumer::setApplierState(const string &err_msg, bool status)
382
vector<string> statements;
388
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
392
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
395
sql.append(", `error_msg` = '", 17);
397
/* Escape embedded quotes and statement terminators */
399
for (it= msg.begin(); it != msg.end(); ++it)
403
it= msg.insert(it, '\'');
404
++it; /* advance back to the quote */
408
it= msg.insert(it, '\\');
409
++it; /* advance back to the semicolon */
416
statements.push_back(sql);
417
executeSQL(statements);
421
/*
 * Executes a batch of SQL statements and records the commit they belong to.
 *
 * sql                      statements to execute
 * commit_id                value written to last_applied_commit_id,
 *                          spliced into the statement text unquoted
 * originating_server_uuid  value written to originating_server_uuid,
 *                          wrapped in single quotes without escaping
 * originating_commit_id    value written to originating_commit_id
 * master_id                selects the applier_state row to update
 *
 * The visible code builds an UPDATE on sys_replication.applier_state in
 * tmp, stores the originating server UUID and commit id on _session, and
 * returns the result of executeSQL(sql).
 *
 * NOTE(review): the step that places tmp into sql is not visible in this
 * view, and neither are the braces. The bare integer lines look like
 * line-number residue from a text extraction. Confirm against the pristine
 * file.
 */
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
422
const string &commit_id,
423
const string &originating_server_uuid,
424
uint64_t originating_commit_id,
425
const string &master_id)
427
string tmp("UPDATE `sys_replication`.`applier_state`"
428
" SET `last_applied_commit_id` = ");
429
tmp.append(commit_id);
430
tmp.append(", `originating_server_uuid` = '");
431
tmp.append(originating_server_uuid);
432
tmp.append("' , `originating_commit_id` = ");
433
tmp.append(boost::lexical_cast<string>(originating_commit_id));
435
tmp.append(" WHERE `master_id` = ");
436
tmp.append(master_id);
440
_session->setOriginatingServerUUID(originating_server_uuid);
441
_session->setOriginatingCommitID(originating_commit_id);
443
return executeSQL(sql);
447
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
449
string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
450
sql.append(boost::lexical_cast<std::string>(trx_id));
452
sql.append(" AND `master_id` = ");
453
sql.append(master_id);
455
vector<string> sql_vect;
456
sql_vect.push_back(sql);
458
return executeSQL(sql_vect);
461
} /* namespace slave */