~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-12-30 15:50:18 UTC
  • mto: This revision was merged to the branch mainline in revision 2041.
  • Revision ID: brian@tangent.org-20101230155018-bacjzhn8vfd57his
Update support since we support this syntax now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
 * plugins can understand.
48
48
 */
49
49
 
50
 
#include <config.h>
51
 
#include <drizzled/current_session.h>
52
 
#include <drizzled/my_hash.h>
53
 
#include <drizzled/error.h>
54
 
#include <drizzled/gettext.h>
55
 
#include <drizzled/probes.h>
56
 
#include <drizzled/sql_parse.h>
57
 
#include <drizzled/session.h>
58
 
#include <drizzled/sql_base.h>
59
 
#include <drizzled/replication_services.h>
60
 
#include <drizzled/transaction_services.h>
61
 
#include <drizzled/transaction_context.h>
62
 
#include <drizzled/message/transaction.pb.h>
63
 
#include <drizzled/message/statement_transform.h>
64
 
#include <drizzled/resource_context.h>
65
 
#include <drizzled/lock.h>
66
 
#include <drizzled/item/int.h>
67
 
#include <drizzled/item/empty_string.h>
68
 
#include <drizzled/field/epoch.h>
69
 
#include <drizzled/plugin/client.h>
70
 
#include <drizzled/plugin/monitored_in_transaction.h>
71
 
#include <drizzled/plugin/transactional_storage_engine.h>
72
 
#include <drizzled/plugin/xa_resource_manager.h>
73
 
#include <drizzled/plugin/xa_storage_engine.h>
74
 
#include <drizzled/internal/my_sys.h>
 
50
#include "config.h"
 
51
#include "drizzled/my_hash.h"
 
52
#include "drizzled/error.h"
 
53
#include "drizzled/gettext.h"
 
54
#include "drizzled/probes.h"
 
55
#include "drizzled/sql_parse.h"
 
56
#include "drizzled/session.h"
 
57
#include "drizzled/sql_base.h"
 
58
#include "drizzled/replication_services.h"
 
59
#include "drizzled/transaction_services.h"
 
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
 
63
#include "drizzled/resource_context.h"
 
64
#include "drizzled/lock.h"
 
65
#include "drizzled/item/int.h"
 
66
#include "drizzled/item/empty_string.h"
 
67
#include "drizzled/field/epoch.h"
 
68
#include "drizzled/plugin/client.h"
 
69
#include "drizzled/plugin/monitored_in_transaction.h"
 
70
#include "drizzled/plugin/transactional_storage_engine.h"
 
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/plugin/xa_storage_engine.h"
 
73
#include "drizzled/internal/my_sys.h"
75
74
 
76
75
#include <vector>
77
76
#include <algorithm>
314
313
  }
315
314
}
316
315
 
317
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
316
void TransactionServices::registerResourceForStatement(Session *session,
318
317
                                                       plugin::MonitoredInTransaction *monitored,
319
318
                                                       plugin::TransactionalStorageEngine *engine)
320
319
{
321
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
322
321
  {
323
322
    /* 
324
323
     * Now we automatically register this resource manager for the
330
329
    registerResourceForTransaction(session, monitored, engine);
331
330
  }
332
331
 
333
 
  TransactionContext *trans= &session.transaction.stmt;
334
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
335
334
 
336
335
  if (resource_context->isStarted())
337
336
    return; /* already registered, return */
346
345
  trans->no_2pc|= true;
347
346
}
348
347
 
349
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
348
void TransactionServices::registerResourceForStatement(Session *session,
350
349
                                                       plugin::MonitoredInTransaction *monitored,
351
350
                                                       plugin::TransactionalStorageEngine *engine,
352
351
                                                       plugin::XaResourceManager *resource_manager)
353
352
{
354
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
355
354
  {
356
355
    /* 
357
356
     * Now we automatically register this resource manager for the
363
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
364
363
  }
365
364
 
366
 
  TransactionContext *trans= &session.transaction.stmt;
367
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
368
367
 
369
368
  if (resource_context->isStarted())
370
369
    return; /* already registered, return */
380
379
  trans->no_2pc|= false;
381
380
}
382
381
 
383
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
384
383
                                                         plugin::MonitoredInTransaction *monitored,
385
384
                                                         plugin::TransactionalStorageEngine *engine)
386
385
{
387
 
  TransactionContext *trans= &session.transaction.all;
388
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
389
388
 
390
389
  if (resource_context->isStarted())
391
390
    return; /* already registered, return */
392
391
 
393
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
394
393
 
395
394
  trans->registerResource(resource_context);
396
395
 
401
400
  resource_context->setTransactionalStorageEngine(engine);
402
401
  trans->no_2pc|= true;
403
402
 
404
 
  if (session.transaction.xid_state.xid.is_null())
405
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
406
405
 
407
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
409
408
    registerResourceForStatement(session, monitored, engine);
410
409
}
411
410
 
412
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
413
412
                                                         plugin::MonitoredInTransaction *monitored,
414
413
                                                         plugin::TransactionalStorageEngine *engine,
415
414
                                                         plugin::XaResourceManager *resource_manager)
416
415
{
417
 
  TransactionContext *trans= &session.transaction.all;
418
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
418
 
420
419
  if (resource_context->isStarted())
421
420
    return; /* already registered, return */
422
421
 
423
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
424
423
 
425
424
  trans->registerResource(resource_context);
426
425
 
431
430
  resource_context->setTransactionalStorageEngine(engine);
432
431
  trans->no_2pc|= true;
433
432
 
434
 
  if (session.transaction.xid_state.xid.is_null())
435
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
436
435
 
437
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
438
437
 
439
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
441
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
442
441
}
443
442
 
454
453
  my_session->setXaId(xa_id);
455
454
}
456
455
 
457
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
458
457
{
459
 
  if (session.getXaId() == 0)
 
458
  if (session->getXaId() == 0)
460
459
  {
461
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
462
461
  }
463
462
 
464
 
  return session.getXaId();
 
463
  return session->getXaId();
465
464
}
466
465
 
467
 
int TransactionServices::commitTransaction(Session::reference session,
468
 
                                           bool normal_transaction)
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
469
481
{
470
482
  int error= 0, cookie= 0;
471
483
  /*
472
484
    'all' means that this is either an explicit commit issued by
473
485
    user, or an implicit commit issued by a DDL.
474
486
  */
475
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
489
 
478
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
479
491
 
480
492
  /*
481
493
    We must not commit the normal transaction if a statement
483
495
    flags will not get propagated to its normal transaction's
484
496
    counterpart.
485
497
  */
486
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
487
 
              trans == &session.transaction.stmt);
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
488
500
 
489
501
  if (resource_contexts.empty() == false)
490
502
  {
491
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
492
504
    {
493
505
      rollbackTransaction(session, normal_transaction);
494
506
      return 1;
519
531
 
520
532
        if (resource->participatesInXaTransaction())
521
533
        {
522
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
523
535
          {
524
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
525
537
            error= 1;
526
538
          }
527
539
          else
528
540
          {
529
 
            session.status_var.ha_prepare_count++;
 
541
            session->status_var.ha_prepare_count++;
530
542
          }
531
543
        }
532
544
      }
548
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
549
561
end:
550
562
    if (is_real_trans)
551
 
      session.startWaitingGlobalReadLock();
 
563
      session->startWaitingGlobalReadLock();
552
564
  }
553
565
  return error;
554
566
}
557
569
  @note
558
570
  This function does not care about global read lock. A caller should.
559
571
*/
560
 
int TransactionServices::commitPhaseOne(Session::reference session,
561
 
                                        bool normal_transaction)
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
562
573
{
563
574
  int error=0;
564
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
565
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
566
577
 
567
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
 
  bool all= normal_transaction;
569
 
 
570
 
  /* If we're in autocommit then we have a real transaction to commit
571
 
     (except if it's BEGIN)
572
 
  */
573
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
574
 
    all= true;
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
575
579
 
576
580
  if (resource_contexts.empty() == false)
577
581
  {
586
590
 
587
591
      if (resource->participatesInXaTransaction())
588
592
      {
589
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
594
        {
591
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
592
596
          error= 1;
593
597
        }
594
598
        else if (normal_transaction)
595
599
        {
596
 
          session.status_var.ha_commit_count++;
 
600
          session->status_var.ha_commit_count++;
597
601
        }
598
602
      }
599
603
      else if (resource->participatesInSqlTransaction())
600
604
      {
601
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
606
        {
603
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
604
608
          error= 1;
605
609
        }
606
610
        else if (normal_transaction)
607
611
        {
608
 
          session.status_var.ha_commit_count++;
 
612
          session->status_var.ha_commit_count++;
609
613
        }
610
614
      }
611
615
      resource_context->reset(); /* keep it conveniently zero-filled */
612
616
    }
613
617
 
614
618
    if (is_real_trans)
615
 
      session.transaction.xid_state.xid.null();
 
619
      session->transaction.xid_state.xid.null();
616
620
 
617
621
    if (normal_transaction)
618
622
    {
619
 
      session.variables.tx_isolation= session.session_tx_isolation;
620
 
      session.transaction.cleanup();
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
621
625
    }
622
626
  }
