~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

edit

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
 * plugins can understand.
48
48
 */
49
49
 
50
 
#include <config.h>
51
 
#include <drizzled/current_session.h>
52
 
#include <drizzled/my_hash.h>
53
 
#include <drizzled/error.h>
54
 
#include <drizzled/gettext.h>
55
 
#include <drizzled/probes.h>
56
 
#include <drizzled/sql_parse.h>
57
 
#include <drizzled/session.h>
58
 
#include <drizzled/sql_base.h>
59
 
#include <drizzled/replication_services.h>
60
 
#include <drizzled/transaction_services.h>
61
 
#include <drizzled/transaction_context.h>
62
 
#include <drizzled/message/transaction.pb.h>
63
 
#include <drizzled/message/statement_transform.h>
64
 
#include <drizzled/resource_context.h>
65
 
#include <drizzled/lock.h>
66
 
#include <drizzled/item/int.h>
67
 
#include <drizzled/item/empty_string.h>
68
 
#include <drizzled/field/epoch.h>
69
 
#include <drizzled/plugin/client.h>
70
 
#include <drizzled/plugin/monitored_in_transaction.h>
71
 
#include <drizzled/plugin/transactional_storage_engine.h>
72
 
#include <drizzled/plugin/xa_resource_manager.h>
73
 
#include <drizzled/plugin/xa_storage_engine.h>
74
 
#include <drizzled/internal/my_sys.h>
 
50
#include "config.h"
 
51
#include "drizzled/my_hash.h"
 
52
#include "drizzled/error.h"
 
53
#include "drizzled/gettext.h"
 
54
#include "drizzled/probes.h"
 
55
#include "drizzled/sql_parse.h"
 
56
#include "drizzled/session.h"
 
57
#include "drizzled/sql_base.h"
 
58
#include "drizzled/replication_services.h"
 
59
#include "drizzled/transaction_services.h"
 
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
 
63
#include "drizzled/resource_context.h"
 
64
#include "drizzled/lock.h"
 
65
#include "drizzled/item/int.h"
 
66
#include "drizzled/item/empty_string.h"
 
67
#include "drizzled/field/epoch.h"
 
68
#include "drizzled/plugin/client.h"
 
69
#include "drizzled/plugin/monitored_in_transaction.h"
 
70
#include "drizzled/plugin/transactional_storage_engine.h"
 
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/plugin/xa_storage_engine.h"
 
73
#include "drizzled/internal/my_sys.h"
75
74
 
76
75
#include <vector>
77
76
#include <algorithm>
314
313
  }
315
314
}
316
315
 
317
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
316
void TransactionServices::registerResourceForStatement(Session *session,
318
317
                                                       plugin::MonitoredInTransaction *monitored,
319
318
                                                       plugin::TransactionalStorageEngine *engine)
320
319
{
321
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
322
321
  {
323
322
    /* 
324
323
     * Now we automatically register this resource manager for the
330
329
    registerResourceForTransaction(session, monitored, engine);
331
330
  }
332
331
 
333
 
  TransactionContext *trans= &session.transaction.stmt;
334
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
335
334
 
336
335
  if (resource_context->isStarted())
337
336
    return; /* already registered, return */
346
345
  trans->no_2pc|= true;
347
346
}
348
347
 
349
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
348
void TransactionServices::registerResourceForStatement(Session *session,
350
349
                                                       plugin::MonitoredInTransaction *monitored,
351
350
                                                       plugin::TransactionalStorageEngine *engine,
352
351
                                                       plugin::XaResourceManager *resource_manager)
353
352
{
354
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
355
354
  {
356
355
    /* 
357
356
     * Now we automatically register this resource manager for the
363
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
364
363
  }
365
364
 
366
 
  TransactionContext *trans= &session.transaction.stmt;
367
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
368
367
 
369
368
  if (resource_context->isStarted())
370
369
    return; /* already registered, return */
380
379
  trans->no_2pc|= false;
381
380
}
382
381
 
383
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
384
383
                                                         plugin::MonitoredInTransaction *monitored,
385
384
                                                         plugin::TransactionalStorageEngine *engine)
386
385
{
387
 
  TransactionContext *trans= &session.transaction.all;
388
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
389
388
 
390
389
  if (resource_context->isStarted())
391
390
    return; /* already registered, return */
392
391
 
393
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
394
393
 
395
394
  trans->registerResource(resource_context);
396
395
 
401
400
  resource_context->setTransactionalStorageEngine(engine);
402
401
  trans->no_2pc|= true;
403
402
 
404
 
  if (session.transaction.xid_state.xid.is_null())
405
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
406
405
 
407
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
409
408
    registerResourceForStatement(session, monitored, engine);
410
409
}
411
410
 
412
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
413
412
                                                         plugin::MonitoredInTransaction *monitored,
414
413
                                                         plugin::TransactionalStorageEngine *engine,
415
414
                                                         plugin::XaResourceManager *resource_manager)
416
415
{
417
 
  TransactionContext *trans= &session.transaction.all;
418
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
418
 
420
419
  if (resource_context->isStarted())
421
420
    return; /* already registered, return */
422
421
 
423
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
424
423
 
425
424
  trans->registerResource(resource_context);
426
425
 
431
430
  resource_context->setTransactionalStorageEngine(engine);
432
431
  trans->no_2pc|= true;
433
432
 
434
 
  if (session.transaction.xid_state.xid.is_null())
435
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
436
435
 
437
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
438
437
 
439
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
441
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
442
441
}
443
442
 
454
453
  my_session->setXaId(xa_id);
455
454
}
456
455
 
457
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
458
457
{
459
 
  if (session.getXaId() == 0)
 
458
  if (session->getXaId() == 0)
460
459
  {
461
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
462
461
  }
463
462
 
464
 
  return session.getXaId();
 
463
  return session->getXaId();
465
464
}
466
465
 
467
 
int TransactionServices::commitTransaction(Session::reference session,
468
 
                                           bool normal_transaction)
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
469
481
{
470
482
  int error= 0, cookie= 0;
471
483
  /*
472
484
    'all' means that this is either an explicit commit issued by
473
485
    user, or an implicit commit issued by a DDL.
474
486
  */
475
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
489
 
478
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
479
491
 
480
492
  /*
481
493
    We must not commit the normal transaction if a statement
483
495
    flags will not get propagated to its normal transaction's
484
496
    counterpart.
485
497
  */
486
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
487
 
              trans == &session.transaction.stmt);
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
488
500
 
489
501
  if (resource_contexts.empty() == false)
490
502
  {
491
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
492
504
    {
493
505
      rollbackTransaction(session, normal_transaction);
494
506
      return 1;
519
531
 
520
532
        if (resource->participatesInXaTransaction())
521
533
        {
522
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
523
535
          {
524
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
525
537
            error= 1;
526
538
          }
527
539
          else
528
540
          {
529
 
            session.status_var.ha_prepare_count++;
 
541
            session->status_var.ha_prepare_count++;
530
542
          }
531
543
        }
532
544
      }
548
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
549
561
end:
550
562
    if (is_real_trans)
551
 
      session.startWaitingGlobalReadLock();
 
563
      session->startWaitingGlobalReadLock();
552
564
  }
