~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-12-19 06:20:54 UTC
  • mfrom: (2005.1.1 bug673105)
  • Revision ID: brian@tangent.org-20101219062054-1kt0l3dxs4z2z8md
Merge Dave.

Show diffs side-by-side

added added

removed removed

Lines of Context:
64
64
#include "drizzled/lock.h"
65
65
#include "drizzled/item/int.h"
66
66
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/epoch.h"
 
67
#include "drizzled/field/timestamp.h"
68
68
#include "drizzled/plugin/client.h"
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
316
void TransactionServices::registerResourceForStatement(Session *session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session.transaction.stmt;
333
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
348
void TransactionServices::registerResourceForStatement(Session *session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session.transaction.stmt;
366
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session.transaction.all;
387
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session.transaction.xid_state.xid.is_null())
404
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session.transaction.all;
417
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session.transaction.xid_state.xid.is_null())
434
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
435
435
 
436
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
457
{
458
 
  if (session.getXaId() == 0)
 
458
  if (session->getXaId() == 0)
459
459
  {
460
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
461
  }
462
462
 
463
 
  return session.getXaId();
 
463
  return session->getXaId();
464
464
}
465
465
 
466
 
int TransactionServices::commitTransaction(Session::reference session,
467
 
                                           bool normal_transaction)
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
468
481
{
469
482
  int error= 0, cookie= 0;
470
483
  /*
471
484
    'all' means that this is either an explicit commit issued by
472
485
    user, or an implicit commit issued by a DDL.
473
486
  */
474
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
475
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
476
489
 
477
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
478
491
 
479
492
  /*
480
493
    We must not commit the normal transaction if a statement
482
495
    flags will not get propagated to its normal transaction's
483
496
    counterpart.
484
497
  */
485
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
486
 
              trans == &session.transaction.stmt);
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
487
500
 
488
501
  if (resource_contexts.empty() == false)
489
502
  {
490
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
491
504
    {
492
505
      rollbackTransaction(session, normal_transaction);
493
506
      return 1;
518
531
 
519
532
        if (resource->participatesInXaTransaction())
520
533
        {
521
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
522
535
          {
523
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
524
537
            error= 1;
525
538
          }
526
539
          else
527
540
          {
528
 
            session.status_var.ha_prepare_count++;
 
541
            session->status_var.ha_prepare_count++;
529
542
          }
530
543
        }
531
544
      }
547
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
548
561
end:
549
562
    if (is_real_trans)
550
 
      session.startWaitingGlobalReadLock();
 
563
      session->startWaitingGlobalReadLock();
551
564
  }
552
565
  return error;
553
566
}
556
569
  @note
557
570
  This function does not care about global read lock. A caller should.
558
571
*/
559
 
int TransactionServices::commitPhaseOne(Session::reference session,
560
 
                                        bool normal_transaction)
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
561
573
{
562
574
  int error=0;
563
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
564
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
565
577
 
566
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
567
 
  bool all= normal_transaction;
568
 
 
569
 
  /* If we're in autocommit then we have a real transaction to commit
570
 
     (except if it's BEGIN)
571
 
  */
572
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
573
 
    all= true;
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
574
579
 
575
580
  if (resource_contexts.empty() == false)
576
581
  {
585
590
 
586
591
      if (resource->participatesInXaTransaction())
587
592
      {
588
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
589
594
        {
590
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591
596
          error= 1;
592
597
        }
593
598
        else if (normal_transaction)
594
599
        {
595
 
          session.status_var.ha_commit_count++;
 
600
          session->status_var.ha_commit_count++;
596
601
        }
597
602
      }
598
603
      else if (resource->participatesInSqlTransaction())
599
604
      {
600
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
601
606
        {
602
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
608
          error= 1;
604
609
        }
605
610
        else if (normal_transaction)
606
611
        {
607
 
          session.status_var.ha_commit_count++;
 
612
          session->status_var.ha_commit_count++;
608
613
        }
609
614
      }
610
615
      resource_context->reset(); /* keep it conveniently zero-filled */
611
616
    }
612
617
 
613
618
    if (is_real_trans)
614
 
      session.transaction.xid_state.xid.null();
 
619
      session->transaction.xid_state.xid.null();
615
620
 
616
621
    if (normal_transaction)
617
622
    {
618
 
      session.variables.tx_isolation= session.session_tx_isolation;
619
 
      session.transaction.cleanup();
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
620
625
    }
621
626
  }
622
627
  trans->reset();
623
628
  return error;
624
629
}
625
630
 
626
 
int TransactionServices::rollbackTransaction(Session::reference session,
627
 
                                             bool normal_transaction)
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
628
632
{
629
633
  int error= 0;
630
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
632
636
 
633
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
635
638
 
636
639
  /*
637
640
    We must not rollback the normal transaction if a statement
638
641
    transaction is pending.
639
642
  */
640
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
641
 
              trans == &session.transaction.stmt);
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
642
645
 
643
646
  if (resource_contexts.empty() == false)
644
647
  {
653
656
 
654
657
      if (resource->participatesInXaTransaction())
655
658
      {
656
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
657
660
        {
658
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
659
662
          error= 1;
660
663
        }
661
664
        else if (normal_transaction)
662
665
        {
663
 
          session.status_var.ha_rollback_count++;
 
666
          session->status_var.ha_rollback_count++;
664
667
        }
665
668
      }
666
669
      else if (resource->participatesInSqlTransaction())
667
670
      {
668
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
669
672
        {
670
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
671
674
          error= 1;
672
675
        }
673
676
        else if (normal_transaction)
674
677
        {
675
 
          session.status_var.ha_rollback_count++;
 
678
          session->status_var.ha_rollback_count++;
676
679
        }
677
680
      }
678
681
      resource_context->reset(); /* keep it conveniently zero-filled */
685
688
     * a rollback statement with the corresponding transaction ID
686
689
     * to rollback.
687
690
     */
688
 
    if (all)
 
691
    if (normal_transaction)
689
692
      rollbackTransactionMessage(session);
690
693
    else
691
694
      rollbackStatementMessage(session);
692
695
 
693
696
    if (is_real_trans)
694
 
      session.transaction.xid_state.xid.null();
 
697
      session->transaction.xid_state.xid.null();
695
698
    if (normal_transaction)
696
699
    {
697
 
      session.variables.tx_isolation=session.session_tx_isolation;
698
 
      session.transaction.cleanup();
 
700
      session->variables.tx_isolation=session->session_tx_isolation;
 
701
      session->transaction.cleanup();
699
702
    }
700
703
  }
701
704
  if (normal_transaction)
702
 
    session.transaction_rollback_request= false;
 
705
    session->transaction_rollback_request= false;
703
706
 
704
707
  /*
705
708
   * If a non-transactional table was updated, warn the user
706
709
   */
707
710
  if (is_real_trans &&
708
 
      session.transaction.all.hasModifiedNonTransData() &&
709
 
      session.getKilled() != Session::KILL_CONNECTION)
 
711
      session->transaction.all.hasModifiedNonTransData() &&
 
712
      session->getKilled() != Session::KILL_CONNECTION)
710
713
  {
711
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
714
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
712
715
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
713
716
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
714
717
  }
716
719
  return error;
717
720
}
718
721
 
719
 
int TransactionServices::autocommitOrRollback(Session::reference session,
720
 
                                              int error)
 
