~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_manager.cc

  • Committer: David Shrewsbury
  • Date: 2011-02-09 16:20:32 UTC
  • mto: (2207.1.1 build) (2208.1.3 build)
  • mto: This revision was merged to the branch mainline in revision 2202.
  • Revision ID: shrewsbury.dave@gmail.com-20110209162032-elqexiuvppb4savj
Various fixes

Show diffs side-by-side

added added

removed removed

Lines of Context:
59
59
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
60
60
 
61
61
  /* setup a Session object */
62
 
  Session::shared_ptr session= Session::make_shared(plugin::Listen::getNullClient(),
63
 
                                                    catalog::local());
 
62
  _session= Session::make_shared(plugin::Listen::getNullClient(),
 
63
                                 catalog::local());
64
64
  identifier::User::shared_ptr user= identifier::User::make_shared();
65
65
  user->setUser("slave");
66
 
  session->setUser(user);
67
 
  session->set_db(getSchema());
 
66
  _session->setUser(user);
 
67
  _session->set_db(getSchema());
68
68
 
69
 
  createApplierSchemaAndTables(*(session.get()));
 
69
  if (not createApplierSchemaAndTables())
 
70
    return;
70
71
 
71
72
  uint64_t trx_id= 0;
72
73
 
78
79
 
79
80
      TrxIdList completedTransactionIds;
80
81
 
81
 
      getListOfCompletedTransactions(*(session.get()), completedTransactionIds);
 
82
      getListOfCompletedTransactions(completedTransactionIds);
82
83
 
83
84
      for (size_t x= 0; x < completedTransactionIds.size(); x++)
84
85
      {
91
92
        message::Transaction transaction;
92
93
        uint32_t segment_id= 1;
93
94
 
94
 
        while (getMessage(*(session.get()), transaction, commit_id,
95
 
                          trx_id, segment_id++))
 
95
        while (getMessage(transaction, commit_id, trx_id, segment_id++))
96
96
        {
97
97
          convertToSQL(transaction, aggregate_sql, segmented_sql);
98
98
        }
103
103
         * will have commit_id = 0.
104
104
         */
105
105
        assert((not commit_id.empty()) && (commit_id != "0"));
106
 
 
107
106
        assert(segmented_sql.empty());
108
107
 
109
108
        if (not aggregate_sql.empty())
110
109
        {
111
 
          executeSQL(*(session.get()), aggregate_sql, commit_id);
 
110
          if (not executeSQL(aggregate_sql, commit_id))
 
111
          {
 
112
            /* TODO: Handle errors better. For now, just shutdown the slave. */
 
113
            return;
 
114
          }
112
115
        }
113
116
 
114
 
        deleteFromQueue(*(session.get()), trx_id);      
 
117
        deleteFromQueue(trx_id);      
115
118
      }
116
119
    }
117
120
    
128
131
}
129
132
 
130
133
 
131
 
bool QueueManager::createApplierSchemaAndTables(Session &session)
 
134
bool QueueManager::createApplierSchemaAndTables()
132
135
{
133
136
  vector<string> sql;
134
 
  
 
137
 
135
138
  sql.push_back("COMMIT");
136
139
  sql.push_back("CREATE SCHEMA IF NOT EXISTS replication");
137
140
  
138
 
  if (not executeSQL(session, sql))
 
141
  if (not executeSQL(sql))
139
142
    return false;
140
143
  
141
144
  sql.clear();
142
145
  sql.push_back("COMMIT");
143
146
  sql.push_back("CREATE TABLE IF NOT EXISTS replication.applier_state"
144
 
                " (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY)");
145
 
  
146
 
  /* TODO: code here
147
 
   SELECT COUNT(*) FROM replication.applier_state
148
 
   if (count == 0)
149
 
   INSERT INTO replication.applier_state VALUES (0)
150
 
   */
151
 
  
152
 
  if (not executeSQL(session, sql))
 
147
                " (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY,"
 
148
                "  status VARCHAR(20) NOT NULL,"
 
149
                "  error_msg VARCHAR(250))");
 
150
  
 
151
  if (not executeSQL(sql))
153
152
    return false;
 
153
 
 
154
  sql.clear();
 
155
  sql.push_back("SELECT COUNT(*) FROM replication.applier_state");
154
156
  
 
157
  sql::ResultSet result_set(1);
 
158
  Execute execute(*(_session.get()), true);
 
159
  execute.run(sql[0], result_set);
 
160
  result_set.next();
 
161
  string count= result_set.getString(0);
 
162
 
 
163
  if (count == "0")
 
164
  {
 
165
    sql.clear();
 
166
    sql.push_back("INSERT INTO replication.applier_state"
 
167
                  " (last_applied_commit_id, status)"
 
168
                  " VALUES (0, 'RUNNING')");
 
169
    if (not executeSQL(sql))
 
170
      return false;
 
171
  }
 
172
 
155
173
  return true;
156
174
}  
157
175
  
158
176
  
159
 
bool QueueManager::getMessage(Session &session,
160
 
                              message::Transaction &transaction,
 
177
bool QueueManager::getMessage(message::Transaction &transaction,
161
178
                              string &commit_id,
162
179
                              uint64_t trx_id,
163
180
                              uint32_t segment_id)
173
190
 
174
191
  //printf("%s\n", sql.c_str()); fflush(stdout);
175
192
  sql::ResultSet result_set(2);
176
 
  Execute execute(session, true);
 
193
  Execute execute(*(_session.get()), true);
177
194
  
178
 
  execute.run(sql, &result_set);
 
195
  execute.run(sql, result_set);
179
196
  
180
197
  assert(result_set.getMetaData().getColumnCount() == 2);
181
198
 
206
223
  return true;
207
224
}
208
225
 
