~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Lee Bieber
  • Date: 2011-01-25 02:10:42 UTC
  • mfrom: (2109.1.4 build)
  • Revision ID: kalebral@gmail.com-20110125021042-ocqa0v509ae7fmtz
Need to add a "drop table a" in execute.wait test
Add execute test suite to regular test run
Merge Lee - fix second part of 705699, check for both client and server before building and testing rabbitmq plugin
Merge Shrews - Changes TransactionServices methods to use references to Session objects instead of pointers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session *session,
 
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session->transaction.stmt;
333
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session.transaction.stmt;
 
333
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session *session,
 
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session->transaction.stmt;
366
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session.transaction.stmt;
 
366
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session->transaction.all;
387
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session.transaction.all;
 
387
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session->transaction.xid_state.xid.is_null())
404
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
403
  if (session.transaction.xid_state.xid.is_null())
 
404
    session.transaction.xid_state.xid.set(session.getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session->transaction.all;
417
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session.transaction.all;
 
417
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session->transaction.xid_state.xid.is_null())
434
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
433
  if (session.transaction.xid_state.xid.is_null())
 
434
    session.transaction.xid_state.xid.set(session.getQueryId());
435
435
 
436
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
457
457
{
458
 
  if (session->getXaId() == 0)
 
458
  if (session.getXaId() == 0)
459
459
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
460
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
461
461
  }
462
462
 
463
 
  return session->getXaId();
 
463
  return session.getXaId();
464
464
}
465
465
 
466
 
/**
467
 
  @retval
468
 
    0   ok
469
 
  @retval
470
 
    1   transaction was rolled back
471
 
  @retval
472
 
    2   error during commit, data may be inconsistent
473
 
 
474
 
  @todo
475
 
    Since we don't support nested statement transactions in 5.0,
476
 
    we can't commit or rollback stmt transactions while we are inside
477
 
    stored functions or triggers. So we simply do nothing now.
478
 
    TODO: This should be fixed in later ( >= 5.1) releases.
479
 
*/
480
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
 
466
int TransactionServices::commitTransaction(Session::reference session,
 
467
                                           bool normal_transaction)
481
468
{
482
469
  int error= 0, cookie= 0;
483
470
  /*
484
471
    'all' means that this is either an explicit commit issued by
485
472
    user, or an implicit commit issued by a DDL.
486
473
  */
487
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
474
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
476
 
490
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
477
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
491
478
 
492
479
  /*
493
480
    We must not commit the normal transaction if a statement
495
482
    flags will not get propagated to its normal transaction's
496
483
    counterpart.
497
484
  */
498
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
499
 
              trans == &session->transaction.stmt);
 
485
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
486
              trans == &session.transaction.stmt);
500
487
 
501
488
  if (resource_contexts.empty() == false)
502
489
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
490
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
504
491
    {
505
492
      rollbackTransaction(session, normal_transaction);
506
493
      return 1;
531
518
 
532
519
        if (resource->participatesInXaTransaction())
533
520
        {
534
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
 
521
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
535
522
          {
536
523
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
524
            error= 1;
538
525
          }
539
526
          else
540
527
          {
541
 
            session->status_var.ha_prepare_count++;
 
528
            session.status_var.ha_prepare_count++;
542
529
          }
543
530
        }
544
531
      }
560
547
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
548
end:
562
549
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
550
      session.startWaitingGlobalReadLock();
564
551
  }
565
552
  return error;
566
553
}
569
556
  @note
570
557
  This function does not care about global read lock. A caller should.
571
558
*/
572
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
 
559
int TransactionServices::commitPhaseOne(Session::reference session,
 
560
                                        bool normal_transaction)
573
561
{
574
562
  int error=0;
575
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
563
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
565
 
578
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
566
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
579
567
  bool all= normal_transaction;
580
568
 
581
569
  /* If we're in autocommit then we have a real transaction to commit
582
570
     (except if it's BEGIN)
583
571
  */
584
 
  if (! session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
572
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
585
573
    all= true;
586
574
 
587
575
  if (resource_contexts.empty() == false)
597
585
 
598
586
      if (resource->participatesInXaTransaction())
599
587
      {
600
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, all)))
 
588
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
601
589
        {
602
590
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
591
          error= 1;
604
592
        }
605
593
        else if (normal_transaction)
606
594
        {
607
 
          session->status_var.ha_commit_count++;
 
595
          session.status_var.ha_commit_count++;
608
596
        }
609
597
      }
610
598
      else if (resource->participatesInSqlTransaction())
611
599
      {
612
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, all)))
 
600
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
613
601
        {
614
602
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
615
603
          error= 1;
616
604
        }
617
605
        else if (normal_transaction)
618
606
        {
619
 
          session->status_var.ha_commit_count++;
 
607
          session.status_var.ha_commit_count++;
620
608
        }
621
609
      }
622
610
      resource_context->reset(); /* keep it conveniently zero-filled */
623
611
    }
624
612
 
625
613
    if (is_real_trans)
626
 
      session->transaction.xid_state.xid.null();
 
614
      session.transaction.xid_state.xid.null();
627
615
 
628
616
    if (normal_transaction)
629
617
    {
630
 
      session->variables.tx_isolation= session->session_tx_isolation;
631
 
      session->transaction.cleanup();
 
618
      session.variables.tx_isolation= session.session_tx_isolation;
 
619
      session.transaction.cleanup();
632
620
    }
633
621
  }
634
622
  trans->reset();
635
623
  return error;
636
624
}
637
625
 
638
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
 
626
int TransactionServices::rollbackTransaction(Session::reference session,
 
627
                                             bool normal_transaction)
639
628
{
640
629
  int error= 0;
641
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
630
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
642
631
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
643
632
 
644
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
645
 
  bool all = normal_transaction || !session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
633
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
634
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
646
635
 
647
636
  /*
648
637
    We must not rollback the normal transaction if a statement
649
638
    transaction is pending.
650
639
  */
651
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
652
 
              trans == &session->transaction.stmt);
 
640
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
641
              trans == &session.transaction.stmt);
653
642
 
654
643
  if (resource_contexts.empty() == false)
655
644
  {
664
653
 
665
654
      if (resource->participatesInXaTransaction())
666
655
      {
667
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, all)))
 
656
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
668
657
        {
669
658
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
670
659
          error= 1;
671
660
        }
672
661
        else if (normal_transaction)
673
662
        {
674
 
          session->status_var.ha_rollback_count++;
 
663
          session.status_var.ha_rollback_count++;
675
664
        }
676
665
      }
677
666
      else if (resource->participatesInSqlTransaction())
678
667
      {
679
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, all)))
 
668
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
680
669
        {
681
670
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
682
671
          error= 1;
683
672
        }
684
673
        else if (normal_transaction)
685
674
        {
686
 
          session->status_var.ha_rollback_count++;
 
675
          session.status_var.ha_rollback_count++;
687
676
        }