722
/**
 
723
  This is used to commit or rollback a single statement depending on
 
724
  the value of error.
 
725
 
 
726
  @note
 
727
    Note that if the autocommit is on, then the following call inside
 
728
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
729
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
730
    the user has used LOCK TABLES then that mechanism does not know to do the
 
731
    commit.
 
732
*/
 
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
721
734
{
722
735
  /* One GPB Statement message per SQL statement */
723
 
  message::Statement *statement= session.getStatementMessage();
 
736
  message::Statement *statement= session->getStatementMessage();
724
737
  if ((statement != NULL) && (! error))
725
738
    finalizeStatementMessage(*statement, session);
726
739
 
727
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
740
  if (session->transaction.stmt.getResourceContexts().empty() == false)
728
741
  {
729
 
    TransactionContext *trans = &session.transaction.stmt;
 
742
    TransactionContext *trans = &session->transaction.stmt;
730
743
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
744
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
745
         it != resource_contexts.end();
734
747
    {
735
748
      ResourceContext *resource_context= *it;
736
749
 
737
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
 
750
      resource_context->getTransactionalStorageEngine()->endStatement(session);
738
751
    }
739
752
 
740
753
    if (! error)
745
758
    else
746
759
    {
747
760
      (void) rollbackTransaction(session, false);
748
 
      if (session.transaction_rollback_request)
749
 
      {
 
761
      if (session->transaction_rollback_request)
750
762
        (void) rollbackTransaction(session, true);
751
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
752
 
      }
753
763
    }
754
764
 
755
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
765
    session->variables.tx_isolation= session->session_tx_isolation;
756
766
  }
757
 
 
758
767
  return error;
759
768
}
760
769
 
768
777
  }
769
778
};
770
779
 
771
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
772
 
                                             NamedSavepoint &sv)
 
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
773
781
{
774
782
  int error= 0;
775
 
  TransactionContext *trans= &session.transaction.all;
 
783
  TransactionContext *trans= &session->transaction.all;
776
784
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
777
785
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
778
786
 
792
800
 
793
801
    if (resource->participatesInSqlTransaction())
794
802
    {
795
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
803
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
796
804
      {
797
805
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
798
806
        error= 1;
799
807
      }
800
808
      else
801
809
      {
802
 
        session.status_var.ha_savepoint_rollback_count++;
 
810
        session->status_var.ha_savepoint_rollback_count++;
803
811
      }
804
812
    }
805
813
    trans->no_2pc|= not resource->participatesInXaTransaction();
849
857
 
850
858
      if (resource->participatesInSqlTransaction())
851
859
      {
852
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
860
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
853
861
        {
854
862
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
855
863
          error= 1;
856
864
        }
857
865
        else
858
866
        {
859
 
          session.status_var.ha_rollback_count++;
 
867
          session->status_var.ha_rollback_count++;
860
868
        }
861
869
      }
862
870
      resource_context->reset(); /* keep it conveniently zero-filled */
879
887
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
880
888
      if (num_statements == 0)
881
889
      {    
882
 
        session.setStatementMessage(NULL);
 
890
        session->setStatementMessage(NULL);
883
891
      }    
884
892
      else 
885
893
      {
886
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
894
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
887
895
      }    
888
 
      session.setTransactionMessage(savepoint_transaction_copy);
 
896
      session->setTransactionMessage(savepoint_transaction_copy);
889
897
    }
890
898
  }
891
899
 
898
906
  section "4.33.4 SQL-statements and transaction states",
899
907
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
908
*/
901
 
int TransactionServices::setSavepoint(Session::reference session,
902
 
                                      NamedSavepoint &sv)
 
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
903
910
{
904
911
  int error= 0;
905
 
  TransactionContext *trans= &session.transaction.all;
 
912
  TransactionContext *trans= &session->transaction.all;
906
913
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
907
914
 
908
915
  if (resource_contexts.empty() == false)
918
925
 
919
926
      if (resource->participatesInSqlTransaction())
920
927
      {
921
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
928
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
922
929
        {
923
930
          my_error(ER_GET_ERRNO, MYF(0), err);
924
931
          error= 1;
925
932
        }
926
933
        else
927
934
        {
928
 
          session.status_var.ha_savepoint_count++;
 
935
          session->status_var.ha_savepoint_count++;
929
936
        }
930
937
      }
931
938
    }
937
944
 
938
945
  if (shouldConstructMessages())
939
946
  {
940
 
    message::Transaction *transaction= session.getTransactionMessage();
 
947
    message::Transaction *transaction= session->getTransactionMessage();
941
948
                  
942
949
    if (transaction != NULL)
943
950
    {
950
957
  return error;
951
958
}
952
959
 
953
 
int TransactionServices::releaseSavepoint(Session::reference session,
954
 
                                          NamedSavepoint &sv)
 
960
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
955
961
{
956
962
  int error= 0;
957
963
 
968
974
 
969
975
    if (resource->participatesInSqlTransaction())
970
976
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
977
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
972
978
      {
973
979
        my_error(ER_GET_ERRNO, MYF(0), err);
974
980
        error= 1;
985
991
  return replication_services.isActive();
986
992
}
987
993
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
989
 
                                                                       bool should_inc_trx_id)
 
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
990
995
{
991
 
  message::Transaction *transaction= session.getTransactionMessage();
 
996
  message::Transaction *transaction= in_session->getTransactionMessage();
992
997
 
993
998
  if (unlikely(transaction == NULL))
994
999
  {
998
1003
     * deleting transaction message when done with it.
999
1004
     */
1000
1005
    transaction= new (nothrow) message::Transaction();
1001
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
 
    session.setTransactionMessage(transaction);
 
1006
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1007
    in_session->setTransactionMessage(transaction);
1003
1008
    return transaction;
1004
1009
  }
1005
1010
  else
1006
1011
    return transaction;
1007
1012
}
1008
1013
 
1009
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
 
                                                 Session::reference session,
 
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1015
                                                 Session *in_session,
1011
1016
                                                 bool should_inc_trx_id)
1012
1017
{
1013
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
 
  trx->set_server_id(session.getServerId());
 
1018
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1019
  trx->set_server_id(in_session->getServerId());
1015
1020
 
1016
1021
  if (should_inc_trx_id)
1017
1022
  {
1018
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1019
 
    session.setXaId(0);
1020
 
  }
 
1023
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1024
    in_session->setXaId(0);
 
1025
  }  
1021
1026
  else
1022
 
  {
1023
 
    /* trx and seg id will get set properly elsewhere */
 
1027
  { 
1024
1028
    trx->set_transaction_id(0);
1025
1029
  }
1026
1030
 
1027
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1028
 
  
1029
 
  /* segment info may get set elsewhere as needed */
1030
 
  transaction.set_segment_id(1);
1031
 
  transaction.set_end_segment(true);
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
 
                                                     Session::const_reference session)
1036
 
{
1037
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
 
                                                    Session::reference session)
1043
 
{
1044
 
  delete transaction;
1045
 
  session.setStatementMessage(NULL);
1046
 
  session.setTransactionMessage(NULL);
1047
 
  session.setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1031
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1032
}
 
1033
 
 
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1035
                                              Session *in_session)
 
1036
{
 
1037
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1038
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1039
}
 
1040
 
 
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1042
                                             Session *in_session)
 
1043
{
 
1044
  delete in_transaction;
 
1045
  in_session->setStatementMessage(NULL);
 
1046
  in_session->setTransactionMessage(NULL);
 
1047
  in_session->setXaId(0);
 
1048
}
 
