1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include <plugin/slave/queue_consumer.h>
23
#include <drizzled/message/transaction.pb.h>
24
#include <drizzled/message/statement_transform.h>
25
#include <drizzled/sql/result_set.h>
26
#include <drizzled/execute.h>
29
#include <boost/thread.hpp>
30
#include <boost/lexical_cast.hpp>
31
#include <google/protobuf/text_format.h>
34
using namespace drizzled;
39
bool QueueConsumer::init()
41
setApplierState("", true);
46
void QueueConsumer::shutdown()
48
setApplierState(getErrorMessage(), false);
52
bool QueueConsumer::process()
54
TrxIdList completedTransactionIds;
56
getListOfCompletedTransactions(completedTransactionIds);
58
for (size_t x= 0; x < completedTransactionIds.size(); x++)
61
uint64_t trx_id= completedTransactionIds[x];
63
vector<string> aggregate_sql; /* final SQL to execute */
64
vector<string> segmented_sql; /* carryover from segmented statements */
66
message::Transaction transaction;
67
uint32_t segment_id= 1;
69
while (getMessage(transaction, commit_id, trx_id, segment_id++))
71
convertToSQL(transaction, aggregate_sql, segmented_sql);
76
* The last message in a transaction should always have a commit_id
77
* value larger than 0, though other messages of the same transaction
78
* will have commit_id = 0.
80
assert((not commit_id.empty()) && (commit_id != "0"));
81
assert(segmented_sql.empty());
83
if (not aggregate_sql.empty())
86
* Execution using drizzled::Execute requires some special escaping.
88
vector<string>::iterator agg_iter;
89
for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
91
string &sql= *agg_iter;
92
string::iterator si= sql.begin();
93
for (; si != sql.end(); ++si)
97
si= sql.insert(si, '\\');
100
else if (*si == '\\')
102
si= sql.insert(si, '\\');
104
si= sql.insert(si, '\\');
106
si= sql.insert(si, '\\');
111
si= sql.insert(si, '\\');
112
++si; /* advance back to the semicolon */
118
if (not executeSQLWithCommitId(aggregate_sql, commit_id))
123
if (not deleteFromQueue(trx_id))
133
bool QueueConsumer::getMessage(message::Transaction &transaction,
138
string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
139
" WHERE `trx_id` = ");
140
sql.append(boost::lexical_cast<string>(trx_id));
141
sql.append(" AND `seg_id` = ", 16);
142
sql.append(boost::lexical_cast<string>(segment_id));
144
sql::ResultSet result_set(2);
145
Execute execute(*(_session.get()), true);
147
execute.run(sql, result_set);
149
assert(result_set.getMetaData().getColumnCount() == 2);
151
/* Really should only be 1 returned row */
152
uint32_t found_rows= 0;
153
while (result_set.next())
155
string msg= result_set.getString(0);
156
string com_id= result_set.getString(1);
158
if ((msg == "") || (found_rows == 1))
161
/* Neither column should be NULL */
162
assert(result_set.isNull(0) == false);
163
assert(result_set.isNull(1) == false);
165
google::protobuf::TextFormat::ParseFromString(msg, &transaction);
177
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
179
Execute execute(*(_session.get()), true);
181
string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
182
" WHERE `commit_order` IS NOT NULL ORDER BY `commit_order` ASC");
184
/* ResultSet size must match column count */
185
sql::ResultSet result_set(1);
187
execute.run(sql, result_set);
189
assert(result_set.getMetaData().getColumnCount() == 1);
191
while (result_set.next())
193
assert(result_set.isNull(0) == false);
194
string value= result_set.getString(0);
196
/* empty string returned when no more results */
199
list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
207
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
208
vector<string> &aggregate_sql,
209
vector<string> &segmented_sql)
211
if (transaction.has_event())
214
size_t num_statements= transaction.statement_size();
217
* Loop through all Statement messages within this Transaction and
218
* convert each to equivalent SQL statements. Complete Statements will
219
* be appended to aggregate_sql, while segmented Statements will remain
220
* in segmented_sql to be appended to until completed, or rolled back.
223
for (size_t idx= 0; idx < num_statements; idx++)
225
const message::Statement &statement= transaction.statement(idx);
227
/* We won't bother with executing a rolled back transaction */
228
if (statement.type() == message::Statement::ROLLBACK)
230
assert(idx == (num_statements - 1)); /* should be the final Statement */
231
aggregate_sql.clear();
232
segmented_sql.clear();
236
switch (statement.type())
238
/* DDL cannot be in a transaction, so precede with a COMMIT */
239
case message::Statement::TRUNCATE_TABLE:
240
case message::Statement::CREATE_SCHEMA:
241
case message::Statement::ALTER_SCHEMA:
242
case message::Statement::DROP_SCHEMA:
243
case message::Statement::CREATE_TABLE:
244
case message::Statement::ALTER_TABLE:
245
case message::Statement::DROP_TABLE:
246
case message::Statement::RAW_SQL: /* currently ALTER TABLE or RENAME */
248
segmented_sql.push_back("COMMIT");
252
/* Cancel any ongoing statement */
253
case message::Statement::ROLLBACK_STATEMENT:
255
segmented_sql.clear();
265
if (message::transformStatementToSql(statement, segmented_sql,
266
message::DRIZZLE, true))
271
if (isEndStatement(statement))
273
aggregate_sql.insert(aggregate_sql.end(),
274
segmented_sql.begin(),
275
segmented_sql.end());
276
segmented_sql.clear();
284
bool QueueConsumer::isEndStatement(const message::Statement &statement)
286
switch (statement.type())
288
case (message::Statement::INSERT):
290
const message::InsertData &data= statement.insert_data();
291
if (not data.end_segment())
295
case (message::Statement::UPDATE):
297
const message::UpdateData &data= statement.update_data();
298
if (not data.end_segment())
302
case (message::Statement::DELETE):
304
const message::DeleteData &data= statement.delete_data();
305
if (not data.end_segment())
316
void QueueConsumer::setApplierState(const string &err_msg, bool status)
318
vector<string> statements;
324
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
328
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
331
sql.append(", `error_msg` = '", 17);
333
/* Escape embedded quotes and statement terminators */
335
for (it= msg.begin(); it != msg.end(); ++it)
339
it= msg.insert(it, '\'');
340
++it; /* advance back to the quote */
344
it= msg.insert(it, '\\');
345
++it; /* advance back to the semicolon */
352
statements.push_back(sql);
353
executeSQL(statements);
357
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
358
const string &commit_id)
360
string tmp("UPDATE `sys_replication`.`applier_state`"
361
" SET `last_applied_commit_id` = ");
362
tmp.append(commit_id);
365
return executeSQL(sql);
369
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
371
string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
372
sql.append(boost::lexical_cast<std::string>(trx_id));
374
vector<string> sql_vect;
375
sql_vect.push_back(sql);
377
return executeSQL(sql_vect);
380
} /* namespace slave */