688
677
      }
689
678
      resource_context->reset(); /* keep it conveniently zero-filled */
702
691
      rollbackStatementMessage(session);
703
692
 
704
693
    if (is_real_trans)
705
 
      session->transaction.xid_state.xid.null();
 
694
      session.transaction.xid_state.xid.null();
706
695
    if (normal_transaction)
707
696
    {
708
 
      session->variables.tx_isolation=session->session_tx_isolation;
709
 
      session->transaction.cleanup();
 
697
      session.variables.tx_isolation=session.session_tx_isolation;
 
698
      session.transaction.cleanup();
710
699
    }
711
700
  }
712
701
  if (normal_transaction)
713
 
    session->transaction_rollback_request= false;
 
702
    session.transaction_rollback_request= false;
714
703
 
715
704
  /*
716
705
   * If a non-transactional table was updated, warn the user
717
706
   */
718
707
  if (is_real_trans &&
719
 
      session->transaction.all.hasModifiedNonTransData() &&
720
 
      session->getKilled() != Session::KILL_CONNECTION)
 
708
      session.transaction.all.hasModifiedNonTransData() &&
 
709
      session.getKilled() != Session::KILL_CONNECTION)
721
710
  {
722
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
711
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
712
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
724
713
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
725
714
  }
727
716
  return error;
728
717
}
729
718
 
730
 
/**
731
 
  This is used to commit or rollback a single statement depending on
732
 
  the value of error.
733
 
 
734
 
  @note
735
 
    Note that if the autocommit is on, then the following call inside
736
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
737
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
738
 
    the user has used LOCK TABLES then that mechanism does not know to do the
739
 
    commit.
740
 
*/
741
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
 
719
int TransactionServices::autocommitOrRollback(Session::reference session,
 
720
                                              int error)
742
721
{
743
722
  /* One GPB Statement message per SQL statement */
744
 
  message::Statement *statement= session->getStatementMessage();
 
723
  message::Statement *statement= session.getStatementMessage();
745
724
  if ((statement != NULL) && (! error))
746
725
    finalizeStatementMessage(*statement, session);
747
726
 
748
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
 
727
  if (session.transaction.stmt.getResourceContexts().empty() == false)
749
728
  {
750
 
    TransactionContext *trans = &session->transaction.stmt;
 
729
    TransactionContext *trans = &session.transaction.stmt;
751
730
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
752
731
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
753
732
         it != resource_contexts.end();
755
734
    {
756
735
      ResourceContext *resource_context= *it;
757
736
 
758
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
737
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
759
738
    }
760
739
 
761
740
    if (! error)
766
745
    else
767
746
    {
768
747
      (void) rollbackTransaction(session, false);
769
 
      if (session->transaction_rollback_request)
 
748
      if (session.transaction_rollback_request)
770
749
        (void) rollbackTransaction(session, true);
771
750
    }
772
751
 
773
 
    session->variables.tx_isolation= session->session_tx_isolation;
 
752
    session.variables.tx_isolation= session.session_tx_isolation;
774
753
  }
775
754
  return error;
776
755
}
785
764
  }
786
765
};
787
766
 
788
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
 
767
int TransactionServices::rollbackToSavepoint(Session::reference session,
 
768
                                             NamedSavepoint &sv)
789
769
{
790
770
  int error= 0;
791
 
  TransactionContext *trans= &session->transaction.all;
 
771
  TransactionContext *trans= &session.transaction.all;
792
772
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
793
773
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
794
774
 
808
788
 
809
789
    if (resource->participatesInSqlTransaction())
810
790
    {
811
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
 
791
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
812
792
      {
813
793
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
814
794
        error= 1;
815
795
      }
816
796
      else
817
797
      {
818
 
        session->status_var.ha_savepoint_rollback_count++;
 
798
        session.status_var.ha_savepoint_rollback_count++;
819
799
      }
820
800
    }
821
801
    trans->no_2pc|= not resource->participatesInXaTransaction();
865
845
 
866
846
      if (resource->participatesInSqlTransaction())
867
847
      {
868
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
 
848
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
869
849
        {
870
850
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
871
851
          error= 1;
872
852
        }
873
853
        else
874
854
        {
875
 
          session->status_var.ha_rollback_count++;
 
855
          session.status_var.ha_rollback_count++;
876
856
        }
877
857
      }
878
858
      resource_context->reset(); /* keep it conveniently zero-filled */
895
875
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
896
876
      if (num_statements == 0)
897
877
      {    
898
 
        session->setStatementMessage(NULL);
 
878
        session.setStatementMessage(NULL);
899
879
      }    
900
880
      else 
901
881
      {
902
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
882
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
903
883
      }    
904
 
      session->setTransactionMessage(savepoint_transaction_copy);
 
884
      session.setTransactionMessage(savepoint_transaction_copy);
905
885
    }
906
886
  }
907
887
 
914
894
  section "4.33.4 SQL-statements and transaction states",
915
895
  NamedSavepoint is *not* transaction-initiating SQL-statement
916
896
*/
917
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
 
897
int TransactionServices::setSavepoint(Session::reference session,
 
898
                                      NamedSavepoint &sv)
918
899
{
919
900
  int error= 0;
920
 
  TransactionContext *trans= &session->transaction.all;
 
901
  TransactionContext *trans= &session.transaction.all;
921
902
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
922
903
 
923
904
  if (resource_contexts.empty() == false)
933
914
 
934
915
      if (resource->participatesInSqlTransaction())
935
916
      {
936
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
 
917
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
937
918
        {
938
919
          my_error(ER_GET_ERRNO, MYF(0), err);
939
920
          error= 1;
940
921
        }
941
922
        else
942
923
        {
943
 
          session->status_var.ha_savepoint_count++;
 
924
          session.status_var.ha_savepoint_count++;
944
925
        }
945
926
      }
946
927
    }
952
933
 
953
934
  if (shouldConstructMessages())