553
565
  return error;
554
566
}
557
569
  @note
558
570
  This function does not care about global read lock. A caller should.
559
571
*/
560
 
int TransactionServices::commitPhaseOne(Session::reference session,
561
 
                                        bool normal_transaction)
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
562
573
{
563
574
  int error=0;
564
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
565
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
566
577
 
567
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
 
  bool all= normal_transaction;
569
 
 
570
 
  /* If we're in autocommit then we have a real transaction to commit
571
 
     (except if it's BEGIN)
572
 
  */
573
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
574
 
    all= true;
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
575
579
 
576
580
  if (resource_contexts.empty() == false)
577
581
  {
586
590
 
587
591
      if (resource->participatesInXaTransaction())
588
592
      {
589
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
594
        {
591
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
592
596
          error= 1;
593
597
        }
594
598
        else if (normal_transaction)
595
599
        {
596
 
          session.status_var.ha_commit_count++;
 
600
          session->status_var.ha_commit_count++;
597
601
        }
598
602
      }
599
603
      else if (resource->participatesInSqlTransaction())
600
604
      {
601
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
606
        {
603
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
604
608
          error= 1;
605
609
        }
606
610
        else if (normal_transaction)
607
611
        {
608
 
          session.status_var.ha_commit_count++;
 
612
          session->status_var.ha_commit_count++;
609
613
        }
610
614
      }
611
615
      resource_context->reset(); /* keep it conveniently zero-filled */
612
616
    }
613
617
 
614
618
    if (is_real_trans)
615
 
      session.transaction.xid_state.xid.null();
 
619
      session->transaction.xid_state.xid.null();
616
620
 
617
621
    if (normal_transaction)
618
622
    {
619
 
      session.variables.tx_isolation= session.session_tx_isolation;
620
 
      session.transaction.cleanup();
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
621
625
    }
622
626
  }
623
627
  trans->reset();
624
628
  return error;
625
629
}
626
630
 
627
 
int TransactionServices::rollbackTransaction(Session::reference session,
628
 
                                             bool normal_transaction)
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
629
632
{
630
633
  int error= 0;
631
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
636
 
634
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
636
638
 
637
639
  /*
638
640
    We must not rollback the normal transaction if a statement
639
641
    transaction is pending.
640
642
  */
641
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
642
 
              trans == &session.transaction.stmt);
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
643
645
 
644
646
  if (resource_contexts.empty() == false)
645
647
  {
654
656
 
655
657
      if (resource->participatesInXaTransaction())
656
658
      {
657
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
658
660
        {
659
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
660
662
          error= 1;
661
663
        }
662
664
        else if (normal_transaction)
663
665
        {
664
 
          session.status_var.ha_rollback_count++;
 
666
          session->status_var.ha_rollback_count++;
665
667
        }
666
668
      }
667
669
      else if (resource->participatesInSqlTransaction())
668
670
      {
669
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
670
672
        {
671
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
672
674
          error= 1;
673
675
        }
674
676
        else if (normal_transaction)
675
677
        {
676
 
          session.status_var.ha_rollback_count++;
 
678
          session->status_var.ha_rollback_count++;
677
679
        }
678
680
      }
679
681
      resource_context->reset(); /* keep it conveniently zero-filled */
686
688
     * a rollback statement with the corresponding transaction ID
687
689
     * to rollback.
688
690
     */
689
 
    if (all)
 
691
    if (normal_transaction)
690
692
      rollbackTransactionMessage(session);
691
693
    else
692
694
      rollbackStatementMessage(session);
693
695
 
694
696
    if (is_real_trans)
695
 
      session.transaction.xid_state.xid.null();
 
697
      session->transaction.xid_state.xid.null();
696
698
    if (normal_transaction)
697
699
    {
698
 
      session.variables.tx_isolation=session.session_tx_isolation;
699
 
      session.transaction.cleanup();
 
700
      session->variables.tx_isolation=session->session_tx_isolation;
 
701
      session->transaction.cleanup();
700
702
    }
701
703
  }
702
704
  if (normal_transaction)
703
 
    session.transaction_rollback_request= false;
 
705
    session->transaction_rollback_request= false;
704
706
 
705
707
  /*
706
708
   * If a non-transactional table was updated, warn the user
707
709
   */
708
710
  if (is_real_trans &&
709
 
      session.transaction.all.hasModifiedNonTransData() &&
710
 
      session.getKilled() != Session::KILL_CONNECTION)
 
711
      session->transaction.all.hasModifiedNonTransData() &&
 
712
      session->getKilled() != Session::KILL_CONNECTION)
711
713
  {
712
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
714
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
715
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
716
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
715
717
  }
717
719
  return error;
718
720
}
719
721
 
720
 
int TransactionServices::autocommitOrRollback(Session::reference session,
721
 
                                              int error)
 
722
/**
 
723
  This is used to commit or rollback a single statement depending on
 
724
  the value of error.
 
725
 
 
726
  @note
 
727
    Note that if the autocommit is on, then the following call inside
 
728
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
729
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
730
    the user has used LOCK TABLES then that mechanism does not know to do the
 
731
    commit.
 
732
*/
 
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
722
734
{
723
735
  /* One GPB Statement message per SQL statement */
724
 
  message::Statement *statement= session.getStatementMessage();
 
736
  message::Statement *statement= session->getStatementMessage();
725
737
  if ((statement != NULL) && (! error))
726
738
    finalizeStatementMessage(*statement, session);
727
739
 
728
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
740
  if (session->transaction.stmt.getResourceContexts().empty() == false)
729
741
  {
730
 
    TransactionContext *trans = &session.transaction.stmt;
 
742
    TransactionContext *trans = &session->transaction.stmt;
731
743
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
744
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
745
         it != resource_contexts.end();
735
747
    {
736
748
      ResourceContext *resource_context= *it;
737
749
 
738
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
 
750
      resource_context->getTransactionalStorageEngine()->endStatement(session);
739
751
    }
740
752
 
741
753
    if (! error)
746
758
    else
747
759
    {
748
760
      (void) rollbackTransaction(session, false);
749
 
      if (session.transaction_rollback_request)
750
 
      {
 
761
      if (session->transaction_rollback_request)
751
762
        (void) rollbackTransaction(session, true);
752
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
753
 
      }
754
763
    }
755
764
 
756
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
765
    session->variables.tx_isolation= session->session_tx_isolation;
757
766
  }
758
 
 
759
767
  return error;
760
768
}
761
769
 
769
777
  }
770
778
};
771
779
 