623
627
  trans->reset();
624
628
  return error;
625
629
}
626
630
 
627
 
int TransactionServices::rollbackTransaction(Session::reference session,
628
 
                                             bool normal_transaction)
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
629
632
{
630
633
  int error= 0;
631
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
636
 
634
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
636
638
 
637
639
  /*
638
640
    We must not rollback the normal transaction if a statement
639
641
    transaction is pending.
640
642
  */
641
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
642
 
              trans == &session.transaction.stmt);
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
643
645
 
644
646
  if (resource_contexts.empty() == false)
645
647
  {
654
656
 
655
657
      if (resource->participatesInXaTransaction())
656
658
      {
657
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
658
660
        {
659
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
660
662
          error= 1;
661
663
        }
662
664
        else if (normal_transaction)
663
665
        {
664
 
          session.status_var.ha_rollback_count++;
 
666
          session->status_var.ha_rollback_count++;
665
667
        }
666
668
      }
667
669
      else if (resource->participatesInSqlTransaction())
668
670
      {
669
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
670
672
        {
671
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
672
674
          error= 1;
673
675
        }
674
676
        else if (normal_transaction)
675
677
        {
676
 
          session.status_var.ha_rollback_count++;
 
678
          session->status_var.ha_rollback_count++;
677
679
        }
678
680
      }
679
681
      resource_context->reset(); /* keep it conveniently zero-filled */
686
688
     * a rollback statement with the corresponding transaction ID
687
689
     * to rollback.
688
690
     */
689
 
    if (all)
 
691
    if (normal_transaction)
690
692
      rollbackTransactionMessage(session);
691
693
    else
692
694
      rollbackStatementMessage(session);
693
695
 
694
696
    if (is_real_trans)
695
 
      session.transaction.xid_state.xid.null();
 
697
      session->transaction.xid_state.xid.null();
696
698
    if (normal_transaction)
697
699
    {
698
 
      session.variables.tx_isolation=session.session_tx_isolation;
699
 
      session.transaction.cleanup();
 
700
      session->variables.tx_isolation=session->session_tx_isolation;
 
701
      session->transaction.cleanup();
700
702
    }
701
703
  }
702
704
  if (normal_transaction)
703
 
    session.transaction_rollback_request= false;
 
705
    session->transaction_rollback_request= false;
704
706
 
705
707
  /*
706
708
   * If a non-transactional table was updated, warn the user
707
709
   */
708
710
  if (is_real_trans &&
709
 
      session.transaction.all.hasModifiedNonTransData() &&
710
 
      session.getKilled() != Session::KILL_CONNECTION)
 
711
      session->transaction.all.hasModifiedNonTransData() &&
 
712
      session->getKilled() != Session::KILL_CONNECTION)
711
713
  {
712
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
714
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
715
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
716
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
715
717
  }
717
719
  return error;
718
720
}
719
721
 
720
 
int TransactionServices::autocommitOrRollback(Session::reference session,
721
 
                                              int error)
 
722
/**
 
723
  This is used to commit or rollback a single statement depending on
 
724
  the value of error.
 
725
 
 
726
  @note
 
727
    Note that if the autocommit is on, then the following call inside
 
728
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
729
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
730
    the user has used LOCK TABLES then that mechanism does not know to do the
 
731
    commit.
 
732
*/
 
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
722
734
{
723
735
  /* One GPB Statement message per SQL statement */
724
 
  message::Statement *statement= session.getStatementMessage();
 
736
  message::Statement *statement= session->getStatementMessage();
725
737
  if ((statement != NULL) && (! error))
726
738
    finalizeStatementMessage(*statement, session);
727
739
 
728
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
740
  if (session->transaction.stmt.getResourceContexts().empty() == false)
729
741
  {
730
 
    TransactionContext *trans = &session.transaction.stmt;
 
742
    TransactionContext *trans = &session->transaction.stmt;
731
743
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
744
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
745
         it != resource_contexts.end();
735
747
    {
736
748
      ResourceContext *resource_context= *it;
737
749
 
738
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
 
750
      resource_context->getTransactionalStorageEngine()->endStatement(session);
739
751
    }
740
752
 
741
753
    if (! error)
746
758
    else
747
759
    {
748
760
      (void) rollbackTransaction(session, false);
749
 
      if (session.transaction_rollback_request)
750
 
      {
 
761
      if (session->transaction_rollback_request)
751
762
        (void) rollbackTransaction(session, true);
752
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
753
 
      }
754
763
    }
755
764
 
756
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
765
    session->variables.tx_isolation= session->session_tx_isolation;
757
766
  }
758
 
 
759
767
  return error;
760
768
}
761
769
 
769
777
  }
770
778
};
771
779
 
772
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
773
 
                                             NamedSavepoint &sv)
 
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
774
781
{
775
782
  int error= 0;
776
 
  TransactionContext *trans= &session.transaction.all;
 
783
  TransactionContext *trans= &session->transaction.all;
777
784
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
785
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
779
786
 
793
800
 
794
801
    if (resource->participatesInSqlTransaction())
795
802
    {
796
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
803
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
797
804
      {
798
805
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
799
806
        error= 1;
800
807
      }
801
808
      else
802
809
      {
803
 
        session.status_var.ha_savepoint_rollback_count++;
 
810
        session->status_var.ha_savepoint_rollback_count++;
804
811
      }
805
812
    }
806
813
    trans->no_2pc|= not resource->participatesInXaTransaction();
850
857
 
851
858
      if (resource->participatesInSqlTransaction())
852
859
      {
853
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
860
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
854
861
        {
855
862
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
856
863
          error= 1;
857
864
        }
858
865
        else
859
866
        {
860
 
          session.status_var.ha_rollback_count++;
 
867
          session->status_var.ha_rollback_count++;
861
868
        }
862
869
      }
863
870
      resource_context->reset(); /* keep it conveniently zero-filled */
880
887
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
888
      if (num_statements == 0)
882
889
      {    
883
 
        session.setStatementMessage(NULL);
 
890
        session->setStatementMessage(NULL);
884
891
      }    
885
892
      else 
886
893
      {
887
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
894
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
888
895
      }    
889
 
      session.setTransactionMessage(savepoint_transaction_copy);
 
896
      session->setTransactionMessage(savepoint_transaction_copy);
890
897
    }
891
898
  }
892
899
 
899
906
  section "4.33.4 SQL-statements and transaction states",
900
907
  NamedSavepoint is *not* transaction-initiating SQL-statement
901
908
*/
902
 
int TransactionServices::setSavepoint(Session::reference session,
903
 
                                      NamedSavepoint &sv)
 
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
904
910
{
905
911
  int error= 0;
906
 
  TransactionContext *trans= &session.transaction.all;
 
912
  TransactionContext *trans= &session->transaction.all;
907
913
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
914
 
909
915
  if (resource_contexts.empty() == false)
919
925
 
920
926
      if (resource->participatesInSqlTransaction())
921
927
      {
922
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
928
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
923
929
        {
924
930
          my_error(ER_GET_ERRNO, MYF(0), err);
925
931
          error= 1;
926
932
        }
927
933
        else
928
934
        {
929
 
          session.status_var.ha_savepoint_count++;
 
935
          session->status_var.ha_savepoint_count++;
930
936
        }
931
937
      }
932
938
    }
938
944
 
939
945
  if (shouldConstructMessages())
