~drizzle-trunk/drizzle/development

2116.1.20 by David Shrewsbury
Refactor design pattern
1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2011 David Shrewsbury
5
 *
6
 *  This program is free software; you can redistribute it and/or modify
7
 *  it under the terms of the GNU General Public License as published by
8
 *  the Free Software Foundation; either version 2 of the License, or
9
 *  (at your option) any later version.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
2116.1.38 by David Shrewsbury
Change include style
21
#include <config.h>
22
#include <plugin/slave/queue_consumer.h>
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
23
#include <drizzled/errmsg_print.h>
24
#include <drizzled/execute.h>
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
25
#include <drizzled/message/transaction.pb.h>
26
#include <drizzled/message/statement_transform.h>
27
#include <drizzled/sql/result_set.h>
2116.1.20 by David Shrewsbury
Refactor design pattern
28
#include <string>
29
#include <vector>
30
#include <boost/thread.hpp>
31
#include <boost/lexical_cast.hpp>
32
#include <google/protobuf/text_format.h>
33
34
using namespace std;
35
using namespace drizzled;
36
37
namespace slave
38
{
39
40
bool QueueConsumer::init()
41
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
42
  setApplierState("", true);
2116.1.20 by David Shrewsbury
Refactor design pattern
43
  return true;
44
}
45
46
47
void QueueConsumer::shutdown()
48
{
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
49
  setApplierState(getErrorMessage(), false);
2116.1.20 by David Shrewsbury
Refactor design pattern
50
}
51
52
53
bool QueueConsumer::process()
54
{
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
55
  for (size_t index= 0; index < _master_ids.size(); index++)
56
  {
57
    /* We go ahead and get the string version of the master ID
58
     * so we don't have to keep converting it from int to string.
59
     */
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
61
62
    if (not processSingleMaster(master_id))
63
      return false;
64
  }
65
66
  return true;
67
}
68
69
bool QueueConsumer::processSingleMaster(const string &master_id)
70
{
2116.1.20 by David Shrewsbury
Refactor design pattern
71
  TrxIdList completedTransactionIds;
72
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
2116.1.20 by David Shrewsbury
Refactor design pattern
74
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
76
  {
77
    string commit_id;
78
    uint64_t trx_id= completedTransactionIds[x];
79
80
    vector<string> aggregate_sql;  /* final SQL to execute */
81
    vector<string> segmented_sql;  /* carryover from segmented statements */
82
83
    message::Transaction transaction;
84
    uint32_t segment_id= 1;
85
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
86
    while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++))
2116.1.20 by David Shrewsbury
Refactor design pattern
87
    {
88
      convertToSQL(transaction, aggregate_sql, segmented_sql);
2116.1.33 by David Shrewsbury
incremental
89
      transaction.Clear();
2116.1.20 by David Shrewsbury
Refactor design pattern
90
    }
91
92
    /*
93
     * The last message in a transaction should always have a commit_id
94
     * value larger than 0, though other messages of the same transaction
95
     * will have commit_id = 0.
96
     */
97
    assert((not commit_id.empty()) && (commit_id != "0"));
98
    assert(segmented_sql.empty());
99
2116.1.46 by David Shrewsbury
Fix for bug 720819
100
    if (not aggregate_sql.empty())
101
    {
102
      /*
103
       * Execution using drizzled::Execute requires some special escaping.
104
       */
105
      vector<string>::iterator agg_iter;
106
      for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
107
      {
108
        string &sql= *agg_iter;
109
        string::iterator si= sql.begin();
110
        for (; si != sql.end(); ++si)
111
        {
112
          if (*si == '\"')
113
          {
114
            si= sql.insert(si, '\\');
115
            ++si;
116
          }
117
          else if (*si == '\\')
118
          {
119
            si= sql.insert(si, '\\');
120
            ++si;
121
            si= sql.insert(si, '\\');
122
            ++si;
123
            si= sql.insert(si, '\\');
124
            ++si;
125
          }
2219.2.1 by David Shrewsbury
Escape semicolons before calling Execute::run() in slave plugin.
126
          else if (*si == ';')
127
          {
128
            si= sql.insert(si, '\\');
129
            ++si;  /* advance back to the semicolon */
130
          }
2116.1.46 by David Shrewsbury
Fix for bug 720819
131
        }
132
      }
133
    }
134
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
135
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id))
2116.1.20 by David Shrewsbury
Refactor design pattern
136
    {
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
137
      if (_ignore_errors)
138
      {
139
        clearErrorState();
140
141
        /* Still need to record that we handled this trx */
142
        vector<string> sql;
143
        string tmp("UPDATE `sys_replication`.`applier_state`"
144
                   " SET `last_applied_commit_id` = ");
145
        tmp.append(commit_id);
146
        tmp.append(" WHERE `master_id` = ");
147
        tmp.append(master_id);
148
        sql.push_back(tmp);
149
        executeSQL(sql);
150
      }
151
      else
152
      {
153
        return false;
154
      }
2116.1.20 by David Shrewsbury
Refactor design pattern
155
    }