772
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
773
 
                                             NamedSavepoint &sv)
 
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
774
781
{
775
782
  int error= 0;
776
 
  TransactionContext *trans= &session.transaction.all;
 
783
  TransactionContext *trans= &session->transaction.all;
777
784
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
785
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
779
786
 
793
800
 
794
801
    if (resource->participatesInSqlTransaction())
795
802
    {
796
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
803
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
797
804
      {
798
805
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
799
806
        error= 1;
800
807
      }
801
808
      else
802
809
      {
803
 
        session.status_var.ha_savepoint_rollback_count++;
 
810
        session->status_var.ha_savepoint_rollback_count++;
804
811
      }
805
812
    }
806
813
    trans->no_2pc|= not resource->participatesInXaTransaction();
850
857
 
851
858
      if (resource->participatesInSqlTransaction())
852
859
      {
853
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
860
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
854
861
        {
855
862
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
856
863
          error= 1;
857
864
        }
858
865
        else
859
866
        {
860
 
          session.status_var.ha_rollback_count++;
 
867
          session->status_var.ha_rollback_count++;
861
868
        }
862
869
      }
863
870
      resource_context->reset(); /* keep it conveniently zero-filled */
880
887
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
888
      if (num_statements == 0)
882
889
      {    
883
 
        session.setStatementMessage(NULL);
 
890
        session->setStatementMessage(NULL);
884
891
      }    
885
892
      else 
886
893
      {
887
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
894
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
888
895
      }    
889
 
      session.setTransactionMessage(savepoint_transaction_copy);
 
896
      session->setTransactionMessage(savepoint_transaction_copy);
890
897
    }
891
898
  }
892
899
 
899
906
  section "4.33.4 SQL-statements and transaction states",
900
907
  NamedSavepoint is *not* transaction-initiating SQL-statement
901
908
*/
902
 
int TransactionServices::setSavepoint(Session::reference session,
903
 
                                      NamedSavepoint &sv)
 
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
904
910
{
905
911
  int error= 0;
906
 
  TransactionContext *trans= &session.transaction.all;
 
912
  TransactionContext *trans= &session->transaction.all;
907
913
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
914
 
909
915
  if (resource_contexts.empty() == false)
919
925
 
920
926
      if (resource->participatesInSqlTransaction())
921
927
      {
922
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
928
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
923
929
        {
924
930
          my_error(ER_GET_ERRNO, MYF(0), err);
925
931
          error= 1;
926
932
        }
927
933
        else
928
934
        {
929
 
          session.status_var.ha_savepoint_count++;
 
935
          session->status_var.ha_savepoint_count++;
930
936
        }
931
937
      }
932
938
    }
938
944
 
939
945
  if (shouldConstructMessages())
940
946
  {
941
 
    message::Transaction *transaction= session.getTransactionMessage();
 
947
    message::Transaction *transaction= session->getTransactionMessage();
942
948
                  
943
949
    if (transaction != NULL)
944
950
    {
951
957
  return error;
952
958
}
953
959
 
954
 
int TransactionServices::releaseSavepoint(Session::reference session,
955
 
                                          NamedSavepoint &sv)
 
960
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
956
961
{
957
962
  int error= 0;
958
963
 
969
974
 
970
975
    if (resource->participatesInSqlTransaction())
971
976
    {
972
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
977
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
973
978
      {
974
979
        my_error(ER_GET_ERRNO, MYF(0), err);
975
980
        error= 1;
986
991
  return replication_services.isActive();
987
992
}
988
993
 
989
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
 
                                                                       bool should_inc_trx_id)
 
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
991
995
{
992
 
  message::Transaction *transaction= session.getTransactionMessage();
 
996
  message::Transaction *transaction= in_session->getTransactionMessage();
993
997
 
994
998
  if (unlikely(transaction == NULL))
995
999
  {
999
1003
     * deleting transaction message when done with it.
1000
1004
     */
1001
1005
    transaction= new (nothrow) message::Transaction();
1002
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
 
    session.setTransactionMessage(transaction);
 
1006
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1007
    in_session->setTransactionMessage(transaction);
1004
1008
    return transaction;
1005
1009
  }
1006
1010
  else
1007
1011
    return transaction;
1008
1012
}
1009
1013
 
1010
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
 
                                                 Session::reference session,
 
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1015
                                                 Session *in_session,
1012
1016
                                                 bool should_inc_trx_id)
1013
1017
{
1014
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
 
  trx->set_server_id(session.getServerId());
 
1018
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1019
  trx->set_server_id(in_session->getServerId());
1016
1020
 
1017
1021
  if (should_inc_trx_id)
1018
1022
  {
1019
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1020
 
    session.setXaId(0);
 
1023
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1024
    in_session->setXaId(0);
1021
1025
  }
1022
1026
  else
1023
1027
  {
1025
1029
    trx->set_transaction_id(0);
1026
1030
  }
1027
1031
 
1028
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
 
1032
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1029
1033
  
1030
1034
  /* segment info may get set elsewhere as needed */
1031
 
  transaction.set_segment_id(1);
1032
 
  transaction.set_end_segment(true);
1033
 
}
1034
 
 
1035
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
 
                                                     Session::const_reference session)
1037
 
{
1038
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1040
 
}
1041
 
 
1042
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
 
                                                    Session::reference session)
1044
 
{
1045
 
  delete transaction;
1046
 
  session.setStatementMessage(NULL);
1047
 
  session.setTransactionMessage(NULL);
1048
 
  session.setXaId(0);
1049
 
}
1050
 
 
1051
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1035
  in_transaction.set_segment_id(1);
 
1036
  in_transaction.set_end_segment(true);
 
1037
}
 
1038
 
 
1039
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1040
                                              Session *in_session)
 
1041
{
 
1042
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1043
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1044
}
 
1045
 
 
1046
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1047
                                             Session *in_session)
 
1048
{
 
1049
  delete in_transaction;
 
1050
  in_session->setStatementMessage(NULL);
 
1051
  in_session->setTransactionMessage(NULL);
 
1052
  in_session->setXaId(0);
 
1053
}
 
1054
 
 
1055
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
1056
{
1053
1057
  ReplicationServices &replication_services= ReplicationServices::singleton();
1054
1058
  if (! replication_services.isActive())
1058
1062
   * If no Transaction message was ever created, then no data modification
1059
1063
   * occurred inside the transaction, so nothing to do.
1060
1064
   */
1061
 
  if (session.getTransactionMessage() == NULL)
 
1065
  if (in_session->getTransactionMessage() == NULL)
1062
1066
    return 0;
1063
1067
  
1064
1068
  /* If there is an active statement message, finalize it. */
1065
 
  message::Statement *statement= session.getStatementMessage();
 
1069
  message::Statement *statement= in_session->getStatementMessage();
1066
1070
 
1067
1071
  if (statement != NULL)
1068
1072
  {
1069
 
    finalizeStatementMessage(*statement, session);
 
1073
    finalizeStatementMessage(*statement, in_session);
1070
1074
  }
1071
1075
 
1072
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
 
1076
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1073
1077
 
1074
1078
  /*
1075
1079
   * It is possible that we could have a Transaction without any Statements
1079
1083
   */
1080
1084
  if (transaction->statement_size() == 0)
1081
1085
  {
1082
 
    cleanupTransactionMessage(transaction, session);
 
1086
    cleanupTransactionMessage(transaction, in_session);
1083
1087
    return 0;
1084
1088
  }
1085
1089
  
1086
 
  finalizeTransactionMessage(*transaction, session);
 
1090
  finalizeTransactionMessage(*transaction, in_session);
1087
1091
  
1088
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
1092
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1089
1093
 
1090
 
  cleanupTransactionMessage(transaction, session);
 
1094
  cleanupTransactionMessage(transaction, in_session);
1091
1095
 
1092
1096
  return static_cast<int>(result);
1093
1097
}
1094
1098
 