940
946
  {
941
 
    message::Transaction *transaction= session.getTransactionMessage();
 
947
    message::Transaction *transaction= session->getTransactionMessage();
942
948
                  
943
949
    if (transaction != NULL)
944
950
    {
951
957
  return error;
952
958
}
953
959
 
954
 
int TransactionServices::releaseSavepoint(Session::reference session,
955
 
                                          NamedSavepoint &sv)
 
960
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
956
961
{
957
962
  int error= 0;
958
963
 
969
974
 
970
975
    if (resource->participatesInSqlTransaction())
971
976
    {
972
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
977
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
973
978
      {
974
979
        my_error(ER_GET_ERRNO, MYF(0), err);
975
980
        error= 1;
986
991
  return replication_services.isActive();
987
992
}
988
993
 
989
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
 
                                                                       bool should_inc_trx_id)
 
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
991
995
{
992
 
  message::Transaction *transaction= session.getTransactionMessage();
 
996
  message::Transaction *transaction= in_session->getTransactionMessage();
993
997
 
994
998
  if (unlikely(transaction == NULL))
995
999
  {
999
1003
     * deleting transaction message when done with it.
1000
1004
     */
1001
1005
    transaction= new (nothrow) message::Transaction();
1002
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
 
    session.setTransactionMessage(transaction);
 
1006
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1007
    in_session->setTransactionMessage(transaction);
1004
1008
    return transaction;
1005
1009
  }
1006
1010
  else
1007
1011
    return transaction;
1008
1012
}
1009
1013
 
1010
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
 
                                                 Session::reference session,
 
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1015
                                                 Session *in_session,
1012
1016
                                                 bool should_inc_trx_id)
1013
1017
{
1014
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
 
  trx->set_server_id(session.getServerId());
 
1018
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1019
  trx->set_server_id(in_session->getServerId());
1016
1020
 
1017
1021
  if (should_inc_trx_id)
1018
1022
  {
1019
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1020
 
    session.setXaId(0);
1021
 
  }
 
1023
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1024
    in_session->setXaId(0);
 
1025
  }  
1022
1026
  else
1023
 
  {
1024
 
    /* trx and seg id will get set properly elsewhere */
 
1027
  { 
1025
1028
    trx->set_transaction_id(0);
1026
1029
  }
1027
1030
 
1028
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1029
 
  
1030
 
  /* segment info may get set elsewhere as needed */
1031
 
  transaction.set_segment_id(1);
1032
 
  transaction.set_end_segment(true);
1033
 
}
1034
 
 
1035
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
 
                                                     Session::const_reference session)
1037
 
{
1038
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1040
 
}
1041
 
 
1042
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
 
                                                    Session::reference session)
1044
 
{
1045
 
  delete transaction;
1046
 
  session.setStatementMessage(NULL);
1047
 
  session.setTransactionMessage(NULL);
1048
 
  session.setXaId(0);
1049
 
}
1050
 
 
1051
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1031
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1032
}
 
1033
 
 
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1035
                                              Session *in_session)
 
1036
{
 
1037
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1038
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1039
}
 
1040
 
 
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1042
                                             Session *in_session)
 
1043
{
 
1044
  delete in_transaction;
 
1045
  in_session->setStatementMessage(NULL);
 
1046
  in_session->setTransactionMessage(NULL);
 
1047
  in_session->setXaId(0);
 
1048
}
 
1049
 
 
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
1051
{
1053
1052
  ReplicationServices &replication_services= ReplicationServices::singleton();
1054
1053
  if (! replication_services.isActive())
1058
1057
   * If no Transaction message was ever created, then no data modification
1059
1058
   * occurred inside the transaction, so nothing to do.
1060
1059
   */
1061
 
  if (session.getTransactionMessage() == NULL)
 
1060
  if (in_session->getTransactionMessage() == NULL)
1062
1061
    return 0;
1063
1062
  
1064
1063
  /* If there is an active statement message, finalize it. */
1065
 
  message::Statement *statement= session.getStatementMessage();
 
1064
  message::Statement *statement= in_session->getStatementMessage();
1066
1065
 
1067
1066
  if (statement != NULL)
1068
1067
  {
1069
 
    finalizeStatementMessage(*statement, session);
 
1068
    finalizeStatementMessage(*statement, in_session);
1070
1069
  }
1071
1070
 
1072
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
 
1071
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1073
1072
 
1074
1073
  /*
1075
1074
   * It is possible that we could have a Transaction without any Statements
1079
1078
   */
1080
1079
  if (transaction->statement_size() == 0)
1081
1080
  {
1082
 
    cleanupTransactionMessage(transaction, session);
 
1081
    cleanupTransactionMessage(transaction, in_session);
1083
1082
    return 0;
1084
1083
  }
1085
1084
  
1086
 
  finalizeTransactionMessage(*transaction, session);
 
1085
  finalizeTransactionMessage(*transaction, in_session);
1087
1086
  
1088
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1089
1088
 
1090
 
  cleanupTransactionMessage(transaction, session);
 
1089
  cleanupTransactionMessage(transaction, in_session);
1091
1090
 
1092
1091
  return static_cast<int>(result);
1093
1092
}
1094
1093
 
1095
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
 
                                               message::Statement::Type type,
1097
 
                                               Session::const_reference session)
 
1095
                                        message::Statement::Type in_type,
 
1096
                                        Session *in_session)
1098
1097
{
1099
 
  statement.set_type(type);
1100
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
 
1098
  statement.set_type(in_type);
 
1099
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1101
1100
 
1102
 
  if (session.variables.replicate_query)
1103
 
    statement.set_sql(session.getQueryString()->c_str());
 
1101
  if (in_session->variables.replicate_query)
 
1102
    statement.set_sql(in_session->getQueryString()->c_str());
1104
1103
}
1105
1104
 
1106
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
 
                                                   Session::reference session)
 
1106
                                            Session *in_session)
1108
1107
{
1109
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1110
 
  session.setStatementMessage(NULL);
 
1108
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1109
  in_session->setStatementMessage(NULL);
1111
1110
}
1112
1111
 
1113
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
1113
{
1115
1114
  ReplicationServices &replication_services= ReplicationServices::singleton();
1116
1115
  if (! replication_services.isActive())
1117
1116
    return;
1118
1117
  
1119
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1118
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1120
1119
 
1121
1120
  /*
1122
1121
   * OK, so there are two situations that we need to deal with here:
1139
1138
  {
1140
1139
    /* Remember the transaction ID so we can re-use it */
1141
1140
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
 
    uint32_t seg_id= transaction->segment_id();
1143
1141
 
1144
1142
    /*
1145
1143
     * Clear the transaction, create a Rollback statement message, 
1146
1144
     * attach it to the transaction, and push it to replicators.
1147
1145
     */
1148
1146
    transaction->Clear();
1149
 
    initTransactionMessage(*transaction, session, false);
 
1147
    initTransactionMessage(*transaction, in_session, false);
1150
1148
 
1151
1149
    /* Set the transaction ID to match the previous messages */
1152
1150
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
 
    transaction->set_segment_id(seg_id);
1154
 
    transaction->set_end_segment(true);
1155
1151
 
1156
1152
    message::Statement *statement= transaction->add_statement();
1157
1153
 
1158
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
 
    finalizeStatementMessage(*statement, session);
 
1154
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1155
    finalizeStatementMessage(*statement, in_session);
1160
1156
 
1161
 
    finalizeTransactionMessage(*transaction, session);
 
1157
    finalizeTransactionMessage(*transaction, in_session);
1162
1158
    
1163
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
 
1159
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1164
1160
  }
1165
 
 
1166
 
  cleanupTransactionMessage(transaction, session);
 
1161
  cleanupTransactionMessage(transaction, in_session);
1167
1162
}
1168
1163
 
1169
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
 
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1170
1165
{
1171
1166
  ReplicationServices &replication_services= ReplicationServices::singleton();
1172
1167
  if (! replication_services.isActive())
1173
1168
    return;
1174
1169
 
1175
 
  message::Statement *current_statement= session.getStatementMessage();
 
1170
  message::Statement *current_statement= in_session->getStatementMessage();
1176
1171
 
1177
1172
  /* If we never added a Statement message, nothing to undo. */
1178
1173
  if (current_statement == NULL)
1211
1206
   * Remove the Statement message we've been working with (same as
1212
1207
   * current_statement).
1213
1208
   */
1214
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1209
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1215
1210
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
1211
  statements_in_txn= transaction->mutable_statement();
1217
1212
  statements_in_txn->RemoveLast();
1218
 
  session.setStatementMessage(NULL);
 
1213
  in_session->setStatementMessage(NULL);
1219
1214
  
