~drizzle-trunk/drizzle/development

2116.1.20 by David Shrewsbury
Refactor design pattern
1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2011 David Shrewsbury
5
 *
6
 *  This program is free software; you can redistribute it and/or modify
7
 *  it under the terms of the GNU General Public License as published by
8
 *  the Free Software Foundation; either version 2 of the License, or
9
 *  (at your option) any later version.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
2116.1.38 by David Shrewsbury
Change include style
21
#include <config.h>
22
#include <plugin/slave/queue_consumer.h>
2360.1.1 by Mark Atwood
restore multi master replication
23
#include <drizzled/errmsg_print.h>
24
#include <drizzled/execute.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
25
#include <drizzled/message/transaction.pb.h>
26
#include <drizzled/message/statement_transform.h>
27
#include <drizzled/sql/result_set.h>
2116.1.20 by David Shrewsbury
Refactor design pattern
28
#include <string>
29
#include <vector>
30
#include <boost/thread.hpp>
31
#include <boost/lexical_cast.hpp>
32
#include <google/protobuf/text_format.h>
33
34
using namespace std;
35
using namespace drizzled;
36
37
namespace slave
38
{
39
40
bool QueueConsumer::init()
41
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
42
  setApplierState("", true);
2116.1.20 by David Shrewsbury
Refactor design pattern
43
  return true;
44
}
45
46
47
void QueueConsumer::shutdown()
48
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
49
  setApplierState(getErrorMessage(), false);
2116.1.20 by David Shrewsbury
Refactor design pattern
50
}
51
52
53
bool QueueConsumer::process()
54
{
2360.1.1 by Mark Atwood
restore multi master replication
55
  for (size_t index= 0; index < _master_ids.size(); index++)
56
  {
57
    /* We go ahead and get the string version of the master ID
58
     * so we don't have to keep converting it from int to string.
59
     */
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
61
62
    if (not processSingleMaster(master_id))
63
      return false;
64
  }
65
66
  return true;
67
}
68
69
bool QueueConsumer::processSingleMaster(const string &master_id)
70
{
2116.1.20 by David Shrewsbury
Refactor design pattern
71
  TrxIdList completedTransactionIds;
72
2360.1.1 by Mark Atwood
restore multi master replication
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
2116.1.20 by David Shrewsbury
Refactor design pattern
74
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
76
  {
77
    string commit_id;
2290.1.3 by Joseph Daly
slave plugin work
78
    string originating_server_uuid;
79
    uint64_t originating_commit_id= 0;
2116.1.20 by David Shrewsbury
Refactor design pattern
80
    uint64_t trx_id= completedTransactionIds[x];
81
82
    vector<string> aggregate_sql;  /* final SQL to execute */
83
    vector<string> segmented_sql;  /* carryover from segmented statements */
84
85
    message::Transaction transaction;
86
    uint32_t segment_id= 1;
87
2360.1.1 by Mark Atwood
restore multi master replication
88
    while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
2290.1.3 by Joseph Daly
slave plugin work
89
                      originating_commit_id, segment_id++))
2116.1.20 by David Shrewsbury
Refactor design pattern
90
    {
91
      convertToSQL(transaction, aggregate_sql, segmented_sql);
2116.1.33 by David Shrewsbury
incremental
92
      transaction.Clear();
2116.1.20 by David Shrewsbury
Refactor design pattern
93
    }
94
95
    /*
96
     * The last message in a transaction should always have a commit_id
97
     * value larger than 0, though other messages of the same transaction
98
     * will have commit_id = 0.
99
     */
100
    assert((not commit_id.empty()) && (commit_id != "0"));
101
    assert(segmented_sql.empty());
102
2116.1.46 by David Shrewsbury
Fix for bug 720819
103
    if (not aggregate_sql.empty())
104
    {
105
      /*
106
       * Execution using drizzled::Execute requires some special escaping.
107
       */
108
      vector<string>::iterator agg_iter;
109
      for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
110
      {
111
        string &sql= *agg_iter;
112
        string::iterator si= sql.begin();
113
        for (; si != sql.end(); ++si)
114
        {
115
          if (*si == '\"')
116
          {
117
            si= sql.insert(si, '\\');
118
            ++si;
119
          }
120
          else if (*si == '\\')
121
          {
122
            si= sql.insert(si, '\\');
123
            ++si;
124
            si= sql.insert(si, '\\');
125
            ++si;
126
            si= sql.insert(si, '\\');
127
            ++si;
128
          }
2219.2.1 by David Shrewsbury
Escape semicolons before calling Execute::run() in slave plugin.
129
          else if (*si == ';')
130
          {
131
            si= sql.insert(si, '\\');
132
            ++si;  /* advance back to the semicolon */
133
          }
2116.1.46 by David Shrewsbury
Fix for bug 720819
134
        }
135
      }
136
    }
