~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
5
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  This program is free software; you can redistribute it and/or modify
8
8
 *  it under the terms of the GNU General Public License as published by
64
64
#include "drizzled/lock.h"
65
65
#include "drizzled/item/int.h"
66
66
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/timestamp.h"
 
67
#include "drizzled/field/epoch.h"
68
68
#include "drizzled/plugin/client.h"
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session *session,
 
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session->transaction.stmt;
333
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session.transaction.stmt;
 
333
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session *session,
 
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session->transaction.stmt;
366
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session.transaction.stmt;
 
366
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session->transaction.all;
387
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session.transaction.all;
 
387
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session->transaction.xid_state.xid.is_null())
404
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
403
  if (session.transaction.xid_state.xid.is_null())
 
404
    session.transaction.xid_state.xid.set(session.getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session->transaction.all;
417
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session.transaction.all;
 
417
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session->transaction.xid_state.xid.is_null())
434
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
433
  if (session.transaction.xid_state.xid.is_null())
 
434
    session.transaction.xid_state.xid.set(session.getQueryId());
435
435
 
436
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
457
457
{
458
 
  if (session->getXaId() == 0)
 
458
  if (session.getXaId() == 0)
459
459
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
460
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
461
461
  }
462
462
 
463
 
  return session->getXaId();
 
463
  return session.getXaId();
464
464
}
465
465
 
466
 
/**
467
 
  @retval
468
 
    0   ok
469
 
  @retval
470
 
    1   transaction was rolled back
471
 
  @retval
472
 
    2   error during commit, data may be inconsistent
473
 
 
474
 
  @todo
475
 
    Since we don't support nested statement transactions in 5.0,
476
 
    we can't commit or rollback stmt transactions while we are inside
477
 
    stored functions or triggers. So we simply do nothing now.
478
 
    TODO: This should be fixed in later ( >= 5.1) releases.
479
 
*/
480
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
 
466
int TransactionServices::commitTransaction(Session::reference session,
 
467
                                           bool normal_transaction)
481
468
{
482
469
  int error= 0, cookie= 0;
483
470
  /*
484
471
    'all' means that this is either an explicit commit issued by
485
472
    user, or an implicit commit issued by a DDL.
486
473
  */
487
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
474
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
476
 
490
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
477
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
491
478
 
492
479
  /*
493
480
    We must not commit the normal transaction if a statement
495
482
    flags will not get propagated to its normal transaction's
496
483
    counterpart.
497
484
  */
498
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
499
 
              trans == &session->transaction.stmt);
 
485
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
486
              trans == &session.transaction.stmt);
500
487
 
501
488
  if (resource_contexts.empty() == false)
502
489
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
490
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
504
491
    {
505
492
      rollbackTransaction(session, normal_transaction);
506
493
      return 1;
531
518
 
532
519
        if (resource->participatesInXaTransaction())
533
520
        {
534
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
 
521
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
535
522
          {
536
523
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
524
            error= 1;
538
525
          }
539
526
          else
540
527
          {
541
 
            session->status_var.ha_prepare_count++;
 
528
            session.status_var.ha_prepare_count++;
542
529
          }
543
530
        }
544
531
      }
560
547
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
548
end:
562
549
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
550
      session.startWaitingGlobalReadLock();
564
551
  }
565
552
  return error;
566
553
}
569
556
  @note
570
557
  This function does not care about global read lock. A caller should.
571
558
*/
572
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
 
559
int TransactionServices::commitPhaseOne(Session::reference session,
 
560
                                        bool normal_transaction)
573
561
{
574
562
  int error=0;
575
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
563
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
565
 
578
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
566
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
567
  bool all= normal_transaction;
 
568
 
 
569
  /* If we're in autocommit then we have a real transaction to commit
 
570
     (except if it's BEGIN)
 
571
  */
 
572
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
573
    all= true;
579
574
 
580
575
  if (resource_contexts.empty() == false)
581
576
  {
590
585
 
591
586
      if (resource->participatesInXaTransaction())
592
587
      {
593
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
 
588
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
594
589
        {
595
590
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
596
591
          error= 1;
597
592
        }
598
593
        else if (normal_transaction)
599
594
        {
600
 
          session->status_var.ha_commit_count++;
 
595
          session.status_var.ha_commit_count++;
601
596
        }
602
597
      }
603
598
      else if (resource->participatesInSqlTransaction())
604
599
      {
605
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
 
600
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
606
601
        {
607
602
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
608
603
          error= 1;
609
604
        }
610
605
        else if (normal_transaction)
611
606
        {
612
 
          session->status_var.ha_commit_count++;
 
607
          session.status_var.ha_commit_count++;
613
608
        }
614
609
      }
615
610
      resource_context->reset(); /* keep it conveniently zero-filled */
616
611
    }
617
612
 
618
613
    if (is_real_trans)
619
 
      session->transaction.xid_state.xid.null();
 
614
      session.transaction.xid_state.xid.null();
620
615
 
621
616
    if (normal_transaction)
622
617
    {
623
 
      session->variables.tx_isolation= session->session_tx_isolation;
624
 
      session->transaction.cleanup();
 
618
      session.variables.tx_isolation= session.session_tx_isolation;
 
619
      session.transaction.cleanup();
625
620
    }
626
621
  }
627
622
  trans->reset();
628
623
  return error;
629
624
}
630
625
 
631
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
 
626
int TransactionServices::rollbackTransaction(Session::reference session,
 
627
                                             bool normal_transaction)
632
628
{
633
629
  int error= 0;
634
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
630
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
631
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
636
632
 
637
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
633
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
634
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
638
635
 
639
636
  /*
640
637
    We must not rollback the normal transaction if a statement
641
638
    transaction is pending.
642
639
  */
643
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
644
 
              trans == &session->transaction.stmt);
 
640
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
641
              trans == &session.transaction.stmt);
645
642
 
646
643
  if (resource_contexts.empty() == false)
647
644
  {
656
653
 
657
654
      if (resource->participatesInXaTransaction())
658
655
      {
659
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
 
656
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
660
657
        {
661
658
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
659
          error= 1;
663
660
        }
664
661
        else if (normal_transaction)
665
662
        {
666
 
          session->status_var.ha_rollback_count++;
 
663
          session.status_var.ha_rollback_count++;
667
664
        }
668
665
      }
669
666
      else if (resource->participatesInSqlTransaction())
670
667
      {
671
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
 
668
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
672
669
        {
673
670
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
671
          error= 1;
675
672
        }
676
673
        else if (normal_transaction)
677
674
        {
678
 
          session->status_var.ha_rollback_count++;
 
675
          session.status_var.ha_rollback_count++;
679
676
        }
680
677
      }
681
678
      resource_context->reset(); /* keep it conveniently zero-filled */
688
685
     * a rollback statement with the corresponding transaction ID
689
686
     * to rollback.
690
687
     */
691
 
    if (normal_transaction)
 
688
    if (all)
692
689
      rollbackTransactionMessage(session);
 
690
    else
 
691
      rollbackStatementMessage(session);
693
692
 
694
693
    if (is_real_trans)
695
 
      session->transaction.xid_state.xid.null();
 
694
      session.transaction.xid_state.xid.null();
696
695
    if (normal_transaction)
697
696
    {
698
 
      session->variables.tx_isolation=session->session_tx_isolation;
699
 
      session->transaction.cleanup();
 
697
      session.variables.tx_isolation=session.session_tx_isolation;
 
698
      session.transaction.cleanup();
700
699
    }
701
700
  }
702
701
  if (normal_transaction)
703
 
    session->transaction_rollback_request= false;
 
702
    session.transaction_rollback_request= false;
704
703
 
705
704
  /*
706
705
   * If a non-transactional table was updated, warn the user
707
706
   */
708
707
  if (is_real_trans &&
709
 
      session->transaction.all.hasModifiedNonTransData() &&
710
 
      session->getKilled() != Session::KILL_CONNECTION)
 
708
      session.transaction.all.hasModifiedNonTransData() &&
 
709
      session.getKilled() != Session::KILL_CONNECTION)
711
710
  {
712
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
711
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
712
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
713
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
715
714
  }
717
716
  return error;
718
717
}
719
718
 
720
 
/**
721
 
  This is used to commit or rollback a single statement depending on
722
 
  the value of error.
723
 
 
724
 
  @note
725
 
    Note that if the autocommit is on, then the following call inside
726
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
727
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
728
 
    the user has used LOCK TABLES then that mechanism does not know to do the
729
 
    commit.
730
 
*/
731
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
 
719
int TransactionServices::autocommitOrRollback(Session::reference session,
 
720
                                              int error)
