~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_reader.cc

merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include <drizzled/definitions.h>
26
 
#include <drizzled/gettext.h>
27
 
#include <drizzled/replication_services.h>
28
 
#include <drizzled/algorithm/crc32.h>
29
25
#include <sys/types.h>
30
26
#include <sys/stat.h>
31
27
#include <fcntl.h>
32
28
#include <limits.h>
33
 
#include <cstdio>
34
29
#include <cerrno>
35
30
#include <iostream>
36
31
#include <string>
37
32
#include <algorithm>
38
33
#include <vector>
39
34
#include <unistd.h>
40
 
#include <drizzled/message/transaction.pb.h>
41
 
#include <drizzled/message/statement_transform.h>
42
 
#include <drizzled/message/transaction_manager.h>
43
 
#include <drizzled/util/convert.h>
 
35
#include "drizzled/definitions.h"
 
36
#include "drizzled/gettext.h"
 
37
#include "drizzled/replication_services.h"
 
38
#include "drizzled/algorithm/crc32.h"
 
39
#include "drizzled/message/transaction.pb.h"
 
40
#include "drizzled/message/statement_transform.h"
 
41
#include "drizzled/message/transaction_manager.h"
 
42
#include "drizzled/util/convert.h"
44
43
 
45
44
#include <google/protobuf/io/coded_stream.h>
46
45
#include <google/protobuf/io/zero_copy_stream_impl.h>
70
69
  {
71
70
    string &sql= *sql_string_iter;
72
71
 
73
 
    /* 
74
 
     * Replace \n and \r with spaces so that SQL statements 
75
 
     * are always on a single line 
 
72
    /*
 
73
     * Replace \n and \r with spaces so that SQL statements
 
74
     * are always on a single line
76
75
     */
77
76
    {
78
77
      string::size_type found= sql.find_first_of(replace_with_spaces);
175
174
}
176
175
 
177
176
static void printTransaction(const message::Transaction &transaction,
178
 
                             bool ignore_events)
 
177
                             bool ignore_events,
 
178
                             bool print_as_raw)