137
2290.1.3 by Joseph Daly
slave plugin work
138
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, 
139
                                   originating_server_uuid, 
2360.1.1 by Mark Atwood
restore multi master replication
140
                                   originating_commit_id,
141
                                   master_id))
2116.1.20 by David Shrewsbury
Refactor design pattern
142
    {
2360.1.1 by Mark Atwood
restore multi master replication
143
      if (_ignore_errors)
144
      {
145
        clearErrorState();
146
147
        /* Still need to record that we handled this trx */
148
        vector<string> sql;
149
        string tmp("UPDATE `sys_replication`.`applier_state`"
150
                   " SET `last_applied_commit_id` = ");
151
        tmp.append(commit_id);
152
        tmp.append(" WHERE `master_id` = ");
153
        tmp.append(master_id);
154
        sql.push_back(tmp);
155
        executeSQL(sql);
156
      }
157
      else
158
      {
159
        return false;
160
      }
2116.1.20 by David Shrewsbury
Refactor design pattern
161
    }
162
2360.1.1 by Mark Atwood
restore multi master replication
163
    if (not deleteFromQueue(master_id, trx_id))
2116.1.20 by David Shrewsbury
Refactor design pattern
164
    {
165
      return false;
166
    }
167
  }
168
169
  return true;
170
}
171
172
173
bool QueueConsumer::getMessage(message::Transaction &transaction,
174
                              string &commit_id,
2360.1.1 by Mark Atwood
restore multi master replication
175
                              const string &master_id,
2116.1.20 by David Shrewsbury
Refactor design pattern
176
                              uint64_t trx_id,
2290.1.3 by Joseph Daly
slave plugin work
177
                              string &originating_server_uuid,
178
                              uint64_t &originating_commit_id,
2116.1.20 by David Shrewsbury
Refactor design pattern
179
                              uint32_t segment_id)
180
{
2290.1.5 by Joseph Daly
slave fixes
181
  string sql("SELECT `msg`, `commit_order`, `originating_server_uuid`, "
182
             "`originating_commit_id` FROM `sys_replication`.`queue`"
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
183
             " WHERE `trx_id` = ");
2116.1.32 by David Shrewsbury
incremental
184
  sql.append(boost::lexical_cast<string>(trx_id));
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
185
  sql.append(" AND `seg_id` = ", 16);
2116.1.32 by David Shrewsbury
incremental
186
  sql.append(boost::lexical_cast<string>(segment_id));
2360.1.1 by Mark Atwood
restore multi master replication
187
  sql.append(" AND `master_id` = ", 19),
188
  sql.append(master_id);
2116.1.20 by David Shrewsbury
Refactor design pattern
189
2290.1.3 by Joseph Daly
slave plugin work
190
  sql::ResultSet result_set(4);
2116.1.20 by David Shrewsbury
Refactor design pattern
191
  Execute execute(*(_session.get()), true);
192
  
193
  execute.run(sql, result_set);
194
  
2290.1.3 by Joseph Daly
slave plugin work
195
  assert(result_set.getMetaData().getColumnCount() == 4);
2116.1.20 by David Shrewsbury
Refactor design pattern
196
197
  /* Really should only be 1 returned row */
198
  uint32_t found_rows= 0;
199
  while (result_set.next())
200
  {
201
    string msg= result_set.getString(0);
202
    string com_id= result_set.getString(1);
2290.1.3 by Joseph Daly
slave plugin work
203
    string orig_server_uuid= result_set.getString(2);
204
    string orig_commit_id= result_set.getString(3);
2116.1.20 by David Shrewsbury
Refactor design pattern
205
206
    if ((msg == "") || (found_rows == 1))
207
      break;
208
2290.1.3 by Joseph Daly
slave plugin work
209
    /* No columns should be NULL */
2116.1.20 by David Shrewsbury
Refactor design pattern
210
    assert(result_set.isNull(0) == false);
211
    assert(result_set.isNull(1) == false);
2290.1.3 by Joseph Daly
slave plugin work
212
    assert(result_set.isNull(2) == false);
213
    assert(result_set.isNull(3) == false);
214
2116.1.20 by David Shrewsbury
Refactor design pattern
215
216
    google::protobuf::TextFormat::ParseFromString(msg, &transaction);
2116.1.33 by David Shrewsbury
incremental
217
2116.1.20 by David Shrewsbury
Refactor design pattern
218
    commit_id= com_id;
2290.1.3 by Joseph Daly
slave plugin work
219
    originating_server_uuid= orig_server_uuid;
220
    originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
2116.1.20 by David Shrewsbury
Refactor design pattern
221
    found_rows++;
222
  }
223
224
  if (found_rows == 0)
225
    return false;
226
  
227
  return true;
228
}
229
2360.1.1 by Mark Atwood
restore multi master replication
230
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
231
                                                   TrxIdList &list)