954
935
  {
955
 
    message::Transaction *transaction= session->getTransactionMessage();
 
936
    message::Transaction *transaction= session.getTransactionMessage();
956
937
                  
957
938
    if (transaction != NULL)
958
939
    {
965
946
  return error;
966
947
}
967
948
 
968
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
 
949
int TransactionServices::releaseSavepoint(Session::reference session,
 
950
                                          NamedSavepoint &sv)
969
951
{
970
952
  int error= 0;
971
953
 
982
964
 
983
965
    if (resource->participatesInSqlTransaction())
984
966
    {
985
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
 
967
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
986
968
      {
987
969
        my_error(ER_GET_ERRNO, MYF(0), err);
988
970
        error= 1;
999
981
  return replication_services.isActive();
1000
982
}
1001
983
 
1002
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
984
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
 
985
                                                                       bool should_inc_trx_id)
1003
986
{
1004
 
  message::Transaction *transaction= in_session->getTransactionMessage();
 
987
  message::Transaction *transaction= session.getTransactionMessage();
1005
988
 
1006
989
  if (unlikely(transaction == NULL))
1007
990
  {
1011
994
     * deleting transaction message when done with it.
1012
995
     */
1013
996
    transaction= new (nothrow) message::Transaction();
1014
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1015
 
    in_session->setTransactionMessage(transaction);
 
997
    initTransactionMessage(*transaction, session, should_inc_trx_id);
 
998
    session.setTransactionMessage(transaction);
1016
999
    return transaction;
1017
1000
  }
1018
1001
  else
1019
1002
    return transaction;
1020
1003
}
1021
1004
 
1022
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1023
 
                                                 Session *in_session,
 
1005
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
 
1006
                                                 Session::reference session,
1024
1007
                                                 bool should_inc_trx_id)
1025
1008
{
1026
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1027
 
  trx->set_server_id(in_session->getServerId());
 
1009
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1010
  trx->set_server_id(session.getServerId());
1028
1011
 
1029
1012
  if (should_inc_trx_id)
1030
1013
  {
1031
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1032
 
    in_session->setXaId(0);
 
1014
    trx->set_transaction_id(getCurrentTransactionId(session));
 
1015
    session.setXaId(0);
1033
1016
  }
1034
1017
  else
1035
1018
  {
1037
1020
    trx->set_transaction_id(0);
1038
1021
  }
1039
1022
 
1040
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1023
  trx->set_start_timestamp(session.getCurrentTimestamp());
1041
1024
  
1042
1025
  /* segment info may get set elsewhere as needed */
1043
 
  in_transaction.set_segment_id(1);
1044
 
  in_transaction.set_end_segment(true);
1045
 
}
1046
 
 
1047
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1048
 
                                              Session *in_session)
1049
 
{
1050
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1051
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1052
 
}
1053
 
 
1054
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1055
 
                                             Session *in_session)
1056
 
{
1057
 
  delete in_transaction;
1058
 
  in_session->setStatementMessage(NULL);
1059
 
  in_session->setTransactionMessage(NULL);
1060
 
  in_session->setXaId(0);
1061
 
}
1062
 
 
1063
 
int TransactionServices::commitTransactionMessage(Session *in_session)
 
1026
  transaction.set_segment_id(1);
 
1027
  transaction.set_end_segment(true);
 
1028
}
 
1029
 
 
1030
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
 
1031
                                                     Session::const_reference session)
 
1032
{
 
1033
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1034
  trx->set_end_timestamp(session.getCurrentTimestamp());
 
1035
}
 
1036
 
 
1037
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
 
1038
                                                    Session::reference session)
 
1039
{
 
1040
  delete transaction;
 
1041
  session.setStatementMessage(NULL);
 
1042
  session.setTransactionMessage(NULL);
 
1043
  session.setXaId(0);
 
1044
}
 
1045
 
 
1046
int TransactionServices::commitTransactionMessage(Session::reference session)
1064
1047
{
1065
1048
  ReplicationServices &replication_services= ReplicationServices::singleton();
1066
1049
  if (! replication_services.isActive())
1070
1053
   * If no Transaction message was ever created, then no data modification
1071
1054
   * occurred inside the transaction, so nothing to do.
1072
1055
   */
1073
 
  if (in_session->getTransactionMessage() == NULL)
 
1056
  if (session.getTransactionMessage() == NULL)
1074
1057
    return 0;
1075
1058
  
1076
1059
  /* If there is an active statement message, finalize it. */
1077
 
  message::Statement *statement= in_session->getStatementMessage();
 
1060
  message::Statement *statement= session.getStatementMessage();
1078
1061
 
1079
1062
  if (statement != NULL)
1080
1063
  {
1081
 
    finalizeStatementMessage(*statement, in_session);
 
1064
    finalizeStatementMessage(*statement, session);
1082
1065
  }
1083
1066
 
1084
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1067
  message::Transaction* transaction= getActiveTransactionMessage(session);
1085
1068
 
1086
1069
  /*
1087
1070
   * It is possible that we could have a Transaction without any Statements
1091
1074
   */
1092
1075
  if (transaction->statement_size() == 0)
1093
1076
  {
1094
 
    cleanupTransactionMessage(transaction, in_session);
 
1077
    cleanupTransactionMessage(transaction, session);
1095
1078
    return 0;
1096
1079
  }
1097
1080
  
1098
 
  finalizeTransactionMessage(*transaction, in_session);
 
1081
  finalizeTransactionMessage(*transaction, session);
1099
1082
  
1100
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1083
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1101
1084
 
1102
 
  cleanupTransactionMessage(transaction, in_session);
 
1085
  cleanupTransactionMessage(transaction, session);
1103
1086
 
1104
1087
  return static_cast<int>(result);
1105
1088
}
1106
1089
 
1107
1090
void TransactionServices::initStatementMessage(message::Statement &statement,
1108
 
                                        message::Statement::Type in_type,
1109
 
                                        Session *in_session)
 
1091
                                               message::Statement::Type type,
 
1092
                                               Session::const_reference session)
1110
1093
{
1111
 
  statement.set_type(in_type);
1112
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1094
  statement.set_type(type);
 
1095
  statement.set_start_timestamp(session.getCurrentTimestamp());
1113
1096
 
1114
 
  if (in_session->variables.replicate_query)
1115
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
1097
  if (session.variables.replicate_query)
 
1098
    statement.set_sql(session.getQueryString()->c_str());
1116
1099
}
1117
1100
 
1118
1101
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1119
 
                                            Session *in_session)
 
1102
                                                   Session::reference session)
1120
1103
{
1121
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1122
 
  in_session->setStatementMessage(NULL);
 
1104
  statement.set_end_timestamp(session.getCurrentTimestamp());
 
1105
  session.setStatementMessage(NULL);
1123
1106
}
1124
1107
 
1125
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1108
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1126
1109
{
1127
1110
  ReplicationServices &replication_services= ReplicationServices::singleton();
1128
1111
  if (! replication_services.isActive())
1129
1112
    return;
1130
1113
  
1131
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1114
  message::Transaction *transaction= getActiveTransactionMessage(session);
1132
1115
 
1133
1116
  /*
1134
1117
   * OK, so there are two situations that we need to deal with here:
1158
1141
     * attach it to the transaction, and push it to replicators.
1159
1142
     */
1160
1143
    transaction->Clear();
1161
 
    initTransactionMessage(*transaction, in_session, false);
 
1144
    initTransactionMessage(*transaction, session, false);
1162
1145
 
1163
1146
    /* Set the transaction ID to match the previous messages */
1164
1147
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1167
1150
 
1168
1151
    message::Statement *statement= transaction->add_statement();
1169
1152
 
1170
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1171
 
    finalizeStatementMessage(*statement, in_session);
 
1153
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
 
1154
    finalizeStatementMessage(*statement, session);
1172
1155
 
1173
 
    finalizeTransactionMessage(*transaction, in_session);
 
1156
    finalizeTransactionMessage(*transaction, session);
1174
1157
    
1175
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1158
    (void) replication_services.pushTransactionMessage(session, *transaction);
1176
1159
  }