1049
 
 
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1051
1051
{
1052
1052
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1053
  if (! replication_services.isActive())
1057
1057
   * If no Transaction message was ever created, then no data modification
1058
1058
   * occurred inside the transaction, so nothing to do.
1059
1059
   */
1060
 
  if (session.getTransactionMessage() == NULL)
 
1060
  if (in_session->getTransactionMessage() == NULL)
1061
1061
    return 0;
1062
1062
  
1063
1063
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= session.getStatementMessage();
 
1064
  message::Statement *statement= in_session->getStatementMessage();
1065
1065
 
1066
1066
  if (statement != NULL)
1067
1067
  {
1068
 
    finalizeStatementMessage(*statement, session);
 
1068
    finalizeStatementMessage(*statement, in_session);
1069
1069
  }
1070
1070
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
 
1071
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1072
1072
 
1073
1073
  /*
1074
1074
   * It is possible that we could have a Transaction without any Statements
1078
1078
   */
1079
1079
  if (transaction->statement_size() == 0)
1080
1080
  {
1081
 
    cleanupTransactionMessage(transaction, session);
 
1081
    cleanupTransactionMessage(transaction, in_session);
1082
1082
    return 0;
1083
1083
  }
1084
1084
  
1085
 
  finalizeTransactionMessage(*transaction, session);
 
1085
  finalizeTransactionMessage(*transaction, in_session);
1086
1086
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1088
1088
 
1089
 
  cleanupTransactionMessage(transaction, session);
 
1089
  cleanupTransactionMessage(transaction, in_session);
1090
1090
 
1091
1091
  return static_cast<int>(result);
1092
1092
}
1093
1093
 
1094
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                               message::Statement::Type type,
1096
 
                                               Session::const_reference session)
 
1095
                                        message::Statement::Type in_type,
 
1096
                                        Session *in_session)
1097
1097
{
1098
 
  statement.set_type(type);
1099
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
 
1098
  statement.set_type(in_type);
 
1099
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1100
1100
 
1101
 
  if (session.variables.replicate_query)
1102
 
    statement.set_sql(session.getQueryString()->c_str());
 
1101
  if (in_session->variables.replicate_query)
 
1102
    statement.set_sql(in_session->getQueryString()->c_str());
1103
1103
}
1104
1104
 
1105
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                                   Session::reference session)
 
1106
                                            Session *in_session)
1107
1107
{
1108
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1109
 
  session.setStatementMessage(NULL);
 
1108
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1109
  in_session->setStatementMessage(NULL);
1110
1110
}
1111
1111
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1113
1113
{
1114
1114
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1115
  if (! replication_services.isActive())
1116
1116
    return;
1117
1117
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1118
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1119
1119
 
1120
1120
  /*
1121
1121
   * OK, so there are two situations that we need to deal with here:
1138
1138
  {
1139
1139
    /* Remember the transaction ID so we can re-use it */
1140
1140
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
 
    uint32_t seg_id= transaction->segment_id();
1142
1141
 
1143
1142
    /*
1144
1143
     * Clear the transaction, create a Rollback statement message, 
1145
1144
     * attach it to the transaction, and push it to replicators.
1146
1145
     */
1147
1146
    transaction->Clear();
1148
 
    initTransactionMessage(*transaction, session, false);
 
1147
    initTransactionMessage(*transaction, in_session, false);
1149
1148
 
1150
1149
    /* Set the transaction ID to match the previous messages */
1151
1150
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
 
    transaction->set_segment_id(seg_id);
1153
 
    transaction->set_end_segment(true);
1154
1151
 
1155
1152
    message::Statement *statement= transaction->add_statement();
1156
1153
 
1157
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
 
    finalizeStatementMessage(*statement, session);
 
1154
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1155
    finalizeStatementMessage(*statement, in_session);
1159
1156
 
1160
 
    finalizeTransactionMessage(*transaction, session);
 
1157
    finalizeTransactionMessage(*transaction, in_session);
1161
1158
    
1162
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
 
1159
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1163
1160
  }
1164
 
 
1165
 
  cleanupTransactionMessage(transaction, session);
 
1161
  cleanupTransactionMessage(transaction, in_session);
1166
1162
}
1167
1163
 
1168
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
 
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1169
1165
{
1170
1166
  ReplicationServices &replication_services= ReplicationServices::singleton();
1171
1167
  if (! replication_services.isActive())
1172
1168
    return;
1173
1169
 
1174
 
  message::Statement *current_statement= session.getStatementMessage();
 
1170
  message::Statement *current_statement= in_session->getStatementMessage();
1175
1171
 
1176
1172
  /* If we never added a Statement message, nothing to undo. */
1177
1173
  if (current_statement == NULL)
1210
1206
   * Remove the Statement message we've been working with (same as
1211
1207
   * current_statement).
1212
1208
   */
1213
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1209
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1214
1210
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
1211
  statements_in_txn= transaction->mutable_statement();
1216
1212
  statements_in_txn->RemoveLast();
1217
 
  session.setStatementMessage(NULL);
 
1213
  in_session->setStatementMessage(NULL);
1218
1214
  
1219
1215
  /*
1220
1216
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1226
1222
    current_statement= transaction->add_statement();
1227
1223
    initStatementMessage(*current_statement,
1228
1224
                         message::Statement::ROLLBACK_STATEMENT,
1229
 
                         session);
1230
 
    finalizeStatementMessage(*current_statement, session);
 
1225
                         in_session);
 
1226
    finalizeStatementMessage(*current_statement, in_session);
1231
1227
  }
1232
1228
}
1233
 
 
1234
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
 
                                                                     message::Transaction *transaction)
1236
 
{
1237
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
 
  uint32_t seg_id= transaction->segment_id();
1239
 
  
1240
 
  transaction->set_end_segment(false);
1241
 
  commitTransactionMessage(session);
1242
 
  transaction= getActiveTransactionMessage(session, false);
1243
 
  
1244
 
  /* Set the transaction ID to match the previous messages */
1245
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
 
  transaction->set_segment_id(seg_id + 1);
1247
 
  transaction->set_end_segment(true);
1248
 
 
1249
 
  return transaction;
1250
 
}
1251
 
 
1252
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1253
 
                                                            Table &table,
 
1229
  
 
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1231
                                                            Table *in_table,
1254
1232
                                                            uint32_t *next_segment_id)
1255
1233
{
1256
 
  message::Statement *statement= session.getStatementMessage();
 
1234
  message::Statement *statement= in_session->getStatementMessage();
1257
1235
  message::Transaction *transaction= NULL;
1258
 
  
1259
 
  /*
1260
 
   * If statement is NULL, this is a new statement.
1261
 
   * If statement is NOT NULL, this a continuation of the same statement.
1262
 
   * This is because autocommitOrRollback() finalizes the statement so that
1263
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1264
 
   * share a single GPB message for multiple statements).
 
1236
 
 
1237
  /* 
 
1238
   * Check the type for the current Statement message, if it is anything
 
1239
   * other then INSERT we need to call finalize, this will ensure a 
 
1240
   * new InsertStatement is created. If it is of type INSERT check
 
1241
   * what table the INSERT belongs to, if it is a different table
 
1242
   * call finalize, so a new InsertStatement can be created. 
1265
1243
   */
1266
 
  if (statement == NULL)
1267
 
  {
1268
 
    transaction= getActiveTransactionMessage(session);
1269
 
 
1270
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1271
 
        transaction_message_threshold)
1272
 
    {
1273
 
      transaction= segmentTransactionMessage(session, transaction);
1274
 
    }
1275
 
 
1276
 
    statement= transaction->add_statement();
1277
 
    setInsertHeader(*statement, session, table);
1278
 
    session.setStatementMessage(statement);
1279
 
  }
1280
 
  else
1281
 
  {
1282
 
    transaction= getActiveTransactionMessage(session);
1283
 
    
 
1244
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1245
  {
 
1246
    finalizeStatementMessage(*statement, in_session);
 
1247
    statement= in_session->getStatementMessage();
 
1248
  } 
 
1249
  else if (statement != NULL)
 
1250
  {
 
1251
    transaction= getActiveTransactionMessage(in_session);
 
1252
 
1284
1253
    /*
1285
1254
     * If we've passed our threshold for the statement size (possible for
1286
1255
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1256
     * the Transaction will keep it from getting huge).
1288
1257
     */
1289
1258
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1290
 
        transaction_message_threshold)
 
1259
      in_session->variables.transaction_message_threshold)