1095
1099
/**
 * Fills in the common header fields of a Statement message.
 *
 * Sets the statement's type to the supplied type and its start timestamp
 * to the session's current timestamp. When the session's replicate_query
 * variable is set, the session's query string is also stored as the
 * statement's SQL text.
 */
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
 
                                               message::Statement::Type type,
1097
 
                                               Session::const_reference session)
 
1100
                                        message::Statement::Type in_type,
 
1101
                                        Session *in_session)
1098
1102
{
1099
 
  statement.set_type(type);
1100
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
 
1103
  statement.set_type(in_type);
 
1104
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1101
1105
 
1102
 
  if (session.variables.replicate_query)
1103
 
    statement.set_sql(session.getQueryString()->c_str());
 
1106
  if (in_session->variables.replicate_query)
 
1107
    statement.set_sql(in_session->getQueryString()->c_str());
1104
1108
}
1105
1109
 
1106
1110
/**
 * Closes out a Statement message.
 *
 * Records the session's current timestamp as the statement's end
 * timestamp, then sets the session's statement message to NULL. The
 * Statement object itself is not deleted here.
 */
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
 
                                                   Session::reference session)
 
1111
                                            Session *in_session)
1108
1112
{
1109
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1110
 
  session.setStatementMessage(NULL);
 
1113
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1114
  in_session->setStatementMessage(NULL);
1111
1115
}
1112
1116
 
1113
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1117
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
1118
{
1115
1119
  ReplicationServices &replication_services= ReplicationServices::singleton();
1116
1120
  if (! replication_services.isActive())
1117
1121
    return;
1118
1122
  
1119
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1123
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1120
1124
 
1121
1125
  /*
1122
1126
   * OK, so there are two situations that we need to deal with here:
1146
1150
     * attach it to the transaction, and push it to replicators.
1147
1151
     */
1148
1152
    transaction->Clear();
1149
 
    initTransactionMessage(*transaction, session, false);
 
1153
    initTransactionMessage(*transaction, in_session, false);
1150
1154
 
1151
1155
    /* Set the transaction ID to match the previous messages */
1152
1156
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1155
1159
 
1156
1160
    message::Statement *statement= transaction->add_statement();
1157
1161
 
1158
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
 
    finalizeStatementMessage(*statement, session);
 
1162
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1163
    finalizeStatementMessage(*statement, in_session);
1160
1164
 
1161
 
    finalizeTransactionMessage(*transaction, session);
 
1165
    finalizeTransactionMessage(*transaction, in_session);
1162
1166
    
1163
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
 
1167
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1164
1168
  }
1165
 
 
1166
 
  cleanupTransactionMessage(transaction, session);
 
1169
  cleanupTransactionMessage(transaction, in_session);
1167
1170
}
1168
1171
 
1169
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
 
1172
void TransactionServices::rollbackStatementMessage(Session *in_session)
1170
1173
{
1171
1174
  ReplicationServices &replication_services= ReplicationServices::singleton();
1172
1175
  if (! replication_services.isActive())
1173
1176
    return;
1174
1177
 
1175
 
  message::Statement *current_statement= session.getStatementMessage();
 
1178
  message::Statement *current_statement= in_session->getStatementMessage();
1176
1179
 
1177
1180
  /* If we never added a Statement message, nothing to undo. */
1178
1181
  if (current_statement == NULL)
1211
1214
   * Remove the Statement message we've been working with (same as
1212
1215
   * current_statement).
1213
1216
   */
1214
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1217
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1215
1218
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
1219
  statements_in_txn= transaction->mutable_statement();
1217
1220
  statements_in_txn->RemoveLast();
1218
 
  session.setStatementMessage(NULL);
 
1221
  in_session->setStatementMessage(NULL);
1219
1222
  
1220
1223
  /*
1221
1224
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1227
1230
    current_statement= transaction->add_statement();
1228
1231
    initStatementMessage(*current_statement,
1229
1232
                         message::Statement::ROLLBACK_STATEMENT,
1230
 
                         session);
1231
 
    finalizeStatementMessage(*current_statement, session);
 
1233
                         in_session);
 
1234
    finalizeStatementMessage(*current_statement, in_session);
1232
1235
  }
1233
1236
}
1234
1237
 
1235
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
 
1238
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
1236
1239
                                                                     message::Transaction *transaction)
1237
1240
{
1238
1241
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
1242
  uint32_t seg_id= transaction->segment_id();
1240
1243
  
1241
1244
  transaction->set_end_segment(false);
1242
 
  commitTransactionMessage(session);
1243
 
  transaction= getActiveTransactionMessage(session, false);
 
1245
  commitTransactionMessage(in_session);
 
1246
  transaction= getActiveTransactionMessage(in_session, false);
1244
1247
  
1245
1248
  /* Set the transaction ID to match the previous messages */
1246
1249
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1250
1253
  return transaction;
1251
1254
}
1252
1255
 
1253
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1254
 
                                                            Table &table,
 
1256
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1257
                                                            Table *in_table,
1255
1258
                                                            uint32_t *next_segment_id)
1256
1259
{
1257
 
  message::Statement *statement= session.getStatementMessage();
 
1260
  message::Statement *statement= in_session->getStatementMessage();
1258
1261
  message::Transaction *transaction= NULL;
1259
1262
  
1260
1263
  /*
1266
1269
   */
1267
1270
  if (statement == NULL)
1268
1271
  {
1269
 
    transaction= getActiveTransactionMessage(session);
 
1272
    transaction= getActiveTransactionMessage(in_session);
1270
1273
 
1271
1274
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1272
 
        transaction_message_threshold)
 
1275
        in_session->variables.transaction_message_threshold)
1273
1276
    {
1274
 
      transaction= segmentTransactionMessage(session, transaction);
 
1277
      transaction= segmentTransactionMessage(in_session, transaction);
1275
1278
    }
1276
1279
 
1277
1280
    statement= transaction->add_statement();
1278
 
    setInsertHeader(*statement, session, table);
1279
 
    session.setStatementMessage(statement);
 
1281
    setInsertHeader(*statement, in_session, in_table);
 
1282
    in_session->setStatementMessage(statement);
1280
1283
  }
1281
1284
  else
1282
1285
  {
1283
 
    transaction= getActiveTransactionMessage(session);
 
1286
    transaction= getActiveTransactionMessage(in_session);
1284
1287
    
1285
1288
    /*
1286
1289
     * If we've passed our threshold for the statement size (possible for
1288
1291
     * the Transaction will keep it from getting huge).
1289
1292
     */
1290
1293
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1291
 
        transaction_message_threshold)
 
1294
        in_session->variables.transaction_message_threshold)
