1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include <plugin/slave/queue_consumer.h>
23
#include <drizzled/message/transaction.pb.h>
24
#include <drizzled/message/statement_transform.h>
25
#include <drizzled/sql/result_set.h>
26
#include <drizzled/execute.h>
29
#include <boost/thread.hpp>
30
#include <boost/lexical_cast.hpp>
31
#include <google/protobuf/text_format.h>
34
using namespace drizzled;
39
/*
 * Initialization step: calls setApplierState() with an empty error
 * message and a status flag of true.
 *
 * NOTE(review): the function is declared bool, but no return statement
 * is visible in this excerpt; presumably it reports success -- confirm
 * against the complete source.
 */
bool QueueConsumer::init()
41
/* No error text to record at this point */
setApplierState("", true);
46
void QueueConsumer::shutdown()
48
setApplierState(getErrorMessage(), false);
52
/*
 * Processes the transactions reported by
 * getListOfCompletedTransactions().
 *
 * For each transaction id, the segments are read one at a time with
 * getMessage() (segment ids start at 1) and converted with
 * convertToSQL(). A non-empty aggregate_sql is then escaped in place and
 * passed to executeSQLWithCommitId(), and deleteFromQueue() is called
 * with the transaction id.
 *
 * NOTE(review): the declaration of commit_id, the condition guarding the
 * first escaping branch, the iterator re-advance steps, the failure
 * handling and the return statements are not visible in this excerpt;
 * verify them against the complete source.
 */
bool QueueConsumer::process()
54
TrxIdList completedTransactionIds;
56
getListOfCompletedTransactions(completedTransactionIds);
58
/* Visit the transaction ids in the order the list provides them */
for (size_t x= 0; x < completedTransactionIds.size(); x++)
61
uint64_t trx_id= completedTransactionIds[x];
63
vector<string> aggregate_sql; /* final SQL to execute */
64
vector<string> segmented_sql; /* carryover from segmented statements */
66
message::Transaction transaction;
67
uint32_t segment_id= 1;
69
/*
 * Keep reading segments until getMessage() returns false; segment_id is
 * post-incremented on every call, including the final failing one.
 */
while (getMessage(transaction, commit_id, trx_id, segment_id++))
71
convertToSQL(transaction, aggregate_sql, segmented_sql);
76
* The last message in a transaction should always have a commit_id
77
* value larger than 0, though other messages of the same transaction
78
* will have commit_id = 0.
80
assert((not commit_id.empty()) && (commit_id != "0"));
81
/* Nothing may be left pending in segmented_sql at this point */
assert(segmented_sql.empty());
83
if (not aggregate_sql.empty())
86
* Execution using drizzled::Execute requires some special escaping.
88
vector<string>::iterator agg_iter;
89
for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
91
/* The statement is modified in place through this reference */
string &sql= *agg_iter;
92
string::iterator si= sql.begin();
93
for (; si != sql.end(); ++si)
97
/*
 * insert() places the backslash in front of the current character and
 * returns an iterator to the inserted backslash.
 * NOTE(review): the condition selecting this branch is not visible here.
 */
si= sql.insert(si, '\\');
100
/* Three backslash inserts follow this test in the visible lines */
else if (*si == '\\')
102
si= sql.insert(si, '\\');
104
si= sql.insert(si, '\\');
106
si= sql.insert(si, '\\');
113
/* NOTE(review): what happens when execution fails is not visible here */
if (not executeSQLWithCommitId(aggregate_sql, commit_id))
118
/* NOTE(review): what happens when the delete fails is not visible here */
if (not deleteFromQueue(trx_id))
128
/*
 * Reads one segment of a transaction from `sys_replication`.`queue`.
 *
 * Builds a SELECT of `msg` and `commit_order` filtered by trx_id and
 * segment_id, runs it through Execute on _session, and parses the `msg`
 * text into transaction with protobuf TextFormat.
 *
 * NOTE(review): only the first line of the parameter list is visible in
 * this excerpt; the call in process() passes
 * (transaction, commit_id, trx_id, segment_id). How com_id is handed
 * back to the caller, how found_rows is updated, and the return
 * statements are not visible here.
 */
