~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_producer.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2011 David Shrewsbury
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; either version 2 of the License, or
9
 
 *  (at your option) any later version.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
#include <config.h>
22
 
#include <plugin/slave/queue_producer.h>
23
 
#include <drizzled/errmsg_print.h>
24
 
#include <drizzled/sql/result_set.h>
25
 
#include <drizzled/execute.h>
26
 
#include <drizzled/gettext.h>
27
 
#include <drizzled/message/transaction.pb.h>
28
 
#include <boost/lexical_cast.hpp>
29
 
#include <google/protobuf/text_format.h>
30
 
 
31
 
using namespace std;
32
 
using namespace drizzled;
33
 
 
34
 
namespace slave
35
 
{
36
 
 
37
 
QueueProducer::~QueueProducer()
38
 
{
39
 
  if (_is_connected)
40
 
    closeConnection();
41
 
}
42
 
 
43
 
bool QueueProducer::init()
44
 
{
45
 
  setIOState("", true);
46
 
  return reconnect(true);
47
 
}
48
 
 
49
 
bool QueueProducer::process()
50
 
{
51
 
  if (_saved_max_commit_id == 0)
52
 
  {
53
 
    if (not queryForMaxCommitId(&_saved_max_commit_id))
54
 
    {
55
 
      if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
56
 
      {
57
 
        if (reconnect(false))
58
 
        {
59
 
          return true;    /* reconnect successful, try again */
60
 
        }
61
 
        else
62
 
        {
63
 
          _last_error_message= "Master offline";
64
 
          return false;   /* reconnect failed, shutdown the thread */
65
 
        }
66
 
      }
67
 
      else
68
 
      {
69
 
        return false;     /* unrecoverable error, shutdown the thread */
70
 
      }
71
 
    }
72
 
  }
73
 
 
74
 
  /* Keep getting events until caught up */
75
 
  enum drizzled::error_t err;
76
 
  while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
77
 
  {}
78
 
 
79
 
  if (err == ER_YES)  /* We encountered an error */
80
 
  {
81
 
    if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
82
 
    {
83
 
      if (reconnect(false))
84
 
      {
85
 
        return true;    /* reconnect successful, try again */
86
 
      }
87
 
      else
88
 
      {
89
 
        _last_error_message= "Master offline";
90
 
        return false;   /* reconnect failed, shutdown the thread */
91
 
      }
92
 
    }
93
 
    else
94
 
    {
95
 
      return false;     /* unrecoverable error, shutdown the thread */
96
 
    }
97
 
  }
98
 
 
99
 
  return true;
100
 
}
101
 
 
102
 
void QueueProducer::shutdown()
103
 
{
104
 
  setIOState(_last_error_message, false);
105
 
  if (_is_connected)
106
 
    closeConnection();
107
 
}
108
 
 
109
 
bool QueueProducer::reconnect(bool initial_connection)
110
 
{
111
 
  if (not initial_connection)
112
 
  {
113
 
    errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
114
 
  }
115
 
 
116
 
  _is_connected= false;
117
 
  _last_return= DRIZZLE_RETURN_OK;
118
 
  _last_error_message.clear();
119
 
  boost::posix_time::seconds duration(_seconds_between_reconnects);
120
 
 
121
 
  uint32_t attempts= 1;
122
 
 
123
 
  while (not openConnection())
124
 
  {
125
 
    if (attempts++ == _max_reconnects)
126
 
      break;
127
 
    boost::this_thread::sleep(duration);
128
 
  }
129
 
 
130
 
  return _is_connected;
131
 
}
132
 
 
133
 
bool QueueProducer::openConnection()
134
 
{
135
 
  if (drizzle_create(&_drizzle) == NULL)
136
 
  {
137
 
    _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
138
 
    _last_error_message= "Replication slave: ";
139
 
    _last_error_message.append(drizzle_error(&_drizzle));
140
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
141
 
    return false;
142
 
  }
143
 
  
144
 
  if (drizzle_con_create(&_drizzle, &_connection) == NULL)
145
 
  {
146
 
    _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
147
 
    _last_error_message= "Replication slave: ";
148
 
    _last_error_message.append(drizzle_error(&_drizzle));
149
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
150
 
    return false;
151
 
  }
152
 
  
153
 
  drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
154
 
  drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
155
 
 
156
 
  drizzle_return_t ret= drizzle_con_connect(&_connection);
157
 
 
158
 
  if (ret != DRIZZLE_RETURN_OK)
159
 
  {
160
 
    _last_return= ret;
161
 
    _last_error_message= "Replication slave: ";
162
 
    _last_error_message.append(drizzle_error(&_drizzle));
163
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
164
 
    return false;
165
 
  }
166
 
  
167
 
  _is_connected= true;
168
 
 
169
 
  return true;
170
 
}
171
 
 
172
 
bool QueueProducer::closeConnection()
173
 
{
174
 
  drizzle_return_t ret;
175
 
  drizzle_result_st result;
176
 
 
177
 
  _is_connected= false;
178
 
 
179
 
  if (drizzle_quit(&_connection, &result, &ret) == NULL)
180
 
  {
181
 
    _last_return= ret;
182
 
    drizzle_result_free(&result);
183
 
    return false;
184
 
  }
185
 
 
186
 
  drizzle_result_free(&result);
187
 
 
188
 
  return true;
189
 
}
190
 
 
191
 
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
192
 
{
193
 
  /*
194
 
   * This SQL will get the maximum commit_id value we have pulled over from
195
 
   * the master. We query two tables because either the queue will be empty,
196
 
   * in which case the last_applied_commit_id will be the value we want, or
197
 
   * we have yet to drain the queue,  we get the maximum value still in
198
 
   * the queue.
199
 
   */
200
 
  string sql("SELECT MAX(x.cid) FROM"
201
 
             " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
202
 
             "  UNION ALL SELECT `last_applied_commit_id` AS cid"
203
 
             "  FROM `sys_replication`.`applier_state`) AS x");
204
 
 
205
 
  sql::ResultSet result_set(1);
206
 
  Execute execute(*(_session.get()), true);
207
 
  execute.run(sql, result_set);
208
 
  assert(result_set.getMetaData().getColumnCount() == 1);
209
 
 
210
 
  /* Really should only be 1 returned row */
211
 
  uint32_t found_rows= 0;
212
 
  while (result_set.next())
213
 
  {
214
 
    string value= result_set.getString(0);
215
 
 
216
 
    if ((value == "") || (found_rows == 1))
217
 
      break;
218
 
 
219
 
    assert(result_set.isNull(0) == false);
220
 
    *max_commit_id= boost::lexical_cast<uint64_t>(value);
221
 
    found_rows++;
222
 
  }
223
 
 
224
 
  if (found_rows == 0)
225
 
  {
226
 
    _last_error_message= "Could not determine last committed transaction.";
227
 
    return false;
228
 
  }
229
 
 
230
 
  return true;
231
 
}
232
 
 
233
 
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
234
 
                                      vector<uint64_t> &list)