1177
 
  cleanupTransactionMessage(transaction, in_session);
 
1160
 
 
1161
  cleanupTransactionMessage(transaction, session);
1178
1162
}
1179
1163
 
1180
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
 
1164
void TransactionServices::rollbackStatementMessage(Session::reference session)
1181
1165
{
1182
1166
  ReplicationServices &replication_services= ReplicationServices::singleton();
1183
1167
  if (! replication_services.isActive())
1184
1168
    return;
1185
1169
 
1186
 
  message::Statement *current_statement= in_session->getStatementMessage();
 
1170
  message::Statement *current_statement= session.getStatementMessage();
1187
1171
 
1188
1172
  /* If we never added a Statement message, nothing to undo. */
1189
1173
  if (current_statement == NULL)
1222
1206
   * Remove the Statement message we've been working with (same as
1223
1207
   * current_statement).
1224
1208
   */
1225
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1209
  message::Transaction *transaction= getActiveTransactionMessage(session);
1226
1210
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1227
1211
  statements_in_txn= transaction->mutable_statement();
1228
1212
  statements_in_txn->RemoveLast();
1229
 
  in_session->setStatementMessage(NULL);
 
1213
  session.setStatementMessage(NULL);
1230
1214
  
1231
1215
  /*
1232
1216
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1238
1222
    current_statement= transaction->add_statement();
1239
1223
    initStatementMessage(*current_statement,
1240
1224
                         message::Statement::ROLLBACK_STATEMENT,
1241
 
                         in_session);
1242
 
    finalizeStatementMessage(*current_statement, in_session);
 
1225
                         session);
 
1226
    finalizeStatementMessage(*current_statement, session);
1243
1227
  }
1244
1228
}
1245
1229
 
1246
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
 
1230
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1247
1231
                                                                     message::Transaction *transaction)
1248
1232
{
1249
1233
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1250
1234
  uint32_t seg_id= transaction->segment_id();
1251
1235
  
1252
1236
  transaction->set_end_segment(false);
1253
 
  commitTransactionMessage(in_session);
1254
 
  transaction= getActiveTransactionMessage(in_session, false);
 
1237
  commitTransactionMessage(session);
 
1238
  transaction= getActiveTransactionMessage(session, false);
1255
1239
  
1256
1240
  /* Set the transaction ID to match the previous messages */
1257
1241
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1261
1245
  return transaction;
1262
1246
}
1263
1247
 
1264
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1265
 
                                                            Table *in_table,
 
1248
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
 
1249
                                                            Table &table,
1266
1250
                                                            uint32_t *next_segment_id)
1267
1251
{
1268
 
  message::Statement *statement= in_session->getStatementMessage();
 
1252
  message::Statement *statement= session.getStatementMessage();
1269
1253
  message::Transaction *transaction= NULL;
1270
1254
  
1271
1255
  /*
1277
1261
   */
1278
1262
  if (statement == NULL)
1279
1263
  {
1280
 
    transaction= getActiveTransactionMessage(in_session);
 
1264
    transaction= getActiveTransactionMessage(session);
1281
1265
 
1282
1266
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1283
 
        in_session->variables.transaction_message_threshold)
 
1267
        session.variables.transaction_message_threshold)
1284
1268
    {
1285
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1269
      transaction= segmentTransactionMessage(session, transaction);
1286
1270
    }
1287
1271
 
1288
1272
    statement= transaction->add_statement();
1289
 
    setInsertHeader(*statement, in_session, in_table);
1290
 
    in_session->setStatementMessage(statement);
 
1273
    setInsertHeader(*statement, session, table);
 
1274
    session.setStatementMessage(statement);
1291
1275
  }
1292
1276
  else
1293
1277
  {
1294
 
    transaction= getActiveTransactionMessage(in_session);
 
1278
    transaction= getActiveTransactionMessage(session);
1295
1279
    
1296
1280
    /*
1297
1281
     * If we've passed our threshold for the statement size (possible for
1299
1283
     * the Transaction will keep it from getting huge).
1300
1284
     */
1301
1285
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1302
 
        in_session->variables.transaction_message_threshold)
 
1286
        session.variables.transaction_message_threshold)
1303
1287
    {
1304
1288
      /* Remember the transaction ID so we can re-use it */
1305
1289
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1318
1302
       * statement and transaction. This will also set the Transaction
1319
1303
       * and Statement objects in Session to NULL.
1320
1304
       */
1321
 
      commitTransactionMessage(in_session);
 
1305
      commitTransactionMessage(session);
1322
1306
      
1323
1307
      /*
1324
1308
       * Statement and Transaction should now be NULL, so new ones will get
1325
1309
       * created. We reuse the transaction id since we are segmenting
1326
1310
       * one transaction.
1327
1311
       */
1328
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1312
      transaction= getActiveTransactionMessage(session, false);
1329
1313
      assert(transaction != NULL);
1330
1314
 
1331
1315
      statement= transaction->add_statement();
1332
 
      setInsertHeader(*statement, in_session, in_table);
1333
 
      in_session->setStatementMessage(statement);
 
1316
      setInsertHeader(*statement, session, table);
 
1317
      session.setStatementMessage(statement);
1334
1318
            
1335
1319
      /* Set the transaction ID to match the previous messages */
1336
1320
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1352
1336
}
1353
1337
 
1354
1338
void TransactionServices::setInsertHeader(message::Statement &statement,
1355
 
                                          Session *in_session,
1356
 
                                          Table *in_table)
 
1339
                                          Session::const_reference session,
 
1340
                                          Table &table)
1357
1341
{
1358
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1342
  initStatementMessage(statement, message::Statement::INSERT, session);
1359
1343
 
1360
1344
  /* 
1361
1345
   * Now we construct the specialized InsertHeader message inside
1366
1350
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1367
1351
 
1368
1352
  string schema_name;
1369
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1353
  (void) table.getShare()->getSchemaName(schema_name);
1370
1354
  string table_name;
1371
 
  (void) in_table->getShare()->getTableName(table_name);
 
1355
  (void) table.getShare()->getTableName(table_name);
1372
1356
 
1373
1357
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1374
1358
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1375
1359
 
1376
1360
  Field *current_field;
1377
 
  Field **table_fields= in_table->getFields();
 
1361
  Field **table_fields= table.getFields();
1378
1362
 
1379
1363
  message::FieldMetadata *field_metadata;
1380
1364
 
1381
1365
  /* We will read all the table's fields... */
1382
 
  in_table->setReadSet();
 
1366
  table.setReadSet();
1383
1367
 
1384
1368
  while ((current_field= *table_fields++) != NULL) 
1385
1369
  {
1389
1373
  }
1390
1374
}
1391
1375
 