732
721
{
 
722
  /* One GPB Statement message per SQL statement */
 
723
  message::Statement *statement= session.getStatementMessage();
 
724
  if ((statement != NULL) && (! error))
 
725
    finalizeStatementMessage(*statement, session);
733
726
 
734
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
 
727
  if (session.transaction.stmt.getResourceContexts().empty() == false)
735
728
  {
736
 
    TransactionContext *trans = &session->transaction.stmt;
 
729
    TransactionContext *trans = &session.transaction.stmt;
737
730
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
738
731
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
739
732
         it != resource_contexts.end();
741
734
    {
742
735
      ResourceContext *resource_context= *it;
743
736
 
744
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
737
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
745
738
    }
746
739
 
747
740
    if (! error)
752
745
    else
753
746
    {
754
747
      (void) rollbackTransaction(session, false);
755
 
      if (session->transaction_rollback_request)
 
748
      if (session.transaction_rollback_request)
 
749
      {
756
750
        (void) rollbackTransaction(session, true);
 
751
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
 
752
      }
757
753
    }
758
754
 
759
 
    session->variables.tx_isolation= session->session_tx_isolation;
 
755
    session.variables.tx_isolation= session.session_tx_isolation;
760
756
  }
 
757
 
761
758
  return error;
762
759
}
763
760
 
771
768
  }
772
769
};
773
770
 
774
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
 
771
int TransactionServices::rollbackToSavepoint(Session::reference session,
 
772
                                             NamedSavepoint &sv)
775
773
{
776
774
  int error= 0;
777
 
  TransactionContext *trans= &session->transaction.all;
 
775
  TransactionContext *trans= &session.transaction.all;
778
776
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
779
777
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
780
778
 
794
792
 
795
793
    if (resource->participatesInSqlTransaction())
796
794
    {
797
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
 
795
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
798
796
      {
799
797
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
800
798
        error= 1;
801
799
      }
802
800
      else
803
801
      {
804
 
        session->status_var.ha_savepoint_rollback_count++;
 
802
        session.status_var.ha_savepoint_rollback_count++;
805
803
      }
806
804
    }
807
805
    trans->no_2pc|= not resource->participatesInXaTransaction();
851
849
 
852
850
      if (resource->participatesInSqlTransaction())
853
851
      {
854
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
 
852
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
855
853
        {
856
854
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
857
855
          error= 1;
858
856
        }
859
857
        else
860
858
        {
861
 
          session->status_var.ha_rollback_count++;
 
859
          session.status_var.ha_rollback_count++;
862
860
        }
863
861
      }
864
862
      resource_context->reset(); /* keep it conveniently zero-filled */
881
879
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
882
880
      if (num_statements == 0)
883
881
      {    
884
 
        session->setStatementMessage(NULL);
 
882
        session.setStatementMessage(NULL);
885
883
      }    
886
884
      else 
887
885
      {
888
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
886
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
889
887
      }    
890
 
      session->setTransactionMessage(savepoint_transaction_copy);
 
888
      session.setTransactionMessage(savepoint_transaction_copy);
891
889
    }
892
890
  }
893
891
 
900
898
  section "4.33.4 SQL-statements and transaction states",
901
899
  NamedSavepoint is *not* transaction-initiating SQL-statement
902
900
*/
903
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
 
901
int TransactionServices::setSavepoint(Session::reference session,
 
902
                                      NamedSavepoint &sv)
904
903
{
905
904
  int error= 0;
906
 
  TransactionContext *trans= &session->transaction.all;
 
905
  TransactionContext *trans= &session.transaction.all;
907
906
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
907
 
909
908
  if (resource_contexts.empty() == false)
919
918
 
920
919
      if (resource->participatesInSqlTransaction())
921
920
      {
922
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
 
921
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
923
922
        {
924
923
          my_error(ER_GET_ERRNO, MYF(0), err);
925
924
          error= 1;
926
925
        }
927
926
        else
928
927
        {
929
 
          session->status_var.ha_savepoint_count++;
 
928
          session.status_var.ha_savepoint_count++;
930
929
        }
931
930
      }
932
931
    }
938
937
 
939
938
  if (shouldConstructMessages())
940
939
  {
941
 
    message::Transaction *transaction= session->getTransactionMessage();
 
940
    message::Transaction *transaction= session.getTransactionMessage();
942
941
                  
943
942
    if (transaction != NULL)
944
943
    {
951
950
  return error;
952
951
}
953
952
 
954
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
 
953
int TransactionServices::releaseSavepoint(Session::reference session,
 
954
                                          NamedSavepoint &sv)
955
955
{
956
956
  int error= 0;
957
957
 
968
968
 
969
969
    if (resource->participatesInSqlTransaction())
970
970
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
 
971
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
972
972
      {
973
973
        my_error(ER_GET_ERRNO, MYF(0), err);
974
974
        error= 1;
985
985
  return replication_services.isActive();
986
986
}
987
987
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
 
989
                                                                       bool should_inc_trx_id)
989
990
{
990
 
  message::Transaction *transaction= in_session->getTransactionMessage();
 
991
  message::Transaction *transaction= session.getTransactionMessage();
991
992
 
992
993
  if (unlikely(transaction == NULL))
993
994
  {
997
998
     * deleting transaction message when done with it.
998
999
     */
999
1000
    transaction= new (nothrow) message::Transaction();
1000
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1001
 
    in_session->setTransactionMessage(transaction);
 
1001
    initTransactionMessage(*transaction, session, should_inc_trx_id);
 
1002
    session.setTransactionMessage(transaction);
1002
1003
    return transaction;
1003
1004
  }
1004
1005
  else
1005
1006
    return transaction;
1006
1007
}
1007
1008
 
1008
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
 
                                                 Session *in_session,
 
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
 
1010
                                                 Session::reference session,
1010
1011
                                                 bool should_inc_trx_id)
1011
1012
{
1012
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
 
  trx->set_server_id(in_session->getServerId());
 
1013
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1014
  trx->set_server_id(session.getServerId());
1014
1015
 
1015
1016
  if (should_inc_trx_id)
1016
1017
  {
1017
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
 
    in_session->setXaId(0);
1019
 
  }  
 
1018
    trx->set_transaction_id(getCurrentTransactionId(session));
 
1019
    session.setXaId(0);
 
1020
  }
1020
1021
  else
1021
 
  { 
 
1022
  {
 
1023
    /* trx and seg id will get set properly elsewhere */
1022
1024
    trx->set_transaction_id(0);
1023
1025
  }
1024
1026
 
1025
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1026
 
}
1027
 
 
1028
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1029
 
                                              Session *in_session)
1030
 
{
1031
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1032
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1033
 
}
1034
 
 
1035
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1036
 
                                             Session *in_session)
1037
 
{
1038
 
  delete in_transaction;
1039
 
  in_session->setStatementMessage(NULL);
1040
 
  in_session->setTransactionMessage(NULL);
1041
 
  in_session->setXaId(0);
1042
 
}
1043
 
 
1044
 
int TransactionServices::commitTransactionMessage(Session *in_session)
 
1027
  trx->set_start_timestamp(session.getCurrentTimestamp());
 
1028
  
 
1029
  /* segment info may get set elsewhere as needed */
 
1030
  transaction.set_segment_id(1);
 
1031
  transaction.set_end_segment(true);
 
1032
}
 
1033
 
 
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
 
1035
                                                     Session::const_reference session)
 
1036
{
 
1037
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1038
  trx->set_end_timestamp(session.getCurrentTimestamp());
 
1039
}
 
1040
 
 
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
 
1042
                                                    Session::reference session)
 
1043
{
 
1044
  delete transaction;
 
1045
  session.setStatementMessage(NULL);
 
1046
  session.setTransactionMessage(NULL);
 
1047
  session.setXaId(0);
 
1048
}
 
1049
 
 
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1045
1051
{
1046
1052
  ReplicationServices &replication_services= ReplicationServices::singleton();
1047
1053
  if (! replication_services.isActive())
1048
1054
    return 0;
1049
1055
 
1050
 
  /* If there is an active statement message, finalize it */
1051
 
  message::Statement *statement= in_session->getStatementMessage();
 
1056
  /*
 
1057
   * If no Transaction message was ever created, then no data modification
 
1058
   * occurred inside the transaction, so nothing to do.
 
1059
   */
 
1060
  if (session.getTransactionMessage() == NULL)
 
1061
    return 0;
 
1062
  
 
1063
  /* If there is an active statement message, finalize it. */
 
1064
  message::Statement *statement= session.getStatementMessage();
1052
1065
 
1053
1066
  if (statement != NULL)
1054
1067
  {
1055
 
    finalizeStatementMessage(*statement, in_session);
1056
 
  }
1057
 
  else
1058
 
    return 0; /* No data modification occurred inside the transaction */
1059
 
  
1060
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1061
 
 
1062
 
  finalizeTransactionMessage(*transaction, in_session);
1063
 
  
1064
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1065
 
 
1066
 
  cleanupTransactionMessage(transaction, in_session);
 
1068
    finalizeStatementMessage(*statement, session);
 
1069
  }
 
1070
 
 
1071
  message::Transaction* transaction= getActiveTransactionMessage(session);
 
1072
 
 
1073
  /*
 
1074
   * It is possible that we could have a Transaction without any Statements
 
1075
   * if we had created a Statement but had to roll it back due to it failing
 
1076
   * mid-execution, and no subsequent Statements were added to the Transaction
 
1077
   * message. In this case, we simply clean up the message and not push it.
 
1078
   */
 
