~drizzle-trunk/drizzle/development

2116.1.23 by David Shrewsbury
Added empty version of producer thread
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2011 David Shrewsbury
5
 *
6
 *  This program is free software; you can redistribute it and/or modify
7
 *  it under the terms of the GNU General Public License as published by
8
 *  the Free Software Foundation; either version 2 of the License, or
9
 *  (at your option) any later version.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
2116.1.38 by David Shrewsbury
Change include style
21
#include <plugin/slave/queue_producer.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
22
#include <drizzled/errmsg_print.h>
2116.1.32 by David Shrewsbury
incremental
23
#include <drizzled/sql/result_set.h>
24
#include <drizzled/execute.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
25
#include <drizzled/gettext.h>
2116.1.32 by David Shrewsbury
incremental
26
#include <drizzled/message/transaction.pb.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
27
#include <boost/lexical_cast.hpp>
2116.1.32 by David Shrewsbury
incremental
28
#include <google/protobuf/text_format.h>
2360.1.1 by Mark Atwood
restore multi master replication
29
#include <string>
30
#include <vector>
2116.1.23 by David Shrewsbury
Added empty version of producer thread
31
2116.1.29 by David Shrewsbury
Initial stages of master connection
32
using namespace std;
33
using namespace drizzled;
2116.1.28 by David Shrewsbury
More config file work
34
2116.1.23 by David Shrewsbury
Added empty version of producer thread
35
namespace slave
36
{
37
2116.1.29 by David Shrewsbury
Initial stages of master connection
38
QueueProducer::~QueueProducer()
39
{
2116.1.30 by David Shrewsbury
incremental
40
  if (_is_connected)
41
    closeConnection();
2116.1.29 by David Shrewsbury
Initial stages of master connection
42
}
43
2116.1.23 by David Shrewsbury
Added empty version of producer thread
44
bool QueueProducer::init()
45
{
2116.1.33 by David Shrewsbury
incremental
46
  setIOState("", true);
2116.2.1 by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908
47
  return reconnect(true);
2116.1.23 by David Shrewsbury
Added empty version of producer thread
48
}
49
50
bool QueueProducer::process()
51
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
52
  if (_saved_max_commit_id == 0)
53
  {
54
    if (not queryForMaxCommitId(&_saved_max_commit_id))
55
    {
2116.1.32 by David Shrewsbury
incremental
56
      if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
57
      {
2116.2.1 by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908
58
        if (reconnect(false))
2116.1.32 by David Shrewsbury
incremental
59
        {
60
          return true;    /* reconnect successful, try again */
61
        }
62
        else
63
        {
2116.1.33 by David Shrewsbury
incremental
64
          _last_error_message= "Master offline";
2116.1.32 by David Shrewsbury
incremental
65
          return false;   /* reconnect failed, shutdown the thread */
66
        }
67
      }
68
      else
69
      {
70
        return false;     /* unrecoverable error, shutdown the thread */
71
      }
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
72
    }
73
  }
74
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
75
  /* Keep getting events until caught up */
2221.4.2 by David Shrewsbury
Fix ambiguity of error_t in slave plugin.
76
  enum drizzled::error_t err;
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
77
  while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
78
  {}
79
80
  if (err == ER_YES)  /* We encountered an error */
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
81
  {
2116.1.32 by David Shrewsbury
incremental
82
    if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
83
    {
2116.2.1 by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908
84
      if (reconnect(false))
2116.1.32 by David Shrewsbury
incremental
85
      {
86
        return true;    /* reconnect successful, try again */
87
      }
88
      else
89
      {
2116.1.33 by David Shrewsbury
incremental
90
        _last_error_message= "Master offline";
2116.1.32 by David Shrewsbury
incremental
91
        return false;   /* reconnect failed, shutdown the thread */
92
      }
93
    }
94
    else
95
    {
96
      return false;     /* unrecoverable error, shutdown the thread */
97
    }
2116.1.30 by David Shrewsbury
incremental
98
  }
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
99
2116.1.23 by David Shrewsbury
Added empty version of producer thread
100
  return true;
101
}
102
103
void QueueProducer::shutdown()
104
{
2116.1.33 by David Shrewsbury
incremental
105
  setIOState(_last_error_message, false);
2116.1.30 by David Shrewsbury
incremental
106
  if (_is_connected)
107
    closeConnection();
2116.1.23 by David Shrewsbury
Added empty version of producer thread
108
}
109
2116.2.2 by Joseph Daly
fix bad variable names
110
bool QueueProducer::reconnect(bool initial_connection)
2116.1.32 by David Shrewsbury
incremental
111
{
2116.2.2 by Joseph Daly
fix bad variable names
112
  if (not initial_connection)
2116.2.1 by Joseph Daly
call reconnect() rather then openConnection() upon initialization, this fixes bug 720908
113
  {
114
    errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
115
  }
2116.1.32 by David Shrewsbury
incremental
116
117
  _is_connected= false;
118
  _last_return= DRIZZLE_RETURN_OK;
2116.1.33 by David Shrewsbury
incremental
119
  _last_error_message.clear();
2116.1.32 by David Shrewsbury
incremental
120
  boost::posix_time::seconds duration(_seconds_between_reconnects);
121
122
  uint32_t attempts= 1;
123
124
  while (not openConnection())
125
  {
126
    if (attempts++ == _max_reconnects)
127
      break;
128
    boost::this_thread::sleep(duration);
129
  }
130
131
  return _is_connected;
132
}
133
2116.1.29 by David Shrewsbury
Initial stages of master connection
134
bool QueueProducer::openConnection()
135
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
136
  if (drizzle_create(&_drizzle) == NULL)
