~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: David Shrewsbury
  • Date: 2011-01-20 14:08:19 UTC
  • mto: (2109.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 2110.
  • Revision ID: shrewsbury.dave@gmail.com-20110120140819-ctwi0etrhsaaumud
Initial change to use references to Session in TransactionServices methods rather than pointers.

Show diffs side-by-side

added

removed

Lines of Context:
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session *session,
 
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session->transaction.stmt;
333
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session.transaction.stmt;
 
333
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session *session,
 
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session->transaction.stmt;
366
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session.transaction.stmt;
 
366
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session->transaction.all;
387
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session.transaction.all;
 
387
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session->transaction.xid_state.xid.is_null())
404
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
403
  if (session.transaction.xid_state.xid.is_null())
 
404
    session.transaction.xid_state.xid.set(session.getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session->transaction.all;
417
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session.transaction.all;
 
417
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session->transaction.xid_state.xid.is_null())
434
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
433
  if (session.transaction.xid_state.xid.is_null())
 
434
    session.transaction.xid_state.xid.set(session.getQueryId());
435
435
 
436
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
457
457
{
458
 
  if (session->getXaId() == 0)
 
458
  if (session.getXaId() == 0)
459
459
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
460
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
461
461
  }
462
462
 
463
 
  return session->getXaId();
 
463
  return session.getXaId();
464
464
}
465
465
 
466
 
/**
467
 
  @retval
468
 
    0   ok
469
 
  @retval
470
 
    1   transaction was rolled back
471
 
  @retval
472
 
    2   error during commit, data may be inconsistent
473
 
 
474
 
  @todo
475
 
    Since we don't support nested statement transactions in 5.0,
476
 
    we can't commit or rollback stmt transactions while we are inside
477
 
    stored functions or triggers. So we simply do nothing now.
478
 
    TODO: This should be fixed in later ( >= 5.1) releases.
479
 
*/
480
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
 
466
int TransactionServices::commitTransaction(Session::reference session,
 
467
                                           bool normal_transaction)
481
468
{
482
469
  int error= 0, cookie= 0;
483
470
  /*
484
471
    'normal_transaction' means that this is either an explicit commit issued by
485
472
    the user, or an implicit commit issued by a DDL.
486
473
  */
487
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
474
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
476
 
490
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
477
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
491
478
 
492
479
  /*
493
480
    We must not commit the normal transaction if a statement
495
482
    flags will not get propagated to its normal transaction's
496
483
    counterpart.
497
484
  */
498
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
499
 
              trans == &session->transaction.stmt);
 
485
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
486
              trans == &session.transaction.stmt);
500
487
 
501
488
  if (resource_contexts.empty() == false)
502
489
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
490
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
504
491
    {
505
492
      rollbackTransaction(session, normal_transaction);
506
493
      return 1;
531
518
 
532
519
        if (resource->participatesInXaTransaction())
533
520
        {
534
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
 
521
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
535
522
          {
536
523
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
524
            error= 1;
538
525
          }
539
526
          else
540
527
          {
541
 
            session->status_var.ha_prepare_count++;
 
528
            session.status_var.ha_prepare_count++;
542
529
          }
543
530
        }
544
531
      }
560
547
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
548
end:
562
549
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
550
      session.startWaitingGlobalReadLock();
564
551
  }
565
552
  return error;
566
553
}
569
556
  @note
570
557
  This function does not care about global read lock. A caller should.
571
558
*/
572
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
 
559
int TransactionServices::commitPhaseOne(Session::reference session,
 
560
                                        bool normal_transaction)
573
561
{
574
562
  int error=0;
575
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
563
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
565
 
578
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
566
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
579
567
 
580
568
  if (resource_contexts.empty() == false)
581
569
  {
590
578
 
591
579
      if (resource->participatesInXaTransaction())
592
580
      {
593
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
 
581
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, normal_transaction)))
594
582
        {
595
583
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
596
584
          error= 1;
597
585
        }
598
586
        else if (normal_transaction)
599
587
        {
600
 
          session->status_var.ha_commit_count++;
 
588
          session.status_var.ha_commit_count++;
601
589
        }
602
590
      }
603
591
      else if (resource->participatesInSqlTransaction())
604
592
      {
605
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
 
593
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, normal_transaction)))
606
594
        {
607
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
608
596
          error= 1;
609
597
        }
610
598
        else if (normal_transaction)
611
599
        {
612
 
          session->status_var.ha_commit_count++;
 
600
          session.status_var.ha_commit_count++;
613
601
        }
614
602
      }
615
603
      resource_context->reset(); /* keep it conveniently zero-filled */
616
604
    }
617
605
 
618
606
    if (is_real_trans)
619
 
      session->transaction.xid_state.xid.null();
 
607
      session.transaction.xid_state.xid.null();
620
608
 
621
609
    if (normal_transaction)
622
610
    {
623
 
      session->variables.tx_isolation= session->session_tx_isolation;
624
 
      session->transaction.cleanup();
 
611
      session.variables.tx_isolation= session.session_tx_isolation;
 
612
      session.transaction.cleanup();
625
613
    }
626
614
  }
627
615
  trans->reset();
628
616
  return error;
629
617
}
630
618
 
631
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
 
619
int TransactionServices::rollbackTransaction(Session::reference session,
 
620
                                             bool normal_transaction)
632
621
{
633
622
  int error= 0;
634
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
623
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
624
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
636
625
 
637
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
626
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
638
627
 
639
628
  /*
640
629
    We must not rollback the normal transaction if a statement
641
630
    transaction is pending.
642
631
  */
643
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
644
 
              trans == &session->transaction.stmt);
 
632
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
633
              trans == &session.transaction.stmt);
645
634
 
646
635
  if (resource_contexts.empty() == false)
647
636
  {
656
645
 
657
646
      if (resource->participatesInXaTransaction())
658
647
      {
659
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
 
648
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, normal_transaction)))
660
649
        {
661
650
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
651
          error= 1;
663
652
        }
664
653
        else if (normal_transaction)
665
654
        {
666
 
          session->status_var.ha_rollback_count++;
 
655
          session.status_var.ha_rollback_count++;
667
656
        }
668
657
      }
669
658
      else if (resource->participatesInSqlTransaction())
670
659
      {
671
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
 
660
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, normal_transaction)))
672
661
        {
673
662
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
663
          error= 1;
675
664
        }
676
665
        else if (normal_transaction)
677
666
        {
678
 
          session->status_var.ha_rollback_count++;
 
667
          session.status_var.ha_rollback_count++;
679
668
        }
680
669
      }
681
670
      resource_context->reset(); /* keep it conveniently zero-filled */
694
683
      rollbackStatementMessage(session);
