1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include <plugin/slave/queue_producer.h>
23
#include <drizzled/errmsg_print.h>
24
#include <drizzled/sql/result_set.h>
25
#include <drizzled/execute.h>
26
#include <drizzled/gettext.h>
27
#include <drizzled/message/transaction.pb.h>
28
#include <boost/lexical_cast.hpp>
29
#include <google/protobuf/text_format.h>
32
using namespace drizzled;
37
// Destructor for QueueProducer.
// NOTE(review): presumably closes the master connection if it is still
// open -- confirm against the destructor body.
QueueProducer::~QueueProducer()
43
// Performs the first connection attempt.
//
// Delegates to reconnect() with initial_connection set to true and returns
// that call's result unchanged.
bool QueueProducer::init()
46
return reconnect(true);
49
// Runs one processing pass of the producer.
//
// Steps visible here:
//   1. If _saved_max_commit_id is 0 it is loaded through
//      queryForMaxCommitId() (presumably 0 means "not loaded yet" -- confirm).
//   2. queryForReplicationEvents() is called repeatedly for as long as it
//      reports EE_OK; an ER_YES result is handled as an error.
//
// Both failure paths compare _last_return with
// DRIZZLE_RETURN_LOST_CONNECTION. According to the inline comments, true is
// returned when a reconnect succeeded (the caller should try again) and
// false when the reconnect failed or the error is unrecoverable, in which
// case the thread is shut down.
bool QueueProducer::process()
51
if (_saved_max_commit_id == 0)
53
if (not queryForMaxCommitId(&_saved_max_commit_id))
55
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
59
return true; /* reconnect successful, try again */
63
// Stored in _last_error_message, which shutdown() hands to setIOState().
_last_error_message= "Master offline";
64
return false; /* reconnect failed, shutdown the thread */
69
return false; /* unrecoverable error, shutdown the thread */
74
/* Keep getting events until caught up */
75
enum drizzled::error_t err;
76
while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
79
if (err == ER_YES) /* We encountered an error */
81
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
85
return true; /* reconnect successful, try again */
89
// Same message as the failure path above.
_last_error_message= "Master offline";
90
return false; /* reconnect failed, shutdown the thread */
95
return false; /* unrecoverable error, shutdown the thread */
102
// Publishes the producer's final state.
//
// Passes the last recorded error message to setIOState() with status set to
// false (presumably the 'STOPPED' state -- confirm in setIOState()).
void QueueProducer::shutdown()
104
setIOState(_last_error_message, false);
109
// (Re)establishes the connection to the master.
//
// initial_connection: when false, an error is logged first to report that
//                     the connection to the master was lost.
//
// Resets _is_connected, _last_return and _last_error_message, then keeps
// calling openConnection() until it succeeds, sleeping
// _seconds_between_reconnects seconds between attempts. The attempt counter
// is checked against _max_reconnects.
//
// Returns the value of _is_connected.
bool QueueProducer::reconnect(bool initial_connection)
111
if (not initial_connection)
113
errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
116
_is_connected= false;
117
_last_return= DRIZZLE_RETURN_OK;
118
_last_error_message.clear();
119
boost::posix_time::seconds duration(_seconds_between_reconnects);
121
uint32_t attempts= 1;
123
while (not openConnection())
125
// Post-increment: the comparison uses the count before it is bumped.
// NOTE(review): presumably retrying stops once the limit is reached.
if (attempts++ == _max_reconnects)
127
boost::this_thread::sleep(duration);
130
return _is_connected;
133
// Opens a client connection to the master.
//
// Creates the drizzle handle (_drizzle) and the connection object
// (_connection), points the connection at _master_host/_master_port with
// the _master_user/_master_pass credentials, and connects.
//
// Each of the three failure checks builds a message prefixed with
// "Replication slave: " from drizzle_error(), stores it in
// _last_error_message and logs it with errmsg_printf(). The two creation
// failures also set _last_return to DRIZZLE_RETURN_INTERNAL_ERROR.
bool QueueProducer::openConnection()
135
if (drizzle_create(&_drizzle) == NULL)
137
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
138
_last_error_message= "Replication slave: ";
139
// NOTE(review): drizzle_create() has just failed, so _drizzle may not be
// initialised when drizzle_error() reads it -- confirm against libdrizzle.
_last_error_message.append(drizzle_error(&_drizzle));
140
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
144
if (drizzle_con_create(&_drizzle, &_connection) == NULL)
146
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
147
_last_error_message= "Replication slave: ";
148
_last_error_message.append(drizzle_error(&_drizzle));
149
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
153
drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
154
drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
156
drizzle_return_t ret= drizzle_con_connect(&_connection);
158
if (ret != DRIZZLE_RETURN_OK)
161
_last_error_message= "Replication slave: ";
162
_last_error_message.append(drizzle_error(&_drizzle));
163
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
172
// Closes the connection to the master.
//
// Marks the producer as disconnected before issuing drizzle_quit() on
// _connection. drizzle_result_free() is called on the result object both in
// the handling of a NULL return from drizzle_quit() and at the end.
bool QueueProducer::closeConnection()
174
drizzle_return_t ret;
175
drizzle_result_st result;
177
_is_connected= false;
179
if (drizzle_quit(&_connection, &result, &ret) == NULL)
182
drizzle_result_free(&result);
186
drizzle_result_free(&result);
191
// Determines the highest commit id that has already been pulled over from
// the master and stores it through max_commit_id.
//
// max_commit_id: output; written with the parsed value of the single
//                result column.
//
// The query is run with drizzled::Execute on _session and takes the maximum
// over `sys_replication`.`queue` (commit_order) and
// `sys_replication`.`applier_state` (last_applied_commit_id). The value is
// parsed with boost::lexical_cast<uint64_t>.
//
// On the failure path _last_error_message is set to
// "Could not determine last committed transaction.".
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
194
* This SQL will get the maximum commit_id value we have pulled over from
195
* the master. We query two tables because either the queue will be empty,
196
* in which case the last_applied_commit_id will be the value we want, or
197
* we have yet to drain the queue, we get the maximum value still in
200
string sql("SELECT MAX(x.cid) FROM"
201
" (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
202
" UNION ALL SELECT `last_applied_commit_id` AS cid"
203
" FROM `sys_replication`.`applier_state`) AS x");
205
sql::ResultSet result_set(1);
206
Execute execute(*(_session.get()), true);
207
execute.run(sql, result_set);
208
assert(result_set.getMetaData().getColumnCount() == 1);
210
/* Really should only be 1 returned row */
211
uint32_t found_rows= 0;
212
while (result_set.next())
214
string value= result_set.getString(0);
216
// Detects an empty value, or a row beyond the first one.
if ((value == "") || (found_rows == 1))
219
assert(result_set.isNull(0) == false);
220
*max_commit_id= boost::lexical_cast<uint64_t>(value);
226
_last_error_message= "Could not determine last committed transaction.";
233
// Fetches from the master the ids of transactions that still need to be
// replicated.
//
// max_commit_id: only log entries with a commit_id greater than this value
//                are selected.
// list:          output; one id is appended per returned row.
//
// The query reads `data_dictionary`.`sys_replication_log` over _connection,
// ordered by commit_id and limited to 25 rows per call.
//
// A failure of drizzle_query_str() or drizzle_result_buffer(), or a NULL id
// column, is recorded in _last_error_message and logged with
// errmsg_printf(); the result object is freed in each of those cases as well
// as at the end.
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
234
vector<uint64_t> &list)
237
string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
238
" WHERE `commit_id` > ");
239
sql.append(boost::lexical_cast<string>(max_commit_id));
240
sql.append(" ORDER BY `commit_id` LIMIT 25");
242
drizzle_return_t ret;
243
drizzle_result_st result;
244
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
246
if (ret != DRIZZLE_RETURN_OK)
249
_last_error_message= "Replication slave: ";
250
_last_error_message.append(drizzle_error(&_drizzle));
251
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
252
drizzle_result_free(&result);
256
ret= drizzle_result_buffer(&result);
258
if (ret != DRIZZLE_RETURN_OK)
261
_last_error_message= "Replication slave: ";
262
_last_error_message.append(drizzle_error(&_drizzle));
263
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
264
drizzle_result_free(&result);
270
while ((row= drizzle_row_next(&result)) != NULL)
274
// NOTE(review): list stores uint64_t, but the id is parsed as uint32_t.
// boost::lexical_cast throws bad_lexical_cast for a value that does not fit,
// so ids above the 32-bit range would fail here -- confirm the id range or
// widen the cast to uint64_t.
list.push_back(boost::lexical_cast<uint32_t>(row[0]));
279
_last_error_message= "Replication slave: Unexpected NULL for trx id";
280
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
281
drizzle_result_free(&result);
286
drizzle_result_free(&result);
291
// Stores one replication event in `sys_replication`.`queue`.
//
// All arguments are the textual column values of one replication log row:
//   trx_id, commit_id, originating_server_uuid, originating_commit_id:
//                appended to the INSERT statement as given.
//   msg, msg_length: the serialized message::Transaction and its length
//                (msg_length is parsed with boost::lexical_cast<int>).
//
// The message is parsed with ParseFromArray(), rendered to text with
// google::protobuf::TextFormat::PrintToString(), escaped by inserting
// backslashes in place, and appended to the statement, which is run through
// executeSQL(). string::insert() returns an iterator to the inserted
// character, which is why the loop reassigns 'it' after every insertion.
//
// After the insert, _saved_max_commit_id is raised to commit_id when
// commit_id is larger.
//
// NOTE(review): the bool result of ParseFromArray() is ignored, so a
// message that fails to parse is not detected here.
// NOTE(review): originating_server_uuid is placed between single quotes
// without escaping; this relies on the master sending a well-formed value.
bool QueueProducer::queueInsert(const char *trx_id,
293
const char *commit_id,
294
const char *originating_server_uuid,
295
const char *originating_commit_id,
297
const char *msg_length)
299
message::Transaction message;
301
message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
304
* The SQL to insert our results into the local queue.
306
string sql= "INSERT INTO `sys_replication`.`queue`"
307
" (`trx_id`, `seg_id`, `commit_order`,"
308
" `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
313
sql.append(commit_id);
314
sql.append(", '", 3);
315
sql.append(originating_server_uuid);
316
sql.append("' , ", 4);
317
sql.append(originating_commit_id);
318
sql.append(", '", 3);
321
* Ideally we would store the Transaction message in binary form, as it
322
* it stored on the master and tranferred to the slave. However, we are
323
* inserting using drizzle::Execute which doesn't really handle binary
324
* data. Until that is changed, we store as plain text.
327
google::protobuf::TextFormat::PrintToString(message, &message_text);
330
* Execution using drizzled::Execute requires some special escaping.
332
string::iterator it= message_text.begin();
333
for (; it != message_text.end(); ++it)
337
it= message_text.insert(it, '\\');
340
else if (*it == '\'')
342
it= message_text.insert(it, '\\');
344
it= message_text.insert(it, '\\');
347
else if (*it == '\\')
349
it= message_text.insert(it, '\\');
351
it= message_text.insert(it, '\\');
353
it= message_text.insert(it, '\\');
358
it= message_text.insert(it, '\\');
359
++it; /* advance back to the semicolon */
363
sql.append(message_text);
366
vector<string> statements;
367
statements.push_back(sql);
369
if (not executeSQL(statements))
375
uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
376
if (tmp_commit_id > _saved_max_commit_id)
377
_saved_max_commit_id= tmp_commit_id;
383
// Pulls the next batch of replication events from the master and queues
// them.
//
// max_commit_id: passed to queryForTrxIdList() to select the transaction
//                ids to fetch.
//
// When the id list comes back empty there is nothing to get from the
// master. Otherwise the matching rows of
// `data_dictionary`.`sys_replication_log` are selected over _connection
// (ordered by commit_id), buffered, and each row's seven columns are handed
// to queueInsert().
//
// Query, buffering and insert failures are logged with errmsg_printf() and
// the result object is freed in each case.
//
// Returns a drizzled::error_t. process() keeps calling this function while
// the result is EE_OK and treats ER_YES as an error.
enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
385
vector<uint64_t> trx_id_list;
387
if (not queryForTrxIdList(max_commit_id, trx_id_list))
390
if (trx_id_list.size() == 0) /* nothing to get from the master */
396
* The SQL to pull everything we need from the master.
398
string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
399
" `originating_commit_id`, `message`, `message_len` "
400
" FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
402
// Appends every collected transaction id to the IN (...) list.
for (size_t x= 0; x < trx_id_list.size(); x++)
406
sql.append(boost::lexical_cast<string>(trx_id_list[x]));
410
sql.append(" ORDER BY `commit_id` ASC");
412
drizzle_return_t ret;
413
drizzle_result_st result;
414
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
416
if (ret != DRIZZLE_RETURN_OK)
419
_last_error_message= "Replication slave: ";
420
_last_error_message.append(drizzle_error(&_drizzle));
421
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
422
drizzle_result_free(&result);
426
/* TODO: Investigate 1-row-at-a-time buffering */
428
ret= drizzle_result_buffer(&result);
430
if (ret != DRIZZLE_RETURN_OK)
433
_last_error_message= "Replication slave: ";
434
_last_error_message.append(drizzle_error(&_drizzle));
435
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
436
drizzle_result_free(&result);
442
while ((row= drizzle_row_next(&result)) != NULL)
444
// Columns are passed in the order of the SELECT list above.
if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
446
errmsg_printf(error::ERROR,
447
_("Replication slave: Unable to insert into queue."));
448
drizzle_result_free(&result);
453
drizzle_result_free(&result);
459
// Records the producer's state in `sys_replication`.`io_state`.
//
// err_msg: error text for the error_msg column (the escaping loop walks
//          msg, presumably a copy of err_msg -- confirm).
// status:  selects between the 'STOPPED' and 'RUNNING' statements
//          (presumably true means 'RUNNING' -- confirm; shutdown() passes
//          false).
//
// The message is escaped in place: an extra single quote is inserted in
// front of a quote and a backslash in front of a semicolon, as the inline
// comments describe. The finished statement is run through executeSQL(),
// whose result is not checked.
void QueueProducer::setIOState(const string &err_msg, bool status)
461
vector<string> statements;
467
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
471
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
474
sql.append(", `error_msg` = '", 17);
476
/* Escape embedded quotes and statement terminators */
478
for (it= msg.begin(); it != msg.end(); ++it)
482
it= msg.insert(it, '\'');
483
++it; /* advance back to the quote */
487
it= msg.insert(it, '\\');
488
++it; /* advance back to the semicolon */
495
statements.push_back(sql);
496
executeSQL(statements);
499
} /* namespace slave */