1079
  if (transaction->statement_size() == 0)
 
1080
  {
 
1081
    cleanupTransactionMessage(transaction, session);
 
1082
    return 0;
 
1083
  }
 
1084
  
 
1085
  finalizeTransactionMessage(*transaction, session);
 
1086
  
 
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
1088
 
 
1089
  cleanupTransactionMessage(transaction, session);
1067
1090
 
1068
1091
  return static_cast<int>(result);
1069
1092
}
1070
1093
 
1071
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1072
 
                                        message::Statement::Type in_type,
1073
 
                                        Session *in_session)
 
1095
                                               message::Statement::Type type,
 
1096
                                               Session::const_reference session)
1074
1097
{
1075
 
  statement.set_type(in_type);
1076
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1098
  statement.set_type(type);
 
1099
  statement.set_start_timestamp(session.getCurrentTimestamp());
1077
1100
 
1078
 
  if (in_session->variables.replicate_query)
1079
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
1101
  if (session.variables.replicate_query)
 
1102
    statement.set_sql(session.getQueryString()->c_str());
1080
1103
}
1081
1104
 
1082
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1083
 
                                            Session *in_session)
 
1106
                                                   Session::reference session)
1084
1107
{
1085
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1086
 
  in_session->setStatementMessage(NULL);
 
1108
  statement.set_end_timestamp(session.getCurrentTimestamp());
 
1109
  session.setStatementMessage(NULL);
1087
1110
}
1088
1111
 
1089
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1090
1113
{
1091
1114
  ReplicationServices &replication_services= ReplicationServices::singleton();
1092
1115
  if (! replication_services.isActive())
1093
1116
    return;
1094
1117
  
1095
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1118
  message::Transaction *transaction= getActiveTransactionMessage(session);
1096
1119
 
1097
1120
  /*
1098
1121
   * OK, so there are two situations that we need to deal with here:
1115
1138
  {
1116
1139
    /* Remember the transaction ID so we can re-use it */
1117
1140
    uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1141
    uint32_t seg_id= transaction->segment_id();
1118
1142
 
1119
1143
    /*
1120
1144
     * Clear the transaction, create a Rollback statement message, 
1121
1145
     * attach it to the transaction, and push it to replicators.
1122
1146
     */
1123
1147
    transaction->Clear();
1124
 
    initTransactionMessage(*transaction, in_session, false);
 
1148
    initTransactionMessage(*transaction, session, false);
1125
1149
 
1126
1150
    /* Set the transaction ID to match the previous messages */
1127
1151
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1152
    transaction->set_segment_id(seg_id);
 
1153
    transaction->set_end_segment(true);
1128
1154
 
1129
1155
    message::Statement *statement= transaction->add_statement();
1130
1156
 
1131
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1132
 
    finalizeStatementMessage(*statement, in_session);
 
1157
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
 
1158
    finalizeStatementMessage(*statement, session);
1133
1159
 
1134
 
    finalizeTransactionMessage(*transaction, in_session);
 
1160
    finalizeTransactionMessage(*transaction, session);
1135
1161
    
1136
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1137
 
  }
1138
 
  cleanupTransactionMessage(transaction, in_session);
1139
 
}
1140
 
 
1141
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1142
 
                                                            Table *in_table,
 
1162
    (void) replication_services.pushTransactionMessage(session, *transaction);
 
1163
  }
 
1164
 
 
1165
  cleanupTransactionMessage(transaction, session);
 
1166
}
 
1167
 
 
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
 
1169
{
 
1170
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1171
  if (! replication_services.isActive())
 
1172
    return;
 
1173
 
 
1174
  message::Statement *current_statement= session.getStatementMessage();
 
1175
 
 
1176
  /* If we never added a Statement message, nothing to undo. */
 
1177
  if (current_statement == NULL)
 
1178
    return;
 
1179
 
 
1180
  /*
 
1181
   * If the Statement has been segmented, then we've already pushed a portion
 
1182
   * of this Statement's row changes through the replication stream and we
 
1183
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
 
1184
   * delete the current Statement message.
 
1185
   */
 
1186
  bool is_segmented= false;
 
1187
 
 
1188
  switch (current_statement->type())
 
1189
  {
 
1190
    case message::Statement::INSERT:
 
1191
      if (current_statement->insert_data().segment_id() > 1)
 
1192
        is_segmented= true;
 
1193
      break;
 
1194
 
 
1195
    case message::Statement::UPDATE:
 
1196
      if (current_statement->update_data().segment_id() > 1)
 
1197
        is_segmented= true;
 
1198
      break;
 
1199
 
 
1200
    case message::Statement::DELETE:
 
1201
      if (current_statement->delete_data().segment_id() > 1)
 
1202
        is_segmented= true;
 
1203
      break;
 
1204
 
 
1205
    default:
 
1206
      break;
 
1207
  }
 
1208
 
 
1209
  /*
 
1210
   * Remove the Statement message we've been working with (same as
 
1211
   * current_statement).
 
1212
   */
 
1213
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1214
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
 
1215
  statements_in_txn= transaction->mutable_statement();
 
1216
  statements_in_txn->RemoveLast();
 
1217
  session.setStatementMessage(NULL);
 
1218
  
 
1219
  /*
 
1220
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
 
1221
   * an indicator to cancel the previous Statement message which should have
 
1222
   * had its end_segment attribute set to false.
 
1223
   */
 
1224
  if (is_segmented)
 
1225
  {
 
1226
    current_statement= transaction->add_statement();
 
1227
    initStatementMessage(*current_statement,
 
1228
                         message::Statement::ROLLBACK_STATEMENT,
 
1229
                         session);
 
1230
    finalizeStatementMessage(*current_statement, session);
 
1231
  }
 
1232
}
 
1233
 
 
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
 
1235
                                                                     message::Transaction *transaction)
 
1236
{
 
1237
  uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1238
  uint32_t seg_id= transaction->segment_id();
 
1239
  
 
1240
  transaction->set_end_segment(false);
 
1241
  commitTransactionMessage(session);
 
1242
  transaction= getActiveTransactionMessage(session, false);
 
1243
  
 
1244
  /* Set the transaction ID to match the previous messages */
 
1245
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1246
  transaction->set_segment_id(seg_id + 1);
 
1247
  transaction->set_end_segment(true);
 
1248
 
 
1249
  return transaction;
 
1250
}
 
1251
 
 
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
 
1253
                                                            Table &table,
1143
1254
                                                            uint32_t *next_segment_id)
1144
1255
{
1145
 
  message::Statement *statement= in_session->getStatementMessage();
 
1256
  message::Statement *statement= session.getStatementMessage();
1146
1257
  message::Transaction *transaction= NULL;
1147
 
 
1148
 
  /* 
1149
 
   * Check the type for the current Statement message, if it is anything
1150
 
   * other then INSERT we need to call finalize, this will ensure a 
1151
 
   * new InsertStatement is created. If it is of type INSERT check
1152
 
   * what table the INSERT belongs to, if it is a different table
1153
 
   * call finalize, so a new InsertStatement can be created. 
 
1258
  
 
1259
  /*
 
1260
   * If statement is NULL, this is a new statement.
 
1261
   * If statement is NOT NULL, this a continuation of the same statement.
 
1262
   * This is because autocommitOrRollback() finalizes the statement so that
 
1263
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1264
   * share a single GPB message for multiple statements).
1154
1265
   */
1155
 
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1156
 
  {
1157
 
    finalizeStatementMessage(*statement, in_session);
1158
 
    statement= in_session->getStatementMessage();
1159
 
  } 
1160
 
  else if (statement != NULL)
1161
 
  {
1162
 
    transaction= getActiveTransactionMessage(in_session);
1163
 
 
 
1266
  if (statement == NULL)
 
1267
  {
 
1268
    transaction= getActiveTransactionMessage(session);
 
1269
 
 
1270
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1271
        transaction_message_threshold)
 
1272
    {
 
1273
      transaction= segmentTransactionMessage(session, transaction);
 
1274
    }
 
1275
 
 
1276
    statement= transaction->add_statement();
 
1277
    setInsertHeader(*statement, session, table);
 
1278
    session.setStatementMessage(statement);
 
1279
  }
 
1280
  else
 
1281
  {
 
1282
    transaction= getActiveTransactionMessage(session);
 
1283
    
1164
1284
    /*
1165
1285
     * If we've passed our threshold for the statement size (possible for
1166
1286
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1167
1287
     * the Transaction will keep it from getting huge).
1168
1288
     */
1169
1289
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1170
 
      in_session->variables.transaction_message_threshold)
 
1290
        transaction_message_threshold)