695
684
 
696
685
    if (is_real_trans)
697
 
      session->transaction.xid_state.xid.null();
 
686
      session.transaction.xid_state.xid.null();
698
687
    if (normal_transaction)
699
688
    {
700
 
      session->variables.tx_isolation=session->session_tx_isolation;
701
 
      session->transaction.cleanup();
 
689
      session.variables.tx_isolation=session.session_tx_isolation;
 
690
      session.transaction.cleanup();
702
691
    }
703
692
  }
704
693
  if (normal_transaction)
705
 
    session->transaction_rollback_request= false;
 
694
    session.transaction_rollback_request= false;
706
695
 
707
696
  /*
708
697
   * If a non-transactional table was updated, warn the user
709
698
   */
710
699
  if (is_real_trans &&
711
 
      session->transaction.all.hasModifiedNonTransData() &&
712
 
      session->getKilled() != Session::KILL_CONNECTION)
 
700
      session.transaction.all.hasModifiedNonTransData() &&
 
701
      session.getKilled() != Session::KILL_CONNECTION)
713
702
  {
714
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
703
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
704
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
705
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
717
706
  }
719
708
  return error;
720
709
}
721
710
 
722
 
/**
723
 
  This is used to commit or rollback a single statement depending on
724
 
  the value of error.
725
 
 
726
 
  @note
727
 
    Note that if the autocommit is on, then the following call inside
728
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
729
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
730
 
    the user has used LOCK TABLES then that mechanism does not know to do the
731
 
    commit.
732
 
*/
733
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
 
711
int TransactionServices::autocommitOrRollback(Session::reference session,
 
712
                                              int error)
734
713
{
735
714
  /* One GPB Statement message per SQL statement */
736
 
  message::Statement *statement= session->getStatementMessage();
 
715
  message::Statement *statement= session.getStatementMessage();
737
716
  if ((statement != NULL) && (! error))
738
717
    finalizeStatementMessage(*statement, session);
739
718
 
740
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
 
719
  if (session.transaction.stmt.getResourceContexts().empty() == false)
741
720
  {
742
 
    TransactionContext *trans = &session->transaction.stmt;
 
721
    TransactionContext *trans = &session.transaction.stmt;
743
722
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
723
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
724
         it != resource_contexts.end();
747
726
    {
748
727
      ResourceContext *resource_context= *it;
749
728
 
750
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
729
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
751
730
    }
752
731
 
753
732
    if (! error)
758
737
    else
759
738
    {
760
739
      (void) rollbackTransaction(session, false);
761
 
      if (session->transaction_rollback_request)
 
740
      if (session.transaction_rollback_request)
762
741
        (void) rollbackTransaction(session, true);
763
742
    }
764
743
 
765
 
    session->variables.tx_isolation= session->session_tx_isolation;
 
744
    session.variables.tx_isolation= session.session_tx_isolation;
766
745
  }
767
746
  return error;
768
747
}
777
756
  }
778
757
};
779
758
 
780
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
 
759
int TransactionServices::rollbackToSavepoint(Session::reference session,
 
760
                                             NamedSavepoint &sv)
781
761
{
782
762
  int error= 0;
783
 
  TransactionContext *trans= &session->transaction.all;
 
763
  TransactionContext *trans= &session.transaction.all;
784
764
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
785
765
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
786
766
 
800
780
 
801
781
    if (resource->participatesInSqlTransaction())
802
782
    {
803
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
 
783
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
804
784
      {
805
785
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
806
786
        error= 1;
807
787
      }
808
788
      else
809
789
      {
810
 
        session->status_var.ha_savepoint_rollback_count++;
 
790
        session.status_var.ha_savepoint_rollback_count++;
811
791
      }
812
792
    }
813
793
    trans->no_2pc|= not resource->participatesInXaTransaction();
857
837
 
858
838
      if (resource->participatesInSqlTransaction())
859
839
      {
860
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
 
840
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
861
841
        {
862
842
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
863
843
          error= 1;
864
844
        }
865
845
        else
866
846
        {
867
 
          session->status_var.ha_rollback_count++;
 
847
          session.status_var.ha_rollback_count++;
868
848
        }
869
849
      }
870
850
      resource_context->reset(); /* keep it conveniently zero-filled */
887
867
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
888
868
      if (num_statements == 0)
889
869
      {    
890
 
        session->setStatementMessage(NULL);
 
870
        session.setStatementMessage(NULL);
891
871
      }    
892
872
      else 
893
873
      {
894
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
874
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
895
875
      }    
896
 
      session->setTransactionMessage(savepoint_transaction_copy);
 
876
      session.setTransactionMessage(savepoint_transaction_copy);
897
877
    }
898
878
  }
899
879
 
906
886
  section "4.33.4 SQL-statements and transaction states",
907
887
  SAVEPOINT is *not* a transaction-initiating SQL-statement
908
888
*/
909
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
 
889
int TransactionServices::setSavepoint(Session::reference session,
 
890
                                      NamedSavepoint &sv)
910
891
{
911
892
  int error= 0;
912
 
  TransactionContext *trans= &session->transaction.all;
 
893
  TransactionContext *trans= &session.transaction.all;
913
894
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
914
895
 
915
896
  if (resource_contexts.empty() == false)
925
906
 
926
907
      if (resource->participatesInSqlTransaction())
927
908
      {
928
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
 
909
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
929
910
        {
930
911
          my_error(ER_GET_ERRNO, MYF(0), err);
931
912
          error= 1;
932
913
        }
933
914
        else
934
915
        {
935
 
          session->status_var.ha_savepoint_count++;
 
916
          session.status_var.ha_savepoint_count++;
936
917
        }
937
918
      }
938
919
    }
944
925
 
945
926
  if (shouldConstructMessages())