bool QueueConsumer::getMessage(message::Transaction &transaction,
133
string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
134
" WHERE `trx_id` = ");
135
sql.append(boost::lexical_cast<string>(trx_id));
136
/* 16 is the character count of the literal being appended */
sql.append(" AND `seg_id` = ", 16);
137
sql.append(boost::lexical_cast<string>(segment_id));
139
/* Two result columns: msg (index 0) and commit_order (index 1) */
sql::ResultSet result_set(2);
140
Execute execute(*(_session.get()), true);
142
execute.run(sql, result_set);
144
assert(result_set.getMetaData().getColumnCount() == 2);
146
/* Really should only be 1 returned row */
147
uint32_t found_rows= 0;
148
while (result_set.next())
150
string msg= result_set.getString(0);
151
string com_id= result_set.getString(1);
153
/*
 * Fires on an empty msg or once one row has already been counted.
 * NOTE(review): the body of this test is not visible here.
 */
if ((msg == "") || (found_rows == 1))
156
/* Neither column should be NULL */
157
assert(result_set.isNull(0) == false);
158
assert(result_set.isNull(1) == false);
160
/*
 * msg is parsed as a text-format protobuf message.
 * NOTE(review): the bool result of ParseFromString() is not checked on
 * this line.
 */
google::protobuf::TextFormat::ParseFromString(msg, &transaction);
172
/*
 * Appends to list the trx_id values selected from
 * `sys_replication`.`queue` where `commit_order` is not NULL, in
 * ascending `commit_order`.
 *
 * Each trx_id string is converted with boost::lexical_cast<uint64_t>
 * before being pushed onto list.
 *
 * NOTE(review): the handling of the empty-string case mentioned below
 * and the return statement are not visible in this excerpt.
 */
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
174
Execute execute(*(_session.get()), true);
176
string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
177
" WHERE `commit_order` IS NOT NULL ORDER BY `commit_order` ASC");
179
/* ResultSet size must match column count */
180
sql::ResultSet result_set(1);
182
execute.run(sql, result_set);
184
assert(result_set.getMetaData().getColumnCount() == 1);
186
while (result_set.next())
188
assert(result_set.isNull(0) == false);
189
string value= result_set.getString(0);
191
/* empty string returned when no more results */
194
/* getString(0) is called again here instead of reusing value */
list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
202
/*
 * Converts the Statement messages of one Transaction message into SQL
 * strings.
 *
 * aggregate_sql receives the SQL of statements that are complete;
 * segmented_sql carries SQL for a statement that is still spread over
 * several segments and is preserved between calls by the caller.
 *
 * NOTE(review): the break/default labels of the switch, the handling of
 * the has_event() and transformStatementToSql() tests, and the return
 * statements are not visible in this excerpt.
 */
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
203
vector<string> &aggregate_sql,
204
vector<string> &segmented_sql)
206
/* NOTE(review): the body taken for event Transactions is not visible */
if (transaction.has_event())
209
size_t num_statements= transaction.statement_size();
212
* Loop through all Statement messages within this Transaction and
213
* convert each to equivalent SQL statements. Complete Statements will
214
* be appended to aggregate_sql, while segmented Statements will remain
215
* in segmented_sql to be appended to until completed, or rolled back.
218
for (size_t idx= 0; idx < num_statements; idx++)
220
const message::Statement &statement= transaction.statement(idx);
222
/* We won't bother with executing a rolled back transaction */
223
if (statement.type() == message::Statement::ROLLBACK)
225
assert(idx == (num_statements - 1)); /* should be the final Statement */
226
/* Drop everything collected so far, complete or not */
aggregate_sql.clear();
227
segmented_sql.clear();
231
switch (statement.type())
233
/* DDL cannot be in a transaction, so precede with a COMMIT */
234
case message::Statement::TRUNCATE_TABLE:
235
case message::Statement::CREATE_SCHEMA:
236
case message::Statement::ALTER_SCHEMA:
237
case message::Statement::DROP_SCHEMA:
238
case message::Statement::CREATE_TABLE:
239
case message::Statement::ALTER_TABLE:
240
case message::Statement::DROP_TABLE:
241
case message::Statement::RAW_SQL: /* currently ALTER TABLE or RENAME */
243
segmented_sql.push_back("COMMIT");
247
/* Cancel any ongoing statement */
248
case message::Statement::ROLLBACK_STATEMENT:
250
segmented_sql.clear();
260
/*
 * segmented_sql is passed as the destination of the transformation.
 * NOTE(review): the branch taken on the result is not visible here.
 */
