~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/slave/queue_manager.cc

  • Committer: David Shrewsbury
  • Date: 2011-02-09 01:11:11 UTC
  • mto: (2207.1.1 build) (2208.1.3 build)
  • mto: This revision was merged to the branch mainline in revision 2202.
  • Revision ID: shrewsbury.dave@gmail.com-20110209011111-md35xdxy23mhv678
Incremental check - doesn't work. Adds replication state tables.

Show diffs side-by-side

added added

removed removed

Lines of Context:
32
32
#include <vector>
33
33
#include <boost/thread.hpp>
34
34
#include <boost/lexical_cast.hpp>
 
35
#include <google/protobuf/text_format.h>
35
36
 
36
37
using namespace std;
37
38
using namespace drizzled;
65
66
  session->setUser(user);
66
67
  session->set_db(getSchema());
67
68
 
 
69
  createApplierSchemaAndTables(*(session.get()));
 
70
 
68
71
  uint64_t trx_id= 0;
69
72
 
70
73
  while (1)
79
82
 
80
83
      for (size_t x= 0; x < completedTransactionIds.size(); x++)
81
84
      {
 
85
        string commit_id;
82
86
        trx_id= completedTransactionIds[x];
83
87
 
84
88
        vector<string> aggregate_sql;  /* final SQL to execute */
87
91
        message::Transaction transaction;
88
92
        uint32_t segment_id= 1;
89
93
 
90
 
        while (getMessage(*(session.get()), transaction, trx_id, segment_id++))
 
94
        while (getMessage(*(session.get()), transaction, commit_id,
 
95
                          trx_id, segment_id++))
 
96
        {
91
97
          convertToSQL(transaction, aggregate_sql, segmented_sql);
 
98
        }
 
99
 
 
100
        /*
 
101
         * The last message in a transaction should always have a commit_id
 
102
         * value larger than 0, though other messages of the same transaction
 
103
         * will have commit_id = 0.
 
104
         */
 
105
        assert((not commit_id.empty()) && (commit_id != "0"));
92
106
 
93
107
        assert(segmented_sql.empty());
94
108
 
95
109
        if (not aggregate_sql.empty())
96
 
          executeSQL(*(session.get()), aggregate_sql);
 
110
        {
 
111
          executeSQL(*(session.get()), aggregate_sql, commit_id);
 
112
        }
97
113
 
98
114
        deleteFromQueue(*(session.get()), trx_id);      
99
115
      }
112
128
}
113
129
 
114
130
 
 
131
bool QueueManager::createApplierSchemaAndTables(Session &session)
 
132
{
 
133
  vector<string> sql;
 
134
  
 
135
  sql.push_back("COMMIT");
 
136
  sql.push_back("CREATE SCHEMA IF NOT EXISTS replication");
 
137
  
 
138
  if (not executeSQL(session, sql))
 
139
    return false;
 
140
  
 
141
  sql.clear();
 
142
  sql.push_back("COMMIT");
 
143
  sql.push_back("CREATE TABLE IF NOT EXISTS replication.applier_state"
 
144
                " (last_applied_commit_id BIGINT NOT NULL PRIMARY KEY)");
 
145
  
 
146
  /* TODO: code here
 
147
   SELECT COUNT(*) FROM replication.applier_state
 
148
   if (count == 0)
 
149
   INSERT INTO replication.applier_state VALUES (0)
 
150
   */
 
151
  
 
152
  if (not executeSQL(session, sql))
 
153
    return false;
 
154
  
 
155
  return true;
 
156
}  
 
157
  
 
158
  
115
159
bool QueueManager::getMessage(Session &session,
116
160
                              message::Transaction &transaction,
 
161
                              string &commit_id,
117
162
                              uint64_t trx_id,
118
163
                              uint32_t segment_id)
