~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_reader.cc

  • Committer: lbieber
  • Date: 2010-09-27 18:21:19 UTC
  • mfrom: (1797.1.2 build)
  • Revision ID: lbieber@orisndriz08-20100927182119-mz1a2hoo2pszmnom
Merge Stewart - Allow access to internal InnoDB (HailDB) data dictionary tables, fix enum now that it is 4 bytes
Merge Shrews - fix bug 643935 - The transaction_reader program should handle mixed transaction messages

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
#include <unistd.h>
40
40
#include <drizzled/message/transaction.pb.h>
41
41
#include <drizzled/message/statement_transform.h>
 
42
#include <drizzled/message/transaction_manager.h>
42
43
#include <drizzled/util/convert.h>
43
44
 
44
45
#include <google/protobuf/io/coded_stream.h>
126
127
  return true;
127
128
}
128
129
 
 
130
static bool isEndTransaction(const message::Transaction &transaction)
 
131
{
 
132
  const message::TransactionContext trx= transaction.transaction_context();
 
133
 
 
134
  size_t num_statements= transaction.statement_size();
 
135
 
 
136
  /*
 
137
   * If any Statement is partial, then we can expect another Transaction
 
138
   * message.
 
139
   */
 
140
  for (size_t x= 0; x < num_statements; ++x)
 
141
  {
 
142
    const message::Statement &statement= transaction.statement(x);
 
143
 
 
144
    if (not isEndStatement(statement))
 
145
      return false;
 
146
  }
 
147
 
 
148
  return true;
 
149
}
 
150
 
129
151
static void printTransaction(const message::Transaction &transaction)
130
152
{
131
153
  static uint64_t last_trx_id= 0;
137
159
 
138
160
  /*
139
161
   * One way to determine when a new transaction begins is when the
140
 
   * transaction id changes. We check that here.
 
162
   * transaction id changes (if all transactions have their GPB messages
 
163
   * grouped together, which this program will). We check that here.
141
164
   */
142
165
  if (trx.transaction_id() != last_trx_id)
143
166
    cout << "START TRANSACTION;" << endl;
196
219
      do_checksum= true;
197
220
  }
198
221
 
 
222
  message::TransactionManager trx_mgr;
 
223
 
199
224
  protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
200
225
  protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
201
226
 
276
301
      break;
277
302
    }
278
303
 
279
 
    /* Print the transaction */
280
 
    printTransaction(transaction);
 
304
    if (not isEndTransaction(transaction))
 
305
    {
 
306
      trx_mgr.store(transaction);
 
307
    }
 
308
    else
 
309
    {
 
310
      const message::TransactionContext trx= transaction.transaction_context();
 
311
      uint64_t transaction_id= trx.transaction_id();
 
312
 
 
313
      /*
 
314
       * If there are any previous Transaction messages for this transaction,
 
315
       * store this one, then output all of them together.
 
316
       */
 
317
      if (trx_mgr.contains(transaction_id))
 
318
      {
 
319
        trx_mgr.store(transaction);
 
320
 
 
321
        uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
 
322
        uint32_t idx= 0;
 
323
 
 
324
        while (idx != size)
 
325
        {
 
326
          message::Transaction new_trx;
 
327
          trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
 
328
          printTransaction(new_trx);
 
329
          idx++;
 
330
        }
 
331
 
 
332
        /* No longer need this transaction */
 
333
        trx_mgr.remove(transaction_id);
 
334
      }
 
335
      else
 
336
      {
 
337
        printTransaction(transaction);
 
338
      }
 
339
    }
281
340
 
282
341
    /* Skip 4 byte checksum */
283
342
    coded_input->ReadLittleEndian32(&checksum);
291
350
    }
292
351
 
293
352
    previous_length= length;
294
 
  }
 
353
  } /* end while */
 
354
 
295
355
  if (buffer)
296
356
    free(buffer);
297
 
  
 
357
 
298
358
  delete coded_input;
299
359
  delete raw_input;
300
360