946
927
  {
947
 
    message::Transaction *transaction= session->getTransactionMessage();
 
928
    message::Transaction *transaction= session.getTransactionMessage();
948
929
                  
949
930
    if (transaction != NULL)
950
931
    {
957
938
  return error;
958
939
}
959
940
 
960
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
 
941
int TransactionServices::releaseSavepoint(Session::reference session,
 
942
                                          NamedSavepoint &sv)
961
943
{
962
944
  int error= 0;
963
945
 
974
956
 
975
957
    if (resource->participatesInSqlTransaction())
976
958
    {
977
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
 
959
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
978
960
      {
979
961
        my_error(ER_GET_ERRNO, MYF(0), err);
980
962
        error= 1;
991
973
  return replication_services.isActive();
992
974
}
993
975
 
994
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
976
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
 
977
                                                                       bool should_inc_trx_id)
995
978
{
996
 
  message::Transaction *transaction= in_session->getTransactionMessage();
 
979
  message::Transaction *transaction= session.getTransactionMessage();
997
980
 
998
981
  if (unlikely(transaction == NULL))
999
982
  {
1003
986
     * deleting transaction message when done with it.
1004
987
     */
1005
988
    transaction= new (nothrow) message::Transaction();
1006
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
 
    in_session->setTransactionMessage(transaction);
 
989
    initTransactionMessage(*transaction, session, should_inc_trx_id);
 
990
    session.setTransactionMessage(transaction);
1008
991
    return transaction;
1009
992
  }
1010
993
  else
1011
994
    return transaction;
1012
995
}
1013
996
 
1014
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
 
                                                 Session *in_session,
 
997
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
 
998
                                                 Session::reference session,
1016
999
                                                 bool should_inc_trx_id)
1017
1000
{
1018
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
 
  trx->set_server_id(in_session->getServerId());
 
1001
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1002
  trx->set_server_id(session.getServerId());
1020
1003
 
1021
1004
  if (should_inc_trx_id)
1022
1005
  {
1023
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
 
    in_session->setXaId(0);
 
1006
    trx->set_transaction_id(getCurrentTransactionId(session));
 
1007
    session.setXaId(0);
1025
1008
  }
1026
1009
  else
1027
1010
  {
1029
1012
    trx->set_transaction_id(0);
1030
1013
  }
1031
1014
 
1032
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1015
  trx->set_start_timestamp(session.getCurrentTimestamp());
1033
1016
  
1034
1017
  /* segment info may get set elsewhere as needed */
1035
 
  in_transaction.set_segment_id(1);
1036
 
  in_transaction.set_end_segment(true);
1037
 
}
1038
 
 
1039
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1040
 
                                              Session *in_session)
1041
 
{
1042
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1043
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1044
 
}
1045
 
 
1046
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1047
 
                                             Session *in_session)
1048
 
{
1049
 
  delete in_transaction;
1050
 
  in_session->setStatementMessage(NULL);
1051
 
  in_session->setTransactionMessage(NULL);
1052
 
  in_session->setXaId(0);
1053
 
}
1054
 
 
1055
 
int TransactionServices::commitTransactionMessage(Session *in_session)
 
1018
  transaction.set_segment_id(1);
 
1019
  transaction.set_end_segment(true);
 
1020
}
 
1021
 
 
1022
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
 
1023
                                                     Session::const_reference session)
 
1024
{
 
1025
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1026
  trx->set_end_timestamp(session.getCurrentTimestamp());
 
1027
}
 
1028
 
 
1029
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
 
1030
                                                    Session::reference session)
 
1031
{
 
1032
  delete transaction;
 
1033
  session.setStatementMessage(NULL);
 
1034
  session.setTransactionMessage(NULL);
 
1035
  session.setXaId(0);
 
1036
}
 
1037
 
 
1038
int TransactionServices::commitTransactionMessage(Session::reference session)
1056
1039
{
1057
1040
  ReplicationServices &replication_services= ReplicationServices::singleton();
1058
1041
  if (! replication_services.isActive())
1062
1045
   * If no Transaction message was ever created, then no data modification
1063
1046
   * occurred inside the transaction, so nothing to do.
1064
1047
   */
1065
 
  if (in_session->getTransactionMessage() == NULL)
 
1048
  if (session.getTransactionMessage() == NULL)
1066
1049
    return 0;
1067
1050
  
1068
1051
  /* If there is an active statement message, finalize it. */
1069
 
  message::Statement *statement= in_session->getStatementMessage();
 
1052
  message::Statement *statement= session.getStatementMessage();
1070
1053
 
1071
1054
  if (statement != NULL)
1072
1055
  {
1073
 
    finalizeStatementMessage(*statement, in_session);
 
1056
    finalizeStatementMessage(*statement, session);
1074
1057
  }
1075
1058
 
1076
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1059
  message::Transaction* transaction= getActiveTransactionMessage(session);
1077
1060
 
1078
1061
  /*
1079
1062
   * It is possible that we could have a Transaction without any Statements
1083
1066
   */
1084
1067
  if (transaction->statement_size() == 0)
1085
1068
  {
1086
 
    cleanupTransactionMessage(transaction, in_session);
 
1069
    cleanupTransactionMessage(transaction, session);
1087
1070
    return 0;
1088
1071
  }
1089
1072
  
1090
 
  finalizeTransactionMessage(*transaction, in_session);
 
1073
  finalizeTransactionMessage(*transaction, session);
1091
1074
  
1092
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1075
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1093
1076
 
1094
 
  cleanupTransactionMessage(transaction, in_session);
 
1077
  cleanupTransactionMessage(transaction, session);
1095
1078
 
1096
1079
  return static_cast<int>(result);
1097
1080
}
1098
1081
 
1099
1082
void TransactionServices::initStatementMessage(message::Statement &statement,
1100
 
                                        message::Statement::Type in_type,
1101
 
                                        Session *in_session)
 
1083
                                               message::Statement::Type type,
 
1084
                                               Session::const_reference session)
1102
1085
{
1103
 
  statement.set_type(in_type);
1104
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1086
  statement.set_type(type);
 
1087
  statement.set_start_timestamp(session.getCurrentTimestamp());
1105
1088
 
1106
 
  if (in_session->variables.replicate_query)
1107
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
1089
  if (session.variables.replicate_query)
 
1090
    statement.set_sql(session.getQueryString()->c_str());
1108
1091
}
1109
1092
 
1110
1093
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1111
 
                                            Session *in_session)
 
1094
                                                   Session::reference session)
1112
1095
{
1113
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1114
 
  in_session->setStatementMessage(NULL);
 
1096
  statement.set_end_timestamp(session.getCurrentTimestamp());
 
1097
  session.setStatementMessage(NULL);
1115
1098
}
1116
1099
 
1117
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1100
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1118
1101
{
1119
1102
  ReplicationServices &replication_services= ReplicationServices::singleton();
1120
1103
  if (! replication_services.isActive())
1121
1104
    return;
1122
1105
  
1123
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1106
  message::Transaction *transaction= getActiveTransactionMessage(session);
1124
1107
 
1125
1108
  /*
1126
1109
   * OK, so there are two situations that we need to deal with here:
1150
1133
     * attach it to the transaction, and push it to replicators.
1151
1134
     */
1152
1135
    transaction->Clear();
1153
 
    initTransactionMessage(*transaction, in_session, false);
 
1136
    initTransactionMessage(*transaction, session, false);
1154
1137
 
1155
1138
    /* Set the transaction ID to match the previous messages */
1156
1139
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1159
1142
 
1160
1143
    message::Statement *statement= transaction->add_statement();
1161
1144
 
1162
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1163
 
    finalizeStatementMessage(*statement, in_session);
 
1145
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
 
1146
    finalizeStatementMessage(*statement, session);
1164
1147
 
1165
 
    finalizeTransactionMessage(*transaction, in_session);
 
1148
    finalizeTransactionMessage(*transaction, session);
1166
1149
    
1167
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1150
    (void) replication_services.pushTransactionMessage(session, *transaction);
1168
1151
  }