119
164
{
120
 
  string sql("SELECT msg FROM ");
 
165
  string sql("SELECT msg, commit_order FROM ");
121
166
  sql.append(getSchema());
122
167
  sql.append(".");
123
168
  sql.append(getTable());
126
171
  sql.append(" AND seg_id = ");
127
172
  sql.append(boost::lexical_cast<std::string>(segment_id));
128
173
 
129
 
  printf("%s\n", sql.c_str()); fflush(stdout);
130
 
  sql::ResultSet result_set(1);
 
174
  //printf("%s\n", sql.c_str()); fflush(stdout);
 
175
  sql::ResultSet result_set(2);
131
176
  Execute execute(session, true);
132
177
  
133
178
  execute.run(sql, &result_set);
134
179
  
135
 
  assert(result_set.getMetaData().getColumnCount() == 1);
 
180
  assert(result_set.getMetaData().getColumnCount() == 2);
136
181
 
137
182
  /* Really should only be 1 returned row */
138
 
  
139
183
  uint32_t found_rows= 0;
140
 
 
141
184
  while (result_set.next())
142
185
  {
143
 
    string value= result_set.getString(0);
144
 
    
145
 
    if ((value == "") || (found_rows == 1))
 
186
    string msg= result_set.getString(0);
 
187
    string com_id= result_set.getString(1);
 
188
 
 
189
    if ((msg == "") || (found_rows == 1))
146
190
      break;
147
191
 
 
192
    /* Neither column should be NULL */
148
193
    assert(result_set.isNull(0) == false);
149
 
    transaction.ParseFromArray(value.c_str(), value.length());
 
194
    assert(result_set.isNull(1) == false);
 
195
 
 
196
    //transaction.ParseFromString(value);
 
197
    google::protobuf::TextFormat::ParseFromString(msg, &transaction);
 
198
    commit_id= com_id;
150
199
 
151
200
    found_rows++;
152
201
  }
220
269
      break;
221
270
    }
222
271
 
223
 
    if (statement.type() == message::Statement::ROLLBACK_STATEMENT)
 
272
    switch (statement.type())
224
273
    {
225
 
      segmented_sql.clear();
226
 
      continue;
 
274
      /* DDL cannot be in a transaction, so precede with a COMMIT */
 
275
      case message::Statement::TRUNCATE_TABLE:
 
276
      case message::Statement::CREATE_SCHEMA:
 
277
      case message::Statement::ALTER_SCHEMA:
 
278
      case message::Statement::DROP_SCHEMA:
 
279
      case message::Statement::CREATE_TABLE:
 
280
      case message::Statement::ALTER_TABLE:
 
281
      case message::Statement::DROP_TABLE:
 
282
      {
 
283
        segmented_sql.push_back("COMMIT");
 
284
        break;
 
285
      }
 
286
 
 
287
      /* Cancel any ongoing statement */
 
288
      case message::Statement::ROLLBACK_STATEMENT:
 
289
      {
 
290
        segmented_sql.clear();
 
291
        continue;
 
292
      }
 
293
      
 
294
      default:
 
295
      {
 
296
        break;
 
297
      }
227
298
    }
228
299
 
229
300
    if (message::transformStatementToSql(statement, segmented_sql,
290
361
  return true;
291
362
}
292
363
 
293
 
 
294
 
bool QueueManager::executeSQL(Session &session, vector<string> &sql)
 
364
bool QueueManager::executeSQL(Session &session,
 
365
                              vector<string> &sql,
 
366
                              const string &commit_id)
 
367
{
 
368
  string tmp("UPDATE replication.applier_state"
 
369
             " SET last_applied_commit_id = ");
 
370
  tmp.append(commit_id);
 
371
  sql.push_back(tmp);
 
372
 
 
373
  return executeSQL(session, sql);
 
374
}
 
375
 
 
376
 
 
377
bool QueueManager::executeSQL(Session &session,
 
378
                              vector<string> &sql)
295
379
{
296
380
  string combined_sql;
297
381