1392
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1376
bool TransactionServices::insertRecord(Session::reference session,
 
1377
                                       Table &table)
1393
1378
{
1394
1379
  ReplicationServices &replication_services= ReplicationServices::singleton();
1395
1380
  if (! replication_services.isActive())
1396
1381
    return false;
 
1382
 
1397
1383
  /**
1398
1384
   * We do this check here because we don't want to even create a 
1399
1385
   * statement if there isn't a primary key on the table...
1402
1388
   *
1403
1389
   * Multi-column primary keys are handled how exactly?
1404
1390
   */
1405
 
  if (not in_table->getShare()->hasPrimaryKey())
 
1391
  if (not table.getShare()->hasPrimaryKey())
1406
1392
  {
1407
1393
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1408
1394
    return true;
1409
1395
  }
1410
1396
 
1411
1397
  uint32_t next_segment_id= 1;
1412
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1398
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1413
1399
 
1414
1400
  message::InsertData *data= statement.mutable_insert_data();
1415
1401
  data->set_segment_id(next_segment_id);
1417
1403
  message::InsertRecord *record= data->add_record();
1418
1404
 
1419
1405
  Field *current_field;
1420
 
  Field **table_fields= in_table->getFields();
 
1406
  Field **table_fields= table.getFields();
1421
1407
 
1422
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1408
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1423
1409
  string_value->set_charset(system_charset_info);
1424
1410
 
1425
1411
  /* We will read all the table's fields... */
1426
 
  in_table->setReadSet();
 
1412
  table.setReadSet();
1427
1413
 
1428
1414
  while ((current_field= *table_fields++) != NULL) 
1429
1415
  {
1443
1429
  return false;
1444
1430
}
1445
1431
 
1446
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1447
 
                                                            Table *in_table,
 
1432
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
 
1433
                                                            Table &table,
1448
1434
                                                            const unsigned char *old_record, 
1449
1435
                                                            const unsigned char *new_record,
1450
1436
                                                            uint32_t *next_segment_id)
1451
1437
{
1452
 
  message::Statement *statement= in_session->getStatementMessage();
 
1438
  message::Statement *statement= session.getStatementMessage();
1453
1439
  message::Transaction *transaction= NULL;
1454
1440
 
1455
1441
  /*
1461
1447
   */
1462
1448
  if (statement == NULL)
1463
1449
  {
1464
 
    transaction= getActiveTransactionMessage(in_session);
 
1450
    transaction= getActiveTransactionMessage(session);
1465
1451
    
1466
1452
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1467
 
        in_session->variables.transaction_message_threshold)
 
1453
        session.variables.transaction_message_threshold)
1468
1454
    {
1469
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1455
      transaction= segmentTransactionMessage(session, transaction);
1470
1456
    }
1471
1457
    
1472
1458
    statement= transaction->add_statement();
1473
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1474
 
    in_session->setStatementMessage(statement);
 
1459
    setUpdateHeader(*statement, session, table, old_record, new_record);
 
1460
    session.setStatementMessage(statement);
1475
1461
  }
1476
1462
  else
1477
1463
  {
1478
 
    transaction= getActiveTransactionMessage(in_session);
 
1464
    transaction= getActiveTransactionMessage(session);
1479
1465
    
1480
1466
    /*
1481
1467
     * If we've passed our threshold for the statement size (possible for
1483
1469
     * the Transaction will keep it from getting huge).
1484
1470
     */
1485
1471
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1486
 
        in_session->variables.transaction_message_threshold)
 
1472
        session.variables.transaction_message_threshold)
1487
1473
    {
1488
1474
      /* Remember the transaction ID so we can re-use it */
1489
1475
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1502
1488
       * statement and transaction. This will also set the Transaction
1503
1489
       * and Statement objects in Session to NULL.
1504
1490
       */
1505
 
      commitTransactionMessage(in_session);
 
1491
      commitTransactionMessage(session);
1506
1492
      
1507
1493
      /*
1508
1494
       * Statement and Transaction should now be NULL, so new ones will get
1509
1495
       * created. We reuse the transaction id since we are segmenting
1510
1496
       * one transaction.
1511
1497
       */
1512
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1498
      transaction= getActiveTransactionMessage(session, false);
1513
1499
      assert(transaction != NULL);
1514
1500
      
1515
1501
      statement= transaction->add_statement();
1516
 
      setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1517
 
      in_session->setStatementMessage(statement);
 
1502
      setUpdateHeader(*statement, session, table, old_record, new_record);
 
1503
      session.setStatementMessage(statement);
1518
1504
      
1519
1505
      /* Set the transaction ID to match the previous messages */
1520
1506
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1536
1522
}
1537
1523
 
1538
1524
void TransactionServices::setUpdateHeader(message::Statement &statement,
1539
 
                                          Session *in_session,
1540
 
                                          Table *in_table,
 
1525
                                          Session::const_reference session,
 
1526
                                          Table &table,
1541
1527
                                          const unsigned char *old_record, 
1542
1528
                                          const unsigned char *new_record)
1543
1529
{
1544
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1530
  initStatementMessage(statement, message::Statement::UPDATE, session);
1545
1531
 
1546
1532
  /* 
1547
1533
   * Now we construct the specialized UpdateHeader message inside
1552
1538
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1553
1539
 
1554
1540
  string schema_name;
1555
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1541
  (void) table.getShare()->getSchemaName(schema_name);
1556
1542
  string table_name;
1557
 
  (void) in_table->getShare()->getTableName(table_name);
 
1543
  (void) table.getShare()->getTableName(table_name);
1558
1544
 
1559
1545
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1560
1546
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1561
1547
 
1562
1548
  Field *current_field;
1563
 
  Field **table_fields= in_table->getFields();
 
1549
  Field **table_fields= table.getFields();
1564
1550
 
1565
1551
  message::FieldMetadata *field_metadata;
1566
1552
 
1567
1553
  /* We will read all the table's fields... */
1568
 
  in_table->setReadSet();
 
1554
  table.setReadSet();
1569
1555
 
1570
1556
  while ((current_field= *table_fields++) != NULL) 
1571
1557
  {
1573
1559
     * We add the "key field metadata" -- i.e. the fields which is
1574
1560
     * the primary key for the table.
1575
1561
     */
1576
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1562
    if (table.getShare()->fieldInPrimaryKey(current_field))
1577
1563
    {
1578
1564
      field_metadata= header->add_key_field_metadata();
1579
1565
      field_metadata->set_name(current_field->field_name);
1580
1566
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1581
1567
    }
1582
1568
 
1583
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1569
    if (isFieldUpdated(current_field, table, old_record, new_record))
1584
1570
    {
1585
1571
      /* Field is changed from old to new */
1586
1572
      field_metadata= header->add_set_field_metadata();
1589
1575
    }
1590
1576
  }
1591
1577
}
1592
 