2116.1.29 by David Shrewsbury
Initial stages of master connection
137
  {
2116.1.32 by David Shrewsbury
incremental
138
    _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
2116.1.33 by David Shrewsbury
incremental
139
    _last_error_message= "Replication slave: ";
140
    _last_error_message.append(drizzle_error(&_drizzle));
141
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.29 by David Shrewsbury
Initial stages of master connection
142
    return false;
143
  }
144
  
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
145
  if (drizzle_con_create(&_drizzle, &_connection) == NULL)
2116.1.29 by David Shrewsbury
Initial stages of master connection
146
  {
2116.1.32 by David Shrewsbury
incremental
147
    _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
2116.1.33 by David Shrewsbury
incremental
148
    _last_error_message= "Replication slave: ";
149
    _last_error_message.append(drizzle_error(&_drizzle));
150
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.29 by David Shrewsbury
Initial stages of master connection
151
    return false;
152
  }
153
  
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
154
  drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
155
  drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
2116.1.29 by David Shrewsbury
Initial stages of master connection
156
2116.1.32 by David Shrewsbury
incremental
157
  drizzle_return_t ret= drizzle_con_connect(&_connection);
2116.1.29 by David Shrewsbury
Initial stages of master connection
158
159
  if (ret != DRIZZLE_RETURN_OK)
160
  {
2116.1.32 by David Shrewsbury
incremental
161
    _last_return= ret;
2116.1.33 by David Shrewsbury
incremental
162
    _last_error_message= "Replication slave: ";
163
    _last_error_message.append(drizzle_error(&_drizzle));
164
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.29 by David Shrewsbury
Initial stages of master connection
165
    return false;
166
  }
167
  
2116.1.30 by David Shrewsbury
incremental
168
  _is_connected= true;
169
2116.1.29 by David Shrewsbury
Initial stages of master connection
170
  return true;
