~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_consumer.cc

mergeĀ lp:~hingo/drizzle/drizzle-auth_ldap-fix-and-docs

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2011 David Shrewsbury
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#include <config.h>
 
22
#include <plugin/slave/queue_consumer.h>
 
23
#include <drizzled/errmsg_print.h>
 
24
#include <drizzled/execute.h>
 
25
#include <drizzled/message/transaction.pb.h>
 
26
#include <drizzled/message/statement_transform.h>
 
27
#include <drizzled/sql/result_set.h>
 
28
#include <string>
 
29
#include <vector>
 
30
#include <boost/thread.hpp>
 
31
#include <boost/lexical_cast.hpp>
 
32
#include <google/protobuf/text_format.h>
 
33
 
 
34
using namespace std;
 
35
using namespace drizzled;
 
36
 
 
37
namespace slave
 
38
{
 
39
 
 
40
bool QueueConsumer::init()
 
41
{
 
42
  setApplierState("", true);
 
43
  return true;
 
44
}
 
45
 
 
46
 
 
47
void QueueConsumer::shutdown()
 
48
{
 
49
  setApplierState(getErrorMessage(), false);
 
50
}
 
51
 
 
52
 
 
53
bool QueueConsumer::process()
 
54
{
 
55
  for (size_t index= 0; index < _master_ids.size(); index++)
 
56
  {
 
57
    /* We go ahead and get the string version of the master ID
 
58
     * so we don't have to keep converting it from int to string.
 
59
     */
 
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
 
61
 
 
62
    if (not processSingleMaster(master_id))
 
63
      return false;
 
64
  }
 
65
 
 
66
  return true;
 
67
}
 
68
 
 
69
bool QueueConsumer::processSingleMaster(const string &master_id)
 
70
{
 
71
  TrxIdList completedTransactionIds;
 
72
 
 
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
 
74
 
 
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
 
76
  {
 
77
    string commit_id;
 
78
    string originating_server_uuid;
 
79
    uint64_t originating_commit_id= 0;
 
80
    uint64_t trx_id= completedTransactionIds[x];
 
81
 
 
82
    vector<string> aggregate_sql;  /* final SQL to execute */
 
83
    vector<string> segmented_sql;  /* carryover from segmented statements */
 
84
 
 
85
    message::Transaction transaction;
 
86
    uint32_t segment_id= 1;
 
87
 
 
88
    while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
 
89
                      originating_commit_id, segment_id++))
 
90
    {
 
91
      convertToSQL(transaction, aggregate_sql, segmented_sql);
 
92
      transaction.Clear();
 
93
    }
 
94
 
 
95
    /*
 
96
     * The last message in a transaction should always have a commit_id
 
97
     * value larger than 0, though other messages of the same transaction
 
98
     * will have commit_id = 0.
 
99
     */
 
100
    assert((not commit_id.empty()) && (commit_id != "0"));
 
101
    assert(segmented_sql.empty());
 
102
 
 
103
    if (not aggregate_sql.empty())
 
104
    {
 
105
      /*
 
106
       * Execution using drizzled::Execute requires some special escaping.
 
107
       */
 
108
      vector<string>::iterator agg_iter;
 
109
      for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
 
110
      {
 
111
        string &sql= *agg_iter;
 
112
        string::iterator si= sql.begin();
 
113
        for (; si != sql.end(); ++si)
 
114
        {
 
115
          if (*si == '\"')
 
116
          {
 
117
            si= sql.insert(si, '\\');
 
118
            ++si;
 
119
          }
 
120
          else if (*si == '\\')
 
121
          {
 
122
            si= sql.insert(si, '\\');
 
123
            ++si;
 
124
            si= sql.insert(si, '\\');
 
125
            ++si;
 
126
            si= sql.insert(si, '\\');
 
127
            ++si;
 
128
          }
 
129
          else if (*si == ';')
 
130
          {
 
131
            si= sql.insert(si, '\\');
 
132
            ++si;  /* advance back to the semicolon */
 
133
          }
 
134
        }
 
135
      }
 
136
    }
 
137
 
 
138
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, 
 
139
                                   originating_server_uuid, 
 
140
                                   originating_commit_id,
 
141
                                   master_id))
 
