~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_consumer.cc

  • Committer: Mark Atwood
  • Date: 2011-07-06 23:13:00 UTC
  • mto: This revision was merged to the branch mainline in revision 2361.
  • Revision ID: me@mark.atwood.name-20110706231300-y49o7wu01avy1jh5
restore multi master replication

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/queue_consumer.h>
 
23
#include <drizzled/errmsg_print.h>
 
24
#include <drizzled/execute.h>
23
25
#include <drizzled/message/transaction.pb.h>
24
26
#include <drizzled/message/statement_transform.h>
25
27
#include <drizzled/sql/result_set.h>
26
 
#include <drizzled/execute.h>
27
28
#include <string>
28
29
#include <vector>
29
30
#include <boost/thread.hpp>
51
52
 
52
53
/*
 * Runs one processing pass over every configured master.
 *
 * Walks _master_ids in index order, converts each ID to its string form
 * and hands it to processSingleMaster(). The pass stops at the first
 * master for which processSingleMaster() returns false; masters later in
 * the list are not visited in that pass.
 *
 * Returns true if processSingleMaster() succeeded for every master (also
 * when _master_ids is empty), false as soon as one of them fails.
 */
bool QueueConsumer::process()
53
54
{
 
55
  for (size_t index= 0; index < _master_ids.size(); index++)
 
56
  {
 
57
    /* We go ahead and get the string version of the master ID
 
58
     * so we don't have to keep converting it from int to string.
 
59
     */
 
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
 
61
 
 
62
    if (not processSingleMaster(master_id))
 
63
      return false;
 
64
  }
 
65
 
 
66
  return true;
 
67
}
 
68
 
 
69
bool QueueConsumer::processSingleMaster(const string &master_id)
 
70
{
54
71
  TrxIdList completedTransactionIds;
55
72
 
56
 
  getListOfCompletedTransactions(completedTransactionIds);
 
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
57
74
 
58
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
59
76
  {
68
85
    message::Transaction transaction;
69
86
    uint32_t segment_id= 1;
70
87
 
71
 
    while (getMessage(transaction, commit_id, trx_id, originating_server_uuid,
 
88
    while (getMessage(transaction, commit_id, master_id, trx_id, originating_server_uuid,
72
89
                      originating_commit_id, segment_id++))
73
90
    {
74
91
      convertToSQL(transaction, aggregate_sql, segmented_sql);
120
137
 
121
138
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, 
122
139
                                   originating_server_uuid, 
123
 
                                   originating_commit_id))
 
140
                                   originating_commit_id,
 
141
                                   master_id))
124
142
    {
125
 
      return false;
 
143
      if (_ignore_errors)
 
144
      {
 
145
        clearErrorState();
 
146
 
 
147
        /* Still need to record that we handled this trx */
 
148
        vector<string> sql;
 
149
        string tmp("UPDATE `sys_replication`.`applier_state`"
 
150
                   " SET `last_applied_commit_id` = ");
 
151
        tmp.append(commit_id);
 
152
        tmp.append(" WHERE `master_id` = ");
 
153
        tmp.append(master_id);
 
154
        sql.push_back(tmp);
 
155
        executeSQL(sql);
 
156
      }
 
157
      else
 
158
      {
 
159
        return false;
 
160
      }
126
161
    }
127
162
 
128
 
    if (not deleteFromQueue(trx_id))
 
163
    if (not deleteFromQueue(master_id, trx_id))
129
164
    {
130
165
      return false;
131
166
    }
137
172
 
138
173
bool QueueConsumer::getMessage(message::Transaction &transaction,
139
174
                              string &commit_id,
 
175
                              const string &master_id,
140
176
                              uint64_t trx_id,
141
177
                              string &originating_server_uuid,
142
178
                              uint64_t &originating_commit_id,
148
184
  sql.append(boost::lexical_cast<string>(trx_id));
149
185
  sql.append(" AND `seg_id` = ", 16);
150
186
  sql.append(boost::lexical_cast<string>(segment_id));
 
187
  sql.append(" AND `master_id` = ", 19),
 
188
  sql.append(master_id);
151
189
 
152
190
  sql::ResultSet result_set(4);
153
191
  Execute execute(*(_session.get()), true);
189
227
  return true;
190
228
}
191
229
 
192
 
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
 
230
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
 
231
                                                   TrxIdList &list)
193
232
{
194
233
  Execute execute(*(_session.get()), true);
195
234
  
196
235
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
197
236
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
198
 
             " ORDER BY `commit_order` ASC");
 
237
             " AND `master_id` = "
 
238
             + master_id
 
239
             + " ORDER BY `commit_order` ASC");
199
240
  
200
241
  /* ResultSet size must match column count */
201
242
  sql::ResultSet result_set(1);
329
370
}
330
371
 
331
372
 
 
373
/*
 
374
 * TODO: This currently updates every row in the applier_state table.
 
375
 * This used to be a single row. With multi-master support, we now need
 
376
 * a row for every master so we can track the last applied commit ID
 
377
 * value for each. Eventually, we may want multiple consumer threads,
 
378
 * so then we'd need to update each row independently.
 
379
 */
332
380
void QueueConsumer::setApplierState(const string &err_msg, bool status)
333
381
{
334
382
  vector<string> statements;
373
421
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
374
422
                                           const string &commit_id, 
375
423
                                           const string &originating_server_uuid, 
376
 
                                           uint64_t originating_commit_id)
 
424
                                           uint64_t originating_commit_id,
 
425
                                           const string &master_id)
377
426
{
378
427
  string tmp("UPDATE `sys_replication`.`applier_state`"
379
428
             " SET `last_applied_commit_id` = ");
383
432
  tmp.append("' , `originating_commit_id` = ");
384
433
  tmp.append(boost::lexical_cast<string>(originating_commit_id));
385
434
 
 
435
  tmp.append(" WHERE `master_id` = ");
 
436
  tmp.append(master_id);
 
437
 
386
438
  sql.push_back(tmp);
387
439
 
388
440
  _session->setOriginatingServerUUID(originating_server_uuid);
392
444
}
393
445
 
394
446
 
395
 
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
 
447
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
396
448
{
397
449
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
398
450
  sql.append(boost::lexical_cast<std::string>(trx_id));
399
451
 
 
452
  sql.append(" AND `master_id` = ");
 
453
  sql.append(master_id);
 
454
 
400
455
  vector<string> sql_vect;
401
456
  sql_vect.push_back(sql);
402
457