1220
1215
  /*
1221
1216
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1227
1222
    current_statement= transaction->add_statement();
1228
1223
    initStatementMessage(*current_statement,
1229
1224
                         message::Statement::ROLLBACK_STATEMENT,
1230
 
                         session);
1231
 
    finalizeStatementMessage(*current_statement, session);
 
1225
                         in_session);
 
1226
    finalizeStatementMessage(*current_statement, in_session);
1232
1227
  }
1233
1228
}
1234
 
 
1235
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
 
                                                                     message::Transaction *transaction)
1237
 
{
1238
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
 
  uint32_t seg_id= transaction->segment_id();
1240
 
  
1241
 
  transaction->set_end_segment(false);
1242
 
  commitTransactionMessage(session);
1243
 
  transaction= getActiveTransactionMessage(session, false);
1244
 
  
1245
 
  /* Set the transaction ID to match the previous messages */
1246
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
 
  transaction->set_segment_id(seg_id + 1);
1248
 
  transaction->set_end_segment(true);
1249
 
 
1250
 
  return transaction;
1251
 
}
1252
 
 
1253
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1254
 
                                                            Table &table,
 
1229
  
 
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1231
                                                            Table *in_table,
1255
1232
                                                            uint32_t *next_segment_id)
1256
1233
{
1257
 
  message::Statement *statement= session.getStatementMessage();
 
1234
  message::Statement *statement= in_session->getStatementMessage();
1258
1235
  message::Transaction *transaction= NULL;
1259
 
  
1260
 
  /*
1261
 
   * If statement is NULL, this is a new statement.
1262
 
   * If statement is NOT NULL, this a continuation of the same statement.
1263
 
   * This is because autocommitOrRollback() finalizes the statement so that
1264
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1265
 
   * share a single GPB message for multiple statements).
 
1236
 
 
1237
  /* 
 
1238
   * Check the type for the current Statement message, if it is anything
 
1239
   * other then INSERT we need to call finalize, this will ensure a 
 
1240
   * new InsertStatement is created. If it is of type INSERT check
 
1241
   * what table the INSERT belongs to, if it is a different table
 
1242
   * call finalize, so a new InsertStatement can be created. 
1266
1243
   */
1267
 
  if (statement == NULL)
1268
 
  {
1269
 
    transaction= getActiveTransactionMessage(session);
1270
 
 
1271
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1272
 
        transaction_message_threshold)
1273
 
    {
1274
 
      transaction= segmentTransactionMessage(session, transaction);
1275
 
    }
1276
 
 
1277
 
    statement= transaction->add_statement();
1278
 
    setInsertHeader(*statement, session, table);
1279
 
    session.setStatementMessage(statement);
1280
 
  }
1281
 
  else
1282
 
  {
1283
 
    transaction= getActiveTransactionMessage(session);
1284
 
    
 
1244
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1245
  {
 
1246
    finalizeStatementMessage(*statement, in_session);
 
1247
    statement= in_session->getStatementMessage();
 
1248
  } 
 
1249
  else if (statement != NULL)
 
1250
  {
 
1251
    transaction= getActiveTransactionMessage(in_session);
 
1252
 
1285
1253
    /*
1286
1254
     * If we've passed our threshold for the statement size (possible for
1287
1255
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1288
1256
     * the Transaction will keep it from getting huge).
1289
1257
     */
1290
1258
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1291
 
        transaction_message_threshold)
 
1259
      in_session->variables.transaction_message_threshold)
1292
1260
    {
1293
1261
      /* Remember the transaction ID so we can re-use it */
1294
1262
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
 
      uint32_t seg_id= transaction->segment_id();
1296
 
      
 
1263
 
1297
1264
      message::InsertData *current_data= statement->mutable_insert_data();
1298
 
      
 
1265
 
1299
1266
      /* Caller should use this value when adding a new record */
1300
1267
      *next_segment_id= current_data->segment_id() + 1;
1301
 
      
 
1268
 
1302
1269
      current_data->set_end_segment(false);
1303
 
      transaction->set_end_segment(false);
1304
 
      
 
1270
 
1305
1271
      /* 
1306
1272
       * Send the trx message to replicators after finalizing the 
1307
1273
       * statement and transaction. This will also set the Transaction
1308
1274
       * and Statement objects in Session to NULL.
1309
1275
       */
1310
 
      commitTransactionMessage(session);
1311
 
      
 
1276
      commitTransactionMessage(in_session);
 
1277
 
1312
1278
      /*
1313
1279
       * Statement and Transaction should now be NULL, so new ones will get
1314
1280
       * created. We reuse the transaction id since we are segmenting
1315
1281
       * one transaction.
1316
1282
       */
1317
 
      transaction= getActiveTransactionMessage(session, false);
 
1283
      statement= in_session->getStatementMessage();
 
1284
      transaction= getActiveTransactionMessage(in_session, false);
1318
1285
      assert(transaction != NULL);
1319
1286
 
1320
 
      statement= transaction->add_statement();
1321
 
      setInsertHeader(*statement, session, table);
1322
 
      session.setStatementMessage(statement);
1323
 
            
1324
1287
      /* Set the transaction ID to match the previous messages */
1325
1288
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
 
      transaction->set_segment_id(seg_id + 1);
1327
 
      transaction->set_end_segment(true);
1328
1289
    }
1329
1290
    else
1330
1291
    {
1331
 
      /*
1332
 
       * Continuation of the same statement. Carry forward the existing
1333
 
       * segment id.
1334
 
       */
1335
 
      const message::InsertData &current_data= statement->insert_data();
1336
 
      *next_segment_id= current_data.segment_id();
 
1292
      const message::InsertHeader &insert_header= statement->insert_header();
 
1293
      string old_table_name= insert_header.table_metadata().table_name();
 
1294
     
 
1295
      string current_table_name;
 
1296
      (void) in_table->getShare()->getTableName(current_table_name);
 
1297
 
 
1298
      if (current_table_name.compare(old_table_name))
 
1299
      {
 
1300
        finalizeStatementMessage(*statement, in_session);
 
1301
        statement= in_session->getStatementMessage();
 
1302
      }
 
1303
      else
 
1304
      {
 
1305
        /* carry forward the existing segment id */
 
1306
        const message::InsertData &current_data= statement->insert_data();
 
1307
        *next_segment_id= current_data.segment_id();
 
1308
      }
1337
1309
    }
 
1310
  } 
 
1311
 
 
1312
  if (statement == NULL)
 
1313
  {
 
1314
    /*
 
1315
     * Transaction will be non-NULL only if we had to segment it due to
 
1316
     * transaction size above.
 
1317
     */
 
1318
    if (transaction == NULL)
 
1319
      transaction= getActiveTransactionMessage(in_session);
 
1320
 
 
1321
    /* 
 
1322
     * Transaction message initialized and set, but no statement created
 
1323
     * yet.  We construct one and initialize it, here, then return the
 
1324
     * message after attaching the new Statement message pointer to the 
 
1325
     * Session for easy retrieval later...
 
1326
     */
 
1327
    statement= transaction->add_statement();
 
1328
    setInsertHeader(*statement, in_session, in_table);
 
1329
    in_session->setStatementMessage(statement);
1338
1330
  }
1339
 
  
1340
1331
  return *statement;
1341
1332
}
1342
1333
 
1343
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
 
                                          Session::const_reference session,
1345
 
                                          Table &table)
 
1335
                                          Session *in_session,
 
1336
                                          Table *in_table)
1346
1337
{
1347
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1338
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1348
1339
 
1349
1340
  /* 
1350
1341
   * Now we construct the specialized InsertHeader message inside
1355
1346
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1356
1347
 
1357
1348
  string schema_name;
1358
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1349
  (void) in_table->getShare()->getSchemaName(schema_name);
1359
1350
  string table_name;
1360
 
  (void) table.getShare()->getTableName(table_name);
 
1351
  (void) in_table->getShare()->getTableName(table_name);
1361
1352
 
1362
1353
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
1354
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1364
1355
 
1365
1356
  Field *current_field;
1366
 
  Field **table_fields= table.getFields();
 
1357
  Field **table_fields= in_table->getFields();
1367
1358
 
1368
1359
  message::FieldMetadata *field_metadata;
1369
1360
 
1370
1361
  /* We will read all the table's fields... */
1371
 
  table.setReadSet();
 
1362
  in_table->setReadSet();
1372
1363
 
1373
1364
  while ((current_field= *table_fields++) != NULL) 
1374
1365
  {
1378
1369
  }
1379
1370
}
1380
1371
 
1381
 
bool TransactionServices::insertRecord(Session::reference session,
1382
 
                                       Table &table)
 