171
}
172
173
bool QueueProducer::closeConnection()
174
{
175
  drizzle_return_t ret;
176
  drizzle_result_st result;
177
2116.1.30 by David Shrewsbury
incremental
178
  _is_connected= false;
179
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
180
  if (drizzle_quit(&_connection, &result, &ret) == NULL)
181
  {
2116.1.32 by David Shrewsbury
incremental
182
    _last_return= ret;
2217.1.2 by Andrew Hutchings
Fix leaks in slave result sets
183
    drizzle_result_free(&result);
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
184
    return false;
185
  }
186
187
  drizzle_result_free(&result);
188
189
  return true;
190
}
191
2116.1.50 by David Shrewsbury
Fix commit_id to correct uint64_t type
192
bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
193
{
194
  /*
195
   * This SQL will get the maximum commit_id value we have pulled over from
196
   * the master. We query two tables because either the queue will be empty,
197
   * in which case the last_applied_commit_id will be the value we want, or
198
   * we have yet to drain the queue,  we get the maximum value still in
199
   * the queue.
200
   */
201
  string sql("SELECT MAX(x.cid) FROM"
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
202
             " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
2360.1.1 by Mark Atwood
restore multi master replication
203
             "  WHERE `master_id` = "
204
             + boost::lexical_cast<string>(masterId())
205
             + "  UNION ALL SELECT `last_applied_commit_id` AS cid"
206
             + "  FROM `sys_replication`.`applier_state` WHERE `master_id` = "
207
             + boost::lexical_cast<string>(masterId())
208
             + ") AS x");
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
209
2116.1.32 by David Shrewsbury
incremental
210
  sql::ResultSet result_set(1);
211
  Execute execute(*(_session.get()), true);
212
  execute.run(sql, result_set);
213
  assert(result_set.getMetaData().getColumnCount() == 1);
214
215
  /* Really should only be 1 returned row */
216
  uint32_t found_rows= 0;
217
  while (result_set.next())
218
  {
219
    string value= result_set.getString(0);
220
221
    if ((value == "") || (found_rows == 1))
222
      break;
223
224
    assert(result_set.isNull(0) == false);
2116.1.50 by David Shrewsbury
Fix commit_id to correct uint64_t type
225
    *max_commit_id= boost::lexical_cast<uint64_t>(value);
2116.1.32 by David Shrewsbury
incremental
226
    found_rows++;
227
  }
228
229
  if (found_rows == 0)
2116.1.33 by David Shrewsbury
incremental
230
  {
231
    _last_error_message= "Could not determine last committed transaction.";
2116.1.32 by David Shrewsbury
incremental
232
    return false;
2116.1.33 by David Shrewsbury
incremental
233
  }
2116.1.32 by David Shrewsbury
incremental
234
235
  return true;
236
}
237
2116.1.50 by David Shrewsbury
Fix commit_id to correct uint64_t type
238
bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
2116.1.32 by David Shrewsbury
incremental
239
                                      vector<uint64_t> &list)
240
{
241
  (void)list;
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
242
  string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
2116.1.50 by David Shrewsbury
Fix commit_id to correct uint64_t type
243
             " WHERE `commit_id` > ");
2116.1.32 by David Shrewsbury
incremental
244
  sql.append(boost::lexical_cast<string>(max_commit_id));
2116.1.51 by David Shrewsbury
Fix for bug 720886: cached max commit_id value was not being set properly causing us to pull the same message from the master more than once
245
  sql.append(" ORDER BY `commit_id` LIMIT 25");
2116.1.32 by David Shrewsbury
incremental
246
247
  drizzle_return_t ret;
248
  drizzle_result_st result;
249
  drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
250
  
251
  if (ret != DRIZZLE_RETURN_OK)
252
  {
253
    _last_return= ret;
2116.1.33 by David Shrewsbury
incremental
254
    _last_error_message= "Replication slave: ";
255
    _last_error_message.append(drizzle_error(&_drizzle));
256
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2217.1.2 by Andrew Hutchings
Fix leaks in slave result sets
257
    drizzle_result_free(&result);
2116.1.32 by David Shrewsbury
incremental
258
    return false;
259
  }
260
261
  ret= drizzle_result_buffer(&result);
262
263
  if (ret != DRIZZLE_RETURN_OK)
264
  {
2116.1.33 by David Shrewsbury
incremental
265
    _last_return= ret;
266
    _last_error_message= "Replication slave: ";
267
    _last_error_message.append(drizzle_error(&_drizzle));
268
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.32 by David Shrewsbury
incremental
269
    drizzle_result_free(&result);
270
    return false;
271
  }
272
273
  drizzle_row_t row;
274
275
  while ((row= drizzle_row_next(&result)) != NULL)
276
  {
277
    if (row[0])
278
    {
279
      list.push_back(boost::lexical_cast<uint32_t>(row[0]));
280
    }
281
    else
282
    {
2116.1.33 by David Shrewsbury
incremental
283
      _last_return= ret;
284
      _last_error_message= "Replication slave: Unexpected NULL for trx id";
285
      errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.32 by David Shrewsbury
incremental
286
      drizzle_result_free(&result);
287
      return false;
288
    }
289
  }
290
291
  drizzle_result_free(&result);
292
  return true;
293
}
294
295
296
bool QueueProducer::queueInsert(const char *trx_id,
297
                                const char *seg_id,
298
                                const char *commit_id,
2290.1.3 by Joseph Daly
slave plugin work
299
                                const char *originating_server_uuid,
300
                                const char *originating_commit_id,
2116.1.32 by David Shrewsbury
incremental
301
                                const char *msg,
302
                                const char *msg_length)
