~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/utilities/transaction_reader.cc

  • Committer: Lee Bieber
  • Date: 2010-12-05 04:52:45 UTC
  • mfrom: (1974.1.2 build)
  • Revision ID: kalebral@gmail.com-20101205045245-ey8nbzofn98dr7gb
Merge Padraig - Small fix needed when building a plugin out-of-tree.
Merge Joe - More work on storing transaction log in innodb

Show diffs side-by-side

added added

removed removed

Lines of Context:
35
35
#include "drizzled/message/statement_transform.h"
36
36
#include "transaction_manager.h"
37
37
#include "transaction_file_reader.h"
 
38
#include "transaction_log_connection.h"
38
39
 
39
40
#include <google/protobuf/io/coded_stream.h>
40
41
#include <google/protobuf/io/zero_copy_stream_impl.h>
351
352
    cout << "COMMIT;" << endl;
352
353
}
353
354
 
 
355
static void processTransactionMessage(TransactionManager &trx_mgr, 
 
356
                                      const message::Transaction &transaction, 
 
357
                                      bool summarize,
 
358
                                      bool ignore_events,
 
359
                                      bool print_as_raw)
 
360
{
 
361
  if (not isEndTransaction(transaction))
 
362
  {
 
363
    trx_mgr.store(transaction);
 
364
  }
 
365
  else
 
366
  {
 
367
    const message::TransactionContext trx= transaction.transaction_context();
 
368
    uint64_t transaction_id= trx.transaction_id();
 
369
 
 
370
    /*
 
371
     * If there are any previous Transaction messages for this transaction,
 
372
     * store this one, then output all of them together.
 
373
     */
 
374
    if (trx_mgr.contains(transaction_id))
 
375
    {
 
376
      trx_mgr.store(transaction);
 
377
 
 
378
      uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
 
379
      uint32_t idx= 0;
 
380
 
 
381
      while (idx != size)
 
382
      {
 
383
        message::Transaction new_trx;
 
384
        trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
 
385
        if (summarize)
 
386
        {
 
387
          printTransactionSummary(new_trx, ignore_events);
 
388
        }
 
389
        else
 
390
        {
 
391
          printTransaction(new_trx, ignore_events, print_as_raw);
 
392
        }
 
393
        idx++;
 
394
      }
 
395
 
 
396
      /* No longer need this transaction */
 
397
      trx_mgr.remove(transaction_id);
 
398
    }
 
399
    else
 
400
    {
 
401
      if (summarize)
 
402
      {
 
403
        printTransactionSummary(transaction, ignore_events);
 
404
      }
 
405
      else
 
406
      {
 
407
        printTransaction(transaction, ignore_events, print_as_raw);
 
408
      }
 
409
    }
 
410
  }
 
411
}
 
412
 