1171
1291
    {
1172
1292
      /* Remember the transaction ID so we can re-use it */
1173
1293
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1174
 
 
 
1294
      uint32_t seg_id= transaction->segment_id();
 
1295
      
1175
1296
      message::InsertData *current_data= statement->mutable_insert_data();
1176
 
 
 
1297
      
1177
1298
      /* Caller should use this value when adding a new record */
1178
1299
      *next_segment_id= current_data->segment_id() + 1;
1179
 
 
 
1300
      
1180
1301
      current_data->set_end_segment(false);
1181
 
 
 
1302
      transaction->set_end_segment(false);
 
1303
      
1182
1304
      /* 
1183
1305
       * Send the trx message to replicators after finalizing the 
1184
1306
       * statement and transaction. This will also set the Transaction
1185
1307
       * and Statement objects in Session to NULL.
1186
1308
       */
1187
 
      commitTransactionMessage(in_session);
1188
 
 
 
1309
      commitTransactionMessage(session);
 
1310
      
1189
1311
      /*
1190
1312
       * Statement and Transaction should now be NULL, so new ones will get
1191
1313
       * created. We reuse the transaction id since we are segmenting
1192
1314
       * one transaction.
1193
1315
       */
1194
 
      statement= in_session->getStatementMessage();
1195
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1316
      transaction= getActiveTransactionMessage(session, false);
1196
1317
      assert(transaction != NULL);
1197
1318
 
 
1319
      statement= transaction->add_statement();
 
1320
      setInsertHeader(*statement, session, table);
 
1321
      session.setStatementMessage(statement);
 
1322
            
1198
1323
      /* Set the transaction ID to match the previous messages */
1199
1324
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1325
      transaction->set_segment_id(seg_id + 1);
 
1326
      transaction->set_end_segment(true);
1200
1327
    }
1201
1328
    else
1202
1329
    {
1203
 
      const message::InsertHeader &insert_header= statement->insert_header();
1204
 
      string old_table_name= insert_header.table_metadata().table_name();
1205
 
     
1206
 
      string current_table_name;
1207
 
      (void) in_table->getShare()->getTableName(current_table_name);
1208
 
 
1209
 
      if (current_table_name.compare(old_table_name))
1210
 
      {
1211
 
        finalizeStatementMessage(*statement, in_session);
1212
 
        statement= in_session->getStatementMessage();
1213
 
      }
1214
 
      else
1215
 
      {
1216
 
        /* carry forward the existing segment id */
1217
 
        const message::InsertData &current_data= statement->insert_data();
1218
 
        *next_segment_id= current_data.segment_id();
1219
 
      }
 
1330
      /*
 
1331
       * Continuation of the same statement. Carry forward the existing
 
1332
       * segment id.
 
1333
       */
 
1334
      const message::InsertData &current_data= statement->insert_data();
 
1335
      *next_segment_id= current_data.segment_id();
1220
1336
    }
1221
 
  } 
1222
 
 
1223
 
  if (statement == NULL)
1224
 
  {
1225
 
    /*
1226
 
     * Transaction will be non-NULL only if we had to segment it due to
1227
 
     * transaction size above.
1228
 
     */
1229
 
    if (transaction == NULL)
1230
 
      transaction= getActiveTransactionMessage(in_session);
1231
 
 
1232
 
    /* 
1233
 
     * Transaction message initialized and set, but no statement created
1234
 
     * yet.  We construct one and initialize it, here, then return the
1235
 
     * message after attaching the new Statement message pointer to the 
1236
 
     * Session for easy retrieval later...
1237
 
     */
1238
 
    statement= transaction->add_statement();
1239
 
    setInsertHeader(*statement, in_session, in_table);
1240
 
    in_session->setStatementMessage(statement);
1241
1337
  }
 
1338
  
1242
1339
  return *statement;
1243
1340
}
1244
1341
 
1245
1342
void TransactionServices::setInsertHeader(message::Statement &statement,
1246
 
                                          Session *in_session,
1247
 
                                          Table *in_table)
 
1343
                                          Session::const_reference session,
 
1344
                                          Table &table)
1248
1345
{
1249
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1346
  initStatementMessage(statement, message::Statement::INSERT, session);
1250
1347
 
1251
1348
  /* 
1252
1349
   * Now we construct the specialized InsertHeader message inside
1257
1354
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1258
1355
 
1259
1356
  string schema_name;
1260
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1357
  (void) table.getShare()->getSchemaName(schema_name);
1261
1358
  string table_name;
1262
 
  (void) in_table->getShare()->getTableName(table_name);
 
1359
  (void) table.getShare()->getTableName(table_name);
1263
1360
 
1264
1361
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1265
1362
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1266
1363
 
1267
1364
  Field *current_field;
1268
 
  Field **table_fields= in_table->getFields();
 
1365
  Field **table_fields= table.getFields();
1269
1366
 
1270
1367
  message::FieldMetadata *field_metadata;
1271
1368
 
1272
1369
  /* We will read all the table's fields... */
1273
 
  in_table->setReadSet();
 
1370
  table.setReadSet();
1274
1371
 
1275
1372
  while ((current_field= *table_fields++) != NULL) 
1276
1373
  {
1280
1377
  }
1281
1378
}
1282
1379
 
1283
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1380
bool TransactionServices::insertRecord(Session::reference session,
 
1381
                                       Table &table)
1284
1382
{
1285
1383
  ReplicationServices &replication_services= ReplicationServices::singleton();
1286
1384
  if (! replication_services.isActive())
1287
1385
    return false;
 
1386
 
1288
1387
  /**
1289
1388
   * We do this check here because we don't want to even create a 
1290
1389
   * statement if there isn't a primary key on the table...
1293
1392
   *
1294
1393
   * Multi-column primary keys are handled how exactly?
1295
1394
   */
1296
 
  if (not in_table->getShare()->hasPrimaryKey())
 
1395
  if (not table.getShare()->hasPrimaryKey())
1297
1396
  {
1298
1397
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1299
1398
    return true;
1300
1399
  }
1301
1400
 
1302
1401
  uint32_t next_segment_id= 1;
1303
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1402
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1304
1403
 
1305
1404
  message::InsertData *data= statement.mutable_insert_data();
1306
1405
  data->set_segment_id(next_segment_id);
1308
1407
  message::InsertRecord *record= data->add_record();
1309
1408
 
1310
1409
  Field *current_field;
1311
 
  Field **table_fields= in_table->getFields();
 
1410
  Field **table_fields= table.getFields();
1312
1411
 
1313
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1412
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1314
1413
  string_value->set_charset(system_charset_info);
1315
1414
 
1316
1415
  /* We will read all the table's fields... */
1317
 
  in_table->setReadSet();
 
1416
  table.setReadSet();
1318
1417
 
1319
1418
  while ((current_field= *table_fields++) != NULL) 
1320
1419
  {
1325
1424
    } 
1326
1425
    else 
1327
1426
    {
1328
 
      string_value= current_field->val_str(string_value);
 
1427
      string_value= current_field->val_str_internal(string_value);
1329
1428
      record->add_is_null(false);
1330
1429
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1331
1430
      string_value->free();
1334
1433
  return false;
1335
1434
}
1336
1435
 
1337
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1338
 
                                                            Table *in_table,
 
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
 
1437
                                                            Table &table,
1339
1438
                                                            const unsigned char *old_record, 
1340
1439
                                                            const unsigned char *new_record,
1341
1440
                                                            uint32_t *next_segment_id)
1342
1441
{
1343
 
  message::Statement *statement= in_session->getStatementMessage();
 
1442
  message::Statement *statement= session.getStatementMessage();
1344
1443
  message::Transaction *transaction= NULL;
1345
1444
 
1346
1445
  /*
1347
 
   * Check the type for the current Statement message, if it is anything
1348
 
   * other then UPDATE we need to call finalize, this will ensure a
1349
 
   * new UpdateStatement is created. If it is of type UPDATE check
1350
 
   * what table the UPDATE belongs to, if it is a different table
1351
 
   * call finalize, so a new UpdateStatement can be created.
 
1446
   * If statement is NULL, this is a new statement.
 
1447
   * If statement is NOT NULL, this a continuation of the same statement.
 
1448
   * This is because autocommitOrRollback() finalizes the statement so that
 
1449
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1450
   * share a single GPB message for multiple statements).
1352
1451
   */
1353
 
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1452
  if (statement == NULL)
1354
1453
  {
1355
 
    finalizeStatementMessage(*statement, in_session);
1356
 
    statement= in_session->getStatementMessage();
 
1454
    transaction= getActiveTransactionMessage(session);
 
1455
    
 
1456
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1457
        transaction_message_threshold)
 
1458
    {
 
1459
      transaction= segmentTransactionMessage(session, transaction);
 
1460
    }
 
1461
    
 
1462
    statement= transaction->add_statement();
 
1463
    setUpdateHeader(*statement, session, table, old_record, new_record);
 
1464
    session.setStatementMessage(statement);
1357
1465
  }
1358
 
  else if (statement != NULL)
 
1466
  else