1169
 
  cleanupTransactionMessage(transaction, in_session);
 
1152
 
 
1153
  cleanupTransactionMessage(transaction, session);
1170
1154
}
1171
1155
 
1172
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
 
1156
void TransactionServices::rollbackStatementMessage(Session::reference session)
1173
1157
{
1174
1158
  ReplicationServices &replication_services= ReplicationServices::singleton();
1175
1159
  if (! replication_services.isActive())
1176
1160
    return;
1177
1161
 
1178
 
  message::Statement *current_statement= in_session->getStatementMessage();
 
1162
  message::Statement *current_statement= session.getStatementMessage();
1179
1163
 
1180
1164
  /* If we never added a Statement message, nothing to undo. */
1181
1165
  if (current_statement == NULL)
1214
1198
   * Remove the Statement message we've been working with (same as
1215
1199
   * current_statement).
1216
1200
   */
1217
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1201
  message::Transaction *transaction= getActiveTransactionMessage(session);
1218
1202
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1219
1203
  statements_in_txn= transaction->mutable_statement();
1220
1204
  statements_in_txn->RemoveLast();
1221
 
  in_session->setStatementMessage(NULL);
 
1205
  session.setStatementMessage(NULL);
1222
1206
  
1223
1207
  /*
1224
1208
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1230
1214
    current_statement= transaction->add_statement();
1231
1215
    initStatementMessage(*current_statement,
1232
1216
                         message::Statement::ROLLBACK_STATEMENT,
1233
 
                         in_session);
1234
 
    finalizeStatementMessage(*current_statement, in_session);
 
1217
                         session);
 
1218
    finalizeStatementMessage(*current_statement, session);
1235
1219
  }
1236
1220
}
1237
1221
 
1238
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
 
1222
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1239
1223
                                                                     message::Transaction *transaction)
1240
1224
{
1241
1225
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1242
1226
  uint32_t seg_id= transaction->segment_id();
1243
1227
  
1244
1228
  transaction->set_end_segment(false);
1245
 
  commitTransactionMessage(in_session);
1246
 
  transaction= getActiveTransactionMessage(in_session, false);
 
1229
  commitTransactionMessage(session);
 
1230
  transaction= getActiveTransactionMessage(session, false);
1247
1231
  
1248
1232
  /* Set the transaction ID to match the previous messages */
1249
1233
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1253
1237
  return transaction;
1254
1238
}
1255
1239
 
1256
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1257
 
                                                            Table *in_table,
 
1240
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
 
1241
                                                            Table &table,
1258
1242
                                                            uint32_t *next_segment_id)
1259
1243
{
1260
 
  message::Statement *statement= in_session->getStatementMessage();
 
1244
  message::Statement *statement= session.getStatementMessage();
1261
1245
  message::Transaction *transaction= NULL;
1262
1246
  
1263
1247
  /*
1269
1253
   */
1270
1254
  if (statement == NULL)
1271
1255
  {
1272
 
    transaction= getActiveTransactionMessage(in_session);
 
1256
    transaction= getActiveTransactionMessage(session);
1273
1257
 
1274
1258
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1275
 
        in_session->variables.transaction_message_threshold)
 
1259
        session.variables.transaction_message_threshold)
1276
1260
    {
1277
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1261
      transaction= segmentTransactionMessage(session, transaction);
1278
1262
    }
1279
1263
 
1280
1264
    statement= transaction->add_statement();
1281
 
    setInsertHeader(*statement, in_session, in_table);
1282
 
    in_session->setStatementMessage(statement);
 
1265
    setInsertHeader(*statement, session, table);
 
1266
    session.setStatementMessage(statement);
1283
1267
  }
1284
1268
  else
1285
1269
  {
1286
 
    transaction= getActiveTransactionMessage(in_session);
 
1270
    transaction= getActiveTransactionMessage(session);
1287
1271
    
1288
1272
    /*
1289
1273
     * If we've passed our threshold for the statement size (possible for
1291
1275
     * the Transaction will keep it from getting huge).
1292
1276
     */
1293
1277
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1294
 
        in_session->variables.transaction_message_threshold)
 
1278
        session.variables.transaction_message_threshold)
1295
1279
    {
1296
1280
      /* Remember the transaction ID so we can re-use it */
1297
1281
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1310
1294
       * statement and transaction. This will also set the Transaction
1311
1295
       * and Statement objects in Session to NULL.
1312
1296
       */
1313
 
      commitTransactionMessage(in_session);
 
1297
      commitTransactionMessage(session);
1314
1298
      
1315
1299
      /*
1316
1300
       * Statement and Transaction should now be NULL, so new ones will get
1317
1301
       * created. We reuse the transaction id since we are segmenting
1318
1302
       * one transaction.
1319
1303
       */
1320
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1304
      transaction= getActiveTransactionMessage(session, false);
1321
1305
      assert(transaction != NULL);
1322
1306
 
1323
1307
      statement= transaction->add_statement();
1324
 
      setInsertHeader(*statement, in_session, in_table);
1325
 
      in_session->setStatementMessage(statement);
 
1308
      setInsertHeader(*statement, session, table);
 
1309
      session.setStatementMessage(statement);
1326
1310
            
1327
1311
      /* Set the transaction ID to match the previous messages */
1328
1312
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1344
1328
}
1345
1329
 
1346
1330
void TransactionServices::setInsertHeader(message::Statement &statement,
1347
 
                                          Session *in_session,
1348
 
                                          Table *in_table)
 
1331
                                          Session::const_reference session,
 
1332
                                          Table &table)
1349
1333
{
1350
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1334
  initStatementMessage(statement, message::Statement::INSERT, session);
1351
1335
 
1352
1336
  /* 
1353
1337
   * Now we construct the specialized InsertHeader message inside
1358
1342
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1359
1343
 
1360
1344
  string schema_name;
1361
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1345
  (void) table.getShare()->getSchemaName(schema_name);
1362
1346
  string table_name;
1363
 
  (void) in_table->getShare()->getTableName(table_name);
 
1347
  (void) table.getShare()->getTableName(table_name);
1364
1348
 
1365
1349
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1366
1350
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1367
1351
 
1368
1352
  Field *current_field;
1369
 
  Field **table_fields= in_table->getFields();
 
1353
  Field **table_fields= table.getFields();
1370
1354
 
1371
1355
  message::FieldMetadata *field_metadata;
1372
1356
 
1373
1357
  /* We will read all the table's fields... */
1374
 
  in_table->setReadSet();
 
1358
  table.setReadSet();
1375
1359
 
1376
1360
  while ((current_field= *table_fields++) != NULL) 
1377
1361
  {
1381
1365
  }
1382
1366
}
1383
1367
 