142
    {
 
143
      if (_ignore_errors)
 
144
      {
 
145
        clearErrorState();
 
146
 
 
147
        /* Still need to record that we handled this trx */
 
148
        vector<string> sql;
 
149
        string tmp("UPDATE `sys_replication`.`applier_state`"
 
150
                   " SET `last_applied_commit_id` = ");
 
151
        tmp.append(commit_id);
 
152
        tmp.append(" WHERE `master_id` = ");
 
153
        tmp.append(master_id);
 
154
        sql.push_back(tmp);
 
155
        executeSQL(sql);
 
156
      }
 
157
      else
 
158
      {
 
159
        return false;
 
160
      }
 
161
    }
 
162
 
 
163
    if (not deleteFromQueue(master_id, trx_id))
 
164
    {
 
165
      return false;
 
166
    }
 
167
  }
 
168
 
 
169
  return true;
 
170
}
 
171
 
 
172
 
 
173
bool QueueConsumer::getMessage(message::Transaction &transaction,
 
174
                              string &commit_id,
 
175
                              const string &master_id,
 
176
                              uint64_t trx_id,
 
177
                              string &originating_server_uuid,
 
178
                              uint64_t &originating_commit_id,
 
179
                              uint32_t segment_id)
 
180
{
 
181
  string sql("SELECT `msg`, `commit_order`, `originating_server_uuid`, "
 
182
             "`originating_commit_id` FROM `sys_replication`.`queue`"
 
183
             " WHERE `trx_id` = ");
 
184
  sql.append(boost::lexical_cast<string>(trx_id));
 
185
  sql.append(" AND `seg_id` = ", 16);
 
186
  sql.append(boost::lexical_cast<string>(segment_id));
 
187
  sql.append(" AND `master_id` = ", 19),
 
188
  sql.append(master_id);
 
189
 
 
190
  sql::ResultSet result_set(4);
 
191
  Execute execute(*(_session.get()), true);
 
192
  
 
193
  execute.run(sql, result_set);
 
194
  
 
195
  assert(result_set.getMetaData().getColumnCount() == 4);
 
196
 
 
197
  /* Really should only be 1 returned row */
 
198
  uint32_t found_rows= 0;
 
199
  while (result_set.next())
 
200
  {
 
201
    string msg= result_set.getString(0);
 
202
    string com_id= result_set.getString(1);
 
203
    string orig_server_uuid= result_set.getString(2);
 
204
    string orig_commit_id= result_set.getString(3);
 
205
 
 
206
    if ((msg == "") || (found_rows == 1))
 
207
      break;
 
208
 
 
209
    /* No columns should be NULL */
 
210
    assert(result_set.isNull(0) == false);
 
211
    assert(result_set.isNull(1) == false);
 
212
    assert(result_set.isNull(2) == false);
 
213
    assert(result_set.isNull(3) == false);
 
214
 
 
215
 
 
216
    google::protobuf::TextFormat::ParseFromString(msg, &transaction);
 
217
 
 
218
    commit_id= com_id;
 
219
    originating_server_uuid= orig_server_uuid;
 
220
    originating_commit_id= boost::lexical_cast<uint64_t>(orig_commit_id);
 
221
    found_rows++;
 
222
  }
 
223
 
 
224
  if (found_rows == 0)
 
225
    return false;
 
226
  
 
227
  return true;
 
228
}
 
229
 
 
230
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
 
231
                                                   TrxIdList &list)
 
232
{
 
233
  Execute execute(*(_session.get()), true);
 
234
  
 
235
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
 
236
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
 
237
             " AND `master_id` = "
 
238
             + master_id
 
239
             + " ORDER BY `commit_order` ASC");
 