1292
1295
    {
1293
1296
      /* Remember the transaction ID so we can re-use it */
1294
1297
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1307
1310
       * statement and transaction. This will also set the Transaction
1308
1311
       * and Statement objects in Session to NULL.
1309
1312
       */
1310
 
      commitTransactionMessage(session);
 
1313
      commitTransactionMessage(in_session);
1311
1314
      
1312
1315
      /*
1313
1316
       * Statement and Transaction should now be NULL, so new ones will get
1314
1317
       * created. We reuse the transaction id since we are segmenting
1315
1318
       * one transaction.
1316
1319
       */
1317
 
      transaction= getActiveTransactionMessage(session, false);
 
1320
      transaction= getActiveTransactionMessage(in_session, false);
1318
1321
      assert(transaction != NULL);
1319
1322
 
1320
1323
      statement= transaction->add_statement();
1321
 
      setInsertHeader(*statement, session, table);
1322
 
      session.setStatementMessage(statement);
 
1324
      setInsertHeader(*statement, in_session, in_table);
 
1325
      in_session->setStatementMessage(statement);
1323
1326
            
1324
1327
      /* Set the transaction ID to match the previous messages */
1325
1328
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1341
1344
}
1342
1345
 
1343
1346
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
 
                                          Session::const_reference session,
1345
 
                                          Table &table)
 
1347
                                          Session *in_session,
 
1348
                                          Table *in_table)
1346
1349
{
1347
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1350
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1348
1351
 
1349
1352
  /* 
1350
1353
   * Now we construct the specialized InsertHeader message inside
1355
1358
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1356
1359
 
1357
1360
  string schema_name;
1358
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1361
  (void) in_table->getShare()->getSchemaName(schema_name);
1359
1362
  string table_name;
1360
 
  (void) table.getShare()->getTableName(table_name);
 
1363
  (void) in_table->getShare()->getTableName(table_name);
1361
1364
 
1362
1365
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
1366
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1364
1367
 
1365
1368
  Field *current_field;
1366
 
  Field **table_fields= table.getFields();
 
1369
  Field **table_fields= in_table->getFields();
1367
1370
 
1368
1371
  message::FieldMetadata *field_metadata;
1369
1372
 
1370
1373
  /* We will read all the table's fields... */
1371
 
  table.setReadSet();
 
1374
  in_table->setReadSet();
1372
1375
 
1373
1376
  while ((current_field= *table_fields++) != NULL) 
1374
1377
  {
1378
1381
  }
1379
1382
}
1380
1383
 
1381
 
bool TransactionServices::insertRecord(Session::reference session,
1382
 
                                       Table &table)
 
1384
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1383
1385
{
1384
1386
  ReplicationServices &replication_services= ReplicationServices::singleton();
1385
1387
  if (! replication_services.isActive())
1386
1388
    return false;
1387
 
 
1388
 
  if (not table.getShare()->replicate())
1389
 
    return false;
1390
 
 
1391
1389
  /**
1392
1390
   * We do this check here because we don't want to even create a 
1393
1391
   * statement if there isn't a primary key on the table...
1396
1394
   *
1397
1395
   * Multi-column primary keys are handled how exactly?
1398
1396
   */
1399
 
  if (not table.getShare()->hasPrimaryKey())
 
1397
  if (not in_table->getShare()->hasPrimaryKey())
1400
1398
  {
1401
1399
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1402
1400
    return true;
1403
1401
  }
1404
1402
 
1405
1403
  uint32_t next_segment_id= 1;
1406
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1404
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1407
1405
 
1408
1406
  message::InsertData *data= statement.mutable_insert_data();
1409
1407
  data->set_segment_id(next_segment_id);
1411
1409
  message::InsertRecord *record= data->add_record();
1412
1410
 
1413
1411
  Field *current_field;
1414
 
  Field **table_fields= table.getFields();
 
1412
  Field **table_fields= in_table->getFields();
1415
1413
 
1416
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1414
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
1415
  string_value->set_charset(system_charset_info);
1418
1416
 
1419
1417
  /* We will read all the table's fields... */
1420
 
  table.setReadSet();
 
1418
  in_table->setReadSet();
1421
1419
 
1422
1420
  while ((current_field= *table_fields++) != NULL) 
1423
1421
  {
1437
1435
  return false;
1438
1436
}
1439
1437
 
1440
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1441
 
                                                            Table &table,
 
1438
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1439
                                                            Table *in_table,
1442
1440
                                                            const unsigned char *old_record, 
1443
1441
                                                            const unsigned char *new_record,
1444
1442
                                                            uint32_t *next_segment_id)
1445
1443
{
1446
 
  message::Statement *statement= session.getStatementMessage();
 
1444
  message::Statement *statement= in_session->getStatementMessage();
1447
1445
  message::Transaction *transaction= NULL;
1448
1446
 
1449
1447
  /*
1455
1453
   */
1456
1454
  if (statement == NULL)
1457
1455
  {
1458
 
    transaction= getActiveTransactionMessage(session);
 
1456
    transaction= getActiveTransactionMessage(in_session);
1459
1457
    
1460
1458
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1461
 
        transaction_message_threshold)
 
1459
        in_session->variables.transaction_message_threshold)
1462
1460
    {
1463
 
      transaction= segmentTransactionMessage(session, transaction);
 
1461
      transaction= segmentTransactionMessage(in_session, transaction);
1464
1462
    }
1465
1463
    
1466
1464
    statement= transaction->add_statement();
1467
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1468
 
    session.setStatementMessage(statement);
 
1465
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1466
    in_session->setStatementMessage(statement);
1469
1467
  }
1470
1468
  else
1471
1469
  {
1472
 
    transaction= getActiveTransactionMessage(session);
 
1470
    transaction= getActiveTransactionMessage(in_session);
1473
1471
    
1474
1472
    /*
1475
1473
     * If we've passed our threshold for the statement size (possible for
1477
1475
     * the Transaction will keep it from getting huge).
1478
1476
     */
1479
1477
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1480
 
        transaction_message_threshold)
 
1478
        in_session->variables.transaction_message_threshold)
1481
1479
    {
1482
1480
      /* Remember the transaction ID so we can re-use it */
1483
1481
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1496
1494
       * statement and transaction. This will also set the Transaction
1497
1495
       * and Statement objects in Session to NULL.
1498
1496
       */
1499
 
      commitTransactionMessage(session);
 
1497
      commitTransactionMessage(in_session);
1500
1498
      
1501
1499
      /*
1502
1500
       * Statement and Transaction should now be NULL, so new ones will get
1503
1501
       * created. We reuse the transaction id since we are segmenting
1504
1502
       * one transaction.
1505
1503
       */
1506
 
      transaction= getActiveTransactionMessage(session, false);
 
1504
      transaction= getActiveTransactionMessage(in_session, false);
1507
1505
      assert(transaction != NULL);
1508
1506
      
1509
1507
      statement= transaction->add_statement();
1510
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1511
 
      session.setStatementMessage(statement);
 
1508
      setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1509
      in_session->setStatementMessage(statement);
1512
1510
      
1513
1511
      /* Set the transaction ID to match the previous messages */
1514
1512
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1530
1528
}
1531
1529
 
1532
1530
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
 
                                          Session::const_reference session,
1534
 
                                          Table &table,
 