1384
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1368
bool TransactionServices::insertRecord(Session::reference session,
 
1369
                                       Table &table)
1385
1370
{
1386
1371
  ReplicationServices &replication_services= ReplicationServices::singleton();
1387
1372
  if (! replication_services.isActive())
1388
1373
    return false;
 
1374
 
1389
1375
  /**
1390
1376
   * We do this check here because we don't want to even create a 
1391
1377
   * statement if there isn't a primary key on the table...
1394
1380
   *
1395
1381
   * Multi-column primary keys are handled how exactly?
1396
1382
   */
1397
 
  if (not in_table->getShare()->hasPrimaryKey())
 
1383
  if (not table.getShare()->hasPrimaryKey())
1398
1384
  {
1399
1385
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1400
1386
    return true;
1401
1387
  }
1402
1388
 
1403
1389
  uint32_t next_segment_id= 1;
1404
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1390
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1405
1391
 
1406
1392
  message::InsertData *data= statement.mutable_insert_data();
1407
1393
  data->set_segment_id(next_segment_id);
1409
1395
  message::InsertRecord *record= data->add_record();
1410
1396
 
1411
1397
  Field *current_field;
1412
 
  Field **table_fields= in_table->getFields();
 
1398
  Field **table_fields= table.getFields();
1413
1399
 
1414
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1400
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1415
1401
  string_value->set_charset(system_charset_info);
1416
1402
 
1417
1403
  /* We will read all the table's fields... */
1418
 
  in_table->setReadSet();
 
1404
  table.setReadSet();
1419
1405
 
1420
1406
  while ((current_field= *table_fields++) != NULL) 
1421
1407
  {
1435
1421
  return false;
1436
1422
}
1437
1423
 
1438
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1439
 
                                                            Table *in_table,
 
1424
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
 
1425
                                                            Table &table,
1440
1426
                                                            const unsigned char *old_record, 
1441
1427
                                                            const unsigned char *new_record,
1442
1428
                                                            uint32_t *next_segment_id)
1443
1429
{
1444
 
  message::Statement *statement= in_session->getStatementMessage();
 
1430
  message::Statement *statement= session.getStatementMessage();
1445
1431
  message::Transaction *transaction= NULL;
1446
1432
 
1447
1433
  /*
1453
1439
   */
1454
1440
  if (statement == NULL)
1455
1441
  {
1456
 
    transaction= getActiveTransactionMessage(in_session);
 
1442
    transaction= getActiveTransactionMessage(session);
1457
1443
    
1458
1444
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1459
 
        in_session->variables.transaction_message_threshold)
 
1445
        session.variables.transaction_message_threshold)
1460
1446
    {
1461
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1447
      transaction= segmentTransactionMessage(session, transaction);
1462
1448
    }
1463
1449
    
1464
1450
    statement= transaction->add_statement();
1465
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1466
 
    in_session->setStatementMessage(statement);
 
1451
    setUpdateHeader(*statement, session, table, old_record, new_record);
 
1452
    session.setStatementMessage(statement);
1467
1453
  }
1468
1454
  else
1469
1455
  {
1470
 
    transaction= getActiveTransactionMessage(in_session);
 
1456
    transaction= getActiveTransactionMessage(session);
1471
1457
    
1472
1458
    /*
1473
1459
     * If we've passed our threshold for the statement size (possible for
1475
1461
     * the Transaction will keep it from getting huge).
1476
1462
     */
1477
1463
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1478
 
        in_session->variables.transaction_message_threshold)
 
1464
        session.variables.transaction_message_threshold)
1479
1465
    {
1480
1466
      /* Remember the transaction ID so we can re-use it */
1481
1467
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1494
1480
       * statement and transaction. This will also set the Transaction
1495
1481
       * and Statement objects in Session to NULL.
1496
1482
       */
1497
 
      commitTransactionMessage(in_session);
 
1483
      commitTransactionMessage(session);
1498
1484
      
1499
1485
      /*
1500
1486
       * Statement and Transaction should now be NULL, so new ones will get
1501
1487
       * created. We reuse the transaction id since we are segmenting
1502
1488
       * one transaction.
1503
1489
       */
1504
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1490
      transaction= getActiveTransactionMessage(session, false);
1505
1491
      assert(transaction != NULL);
1506
1492
      
1507
1493
      statement= transaction->add_statement();
1508
 
      setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1509
 
      in_session->setStatementMessage(statement);
 
1494
      setUpdateHeader(*statement, session, table, old_record, new_record);
 
1495
      session.setStatementMessage(statement);
1510
1496
      
1511
1497
      /* Set the transaction ID to match the previous messages */
1512
1498
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1528
1514
}
1529
1515
 
1530
1516
void TransactionServices::setUpdateHeader(message::Statement &statement,
1531
 
                                          Session *in_session,
1532
 
                                          Table *in_table,
 
1517
                                          Session::const_reference session,
 
1518
                                          Table &table,
1533
1519
                                          const unsigned char *old_record, 
1534
1520
                                          const unsigned char *new_record)
1535
1521
{
1536
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1522
  initStatementMessage(statement, message::Statement::UPDATE, session);
1537
1523
 
1538
1524
  /* 
1539
1525
   * Now we construct the specialized UpdateHeader message inside
1544
1530
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1545
1531
 
1546
1532
  string schema_name;
1547
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1533
  (void) table.getShare()->getSchemaName(schema_name);
1548
1534
  string table_name;
1549
 
  (void) in_table->getShare()->getTableName(table_name);
 
1535
  (void) table.getShare()->getTableName(table_name);
1550
1536
 
1551
1537
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1552
1538
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1553
1539
 
1554
1540
  Field *current_field;
1555
 
  Field **table_fields= in_table->getFields();
 
1541
  Field **table_fields= table.getFields();
1556
1542
 
1557
1543
  message::FieldMetadata *field_metadata;
1558
1544
 
1559
1545
  /* We will read all the table's fields... */
1560
 
  in_table->setReadSet();
 
1546
  table.setReadSet();
1561
1547
 
1562
1548
  while ((current_field= *table_fields++) != NULL) 
1563
1549
  {
1565
1551
     * We add the "key field metadata" -- i.e. the fields which is
1566
1552
     * the primary key for the table.
1567
1553
     */
1568
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1554
    if (table.getShare()->fieldInPrimaryKey(current_field))
1569
1555
    {
1570
1556
      field_metadata= header->add_key_field_metadata();
1571
1557
      field_metadata->set_name(current_field->field_name);
1572
1558
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1573
1559
    }
1574
1560
 
1575
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1561
    if (isFieldUpdated(current_field, table, old_record, new_record))
1576
1562
    {
1577
1563
      /* Field is changed from old to new */
1578
1564
      field_metadata= header->add_set_field_metadata();
1581
1567
    }
1582
1568
  }
1583
1569
}
1584
 