303
{
304
  message::Transaction message;
305
306
  message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
307
308
  /*
309
   * The SQL to insert our results into the local queue.
310
   */
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
311
  string sql= "INSERT INTO `sys_replication`.`queue`"
2360.1.1 by Mark Atwood
restore multi master replication
312
              " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
2290.1.5 by Joseph Daly
slave fixes
313
              "  `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
2360.1.1 by Mark Atwood
restore multi master replication
314
  sql.append(boost::lexical_cast<string>(masterId()));
315
  sql.append(", ", 2);
2116.1.32 by David Shrewsbury
incremental
316
  sql.append(trx_id);
317
  sql.append(", ", 2);
318
  sql.append(seg_id);
319
  sql.append(", ", 2);
320
  sql.append(commit_id);
2290.1.5 by Joseph Daly
slave fixes
321
  sql.append(", '", 3);
2290.1.3 by Joseph Daly
slave plugin work
322
  sql.append(originating_server_uuid);
2290.1.5 by Joseph Daly
slave fixes
323
  sql.append("' , ", 4);
2290.1.3 by Joseph Daly
slave plugin work
324
  sql.append(originating_commit_id);
2116.1.32 by David Shrewsbury
incremental
325
  sql.append(", '", 3);
326
327
  /*
328
   * Ideally we would store the Transaction message in binary form, as it
329
   * it stored on the master and tranferred to the slave. However, we are
330
   * inserting using drizzle::Execute which doesn't really handle binary
331
   * data. Until that is changed, we store as plain text.
332
   */
333
  string message_text;
334
  google::protobuf::TextFormat::PrintToString(message, &message_text);  
335
2116.1.34 by David Shrewsbury
Working replication.
336
  /*
2116.1.46 by David Shrewsbury
Fix for bug 720819
337
   * Execution using drizzled::Execute requires some special escaping.
2116.1.34 by David Shrewsbury
Working replication.
338
   */
2116.1.32 by David Shrewsbury
incremental
339
  string::iterator it= message_text.begin();
340
  for (; it != message_text.end(); ++it)
341
  {
2116.1.46 by David Shrewsbury
Fix for bug 720819
342
    if (*it == '\"')
343
    {
344
      it= message_text.insert(it, '\\');
345
      ++it;
346
    }
347
    else if (*it == '\'')
348
    {
349
      it= message_text.insert(it, '\\');
350
      ++it;
351
      it= message_text.insert(it, '\\');
352
      ++it;
353
    }
354
    else if (*it == '\\')
355
    {
356
      it= message_text.insert(it, '\\');
357
      ++it;
358
      it= message_text.insert(it, '\\');
359
      ++it;
2116.1.34 by David Shrewsbury
Working replication.
360
      it= message_text.insert(it, '\\');
361
      ++it;
362
    }
2218.1.2 by David Shrewsbury
Escape semicolons properly in slave plugin.
363
    else if (*it == ';')
364
    {
365
      it= message_text.insert(it, '\\');
366
      ++it;  /* advance back to the semicolon */
367
    }
2116.1.32 by David Shrewsbury
incremental
368
  }
369
370
  sql.append(message_text);
371
  sql.append("')", 2);
372
2116.1.33 by David Shrewsbury
incremental
373
  vector<string> statements;
374
  statements.push_back(sql);
375
376
  if (not executeSQL(statements))
377
  {
378
    markInErrorState();
379
    return false;
380
  }
381
2116.1.51 by David Shrewsbury
Fix for bug 720886: cached max commit_id value was not being set properly causing us to pull the same message from the master more than once
382
  uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
383
  if (tmp_commit_id > _saved_max_commit_id)
384
    _saved_max_commit_id= tmp_commit_id;
385
2116.1.32 by David Shrewsbury
incremental
386
  return true;
387
}
388
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
389
2221.4.2 by David Shrewsbury
Fix ambiguity of error_t in slave plugin.
390
enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
391
{
2116.1.32 by David Shrewsbury
incremental
392
  vector<uint64_t> trx_id_list;
393
394
  if (not queryForTrxIdList(max_commit_id, trx_id_list))
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
395
    return ER_YES;
2116.1.32 by David Shrewsbury
incremental
396
2116.1.34 by David Shrewsbury
Working replication.
397
  if (trx_id_list.size() == 0)    /* nothing to get from the master */
2116.1.32 by David Shrewsbury
incremental
398
  {
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
399
    return ER_NO;
2116.1.32 by David Shrewsbury
incremental
400
  }
401
402
  /*
403
   * The SQL to pull everything we need from the master.
404
   */
2290.1.5 by Joseph Daly
slave fixes
405
  string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
2290.1.3 by Joseph Daly
slave plugin work
406
              " `originating_commit_id`, `message`, `message_len` "
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
407
              " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
2116.1.32 by David Shrewsbury
incremental
408
409
  for (size_t x= 0; x < trx_id_list.size(); x++)
410
  {
411
    if (x > 0)
412
      sql.append(", ", 2);
413
    sql.append(boost::lexical_cast<string>(trx_id_list[x]));
414
  }
415
416
  sql.append(")", 1);
2270.2.1 by Joseph Daly
bug 755201 must order the select from the sys_replication log this order is later used for the insertion order
417
  sql.append(" ORDER BY `commit_id` ASC");
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
418
419
  drizzle_return_t ret;
420
  drizzle_result_st result;
2116.1.32 by David Shrewsbury
incremental
421
  drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
422
  
423
  if (ret != DRIZZLE_RETURN_OK)
424
  {
2116.1.32 by David Shrewsbury
incremental
425
    _last_return= ret;
2116.1.33 by David Shrewsbury
incremental
426
    _last_error_message= "Replication slave: ";
427
    _last_error_message.append(drizzle_error(&_drizzle));
428
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2217.1.2 by Andrew Hutchings
Fix leaks in slave result sets
429
    drizzle_result_free(&result);
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
430
    return ER_YES;
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
431
  }
432
2116.1.32 by David Shrewsbury
incremental
433
  /* TODO: Investigate 1-row-at-a-time buffering */
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
434
435
  ret= drizzle_result_buffer(&result);
436
437
  if (ret != DRIZZLE_RETURN_OK)
438
  {
2116.1.33 by David Shrewsbury
incremental
439
    _last_return= ret;
440
    _last_error_message= "Replication slave: ";
441
    _last_error_message.append(drizzle_error(&_drizzle));
442
    errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
443
    drizzle_result_free(&result);
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
444
    return ER_YES;
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
445
  }
446
447
  drizzle_row_t row;
448
2116.1.32 by David Shrewsbury
incremental
449
  while ((row= drizzle_row_next(&result)) != NULL)
450
  {
2290.1.3 by Joseph Daly
slave plugin work
451
    if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
2116.1.32 by David Shrewsbury
incremental
452
    {
453
      errmsg_printf(error::ERROR,
454
                    _("Replication slave: Unable to insert into queue."));
2217.1.2 by Andrew Hutchings
Fix leaks in slave result sets
455
      drizzle_result_free(&result);
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
456
      return ER_YES;
2116.1.32 by David Shrewsbury
incremental
457
    }
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
458
  }
459
460
  drizzle_result_free(&result);
2116.1.32 by David Shrewsbury
incremental
461
2221.4.1 by David Shrewsbury
Change producer thread to continously poll the master for replication events until no more are available, then sleep.
462
  return EE_OK;
2116.1.29 by David Shrewsbury
Initial stages of master connection
463
}
464
2116.1.33 by David Shrewsbury
incremental
465
466
void QueueProducer::setIOState(const string &err_msg, bool status)
467
{
468
  vector<string> statements;
469
  string sql;
470
  string msg(err_msg);
471
472
  if (not status)
473
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
474
    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
2116.1.33 by David Shrewsbury
incremental
475
  }
476
  else
477
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
478
    sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
2116.1.33 by David Shrewsbury
incremental
479
  }
480
  
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
481
  sql.append(", `error_msg` = '", 17);
2116.1.33 by David Shrewsbury
incremental
482
483
  /* Escape embedded quotes and statement terminators */
484
  string::iterator it;
485
  for (it= msg.begin(); it != msg.end(); ++it)
486
  {
487
    if (*it == '\'')
488
    {
489
      it= msg.insert(it, '\'');
490
      ++it;  /* advance back to the quote */
491
    }
492
    else if (*it == ';')
493
    {
494
      it= msg.insert(it, '\\');
495
      ++it;  /* advance back to the semicolon */
496
    }
497
  }
498
  
499
  sql.append(msg);
2360.1.1 by Mark Atwood
restore multi master replication
500
  sql.append("' WHERE `master_id` = ");
501
  sql.append(boost::lexical_cast<string>(masterId()));
2116.1.33 by David Shrewsbury
incremental
502
503
  statements.push_back(sql);
504
  executeSQL(statements);
505
}
506
2116.1.23 by David Shrewsbury
Added empty version of producer thread
507
} /* namespace slave */