1359
1467
  {
1360
 
    transaction= getActiveTransactionMessage(in_session);
1361
 
 
 
1468
    transaction= getActiveTransactionMessage(session);
 
1469
    
1362
1470
    /*
1363
1471
     * If we've passed our threshold for the statement size (possible for
1364
1472
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1365
1473
     * the Transaction will keep it from getting huge).
1366
1474
     */
1367
1475
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1368
 
      in_session->variables.transaction_message_threshold)
 
1476
        transaction_message_threshold)
1369
1477
    {
1370
1478
      /* Remember the transaction ID so we can re-use it */
1371
1479
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1372
 
 
 
1480
      uint32_t seg_id= transaction->segment_id();
 
1481
      
1373
1482
      message::UpdateData *current_data= statement->mutable_update_data();
1374
 
 
 
1483
      
1375
1484
      /* Caller should use this value when adding a new record */
1376
1485
      *next_segment_id= current_data->segment_id() + 1;
1377
 
 
 
1486
      
1378
1487
      current_data->set_end_segment(false);
1379
 
 
1380
 
      /*
 
1488
      transaction->set_end_segment(false);
 
1489
      
 
1490
      /* 
1381
1491
       * Send the trx message to replicators after finalizing the 
1382
1492
       * statement and transaction. This will also set the Transaction
1383
1493
       * and Statement objects in Session to NULL.
1384
1494
       */
1385
 
      commitTransactionMessage(in_session);
1386
 
 
 
1495
      commitTransactionMessage(session);
 
1496
      
1387
1497
      /*
1388
1498
       * Statement and Transaction should now be NULL, so new ones will get
1389
1499
       * created. We reuse the transaction id since we are segmenting
1390
1500
       * one transaction.
1391
1501
       */
1392
 
      statement= in_session->getStatementMessage();
1393
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1502
      transaction= getActiveTransactionMessage(session, false);
1394
1503
      assert(transaction != NULL);
1395
 
 
 
1504
      
 
1505
      statement= transaction->add_statement();
 
1506
      setUpdateHeader(*statement, session, table, old_record, new_record);
 
1507
      session.setStatementMessage(statement);
 
1508
      
1396
1509
      /* Set the transaction ID to match the previous messages */
1397
1510
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1511
      transaction->set_segment_id(seg_id + 1);
 
1512
      transaction->set_end_segment(true);
1398
1513
    }
1399
1514
    else
1400
1515
    {
1401
 
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1402
 
      {
1403
 
        /* carry forward the existing segment id */
1404
 
        const message::UpdateData &current_data= statement->update_data();
1405
 
        *next_segment_id= current_data.segment_id();
1406
 
      } 
1407
 
      else 
1408
 
      {
1409
 
        finalizeStatementMessage(*statement, in_session);
1410
 
        statement= in_session->getStatementMessage();
1411
 
      }
 
1516
      /*
 
1517
       * Continuation of the same statement. Carry forward the existing
 
1518
       * segment id.
 
1519
       */
 
1520
      const message::UpdateData &current_data= statement->update_data();
 
1521
      *next_segment_id= current_data.segment_id();
1412
1522
    }
1413
1523
  }
1414
 
 
1415
 
  if (statement == NULL)
1416
 
  {
1417
 
    /*
1418
 
     * Transaction will be non-NULL only if we had to segment it due to
1419
 
     * transaction size above.
1420
 
     */
1421
 
    if (transaction == NULL)
1422
 
      transaction= getActiveTransactionMessage(in_session);
1423
 
 
1424
 
    /* 
1425
 
     * Transaction message initialized and set, but no statement created
1426
 
     * yet.  We construct one and initialize it, here, then return the
1427
 
     * message after attaching the new Statement message pointer to the 
1428
 
     * Session for easy retrieval later...
1429
 
     */
1430
 
    statement= transaction->add_statement();
1431
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1432
 
    in_session->setStatementMessage(statement);
1433
 
  }
 
1524
  
1434
1525
  return *statement;
1435
1526
}
1436
1527
 
1437
 
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1438
 
                                                  Table *in_table,
1439
 
                                                  const unsigned char *old_record,
1440
 
                                                  const unsigned char *new_record)
1441
 
{
1442
 
  const message::UpdateHeader &update_header= statement.update_header();
1443
 
  string old_table_name= update_header.table_metadata().table_name();
1444
 
 
1445
 
  string current_table_name;
1446
 
  (void) in_table->getShare()->getTableName(current_table_name);
1447
 
  if (current_table_name.compare(old_table_name))
1448
 
  {
1449
 
    return false;
1450
 
  }
1451
 
  else
1452
 
  {
1453
 
    /* Compare the set fields in the existing UpdateHeader and see if they
1454
 
     * match the updated fields in the new record, if they do not we must
1455
 
     * create a new UpdateHeader 
1456
 
     */
1457
 
    size_t num_set_fields= update_header.set_field_metadata_size();
1458
 
 
1459
 
    Field *current_field;
1460
 
    Field **table_fields= in_table->getFields();
1461
 
    in_table->setReadSet();
1462
 
 
1463
 
    size_t num_calculated_updated_fields= 0;
1464
 
    bool found= false;
1465
 
    while ((current_field= *table_fields++) != NULL)
1466
 
    {
1467
 
      if (num_calculated_updated_fields > num_set_fields)
1468
 
      {
1469
 
        break;
1470
 
      }
1471
 
 
1472
 
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1473
 
      {
1474
 
        /* check that this field exists in the UpdateHeader record */
1475
 
        found= false;
1476
 
 
1477
 
        for (size_t x= 0; x < num_set_fields; ++x)
1478
 
        {
1479
 
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1480
 
          string name= field_metadata.name();
1481
 
          if (name.compare(current_field->field_name) == 0)
1482
 
          {
1483
 
            found= true;
1484
 
            ++num_calculated_updated_fields;
1485
 
            break;
1486
 
          } 
1487
 
        }
1488
 
        if (! found)
1489
 
        {
1490
 
          break;
1491
 
        } 
1492
 
      }
1493
 
    }
1494
 
 
1495
 
    if ((num_calculated_updated_fields == num_set_fields) && found)
1496
 
    {
1497
 
      return true;
1498
 
    } 
1499
 
    else 
1500
 
    {
1501
 
      return false;
1502
 
    }
1503
 
  }
1504
 
}  
1505
 
 
1506
1528
void TransactionServices::setUpdateHeader(message::Statement &statement,
1507
 
                                          Session *in_session,
1508
 
                                          Table *in_table,
 
1529
                                          Session::const_reference session,
 
1530
                                          Table &table,
1509
1531
                                          const unsigned char *old_record, 
1510
1532
                                          const unsigned char *new_record)
1511
1533
{
1512
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1534
  initStatementMessage(statement, message::Statement::UPDATE, session);
1513
1535
 
1514
1536
  /* 
1515
1537
   * Now we construct the specialized UpdateHeader message inside
1520
1542
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1521
1543
 
1522
1544
  string schema_name;
1523
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1545
  (void) table.getShare()->getSchemaName(schema_name);
1524
1546
  string table_name;
1525
 
  (void) in_table->getShare()->getTableName(table_name);
 
1547
  (void) table.getShare()->getTableName(table_name);
1526
1548
 
1527
1549
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1528
1550
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1529
1551
 
1530
1552
  Field *current_field;
1531
 
  Field **table_fields= in_table->getFields();
 
1553
  Field **table_fields= table.getFields();
1532
1554
 
1533
1555
  message::FieldMetadata *field_metadata;
1534
1556
 
1535
1557
  /* We will read all the table's fields... */
1536
 
  in_table->setReadSet();
 
1558
  table.setReadSet();
1537
1559
 
1538
1560
  while ((current_field= *table_fields++) != NULL) 
1539
1561
  {
1541
1563
     * We add the "key field metadata" -- i.e. the fields which is
1542
1564
     * the primary key for the table.
1543
1565
     */
1544
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1566
    if (table.getShare()->fieldInPrimaryKey(current_field))
1545
1567
    {
1546
1568
      field_metadata= header->add_key_field_metadata();
1547
1569
      field_metadata->set_name(current_field->field_name);
1548
1570
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1549
1571
    }
1550
1572
 
1551
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1573
    if (isFieldUpdated(current_field, table, old_record, new_record))
1552
1574
    {
1553
1575
      /* Field is changed from old to new */
1554
1576
      field_metadata= header->add_set_field_metadata();
1557
1579
    }
1558
1580
  }
1559
1581
}
1560
 