void TransactionServices::updateRecord(Session *in_session,
1585
 
                                       Table *in_table, 
 
1570
void TransactionServices::updateRecord(Session::reference session,
 
1571
                                       Table &table, 
1586
1572
                                       const unsigned char *old_record, 
1587
1573
                                       const unsigned char *new_record)
1588
1574
{
1591
1577
    return;
1592
1578
 
1593
1579
  uint32_t next_segment_id= 1;
1594
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1580
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1595
1581
 
1596
1582
  message::UpdateData *data= statement.mutable_update_data();
1597
1583
  data->set_segment_id(next_segment_id);
1599
1585
  message::UpdateRecord *record= data->add_record();
1600
1586
 
1601
1587
  Field *current_field;
1602
 
  Field **table_fields= in_table->getFields();
1603
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1588
  Field **table_fields= table.getFields();
 
1589
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1604
1590
  string_value->set_charset(system_charset_info);
1605
1591
 
1606
1592
  while ((current_field= *table_fields++) != NULL) 
1616
1602
     *
1617
1603
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1618
1604
     */
1619
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1605
    if (isFieldUpdated(current_field, table, old_record, new_record))
1620
1606
    {
1621
1607
      /* Store the original "read bit" for this field */
1622
1608
      bool is_read_set= current_field->isReadSet();
1623
1609
 
1624
1610
      /* We need to mark that we will "read" this field... */
1625
 
      in_table->setReadSet(current_field->position());
 
1611
      table.setReadSet(current_field->position());
1626
1612
 
1627
1613
      /* Read the string value of this field's contents */
1628
1614
      string_value= current_field->val_str_internal(string_value);
1651
1637
     * primary key field value.  Replication only supports tables
1652
1638
     * with a primary key.
1653
1639
     */
1654
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1640
    if (table.getShare()->fieldInPrimaryKey(current_field))
1655
1641
    {
1656
1642
      /**
1657
1643
       * To say the below is ugly is an understatement. But it works.
1669
1655
}
1670
1656
 
1671
1657
bool TransactionServices::isFieldUpdated(Field *current_field,
1672
 
                                         Table *in_table,
 
1658
                                         Table &table,
1673
1659
                                         const unsigned char *old_record,
1674
1660
                                         const unsigned char *new_record)
1675
1661
{
1678
1664
   * we do this crazy pointer fiddling to figure out if the current field
1679
1665
   * has been updated in the supplied record raw byte pointers.
1680
1666
   */
1681
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1682
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1667
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1668
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1683
1669
 
1684
1670
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1685
1671
 
1709
1695
  return isUpdated;
1710
1696
}  
1711
1697
 
1712
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1713
 
                                                            Table *in_table,
 
1698
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
 
1699
                                                            Table &table,
1714
1700
                                                            uint32_t *next_segment_id)
1715
1701
{
1716
 
  message::Statement *statement= in_session->getStatementMessage();
 
1702
  message::Statement *statement= session.getStatementMessage();
1717
1703
  message::Transaction *transaction= NULL;
1718
1704
 
1719
1705
  /*
1725
1711
   */
1726
1712
  if (statement == NULL)
1727
1713
  {
1728
 
    transaction= getActiveTransactionMessage(in_session);
 
1714
    transaction= getActiveTransactionMessage(session);
1729
1715
    
1730
1716
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1731
 
        in_session->variables.transaction_message_threshold)
 
1717
        session.variables.transaction_message_threshold)
1732
1718
    {
1733
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1719
      transaction= segmentTransactionMessage(session, transaction);
1734
1720
    }
1735
1721
    
1736
1722
    statement= transaction->add_statement();
1737
 
    setDeleteHeader(*statement, in_session, in_table);
1738
 
    in_session->setStatementMessage(statement);
 
1723
    setDeleteHeader(*statement, session, table);
 
1724
    session.setStatementMessage(statement);
1739
1725
  }
1740
1726
  else
1741
1727
  {
1742
 
    transaction= getActiveTransactionMessage(in_session);
 
1728
    transaction= getActiveTransactionMessage(session);
1743
1729
    
1744
1730
    /*
1745
1731
     * If we've passed our threshold for the statement size (possible for
1747
1733
     * the Transaction will keep it from getting huge).
1748
1734
     */
1749
1735
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1750
 
        in_session->variables.transaction_message_threshold)
 
1736
        session.variables.transaction_message_threshold)
1751
1737
    {
1752
1738
      /* Remember the transaction ID so we can re-use it */
1753
1739
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1766
1752
       * statement and transaction. This will also set the Transaction
1767
1753
       * and Statement objects in Session to NULL.
1768
1754
       */
1769
 
      commitTransactionMessage(in_session);
 
1755
      commitTransactionMessage(session);
1770
1756
      
1771
1757
      /*
1772
1758
       * Statement and Transaction should now be NULL, so new ones will get
1773
1759
       * created. We reuse the transaction id since we are segmenting
1774
1760
       * one transaction.
1775
1761
       */
1776
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1762
      transaction= getActiveTransactionMessage(session, false);
1777
1763
      assert(transaction != NULL);
1778
1764
      
1779
1765
      statement= transaction->add_statement();
1780
 
      setDeleteHeader(*statement, in_session, in_table);
1781
 
      in_session->setStatementMessage(statement);
 
1766
      setDeleteHeader(*statement, session, table);
 
1767
      session.setStatementMessage(statement);
1782
1768
      
1783
1769
      /* Set the transaction ID to match the previous messages */
1784
1770
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1800
1786
}
1801
1787
 
1802
1788
void TransactionServices::setDeleteHeader(message::Statement &statement,
1803
 
                                          Session *in_session,
1804
 
                                          Table *in_table)
 
1789
                                          Session::const_reference session,
 
1790
                                          Table &table)