240
  
 
241
  /* ResultSet size must match column count */
 
242
  sql::ResultSet result_set(1);
 
243
 
 
244
  execute.run(sql, result_set);
 
245
 
 
246
  assert(result_set.getMetaData().getColumnCount() == 1);
 
247
 
 
248
  while (result_set.next())
 
249
  {
 
250
    assert(result_set.isNull(0) == false);
 
251
    string value= result_set.getString(0);
 
252
    
 
253
    /* empty string returned when no more results */
 
254
    if (value != "")
 
255
    {
 
256
      list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
 
257
    }
 
258
  }
 
259
  
 
260
  return true;
 
261
}
 
262
 
 
263
 
 
264
bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
 
265
                                vector<string> &aggregate_sql,
 
266
                                vector<string> &segmented_sql)
 
267
{
 
268
  if (transaction.has_event())
 
269
    return true;
 
270
 
 
271
  size_t num_statements= transaction.statement_size();
 
272
 
 
273
  /*
 
274
   * Loop through all Statement messages within this Transaction and
 
275
   * convert each to equivalent SQL statements. Complete Statements will
 
276
   * be appended to aggregate_sql, while segmented Statements will remain
 
277
   * in segmented_sql to be appended to until completed, or rolled back.
 
278
   */
 
279
 
 
280
  for (size_t idx= 0; idx < num_statements; idx++)
 
281
  {
 
282
    const message::Statement &statement= transaction.statement(idx);
 
283
    
 
284
    /* We won't bother with executing a rolled back transaction */
 
285
    if (statement.type() == message::Statement::ROLLBACK)
 
286
    {
 
287
      assert(idx == (num_statements - 1));  /* should be the final Statement */
 
288
      aggregate_sql.clear();
 
289
      segmented_sql.clear();
 
290
      break;
 
291
    }
 
292
 
 
293
    switch (statement.type())
 
294
    {
 
295
      /* DDL cannot be in a transaction, so precede with a COMMIT */
 
296
      case message::Statement::TRUNCATE_TABLE:
 
297
      case message::Statement::CREATE_SCHEMA:
 
298
      case message::Statement::ALTER_SCHEMA:
 
299
      case message::Statement::DROP_SCHEMA:
 
300
      case message::Statement::CREATE_TABLE:
 
301
      case message::Statement::ALTER_TABLE:
 
302
      case message::Statement::DROP_TABLE:
 
303
      case message::Statement::RAW_SQL:  /* currently ALTER TABLE or RENAME */
 
304
      {
 
305
        segmented_sql.push_back("COMMIT");
 
306
        break;
 
307
      }
 
308
 
 
309
      /* Cancel any ongoing statement */
 
310
      case message::Statement::ROLLBACK_STATEMENT:
 
311
      {
 
312
        segmented_sql.clear();
 
313
        continue;
 
314
      }
 
315
      
 
316
      default:
 
317
      {
 
318
        break;
 
319
      }
 
320
    }
 
321
 
 
322
    if (message::transformStatementToSql(statement, segmented_sql,
 
323
                                         message::DRIZZLE, true))
 
324
    {
 
325
      return false;
 
326
    }
 
327
 
 
328
    if (isEndStatement(statement))
 
329
    {
 
330
      aggregate_sql.insert(aggregate_sql.end(),
 
331
                           segmented_sql.begin(),
 
332
                           segmented_sql.end());
 
333
      segmented_sql.clear();
 
334
    }
 
335
  }
 
336
 
 
337
  return true;
 
338
}
 
339
 
 
340
 
 
341
bool QueueConsumer::isEndStatement(const message::Statement &statement)
 
