~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_consumer.cc

  • Committer: Andrew Hutchings
  • Date: 2011-03-29 20:45:43 UTC
  • mfrom: (2257 drizzle)
  • mto: (2257.1.3 build)
  • mto: This revision was merged to the branch mainline in revision 2258.
  • Revision ID: andrew@linuxjedi.co.uk-20110329204543-ssex0nuo8knncgwx
Merge with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include <config.h>
22
22
#include <plugin/slave/queue_consumer.h>
 
23
#include <drizzled/errmsg_print.h>
 
24
#include <drizzled/execute.h>
23
25
#include <drizzled/message/transaction.pb.h>
24
26
#include <drizzled/message/statement_transform.h>
25
27
#include <drizzled/sql/result_set.h>
26
 
#include <drizzled/execute.h>
27
28
#include <string>
28
29
#include <vector>
29
30
#include <boost/thread.hpp>
51
52
 
52
53
bool QueueConsumer::process()
53
54
{
 
55
  for (size_t index= 0; index < _master_ids.size(); index++)
 
56
  {
 
57
    /* We go ahead and get the string version of the master ID
 
58
     * so we don't have to keep converting it from int to string.
 
59
     */
 
60
    const string master_id= boost::lexical_cast<string>(_master_ids[index]);
 
61
 
 
62
    if (not processSingleMaster(master_id))
 
63
      return false;
 
64
  }
 
65
 
 
66
  return true;
 
67
}
 
68
 
 
69
bool QueueConsumer::processSingleMaster(const string &master_id)
 
70
{
54
71
  TrxIdList completedTransactionIds;
55
72
 
56
 
  getListOfCompletedTransactions(completedTransactionIds);
 
73
  getListOfCompletedTransactions(master_id, completedTransactionIds);
57
74
 
58
75
  for (size_t x= 0; x < completedTransactionIds.size(); x++)
59
76
  {
66
83
    message::Transaction transaction;
67
84
    uint32_t segment_id= 1;
68
85
 
69
 
    while (getMessage(transaction, commit_id, trx_id, segment_id++))
 
86
    while (getMessage(transaction, commit_id, master_id, trx_id, segment_id++))
70
87
    {
71
88
      convertToSQL(transaction, aggregate_sql, segmented_sql);
72
89
      transaction.Clear();
115
132
      }
116
133
    }
117
134
 
118
 
    if (not executeSQLWithCommitId(aggregate_sql, commit_id))
 
135
    if (not executeSQLWithCommitId(aggregate_sql, commit_id, master_id))
119
136
    {
120
 
      return false;
 
137
      if (_ignore_errors)
 
138
      {
 
139
        clearErrorState();
 
140
 
 
141
        /* Still need to record that we handled this trx */
 
142
        vector<string> sql;
 
143
        string tmp("UPDATE `sys_replication`.`applier_state`"
 
144
                   " SET `last_applied_commit_id` = ");
 
145
        tmp.append(commit_id);
 
146
        tmp.append(" WHERE `master_id` = ");
 
147
        tmp.append(master_id);
 
148
        sql.push_back(tmp);
 
149
        executeSQL(sql);
 
150
      }
 
151
      else
 
152
      {
 
153
        return false;
 
154
      }
121
155
    }
122
156
 
123
 
    if (not deleteFromQueue(trx_id))
 
157
    if (not deleteFromQueue(master_id, trx_id))
124
158
    {
125
159
      return false;
126
160
    }
132
166
 
133
167
bool QueueConsumer::getMessage(message::Transaction &transaction,
134
168
                              string &commit_id,
 
169
                              const string &master_id,
135
170
                              uint64_t trx_id,
136
171
                              uint32_t segment_id)
137
172
{
140
175
  sql.append(boost::lexical_cast<string>(trx_id));
141
176
  sql.append(" AND `seg_id` = ", 16);
142
177
  sql.append(boost::lexical_cast<string>(segment_id));
 
178
  sql.append(" AND `master_id` = ", 19),
 
179
  sql.append(master_id);
143
180
 
144
181
  sql::ResultSet result_set(2);
145
182
  Execute execute(*(_session.get()), true);
174
211
  return true;
175
212
}
176
213
 
177
 
bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
 
214
bool QueueConsumer::getListOfCompletedTransactions(const string &master_id,
 
215
                                                   TrxIdList &list)
178
216
{
179
217
  Execute execute(*(_session.get()), true);
180
218
  
181
219
  string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
182
220
             " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
183
 
             " ORDER BY `commit_order` ASC");
 
221
             " AND `master_id` = "
 
222
             + master_id
 
223
             + " ORDER BY `commit_order` ASC");
184
224
  
185
225
  /* ResultSet size must match column count */
186
226
  sql::ResultSet result_set(1);
314
354
}
315
355
 
316
356
 
 
357
/*
 
358
 * TODO: This currently updates every row in the applier_state table.
 
359
 * This use to be a single row. With multi-master support, we now need
 
360
 * a row for every master so we can track the last applied commit ID
 
361
 * value for each. Eventually, we may want multiple consumer threads,
 
362
 * so then we'd need to update each row independently.
 
363
 */
317
364
void QueueConsumer::setApplierState(const string &err_msg, bool status)
318
365
{
319
366
  vector<string> statements;
356
403
 
357
404
 
358
405
bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
359
 
                                           const string &commit_id)
 
406
                                           const string &commit_id,
 
407
                                           const string &master_id)
360
408
{
361
409
  string tmp("UPDATE `sys_replication`.`applier_state`"
362
410
             " SET `last_applied_commit_id` = ");
363
411
  tmp.append(commit_id);
 
412
  tmp.append(" WHERE `master_id` = ");
 
413
  tmp.append(master_id);
364
414
  sql.push_back(tmp);
365
415
  
366
416
  return executeSQL(sql);
367
417
}
368
418
 
369
419
 
370
 
bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
 
420
bool QueueConsumer::deleteFromQueue(const string &master_id, uint64_t trx_id)
371
421
{
372
422
  string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
373
423
  sql.append(boost::lexical_cast<std::string>(trx_id));
 
424
  sql.append(" AND `master_id` = ");
 
425
  sql.append(master_id);
374
426
 
375
427
  vector<string> sql_vect;
376
428
  sql_vect.push_back(sql);