1291
1260
    {
1292
1261
      /* Remember the transaction ID so we can re-use it */
1293
1262
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
 
      uint32_t seg_id= transaction->segment_id();
1295
 
      
 
1263
 
1296
1264
      message::InsertData *current_data= statement->mutable_insert_data();
1297
 
      
 
1265
 
1298
1266
      /* Caller should use this value when adding a new record */
1299
1267
      *next_segment_id= current_data->segment_id() + 1;
1300
 
      
 
1268
 
1301
1269
      current_data->set_end_segment(false);
1302
 
      transaction->set_end_segment(false);
1303
 
      
 
1270
 
1304
1271
      /* 
1305
1272
       * Send the trx message to replicators after finalizing the 
1306
1273
       * statement and transaction. This will also set the Transaction
1307
1274
       * and Statement objects in Session to NULL.
1308
1275
       */
1309
 
      commitTransactionMessage(session);
1310
 
      
 
1276
      commitTransactionMessage(in_session);
 
1277
 
1311
1278
      /*
1312
1279
       * Statement and Transaction should now be NULL, so new ones will get
1313
1280
       * created. We reuse the transaction id since we are segmenting
1314
1281
       * one transaction.
1315
1282
       */
1316
 
      transaction= getActiveTransactionMessage(session, false);
 
1283
      statement= in_session->getStatementMessage();
 
1284
      transaction= getActiveTransactionMessage(in_session, false);
1317
1285
      assert(transaction != NULL);
1318
1286
 
1319
 
      statement= transaction->add_statement();
1320
 
      setInsertHeader(*statement, session, table);
1321
 
      session.setStatementMessage(statement);
1322
 
            
1323
1287
      /* Set the transaction ID to match the previous messages */
1324
1288
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
 
      transaction->set_segment_id(seg_id + 1);
1326
 
      transaction->set_end_segment(true);
1327
1289
    }
1328
1290
    else
1329
1291
    {
1330
 
      /*
1331
 
       * Continuation of the same statement. Carry forward the existing
1332
 
       * segment id.
1333
 
       */
1334
 
      const message::InsertData &current_data= statement->insert_data();
1335
 
      *next_segment_id= current_data.segment_id();
 
1292
      const message::InsertHeader &insert_header= statement->insert_header();
 
1293
      string old_table_name= insert_header.table_metadata().table_name();
 
1294
     
 
1295
      string current_table_name;
 
1296
      (void) in_table->getShare()->getTableName(current_table_name);
 
1297
 
 
1298
      if (current_table_name.compare(old_table_name))
 
1299
      {
 
1300
        finalizeStatementMessage(*statement, in_session);
 
1301
        statement= in_session->getStatementMessage();
 
1302
      }
 
1303
      else
 
1304
      {
 
1305
        /* carry forward the existing segment id */
 
1306
        const message::InsertData &current_data= statement->insert_data();
 
1307
        *next_segment_id= current_data.segment_id();
 
1308
      }
1336
1309
    }
 
1310
  } 
 
1311
 
 
1312
  if (statement == NULL)
 
1313
  {
 
1314
    /*
 
1315
     * Transaction will be non-NULL only if we had to segment it due to
 
1316
     * transaction size above.
 
1317
     */
 
1318
    if (transaction == NULL)
 
1319
      transaction= getActiveTransactionMessage(in_session);
 
1320
 
 
1321
    /* 
 
1322
     * Transaction message initialized and set, but no statement created
 
1323
     * yet.  We construct one and initialize it, here, then return the
 
1324
     * message after attaching the new Statement message pointer to the 
 
1325
     * Session for easy retrieval later...
 
1326
     */
 
1327
    statement= transaction->add_statement();
 
1328
    setInsertHeader(*statement, in_session, in_table);
 
1329
    in_session->setStatementMessage(statement);
1337
1330
  }
1338
 
  
1339
1331
  return *statement;
1340
1332
}
1341
1333
 
1342
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
 
                                          Session::const_reference session,
1344
 
                                          Table &table)
 
1335
                                          Session *in_session,
 
1336
                                          Table *in_table)
1345
1337
{
1346
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1338
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1347
1339
 
1348
1340
  /* 
1349
1341
   * Now we construct the specialized InsertHeader message inside
1354
1346
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1355
1347
 
1356
1348
  string schema_name;
1357
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1349
  (void) in_table->getShare()->getSchemaName(schema_name);
1358
1350
  string table_name;
1359
 
  (void) table.getShare()->getTableName(table_name);
 
1351
  (void) in_table->getShare()->getTableName(table_name);
1360
1352
 
1361
1353
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
1354
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1363
1355
 
1364
1356
  Field *current_field;
1365
 
  Field **table_fields= table.getFields();
 
1357
  Field **table_fields= in_table->getFields();
1366
1358
 
1367
1359
  message::FieldMetadata *field_metadata;
1368
1360
 
1369
1361
  /* We will read all the table's fields... */
1370
 
  table.setReadSet();
 
1362
  in_table->setReadSet();
1371
1363
 
1372
1364
  while ((current_field= *table_fields++) != NULL) 
1373
1365
  {
1377
1369
  }
1378
1370
}
1379
1371
 
1380
 
bool TransactionServices::insertRecord(Session::reference session,
1381
 
                                       Table &table)
 
1372
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1382
1373
{
1383
1374
  ReplicationServices &replication_services= ReplicationServices::singleton();
1384
1375
  if (! replication_services.isActive())
1385
1376
    return false;
1386
 
 
1387
1377
  /**
1388
1378
   * We do this check here because we don't want to even create a 
1389
1379
   * statement if there isn't a primary key on the table...
1392
1382
   *
1393
1383
   * Multi-column primary keys are handled how exactly?
1394
1384
   */
1395
 
  if (not table.getShare()->hasPrimaryKey())
 
1385
  if (not in_table->getShare()->hasPrimaryKey())
1396
1386
  {
1397
1387
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1398
1388
    return true;
1399
1389
  }
1400
1390
 
1401
1391
  uint32_t next_segment_id= 1;
1402
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1392
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1403
1393
 
1404
1394
  message::InsertData *data= statement.mutable_insert_data();
1405
1395
  data->set_segment_id(next_segment_id);
1407
1397
  message::InsertRecord *record= data->add_record();
1408
1398
 
1409
1399
  Field *current_field;
1410
 
  Field **table_fields= table.getFields();
 
1400
  Field **table_fields= in_table->getFields();
1411
1401
 
1412
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1402
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1413
1403
  string_value->set_charset(system_charset_info);
1414
1404
 
1415
1405
  /* We will read all the table's fields... */
1416
 
  table.setReadSet();
 
1406
  in_table->setReadSet();
1417
1407
 
1418
1408
  while ((current_field= *table_fields++) != NULL) 
1419
1409
  {
1433
1423
  return false;
1434
1424
}
1435
1425
 
1436
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1437
 
                                                            Table &table,
 
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1427
                                                            Table *in_table,
1438
1428
                                                            const unsigned char *old_record, 
1439
1429
                                                            const unsigned char *new_record,
1440
1430
                                                            uint32_t *next_segment_id)