235
 
{
236
 
  (void)list;
237
 
  string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
238
 
             " WHERE `commit_id` > ");
239
 
  sql.append(boost::lexical_cast<string>(max_commit_id));
240
 
  sql.append(" ORDER BY `commit_id` LIMIT 25");
241
 
 
242
 
  drizzle_return_t ret;
243
 
  drizzle_result_st result;
244
 
  drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
245
 
  
246
 
  if (ret != DRIZZLE_RETURN_OK)
247
 
  {
248
 
    _last_return= ret;
249
 
    _last_error_message= "Replication slave: ";
250
 
    _last_error_message.append(drizzle_error(&_drizzle));
251
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
252
 
    drizzle_result_free(&result);
253
 
    return false;
254
 
  }
255
 
 
256
 
  ret= drizzle_result_buffer(&result);
257
 
 
258
 
  if (ret != DRIZZLE_RETURN_OK)
259
 
  {
260
 
    _last_return= ret;
261
 
    _last_error_message= "Replication slave: ";
262
 
    _last_error_message.append(drizzle_error(&_drizzle));
263
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
264
 
    drizzle_result_free(&result);
265
 
    return false;
266
 
  }
267
 
 
268
 
  drizzle_row_t row;
269
 
 
270
 
  while ((row= drizzle_row_next(&result)) != NULL)
