/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 *  Copyright (C) 2011 David Shrewsbury
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
/*
 * NOTE(review): the include list below may be incomplete (for example a
 * project-wide configuration header may belong first) -- confirm against
 * the upstream file.
 */
#include <plugin/slave/queue_consumer.h>
#include <drizzled/message/transaction.pb.h>
#include <drizzled/message/statement_transform.h>
#include <drizzled/sql/result_set.h>
#include <drizzled/execute.h>

#include <cassert>
#include <string>
#include <vector>

#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <google/protobuf/text_format.h>

using namespace drizzled;

namespace slave
{

39
bool QueueConsumer::init()
41
setApplierState("", true);
46
void QueueConsumer::shutdown()
48
setApplierState(getErrorMessage(), false);
52
bool QueueConsumer::process()
54
TrxIdList completedTransactionIds;
56
getListOfCompletedTransactions(completedTransactionIds);
58
for (size_t x= 0; x < completedTransactionIds.size(); x++)
61
uint64_t trx_id= completedTransactionIds[x];
63
vector<string> aggregate_sql; /* final SQL to execute */
64
vector<string> segmented_sql; /* carryover from segmented statements */
66
message::Transaction transaction;
67
uint32_t segment_id= 1;
69
while (getMessage(transaction, commit_id, trx_id, segment_id++))
71
convertToSQL(transaction, aggregate_sql, segmented_sql);
76
* The last message in a transaction should always have a commit_id
77
* value larger than 0, though other messages of the same transaction
78
* will have commit_id = 0.
80
assert((not commit_id.empty()) && (commit_id != "0"));
81
assert(segmented_sql.empty());
83
if (not aggregate_sql.empty())
86
* Execution using drizzled::Execute requires some special escaping.
88
vector<string>::iterator agg_iter;
89
for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
91
string &sql= *agg_iter;
92
string::iterator si= sql.begin();
93
for (; si != sql.end(); ++si)
97
si= sql.insert(si, '\\');
100
else if (*si == '\\')
102
si= sql.insert(si, '\\');
104
si= sql.insert(si, '\\');
106
si= sql.insert(si, '\\');
111
si= sql.insert(si, '\\');
112
++si; /* advance back to the semicolon */
118
if (not executeSQLWithCommitId(aggregate_sql, commit_id))
123
if (not deleteFromQueue(trx_id))
133
bool QueueConsumer::getMessage(message::Transaction &transaction,
138
string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
139
" WHERE `trx_id` = ");
140
sql.append(boost::lexical_cast<string>(trx_id));
141
sql.append(" AND `seg_id` = ", 16);
142
sql.append(boost::lexical_cast<string>(segment_id));
144
sql::ResultSet result_set(2);
145
Execute execute(*(_session.get()), true);
147
execute.run(sql, result_set);
149
assert(result_set.getMetaData().getColumnCount() == 2);
151
/* Really should only be 1 returned row */
152
uint32_t found_rows= 0;
153
while (result_set.next())
155
string msg= result_set.getString(0);
156
string com_id= result_set.getString(1);
158
if ((msg == "") || (found_rows == 1))
161
/* Neither column should be NULL */
162
assert(result_set.isNull(0) == false);
163
assert(result_set.isNull(1) == false);
165
google::protobuf::TextFormat::ParseFromString(msg, &transaction);
177
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
179
Execute execute(*(_session.get()), true);
181
string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
182
" WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
183
" ORDER BY `commit_order` ASC");
185
/* ResultSet size must match column count */
186
sql::ResultSet result_set(1);
188
execute.run(sql, result_set);
190
assert(result_set.getMetaData().getColumnCount() == 1);
192
while (result_set.next())
194
assert(result_set.isNull(0) == false);
195
string value= result_set.getString(0);
197
/* empty string returned when no more results */
200
list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
208
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
209
vector<string> &aggregate_sql,
210
vector<string> &segmented_sql)
212
if (transaction.has_event())
215
size_t num_statements= transaction.statement_size();
218
* Loop through all Statement messages within this Transaction and
219
* convert each to equivalent SQL statements. Complete Statements will
220
* be appended to aggregate_sql, while segmented Statements will remain
221
* in segmented_sql to be appended to until completed, or rolled back.
224
for (size_t idx= 0; idx < num_statements; idx++)
226
const message::Statement &statement= transaction.statement(idx);
228
/* We won't bother with executing a rolled back transaction */
229
if (statement.type() == message::Statement::ROLLBACK)
231
assert(idx == (num_statements - 1)); /* should be the final Statement */
232
aggregate_sql.clear();
233
segmented_sql.clear();
237
switch (statement.type())
239
/* DDL cannot be in a transaction, so precede with a COMMIT */
240
case message::Statement::TRUNCATE_TABLE:
241
case message::Statement::CREATE_SCHEMA:
242
case message::Statement::ALTER_SCHEMA:
243
case message::Statement::DROP_SCHEMA:
244
case message::Statement::CREATE_TABLE:
245
case message::Statement::ALTER_TABLE:
246
case message::Statement::DROP_TABLE:
247
case message::Statement::RAW_SQL: /* currently ALTER TABLE or RENAME */
249
segmented_sql.push_back("COMMIT");
253
/* Cancel any ongoing statement */
254
case message::Statement::ROLLBACK_STATEMENT:
256
segmented_sql.clear();
266
if (message::transformStatementToSql(statement, segmented_sql,
267
message::DRIZZLE, true))
272
if (isEndStatement(statement))
274
aggregate_sql.insert(aggregate_sql.end(),
275
segmented_sql.begin(),
276
segmented_sql.end());
277
segmented_sql.clear();
285
bool QueueConsumer::isEndStatement(const message::Statement &statement)
287
switch (statement.type())
289
case (message::Statement::INSERT):
291
const message::InsertData &data= statement.insert_data();
292
if (not data.end_segment())
296
case (message::Statement::UPDATE):
298
const message::UpdateData &data= statement.update_data();
299
if (not data.end_segment())
303
case (message::Statement::DELETE):
305
const message::DeleteData &data= statement.delete_data();
306
if (not data.end_segment())
317
void QueueConsumer::setApplierState(const string &err_msg, bool status)
319
vector<string> statements;
325
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
329
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
332
sql.append(", `error_msg` = '", 17);
334
/* Escape embedded quotes and statement terminators */
336
for (it= msg.begin(); it != msg.end(); ++it)
340
it= msg.insert(it, '\'');
341
++it; /* advance back to the quote */
345
it= msg.insert(it, '\\');
346
++it; /* advance back to the semicolon */
353
statements.push_back(sql);
354
executeSQL(statements);
358
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
359
const string &commit_id)
361
string tmp("UPDATE `sys_replication`.`applier_state`"
362
" SET `last_applied_commit_id` = ");
363
tmp.append(commit_id);
366
return executeSQL(sql);
370
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
372
string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
373
sql.append(boost::lexical_cast<std::string>(trx_id));
375
vector<string> sql_vect;
376
sql_vect.push_back(sql);
378
return executeSQL(sql_vect);
381
} /* namespace slave */