1531
                                          Session *in_session,
 
1532
                                          Table *in_table,
1535
1533
                                          const unsigned char *old_record, 
1536
1534
                                          const unsigned char *new_record)
1537
1535
{
1538
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1536
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1539
1537
 
1540
1538
  /* 
1541
1539
   * Now we construct the specialized UpdateHeader message inside
1546
1544
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1547
1545
 
1548
1546
  string schema_name;
1549
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1547
  (void) in_table->getShare()->getSchemaName(schema_name);
1550
1548
  string table_name;
1551
 
  (void) table.getShare()->getTableName(table_name);
 
1549
  (void) in_table->getShare()->getTableName(table_name);
1552
1550
 
1553
1551
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
1552
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1555
1553
 
1556
1554
  Field *current_field;
1557
 
  Field **table_fields= table.getFields();
 
1555
  Field **table_fields= in_table->getFields();
1558
1556
 
1559
1557
  message::FieldMetadata *field_metadata;
1560
1558
 
1561
1559
  /* We will read all the table's fields... */
1562
 
  table.setReadSet();
 
1560
  in_table->setReadSet();
1563
1561
 
1564
1562
  while ((current_field= *table_fields++) != NULL) 
1565
1563
  {
1567
1565
     * We add the "key field metadata" -- i.e. the fields which is
1568
1566
     * the primary key for the table.
1569
1567
     */
1570
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1568
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1571
1569
    {
1572
1570
      field_metadata= header->add_key_field_metadata();
1573
1571
      field_metadata->set_name(current_field->field_name);
1574
1572
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1575
1573
    }
1576
1574
 
1577
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1575
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1578
1576
    {
1579
1577
      /* Field is changed from old to new */
1580
1578
      field_metadata= header->add_set_field_metadata();
1583
1581
    }
1584
1582
  }
1585
1583
}
1586
 
 
1587
 
void TransactionServices::updateRecord(Session::reference session,
1588
 
                                       Table &table, 
 
1584
void TransactionServices::updateRecord(Session *in_session,
 
1585
                                       Table *in_table, 
1589
1586
                                       const unsigned char *old_record, 
1590
1587
                                       const unsigned char *new_record)
1591
1588
{
1593
1590
  if (! replication_services.isActive())
1594
1591
    return;
1595
1592
 
1596
 
  if (not table.getShare()->replicate())
1597
 
    return;
1598
 
 
1599
1593
  uint32_t next_segment_id= 1;
1600
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1594
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1601
1595
 
1602
1596
  message::UpdateData *data= statement.mutable_update_data();
1603
1597
  data->set_segment_id(next_segment_id);
1605
1599
  message::UpdateRecord *record= data->add_record();
1606
1600
 
1607
1601
  Field *current_field;
1608
 
  Field **table_fields= table.getFields();
1609
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1602
  Field **table_fields= in_table->getFields();
 
1603
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
1604
  string_value->set_charset(system_charset_info);
1611
1605
 
1612
1606
  while ((current_field= *table_fields++) != NULL) 
1622
1616
     *
1623
1617
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1624
1618
     */
1625
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1619
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1626
1620
    {
1627
1621
      /* Store the original "read bit" for this field */
1628
1622
      bool is_read_set= current_field->isReadSet();
1629
1623
 
1630
1624
      /* We need to mark that we will "read" this field... */
1631
 
      table.setReadSet(current_field->position());
 
1625
      in_table->setReadSet(current_field->position());
1632
1626
 
1633
1627
      /* Read the string value of this field's contents */
1634
1628
      string_value= current_field->val_str_internal(string_value);
1657
1651
     * primary key field value.  Replication only supports tables
1658
1652
     * with a primary key.
1659
1653
     */
1660
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1654
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1661
1655
    {
1662
1656
      /**
1663
1657
       * To say the below is ugly is an understatement. But it works.
1675
1669
}
1676
1670
 
1677
1671
bool TransactionServices::isFieldUpdated(Field *current_field,
1678
 
                                         Table &table,
 
1672
                                         Table *in_table,
1679
1673
                                         const unsigned char *old_record,
1680
1674
                                         const unsigned char *new_record)
1681
1675
{
1684
1678
   * we do this crazy pointer fiddling to figure out if the current field
1685
1679
   * has been updated in the supplied record raw byte pointers.
1686
1680
   */
1687
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1681
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1682
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1689
1683
 
1690
1684
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1691
1685
 
1715
1709
  return isUpdated;
1716
1710
}  
1717
1711
 
1718
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1719
 
                                                            Table &table,
 
1712
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1713
                                                            Table *in_table,
1720
1714
                                                            uint32_t *next_segment_id)
1721
1715
{
1722
 
  message::Statement *statement= session.getStatementMessage();
 
1716
  message::Statement *statement= in_session->getStatementMessage();
1723
1717
  message::Transaction *transaction= NULL;
1724
1718
 
1725
1719
  /*
1731
1725
   */
1732
1726
  if (statement == NULL)
1733
1727
  {
1734
 
    transaction= getActiveTransactionMessage(session);
 
1728
    transaction= getActiveTransactionMessage(in_session);
1735
1729
    
1736
1730
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1737
 
        transaction_message_threshold)
 
1731
        in_session->variables.transaction_message_threshold)
1738
1732
    {
1739
 
      transaction= segmentTransactionMessage(session, transaction);
 
1733
      transaction= segmentTransactionMessage(in_session, transaction);
1740
1734
    }
1741
1735
    
1742
1736
    statement= transaction->add_statement();
1743
 
    setDeleteHeader(*statement, session, table);
1744
 
    session.setStatementMessage(statement);
 
1737
    setDeleteHeader(*statement, in_session, in_table);
 
1738
    in_session->setStatementMessage(statement);
1745
1739
  }
1746
1740
  else
1747
1741
  {
1748
 
    transaction= getActiveTransactionMessage(session);
 
1742
    transaction= getActiveTransactionMessage(in_session);
1749
1743
    
1750
1744
    /*
1751
1745
     * If we've passed our threshold for the statement size (possible for
1753
1747
     * the Transaction will keep it from getting huge).
1754
1748
     */
1755
1749
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1756
 
        transaction_message_threshold)
 
1750
        in_session->variables.transaction_message_threshold)
1757
1751
    {
1758
1752
      /* Remember the transaction ID so we can re-use it */
1759
1753
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1772
1766
       * statement and transaction. This will also set the Transaction
1773
1767
       * and Statement objects in Session to NULL.
1774
1768
       */
1775
 
      commitTransactionMessage(session);
 
1769
      commitTransactionMessage(in_session);
1776
1770
      
1777
1771
      /*
1778
1772
       * Statement and Transaction should now be NULL, so new ones will get
1779
1773
       * created. We reuse the transaction id since we are segmenting
1780
1774
       * one transaction.
1781
1775
       */
1782
 
      transaction= getActiveTransactionMessage(session, false);
 
1776
      transaction= getActiveTransactionMessage(in_session, false);
1783
1777
      assert(transaction != NULL);
1784
1778
      
1785
1779
      statement= transaction->add_statement();
1786
 
      setDeleteHeader(*statement, session, table);
1787
 
      session.setStatementMessage(statement);
 
1780
      setDeleteHeader(*statement, in_session, in_table);
 
1781
      in_session->setStatementMessage(statement);
1788
1782
      
1789
1783
      /* Set the transaction ID to match the previous messages */
1790
1784
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1806
1800
}
1807
1801
 