void TransactionServices::updateRecord(Session *in_session,
1593
 
                                       Table *in_table, 
 
1578
void TransactionServices::updateRecord(Session::reference session,
 
1579
                                       Table &table, 
1594
1580
                                       const unsigned char *old_record, 
1595
1581
                                       const unsigned char *new_record)
1596
1582
{
1599
1585
    return;
1600
1586
 
1601
1587
  uint32_t next_segment_id= 1;
1602
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1588
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1603
1589
 
1604
1590
  message::UpdateData *data= statement.mutable_update_data();
1605
1591
  data->set_segment_id(next_segment_id);
1607
1593
  message::UpdateRecord *record= data->add_record();
1608
1594
 
1609
1595
  Field *current_field;
1610
 
  Field **table_fields= in_table->getFields();
1611
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1596
  Field **table_fields= table.getFields();
 
1597
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1612
1598
  string_value->set_charset(system_charset_info);
1613
1599
 
1614
1600
  while ((current_field= *table_fields++) != NULL) 
1624
1610
     *
1625
1611
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1626
1612
     */
1627
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1613
    if (isFieldUpdated(current_field, table, old_record, new_record))
1628
1614
    {
1629
1615
      /* Store the original "read bit" for this field */
1630
1616
      bool is_read_set= current_field->isReadSet();
1631
1617
 
1632
1618
      /* We need to mark that we will "read" this field... */
1633
 
      in_table->setReadSet(current_field->position());
 
1619
      table.setReadSet(current_field->position());
1634
1620
 
1635
1621
      /* Read the string value of this field's contents */
1636
1622
      string_value= current_field->val_str_internal(string_value);
1659
1645
     * primary key field value.  Replication only supports tables
1660
1646
     * with a primary key.
1661
1647
     */
1662
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1648
    if (table.getShare()->fieldInPrimaryKey(current_field))
1663
1649
    {
1664
1650
      /**
1665
1651
       * To say the below is ugly is an understatement. But it works.
1677
1663
}
1678
1664
 
1679
1665
bool TransactionServices::isFieldUpdated(Field *current_field,
1680
 
                                         Table *in_table,
 
1666
                                         Table &table,
1681
1667
                                         const unsigned char *old_record,
1682
1668
                                         const unsigned char *new_record)
1683
1669
{
1686
1672
   * we do this crazy pointer fiddling to figure out if the current field
1687
1673
   * has been updated in the supplied record raw byte pointers.
1688
1674
   */
1689
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1690
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1675
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1676
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1691
1677
 
1692
1678
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1693
1679
 
1717
1703
  return isUpdated;
1718
1704
}  
1719
1705
 
1720
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1721
 
                                                            Table *in_table,
 
1706
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
 
1707
                                                            Table &table,
1722
1708
                                                            uint32_t *next_segment_id)
1723
1709
{
1724
 
  message::Statement *statement= in_session->getStatementMessage();
 
1710
  message::Statement *statement= session.getStatementMessage();
1725
1711
  message::Transaction *transaction= NULL;
1726
1712
 
1727
1713
  /*
1733
1719
   */
1734
1720
  if (statement == NULL)
1735
1721
  {
1736
 
    transaction= getActiveTransactionMessage(in_session);
 
1722
    transaction= getActiveTransactionMessage(session);
1737
1723
    
1738
1724
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1739
 
        in_session->variables.transaction_message_threshold)
 
1725
        session.variables.transaction_message_threshold)
1740
1726
    {
1741
 
      transaction= segmentTransactionMessage(in_session, transaction);
 
1727
      transaction= segmentTransactionMessage(session, transaction);
1742
1728
    }
1743
1729
    
1744
1730
    statement= transaction->add_statement();
1745
 
    setDeleteHeader(*statement, in_session, in_table);
1746
 
    in_session->setStatementMessage(statement);
 
1731
    setDeleteHeader(*statement, session, table);
 
1732
    session.setStatementMessage(statement);
1747
1733
  }
1748
1734
  else
1749
1735
  {
1750
 
    transaction= getActiveTransactionMessage(in_session);
 
1736
    transaction= getActiveTransactionMessage(session);
1751
1737
    
1752
1738
    /*
1753
1739
     * If we've passed our threshold for the statement size (possible for
1755
1741
     * the Transaction will keep it from getting huge).
1756
1742
     */
1757
1743
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1758
 
        in_session->variables.transaction_message_threshold)
 
1744
        session.variables.transaction_message_threshold)
1759
1745
    {
1760
1746
      /* Remember the transaction ID so we can re-use it */
1761
1747
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1774
1760
       * statement and transaction. This will also set the Transaction
1775
1761
       * and Statement objects in Session to NULL.
1776
1762
       */
1777
 
      commitTransactionMessage(in_session);
 
1763
      commitTransactionMessage(session);
1778
1764
      
1779
1765
      /*
1780
1766
       * Statement and Transaction should now be NULL, so new ones will get
1781
1767
       * created. We reuse the transaction id since we are segmenting
1782
1768
       * one transaction.
1783
1769
       */
1784
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1770
      transaction= getActiveTransactionMessage(session, false);
1785
1771
      assert(transaction != NULL);
1786
1772
      
1787
1773
      statement= transaction->add_statement();
1788
 
      setDeleteHeader(*statement, in_session, in_table);
1789
 
      in_session->setStatementMessage(statement);
 
1774
      setDeleteHeader(*statement, session, table);
 
1775
      session.setStatementMessage(statement);
1790
1776
      
1791
1777
      /* Set the transaction ID to match the previous messages */
1792
1778
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1808
1794
}
1809
1795
 
1810
1796
void TransactionServices::setDeleteHeader(message::Statement &statement,
1811
 
                                          Session *in_session,
1812
 
                                          Table *in_table)
 
1797
                                          Session::const_reference session,
 
1798
                                          Table &table)
1813
1799
{
1814
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1800
  initStatementMessage(statement, message::Statement::DELETE, session);
1815
1801
 
1816
1802
  /* 
1817
1803
   * Now we construct the specialized DeleteHeader message inside
1821
1807
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1822
1808
 
1823
1809
  string schema_name;
1824
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1810
  (void) table.getShare()->getSchemaName(schema_name);
1825
1811
  string table_name;
1826
 
  (void) in_table->getShare()->getTableName(table_name);
 
1812
  (void) table.getShare()->getTableName(table_name);
1827
1813
 
1828
1814
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1829
1815
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1830
1816
 
1831
1817
  Field *current_field;
1832
 
  Field **table_fields= in_table->getFields();
 
1818
  Field **table_fields= table.getFields();
1833
1819
 
1834
1820
  message::FieldMetadata *field_metadata;
1835
1821
 
1840
1826
     * primary key field value.  Replication only supports tables
1841
1827
     * with a primary key.
1842
1828
     */
1843
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1829
    if (table.getShare()->fieldInPrimaryKey(current_field))
1844
1830
    {
1845
1831
      field_metadata= header->add_key_field_metadata();
1846
1832
      field_metadata->set_name(current_field->field_name);
1849
1835
  }
1850
1836
}
1851
1837
 