1805
1791
{
1806
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1792
  initStatementMessage(statement, message::Statement::DELETE, session);
1807
1793
 
1808
1794
  /* 
1809
1795
   * Now we construct the specialized DeleteHeader message inside
1813
1799
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1814
1800
 
1815
1801
  string schema_name;
1816
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1802
  (void) table.getShare()->getSchemaName(schema_name);
1817
1803
  string table_name;
1818
 
  (void) in_table->getShare()->getTableName(table_name);
 
1804
  (void) table.getShare()->getTableName(table_name);
1819
1805
 
1820
1806
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1821
1807
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1822
1808
 
1823
1809
  Field *current_field;
1824
 
  Field **table_fields= in_table->getFields();
 
1810
  Field **table_fields= table.getFields();
1825
1811
 
1826
1812
  message::FieldMetadata *field_metadata;
1827
1813
 
1832
1818
     * primary key field value.  Replication only supports tables
1833
1819
     * with a primary key.
1834
1820
     */
1835
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1821
    if (table.getShare()->fieldInPrimaryKey(current_field))
1836
1822
    {
1837
1823
      field_metadata= header->add_key_field_metadata();
1838
1824
      field_metadata->set_name(current_field->field_name);
1841
1827
  }
1842
1828
}
1843
1829
 
1844
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1830
void TransactionServices::deleteRecord(Session::reference session,
 
1831
                                       Table &table,
 
1832
                                       bool use_update_record)
1845
1833
{
1846
1834
  ReplicationServices &replication_services= ReplicationServices::singleton();
1847
1835
  if (! replication_services.isActive())
1848
1836
    return;
1849
1837
 
1850
1838
  uint32_t next_segment_id= 1;
1851
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1839
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1852
1840
 
1853
1841
  message::DeleteData *data= statement.mutable_delete_data();
1854
1842
  data->set_segment_id(next_segment_id);
1856
1844
  message::DeleteRecord *record= data->add_record();
1857
1845
 
1858
1846
  Field *current_field;
1859
 
  Field **table_fields= in_table->getFields();
1860
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1847
  Field **table_fields= table.getFields();
 
1848
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
1849
  string_value->set_charset(system_charset_info);
1862
1850
 
1863
1851
  while ((current_field= *table_fields++) != NULL) 
1867
1855
     * primary key field value.  Replication only supports tables
1868
1856
     * with a primary key.
1869
1857
     */
1870
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1858
    if (table.getShare()->fieldInPrimaryKey(current_field))
1871
1859
    {
1872
1860
      if (use_update_record)
1873
1861
      {
1879
1867
         * We are careful not to change anything in old_ptr.
1880
1868
         */
1881
1869
        const unsigned char *old_ptr= current_field->ptr;
1882
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1870
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1883
1871
        string_value= current_field->val_str_internal(string_value);
1884
1872
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1885
1873
      }
1896
1884
  }
1897
1885
}
1898
1886
 
1899
 