342
{
 
343
  switch (statement.type())
 
344
  {
 
345
    case (message::Statement::INSERT):
 
346
    {
 
347
      const message::InsertData &data= statement.insert_data();
 
348
      if (not data.end_segment())
 
349
        return false;
 
350
      break;
 
351
    }
 
352
    case (message::Statement::UPDATE):
 
353
    {
 
354
      const message::UpdateData &data= statement.update_data();
 
355
      if (not data.end_segment())
 
356
        return false;
 
357
      break;
 
358
    }
 
359
    case (message::Statement::DELETE):
 
360
    {
 
361
      const message::DeleteData &data= statement.delete_data();
 
362
      if (not data.end_segment())
 
363
        return false;
 
364
      break;
 
365
    }
 
366
    default:
 
367
      return true;
 
368
  }
 
369
  return true;
 
370
}
 
371
 
 
372
 
 
373
/*
 
374
 * TODO: This currently updates every row in the applier_state table.
 
375
 * This use to be a single row. With multi-master support, we now need
 
376
 * a row for every master so we can track the last applied commit ID
 
377
 * value for each. Eventually, we may want multiple consumer threads,
 
378
 * so then we'd need to update each row independently.
 
379
 */
 
380
void QueueConsumer::setApplierState(const string &err_msg, bool status)
 
381
{
 
382
  vector<string> statements;
 
383
  string sql;
 
384
  string msg(err_msg);
 
385
 
 
386
  if (not status)
 
387
  {
 
388
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
 
389
  }
 
390
  else
 
391
  {
 
392
    sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
 
393
  }
 
394
  
 
395
  sql.append(", `error_msg` = '", 17);
 
396
 
 
397
  /* Escape embedded quotes and statement terminators */
 
398
  string::iterator it;
 
399
  for (it= msg.begin(); it != msg.end(); ++it)
 
400
  {
 
401
    if (*it == '\'')
 
402
    {
 
403
      it= msg.insert(it, '\'');
 
404
      ++it;  /* advance back to the quote */
 
405
    }
 
406
    else if (*it == ';')
 
407
    {
 
408
      it= msg.insert(it, '\\');
 
409
      ++it;  /* advance back to the semicolon */
 
410
    }
 
411
  }
 
412
  
 
413
  sql.append(msg);
 
414
  sql.append("'", 1);
 
415
 
 
416
  statements.push_back(sql);
 
417
  executeSQL(statements);
 
418
}
 
419
 
 
420
 
 
421
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
 
422
                                           const string &commit_id, 
 
423
                                           const string &originating_server_uuid, 
 
424
                                           uint64_t originating_commit_id,
 
425
                                           const string &master_id)
 
426
{
 
427
  string tmp("UPDATE `sys_replication`.`applier_state`"
 
428
             " SET `last_applied_commit_id` = ");
 
429
  tmp.append(commit_id);
 
430
  tmp.append(", `originating_server_uuid` = '");
 
431
  tmp.append(originating_server_uuid);
 
432
  tmp.append("' , `originating_commit_id` = ");
 
433
  tmp.append(boost::lexical_cast<string>(originating_commit_id));
 
434
 
 
435
  tmp.append(" WHERE `master_id` = ");
 
436
  tmp.append(master_id);
 
437
 
 
438
  sql.push_back(tmp);
 
439
 
 
440
  _session->setOriginatingServerUUID(originating_server_uuid);
 
441
  _session->setOriginatingCommitID(originating_commit_id);
 
442
 
 
443
  return executeSQL(sql);
 
444
}
 
445
 
 
446
 
 
447
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
 
448
{
 
449
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
 
450
  sql.append(boost::lexical_cast<std::string>(trx_id));
 
451
 
 
452
  sql.append(" AND `master_id` = ");
 
453
  sql.append(master_id);
 
454
 
 
455
  vector<string> sql_vect;
 
456
  sql_vect.push_back(sql);
 
457
 
 
458
  return executeSQL(sql_vect);
 
459
}
 
460
 
 
461
} /* namespace slave */