1441
1431
{
1442
 
  message::Statement *statement= session.getStatementMessage();
 
1432
  message::Statement *statement= in_session->getStatementMessage();
1443
1433
  message::Transaction *transaction= NULL;
1444
1434
 
1445
1435
  /*
1446
 
   * If statement is NULL, this is a new statement.
1447
 
   * If statement is NOT NULL, this a continuation of the same statement.
1448
 
   * This is because autocommitOrRollback() finalizes the statement so that
1449
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1450
 
   * share a single GPB message for multiple statements).
 
1436
   * Check the type for the current Statement message, if it is anything
 
1437
   * other then UPDATE we need to call finalize, this will ensure a
 
1438
   * new UpdateStatement is created. If it is of type UPDATE check
 
1439
   * what table the UPDATE belongs to, if it is a different table
 
1440
   * call finalize, so a new UpdateStatement can be created.
1451
1441
   */
1452
 
  if (statement == NULL)
 
1442
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1453
1443
  {
1454
 
    transaction= getActiveTransactionMessage(session);
1455
 
    
1456
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
        transaction_message_threshold)
1458
 
    {
1459
 
      transaction= segmentTransactionMessage(session, transaction);
1460
 
    }
1461
 
    
1462
 
    statement= transaction->add_statement();
1463
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1464
 
    session.setStatementMessage(statement);
 
1444
    finalizeStatementMessage(*statement, in_session);
 
1445
    statement= in_session->getStatementMessage();
1465
1446
  }
1466
 
  else
 
1447
  else if (statement != NULL)
1467
1448
  {
1468
 
    transaction= getActiveTransactionMessage(session);
1469
 
    
 
1449
    transaction= getActiveTransactionMessage(in_session);
 
1450
 
1470
1451
    /*
1471
1452
     * If we've passed our threshold for the statement size (possible for
1472
1453
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1473
1454
     * the Transaction will keep it from getting huge).
1474
1455
     */
1475
1456
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1476
 
        transaction_message_threshold)
 
1457
      in_session->variables.transaction_message_threshold)
1477
1458
    {
1478
1459
      /* Remember the transaction ID so we can re-use it */
1479
1460
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
 
      uint32_t seg_id= transaction->segment_id();
1481
 
      
 
1461
 
1482
1462
      message::UpdateData *current_data= statement->mutable_update_data();
1483
 
      
 
1463
 
1484
1464
      /* Caller should use this value when adding a new record */
1485
1465
      *next_segment_id= current_data->segment_id() + 1;
1486
 
      
 
1466
 
1487
1467
      current_data->set_end_segment(false);
1488
 
      transaction->set_end_segment(false);
1489
 
      
1490
 
      /* 
 
1468
 
 
1469
      /*
1491
1470
       * Send the trx message to replicators after finalizing the 
1492
1471
       * statement and transaction. This will also set the Transaction
1493
1472
       * and Statement objects in Session to NULL.
1494
1473
       */
1495
 
      commitTransactionMessage(session);
1496
 
      
 
1474
      commitTransactionMessage(in_session);
 
1475
 
1497
1476
      /*
1498
1477
       * Statement and Transaction should now be NULL, so new ones will get
1499
1478
       * created. We reuse the transaction id since we are segmenting
1500
1479
       * one transaction.
1501
1480
       */
1502
 
      transaction= getActiveTransactionMessage(session, false);
 
1481
      statement= in_session->getStatementMessage();
 
1482
      transaction= getActiveTransactionMessage(in_session, false);
1503
1483
      assert(transaction != NULL);
1504
 
      
1505
 
      statement= transaction->add_statement();
1506
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1507
 
      session.setStatementMessage(statement);
1508
 
      
 
1484
 
1509
1485
      /* Set the transaction ID to match the previous messages */
1510
1486
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
 
      transaction->set_segment_id(seg_id + 1);
1512
 
      transaction->set_end_segment(true);
1513
1487
    }
1514
1488
    else
1515
1489
    {
1516
 
      /*
1517
 
       * Continuation of the same statement. Carry forward the existing
1518
 
       * segment id.
1519
 
       */
1520
 
      const message::UpdateData &current_data= statement->update_data();
1521
 
      *next_segment_id= current_data.segment_id();
 
1490
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1491
      {
 
1492
        /* carry forward the existing segment id */
 
1493
        const message::UpdateData &current_data= statement->update_data();
 
1494
        *next_segment_id= current_data.segment_id();
 
1495
      } 
 
1496
      else 
 
1497
      {
 
1498
        finalizeStatementMessage(*statement, in_session);
 
1499
        statement= in_session->getStatementMessage();
 
1500
      }
1522
1501
    }
1523
1502
  }
1524
 
  
 
1503
 
 
1504
  if (statement == NULL)
 
1505
  {
 
1506
    /*
 
1507
     * Transaction will be non-NULL only if we had to segment it due to
 
1508
     * transaction size above.
 
1509
     */
 
1510
    if (transaction == NULL)
 
1511
      transaction= getActiveTransactionMessage(in_session);
 
1512
 
 
1513
    /* 
 
1514
     * Transaction message initialized and set, but no statement created
 
1515
     * yet.  We construct one and initialize it, here, then return the
 
1516
     * message after attaching the new Statement message pointer to the 
 
1517
     * Session for easy retrieval later...
 
1518
     */
 
1519
    statement= transaction->add_statement();
 
1520
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1521
    in_session->setStatementMessage(statement);
 
1522
  }
1525
1523
  return *statement;
1526
1524
}
1527
1525
 
 
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1527
                                                  Table *in_table,
 
1528
                                                  const unsigned char *old_record,
 
1529
                                                  const unsigned char *new_record)
 
1530
{
 
1531
  const message::UpdateHeader &update_header= statement.update_header();
 
1532
  string old_table_name= update_header.table_metadata().table_name();
 
1533
 
 
1534
  string current_table_name;
 
1535
  (void) in_table->getShare()->getTableName(current_table_name);
 
1536
  if (current_table_name.compare(old_table_name))
 
1537
  {
 
1538
    return false;
 
1539
  }
 
1540
  else
 
1541
  {
 
1542
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1543
     * match the updated fields in the new record, if they do not we must
 
1544
     * create a new UpdateHeader 
 
1545
     */
 
1546
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1547
 
 
1548
    Field *current_field;
 
1549
    Field **table_fields= in_table->getFields();
 
1550
    in_table->setReadSet();
 
1551
 
 
1552
    size_t num_calculated_updated_fields= 0;
 
1553
    bool found= false;
 
1554
    while ((current_field= *table_fields++) != NULL)
 
1555
    {
 
1556
      if (num_calculated_updated_fields > num_set_fields)
 
1557
      {
 
1558
        break;
 
1559
      }
 
1560
 
 
1561
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1562
      {
 
1563
        /* check that this field exists in the UpdateHeader record */
 
1564
        found= false;
 
1565
 
 
1566
        for (size_t x= 0; x < num_set_fields; ++x)
 
1567
        {
 
1568
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1569
          string name= field_metadata.name();
 
1570
          if (name.compare(current_field->field_name) == 0)
 
1571
          {
 
1572
            found= true;
 
1573
            ++num_calculated_updated_fields;
 
1574
            break;
 
1575
          } 
 
1576
        }
 
1577
        if (! found)
 
1578
        {
 
1579
          break;
 
1580
        } 
 
1581
      }
 
1582
    }
 
1583
 
 
1584
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1585
    {
 
1586
      return true;
 
1587
    } 
 
1588
    else 
 
1589
    {
 
1590
      return false;
 
1591
    }
 
1592
  }
 
1593
}  
 
1594
 
1528
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
 
                                          Session::const_reference session,
1530
 
                                          Table &table,
 
1596
                                          Session *in_session,
 
1597
                                          Table *in_table,
1531
1598
                                          const unsigned char *old_record, 
1532
1599
                                          const unsigned char *new_record)
