1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21
#include <plugin/slave/queue_producer.h>
22
#include <drizzled/errmsg_print.h>
23
#include <drizzled/sql/result_set.h>
24
#include <drizzled/execute.h>
25
#include <drizzled/gettext.h>
26
#include <drizzled/message/transaction.pb.h>
27
#include <boost/lexical_cast.hpp>
28
#include <google/protobuf/text_format.h>
33
using namespace drizzled;
38
QueueProducer::~QueueProducer()
44
bool QueueProducer::init()
47
return reconnect(true);
50
bool QueueProducer::process()
52
if (_saved_max_commit_id == 0)
54
if (not queryForMaxCommitId(&_saved_max_commit_id))
56
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
60
return true; /* reconnect successful, try again */
64
_last_error_message= "Master offline";
65
return false; /* reconnect failed, shutdown the thread */
70
return false; /* unrecoverable error, shutdown the thread */
75
/* Keep getting events until caught up */
76
enum drizzled::error_t err;
77
while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
80
if (err == ER_YES) /* We encountered an error */
82
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
86
return true; /* reconnect successful, try again */
90
_last_error_message= "Master offline";
91
return false; /* reconnect failed, shutdown the thread */
96
return false; /* unrecoverable error, shutdown the thread */
103
void QueueProducer::shutdown()
105
setIOState(_last_error_message, false);
110
bool QueueProducer::reconnect(bool initial_connection)
112
if (not initial_connection)
114
errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
117
_is_connected= false;
118
_last_return= DRIZZLE_RETURN_OK;
119
_last_error_message.clear();
120
boost::posix_time::seconds duration(_seconds_between_reconnects);
122
uint32_t attempts= 1;
124
while (not openConnection())
126
if (attempts++ == _max_reconnects)
128
boost::this_thread::sleep(duration);
131
return _is_connected;
134
bool QueueProducer::openConnection()
136
if (drizzle_create(&_drizzle) == NULL)
138
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
139
_last_error_message= "Replication slave: ";
140
_last_error_message.append(drizzle_error(&_drizzle));
141
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
145
if (drizzle_con_create(&_drizzle, &_connection) == NULL)
147
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
148
_last_error_message= "Replication slave: ";
149
_last_error_message.append(drizzle_error(&_drizzle));
150
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
154
drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
155
drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
157
drizzle_return_t ret= drizzle_con_connect(&_connection);
159
if (ret != DRIZZLE_RETURN_OK)
162
_last_error_message= "Replication slave: ";
163
_last_error_message.append(drizzle_error(&_drizzle));
164
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
173
bool QueueProducer::closeConnection()
175
drizzle_return_t ret;
176
drizzle_result_st result;
178
_is_connected= false;
180
if (drizzle_quit(&_connection, &result, &ret) == NULL)
183
drizzle_result_free(&result);
187
drizzle_result_free(&result);
192
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
195
* This SQL will get the maximum commit_id value we have pulled over from
196
* the master. We query two tables because either the queue will be empty,
197
* in which case the last_applied_commit_id will be the value we want, or
198
* we have yet to drain the queue, we get the maximum value still in
201
string sql("SELECT MAX(x.cid) FROM"
202
" (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
203
" WHERE `master_id` = "
204
+ boost::lexical_cast<string>(masterId())
205
+ " UNION ALL SELECT `last_applied_commit_id` AS cid"
206
+ " FROM `sys_replication`.`applier_state` WHERE `master_id` = "
207
+ boost::lexical_cast<string>(masterId())
210
sql::ResultSet result_set(1);
211
Execute execute(*(_session.get()), true);
212
execute.run(sql, result_set);
213
assert(result_set.getMetaData().getColumnCount() == 1);
215
/* Really should only be 1 returned row */
216
uint32_t found_rows= 0;
217
while (result_set.next())
219
string value= result_set.getString(0);
221
if ((value == "") || (found_rows == 1))
224
assert(result_set.isNull(0) == false);
225
*max_commit_id= boost::lexical_cast<uint64_t>(value);
231
_last_error_message= "Could not determine last committed transaction.";
238
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
239
vector<uint64_t> &list)
242
string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
243
" WHERE `commit_id` > ");
244
sql.append(boost::lexical_cast<string>(max_commit_id));
245
sql.append(" ORDER BY `commit_id` LIMIT 25");
247
drizzle_return_t ret;
248
drizzle_result_st result;
249
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
251
if (ret != DRIZZLE_RETURN_OK)
254
_last_error_message= "Replication slave: ";
255
_last_error_message.append(drizzle_error(&_drizzle));
256
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
257
drizzle_result_free(&result);
261
ret= drizzle_result_buffer(&result);
263
if (ret != DRIZZLE_RETURN_OK)
266
_last_error_message= "Replication slave: ";
267
_last_error_message.append(drizzle_error(&_drizzle));
268
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
269
drizzle_result_free(&result);
275
while ((row= drizzle_row_next(&result)) != NULL)
279
list.push_back(boost::lexical_cast<uint32_t>(row[0]));
284
_last_error_message= "Replication slave: Unexpected NULL for trx id";
285
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
286
drizzle_result_free(&result);
291
drizzle_result_free(&result);
296
bool QueueProducer::queueInsert(const char *trx_id,
298
const char *commit_id,
299
const char *originating_server_uuid,
300
const char *originating_commit_id,
302
const char *msg_length)
304
message::Transaction message;
306
message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
309
* The SQL to insert our results into the local queue.
311
string sql= "INSERT INTO `sys_replication`.`queue`"
312
" (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
313
" `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
314
sql.append(boost::lexical_cast<string>(masterId()));
320
sql.append(commit_id);
321
sql.append(", '", 3);
322
sql.append(originating_server_uuid);
323
sql.append("' , ", 4);
324
sql.append(originating_commit_id);
325
sql.append(", '", 3);
328
* Ideally we would store the Transaction message in binary form, as it
329
* it stored on the master and tranferred to the slave. However, we are
330
* inserting using drizzle::Execute which doesn't really handle binary
331
* data. Until that is changed, we store as plain text.
334
google::protobuf::TextFormat::PrintToString(message, &message_text);
337
* Execution using drizzled::Execute requires some special escaping.
339
string::iterator it= message_text.begin();
340
for (; it != message_text.end(); ++it)
344
it= message_text.insert(it, '\\');
347
else if (*it == '\'')
349
it= message_text.insert(it, '\\');
351
it= message_text.insert(it, '\\');
354
else if (*it == '\\')
356
it= message_text.insert(it, '\\');
358
it= message_text.insert(it, '\\');
360
it= message_text.insert(it, '\\');
365
it= message_text.insert(it, '\\');
366
++it; /* advance back to the semicolon */
370
sql.append(message_text);
373
vector<string> statements;
374
statements.push_back(sql);
376
if (not executeSQL(statements))
382
uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
383
if (tmp_commit_id > _saved_max_commit_id)
384
_saved_max_commit_id= tmp_commit_id;
390
enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
392
vector<uint64_t> trx_id_list;
394
if (not queryForTrxIdList(max_commit_id, trx_id_list))
397
if (trx_id_list.size() == 0) /* nothing to get from the master */
403
* The SQL to pull everything we need from the master.
405
string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
406
" `originating_commit_id`, `message`, `message_len` "
407
" FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
409
for (size_t x= 0; x < trx_id_list.size(); x++)
413
sql.append(boost::lexical_cast<string>(trx_id_list[x]));
417
sql.append(" ORDER BY `commit_id` ASC");
419
drizzle_return_t ret;
420
drizzle_result_st result;
421
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
423
if (ret != DRIZZLE_RETURN_OK)
426
_last_error_message= "Replication slave: ";
427
_last_error_message.append(drizzle_error(&_drizzle));
428
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
429
drizzle_result_free(&result);
433
/* TODO: Investigate 1-row-at-a-time buffering */
435
ret= drizzle_result_buffer(&result);
437
if (ret != DRIZZLE_RETURN_OK)
440
_last_error_message= "Replication slave: ";
441
_last_error_message.append(drizzle_error(&_drizzle));
442
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
443
drizzle_result_free(&result);
449
while ((row= drizzle_row_next(&result)) != NULL)
451
if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
453
errmsg_printf(error::ERROR,
454
_("Replication slave: Unable to insert into queue."));
455
drizzle_result_free(&result);
460
drizzle_result_free(&result);
466
void QueueProducer::setIOState(const string &err_msg, bool status)
468
vector<string> statements;
474
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
478
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
481
sql.append(", `error_msg` = '", 17);
483
/* Escape embedded quotes and statement terminators */
485
for (it= msg.begin(); it != msg.end(); ++it)
489
it= msg.insert(it, '\'');
490
++it; /* advance back to the quote */
494
it= msg.insert(it, '\\');
495
++it; /* advance back to the semicolon */
500
sql.append("' WHERE `master_id` = ");
501
sql.append(boost::lexical_cast<string>(masterId()));
503
statements.push_back(sql);
504
executeSQL(statements);
507
} /* namespace slave */