1808
1802
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
 
                                          Session::const_reference session,
1810
 
                                          Table &table)
 
1803
                                          Session *in_session,
 
1804
                                          Table *in_table)
1811
1805
{
1812
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1806
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1813
1807
 
1814
1808
  /* 
1815
1809
   * Now we construct the specialized DeleteHeader message inside
1819
1813
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1820
1814
 
1821
1815
  string schema_name;
1822
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1816
  (void) in_table->getShare()->getSchemaName(schema_name);
1823
1817
  string table_name;
1824
 
  (void) table.getShare()->getTableName(table_name);
 
1818
  (void) in_table->getShare()->getTableName(table_name);
1825
1819
 
1826
1820
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
1821
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1828
1822
 
1829
1823
  Field *current_field;
1830
 
  Field **table_fields= table.getFields();
 
1824
  Field **table_fields= in_table->getFields();
1831
1825
 
1832
1826
  message::FieldMetadata *field_metadata;
1833
1827
 
1838
1832
     * primary key field value.  Replication only supports tables
1839
1833
     * with a primary key.
1840
1834
     */
1841
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1835
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1842
1836
    {
1843
1837
      field_metadata= header->add_key_field_metadata();
1844
1838
      field_metadata->set_name(current_field->field_name);
1847
1841
  }
1848
1842
}
1849
1843
 
1850
 
void TransactionServices::deleteRecord(Session::reference session,
1851
 
                                       Table &table,
1852
 
                                       bool use_update_record)
 
1844
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1853
1845
{
1854
1846
  ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1847
  if (! replication_services.isActive())
1856
1848
    return;
1857
1849
 
1858
 
  if (not table.getShare()->replicate())
1859
 
    return;
1860
 
 
1861
1850
  uint32_t next_segment_id= 1;
1862
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1851
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1863
1852
 
1864
1853
  message::DeleteData *data= statement.mutable_delete_data();
1865
1854
  data->set_segment_id(next_segment_id);
1867
1856
  message::DeleteRecord *record= data->add_record();
1868
1857
 
1869
1858
  Field *current_field;
1870
 
  Field **table_fields= table.getFields();
1871
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1859
  Field **table_fields= in_table->getFields();
 
1860
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
1861
  string_value->set_charset(system_charset_info);
1873
1862
 
1874
1863
  while ((current_field= *table_fields++) != NULL) 
1878
1867
     * primary key field value.  Replication only supports tables
1879
1868
     * with a primary key.
1880
1869
     */
1881
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1870
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1882
1871
    {
1883
1872
      if (use_update_record)
1884
1873
      {
1890
1879
         * We are careful not to change anything in old_ptr.
1891
1880
         */
1892
1881
        const unsigned char *old_ptr= current_field->ptr;
1893
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
 
1882
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1894
1883
        string_value= current_field->val_str_internal(string_value);
1895
1884
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1896
1885
      }
1907
1896
  }
1908
1897
}
1909
1898
 
1910
 