1533
1600
{
1534
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1601
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1535
1602
 
1536
1603
  /* 
1537
1604
   * Now we construct the specialized UpdateHeader message inside
1542
1609
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1543
1610
 
1544
1611
  string schema_name;
1545
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1612
  (void) in_table->getShare()->getSchemaName(schema_name);
1546
1613
  string table_name;
1547
 
  (void) table.getShare()->getTableName(table_name);
 
1614
  (void) in_table->getShare()->getTableName(table_name);
1548
1615
 
1549
1616
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1550
1617
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1551
1618
 
1552
1619
  Field *current_field;
1553
 
  Field **table_fields= table.getFields();
 
1620
  Field **table_fields= in_table->getFields();
1554
1621
 
1555
1622
  message::FieldMetadata *field_metadata;
1556
1623
 
1557
1624
  /* We will read all the table's fields... */
1558
 
  table.setReadSet();
 
1625
  in_table->setReadSet();
1559
1626
 
1560
1627
  while ((current_field= *table_fields++) != NULL) 
1561
1628
  {
1563
1630
     * We add the "key field metadata" -- i.e. the fields which is
1564
1631
     * the primary key for the table.
1565
1632
     */
1566
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1633
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1567
1634
    {
1568
1635
      field_metadata= header->add_key_field_metadata();
1569
1636
      field_metadata->set_name(current_field->field_name);
1570
1637
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1571
1638
    }
1572
1639
 
1573
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1640
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1574
1641
    {
1575
1642
      /* Field is changed from old to new */
1576
1643
      field_metadata= header->add_set_field_metadata();
1579
1646
    }
1580
1647
  }
1581
1648
}
1582
 
void TransactionServices::updateRecord(Session::reference session,
1583
 
                                       Table &table, 
 
1649
void TransactionServices::updateRecord(Session *in_session,
 
1650
                                       Table *in_table, 
1584
1651
                                       const unsigned char *old_record, 
1585
1652
                                       const unsigned char *new_record)
1586
1653
{
1589
1656
    return;
1590
1657
 
1591
1658
  uint32_t next_segment_id= 1;
1592
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1659
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1593
1660
 
1594
1661
  message::UpdateData *data= statement.mutable_update_data();
1595
1662
  data->set_segment_id(next_segment_id);
1597
1664
  message::UpdateRecord *record= data->add_record();
1598
1665
 
1599
1666
  Field *current_field;
1600
 
  Field **table_fields= table.getFields();
1601
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1667
  Field **table_fields= in_table->getFields();
 
1668
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
1669
  string_value->set_charset(system_charset_info);
1603
1670
 
1604
1671
  while ((current_field= *table_fields++) != NULL) 
1614
1681
     *
1615
1682
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1616
1683
     */
1617
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1684
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1618
1685
    {
1619
1686
      /* Store the original "read bit" for this field */
1620
1687
      bool is_read_set= current_field->isReadSet();
1621
1688
 
1622
1689
      /* We need to mark that we will "read" this field... */
1623
 
      table.setReadSet(current_field->position());
 
1690
      in_table->setReadSet(current_field->position());
1624
1691
 
1625
1692
      /* Read the string value of this field's contents */
1626
1693
      string_value= current_field->val_str_internal(string_value);
1649
1716
     * primary key field value.  Replication only supports tables
1650
1717
     * with a primary key.
1651
1718
     */
1652
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1719
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1653
1720
    {
1654
1721
      /**
1655
1722
       * To say the below is ugly is an understatement. But it works.
1667
1734
}
1668
1735
 
1669
1736
bool TransactionServices::isFieldUpdated(Field *current_field,
1670
 
                                         Table &table,
 
1737
                                         Table *in_table,
1671
1738
                                         const unsigned char *old_record,
1672
1739
                                         const unsigned char *new_record)
1673
1740
{
1676
1743
   * we do this crazy pointer fiddling to figure out if the current field
1677
1744
   * has been updated in the supplied record raw byte pointers.
1678
1745
   */
1679
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1680
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1746
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1747
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1681
1748
 
1682
1749
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1683
1750
 
1707
1774
  return isUpdated;
1708
1775
}  
1709
1776
 
1710
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1711
 
                                                            Table &table,
 
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1778
                                                            Table *in_table,
1712
1779
                                                            uint32_t *next_segment_id)
1713
1780
{
1714
 
  message::Statement *statement= session.getStatementMessage();
 
1781
  message::Statement *statement= in_session->getStatementMessage();
1715
1782
  message::Transaction *transaction= NULL;
1716
1783
 
1717
1784
  /*
1718
 
   * If statement is NULL, this is a new statement.
1719
 
   * If statement is NOT NULL, this a continuation of the same statement.
1720
 
   * This is because autocommitOrRollback() finalizes the statement so that
1721
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1722
 
   * share a single GPB message for multiple statements).
 
1785
   * Check the type for the current Statement message, if it is anything
 
1786
   * other then DELETE we need to call finalize, this will ensure a
 
1787
   * new DeleteStatement is created. If it is of type DELETE check
 
1788
   * what table the DELETE belongs to, if it is a different table
 
1789
   * call finalize, so a new DeleteStatement can be created.
1723
1790
   */
1724
 
  if (statement == NULL)
 
1791
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1725
1792
  {
1726
 
    transaction= getActiveTransactionMessage(session);
1727
 
    
1728
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1729
 
        transaction_message_threshold)
1730
 
    {
1731
 
      transaction= segmentTransactionMessage(session, transaction);
1732
 
    }
1733
 
    
1734
 
    statement= transaction->add_statement();
1735
 
    setDeleteHeader(*statement, session, table);
1736
 
    session.setStatementMessage(statement);
 
1793
    finalizeStatementMessage(*statement, in_session);
 
1794
    statement= in_session->getStatementMessage();
1737
1795
  }
1738
 
  else
 
1796
  else if (statement != NULL)
1739
1797
  {
1740
 
    transaction= getActiveTransactionMessage(session);
1741
 
    
 
1798
    transaction= getActiveTransactionMessage(in_session);
 
1799
 
1742
1800
    /*
1743
1801
     * If we've passed our threshold for the statement size (possible for
1744
1802
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1803
     * the Transaction will keep it from getting huge).
1746
1804
     */
1747
1805
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1748
 
        transaction_message_threshold)
 
1806
      in_session->variables.transaction_message_threshold)
1749
1807
    {
1750
1808
      /* Remember the transaction ID so we can re-use it */
1751
1809
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
 
      uint32_t seg_id= transaction->segment_id();
1753
 
      
 
1810
 
1754
1811
      message::DeleteData *current_data= statement->mutable_delete_data();
1755
 
      
 
1812
 
1756
1813
      /* Caller should use this value when adding a new record */
1757
1814
      *next_segment_id= current_data->segment_id() + 1;
1758
 
      
 
1815
 
1759
1816
      current_data->set_end_segment(false);
1760
 
      transaction->set_end_segment(false);
1761
 
      
 
1817
 
1762
1818
      /* 
1763
1819
       * Send the trx message to replicators after finalizing the 
1764
1820
       * statement and transaction. This will also set the Transaction
1765
1821
       * and Statement objects in Session to NULL.
1766
1822
       */
1767
 
      commitTransactionMessage(session);
1768
 
      
 
1823
      commitTransactionMessage(in_session);
 
1824
 
1769
1825
      /*
1770
1826
       * Statement and Transaction should now be NULL, so new ones will get
1771
1827
       * created. We reuse the transaction id since we are segmenting
1772
1828
       * one transaction.
1773
1829
       */
1774
 
      transaction= getActiveTransactionMessage(session, false);
 
1830
      statement= in_session->getStatementMessage();
 
1831
      transaction= getActiveTransactionMessage(in_session, false);
1775
1832
      assert(transaction != NULL);
1776
 
      
1777
 
      statement= transaction->add_statement();
1778
 
      setDeleteHeader(*statement, session, table);
1779
 
      session.setStatementMessage(statement);
1780
 
      
 
1833
 
1781
1834
      /* Set the transaction ID to match the previous messages */
1782
1835
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
 
      transaction->set_segment_id(seg_id + 1);
1784
 
      transaction->set_end_segment(true);
1785
1836
    }
1786
1837
    else
1787
1838
    {
1788
 
      /*
1789
 
       * Continuation of the same statement. Carry forward the existing
1790
 
       * segment id.
1791
 
       */
1792
 
      const message::DeleteData &current_data= statement->delete_data();
1793
 
      *next_segment_id= current_data.segment_id();
 
1839
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1840
      string old_table_name= delete_header.table_metadata().table_name();
 
1841
 
 
1842
      string current_table_name;
 
1843
      (void) in_table->getShare()->getTableName(current_table_name);
 
1844
      if (current_table_name.compare(old_table_name))
 
1845
      {
 
1846
        finalizeStatementMessage(*statement, in_session);
 
1847
        statement= in_session->getStatementMessage();
 
1848
      }
 
1849
      else
 
1850
      {
 
1851
        /* carry forward the existing segment id */
 
1852
        const message::DeleteData &current_data= statement->delete_data();
 
1853
        *next_segment_id= current_data.segment_id();
 
1854
      }
1794
1855
    }
1795
1856
  }