void TransactionServices::updateRecord(Session *in_session,
1561
 
                                       Table *in_table, 
 
1582
void TransactionServices::updateRecord(Session::reference session,
 
1583
                                       Table &table, 
1562
1584
                                       const unsigned char *old_record, 
1563
1585
                                       const unsigned char *new_record)
1564
1586
{
1567
1589
    return;
1568
1590
 
1569
1591
  uint32_t next_segment_id= 1;
1570
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1592
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1571
1593
 
1572
1594
  message::UpdateData *data= statement.mutable_update_data();
1573
1595
  data->set_segment_id(next_segment_id);
1575
1597
  message::UpdateRecord *record= data->add_record();
1576
1598
 
1577
1599
  Field *current_field;
1578
 
  Field **table_fields= in_table->getFields();
1579
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1600
  Field **table_fields= table.getFields();
 
1601
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1580
1602
  string_value->set_charset(system_charset_info);
1581
1603
 
1582
1604
  while ((current_field= *table_fields++) != NULL) 
1592
1614
     *
1593
1615
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1594
1616
     */
1595
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1617
    if (isFieldUpdated(current_field, table, old_record, new_record))
1596
1618
    {
1597
1619
      /* Store the original "read bit" for this field */
1598
1620
      bool is_read_set= current_field->isReadSet();
1599
1621
 
1600
1622
      /* We need to mark that we will "read" this field... */
1601
 
      in_table->setReadSet(current_field->field_index);
 
1623
      table.setReadSet(current_field->position());
1602
1624
 
1603
1625
      /* Read the string value of this field's contents */
1604
 
      string_value= current_field->val_str(string_value);
 
1626
      string_value= current_field->val_str_internal(string_value);
1605
1627
 
1606
1628
      /* 
1607
1629
       * Reset the read bit after reading field to its original state.  This 
1627
1649
     * primary key field value.  Replication only supports tables
1628
1650
     * with a primary key.
1629
1651
     */
1630
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1652
    if (table.getShare()->fieldInPrimaryKey(current_field))
1631
1653
    {
1632
1654
      /**
1633
1655
       * To say the below is ugly is an understatement. But it works.
1634
1656
       * 
1635
1657
       * @todo Move this crap into a real Record API.
1636
1658
       */
1637
 
      string_value= current_field->val_str(string_value,
1638
 
                                           old_record + 
1639
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1659
      string_value= current_field->val_str_internal(string_value,
 
1660
                                                    old_record + 
 
1661
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
1640
1662
      record->add_key_value(string_value->c_ptr(), string_value->length());
1641
1663
      string_value->free();
1642
1664
    }
1645
1667
}
1646
1668
 
1647
1669
bool TransactionServices::isFieldUpdated(Field *current_field,
1648
 
                                         Table *in_table,
 
1670
                                         Table &table,
1649
1671
                                         const unsigned char *old_record,
1650
1672
                                         const unsigned char *new_record)
1651
1673
{
1654
1676
   * we do this crazy pointer fiddling to figure out if the current field
1655
1677
   * has been updated in the supplied record raw byte pointers.
1656
1678
   */
1657
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1658
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1679
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1680
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1659
1681
 
1660
1682
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1661
1683
 
1685
1707
  return isUpdated;
1686
1708
}  
1687
1709
 
1688
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1689
 
                                                            Table *in_table,
 
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
 
1711
                                                            Table &table,
1690
1712
                                                            uint32_t *next_segment_id)
1691
1713
{
1692
 
  message::Statement *statement= in_session->getStatementMessage();
 
1714
  message::Statement *statement= session.getStatementMessage();
1693
1715
  message::Transaction *transaction= NULL;
1694
1716
 
1695
1717
  /*
1696
 
   * Check the type for the current Statement message, if it is anything
1697
 
   * other then DELETE we need to call finalize, this will ensure a
1698
 
   * new DeleteStatement is created. If it is of type DELETE check
1699
 
   * what table the DELETE belongs to, if it is a different table
1700
 
   * call finalize, so a new DeleteStatement can be created.
 
1718
   * If statement is NULL, this is a new statement.
 
1719
   * If statement is NOT NULL, this a continuation of the same statement.
 
1720
   * This is because autocommitOrRollback() finalizes the statement so that
 
1721
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1722
   * share a single GPB message for multiple statements).
1701
1723
   */
1702
 
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1724
  if (statement == NULL)
1703
1725
  {
1704
 
    finalizeStatementMessage(*statement, in_session);
1705
 
    statement= in_session->getStatementMessage();
 
1726
    transaction= getActiveTransactionMessage(session);
 
1727
    
 
1728
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1729
        transaction_message_threshold)
 
1730
    {
 
1731
      transaction= segmentTransactionMessage(session, transaction);
 
1732
    }
 
1733
    
 
1734
    statement= transaction->add_statement();
 
1735
    setDeleteHeader(*statement, session, table);
 
1736
    session.setStatementMessage(statement);
1706
1737
  }
1707
 
  else if (statement != NULL)
 
1738
  else
1708
1739
  {
1709
 
    transaction= getActiveTransactionMessage(in_session);
1710
 
 
 
1740
    transaction= getActiveTransactionMessage(session);
 
1741
    
1711
1742
    /*
1712
1743
     * If we've passed our threshold for the statement size (possible for
1713
1744
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1714
1745
     * the Transaction will keep it from getting huge).
1715
1746
     */
1716
1747
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1717
 
      in_session->variables.transaction_message_threshold)
 
1748
        transaction_message_threshold)
1718
1749
    {
1719
1750
      /* Remember the transaction ID so we can re-use it */
1720
1751
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1721
 
 
 
1752
      uint32_t seg_id= transaction->segment_id();
 
1753
      
1722
1754
      message::DeleteData *current_data= statement->mutable_delete_data();
1723
 
 
 
1755
      
1724
1756
      /* Caller should use this value when adding a new record */
1725
1757
      *next_segment_id= current_data->segment_id() + 1;
1726
 
 
 
1758
      
1727
1759
      current_data->set_end_segment(false);
1728
 
 
 
1760
      transaction->set_end_segment(false);
 
1761
      
1729
1762
      /* 
1730
1763
       * Send the trx message to replicators after finalizing the 
1731
1764
       * statement and transaction. This will also set the Transaction
1732
1765
       * and Statement objects in Session to NULL.
1733
1766
       */
1734
 
      commitTransactionMessage(in_session);
1735
 
 
 
1767
      commitTransactionMessage(session);
 
1768
      
1736
1769
      /*
1737
1770
       * Statement and Transaction should now be NULL, so new ones will get
1738
1771
       * created. We reuse the transaction id since we are segmenting
1739
1772
       * one transaction.
1740
1773
       */
1741
 
      statement= in_session->getStatementMessage();
1742
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1774
      transaction= getActiveTransactionMessage(session, false);
1743
1775
      assert(transaction != NULL);
1744
 
 
 
1776
      
 
1777
      statement= transaction->add_statement();
 
1778
      setDeleteHeader(*statement, session, table);
 
1779
      session.setStatementMessage(statement);
 
1780
      
1745
1781
      /* Set the transaction ID to match the previous messages */
1746
1782
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1783
      transaction->set_segment_id(seg_id + 1);
 
1784
      transaction->set_end_segment(true);
1747
1785
    }
1748
1786
    else
1749
1787
    {
1750
 
      const message::DeleteHeader &delete_header= statement->delete_header();
1751
 
      string old_table_name= delete_header.table_metadata().table_name();
1752
 
 
1753
 
      string current_table_name;
1754
 
      (void) in_table->getShare()->getTableName(current_table_name);
1755
 
      if (current_table_name.compare(old_table_name))
1756
 
      {
1757
 
        finalizeStatementMessage(*statement, in_session);
1758
 
        statement= in_session->getStatementMessage();
1759
 
      }
1760
 
      else
1761
 
      {
1762
 
        /* carry forward the existing segment id */
1763
 
        const message::DeleteData &current_data= statement->delete_data();
1764
 
        *next_segment_id= current_data.segment_id();
1765
 
      }
 
1788
      /*
 
1789
       * Continuation of the same statement. Carry forward the existing
 
1790
       * segment id.
 
1791
       */
 
1792
      const message::DeleteData &current_data= statement->delete_data();
 
1793
      *next_segment_id= current_data.segment_id();
1766
1794
    }
1767
1795
  }
1768
 
 
1769
 
  if (statement == NULL)
1770
 
  {
1771
 
    /*
1772
 
     * Transaction will be non-NULL only if we had to segment it due to
1773
 
     * transaction size above.
1774
 
     */
1775
 
    if (transaction == NULL)
1776
 
      transaction= getActiveTransactionMessage(in_session);
1777
 
 
1778
 
    /* 
1779
 
     * Transaction message initialized and set, but no statement created
1780
 
     * yet.  We construct one and initialize it, here, then return the
1781
 
     * message after attaching the new Statement message pointer to the 
1782
 
     * Session for easy retrieval later...
1783
 
     */
1784
 
    statement= transaction->add_statement();
1785
 
    setDeleteHeader(*statement, in_session, in_table);
1786
 
    in_session->setStatementMessage(statement);
1787
 
  }
 
1796
  
1788
1797
  return *statement;
1789
1798
}
1790
1799
 
1791
1800
void TransactionServices::setDeleteHeader(message::Statement &statement,
1792
 
                                          Session *in_session,
1793
 
                                          Table *in_table)
 
1801
                                          Session::const_reference session,
 
1802
                                          Table &table)