179
179
{
180
180
  static uint64_t last_trx_id= 0;
181
181
  bool should_commit= true;
188
188
  {
189
189
    last_trx_id= trx.transaction_id();
190
190
    if (not ignore_events)
191
 
      printEvent(transaction.event());
 
191
    {
 
192
      if (print_as_raw)
 
193
        transaction.PrintDebugString();
 
194
      else
 
195
        printEvent(transaction.event());
 
196
    }
 
197
    return;
 
198
  }
 
199
 
 
200
  if (print_as_raw)
 
201
  {
 
202
    transaction.PrintDebugString();
192
203
    return;
193
204
  }
194
205
 
238
249
int main(int argc, char* argv[])
239
250
{
240
251
  GOOGLE_PROTOBUF_VERIFY_VERSION;
241
 
  int file;
 
252
  int opt_start_pos= 0;
 
253
  uint64_t opt_transaction_id= 0;
242
254
 
243
255
  /*
244
256
   * Setup program options
248
260
    ("help", N_("Display help and exit"))
249
261
    ("checksum", N_("Perform checksum"))
250
262
    ("ignore-events", N_("Ignore event messages"))
251
 
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"));
 
263
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"))
 
264
    ("raw", N_("Print raw Protobuf messages instead of SQL"))
 
265
    ("start-pos",
 
266
      po::value<int>(&opt_start_pos),
 
267
      N_("Start reading from the given file position"))
 
268
    ("transaction-id",
 
269
      po::value<uint64_t>(&opt_transaction_id),
 
270
      N_("Only output for the given transaction ID"));
252
271
 
253
272
  /*
254
273
   * We allow one positional argument that will be transaction file name
270
289
   */
271
290
  if (vm.count("help") || not vm.count("input-file"))
272
291
  {
273
 
    fprintf(stderr, _("Usage: %s [options] TRANSACTION_LOG \n"),
274
 
            argv[0]);
275
 
    fprintf(stderr, _("OPTIONS:\n"));
276
 
    fprintf(stderr, _("--help           :  Display help and exit\n"));
277
 
    fprintf(stderr, _("--checksum       :  Perform checksum\n"));
278
 
    fprintf(stderr, _("--ignore-events  :  Ignore event messages\n"));
 
292
    cerr << desc << endl;
 
293
    return -1;
 
294
  }
 
295
 
 
296
  /*
 
297
   * Specifying both a transaction ID and a start position
 
298
   * is not logical.
 
299
   */
 
300
  if (vm.count("start-pos") && vm.count("transaction-id"))
 
301
  {
 
302
    cerr << _("Cannot use --start-pos and --transaction-id together\n");
279
303
    return -1;
280
304
  }
281
305
 
282
306
  bool do_checksum= vm.count("checksum") ? true : false;
283
307
  bool ignore_events= vm.count("ignore-events") ? true : false;
 
308
  bool print_as_raw= vm.count("raw") ? true : false;
284
309
 
285
310
  string filename= vm["input-file"].as< vector<string> >()[0];
286
 
  file= open(filename.c_str(), O_RDONLY);
 
311
  int file= open(filename.c_str(), O_RDONLY);
287
312
  if (file == -1)
288
313
  {
289
 
    fprintf(stderr, _("Cannot open file: %s\n"), filename.c_str());
 
314
    cerr << _("Cannot open file: ") << filename << endl;
290
315
    return -1;
291
316
  }
292
317
 
294
319
  message::TransactionManager trx_mgr;
295
320
 
296
321
  protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
297
 
  protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
 
322
 
 
323
  /* Skip ahead to user supplied position */
 
324
  if (opt_start_pos)
 
325
  {
 
326
    if (not raw_input->Skip(opt_start_pos))
 
327
    {
 
328
      cerr << _("Could not skip to position ") << opt_start_pos
 
329
           << _(" in file ") << filename << endl;
 
330
      exit(-1);
 
331
    }
 
332
  }
298
333
 
299
334
  char *buffer= NULL;
300
335
  char *temp_buffer= NULL;
304
339
  bool result= true;
305
340
  uint32_t message_type= 0;
306
341
 
307
 
  /* Read in the length of the command */
308
 
  while (result == true && 
309
 
         coded_input->ReadLittleEndian32(&message_type) == true &&
310
 
         coded_input->ReadLittleEndian32(&length) == true)
 
342
  while (result == true)
311
343
  {
 
344
   /*
 
345
     * Odd thing to note about using CodedInputStream: This class wasn't
 
346
     * intended to read large amounts of GPB messages. It has an upper
 
347
     * limit on the number of bytes it will read (see Protobuf docs for
 
348
     * this class for more info). A warning will be produced as you
 
349
     * get close to this limit. Since this is a pretty lightweight class,
 
350
     * we should be able to simply create a new one for each message we
 
351
     * want to read.
 
352
     */
 
353
    protobuf::io::CodedInputStream coded_input(raw_input);
 
354
 
 
355
    /* Read in the type and length of the command */
 
356
    if (not coded_input.ReadLittleEndian32(&message_type) ||
 
357
        not coded_input.ReadLittleEndian32(&length))
 
358
    {
 
359
      break;  /* EOF */
 
360
    }
 
361
 
312
362
    if (message_type != ReplicationServices::TRANSACTION)
313
363
    {
314
 
      fprintf(stderr, _("Found a non-transaction message in log.  Currently, not supported.\n"));
315
 
      exit(1);
 
364
      cerr << _("Found a non-transaction message in log.  Currently, not supported.\n");
 
365
      exit(-1);
316
366
    }
317
367
 
318
368
    if (length > INT_MAX)
319
369
    {
320
 
      fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
321
 
      exit(1);
 
370
      cerr << _("Attempted to read record bigger than INT_MAX\n");
 
371
      exit(-1);
322
372
    }
323
373
 
324
374
    if (buffer == NULL)
325
375
    {
326
 
      /* 
 
376
      /*
327
377
       * First time around...just malloc the length.  This block gets rid
328
378
       * of a GCC warning about uninitialized temp_buffer.
329
379
       */
337
387
 
338
388
    if (temp_buffer == NULL)
339
389
    {
340
 
      fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
341
 
              static_cast<uint64_t>(length));
 
390
      cerr << _("Memory allocation failure trying to allocate ") << length << _(" bytes\n");
342
391
      break;
343
392
    }
344
393
    else
345
394
      buffer= temp_buffer;
346
395
 
347
396
    /* Read the Command */
348
 
    result= coded_input->ReadRaw(buffer, (int) length);
 
397
    result= coded_input.ReadRaw(buffer, (int) length);
349
398
    if (result == false)
350
399
    {
351
400
      char errmsg[STRERROR_MAX];
352
401
      strerror_r(errno, errmsg, sizeof(errmsg));
353
 
      fprintf(stderr, _("Could not read transaction message.\n"));
354
 
      fprintf(stderr, _("GPB ERROR: %s.\n"), errmsg);
 
402
      cerr << _("Could not read transaction message.\n");
 
403
      cerr << _("GPB ERROR: ") << errmsg << endl;;
355
404
      string hexdump;
356
405
      hexdump.reserve(length * 4);
357
406
      bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
358
 
      fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
407
      cerr << _("HEXDUMP:\n\n") << hexdump << endl;
359
408
      break;
360
409
    }
361
410
 
362
411
    result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
363
412
    if (result == false)
364
413
    {
365
 
      fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
 
414
      cerr << _("Unable to parse command. Got error: ")
 
415
           << transaction.InitializationErrorString() << endl;
366
416
      if (buffer != NULL)
367
417
      {
368
418
        string hexdump;
369
419
        hexdump.reserve(length * 4);
370
420
        bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
371
 
        fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
421
        cerr <<  _("HEXDUMP:\n\n") << hexdump << endl;
372
422
      }
373
423
      break;
374
424
    }
375
425
 
376
 
    if (not isEndTransaction(transaction))
 
426
    const message::TransactionContext trx= transaction.transaction_context();
 
427
    uint64_t transaction_id= trx.transaction_id();
 
428
 
 
429
    /*
 
430
     * If we are given a transaction ID, we only look for that one and
 
431
     * print it out.
 
432
     */
 
433
    if (vm.count("transaction-id"))
377
434
    {
378
 
      trx_mgr.store(transaction);
 
435
      if (opt_transaction_id == transaction_id)
 
436
      {
 
437
        printTransaction(transaction, ignore_events, print_as_raw);
 
438
      }
 
439
      else
 
440
      {
 
441
        /* Need to get the checksum bytes out of stream */
 
442
        coded_input.ReadLittleEndian32(&checksum);
 
443
        previous_length = length;
 
444
        continue;
 
445
      }
379
446
    }
 
447
 
 
448
    /*
 
449
     * No transaction ID given, so process all messages.
 
450
     */
380
451
    else
381
452
    {
382
 
      const message::TransactionContext trx= transaction.transaction_context();
383
 
      uint64_t transaction_id= trx.transaction_id();
384
 
 
385
 
      /*
386
 
       * If there are any previous Transaction messages for this transaction,
387
 
       * store this one, then output all of them together.
388
 
       */
389
 
      if (trx_mgr.contains(transaction_id))
 
453
      if (not isEndTransaction(transaction))
390
454
      {
391
455
        trx_mgr.store(transaction);
392
 
 
393
 
        uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
394
 
        uint32_t idx= 0;
395
 
 
396
 
        while (idx != size)
397
 
        {
398
 
          message::Transaction new_trx;
399
 
          trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
400
 
          printTransaction(new_trx, ignore_events);
401
 
          idx++;
402
 
        }
403
 
 
404
 
        /* No longer need this transaction */
405
 
        trx_mgr.remove(transaction_id);
406
456
      }
407
457
      else
408
458
      {
409
 
        printTransaction(transaction, ignore_events);
 
459
        /*
 
460
         * If there are any previous Transaction messages for this transaction,
 
461
         * store this one, then output all of them together.
 
462
         */
 
463
        if (trx_mgr.contains(transaction_id))
 
464
        {
 
465
          trx_mgr.store(transaction);
 
466
 
 
467
          uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
 
468
          uint32_t idx= 0;
 
469
 
 
470
          while (idx != size)
 
471
          {
 
472
            message::Transaction new_trx;
 
473
            trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
 
474
            printTransaction(new_trx, ignore_events, print_as_raw);
 
475
            idx++;
 
476
          }
 
477
 
 
478
          /* No longer need this transaction */
 
479
          trx_mgr.remove(transaction_id);
 
480
        }
 
481
        else
 
482
        {
 
483
          printTransaction(transaction, ignore_events, print_as_raw);
 
484
        }
410
485
      }
411
 
    }
 
486
    } /* end ! vm.count("transaction-id") */
412
487
 
413
488
    /* Skip 4 byte checksum */
414
 
    coded_input->ReadLittleEndian32(&checksum);
 
489
    coded_input.ReadLittleEndian32(&checksum);
415
490
 
416
491
    if (do_checksum)
417
492
    {
418
493
      if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
419
494
      {
420
 
        fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
 
495
        cerr << _("Checksum failed. Wanted ")
 
496
             << checksum
 
497
             << _(" got ")
 
498
             << drizzled::algorithm::crc32(buffer, static_cast<size_t>(length))
 
499
             << endl;
421
500
      }
422
501
    }
423
502
 
427
506
  if (buffer)
428
507
    free(buffer);
429
508
 
430
 
  delete coded_input;
431
509
  delete raw_input;
432
510
 
433
511
  return (result == true ? 0 : 1);