1852
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1838
void TransactionServices::deleteRecord(Session::reference session,
 
1839
                                       Table &table,
 
1840
                                       bool use_update_record)
1853
1841
{
1854
1842
  ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1843
  if (! replication_services.isActive())
1856
1844
    return;
1857
1845
 
1858
1846
  uint32_t next_segment_id= 1;
1859
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1847
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1860
1848
 
1861
1849
  message::DeleteData *data= statement.mutable_delete_data();
1862
1850
  data->set_segment_id(next_segment_id);
1864
1852
  message::DeleteRecord *record= data->add_record();
1865
1853
 
1866
1854
  Field *current_field;
1867
 
  Field **table_fields= in_table->getFields();
1868
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1855
  Field **table_fields= table.getFields();
 
1856
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1869
1857
  string_value->set_charset(system_charset_info);
1870
1858
 
1871
1859
  while ((current_field= *table_fields++) != NULL) 
1875
1863
     * primary key field value.  Replication only supports tables
1876
1864
     * with a primary key.
1877
1865
     */
1878
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1866
    if (table.getShare()->fieldInPrimaryKey(current_field))
1879
1867
    {
1880
1868
      if (use_update_record)
1881
1869
      {
1887
1875
         * We are careful not to change anything in old_ptr.
1888
1876
         */
1889
1877
        const unsigned char *old_ptr= current_field->ptr;
1890
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1878
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1891
1879
        string_value= current_field->val_str_internal(string_value);
1892
1880
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1893
1881
      }
1904
1892
  }
1905
1893
}
1906
1894
 
1907
 