209
 
bool QueueManager::getListOfCompletedTransactions(Session &session,
210
 
                                                  TrxIdList &list)
 
226
bool QueueManager::getListOfCompletedTransactions(TrxIdList &list)
211
227
{
212
 
  Execute execute(session, true);
 
228
  Execute execute(*(_session.get()), true);
213
229
  
214
230
  string sql("SELECT trx_id FROM ");
215
231
  sql.append(getSchema());
220
236
  /* ResultSet size must match column count */
221
237
  sql::ResultSet result_set(1);
222
238
 
223
 
  execute.run(sql, &result_set);
 
239
  execute.run(sql, result_set);
224
240
 
225
241
  assert(result_set.getMetaData().getColumnCount() == 1);
226
242
 
361
377
  return true;
362
378
}
363
379
 
364
 
bool QueueManager::executeSQL(Session &session,
365
 
                              vector<string> &sql,
 
380
 
 
381
void QueueManager::setApplierState(const string &err_msg, bool status)
 
382
{
 
383
  vector<string> statements;
 
384
  string sql;
 
385
  string msg(err_msg);
 
386
 
 
387
  if (not status)
 
388
  {
 
389
    sql= "UPDATE replication.applier_state SET status = 'STOPPED'";
 
390
  }
 
391
  else
 
392
  {
 
393
    sql= "UPDATE replication.applier_state SET status = 'RUNNING'";
 
394
  }
 
395
  
 
396
  sql.append(", error_msg = '");
 
397
 
 
398
  /* Escape embedded quotes and statement terminators */
 
399
  string::iterator it;
 
400
  for (it= msg.begin(); it != msg.end(); ++it)
 
401
  {
 
402
    if (*it == '\'')
 
403
    {
 
404
      it= msg.insert(it, '\'');
 
405
      ++it;  /* advance back to the quote */
 
406
    }
 
407
    else if (*it == ';')
 
408
    {
 
409
      it= msg.insert(it, '\\');
 
410
      ++it;  /* advance back to the semicolon */
 
411
    }
 
412
  }
 
413
  
 
414
  sql.append(msg);
 
415
  sql.append("'");
 
416
 
 
417
  statements.push_back(sql);
 
418
  executeSQL(statements);
 
419
}
 
420
 
 
421
 
 
422
bool QueueManager::executeSQL(vector<string> &sql,
366
423
                              const string &commit_id)
367
424
{
368
425
  string tmp("UPDATE replication.applier_state"
369
426
             " SET last_applied_commit_id = ");
370
427
  tmp.append(commit_id);
371
428
  sql.push_back(tmp);
372
 
 
373
 
  return executeSQL(session, sql);
 
429
  
 
430
  return executeSQL(sql);
374
431
}
375
432
 
376
433
 
377
 
bool QueueManager::executeSQL(Session &session,
378
 
                              vector<string> &sql)
 
434
bool QueueManager::executeSQL(vector<string> &sql)
379
435
{
380
436
  string combined_sql;
381
437
 
382
 
  Execute execute(session, true);
 
438
  Execute execute(*(_session.get()), true);
383
439
 
384
440
  vector<string>::iterator iter= sql.begin();
385
441
 
392
448
 
393
449
  printf("execute: %s\n", combined_sql.c_str()); fflush(stdout);
394
450
 
 
451
  sql::ResultSet result_set(1);
 
452
 
395
453
  /* Execute wraps the SQL to run within a transaction */
396
 
  execute.run(combined_sql);
 
454
  execute.run(combined_sql, result_set);
 
455
 
 
456
  sql::Exception exception= result_set.getException();
 
457
  
 
458
  drizzled::error_t err= exception.getErrorCode();
 
459
 
 
460
  if ((err != drizzled::EE_OK) && (err != drizzled::ER_EMPTY_QUERY))
 
461
  {
 
462
    /* avoid recursive errors from setApplierState() */
 
463
    if (_in_error_state)
 
464
      return true;
 
465
 
 
466
    _in_error_state= true;
 
467
    string err_msg("(SQLSTATE ");
 
468
    err_msg.append(exception.getSQLState());
 
469
    err_msg.append(") ");
 
470
    err_msg.append(exception.getErrorMessage());
 
471
 
 
472
    std::cerr << err_msg << std::endl;
 
473
    std::cerr << "Slave failed while executing:\n";
 
474
    for (size_t y= 0; y < sql.size(); y++)
 
475
      std::cerr << sql[y] << std::endl;
 
476
 
 
477
    setApplierState(err_msg, false);
 
478
    return false;
 
479
  }
397
480
 
398
481
  return true;
399
482
}
400
483
 
401
484
 
402
 
bool QueueManager::deleteFromQueue(Session &session, uint64_t trx_id)
 
485
bool QueueManager::deleteFromQueue(uint64_t trx_id)
403
486
{
404
487
  string sql("DELETE FROM ");
405
488
  sql.append(getSchema());
411
494
  vector<string> sql_vect;
412
495
  sql_vect.push_back(sql);
413
496
 
414
 
  return executeSQL(session, sql_vect);
 
497
  return executeSQL(sql_vect);
415
498
}
416
499
 
417
500
} /* namespace slave */