1372
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1383
1373
{
1384
1374
  ReplicationServices &replication_services= ReplicationServices::singleton();
1385
1375
  if (! replication_services.isActive())
1386
1376
    return false;
1387
 
 
1388
 
  if (not table.getShare()->replicate())
1389
 
    return false;
1390
 
 
1391
1377
  /**
1392
1378
   * We do this check here because we don't want to even create a 
1393
1379
   * statement if there isn't a primary key on the table...
1396
1382
   *
1397
1383
   * Multi-column primary keys are handled how exactly?
1398
1384
   */
1399
 
  if (not table.getShare()->hasPrimaryKey())
 
1385
  if (not in_table->getShare()->hasPrimaryKey())
1400
1386
  {
1401
1387
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1402
1388
    return true;
1403
1389
  }
1404
1390
 
1405
1391
  uint32_t next_segment_id= 1;
1406
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1392
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1407
1393
 
1408
1394
  message::InsertData *data= statement.mutable_insert_data();
1409
1395
  data->set_segment_id(next_segment_id);
1411
1397
  message::InsertRecord *record= data->add_record();
1412
1398
 
1413
1399
  Field *current_field;
1414
 
  Field **table_fields= table.getFields();
 
1400
  Field **table_fields= in_table->getFields();
1415
1401
 
1416
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1402
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
1403
  string_value->set_charset(system_charset_info);
1418
1404
 
1419
1405
  /* We will read all the table's fields... */
1420
 
  table.setReadSet();
 
1406
  in_table->setReadSet();
1421
1407
 
1422
1408
  while ((current_field= *table_fields++) != NULL) 
1423
1409
  {
1437
1423
  return false;
1438
1424
}
1439
1425
 
1440
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1441
 
                                                            Table &table,
 
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1427
                                                            Table *in_table,
1442
1428
                                                            const unsigned char *old_record, 
1443
1429
                                                            const unsigned char *new_record,
1444
1430
                                                            uint32_t *next_segment_id)
1445
1431
{
1446
 
  message::Statement *statement= session.getStatementMessage();
 
1432
  message::Statement *statement= in_session->getStatementMessage();
1447
1433
  message::Transaction *transaction= NULL;
1448
1434
 
1449
1435
  /*
1450
 
   * If statement is NULL, this is a new statement.
1451
 
   * If statement is NOT NULL, this a continuation of the same statement.
1452
 
   * This is because autocommitOrRollback() finalizes the statement so that
1453
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1454
 
   * share a single GPB message for multiple statements).
 
1436
   * Check the type for the current Statement message, if it is anything
 
1437
   * other then UPDATE we need to call finalize, this will ensure a
 
1438
   * new UpdateStatement is created. If it is of type UPDATE check
 
1439
   * what table the UPDATE belongs to, if it is a different table
 
1440
   * call finalize, so a new UpdateStatement can be created.
1455
1441
   */
1456
 
  if (statement == NULL)
 
1442
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1457
1443
  {
1458
 
    transaction= getActiveTransactionMessage(session);
1459
 
    
1460
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1461
 
        transaction_message_threshold)
1462
 
    {
1463
 
      transaction= segmentTransactionMessage(session, transaction);
1464
 
    }
1465
 
    
1466
 
    statement= transaction->add_statement();
1467
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1468
 
    session.setStatementMessage(statement);
 
1444
    finalizeStatementMessage(*statement, in_session);
 
1445
    statement= in_session->getStatementMessage();
1469
1446
  }
1470
 
  else
 
1447
  else if (statement != NULL)
1471
1448
  {
1472
 
    transaction= getActiveTransactionMessage(session);
1473
 
    
 
1449
    transaction= getActiveTransactionMessage(in_session);
 
1450
 
1474
1451
    /*
1475
1452
     * If we've passed our threshold for the statement size (possible for
1476
1453
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1477
1454
     * the Transaction will keep it from getting huge).
1478
1455
     */
1479
1456
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1480
 
        transaction_message_threshold)
 
1457
      in_session->variables.transaction_message_threshold)
1481
1458
    {
1482
1459
      /* Remember the transaction ID so we can re-use it */
1483
1460
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
 
      uint32_t seg_id= transaction->segment_id();
1485
 
      
 
1461
 
1486
1462
      message::UpdateData *current_data= statement->mutable_update_data();
1487
 
      
 
1463
 
1488
1464
      /* Caller should use this value when adding a new record */
1489
1465
      *next_segment_id= current_data->segment_id() + 1;
1490
 
      
 
1466
 
1491
1467
      current_data->set_end_segment(false);
1492
 
      transaction->set_end_segment(false);
1493
 
      
1494
 
      /* 
 
1468
 
 
1469
      /*
1495
1470
       * Send the trx message to replicators after finalizing the 
1496
1471
       * statement and transaction. This will also set the Transaction
1497
1472
       * and Statement objects in Session to NULL.
1498
1473
       */
1499
 
      commitTransactionMessage(session);
1500
 
      
 
1474
      commitTransactionMessage(in_session);
 
1475
 
1501
1476
      /*
1502
1477
       * Statement and Transaction should now be NULL, so new ones will get
1503
1478
       * created. We reuse the transaction id since we are segmenting
1504
1479
       * one transaction.
1505
1480
       */
1506
 
      transaction= getActiveTransactionMessage(session, false);
 
1481
      statement= in_session->getStatementMessage();
 
1482
      transaction= getActiveTransactionMessage(in_session, false);
1507
1483
      assert(transaction != NULL);
1508
 
      
1509
 
      statement= transaction->add_statement();
1510
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1511
 
      session.setStatementMessage(statement);
1512
 
      
 
1484
 
1513
1485
      /* Set the transaction ID to match the previous messages */
1514
1486
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
 
      transaction->set_segment_id(seg_id + 1);
1516
 
      transaction->set_end_segment(true);
1517
1487
    }
1518
1488
    else
1519
1489
    {
1520
 
      /*
1521
 
       * Continuation of the same statement. Carry forward the existing
1522
 
       * segment id.
1523
 
       */
1524
 
      const message::UpdateData &current_data= statement->update_data();
1525
 
      *next_segment_id= current_data.segment_id();
 
1490
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1491
      {
 
1492
        /* carry forward the existing segment id */
 
1493
        const message::UpdateData &current_data= statement->update_data();
 
1494
        *next_segment_id= current_data.segment_id();
 
1495
      } 
 
1496
      else 
 
1497
      {
 
1498
        finalizeStatementMessage(*statement, in_session);
 
1499
        statement= in_session->getStatementMessage();
 
1500
      }
1526
1501
    }
1527
1502
  }
1528
 
  
 
1503
 
 
1504
  if (statement == NULL)
 
1505
  {
 
1506
    /*
 
1507
     * Transaction will be non-NULL only if we had to segment it due to
 
1508
     * transaction size above.
 
1509
     */
 
1510
    if (transaction == NULL)
 
1511
      transaction= getActiveTransactionMessage(in_session);
 
1512
 
 
1513
    /* 
 
1514
     * Transaction message initialized and set, but no statement created
 
1515
     * yet.  We construct one and initialize it, here, then return the
 
1516
     * message after attaching the new Statement message pointer to the 
 
1517
     * Session for easy retrieval later...
 
1518
     */
 
1519
    statement= transaction->add_statement();
 
1520
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1521
    in_session->setStatementMessage(statement);
 
1522
  }
1529
1523
  return *statement;
1530
1524
}
1531
1525
 
 
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1527
                                                  Table *in_table,
 
1528
                                                  const unsigned char *old_record,
 
1529
                                                  const unsigned char *new_record)
 