if (message::transformStatementToSql(statement, segmented_sql,
261
message::DRIZZLE, true))
266
/*
 * Once isEndStatement() holds, everything accumulated in segmented_sql
 * is copied onto the end of aggregate_sql and segmented_sql is emptied.
 */
if (isEndStatement(statement))
268
aggregate_sql.insert(aggregate_sql.end(),
269
segmented_sql.begin(),
270
segmented_sql.end());
271
segmented_sql.clear();
279
/*
 * Examines the end_segment flag carried by the data of an INSERT,
 * UPDATE or DELETE Statement.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably false is returned when end_segment is not set and true in
 * every other case -- confirm against the complete source.
 */
bool QueueConsumer::isEndStatement(const message::Statement &statement)
281
switch (statement.type())
283
case (message::Statement::INSERT):
285
const message::InsertData &data= statement.insert_data();
286
/* Tested when the INSERT data does not carry the end_segment flag */
if (not data.end_segment())
290
case (message::Statement::UPDATE):
292
const message::UpdateData &data= statement.update_data();
293
/* Same test for UPDATE data */
if (not data.end_segment())
297
case (message::Statement::DELETE):
299
const message::DeleteData &data= statement.delete_data();
300
/* Same test for DELETE data */
if (not data.end_segment())
311
/*
 * Builds and executes an UPDATE of `sys_replication`.`applier_state`
 * that sets `status` to 'STOPPED' or 'RUNNING' and sets `error_msg`.
 *
 * NOTE(review): the declarations of sql, msg and it, the test on status
 * that chooses between the two UPDATE texts, the conditions guarding the
 * two escaping branches, and the code that appends msg to sql are not
 * visible in this excerpt. Presumably msg is a modifiable copy of
 * err_msg -- confirm against the complete source.
 */
void QueueConsumer::setApplierState(const string &err_msg, bool status)
313
vector<string> statements;
319
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
323
sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
326
/* 17 is the character count of the literal being appended */
sql.append(", `error_msg` = '", 17);
328
/* Escape embedded quotes and statement terminators */
330
for (it= msg.begin(); it != msg.end(); ++it)
334
/* A second single quote is placed in front of the current character */
it= msg.insert(it, '\'');
335
++it; /* advance back to the quote */
339
/* A backslash is placed in front of the current character */
it= msg.insert(it, '\\');
340
++it; /* advance back to the semicolon */
347
/* The UPDATE is executed as a one-statement batch */
statements.push_back(sql);
348
executeSQL(statements);
352
/*
 * Executes the statements in sql through executeSQL() and returns its
 * result. An UPDATE of `sys_replication`.`applier_state` that sets
 * `last_applied_commit_id` to commit_id is built in tmp first.
 *
 * commit_id is appended to the UPDATE text as-is, without quoting.
 */
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
353
const string &commit_id)
355
string tmp("UPDATE `sys_replication`.`applier_state`"
356
" SET `last_applied_commit_id` = ");
357
tmp.append(commit_id);
360
/*
 * NOTE(review): tmp is not referenced again in the visible lines;
 * presumably it is added to sql before this call -- confirm against the
 * complete source.
 */
return executeSQL(sql);
364
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
366
string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
367
sql.append(boost::lexical_cast<std::string>(trx_id));
369
vector<string> sql_vect;
370
sql_vect.push_back(sql);
372
return executeSQL(sql_vect);
375
} /* namespace slave */