1796
 
  
 
1857
 
 
1858
  if (statement == NULL)
 
1859
  {
 
1860
    /*
 
1861
     * Transaction will be non-NULL only if we had to segment it due to
 
1862
     * transaction size above.
 
1863
     */
 
1864
    if (transaction == NULL)
 
1865
      transaction= getActiveTransactionMessage(in_session);
 
1866
 
 
1867
    /* 
 
1868
     * Transaction message initialized and set, but no statement created
 
1869
     * yet.  We construct one and initialize it, here, then return the
 
1870
     * message after attaching the new Statement message pointer to the 
 
1871
     * Session for easy retrieval later...
 
1872
     */
 
1873
    statement= transaction->add_statement();
 
1874
    setDeleteHeader(*statement, in_session, in_table);
 
1875
    in_session->setStatementMessage(statement);
 
1876
  }
1797
1877
  return *statement;
1798
1878
}
1799
1879
 
1800
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
 
                                          Session::const_reference session,
1802
 
                                          Table &table)
 
1881
                                          Session *in_session,
 
1882
                                          Table *in_table)
1803
1883
{
1804
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1884
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1805
1885
 
1806
1886
  /* 
1807
1887
   * Now we construct the specialized DeleteHeader message inside
1811
1891
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1812
1892
 
1813
1893
  string schema_name;
1814
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1894
  (void) in_table->getShare()->getSchemaName(schema_name);
1815
1895
  string table_name;
1816
 
  (void) table.getShare()->getTableName(table_name);
 
1896
  (void) in_table->getShare()->getTableName(table_name);
1817
1897
 
1818
1898
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1819
1899
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1820
1900
 
1821
1901
  Field *current_field;
1822
 
  Field **table_fields= table.getFields();
 
1902
  Field **table_fields= in_table->getFields();
1823
1903
 
1824
1904
  message::FieldMetadata *field_metadata;
1825
1905
 
1830
1910
     * primary key field value.  Replication only supports tables
1831
1911
     * with a primary key.
1832
1912
     */
1833
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1913
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1834
1914
    {
1835
1915
      field_metadata= header->add_key_field_metadata();
1836
1916
      field_metadata->set_name(current_field->field_name);
1839
1919
  }
1840
1920
}
1841
1921
 
1842
 
void TransactionServices::deleteRecord(Session::reference session,
1843
 
                                       Table &table,
1844
 
                                       bool use_update_record)
 
1922
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1845
1923
{
1846
1924
  ReplicationServices &replication_services= ReplicationServices::singleton();
1847
1925
  if (! replication_services.isActive())
1848
1926
    return;
1849
1927
 
1850
1928
  uint32_t next_segment_id= 1;
1851
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1929
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1852
1930
 
1853
1931
  message::DeleteData *data= statement.mutable_delete_data();
1854
1932
  data->set_segment_id(next_segment_id);
1856
1934
  message::DeleteRecord *record= data->add_record();
1857
1935
 
1858
1936
  Field *current_field;
1859
 
  Field **table_fields= table.getFields();
1860
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1937
  Field **table_fields= in_table->getFields();
 
1938
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
1939
  string_value->set_charset(system_charset_info);
1862
1940
 
1863
1941
  while ((current_field= *table_fields++) != NULL) 
1867
1945
     * primary key field value.  Replication only supports tables
1868
1946
     * with a primary key.
1869
1947
     */
1870
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1948
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1871
1949
    {
1872
1950
      if (use_update_record)
1873
1951
      {
1879
1957
         * We are careful not to change anything in old_ptr.
1880
1958
         */
1881
1959
        const unsigned char *old_ptr= current_field->ptr;
1882
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
 
1960
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1883
1961
        string_value= current_field->val_str_internal(string_value);
1884
1962
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1885
1963
      }
1896
1974
  }
1897
1975
}
1898
1976
 
1899
 