1530
{
 
1531
  const message::UpdateHeader &update_header= statement.update_header();
 
1532
  string old_table_name= update_header.table_metadata().table_name();
 
1533
 
 
1534
  string current_table_name;
 
1535
  (void) in_table->getShare()->getTableName(current_table_name);
 
1536
  if (current_table_name.compare(old_table_name))
 
1537
  {
 
1538
    return false;
 
1539
  }
 
1540
  else
 
1541
  {
 
1542
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1543
     * match the updated fields in the new record, if they do not we must
 
1544
     * create a new UpdateHeader 
 
1545
     */
 
1546
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1547
 
 
1548
    Field *current_field;
 
1549
    Field **table_fields= in_table->getFields();
 
1550
    in_table->setReadSet();
 
1551
 
 
1552
    size_t num_calculated_updated_fields= 0;
 
1553
    bool found= false;
 
1554
    while ((current_field= *table_fields++) != NULL)
 
1555
    {
 
1556
      if (num_calculated_updated_fields > num_set_fields)
 
1557
      {
 
1558
        break;
 
1559
      }
 
1560
 
 
1561
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1562
      {
 
1563
        /* check that this field exists in the UpdateHeader record */
 
1564
        found= false;
 
1565
 
 
1566
        for (size_t x= 0; x < num_set_fields; ++x)
 
1567
        {
 
1568
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1569
          string name= field_metadata.name();
 
1570
          if (name.compare(current_field->field_name) == 0)
 
1571
          {
 
1572
            found= true;
 
1573
            ++num_calculated_updated_fields;
 
1574
            break;
 
1575
          } 
 
1576
        }
 
1577
        if (! found)
 
1578
        {
 
1579
          break;
 
1580
        } 
 
1581
      }
 
1582
    }
 
1583
 
 
1584
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1585
    {
 
1586
      return true;
 
1587
    } 
 
1588
    else 
 
1589
    {
 
1590
      return false;
 
1591
    }
 
1592
  }
 
1593
}  
 
1594
 
1532
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
 
                                          Session::const_reference session,
1534
 
                                          Table &table,
 
1596
                                          Session *in_session,
 
1597
                                          Table *in_table,
1535
1598
                                          const unsigned char *old_record, 
1536
1599
                                          const unsigned char *new_record)
1537
1600
{
1538
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1601
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1539
1602
 
1540
1603
  /* 
1541
1604
   * Now we construct the specialized UpdateHeader message inside
1546
1609
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1547
1610
 
1548
1611
  string schema_name;
1549
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1612
  (void) in_table->getShare()->getSchemaName(schema_name);
1550
1613
  string table_name;
1551
 
  (void) table.getShare()->getTableName(table_name);
 
1614
  (void) in_table->getShare()->getTableName(table_name);
1552
1615
 
1553
1616
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
1617
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1555
1618
 
1556
1619
  Field *current_field;
1557
 
  Field **table_fields= table.getFields();
 
1620
  Field **table_fields= in_table->getFields();
1558
1621
 
1559
1622
  message::FieldMetadata *field_metadata;
1560
1623
 
1561
1624
  /* We will read all the table's fields... */
1562
 
  table.setReadSet();
 
1625
  in_table->setReadSet();
1563
1626
 
1564
1627
  while ((current_field= *table_fields++) != NULL) 
1565
1628
  {
1567
1630
     * We add the "key field metadata" -- i.e. the fields which is
1568
1631
     * the primary key for the table.
1569
1632
     */
1570
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1633
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1571
1634
    {
1572
1635
      field_metadata= header->add_key_field_metadata();
1573
1636
      field_metadata->set_name(current_field->field_name);
1574
1637
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1575
1638
    }
1576
1639
 
1577
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1640
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1578
1641
    {
1579
1642
      /* Field is changed from old to new */
1580
1643
      field_metadata= header->add_set_field_metadata();
1583
1646
    }
1584
1647
  }
1585
1648
}
1586
 
 
1587
 
void TransactionServices::updateRecord(Session::reference session,
1588
 
                                       Table &table, 
 
1649
void TransactionServices::updateRecord(Session *in_session,
 
1650
                                       Table *in_table, 
1589
1651
                                       const unsigned char *old_record, 
1590
1652
                                       const unsigned char *new_record)
1591
1653
{
1593
1655
  if (! replication_services.isActive())
1594
1656
    return;
1595
1657
 
1596
 
  if (not table.getShare()->replicate())
1597
 
    return;
1598
 
 
1599
1658
  uint32_t next_segment_id= 1;
1600
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1659
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1601
1660
 
1602
1661
  message::UpdateData *data= statement.mutable_update_data();
1603
1662
  data->set_segment_id(next_segment_id);
1605
1664
  message::UpdateRecord *record= data->add_record();
1606
1665
 
1607
1666
  Field *current_field;
1608
 
  Field **table_fields= table.getFields();
1609
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1667
  Field **table_fields= in_table->getFields();
 
1668
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
1669
  string_value->set_charset(system_charset_info);
1611
1670
 
1612
1671
  while ((current_field= *table_fields++) != NULL) 
1622
1681
     *
1623
1682
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1624
1683
     */
1625
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1684
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1626
1685
    {
1627
1686
      /* Store the original "read bit" for this field */
1628
1687
      bool is_read_set= current_field->isReadSet();
1629
1688
 
1630
1689
      /* We need to mark that we will "read" this field... */
1631
 
      table.setReadSet(current_field->position());
 
1690
      in_table->setReadSet(current_field->position());
1632
1691
 
1633
1692
      /* Read the string value of this field's contents */
1634
1693
      string_value= current_field->val_str_internal(string_value);
1657
1716
     * primary key field value.  Replication only supports tables
1658
1717
     * with a primary key.
1659
1718
     */
1660
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1719
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1661
1720
    {
1662
1721
      /**
1663
1722
       * To say the below is ugly is an understatement. But it works.
1675
1734
}
1676
1735
 
1677
1736
bool TransactionServices::isFieldUpdated(Field *current_field,
1678
 
                                         Table &table,
 
1737
                                         Table *in_table,
1679
1738
                                         const unsigned char *old_record,
1680
1739
                                         const unsigned char *new_record)
1681
1740
{
1684
1743
   * we do this crazy pointer fiddling to figure out if the current field
1685
1744
   * has been updated in the supplied record raw byte pointers.
1686
1745
   */
1687
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1746
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1747
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1689
1748
 
1690
1749
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1691
1750
 
1715
1774
  return isUpdated;
1716
1775
}  
1717
1776
 
1718
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1719
 
                                                            Table &table,
 
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1778
                                                            Table *in_table,
1720
1779
                                                            uint32_t *next_segment_id)
1721
1780
{
1722
 
  message::Statement *statement= session.getStatementMessage();
 
1781
  message::Statement *statement= in_session->getStatementMessage();
1723
1782
  message::Transaction *transaction= NULL;
1724
1783
 
1725
1784
  /*
1726
 
   * If statement is NULL, this is a new statement.
1727
 
   * If statement is NOT NULL, this a continuation of the same statement.
1728
 
   * This is because autocommitOrRollback() finalizes the statement so that
1729
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1730
 
   * share a single GPB message for multiple statements).
 
1785
   * Check the type for the current Statement message, if it is anything
 
1786
   * other then DELETE we need to call finalize, this will ensure a
 
1787
   * new DeleteStatement is created. If it is of type DELETE check
 
1788
   * what table the DELETE belongs to, if it is a different table
 
1789
   * call finalize, so a new DeleteStatement can be created.
1731
1790
   */
1732
 
  if (statement == NULL)
 
1791
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1733
1792
  {
1734
 
    transaction= getActiveTransactionMessage(session);
1735
 
    
1736
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1737
 
        transaction_message_threshold)
1738
 
    {
1739
 
      transaction= segmentTransactionMessage(session, transaction);
1740
 
    }
1741
 
    
1742
 
    statement= transaction->add_statement();
1743
 
    setDeleteHeader(*statement, session, table);
1744
 
    session.setStatementMessage(statement);
 
1793
    finalizeStatementMessage(*statement, in_session);
 
1794
    statement= in_session->getStatementMessage();
1745
1795
  }
1746
 
  else
 
1796
  else if (statement != NULL)
1747
1797
  {
1748
 
    transaction= getActiveTransactionMessage(session);
1749
 
    
 
1798
    transaction= getActiveTransactionMessage(in_session);
 
1799
 
1750
1800
    /*
1751
1801
     * If we've passed our threshold for the statement size (possible for
1752
1802
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1753
1803
     * the Transaction will keep it from getting huge).
1754
1804
     */
1755
1805
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1756
 
        transaction_message_threshold)
 
1806
      in_session->variables.transaction_message_threshold)
1757
1807
    {
1758
1808
      /* Remember the transaction ID so we can re-use it */
1759
1809
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
 
      uint32_t seg_id= transaction->segment_id();
1761
 
      
 
1810
 
1762
1811
      message::DeleteData *current_data= statement->mutable_delete_data();
1763
 
      
 
1812
 
1764
1813
      /* Caller should use this value when adding a new record */
1765
1814
      *next_segment_id= current_data->segment_id() + 1;
1766
 
      
 
1815
 
1767
1816
      current_data->set_end_segment(false);
1768
 
      transaction->set_end_segment(false);
1769
 
      
 
1817
 
1770
1818
      /* 
1771
1819
       * Send the trx message to replicators after finalizing the 
1772
1820
       * statement and transaction. This will also set the Transaction
1773
1821
       * and Statement objects in Session to NULL.
1774
1822
       */
1775
 
      commitTransactionMessage(session);
1776
 
      
 
1823
      commitTransactionMessage(in_session);
 
1824
 
1777
1825
      /*
1778
1826
       * Statement and Transaction should now be NULL, so new ones will get
1779
1827
       * created. We reuse the transaction id since we are segmenting
1780
1828
       * one transaction.
1781
1829
       */
1782
 
      transaction= getActiveTransactionMessage(session, false);
 
1830
      statement= in_session->getStatementMessage();
 
1831
      transaction= getActiveTransactionMessage(in_session, false);
1783
1832
      assert(transaction != NULL);
1784
 
      
1785
 
      statement= transaction->add_statement();
1786
 
      setDeleteHeader(*statement, session, table);
1787
 
      session.setStatementMessage(statement);
1788
 
      
 
1833
 
1789
1834
      /* Set the transaction ID to match the previous messages */
1790
1835
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
 
      transaction->set_segment_id(seg_id + 1);
1792
 
      transaction->set_end_segment(true);
1793
1836
    }
1794
1837
    else
1795
1838
    {
1796
 
      /*
1797
 
       * Continuation of the same statement. Carry forward the existing
1798
 
       * segment id.
1799
 
       */
1800
 
      const message::DeleteData &current_data= statement->delete_data();
1801
 
      *next_segment_id= current_data.segment_id();
 
1839
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1840
      string old_table_name= delete_header.table_metadata().table_name();
 
1841
 
 
1842
      string current_table_name;
 
1843
      (void) in_table->getShare()->getTableName(current_table_name);
 
1844
      if (current_table_name.compare(old_table_name))
 
1845
      {
 
1846
        finalizeStatementMessage(*statement, in_session);
 
1847
        statement= in_session->getStatementMessage();
 
1848
      }
 
1849
      else
 
1850
      {
 
1851
        /* carry forward the existing segment id */
 
1852
        const message::DeleteData &current_data= statement->delete_data();
 
1853
        *next_segment_id= current_data.segment_id();
 
1854
      }
1802
1855
    }
1803
1856
  }
