1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 David Shrewsbury
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
#include <plugin/slave/queue_producer.h>
23
#include <drizzled/errmsg_print.h>
24
#include <drizzled/sql/result_set.h>
25
#include <drizzled/execute.h>
26
#include <drizzled/gettext.h>
27
#include <drizzled/message/transaction.pb.h>
28
#include <boost/lexical_cast.hpp>
29
#include <google/protobuf/text_format.h>
32
using namespace drizzled;
37
QueueProducer::~QueueProducer()
43
bool QueueProducer::init()
46
return openConnection();
49
bool QueueProducer::process()
51
if (_saved_max_commit_id == 0)
53
if (not queryForMaxCommitId(&_saved_max_commit_id))
55
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
59
return true; /* reconnect successful, try again */
63
_last_error_message= "Master offline";
64
return false; /* reconnect failed, shutdown the thread */
69
return false; /* unrecoverable error, shutdown the thread */
74
if (not queryForReplicationEvents(_saved_max_commit_id))
76
if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
80
return true; /* reconnect successful, try again */
84
_last_error_message= "Master offline";
85
return false; /* reconnect failed, shutdown the thread */
90
return false; /* unrecoverable error, shutdown the thread */
97
void QueueProducer::shutdown()
99
setIOState(_last_error_message, false);
104
bool QueueProducer::reconnect()
106
errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
108
_is_connected= false;
109
_last_return= DRIZZLE_RETURN_OK;
110
_last_error_message.clear();
111
boost::posix_time::seconds duration(_seconds_between_reconnects);
113
uint32_t attempts= 1;
115
while (not openConnection())
117
if (attempts++ == _max_reconnects)
119
boost::this_thread::sleep(duration);
122
return _is_connected;
125
bool QueueProducer::openConnection()
127
if (drizzle_create(&_drizzle) == NULL)
129
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
130
_last_error_message= "Replication slave: ";
131
_last_error_message.append(drizzle_error(&_drizzle));
132
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
136
if (drizzle_con_create(&_drizzle, &_connection) == NULL)
138
_last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
139
_last_error_message= "Replication slave: ";
140
_last_error_message.append(drizzle_error(&_drizzle));
141
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
145
drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
146
drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
148
drizzle_return_t ret= drizzle_con_connect(&_connection);
150
if (ret != DRIZZLE_RETURN_OK)
153
_last_error_message= "Replication slave: ";
154
_last_error_message.append(drizzle_error(&_drizzle));
155
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
164
bool QueueProducer::closeConnection()
166
drizzle_return_t ret;
167
drizzle_result_st result;
169
_is_connected= false;
171
if (drizzle_quit(&_connection, &result, &ret) == NULL)
177
drizzle_result_free(&result);
182
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
185
* This SQL will get the maximum commit_id value we have pulled over from
186
* the master. We query two tables because either the queue will be empty,
187
* in which case the last_applied_commit_id will be the value we want, or
188
* we have yet to drain the queue, we get the maximum value still in
191
string sql("SELECT MAX(x.cid) FROM"
192
" (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
193
" UNION ALL SELECT `last_applied_commit_id` AS cid"
194
" FROM `sys_replication`.`applier_state`) AS x");
196
sql::ResultSet result_set(1);
197
Execute execute(*(_session.get()), true);
198
execute.run(sql, result_set);
199
assert(result_set.getMetaData().getColumnCount() == 1);
201
/* Really should only be 1 returned row */
202
uint32_t found_rows= 0;
203
while (result_set.next())
205
string value= result_set.getString(0);
207
if ((value == "") || (found_rows == 1))
210
assert(result_set.isNull(0) == false);
211
*max_commit_id= boost::lexical_cast<uint64_t>(value);
217
_last_error_message= "Could not determine last committed transaction.";
224
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
225
vector<uint64_t> &list)
228
string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
229
" WHERE `commit_id` > ");
230
sql.append(boost::lexical_cast<string>(max_commit_id));
231
sql.append(" ORDER BY `commit_id` LIMIT 25");
233
drizzle_return_t ret;
234
drizzle_result_st result;
235
drizzle_result_create(&_connection, &result);
236
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
238
if (ret != DRIZZLE_RETURN_OK)
241
_last_error_message= "Replication slave: ";
242
_last_error_message.append(drizzle_error(&_drizzle));
243
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
247
ret= drizzle_result_buffer(&result);
249
if (ret != DRIZZLE_RETURN_OK)
252
_last_error_message= "Replication slave: ";
253
_last_error_message.append(drizzle_error(&_drizzle));
254
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
255
drizzle_result_free(&result);
261
while ((row= drizzle_row_next(&result)) != NULL)
265
list.push_back(boost::lexical_cast<uint32_t>(row[0]));
270
_last_error_message= "Replication slave: Unexpected NULL for trx id";
271
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
272
drizzle_result_free(&result);
277
drizzle_result_free(&result);
282
bool QueueProducer::queueInsert(const char *trx_id,
284
const char *commit_id,
286
const char *msg_length)
288
message::Transaction message;
290
message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
293
* The SQL to insert our results into the local queue.
295
string sql= "INSERT INTO `sys_replication`.`queue`"
296
" (`trx_id`, `seg_id`, `commit_order`, `msg`) VALUES (";
301
sql.append(commit_id);
302
sql.append(", '", 3);
305
* Ideally we would store the Transaction message in binary form, as it
306
* it stored on the master and tranferred to the slave. However, we are
307
* inserting using drizzle::Execute which doesn't really handle binary
308
* data. Until that is changed, we store as plain text.
311
google::protobuf::TextFormat::PrintToString(message, &message_text);
314
* Execution using drizzled::Execute requires some special escaping.
316
string::iterator it= message_text.begin();
317
for (; it != message_text.end(); ++it)
321
it= message_text.insert(it, '\\');
324
else if (*it == '\'')
326
it= message_text.insert(it, '\\');
328
it= message_text.insert(it, '\\');
331
else if (*it == '\\')
333
it= message_text.insert(it, '\\');
335
it= message_text.insert(it, '\\');
337
it= message_text.insert(it, '\\');
342
sql.append(message_text);
345
vector<string> statements;
346
statements.push_back(sql);
348
if (not executeSQL(statements))
354
uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
355
if (tmp_commit_id > _saved_max_commit_id)
356
_saved_max_commit_id= tmp_commit_id;
362
bool QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
364
vector<uint64_t> trx_id_list;
366
if (not queryForTrxIdList(max_commit_id, trx_id_list))
369
if (trx_id_list.size() == 0) /* nothing to get from the master */
375
* The SQL to pull everything we need from the master.
377
string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` "
378
" FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
380
for (size_t x= 0; x < trx_id_list.size(); x++)
384
sql.append(boost::lexical_cast<string>(trx_id_list[x]));
389
drizzle_return_t ret;
390
drizzle_result_st result;
391
drizzle_result_create(&_connection, &result);
392
drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
394
if (ret != DRIZZLE_RETURN_OK)
397
_last_error_message= "Replication slave: ";
398
_last_error_message.append(drizzle_error(&_drizzle));
399
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
403
/* TODO: Investigate 1-row-at-a-time buffering */
405
ret= drizzle_result_buffer(&result);
407
if (ret != DRIZZLE_RETURN_OK)
410
_last_error_message= "Replication slave: ";
411
_last_error_message.append(drizzle_error(&_drizzle));
412
errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
413
drizzle_result_free(&result);
419
while ((row= drizzle_row_next(&result)) != NULL)
421
if (not queueInsert(row[0], row[1], row[2], row[3], row[4]))
423
errmsg_printf(error::ERROR,
424
_("Replication slave: Unable to insert into queue."));
429
drizzle_result_free(&result);
435
void QueueProducer::setIOState(const string &err_msg, bool status)
437
vector<string> statements;
443
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
447
sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
450
sql.append(", `error_msg` = '", 17);
452
/* Escape embedded quotes and statement terminators */
454
for (it= msg.begin(); it != msg.end(); ++it)
458
it= msg.insert(it, '\'');
459
++it; /* advance back to the quote */
463
it= msg.insert(it, '\\');
464
++it; /* advance back to the semicolon */
471
statements.push_back(sql);
472
executeSQL(statements);
475
} /* namespace slave */