156
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
157
    if (not deleteFromQueue(master_id, trx_id))
2116.1.20 by David Shrewsbury
Refactor design pattern
158
    {
159
      return false;
160
    }
161
  }
162
163
  return true;
164
}
165
166
167
bool QueueConsumer::getMessage(message::Transaction &transaction,
168
                              string &commit_id,
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
169
                              const string &master_id,
2116.1.20 by David Shrewsbury
Refactor design pattern
170
                              uint64_t trx_id,
171
                              uint32_t segment_id)
172
{
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
173
  string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
174
             " WHERE `trx_id` = ");
2116.1.32 by David Shrewsbury
incremental
175
  sql.append(boost::lexical_cast<string>(trx_id));
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
176
  sql.append(" AND `seg_id` = ", 16);
2116.1.32 by David Shrewsbury
incremental
177
  sql.append(boost::lexical_cast<string>(segment_id));
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
178
  sql.append(" AND `master_id` = ", 19),
179
  sql.append(master_id);
2116.1.20 by David Shrewsbury
Refactor design pattern
180
181
  sql::ResultSet result_set(2);
182
  Execute execute(*(_session.get()), true);
183
  
184
  execute.run(sql, result_set);
185
  
186
  assert(result_set.getMetaData().getColumnCount() == 2);
187
188
  /* Really should only be 1 returned row */
189
  uint32_t found_rows= 0;
190
  while (result_set.next())
191
  {
192
    string msg= result_set.getString(0);
193
    string com_id= result_set.getString(1);
194
195
    if ((msg == "") || (found_rows == 1))
196
      break;
197
198
    /* Neither column should be NULL */
199
    assert(result_set.isNull(0) == false);
200
    assert(result_set.isNull(1) == false);
201
202
    google::protobuf::TextFormat::ParseFromString(msg, &transaction);
2116.1.33 by David Shrewsbury
incremental
203
2116.1.20 by David Shrewsbury
Refactor design pattern
204
    commit_id= com_id;
205
    found_rows++;
206
  }
207
208
  if (found_rows == 0)
209
    return false;
210
  
211
  return true;
212
}
213
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
214
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
215
                                                   TrxIdList &list)
2116.1.20 by David Shrewsbury
Refactor design pattern
216
{
217
  Execute execute(*(_session.get()), true);
218
  
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
219
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
2225.4.1 by Joseph Daly
bug 731738 need to check that commit_order is greater then 0 this is needed for multi part messages
220
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
221
             " AND `master_id` = "
222
             + master_id
223
             + " ORDER BY `commit_order` ASC");
2116.1.20 by David Shrewsbury
Refactor design pattern
224
  
225
  /* ResultSet size must match column count */
226
  sql::ResultSet result_set(1);
227
228
  execute.run(sql, result_set);
229
230
  assert(result_set.getMetaData().getColumnCount() == 1);
231
232
  while (result_set.next())
233
  {
234
    assert(result_set.isNull(0) == false);
235
    string value= result_set.getString(0);
236
    
237
    /* empty string returned when no more results */
238
    if (value != "")
239
    {
240
      list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
241
    }
242
  }
243
  
244
  return true;
245
}
246
247
248
/*
 * Translate the Statements of one Transaction message into SQL text.
 *
 * Each Statement is converted into `segmented_sql`. Once a Statement is
 * complete (see isEndStatement), everything accumulated in `segmented_sql`
 * is moved to the end of `aggregate_sql`. A Statement split across several
 * messages therefore stays in `segmented_sql` between calls.
 *
 * Special cases:
 *   - Transactions carrying an event are skipped entirely.
 *   - ROLLBACK (expected to be the last Statement) discards all SQL
 *     gathered so far in both vectors.
 *   - ROLLBACK_STATEMENT discards only the in-progress segmented SQL.
 *   - DDL-like types (and RAW_SQL) get a "COMMIT" in front, since DDL
 *     cannot be in a transaction.
 *
 * @return false if a Statement could not be transformed to SQL,
 *         true otherwise
 */
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
                                vector<string> &aggregate_sql,
                                vector<string> &segmented_sql)
{
  if (transaction.has_event())
    return true;

  const size_t statement_count= transaction.statement_size();

  for (size_t pos= 0; pos < statement_count; ++pos)
  {
    const message::Statement &stmt= transaction.statement(pos);

    if (stmt.type() == message::Statement::ROLLBACK)
    {
      /* A rolled back transaction is never executed; drop everything. */
      assert(pos == (statement_count - 1));  /* should be the final Statement */
      aggregate_sql.clear();
      segmented_sql.clear();
      return true;
    }

    if (stmt.type() == message::Statement::ROLLBACK_STATEMENT)
    {
      /* Abandon the statement currently being assembled. */
      segmented_sql.clear();
      continue;
    }

    bool needs_commit= false;
    switch (stmt.type())
    {
      case message::Statement::TRUNCATE_TABLE:
      case message::Statement::CREATE_SCHEMA:
      case message::Statement::ALTER_SCHEMA:
      case message::Statement::DROP_SCHEMA:
      case message::Statement::CREATE_TABLE:
      case message::Statement::ALTER_TABLE:
      case message::Statement::DROP_TABLE:
      case message::Statement::RAW_SQL:  /* currently ALTER TABLE or RENAME */
        needs_commit= true;
        break;
      default:
        break;
    }

    if (needs_commit)
      segmented_sql.push_back("COMMIT");

    if (message::transformStatementToSql(stmt, segmented_sql,
                                         message::DRIZZLE, true))
      return false;

    if (not isEndStatement(stmt))
      continue;

    aggregate_sql.insert(aggregate_sql.end(),
                         segmented_sql.begin(),
                         segmented_sql.end());
    segmented_sql.clear();
  }

  return true;
}
323
324
325
/*
 * Decide whether `statement` completes a (possibly segmented) statement.
 *
 * For INSERT, UPDATE and DELETE the answer is the end_segment flag of the
 * corresponding data payload. Every other statement type is treated as
 * complete.
 */