void TransactionServices::createTable(Session::reference session,
 
1977
void TransactionServices::createTable(Session *in_session,
1900
1978
                                      const message::Table &table)
1901
1979
{
1902
1980
  ReplicationServices &replication_services= ReplicationServices::singleton();
1903
1981
  if (! replication_services.isActive())
1904
1982
    return;
1905
1983
  
1906
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1984
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1907
1985
  message::Statement *statement= transaction->add_statement();
1908
1986
 
1909
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
1987
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1910
1988
 
1911
1989
  /* 
1912
1990
   * Construct the specialized CreateTableStatement message and attach
1916
1994
  message::Table *new_table_message= create_table_statement->mutable_table();
1917
1995
  *new_table_message= table;
1918
1996
 
1919
 
  finalizeStatementMessage(*statement, session);
 
1997
  finalizeStatementMessage(*statement, in_session);
1920
1998
 
1921
 
  finalizeTransactionMessage(*transaction, session);
 
1999
  finalizeTransactionMessage(*transaction, in_session);
1922
2000
  
1923
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2001
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1924
2002
 
1925
 
  cleanupTransactionMessage(transaction, session);
 
2003
  cleanupTransactionMessage(transaction, in_session);
1926
2004
 
1927
2005
}
1928
2006
 
1929
 
void TransactionServices::createSchema(Session::reference session,
 
2007
void TransactionServices::createSchema(Session *in_session,
1930
2008
                                       const message::Schema &schema)
1931
2009
{
1932
2010
  ReplicationServices &replication_services= ReplicationServices::singleton();
1933
2011
  if (! replication_services.isActive())
1934
2012
    return;
1935
2013
  
1936
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2014
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1937
2015
  message::Statement *statement= transaction->add_statement();
1938
2016
 
1939
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
2017
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1940
2018
 
1941
2019
  /* 
1942
2020
   * Construct the specialized CreateSchemaStatement message and attach
1946
2024
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
2025
  *new_schema_message= schema;
1948
2026
 
1949
 
  finalizeStatementMessage(*statement, session);
 
2027
  finalizeStatementMessage(*statement, in_session);
1950
2028
 
1951
 
  finalizeTransactionMessage(*transaction, session);
 
2029
  finalizeTransactionMessage(*transaction, in_session);
1952
2030
  
1953
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2031
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1954
2032
 
1955
 
  cleanupTransactionMessage(transaction, session);
 
2033
  cleanupTransactionMessage(transaction, in_session);
1956
2034
 
1957
2035
}
1958
2036
 
1959
 
void TransactionServices::dropSchema(Session::reference session,
1960
 
                                     identifier::Schema::const_reference identifier)
 
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1961
2038
{
1962
2039
  ReplicationServices &replication_services= ReplicationServices::singleton();
1963
2040
  if (! replication_services.isActive())
1964
2041
    return;
1965
2042
  
1966
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2043
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1967
2044
  message::Statement *statement= transaction->add_statement();
1968
2045
 
1969
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
2046
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1970
2047
 
1971
2048
  /* 
1972
2049
   * Construct the specialized DropSchemaStatement message and attach
1974
2051
   */
1975
2052
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1976
2053
 
1977
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1978
 
 
1979
 
  finalizeStatementMessage(*statement, session);
1980
 
 
1981
 
  finalizeTransactionMessage(*transaction, session);
1982
 
  
1983
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1984
 
 
1985
 
  cleanupTransactionMessage(transaction, session);
1986
 
}
1987
 
 
1988
 
void TransactionServices::alterSchema(Session::reference session,
1989
 
                                      const message::schema::shared_ptr &old_schema,
1990
 
                                      const message::Schema &new_schema)
1991
 
{
1992
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1993
 
  if (! replication_services.isActive())
1994
 
    return;
1995
 
  
1996
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1997
 
  message::Statement *statement= transaction->add_statement();
1998
 
 
1999
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2000
 
 
2001
 
  /* 
2002
 
   * Construct the specialized AlterSchemaStatement message and attach
2003
 
   * it to the generic Statement message
2004
 
   */
2005
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2006
 
 
2007
 
  message::Schema *before= alter_schema_statement->mutable_before();
2008
 
  message::Schema *after= alter_schema_statement->mutable_after();
2009
 
 
2010
 
  *before= *old_schema;
2011
 
  *after= new_schema;
2012
 
 
2013
 
  finalizeStatementMessage(*statement, session);
2014
 
 
2015
 
  finalizeTransactionMessage(*transaction, session);
2016
 
  
2017
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2018
 
 
2019
 
  cleanupTransactionMessage(transaction, session);
2020
 
}
2021
 
 
2022
 
void TransactionServices::dropTable(Session::reference session,
2023
 
                                    const identifier::Table &table,
 
2054
  drop_schema_statement->set_schema_name(schema_name);
 
2055
 
 
2056
  finalizeStatementMessage(*statement, in_session);
 
2057
 
 
2058
  finalizeTransactionMessage(*transaction, in_session);
 
2059
  
 
2060
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2061
 
 
2062
  cleanupTransactionMessage(transaction, in_session);
 
2063
}
 
2064
 
 
2065
void TransactionServices::dropTable(Session *in_session,
 
2066
                                    const string &schema_name,
 
2067
                                    const string &table_name,
2024
2068
                                    bool if_exists)
2025
2069
{
2026
2070
  ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2071
  if (! replication_services.isActive())
2028
2072
    return;
2029
2073
  
2030
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2074
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2075
  message::Statement *statement= transaction->add_statement();
2032
2076
 
2033
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
2077
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2034
2078
 
2035
2079
  /* 
2036
2080
   * Construct the specialized DropTableStatement message and attach
2042
2086
 
2043
2087
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2044
2088
 
2045
 
  table_metadata->set_schema_name(table.getSchemaName());
2046
 
  table_metadata->set_table_name(table.getTableName());
2047
 
 
2048
 
  finalizeStatementMessage(*statement, session);
2049
 
 
2050
 
  finalizeTransactionMessage(*transaction, session);
 
2089
  table_metadata->set_schema_name(schema_name);
 
2090
  table_metadata->set_table_name(table_name);
 
2091
 
 
2092
  finalizeStatementMessage(*statement, in_session);
 
2093
 
 
2094
  finalizeTransactionMessage(*transaction, in_session);
2051
2095
  
2052
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2096
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2053
2097
 
2054
 
  cleanupTransactionMessage(transaction, session);
 
2098
  cleanupTransactionMessage(transaction, in_session);
2055
2099
}
2056
2100
 
2057
 
void TransactionServices::truncateTable(Session::reference session,
2058
 
                                        Table &table)
 
2101
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2059
2102
{
2060
2103
  ReplicationServices &replication_services= ReplicationServices::singleton();
2061
2104
  if (! replication_services.isActive())
2062
2105
    return;
2063
2106
  
2064
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2107
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2065
2108
  message::Statement *statement= transaction->add_statement();
2066
2109
 
2067
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
2110
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2068
2111
 
2069
2112
  /* 
2070
2113
   * Construct the specialized TruncateTableStatement message and attach
2074
2117
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2075
2118
 
2076
2119
  string schema_name;
2077
 
  (void) table.getShare()->getSchemaName(schema_name);
 
2120
  (void) in_table->getShare()->getSchemaName(schema_name);
2078
2121
  string table_name;
2079
 
  (void) table.getShare()->getTableName(table_name);
 
2122
  (void) in_table->getShare()->getTableName(table_name);
2080
2123
 
2081
2124
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2125
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2083
2126
 
2084
 
  finalizeStatementMessage(*statement, session);
 
2127
  finalizeStatementMessage(*statement, in_session);
2085
2128
 
2086
 
  finalizeTransactionMessage(*transaction, session);
 
2129
  finalizeTransactionMessage(*transaction, in_session);
2087
2130
  
2088
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2131
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2089
2132
 
2090
 
  cleanupTransactionMessage(transaction, session);
 
2133
  cleanupTransactionMessage(transaction, in_session);
2091
2134
}
2092
2135
 
2093
 
void TransactionServices::rawStatement(Session::reference session,
2094
 
                                       const string &query)
 
2136
void TransactionServices::rawStatement(Session *in_session, const string &query)
2095
2137
{
2096
2138
  ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2139
  if (! replication_services.isActive())
2098
2140
    return;
2099
2141
 
2100
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2142
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2143
  message::Statement *statement= transaction->add_statement();
2102
2144
 
2103
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
2145
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2146
  statement->set_sql(query);
2105
 
  finalizeStatementMessage(*statement, session);
 
2147
  finalizeStatementMessage(*statement, in_session);
2106
2148
 
2107
 
  finalizeTransactionMessage(*transaction, session);
 
2149
  finalizeTransactionMessage(*transaction, in_session);
2108
2150
  
2109
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2151
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2110
2152
 
2111
 
  cleanupTransactionMessage(transaction, session);
 
2153
  cleanupTransactionMessage(transaction, in_session);
2112
2154
}
2113
2155
 
2114
 
int TransactionServices::sendEvent(Session::reference session,
2115
 
                                   const message::Event &event)
 
2156
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2116
2157
{
2117
2158
  ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2159
  if (! replication_services.isActive())
2130
2171
 
2131
2172
  trx_event->CopyFrom(event);
2132
2173
 
2133
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
2174
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2134
2175
 
2135
2176
  delete transaction;
2136
2177
 
2137
2178
  return static_cast<int>(result);
2138
2179
}
2139
2180
 
2140
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2181
bool TransactionServices::sendStartupEvent(Session *session)
2141
2182
{
2142
2183
  message::Event event;
2143
2184
  event.set_type(message::Event::STARTUP);
2146
2187
  return true;
2147
2188
}
2148
2189
 
2149
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2190
bool TransactionServices::sendShutdownEvent(Session *session)
2150
2191
{
2151
2192
  message::Event event;
2152
2193
  event.set_type(message::Event::SHUTDOWN);