271
 
  {
272
 
    if (row[0])
273
 
    {
274
 
      list.push_back(boost::lexical_cast<uint32_t>(row[0]));
275
 
    }
276
 
    else
277
 
    {
278
 
      _last_return= ret;
279
 
      _last_error_message= "Replication slave: Unexpected NULL for trx id";
280
 
      errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
281
 
      drizzle_result_free(&result);
282
 
      return false;
283
 
    }
284
 
  }
285
 
 
286
 
  drizzle_result_free(&result);
287
 
  return true;
288
 
}
289
 
 
290
 
 
291
 
bool QueueProducer::queueInsert(const char *trx_id,
292
 
                                const char *seg_id,
293
 
                                const char *commit_id,
294
 
                                const char *originating_server_uuid,
295
 
                                const char *originating_commit_id,
296
 
                                const char *msg,
297
 
                                const char *msg_length)
298
 
{
299
 
  message::Transaction message;
300
 
 
301
 
  message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
302
 
 
303
 
  /*
304
 
   * The SQL to insert our results into the local queue.
305
 
   */
306
 
  string sql= "INSERT INTO `sys_replication`.`queue`"
307
 
              " (`trx_id`, `seg_id`, `commit_order`,"
308
 
              "  `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
309
 
  sql.append(trx_id);
310
 
  sql.append(", ", 2);
311
 
  sql.append(seg_id);
312
 
  sql.append(", ", 2);
313
 
  sql.append(commit_id);
314
 
  sql.append(", '", 3);
315
 
  sql.append(originating_server_uuid);
316
 
  sql.append("' , ", 4);
317
 
  sql.append(originating_commit_id);
318
 
  sql.append(", '", 3);
319
 
 
320
 
  /*
321
 
   * Ideally we would store the Transaction message in binary form, as it
322
 
   * it stored on the master and tranferred to the slave. However, we are
323
 
   * inserting using drizzle::Execute which doesn't really handle binary
324
 
   * data. Until that is changed, we store as plain text.
325
 
   */
326
 
  string message_text;
327
 
  google::protobuf::TextFormat::PrintToString(message, &message_text);  
328
 
 
329
 
  /*
330
 
   * Execution using drizzled::Execute requires some special escaping.
331
 
   */
332
 
  string::iterator it= message_text.begin();
333
 
  for (; it != message_text.end(); ++it)
334
 
  {
335
 
    if (*it == '\"')
336
 
    {
337
 
      it= message_text.insert(it, '\\');
338
 
      ++it;
339
 
    }
340
 
    else if (*it == '\'')
341
 
    {
342
 
      it= message_text.insert(it, '\\');
343
 
      ++it;
344
 
      it= message_text.insert(it, '\\');
345
 
      ++it;
346
 
    }
347
 
    else if (*it == '\\')
348
 
    {
349
 
      it= message_text.insert(it, '\\');
350
 
      ++it;
351
 
      it= message_text.insert(it, '\\');
352
 
      ++it;
353
 
      it= message_text.insert(it, '\\');
354
 
      ++it;
355
 
    }
356
 
    else if (*it == ';')
357
 
    {
358
 
      it= message_text.insert(it, '\\');
359
 
      ++it;  /* advance back to the semicolon */
360
 
    }
361
 
  }
362
 
 
363
 
  sql.append(message_text);
364
 
  sql.append("')", 2);
365
 
 
366
 
  vector<string> statements;
367
 
  statements.push_back(sql);
368
 
 
369
 
  if (not executeSQL(statements))
370
 
  {
371
 
    markInErrorState();
372
 
    return false;
373
 
  }
374
 
 
375
 
  uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
376
 
  if (tmp_commit_id > _saved_max_commit_id)
377
 
    _saved_max_commit_id= tmp_commit_id;
378
 
 
379
 
  return true;
380
 
}
381
 
 
382
 
 
383
 
enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
384
 
{
385
 
  vector<uint64_t> trx_id_list;
386
 
 
387
 
  if (not queryForTrxIdList(max_commit_id, trx_id_list))
388
 
    return ER_YES;
389
 
 
390
 
  if (trx_id_list.size() == 0)    /* nothing to get from the master */
391
 
  {
392
 
    return ER_NO;
393
 
  }
394
 
 
395
 
  /*
396
 
   * The SQL to pull everything we need from the master.
397
 
   */
398
 
  string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
399
 
              " `originating_commit_id`, `message`, `message_len` "
400
 
              " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
401
 
 
402
 
  for (size_t x= 0; x < trx_id_list.size(); x++)
403
 
  {
404
 
    if (x > 0)
405
 
      sql.append(", ", 2);
406
 
    sql.append(boost::lexical_cast<string>(trx_id_list[x]));
407
 
  }
408
 
 
409
 
  sql.append(")", 1);
410
 
  sql.append(" ORDER BY `commit_id` ASC");
411
 
 
412
 
  drizzle_return_t ret;
413
 
  drizzle_result_st result;
414
 
  drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
415
 
  
416
 
  if (ret != DRIZZLE_RETURN_OK)
417
 
  {
418
 
    _last_return= ret;
419
 
    _last_error_message= "Replication slave: ";
420
 
    _last_error_message.append(drizzle_error(&_drizzle));
421
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
422
 
    drizzle_result_free(&result);
423
 
    return ER_YES;
424
 
  }
425
 
 
426
 
  /* TODO: Investigate 1-row-at-a-time buffering */
427
 
 
428
 
  ret= drizzle_result_buffer(&result);
429
 
 
430
 
  if (ret != DRIZZLE_RETURN_OK)
431
 
  {
432
 
    _last_return= ret;
433
 
    _last_error_message= "Replication slave: ";
434
 
    _last_error_message.append(drizzle_error(&_drizzle));
435
 
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
436
 
    drizzle_result_free(&result);
437
 
    return ER_YES;
438
 
  }
439
 
 
440
 
  drizzle_row_t row;
441
 
 
442
 
  while ((row= drizzle_row_next(&result)) != NULL)
443
 
  {
444
 
    if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
445
 
    {
446
 
      errmsg_printf(error::ERROR,
447
 
                    _("Replication slave: Unable to insert into queue."));
448
 
      drizzle_result_free(&result);
449
 
      return ER_YES;
450
 
    }
451
 
  }
452
 
 
453
 
  drizzle_result_free(&result);
454
 
 
455
 
  return EE_OK;
456
 
}
457
 
 
458
 
 
459
 
void QueueProducer::setIOState(const string &err_msg, bool status)
460
 
{
461
 
  vector<string> statements;
462
 
  string sql;
463
 
  string msg(err_msg);
464
 
 
465
 
  if (not status)
466
 
  {
467
 
    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
468
 
  }
469
 
  else
470
 
  {
471
 
    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
472
 
  }
473
 
  
474
 
  sql.append(", `error_msg` = '", 17);
475
 
 
476
 
  /* Escape embedded quotes and statement terminators */
477
 
  string::iterator it;
478
 
  for (it= msg.begin(); it != msg.end(); ++it)
479
 
  {
480
 
    if (*it == '\'')
481
 
    {
482
 
      it= msg.insert(it, '\'');
483
 
      ++it;  /* advance back to the quote */
484
 
    }
485
 
    else if (*it == ';')
486
 
    {
487
 
      it= msg.insert(it, '\\');
488
 
      ++it;  /* advance back to the semicolon */
489
 
    }
490
 
  }
491
 
  
492
 
  sql.append(msg);
493
 
  sql.append("'", 1);
494
 
 
495
 
  statements.push_back(sql);
496
 
  executeSQL(statements);
497
 
}
498
 
 
499
 
} /* namespace slave */