void TransactionServices::createTable(Session *in_session,
 
1887
void TransactionServices::createTable(Session::reference session,
1900
1888
                                      const message::Table &table)
1901
1889
{
1902
1890
  ReplicationServices &replication_services= ReplicationServices::singleton();
1903
1891
  if (! replication_services.isActive())
1904
1892
    return;
1905
1893
  
1906
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1894
  message::Transaction *transaction= getActiveTransactionMessage(session);
1907
1895
  message::Statement *statement= transaction->add_statement();
1908
1896
 
1909
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1897
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1910
1898
 
1911
1899
  /* 
1912
1900
   * Construct the specialized CreateTableStatement message and attach
1916
1904
  message::Table *new_table_message= create_table_statement->mutable_table();
1917
1905
  *new_table_message= table;
1918
1906
 
1919
 
  finalizeStatementMessage(*statement, in_session);
 
1907
  finalizeStatementMessage(*statement, session);
1920
1908
 
1921
 
  finalizeTransactionMessage(*transaction, in_session);
 
1909
  finalizeTransactionMessage(*transaction, session);
1922
1910
  
1923
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1911
  (void) replication_services.pushTransactionMessage(session, *transaction);
1924
1912
 
1925
 
  cleanupTransactionMessage(transaction, in_session);
 
1913
  cleanupTransactionMessage(transaction, session);
1926
1914
 
1927
1915
}
1928
1916
 
1929
 
void TransactionServices::createSchema(Session *in_session,
 
1917
void TransactionServices::createSchema(Session::reference session,
1930
1918
                                       const message::Schema &schema)
1931
1919
{
1932
1920
  ReplicationServices &replication_services= ReplicationServices::singleton();
1933
1921
  if (! replication_services.isActive())
1934
1922
    return;
1935
1923
  
1936
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1924
  message::Transaction *transaction= getActiveTransactionMessage(session);
1937
1925
  message::Statement *statement= transaction->add_statement();
1938
1926
 
1939
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1927
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1940
1928
 
1941
1929
  /* 
1942
1930
   * Construct the specialized CreateSchemaStatement message and attach
1946
1934
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
1935
  *new_schema_message= schema;
1948
1936
 
1949
 
  finalizeStatementMessage(*statement, in_session);
 
1937
  finalizeStatementMessage(*statement, session);
1950
1938
 
1951
 
  finalizeTransactionMessage(*transaction, in_session);
 
1939
  finalizeTransactionMessage(*transaction, session);
1952
1940
  
1953
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1941
  (void) replication_services.pushTransactionMessage(session, *transaction);
1954
1942
 
1955
 
  cleanupTransactionMessage(transaction, in_session);
 
1943
  cleanupTransactionMessage(transaction, session);
1956
1944
 
1957
1945
}
1958
1946
 
1959
 
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
 
1947
void TransactionServices::dropSchema(Session::reference session,
 
1948
                                     identifier::Schema::const_reference identifier)
1960
1949
{
1961
1950
  ReplicationServices &replication_services= ReplicationServices::singleton();
1962
1951
  if (! replication_services.isActive())
1963
1952
    return;
1964
1953
  
1965
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1954
  message::Transaction *transaction= getActiveTransactionMessage(session);
1966
1955
  message::Statement *statement= transaction->add_statement();
1967
1956
 
1968
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1957
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1969
1958
 
1970
1959
  /* 
1971
1960
   * Construct the specialized DropSchemaStatement message and attach
1975
1964
 
1976
1965
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1977
1966
 
1978
 
  finalizeStatementMessage(*statement, in_session);
 
1967
  finalizeStatementMessage(*statement, session);
1979
1968
 
1980
 
  finalizeTransactionMessage(*transaction, in_session);
 
1969
  finalizeTransactionMessage(*transaction, session);
1981
1970
  
1982
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1971
  (void) replication_services.pushTransactionMessage(session, *transaction);
1983
1972
 
1984
 
  cleanupTransactionMessage(transaction, in_session);
 
1973
  cleanupTransactionMessage(transaction, session);
1985
1974
}
1986
1975
 
1987
 
void TransactionServices::alterSchema(Session *in_session,
 
1976
void TransactionServices::alterSchema(Session::reference session,
1988
1977
                                      const message::schema::shared_ptr &old_schema,
1989
1978
                                      const message::Schema &new_schema)
1990
1979
{
1992
1981
  if (! replication_services.isActive())
1993
1982
    return;
1994
1983
  
1995
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1984
  message::Transaction *transaction= getActiveTransactionMessage(session);
1996
1985
  message::Statement *statement= transaction->add_statement();
1997
1986
 
1998
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
 
1987
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
1999
1988
 
2000
1989
  /* 
2001
1990
   * Construct the specialized AlterSchemaStatement message and attach
2009
1998
  *before= *old_schema;
2010
1999
  *after= new_schema;
2011
2000
 
2012
 
  finalizeStatementMessage(*statement, in_session);
 
2001
  finalizeStatementMessage(*statement, session);
2013
2002
 
2014
 
  finalizeTransactionMessage(*transaction, in_session);
 
2003
  finalizeTransactionMessage(*transaction, session);
2015
2004
  
2016
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2005
  (void) replication_services.pushTransactionMessage(session, *transaction);
2017
2006
 
2018
 
  cleanupTransactionMessage(transaction, in_session);
 
2007
  cleanupTransactionMessage(transaction, session);
2019
2008
}
2020
2009
 
2021
 
void TransactionServices::dropTable(Session *in_session,
 
2010
void TransactionServices::dropTable(Session::reference session,
2022
2011
                                    const identifier::Table &table,
2023
2012
                                    bool if_exists)
2024
2013
{
2026
2015
  if (! replication_services.isActive())
2027
2016
    return;
2028
2017
  
2029
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2018
  message::Transaction *transaction= getActiveTransactionMessage(session);
2030
2019
  message::Statement *statement= transaction->add_statement();
2031
2020
 
2032
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
2021
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2033
2022
 
2034
2023
  /* 
2035
2024
   * Construct the specialized DropTableStatement message and attach
2044
2033
  table_metadata->set_schema_name(table.getSchemaName());
2045
2034
  table_metadata->set_table_name(table.getTableName());
2046
2035
 
2047
 
  finalizeStatementMessage(*statement, in_session);
 
2036
  finalizeStatementMessage(*statement, session);
2048
2037
 
2049
 
  finalizeTransactionMessage(*transaction, in_session);
 
2038
  finalizeTransactionMessage(*transaction, session);
2050
2039
  
2051
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2040
  (void) replication_services.pushTransactionMessage(session, *transaction);
2052
2041
 
2053
 
  cleanupTransactionMessage(transaction, in_session);
 
2042
  cleanupTransactionMessage(transaction, session);
2054
2043
}
2055
2044
 
2056
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
2045
void TransactionServices::truncateTable(Session::reference session,
 
2046
                                        Table &table)
2057
2047
{
2058
2048
  ReplicationServices &replication_services= ReplicationServices::singleton();
2059
2049
  if (! replication_services.isActive())
2060
2050
    return;
2061
2051
  
2062
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2052
  message::Transaction *transaction= getActiveTransactionMessage(session);
2063
2053
  message::Statement *statement= transaction->add_statement();
2064
2054
 
2065
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
2055
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2066
2056
 
2067
2057
  /* 
2068
2058
   * Construct the specialized TruncateTableStatement message and attach
2072
2062
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2073
2063
 
2074
2064
  string schema_name;
2075
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
2065
  (void) table.getShare()->getSchemaName(schema_name);
2076
2066
  string table_name;
2077
 
  (void) in_table->getShare()->getTableName(table_name);
 
2067
  (void) table.getShare()->getTableName(table_name);
2078
2068
 
2079
2069
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2080
2070
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2081
2071
 
2082
 
  finalizeStatementMessage(*statement, in_session);
 
2072
  finalizeStatementMessage(*statement, session);
2083
2073
 
2084
 
  finalizeTransactionMessage(*transaction, in_session);
 
2074
  finalizeTransactionMessage(*transaction, session);
2085
2075
  
2086
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2076
  (void) replication_services.pushTransactionMessage(session, *transaction);
2087
2077
 
2088
 
  cleanupTransactionMessage(transaction, in_session);
 
2078
  cleanupTransactionMessage(transaction, session);
2089
2079
}
2090
2080
 
2091
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
2081
void TransactionServices::rawStatement(Session::reference session,
 
2082
                                       const string &query)
2092
2083
{
2093
2084
  ReplicationServices &replication_services= ReplicationServices::singleton();
2094
2085
  if (! replication_services.isActive())
2095
2086
    return;
2096
2087
 
2097
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2088
  message::Transaction *transaction= getActiveTransactionMessage(session);
2098
2089
  message::Statement *statement= transaction->add_statement();
2099
2090
 
2100
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
2091
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2101
2092
  statement->set_sql(query);
2102
 
  finalizeStatementMessage(*statement, in_session);
 
2093
  finalizeStatementMessage(*statement, session);
2103
2094
 
2104
 
  finalizeTransactionMessage(*transaction, in_session);
 
2095
  finalizeTransactionMessage(*transaction, session);
2105
2096
  
2106
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2097
  (void) replication_services.pushTransactionMessage(session, *transaction);
2107
2098
 
2108
 
  cleanupTransactionMessage(transaction, in_session);
 
2099
  cleanupTransactionMessage(transaction, session);
2109
2100
}
2110
2101
 
2111
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2102
int TransactionServices::sendEvent(Session::reference session,
 
2103
                                   const message::Event &event)
2112
2104
{
2113
2105
  ReplicationServices &replication_services= ReplicationServices::singleton();
2114
2106
  if (! replication_services.isActive())
2126
2118
 
2127
2119
  trx_event->CopyFrom(event);
2128
2120
 
2129
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2121
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2130
2122
 
2131
2123
  delete transaction;
2132
2124
 
2133
2125
  return static_cast<int>(result);
2134
2126
}
2135
2127
 
2136
 
bool TransactionServices::sendStartupEvent(Session *session)
 
2128
bool TransactionServices::sendStartupEvent(Session::reference session)
2137
2129
{
2138
2130
  message::Event event;
2139
2131
  event.set_type(message::Event::STARTUP);
2142
2134
  return true;
2143
2135
}
2144
2136
 
2145
 
bool TransactionServices::sendShutdownEvent(Session *session)
 
2137
bool TransactionServices::sendShutdownEvent(Session::reference session)
2146
2138
{
2147
2139
  message::Event event;
2148
2140
  event.set_type(message::Event::SHUTDOWN);