1794
1803
{
1795
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1804
  initStatementMessage(statement, message::Statement::DELETE, session);
1796
1805
 
1797
1806
  /* 
1798
1807
   * Now we construct the specialized DeleteHeader message inside
1802
1811
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1803
1812
 
1804
1813
  string schema_name;
1805
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1814
  (void) table.getShare()->getSchemaName(schema_name);
1806
1815
  string table_name;
1807
 
  (void) in_table->getShare()->getTableName(table_name);
 
1816
  (void) table.getShare()->getTableName(table_name);
1808
1817
 
1809
1818
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1810
1819
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1811
1820
 
1812
1821
  Field *current_field;
1813
 
  Field **table_fields= in_table->getFields();
 
1822
  Field **table_fields= table.getFields();
1814
1823
 
1815
1824
  message::FieldMetadata *field_metadata;
1816
1825
 
1821
1830
     * primary key field value.  Replication only supports tables
1822
1831
     * with a primary key.
1823
1832
     */
1824
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1833
    if (table.getShare()->fieldInPrimaryKey(current_field))
1825
1834
    {
1826
1835
      field_metadata= header->add_key_field_metadata();
1827
1836
      field_metadata->set_name(current_field->field_name);
1830
1839
  }
1831
1840
}
1832
1841
 
1833
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1842
void TransactionServices::deleteRecord(Session::reference session,
 
1843
                                       Table &table,
 
1844
                                       bool use_update_record)