2116.1.20 by David Shrewsbury
Refactor design pattern
232
{
233
  Execute execute(*(_session.get()), true);
234
  
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
235
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
2225.4.1 by Joseph Daly
bug 731738 need to check that commit_order is greater then 0 this is needed for multi part messages
236
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
2360.1.1 by Mark Atwood
restore multi master replication
237
             " AND `master_id` = "
238
             + master_id
239
             + " ORDER BY `commit_order` ASC");
2116.1.20 by David Shrewsbury
Refactor design pattern
240
  
241
  /* ResultSet size must match column count */
242
  sql::ResultSet result_set(1);
243
244
  execute.run(sql, result_set);
245
246
  assert(result_set.getMetaData().getColumnCount() == 1);
247
248
  while (result_set.next())
249
  {
250
    assert(result_set.isNull(0) == false);
251
    string value= result_set.getString(0);
252
    
253
    /* empty string returned when no more results */
254
    if (value != "")
255
    {
256
      list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
257
    }
258
  }
259
  
260
  return true;
261
}
262
263
264
/*
 * Convert the Statement messages of one Transaction message into SQL.
 *
 * Each statement is transformed into segmented_sql. Once a statement is
 * complete (see isEndStatement()), the contents of segmented_sql are
 * moved to the end of aggregate_sql. Statements that are still segmented
 * stay in segmented_sql so that later calls can finish or discard them.
 *
 * transaction    Message to convert. Messages carrying an event are
 *                skipped entirely.
 * aggregate_sql  Completed SQL accumulated so far.
 * segmented_sql  SQL carried over from statements not yet complete.
 *
 * Returns false if transformStatementToSql() reports an error, true
 * otherwise.
 */
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
                                vector<string> &aggregate_sql,
                                vector<string> &segmented_sql)
{
  if (transaction.has_event())
  {
    return true;
  }

  const size_t total= transaction.statement_size();

  for (size_t pos= 0; pos < total; ++pos)
  {
    const message::Statement &current= transaction.statement(pos);

    switch (current.type())
    {
      /* A rolled back transaction is not executed: drop everything */
      case message::Statement::ROLLBACK:
      {
        assert(pos == (total - 1));  /* should be the final Statement */
        aggregate_sql.clear();
        segmented_sql.clear();
        return true;
      }

      /* A rolled back statement cancels whatever is in progress */
      case message::Statement::ROLLBACK_STATEMENT:
      {
        segmented_sql.clear();
        continue;
      }

      /* DDL cannot be in a transaction, so precede with a COMMIT */
      case message::Statement::TRUNCATE_TABLE:
      case message::Statement::CREATE_SCHEMA:
      case message::Statement::ALTER_SCHEMA:
      case message::Statement::DROP_SCHEMA:
      case message::Statement::CREATE_TABLE:
      case message::Statement::ALTER_TABLE:
      case message::Statement::DROP_TABLE:
      case message::Statement::RAW_SQL:  /* currently ALTER TABLE or RENAME */
      {
        segmented_sql.push_back("COMMIT");
        break;
      }

      default:
      {
        break;
      }
    }

    /* A non-zero result from the transform is treated as failure */
    if (message::transformStatementToSql(current, segmented_sql,
                                         message::DRIZZLE, true))
    {
      return false;
    }

    if (not isEndStatement(current))
    {
      continue;
    }

    /* Statement is complete: promote its SQL to the final list */
    aggregate_sql.insert(aggregate_sql.end(),
                         segmented_sql.begin(),
                         segmented_sql.end());
    segmented_sql.clear();
  }

  return true;
}
339
340
341
/*
 * Tell whether a Statement message completes its statement.
 *
 * INSERT, UPDATE and DELETE statements report the end_segment flag of
 * their data sub-message. Every other statement type is always
 * considered complete.
 */