354
413
int main(int argc, char* argv[])
355
414
{
356
415
  GOOGLE_PROTOBUF_VERIFY_VERSION;
357
416
  int opt_start_pos= 0;
358
417
  uint64_t opt_transaction_id= 0;
 
418
  uint64_t opt_start_transaction_id= 0;
 
419
  uint32_t opt_drizzle_port= 0; 
 
420
  string current_user, opt_password, opt_protocol, current_host;
 
421
  bool use_drizzle_protocol= false; 
359
422
 
360
423
  /*
361
424
   * Setup program options
363
426
  po::options_description desc("Program options");
364
427
  desc.add_options()
365
428
    ("help", N_("Display help and exit"))
 
429
    ("use-innodb-replication-log", N_("Read from the innodb transaction log"))
 
430
    ("user,u", po::value<string>(&current_user)->default_value(""), 
 
431
      N_("User for login if not current user."))
 
432
    ("port,p", po::value<uint32_t>(&opt_drizzle_port)->default_value(0), 
 
433
      N_("Port number to use for connection."))
 
434
    ("password,P", po::value<string>(&opt_password)->default_value(""), 
 
435
      N_("Password to use when connecting to server"))
 
436
    ("protocol",po::value<string>(&opt_protocol)->default_value("mysql"),
 
437
      N_("The protocol of connection (mysql or drizzle)."))
366
438
    ("checksum", N_("Perform checksum"))
367
439
    ("ignore-events", N_("Ignore event messages"))
368
440
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"))
370
442
    ("start-pos",
371
443
      po::value<int>(&opt_start_pos),
372
444
      N_("Start reading from the given file position"))
 
445
    ("start-transaction-id",
 
446
      po::value<uint64_t>(&opt_start_transaction_id),
 
447
      N_("Only output for the given transaction ID and later"))
373
448
    ("transaction-id",
374
449
      po::value<uint64_t>(&opt_transaction_id),
375
450
      N_("Only output for the given transaction ID"))
389
464
            options(desc).positional(pos).run(), vm);
390
465
  po::notify(vm);
391
466
 
392
 
  /*
393
 
   * If the help option was given, or not input file was supplied,
394
 
   * print out usage information.
395
 
   */
396
 
  if (vm.count("help") || not vm.count("input-file"))
 
467
  if (vm.count("help"))
 
468
  {
 
469
    cerr << desc << endl;
 
470
    return -1;
 
471
  }
 
472
 
 
473
  if (not vm.count("input-file") && not vm.count("use-innodb-replication-log"))
397
474
  {
398
475
    cerr << desc << endl;
399
476
    return -1;
416
493
  }
417
494
 
418
495
  bool do_checksum= vm.count("checksum") ? true : false;
 
496
  bool use_innodb_replication_log= vm.count("use-innodb-replication-log") ? true : false;
419
497
  bool ignore_events= vm.count("ignore-events") ? true : false;
420
498
  bool print_as_raw= vm.count("raw") ? true : false;
421
 
 
422
 
  string filename= vm["input-file"].as< vector<string> >()[0];
423
 
 
424
 
  TransactionFileReader fileReader;
425
 
 
426
 
  if (not fileReader.openFile(filename, opt_start_pos))
427
 
  {
428
 
    cerr << fileReader.getErrorString() << endl;
429
 
    return -1;
430
 
  }
431
 
 
432
 
  message::Transaction transaction;
433
 
  TransactionManager trx_mgr;
434
 
  uint32_t checksum= 0;
435
 
 
436
 
  while (fileReader.getNextTransaction(transaction, &checksum))
437
 
  {
438
 
    const message::TransactionContext trx= transaction.transaction_context();
439
 
    uint64_t transaction_id= trx.transaction_id();
440
 
 
441
 
    /*
442
 
     * If we are given a transaction ID, we only look for that one and
443
 
     * print it out.
444
 
     */
 
499
  bool summarize= vm.count("summarize") ? true : false;
 
500
 
 
501
  if (use_innodb_replication_log)
 
502
  {
 
503
    TransactionLogConnection *connection = new TransactionLogConnection(current_host, opt_drizzle_port,
 
504
      current_user, opt_password, use_drizzle_protocol);
 
505
 
 
506
    string query_string;
445
507
    if (vm.count("transaction-id"))
446
508
    {
447
 
      if (opt_transaction_id == transaction_id)
448
 
        printTransaction(transaction, ignore_events, print_as_raw);
449
 
      else
450
 
        continue;
451
 
    }
452
 
 
453
 
    /*
454
 
     * No transaction ID given, so process all messages.
455
 
     */
 
509
      query_string.append("SELECT transaction_message_binary, transaction_length FROM DATA_DICTIONARY.INNODB_REPLICATION_LOG WHERE transaction_id=");
 
510
      query_string.append(boost::lexical_cast<string>(opt_transaction_id));
 
511
    }
 
512
    else if (vm.count("start-transaction-id"))
 
513
    {
 
514
      query_string.append("SELECT transaction_message_binary, transaction_length FROM DATA_DICTIONARY.INNODB_REPLICATION_LOG WHERE transaction_id >=");
 
515
      query_string.append(boost::lexical_cast<string>(opt_start_transaction_id));
 
516
      query_string.append(" ORDER BY transaction_id ASC");
 
517
    }
456
518
    else
457
519
    {
458
 
      if (not isEndTransaction(transaction))
 
520
      query_string= "SELECT transaction_message_binary, transaction_length FROM DATA_DICTIONARY.INNODB_REPLICATION_LOG";
 
521
    }
 
522
 
 
523
    drizzle_result_st *result= connection->query(query_string);
 
524
 
 
525
    drizzle_row_t row;
 
526
    while ((row= drizzle_row_next(result)))
 
527
    {
 
528
      char* data= (char*)row[0];
 
529
      uint64_t length= (row[1]) ? boost::lexical_cast<uint64_t>(row[1]) : 0;
 
530
 
 
531
      message::Transaction transaction;
 
532
      TransactionManager trx_mgr;
 
533
 
 
534
      transaction.ParseFromArray(data, length);
 
535
 
 
536
      processTransactionMessage(trx_mgr, transaction, 
 
537
                                summarize, ignore_events, print_as_raw);
 
538
    }    
 
539
  }
 
540
  else // file based transaction log 
 
541
  {
 
542
    string filename= vm["input-file"].as< vector<string> >()[0];
 
543
 
 
544
    TransactionFileReader fileReader;
 
545
 
 
546
    if (not fileReader.openFile(filename, opt_start_pos))
 
547
    {
 
548
      cerr << fileReader.getErrorString() << endl;
 
549
      return -1;
 
550
    }
 
551
 
 
552
    message::Transaction transaction;
 
553
    TransactionManager trx_mgr;
 
554
    uint32_t checksum= 0;
 
555
 
 
556
    while (fileReader.getNextTransaction(transaction, &checksum))
 
557
    {
 
558
      const message::TransactionContext trx= transaction.transaction_context();
 
559
      uint64_t transaction_id= trx.transaction_id();
 
560
    
 
561
      /*
 
562
       * If we are given a transaction ID, we only look for that one and
 
563
       * print it out.
 
564
       */
 
565
      if (vm.count("transaction-id"))
459
566
      {
460
 
        trx_mgr.store(transaction);
 
567
        if (opt_transaction_id == transaction_id)
 
568
        {
 
569
          processTransactionMessage(trx_mgr, transaction, summarize, 
 
570
                                    ignore_events, print_as_raw);
 
571
        }
 
572
        else
 
573
        {
 
574
          continue;
 
575
        }
461
576
      }
462
 
      else
 
577
      else 
463
578
      {
464
579
        /*
465
 
         * If there are any previous Transaction messages for this transaction,
466
 
         * store this one, then output all of them together.
 
580
         * No transaction ID given, so process all messages.
467
581
         */
468
 
        if (trx_mgr.contains(transaction_id))
469
 
        {
470
 
          trx_mgr.store(transaction);
471
 
 
472
 
          uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
473
 
          uint32_t idx= 0;
474
 
 
475
 
          while (idx != size)
476
 
          {
477
 
            message::Transaction new_trx;
478
 
            trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
479
 
            if (vm.count("summarize"))
480
 
              printTransactionSummary(new_trx, ignore_events);
481
 
            else
482
 
              printTransaction(new_trx, ignore_events, print_as_raw);
483
 
            idx++;
484
 
          }
485
 
 
486
 
          /* No longer need this transaction */
487
 
          trx_mgr.remove(transaction_id);
488
 
        }
489
 
        else
490
 
        {
491
 
          if (vm.count("summarize"))
492
 
            printTransactionSummary(transaction, ignore_events);
493
 
          else
494
 
            printTransaction(transaction, ignore_events, print_as_raw);
 
582
        processTransactionMessage(trx_mgr, transaction, summarize,
 
583
                                  ignore_events, print_as_raw);
 
584
      }  
 
585
 
 
586
      if (do_checksum)
 
587
      {
 
588
        uint32_t calculated= fileReader.checksumLastReadTransaction();
 
589
        if (checksum != calculated)
 
590
        {
 
591
          cerr << _("Checksum failed. Wanted ")
 
592
               << checksum
 
593
               << _(" got ")
 
594
               << calculated
 
595
               << endl;
495
596
        }
496
597
      }
497
 
    } /* end ! vm.count("transaction-id") */
498
 
 
499
 
    if (do_checksum)
 
598
    } // end while
 
599
 
 
600
    string error= fileReader.getErrorString();
 
601
 
 
602
    if (error != "EOF")
500
603
    {
501
 
      uint32_t calculated= fileReader.checksumLastReadTransaction();
502
 
      if (checksum != calculated)
503
 
      {
504
 
        cerr << _("Checksum failed. Wanted ")
505
 
             << checksum
506
 
             << _(" got ")
507
 
             << calculated
508
 
             << endl;
509
 
      }
 
604
      cerr << error << endl;
 
605
      return 1;
510
606
    }
511
 
 
512
 
  } /* end while */
513
 
 
514
 
  string error= fileReader.getErrorString();
515
 
 
516
 
  if (error != "EOF")
517
 
  {
518
 
    cerr << error << endl;
519
 
    return 1;
520
607
  }
521
608
 
522
609
  return 0;