1804
 
  
 
1857
 
 
1858
  if (statement == NULL)
 
1859
  {
 
1860
    /*
 
1861
     * Transaction will be non-NULL only if we had to segment it due to
 
1862
     * transaction size above.
 
1863
     */
 
1864
    if (transaction == NULL)
 
1865
      transaction= getActiveTransactionMessage(in_session);
 
1866
 
 
1867
    /* 
 
1868
     * Transaction message initialized and set, but no statement created
 
1869
     * yet.  We construct one and initialize it, here, then return the
 
1870
     * message after attaching the new Statement message pointer to the 
 
1871
     * Session for easy retrieval later...
 
1872
     */
 
1873
    statement= transaction->add_statement();
 
1874
    setDeleteHeader(*statement, in_session, in_table);
 
1875
    in_session->setStatementMessage(statement);
 
1876
  }
1805
1877
  return *statement;
1806
1878
}
1807
1879
 
1808
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
 
                                          Session::const_reference session,
1810
 
                                          Table &table)
 
1881
                                          Session *in_session,
 
1882
                                          Table *in_table)
1811
1883
{
1812
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1884
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1813
1885
 
1814
1886
  /* 
1815
1887
   * Now we construct the specialized DeleteHeader message inside
1819
1891
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1820
1892
 
1821
1893
  string schema_name;
1822
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1894
  (void) in_table->getShare()->getSchemaName(schema_name);
1823
1895
  string table_name;
1824
 
  (void) table.getShare()->getTableName(table_name);
 
1896
  (void) in_table->getShare()->getTableName(table_name);
1825
1897
 
1826
1898
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
1899
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1828
1900
 
1829
1901
  Field *current_field;
1830
 
  Field **table_fields= table.getFields();
 
1902
  Field **table_fields= in_table->getFields();
1831
1903
 
1832
1904
  message::FieldMetadata *field_metadata;
1833
1905
 
1838
1910
     * primary key field value.  Replication only supports tables
1839
1911
     * with a primary key.
1840
1912
     */
1841
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1913
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1842
1914
    {
1843
1915
      field_metadata= header->add_key_field_metadata();
1844
1916
      field_metadata->set_name(current_field->field_name);
1847
1919
  }
1848
1920
}
1849
1921
 
1850
 
void TransactionServices::deleteRecord(Session::reference session,
1851
 
                                       Table &table,
1852
 
                                       bool use_update_record)
 
1922
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1853
1923
{
1854
1924
  ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1925
  if (! replication_services.isActive())
1856
1926
    return;
1857
1927
 
1858
 
  if (not table.getShare()->replicate())
1859
 
    return;
1860
 
 
1861
1928
  uint32_t next_segment_id= 1;
1862
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1929
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1863
1930
 
1864
1931
  message::DeleteData *data= statement.mutable_delete_data();
1865
1932
  data->set_segment_id(next_segment_id);
1867
1934
  message::DeleteRecord *record= data->add_record();
1868
1935
 
1869
1936
  Field *current_field;
1870
 
  Field **table_fields= table.getFields();
1871
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1937
  Field **table_fields= in_table->getFields();
 
1938
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
1939
  string_value->set_charset(system_charset_info);
1873
1940
 
1874
1941
  while ((current_field= *table_fields++) != NULL) 
1878
1945
     * primary key field value.  Replication only supports tables
1879
1946
     * with a primary key.
1880
1947
     */
1881
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1948
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1882
1949
    {
1883
1950
      if (use_update_record)
1884
1951
      {
1890
1957
         * We are careful not to change anything in old_ptr.
1891
1958
         */
1892
1959
        const unsigned char *old_ptr= current_field->ptr;
1893
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
 
1960
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1894
1961
        string_value= current_field->val_str_internal(string_value);
1895
1962
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1896
1963
      }
1907
1974
  }
1908
1975
}
1909
1976
 
1910
 