1834
1845
{
1835
1846
  ReplicationServices &replication_services= ReplicationServices::singleton();
1836
1847
  if (! replication_services.isActive())
1837
1848
    return;
1838
1849
 
1839
1850
  uint32_t next_segment_id= 1;
1840
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1851
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1841
1852
 
1842
1853
  message::DeleteData *data= statement.mutable_delete_data();
1843
1854
  data->set_segment_id(next_segment_id);
1845
1856
  message::DeleteRecord *record= data->add_record();
1846
1857
 
1847
1858
  Field *current_field;
1848
 
  Field **table_fields= in_table->getFields();
1849
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1859
  Field **table_fields= table.getFields();
 
1860
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1850
1861
  string_value->set_charset(system_charset_info);
1851
1862
 
1852
1863
  while ((current_field= *table_fields++) != NULL) 
1856
1867
     * primary key field value.  Replication only supports tables
1857
1868
     * with a primary key.
1858
1869
     */
1859
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1870
    if (table.getShare()->fieldInPrimaryKey(current_field))
1860
1871
    {
1861
1872
      if (use_update_record)
1862
1873
      {
1868
1879
         * We are careful not to change anything in old_ptr.
1869
1880
         */
1870
1881
        const unsigned char *old_ptr= current_field->ptr;
1871
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1872
 
        string_value= current_field->val_str(string_value);
 
1882
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
 
1883
        string_value= current_field->val_str_internal(string_value);
1873
1884
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1874
1885
      }
1875
1886
      else
1876
1887
      {
1877
 
        string_value= current_field->val_str(string_value);
 
1888
        string_value= current_field->val_str_internal(string_value);
1878
1889
        /**
1879
1890
         * @TODO Store optional old record value in the before data member
1880
1891
         */
1885
1896
  }
1886
1897
}
1887
1898
 
1888
 
 
1889
 
/**
1890
 
 * Template for removing Statement records of different types.
1891
 
 *
1892
 
 * The code for removing records from different Statement message types
1893
 
 * is identical except for the class types that are embedded within the
1894
 
 * Statement.
1895
 
 *
1896
 
 * There are 3 scenarios we need to look for:
1897
 
 *   - We've been asked to remove more records than exist in the Statement
1898
 
 *   - We've been asked to remove less records than exist in the Statement
1899
 
 *   - We've been asked to remove ALL records that exist in the Statement
1900
 
 *
1901
 
 * If we are removing ALL records, then effectively we would be left with
1902
 
 * an empty Statement message, so we should just remove it and clean up
1903
 
 * message pointers in the Session object.
1904
 
 */
1905
 
template <class DataType, class RecordType>
1906
 
static bool removeStatementRecordsWithType(Session *session,
1907
 
                                           DataType *data,
1908
 
                                           uint32_t count)
1909
 
{
1910
 
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1911
 
 
1912
 
  /* If there aren't enough records to remove 'count' of them, error. */
1913
 
  if (num_avail_recs < count)
1914
 
    return false;
1915
 
 
1916
 
  /*
1917
 
   * If we are removing all of the data records, we'll just remove this
1918
 
   * entire Statement message.
1919
 
   */
1920
 
  if (num_avail_recs == count)
1921
 
  {
1922
 
    message::Transaction *transaction= session->getTransactionMessage();
1923
 
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1924
 
    statements->RemoveLast();
1925
 
 
1926
 
    /*
1927
 
     * Now need to set the Session Statement pointer to either the previous
1928
 
     * Statement, or NULL if there isn't one.
1929
 
     */
1930
 
    if (statements->size() == 0)
1931
 
    {
1932
 
      session->setStatementMessage(NULL);
1933
 
    }
1934
 
    else
1935
 
    {
1936
 
      /*
1937
 
       * There isn't a great way to get a pointer to the previous Statement
1938
 
       * message using the RepeatedPtrField object, so we'll just get to it
1939
 
       * using the Transaction message.
1940
 
       */
1941
 
      int last_stmt_idx= transaction->statement_size() - 1;
1942
 
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1943
 
    }
1944
 
  }
1945
 
  /* We only need to remove 'count' records */
1946
 
  else if (num_avail_recs > count)
1947
 
  {
1948
 
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1949
 
    while (count--)
1950
 
      records->RemoveLast();
1951
 
  }
1952
 
 
1953
 
  return true;
1954
 
}
1955
 
 
1956
 
 
1957
 
bool TransactionServices::removeStatementRecords(Session *session,
1958
 
                                                 uint32_t count)
1959
 
{
1960
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1961
 
  if (! replication_services.isActive())
1962
 
    return false;
1963
 
 
1964
 
  /* Get the most current Statement */
1965
 
  message::Statement *statement= session->getStatementMessage();
1966
 
 
1967
 
  /* Make sure we have work to do */
1968
 
  if (statement == NULL)
1969
 
    return false;
1970
 
 
1971
 
  bool retval= false;
1972
 
 
1973
 
  switch (statement->type())
1974
 
  {
1975
 
    case message::Statement::INSERT:
1976
 
    {
1977
 
      message::InsertData *data= statement->mutable_insert_data();
1978
 
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1979
 
      break;
1980
 
    }
1981
 
 
1982
 
    case message::Statement::UPDATE:
1983
 
    {
1984
 
      message::UpdateData *data= statement->mutable_update_data();
1985
 
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1986
 
      break;
1987
 
    }
1988
 
 
1989
 
    case message::Statement::DELETE:  /* not sure if this one is possible... */
1990
 
    {
1991
 
      message::DeleteData *data= statement->mutable_delete_data();
1992
 
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
1993
 
      break;
1994
 
    }
1995
 
 
1996
 
    default:
1997
 
      retval= false;
1998
 
      break;
1999
 
  }
2000
 
 
2001
 
  return retval;
2002
 
}
2003
 
 
2004
 
 
2005
 
void TransactionServices::createTable(Session *in_session,
 
1899
void TransactionServices::createTable(Session::reference session,
2006
1900
                                      const message::Table &table)
2007
1901
{
2008
1902
  ReplicationServices &replication_services= ReplicationServices::singleton();
2009
1903
  if (! replication_services.isActive())
2010
1904
    return;
2011
1905
  
2012
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1906
  message::Transaction *transaction= getActiveTransactionMessage(session);
2013
1907
  message::Statement *statement= transaction->add_statement();
2014
1908
 
2015
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1909
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
2016
1910
 
2017
1911
  /* 
2018
1912
   * Construct the specialized CreateTableStatement message and attach
2022
1916
  message::Table *new_table_message= create_table_statement->mutable_table();
2023
1917
  *new_table_message= table;
2024
1918
 
2025
 
  finalizeStatementMessage(*statement, in_session);
 
1919
  finalizeStatementMessage(*statement, session);
2026
1920
 
2027
 
  finalizeTransactionMessage(*transaction, in_session);
 
1921
  finalizeTransactionMessage(*transaction, session);
2028
1922
  
2029
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1923
  (void) replication_services.pushTransactionMessage(session, *transaction);
2030
1924
 
2031
 
  cleanupTransactionMessage(transaction, in_session);
 
1925
  cleanupTransactionMessage(transaction, session);
2032
1926
 
2033
1927
}
2034
1928
 
2035
 
void TransactionServices::createSchema(Session *in_session,
 
1929
void TransactionServices::createSchema(Session::reference session,
2036
1930
                                       const message::Schema &schema)
2037
1931
{
2038
1932
  ReplicationServices &replication_services= ReplicationServices::singleton();
2039
1933
  if (! replication_services.isActive())
2040
1934
    return;
2041
1935
  
2042
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1936
  message::Transaction *transaction= getActiveTransactionMessage(session);
2043
1937
  message::Statement *statement= transaction->add_statement();
2044
1938
 
2045
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1939
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
2046
1940
 
2047
1941
  /* 
2048
1942
   * Construct the specialized CreateSchemaStatement message and attach
2052
1946
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2053
1947
  *new_schema_message= schema;
2054
1948
 
2055
 
  finalizeStatementMessage(*statement, in_session);
 
1949
  finalizeStatementMessage(*statement, session);
2056
1950
 
2057
 
  finalizeTransactionMessage(*transaction, in_session);
 
1951
  finalizeTransactionMessage(*transaction, session);
2058
1952
  
2059
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1953
  (void) replication_services.pushTransactionMessage(session, *transaction);
2060
1954
 
2061
 
  cleanupTransactionMessage(transaction, in_session);
 
1955
  cleanupTransactionMessage(transaction, session);
2062
1956
 
2063
1957
}
2064
1958
 
2065
 
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
1959
void TransactionServices::dropSchema(Session::reference session,
 
1960
                                     identifier::Schema::const_reference identifier)
2066
1961
{
2067
1962
  ReplicationServices &replication_services= ReplicationServices::singleton();
2068
1963
  if (! replication_services.isActive())
2069
1964
    return;
2070
1965
  
2071
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1966
  message::Transaction *transaction= getActiveTransactionMessage(session);
2072
1967
  message::Statement *statement= transaction->add_statement();
2073
1968
 
2074
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1969
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
2075
1970
 
2076
1971
  /* 
2077
1972
   * Construct the specialized DropSchemaStatement message and attach
2079
1974
   */
2080
1975
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2081
1976
 
2082
 
  drop_schema_statement->set_schema_name(schema_name);
2083
 
 
2084
 
  finalizeStatementMessage(*statement, in_session);
2085
 
 
2086
 
  finalizeTransactionMessage(*transaction, in_session);
2087
 
  
2088
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2089
 
 
2090
 
  cleanupTransactionMessage(transaction, in_session);
2091
 
}
2092
 
 
2093
 
void TransactionServices::dropTable(Session *in_session,
2094
 
                                    const string &schema_name,
2095
 
                                    const string &table_name,
 
1977
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
 
1978
 
 
1979
  finalizeStatementMessage(*statement, session);
 
1980
 
 
1981
  finalizeTransactionMessage(*transaction, session);
 
1982
  
 
1983
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1984
 
 
1985
  cleanupTransactionMessage(transaction, session);
 
1986
}
 
1987
 
 
1988
void TransactionServices::alterSchema(Session::reference session,
 
1989
                                      const message::schema::shared_ptr &old_schema,
 
1990
                                      const message::Schema &new_schema)
 
1991
{
 
1992
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1993
  if (! replication_services.isActive())
 
1994
    return;
 
1995
  
 
1996
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1997
  message::Statement *statement= transaction->add_statement();
 
1998
 
 
1999
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
 
2000
 
 
2001
  /* 
 
2002
   * Construct the specialized AlterSchemaStatement message and attach
 
2003
   * it to the generic Statement message
 
2004
   */
 
2005
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
 
2006
 
 
2007
  message::Schema *before= alter_schema_statement->mutable_before();
 
2008
  message::Schema *after= alter_schema_statement->mutable_after();
 
2009
 
 
2010
  *before= *old_schema;
 
2011
  *after= new_schema;
 
2012
 
 
2013
  finalizeStatementMessage(*statement, session);
 
2014
 
 
2015
  finalizeTransactionMessage(*transaction, session);
 
2016
  
 
2017
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2018
 
 
2019
  cleanupTransactionMessage(transaction, session);
 
2020
}
 
2021
 
 
2022
void TransactionServices::dropTable(Session::reference session,
 
2023
                                    const identifier::Table &table,
2096
2024
                                    bool if_exists)
2097
2025
{
2098
2026
  ReplicationServices &replication_services= ReplicationServices::singleton();
2099
2027
  if (! replication_services.isActive())
2100
2028
    return;
2101
2029
  
2102
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2030
  message::Transaction *transaction= getActiveTransactionMessage(session);
2103
2031
  message::Statement *statement= transaction->add_statement();
2104
2032
 
2105
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
2033
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2106
2034
 
2107
2035
  /* 
2108
2036
   * Construct the specialized DropTableStatement message and attach
2114
2042
 
2115
2043
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2116
2044
 
2117
 
  table_metadata->set_schema_name(schema_name);
2118
 
  table_metadata->set_table_name(table_name);
2119
 
 
2120
 
  finalizeStatementMessage(*statement, in_session);
2121
 
 
2122
 
  finalizeTransactionMessage(*transaction, in_session);
 
2045
  table_metadata->set_schema_name(table.getSchemaName());
 
2046
  table_metadata->set_table_name(table.getTableName());
 
2047
 
 
2048
  finalizeStatementMessage(*statement, session);
 
2049
 
 
2050
  finalizeTransactionMessage(*transaction, session);
2123
2051
  
2124
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2052
  (void) replication_services.pushTransactionMessage(session, *transaction);
2125
2053
 
2126
 
  cleanupTransactionMessage(transaction, in_session);
 
2054
  cleanupTransactionMessage(transaction, session);
2127
2055
}
2128
2056
 
2129
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
2057
void TransactionServices::truncateTable(Session::reference session,
 
2058
                                        Table &table)
2130
2059
{
2131
2060
  ReplicationServices &replication_services= ReplicationServices::singleton();
2132
2061
  if (! replication_services.isActive())
2133
2062
    return;
2134
2063
  
2135
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2064
  message::Transaction *transaction= getActiveTransactionMessage(session);
2136
2065
  message::Statement *statement= transaction->add_statement();
2137
2066
 
2138
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
2067
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2139
2068
 
2140
2069
  /* 
2141
2070
   * Construct the specialized TruncateTableStatement message and attach
2145
2074
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2146
2075
 
2147
2076
  string schema_name;
2148
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
2077
  (void) table.getShare()->getSchemaName(schema_name);
2149
2078
  string table_name;
2150
 
  (void) in_table->getShare()->getTableName(table_name);
 
2079
  (void) table.getShare()->getTableName(table_name);
2151
2080
 
2152
2081
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2153
2082
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2154
2083
 
2155
 
  finalizeStatementMessage(*statement, in_session);
 
2084
  finalizeStatementMessage(*statement, session);
2156
2085
 
2157
 
  finalizeTransactionMessage(*transaction, in_session);
 
2086
  finalizeTransactionMessage(*transaction, session);
2158
2087
  
2159
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2088
  (void) replication_services.pushTransactionMessage(session, *transaction);
2160
2089
 
2161
 
  cleanupTransactionMessage(transaction, in_session);
 
2090
  cleanupTransactionMessage(transaction, session);
2162
2091
}
2163
2092
 
2164
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
2093
void TransactionServices::rawStatement(Session::reference session,
 
2094
                                       const string &query)
2165
2095
{
2166
2096
  ReplicationServices &replication_services= ReplicationServices::singleton();
2167
2097
  if (! replication_services.isActive())
2168
2098
    return;
2169
2099
 
2170
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2100
  message::Transaction *transaction= getActiveTransactionMessage(session);
2171
2101
  message::Statement *statement= transaction->add_statement();
2172
2102
 
2173
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
2103
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2174
2104
  statement->set_sql(query);
2175
 
  finalizeStatementMessage(*statement, in_session);
 
2105
  finalizeStatementMessage(*statement, session);
2176
2106
 
2177
 
  finalizeTransactionMessage(*transaction, in_session);
 
2107
  finalizeTransactionMessage(*transaction, session);
2178
2108
  
2179
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2109
  (void) replication_services.pushTransactionMessage(session, *transaction);
2180
2110
 
2181
 
  cleanupTransactionMessage(transaction, in_session);
 
2111
  cleanupTransactionMessage(transaction, session);
2182
2112
}
2183
2113
 
2184
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2114
int TransactionServices::sendEvent(Session::reference session,
 
2115
                                   const message::Event &event)
2185
2116
{
2186
2117
  ReplicationServices &replication_services= ReplicationServices::singleton();
2187
2118
  if (! replication_services.isActive())
2199
2130
 
2200
2131
  trx_event->CopyFrom(event);
2201
2132
 
2202
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2133
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2203
2134
 
2204
2135
  delete transaction;
2205
2136
 
2206
2137
  return static_cast<int>(result);
2207
2138
}
2208
2139
 
2209
 
bool TransactionServices::sendStartupEvent(Session *session)
 
2140
bool TransactionServices::sendStartupEvent(Session::reference session)
2210
2141
{
2211
2142
  message::Event event;
2212
2143
  event.set_type(message::Event::STARTUP);
2215
2146
  return true;
2216
2147
}
2217
2148
 
2218
 
bool TransactionServices::sendShutdownEvent(Session *session)
 
2149
bool TransactionServices::sendShutdownEvent(Session::reference session)
2219
2150
{
2220
2151
  message::Event event;
2221
2152
  event.set_type(message::Event::SHUTDOWN);