void TransactionServices::createTable(Session *in_session,
 
1895
void TransactionServices::createTable(Session::reference session,
1908
1896
                                      const message::Table &table)
1909
1897
{
1910
1898
  ReplicationServices &replication_services= ReplicationServices::singleton();
1911
1899
  if (! replication_services.isActive())
1912
1900
    return;
1913
1901
  
1914
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1902
  message::Transaction *transaction= getActiveTransactionMessage(session);
1915
1903
  message::Statement *statement= transaction->add_statement();
1916
1904
 
1917
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1905
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1918
1906
 
1919
1907
  /* 
1920
1908
   * Construct the specialized CreateTableStatement message and attach
1924
1912
  message::Table *new_table_message= create_table_statement->mutable_table();
1925
1913
  *new_table_message= table;
1926
1914
 
1927
 
  finalizeStatementMessage(*statement, in_session);
 
1915
  finalizeStatementMessage(*statement, session);
1928
1916
 
1929
 
  finalizeTransactionMessage(*transaction, in_session);
 
1917
  finalizeTransactionMessage(*transaction, session);
1930
1918
  
1931
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1919
  (void) replication_services.pushTransactionMessage(session, *transaction);
1932
1920
 
1933
 
  cleanupTransactionMessage(transaction, in_session);
 
1921
  cleanupTransactionMessage(transaction, session);
1934
1922
 
1935
1923
}
1936
1924
 
1937
 
void TransactionServices::createSchema(Session *in_session,
 
1925
void TransactionServices::createSchema(Session::reference session,
1938
1926
                                       const message::Schema &schema)
1939
1927
{
1940
1928
  ReplicationServices &replication_services= ReplicationServices::singleton();
1941
1929
  if (! replication_services.isActive())
1942
1930
    return;
1943
1931
  
1944
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1932
  message::Transaction *transaction= getActiveTransactionMessage(session);
1945
1933
  message::Statement *statement= transaction->add_statement();
1946
1934
 
1947
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1935
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1948
1936
 
1949
1937
  /* 
1950
1938
   * Construct the specialized CreateSchemaStatement message and attach
1954
1942
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1955
1943
  *new_schema_message= schema;
1956
1944
 
1957
 
  finalizeStatementMessage(*statement, in_session);
 
1945
  finalizeStatementMessage(*statement, session);
1958
1946
 
1959
 
  finalizeTransactionMessage(*transaction, in_session);
 
1947
  finalizeTransactionMessage(*transaction, session);
1960
1948
  
1961
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1949
  (void) replication_services.pushTransactionMessage(session, *transaction);
1962
1950
 
1963
 
  cleanupTransactionMessage(transaction, in_session);
 
1951
  cleanupTransactionMessage(transaction, session);
1964
1952
 
1965
1953
}
1966
1954
 
1967
 
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
 
1955
void TransactionServices::dropSchema(Session::reference session,
 
1956
                                     identifier::Schema::const_reference identifier)
1968
1957
{
1969
1958
  ReplicationServices &replication_services= ReplicationServices::singleton();
1970
1959
  if (! replication_services.isActive())
1971
1960
    return;
1972
1961
  
1973
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1962
  message::Transaction *transaction= getActiveTransactionMessage(session);
1974
1963
  message::Statement *statement= transaction->add_statement();
1975
1964
 
1976
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1965
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1977
1966
 
1978
1967
  /* 
1979
1968
   * Construct the specialized DropSchemaStatement message and attach
1983
1972
 
1984
1973
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1985
1974
 
1986
 
  finalizeStatementMessage(*statement, in_session);
 
1975
  finalizeStatementMessage(*statement, session);
1987
1976
 
1988
 
  finalizeTransactionMessage(*transaction, in_session);
 
1977
  finalizeTransactionMessage(*transaction, session);
1989
1978
  
1990
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1979
  (void) replication_services.pushTransactionMessage(session, *transaction);
1991
1980
 
1992
 
  cleanupTransactionMessage(transaction, in_session);
 
1981
  cleanupTransactionMessage(transaction, session);
1993
1982
}
1994
1983
 
1995
 
void TransactionServices::alterSchema(Session *in_session,
 
1984
void TransactionServices::alterSchema(Session::reference session,
1996
1985
                                      const message::schema::shared_ptr &old_schema,
1997
1986
                                      const message::Schema &new_schema)
1998
1987
{
2000
1989
  if (! replication_services.isActive())
2001
1990
    return;
2002
1991
  
2003
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1992
  message::Transaction *transaction= getActiveTransactionMessage(session);
2004
1993
  message::Statement *statement= transaction->add_statement();
2005
1994
 
2006
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
 
1995
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2007
1996
 
2008
1997
  /* 
2009
1998
   * Construct the specialized AlterSchemaStatement message and attach
2017
2006
  *before= *old_schema;
2018
2007
  *after= new_schema;
2019
2008
 
2020
 
  finalizeStatementMessage(*statement, in_session);
 
2009
  finalizeStatementMessage(*statement, session);
2021
2010
 
2022
 
  finalizeTransactionMessage(*transaction, in_session);
 
2011
  finalizeTransactionMessage(*transaction, session);
2023
2012
  
2024
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2013
  (void) replication_services.pushTransactionMessage(session, *transaction);
2025
2014
 
2026
 
  cleanupTransactionMessage(transaction, in_session);
 
2015
  cleanupTransactionMessage(transaction, session);
2027
2016
}
2028
2017
 
2029
 
void TransactionServices::dropTable(Session *in_session,
 
2018
void TransactionServices::dropTable(Session::reference session,
2030
2019
                                    const identifier::Table &table,
2031
2020
                                    bool if_exists)
2032
2021
{
2034
2023
  if (! replication_services.isActive())
2035
2024
    return;
2036
2025
  
2037
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2026
  message::Transaction *transaction= getActiveTransactionMessage(session);
2038
2027
  message::Statement *statement= transaction->add_statement();
2039
2028
 
2040
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
2029
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2041
2030
 
2042
2031
  /* 
2043
2032
   * Construct the specialized DropTableStatement message and attach
2052
2041
  table_metadata->set_schema_name(table.getSchemaName());
2053
2042
  table_metadata->set_table_name(table.getTableName());
2054
2043
 
2055
 
  finalizeStatementMessage(*statement, in_session);
 
2044
  finalizeStatementMessage(*statement, session);
2056
2045
 
2057
 
  finalizeTransactionMessage(*transaction, in_session);
 
2046
  finalizeTransactionMessage(*transaction, session);
2058
2047
  
2059
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2048
  (void) replication_services.pushTransactionMessage(session, *transaction);
2060
2049
 
2061
 
  cleanupTransactionMessage(transaction, in_session);
 
2050
  cleanupTransactionMessage(transaction, session);
2062
2051
}
2063
2052
 
2064
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
2053
void TransactionServices::truncateTable(Session::reference session,
 
2054
                                        Table &table)
2065
2055
{
2066
2056
  ReplicationServices &replication_services= ReplicationServices::singleton();
2067
2057
  if (! replication_services.isActive())
2068
2058
    return;
2069
2059
  
2070
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2060
  message::Transaction *transaction= getActiveTransactionMessage(session);
2071
2061
  message::Statement *statement= transaction->add_statement();
2072
2062
 
2073
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
2063
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2074
2064
 
2075
2065
  /* 
2076
2066
   * Construct the specialized TruncateTableStatement message and attach
2080
2070
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2081
2071
 
2082
2072
  string schema_name;
2083
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
2073
  (void) table.getShare()->getSchemaName(schema_name);
2084
2074
  string table_name;
2085
 
  (void) in_table->getShare()->getTableName(table_name);
 
2075
  (void) table.getShare()->getTableName(table_name);
2086
2076
 
2087
2077
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2088
2078
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2089
2079
 
2090
 
  finalizeStatementMessage(*statement, in_session);
 
2080
  finalizeStatementMessage(*statement, session);
2091
2081
 
2092
 
  finalizeTransactionMessage(*transaction, in_session);
 
2082
  finalizeTransactionMessage(*transaction, session);
2093
2083
  
2094
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2084
  (void) replication_services.pushTransactionMessage(session, *transaction);
2095
2085
 
2096
 
  cleanupTransactionMessage(transaction, in_session);
 
2086
  cleanupTransactionMessage(transaction, session);
2097
2087
}
2098
2088
 
2099
 
/*
 * TransactionServices::rawStatement
 *
 * Records `query` as a RAW_SQL statement and hands the resulting transaction
 * message to the replication services.
 *
 * Flow, as visible below:
 *   - return immediately when ReplicationServices::singleton() is not active;
 *   - obtain the transaction message via getActiveTransactionMessage() and
 *     add a new statement to it;
 *   - initialise the statement as message::Statement::RAW_SQL and store the
 *     query text with set_sql();
 *   - finalize the statement, then the transaction message;
 *   - push the transaction message to the replication services (the return
 *     code is explicitly discarded with a (void) cast);
 *   - call cleanupTransactionMessage() on the transaction message.
 *
 * NOTE(review): this span is a rendered diff, so each changed statement
 * appears twice. The lines using `in_session` (a Session pointer) are the
 * removed revision; the lines using `session` (a Session::reference) are the
 * added revision, which passes the session straight through instead of
 * dereferencing it for pushTransactionMessage(). The bare numbers are the
 * old/new line numbers of the diff view, not code.
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
2089
void TransactionServices::rawStatement(Session::reference session,
 
2090
                                       const string &query)
2100
2091
{
2101
2092
  ReplicationServices &replication_services= ReplicationServices::singleton();
2102
2093
  if (! replication_services.isActive())
2103
2094
    return;
2104
2095
 
2105
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2096
  message::Transaction *transaction= getActiveTransactionMessage(session);
2106
2097
  message::Statement *statement= transaction->add_statement();
2107
2098
 
2108
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
2099
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2109
2100
  statement->set_sql(query);
2110
 
  finalizeStatementMessage(*statement, in_session);
 
2101
  finalizeStatementMessage(*statement, session);
2111
2102
 
2112
 
  finalizeTransactionMessage(*transaction, in_session);
 
2103
  finalizeTransactionMessage(*transaction, session);
2113
2104
  
2114
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2105
  (void) replication_services.pushTransactionMessage(session, *transaction);
2115
2106
 
2116
 
  cleanupTransactionMessage(transaction, in_session);
 
2107
  cleanupTransactionMessage(transaction, session);
2117
2108
}
2118
2109
 
2119
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2110
int TransactionServices::sendEvent(Session::reference session,
 
2111
                                   const message::Event &event)
2120
2112
{
2121
2113
  ReplicationServices &replication_services= ReplicationServices::singleton();
2122
2114
  if (! replication_services.isActive())
2134
2126
 
2135
2127
  trx_event->CopyFrom(event);
2136
2128
 
2137
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2129
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2138
2130
 
2139
2131
  delete transaction;
2140
2132
 
2141
2133
  return static_cast<int>(result);
2142
2134
}
2143
2135
 
2144
 
bool TransactionServices::sendStartupEvent(Session *session)
 
2136
bool TransactionServices::sendStartupEvent(Session::reference session)
2145
2137
{
2146
2138
  message::Event event;
2147
2139
  event.set_type(message::Event::STARTUP);
2150
2142
  return true;
2151
2143
}
2152
2144
 
2153
 
bool TransactionServices::sendShutdownEvent(Session *session)
 
2145
bool TransactionServices::sendShutdownEvent(Session::reference session)
2154
2146
{
2155
2147
  message::Event event;
2156
2148
  event.set_type(message::Event::SHUTDOWN);