bool QueueConsumer::isEndStatement(const message::Statement &statement)
{
  switch (statement.type())
  {
    case message::Statement::INSERT:
      return statement.insert_data().end_segment();
    case message::Statement::UPDATE:
      return statement.update_data().end_segment();
    case message::Statement::DELETE:
      return statement.delete_data().end_segment();
    default:
      return true;
  }
}
355
356
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
357
/*
358
 * TODO: This currently updates every row in the applier_state table.
359
 * This used to be a single row. With multi-master support, we now need
360
 * a row for every master so we can track the last applied commit ID
361
 * value for each. Eventually, we may want multiple consumer threads,
362
 * so then we'd need to update each row independently.
363
 */
2116.1.20 by David Shrewsbury
Refactor design pattern
364
void QueueConsumer::setApplierState(const string &err_msg, bool status)
365
{
366
  vector<string> statements;
367
  string sql;
368
  string msg(err_msg);
369
370
  if (not status)
371
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
372
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
2116.1.20 by David Shrewsbury
Refactor design pattern
373
  }
374
  else
375
  {
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
376
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
2116.1.20 by David Shrewsbury
Refactor design pattern
377
  }
378
  
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
379
  sql.append(", `error_msg` = '", 17);
2116.1.20 by David Shrewsbury
Refactor design pattern
380
381
  /* Escape embedded quotes and statement terminators */
382
  string::iterator it;
383
  for (it= msg.begin(); it != msg.end(); ++it)
384
  {
385
    if (*it == '\'')
386
    {
387
      it= msg.insert(it, '\'');
388
      ++it;  /* advance back to the quote */
389
    }
390
    else if (*it == ';')
391
    {
392
      it= msg.insert(it, '\\');
393
      ++it;  /* advance back to the semicolon */
394
    }
395
  }
396
  
397
  sql.append(msg);
2116.1.32 by David Shrewsbury
incremental
398
  sql.append("'", 1);
2116.1.20 by David Shrewsbury
Refactor design pattern
399
400
  statements.push_back(sql);
401
  executeSQL(statements);
402
}
403
404
2116.1.31 by David Shrewsbury
Major refactor of common functionality into new classes.
405
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
406
                                           const string &commit_id,
407
                                           const string &master_id)
2116.1.20 by David Shrewsbury
Refactor design pattern
408
{
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
409
  string tmp("UPDATE `sys_replication`.`applier_state`"
2116.1.40 by David Shrewsbury
Add quotes around schema and table names (replication is now a reserved word)
410
             " SET `last_applied_commit_id` = ");
2116.1.20 by David Shrewsbury
Refactor design pattern
411
  tmp.append(commit_id);
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
412
  tmp.append(" WHERE `master_id` = ");
413
  tmp.append(master_id);
2116.1.20 by David Shrewsbury
Refactor design pattern
414
  sql.push_back(tmp);
415
  
416
  return executeSQL(sql);
417
}
418
419
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
420
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
2116.1.20 by David Shrewsbury
Refactor design pattern
421
{
2116.1.52 by David Shrewsbury
Changed slave services dedicated schema name from 'replication' to 'sys_replication' to avoid use of reserved word.
422
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
2116.1.20 by David Shrewsbury
Refactor design pattern
423
  sql.append(boost::lexical_cast<std::string>(trx_id));
2239.7.1 by David Shrewsbury
Initial beta version of multi-master code.
424
  sql.append(" AND `master_id` = ");
425
  sql.append(master_id);
2116.1.20 by David Shrewsbury
Refactor design pattern
426
427
  vector<string> sql_vect;
428
  sql_vect.push_back(sql);
429
430
  return executeSQL(sql_vect);
431
}
432
433
} /* namespace slave */