void TransactionServices::createTable(Session::reference session,
 
1977
void TransactionServices::createTable(Session *in_session,
1911
1978
                                      const message::Table &table)
1912
1979
{
1913
1980
  ReplicationServices &replication_services= ReplicationServices::singleton();
1914
1981
  if (! replication_services.isActive())
1915
1982
    return;
1916
 
 
1917
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1918
 
    return;
1919
 
 
1920
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1983
  
 
1984
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1921
1985
  message::Statement *statement= transaction->add_statement();
1922
1986
 
1923
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
1987
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1924
1988
 
1925
1989
  /* 
1926
1990
   * Construct the specialized CreateTableStatement message and attach
1930
1994
  message::Table *new_table_message= create_table_statement->mutable_table();
1931
1995
  *new_table_message= table;
1932
1996
 
1933
 
  finalizeStatementMessage(*statement, session);
 
1997
  finalizeStatementMessage(*statement, in_session);
1934
1998
 
1935
 
  finalizeTransactionMessage(*transaction, session);
 
1999
  finalizeTransactionMessage(*transaction, in_session);
1936
2000
  
1937
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2001
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1938
2002
 
1939
 
  cleanupTransactionMessage(transaction, session);
 
2003
  cleanupTransactionMessage(transaction, in_session);
1940
2004
 
1941
2005
}
1942
2006
 
1943
 
void TransactionServices::createSchema(Session::reference session,
 
2007
void TransactionServices::createSchema(Session *in_session,
1944
2008
                                       const message::Schema &schema)
1945
2009
{
1946
2010
  ReplicationServices &replication_services= ReplicationServices::singleton();
1947
2011
  if (! replication_services.isActive())
1948
2012
    return;
1949
 
 
1950
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1951
 
    return;
1952
 
 
1953
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2013
  
 
2014
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
2015
  message::Statement *statement= transaction->add_statement();
1955
2016
 
1956
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
2017
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1957
2018
 
1958
2019
  /* 
1959
2020
   * Construct the specialized CreateSchemaStatement message and attach
1963
2024
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
2025
  *new_schema_message= schema;
1965
2026
 
1966
 
  finalizeStatementMessage(*statement, session);
 
2027
  finalizeStatementMessage(*statement, in_session);
1967
2028
 
1968
 
  finalizeTransactionMessage(*transaction, session);
 
2029
  finalizeTransactionMessage(*transaction, in_session);
1969
2030
  
1970
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2031
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1971
2032
 
1972
 
  cleanupTransactionMessage(transaction, session);
 
2033
  cleanupTransactionMessage(transaction, in_session);
1973
2034
 
1974
2035
}
1975
2036
 
1976
 
void TransactionServices::dropSchema(Session::reference session,
1977
 
                                     identifier::Schema::const_reference identifier,
1978
 
                                     message::schema::const_reference schema)
 
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1979
2038
{
1980
2039
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
2040
  if (! replication_services.isActive())
1982
2041
    return;
1983
 
 
1984
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1985
 
    return;
1986
 
 
1987
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2042
  
 
2043
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1988
2044
  message::Statement *statement= transaction->add_statement();
1989
2045
 
1990
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
2046
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1991
2047
 
1992
2048
  /* 
1993
2049
   * Construct the specialized DropSchemaStatement message and attach
1995
2051
   */
1996
2052
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1997
2053
 
1998
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1999
 
 
2000
 
  finalizeStatementMessage(*statement, session);
2001
 
 
2002
 
  finalizeTransactionMessage(*transaction, session);
2003
 
  
2004
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2005
 
 
2006
 
  cleanupTransactionMessage(transaction, session);
2007
 
}
2008
 
 
2009
 
void TransactionServices::alterSchema(Session::reference session,
2010
 
                                      const message::Schema &old_schema,
2011
 
                                      const message::Schema &new_schema)
2012
 
{
2013
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2014
 
  if (! replication_services.isActive())
2015
 
    return;
2016
 
 
2017
 
  if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2018
 
    return;
2019
 
 
2020
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2021
 
  message::Statement *statement= transaction->add_statement();
2022
 
 
2023
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2024
 
 
2025
 
  /* 
2026
 
   * Construct the specialized AlterSchemaStatement message and attach
2027
 
   * it to the generic Statement message
2028
 
   */
2029
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2030
 
 
2031
 
  message::Schema *before= alter_schema_statement->mutable_before();
2032
 
  message::Schema *after= alter_schema_statement->mutable_after();
2033
 
 
2034
 
  *before= old_schema;
2035
 
  *after= new_schema;
2036
 
 
2037
 
  finalizeStatementMessage(*statement, session);
2038
 
 
2039
 
  finalizeTransactionMessage(*transaction, session);
2040
 
  
2041
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2042
 
 
2043
 
  cleanupTransactionMessage(transaction, session);
2044
 
}
2045
 
 
2046
 
void TransactionServices::dropTable(Session::reference session,
2047
 
                                    identifier::Table::const_reference identifier,
2048
 
                                    message::table::const_reference table,
2049
 
                                    bool if_exists)
2050
 
{
2051
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2052
 
  if (! replication_services.isActive())
2053
 
    return;
2054
 
 
2055
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2056
 
    return;
2057
 
 
2058
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2059
 
  message::Statement *statement= transaction->add_statement();
2060
 
 
2061
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
2054
  drop_schema_statement->set_schema_name(schema_name);
 
2055
 
 
2056
  finalizeStatementMessage(*statement, in_session);
 
2057
 
 
2058
  finalizeTransactionMessage(*transaction, in_session);
 
2059
  
 
2060
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2061
 
 
2062
  cleanupTransactionMessage(transaction, in_session);
 
2063
}
 
2064
 
 
2065
void TransactionServices::dropTable(Session *in_session,
 
2066
                                    const string &schema_name,
 
2067
                                    const string &table_name)
 
2068
{
 
2069
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2070
  if (! replication_services.isActive())
 
2071
    return;
 
2072
  
 
2073
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2074
  message::Statement *statement= transaction->add_statement();
 
2075
 
 
2076
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2062
2077
 
2063
2078
  /* 
2064
2079
   * Construct the specialized DropTableStatement message and attach
2066
2081
   */
2067
2082
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2068
2083
 
2069
 
  drop_table_statement->set_if_exists_clause(if_exists);
2070
 
 
2071
2084
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2072
2085
 
2073
 
  table_metadata->set_schema_name(identifier.getSchemaName());
2074
 
  table_metadata->set_table_name(identifier.getTableName());
2075
 
 
2076
 
  finalizeStatementMessage(*statement, session);
2077
 
 
2078
 
  finalizeTransactionMessage(*transaction, session);
 
2086
  table_metadata->set_schema_name(schema_name);
 
2087
  table_metadata->set_table_name(table_name);
 
2088
 
 
2089
  finalizeStatementMessage(*statement, in_session);
 
2090
 
 
2091
  finalizeTransactionMessage(*transaction, in_session);
2079
2092
  
2080
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2093
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2081
2094
 
2082
 
  cleanupTransactionMessage(transaction, session);
 
2095
  cleanupTransactionMessage(transaction, in_session);
2083
2096
}
2084
2097
 
2085
 
void TransactionServices::truncateTable(Session::reference session,
2086
 
                                        Table &table)
 
2098
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2087
2099
{
2088
2100
  ReplicationServices &replication_services= ReplicationServices::singleton();
2089
2101
  if (! replication_services.isActive())
2090
2102
    return;
2091
 
 
2092
 
  if (not table.getShare()->replicate())
2093
 
    return;
2094
 
 
2095
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2103
  
 
2104
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
2105
  message::Statement *statement= transaction->add_statement();
2097
2106
 
2098
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
2107
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2099
2108
 
2100
2109
  /* 
2101
2110
   * Construct the specialized TruncateTableStatement message and attach
2105
2114
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2106
2115
 
2107
2116
  string schema_name;
2108
 
  (void) table.getShare()->getSchemaName(schema_name);
2109
 
 
 
2117
  (void) in_table->getShare()->getSchemaName(schema_name);
2110
2118
  string table_name;
2111
 
  (void) table.getShare()->getTableName(table_name);
 
2119
  (void) in_table->getShare()->getTableName(table_name);
2112
2120
 
2113
2121
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
2122
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2115
2123
 
2116
 
  finalizeStatementMessage(*statement, session);
 
2124
  finalizeStatementMessage(*statement, in_session);
2117
2125
 
2118
 
  finalizeTransactionMessage(*transaction, session);
 
2126
  finalizeTransactionMessage(*transaction, in_session);
2119
2127
  
2120
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2128
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2121
2129
 
2122
 
  cleanupTransactionMessage(transaction, session);
 
2130
  cleanupTransactionMessage(transaction, in_session);
2123
2131
}
2124
2132
 
2125
 
void TransactionServices::rawStatement(Session::reference session,
2126
 
                                       const string &query)
 
2133
void TransactionServices::rawStatement(Session *in_session, const string &query)
2127
2134
{
2128
2135
  ReplicationServices &replication_services= ReplicationServices::singleton();
2129
2136
  if (! replication_services.isActive())
2130
2137
    return;
2131
2138
 
2132
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2139
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
2140
  message::Statement *statement= transaction->add_statement();
2134
2141
 
2135
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
2142
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
2143
  statement->set_sql(query);
2137
 
  finalizeStatementMessage(*statement, session);
 
2144
  finalizeStatementMessage(*statement, in_session);
2138
2145
 
2139
 
  finalizeTransactionMessage(*transaction, session);
 
2146
  finalizeTransactionMessage(*transaction, in_session);
2140
2147
  
2141
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2148
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2142
2149
 
2143
 
  cleanupTransactionMessage(transaction, session);
 
2150
  cleanupTransactionMessage(transaction, in_session);
2144
2151
}
2145
2152
 
2146
 
int TransactionServices::sendEvent(Session::reference session,
2147
 
                                   const message::Event &event)
 
2153
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2148
2154
{
2149
2155
  ReplicationServices &replication_services= ReplicationServices::singleton();
2150
2156
  if (! replication_services.isActive())
2162
2168
 
2163
2169
  trx_event->CopyFrom(event);
2164
2170
 
2165
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
2171
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2166
2172
 
2167
2173
  delete transaction;
2168
2174
 
2169
2175
  return static_cast<int>(result);
2170
2176
}
2171
2177
 
2172
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2178
bool TransactionServices::sendStartupEvent(Session *session)
2173
2179
{
2174
2180
  message::Event event;
2175
2181
  event.set_type(message::Event::STARTUP);
2178
2184
  return true;
2179
2185
}
2180
2186
 
2181
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2187
bool TransactionServices::sendShutdownEvent(Session *session)
2182
2188
{
2183
2189
  message::Event event;
2184
2190
  event.set_type(message::Event::SHUTDOWN);