void TransactionServices::createTable(Session::reference session,
 
1899
void TransactionServices::createTable(Session *in_session,
1911
1900
                                      const message::Table &table)
1912
1901
{
1913
1902
  ReplicationServices &replication_services= ReplicationServices::singleton();
1914
1903
  if (! replication_services.isActive())
1915
1904
    return;
1916
 
 
1917
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1918
 
    return;
1919
 
 
1920
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1905
  
 
1906
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1921
1907
  message::Statement *statement= transaction->add_statement();
1922
1908
 
1923
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
1909
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1924
1910
 
1925
1911
  /* 
1926
1912
   * Construct the specialized CreateTableStatement message and attach
1930
1916
  message::Table *new_table_message= create_table_statement->mutable_table();
1931
1917
  *new_table_message= table;
1932
1918
 
1933
 
  finalizeStatementMessage(*statement, session);
 
1919
  finalizeStatementMessage(*statement, in_session);
1934
1920
 
1935
 
  finalizeTransactionMessage(*transaction, session);
 
1921
  finalizeTransactionMessage(*transaction, in_session);
1936
1922
  
1937
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1923
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1938
1924
 
1939
 
  cleanupTransactionMessage(transaction, session);
 
1925
  cleanupTransactionMessage(transaction, in_session);
1940
1926
 
1941
1927
}
1942
1928
 
1943
 
void TransactionServices::createSchema(Session::reference session,
 
1929
void TransactionServices::createSchema(Session *in_session,
1944
1930
                                       const message::Schema &schema)
1945
1931
{
1946
1932
  ReplicationServices &replication_services= ReplicationServices::singleton();
1947
1933
  if (! replication_services.isActive())
1948
1934
    return;
1949
 
 
1950
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1951
 
    return;
1952
 
 
1953
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1935
  
 
1936
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
1937
  message::Statement *statement= transaction->add_statement();
1955
1938
 
1956
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
1939
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1957
1940
 
1958
1941
  /* 
1959
1942
   * Construct the specialized CreateSchemaStatement message and attach
1963
1946
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
1947
  *new_schema_message= schema;
1965
1948
 
1966
 
  finalizeStatementMessage(*statement, session);
 
1949
  finalizeStatementMessage(*statement, in_session);
1967
1950
 
1968
 
  finalizeTransactionMessage(*transaction, session);
 
1951
  finalizeTransactionMessage(*transaction, in_session);
1969
1952
  
1970
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1953
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1971
1954
 
1972
 
  cleanupTransactionMessage(transaction, session);
 
1955
  cleanupTransactionMessage(transaction, in_session);
1973
1956
 
1974
1957
}
1975
1958
 
1976
 
void TransactionServices::dropSchema(Session::reference session,
1977
 
                                     identifier::Schema::const_reference identifier,
1978
 
                                     message::schema::const_reference schema)
 
1959
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
1979
1960
{
1980
1961
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
1962
  if (! replication_services.isActive())
1982
1963
    return;
1983
 
 
1984
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1985
 
    return;
1986
 
 
1987
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1964
  
 
1965
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1988
1966
  message::Statement *statement= transaction->add_statement();
1989
1967
 
1990
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
1968
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1991
1969
 
1992
1970
  /* 
1993
1971
   * Construct the specialized DropSchemaStatement message and attach
1997
1975
 
1998
1976
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1999
1977
 
2000
 
  finalizeStatementMessage(*statement, session);
 
1978
  finalizeStatementMessage(*statement, in_session);
2001
1979
 
2002
 
  finalizeTransactionMessage(*transaction, session);
 
1980
  finalizeTransactionMessage(*transaction, in_session);
2003
1981
  
2004
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1982
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2005
1983
 
2006
 
  cleanupTransactionMessage(transaction, session);
 
1984
  cleanupTransactionMessage(transaction, in_session);
2007
1985
}
2008
1986
 
2009
 
void TransactionServices::alterSchema(Session::reference session,
2010
 
                                      const message::Schema &old_schema,
 
1987
void TransactionServices::alterSchema(Session *in_session,
 
1988
                                      const message::schema::shared_ptr &old_schema,
2011
1989
                                      const message::Schema &new_schema)
2012
1990
{
2013
1991
  ReplicationServices &replication_services= ReplicationServices::singleton();
2014
1992
  if (! replication_services.isActive())
2015
1993
    return;
2016
 
 
2017
 
  if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2018
 
    return;
2019
 
 
2020
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1994
  
 
1995
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2021
1996
  message::Statement *statement= transaction->add_statement();
2022
1997
 
2023
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
 
1998
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
2024
1999
 
2025
2000
  /* 
2026
2001
   * Construct the specialized AlterSchemaStatement message and attach
2031
2006
  message::Schema *before= alter_schema_statement->mutable_before();
2032
2007
  message::Schema *after= alter_schema_statement->mutable_after();
2033
2008
 
2034
 
  *before= old_schema;
 
2009
  *before= *old_schema;
2035
2010
  *after= new_schema;
2036
2011
 
2037
 
  finalizeStatementMessage(*statement, session);
 
2012
  finalizeStatementMessage(*statement, in_session);
2038
2013
 
2039
 
  finalizeTransactionMessage(*transaction, session);
 
2014
  finalizeTransactionMessage(*transaction, in_session);
2040
2015
  
2041
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2016
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2042
2017
 
2043
 
  cleanupTransactionMessage(transaction, session);
 
2018
  cleanupTransactionMessage(transaction, in_session);
2044
2019
}
2045
2020
 
2046
 
void TransactionServices::dropTable(Session::reference session,
2047
 
                                    identifier::Table::const_reference identifier,
2048
 
                                    message::table::const_reference table,
 
2021
void TransactionServices::dropTable(Session *in_session,
 
2022
                                    const identifier::Table &table,
2049
2023
                                    bool if_exists)
2050
2024
{
2051
2025
  ReplicationServices &replication_services= ReplicationServices::singleton();
2052
2026
  if (! replication_services.isActive())
2053
2027
    return;
2054
 
 
2055
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2056
 
    return;
2057
 
 
2058
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2028
  
 
2029
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2059
2030
  message::Statement *statement= transaction->add_statement();
2060
2031
 
2061
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
2032
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2062
2033
 
2063
2034
  /* 
2064
2035
   * Construct the specialized DropTableStatement message and attach
2070
2041
 
2071
2042
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2072
2043
 
2073
 
  table_metadata->set_schema_name(identifier.getSchemaName());
2074
 
  table_metadata->set_table_name(identifier.getTableName());
2075
 
 
2076
 
  finalizeStatementMessage(*statement, session);
2077
 
 
2078
 
  finalizeTransactionMessage(*transaction, session);
 
2044
  table_metadata->set_schema_name(table.getSchemaName());
 
2045
  table_metadata->set_table_name(table.getTableName());
 
2046
 
 
2047
  finalizeStatementMessage(*statement, in_session);
 
2048
 
 
2049
  finalizeTransactionMessage(*transaction, in_session);
2079
2050
  
2080
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2051
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2081
2052
 
2082
 
  cleanupTransactionMessage(transaction, session);
 
2053
  cleanupTransactionMessage(transaction, in_session);
2083
2054
}
2084
2055
 
2085
 
void TransactionServices::truncateTable(Session::reference session,
2086
 
                                        Table &table)
 
2056
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2087
2057
{
2088
2058
  ReplicationServices &replication_services= ReplicationServices::singleton();
2089
2059
  if (! replication_services.isActive())
2090
2060
    return;
2091
 
 
2092
 
  if (not table.getShare()->replicate())
2093
 
    return;
2094
 
 
2095
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2061
  
 
2062
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
2063
  message::Statement *statement= transaction->add_statement();
2097
2064
 
2098
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
2065
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2099
2066
 
2100
2067
  /* 
2101
2068
   * Construct the specialized TruncateTableStatement message and attach
2105
2072
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2106
2073
 
2107
2074
  string schema_name;
2108
 
  (void) table.getShare()->getSchemaName(schema_name);
2109
 
 
 
2075
  (void) in_table->getShare()->getSchemaName(schema_name);
2110
2076
  string table_name;
2111
 
  (void) table.getShare()->getTableName(table_name);
 
2077
  (void) in_table->getShare()->getTableName(table_name);
2112
2078
 
2113
2079
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
2080
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2115
2081
 
2116
 
  finalizeStatementMessage(*statement, session);
 
2082
  finalizeStatementMessage(*statement, in_session);
2117
2083
 
2118
 
  finalizeTransactionMessage(*transaction, session);
 
2084
  finalizeTransactionMessage(*transaction, in_session);
2119
2085
  
2120
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2086
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2121
2087
 
2122
 
  cleanupTransactionMessage(transaction, session);
 
2088
  cleanupTransactionMessage(transaction, in_session);
2123
2089
}
2124
2090
 
2125
 
void TransactionServices::rawStatement(Session::reference session,
2126
 
                                       const string &query)
 
2091
void TransactionServices::rawStatement(Session *in_session, const string &query)
2127
2092
{
2128
2093
  ReplicationServices &replication_services= ReplicationServices::singleton();
2129
2094
  if (! replication_services.isActive())
2130
2095
    return;
2131
2096
 
2132
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2097
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
2098
  message::Statement *statement= transaction->add_statement();
2134
2099
 
2135
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
2100
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
2101
  statement->set_sql(query);
2137
 
  finalizeStatementMessage(*statement, session);
 
2102
  finalizeStatementMessage(*statement, in_session);
2138
2103
 
2139
 
  finalizeTransactionMessage(*transaction, session);
 
2104
  finalizeTransactionMessage(*transaction, in_session);
2140
2105
  
2141
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2106
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2142
2107
 
2143
 
  cleanupTransactionMessage(transaction, session);
 
2108
  cleanupTransactionMessage(transaction, in_session);
2144
2109
}
2145
2110
 
2146
 
int TransactionServices::sendEvent(Session::reference session,
2147
 
                                   const message::Event &event)
 
2111
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2148
2112
{
2149
2113
  ReplicationServices &replication_services= ReplicationServices::singleton();
2150
2114
  if (! replication_services.isActive())
2162
2126
 
2163
2127
  trx_event->CopyFrom(event);
2164
2128
 
2165
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
2129
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2166
2130
 
2167
2131
  delete transaction;
2168
2132
 
2169
2133
  return static_cast<int>(result);
2170
2134
}
2171
2135
 
2172
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2136
bool TransactionServices::sendStartupEvent(Session *session)
2173
2137
{
2174
2138
  message::Event event;
2175
2139
  event.set_type(message::Event::STARTUP);
2178
2142
  return true;
2179
2143
}
2180
2144
 
2181
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2145
bool TransactionServices::sendShutdownEvent(Session *session)
2182
2146
{
2183
2147
  message::Event event;
2184
2148
  event.set_type(message::Event::SHUTDOWN);