bool QueueConsumer::isEndStatement(const message::Statement &statement)
{
  switch (statement.type())
  {
    case message::Statement::INSERT:
      return statement.insert_data().end_segment();

    case message::Statement::UPDATE:
      return statement.update_data().end_segment();

    case message::Statement::DELETE:
      return statement.delete_data().end_segment();

    default:
      return true;
  }
}
371
372
2360.1.1 by Mark Atwood
restore multi master replication
373
/*
374
 * TODO: This currently updates every row in the applier_state table.
375
 * This used to be a single row. With multi-master support, we now need
376
 * a row for every master so we can track the last applied commit ID
377
 * value for each. Eventually, we may want multiple consumer threads,
378
 * so then we'd need to update each row independently.
379
 */
2116.1.20 by David Shrewsbury
Refactor design pattern
380
void QueueConsumer::setApplierState(const string &err_msg, bool status)
381
{
382
  vector<string> statements;
383
  string sql;
384
  string msg(err_msg);
385
386
  if (not status)
387
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
388
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
2116.1.20 by David Shrewsbury
Refactor design pattern
389
  }
390
  else
391
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
392
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
2116.1.20 by David Shrewsbury
Refactor design pattern
393
  }
394
  
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
395
  sql.append(", `error_msg` = '", 17);
2116.1.20 by David Shrewsbury
Refactor design pattern
396
397
  /* Escape embedded quotes and statement terminators */
398
  string::iterator it;
399
  for (it= msg.begin(); it != msg.end(); ++it)
400
  {
401
    if (*it == '\'')
402
    {
403
      it= msg.insert(it, '\'');
404
      ++it;  /* advance back to the quote */
405
    }
406
    else if (*it == ';')
407
    {
408
      it= msg.insert(it, '\\');
409
      ++it;  /* advance back to the semicolon */
410
    }
411
  }
412
  
413
  sql.append(msg);
2116.1.32 by David Shrewsbury
incremental
414
  sql.append("'", 1);
2116.1.20 by David Shrewsbury
Refactor design pattern
415
416
  statements.push_back(sql);
417
  executeSQL(statements);
418
}
419
420
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
421
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
2290.1.3 by Joseph Daly
slave plugin work
422
                                           const string &commit_id, 
423
                                           const string &originating_server_uuid, 
2360.1.1 by Mark Atwood
restore multi master replication
424
                                           uint64_t originating_commit_id,
425
                                           const string &master_id)
2116.1.20 by David Shrewsbury
Refactor design pattern
426
{
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
427
  string tmp("UPDATE `sys_replication`.`applier_state`"
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
428
             " SET `last_applied_commit_id` = ");
2116.1.20 by David Shrewsbury
Refactor design pattern
429
  tmp.append(commit_id);
2290.1.5 by Joseph Daly
slave fixes
430
  tmp.append(", `originating_server_uuid` = '");
2290.1.3 by Joseph Daly
slave plugin work
431
  tmp.append(originating_server_uuid);
2290.1.5 by Joseph Daly
slave fixes
432
  tmp.append("' , `originating_commit_id` = ");
2290.1.3 by Joseph Daly
slave plugin work
433
  tmp.append(boost::lexical_cast<string>(originating_commit_id));
434
2360.1.1 by Mark Atwood
restore multi master replication
435
  tmp.append(" WHERE `master_id` = ");
436
  tmp.append(master_id);
437
2116.1.20 by David Shrewsbury
Refactor design pattern
438
  sql.push_back(tmp);
2290.1.3 by Joseph Daly
slave plugin work
439
 
440
  _session->setOriginatingServerUUID(originating_server_uuid);
441
  _session->setOriginatingCommitID(originating_commit_id);
442
 
2116.1.20 by David Shrewsbury
Refactor design pattern
443
  return executeSQL(sql);
444
}
445
446
2360.1.1 by Mark Atwood
restore multi master replication
447
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
2116.1.20 by David Shrewsbury
Refactor design pattern
448
{
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
449
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
2116.1.20 by David Shrewsbury
Refactor design pattern
450
  sql.append(boost::lexical_cast<std::string>(trx_id));
451
2360.1.1 by Mark Atwood
restore multi master replication
452
  sql.append(" AND `master_id` = ");
453
  sql.append(master_id);
454
2116.1.20 by David Shrewsbury
Refactor design pattern
455
  vector<string> sql_vect;
456
  sql_vect.push_back(sql);
457
458
  return executeSQL(sql_vect);
459
}
460
461
} /* namespace slave */