~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Lee Bieber
  • Date: 2010-11-14 23:15:42 UTC
  • mfrom: (1929.1.42 warning-stack-frame)
  • Revision ID: kalebral@gmail.com-20101114231542-fnnu6ydd2p17n582
Merge Monty - fix bug 672372: some functions use > 32k stack

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  This program is free software; you can redistribute it and/or modify
8
8
 *  it under the terms of the GNU General Public License as published by
64
64
#include "drizzled/lock.h"
65
65
#include "drizzled/item/int.h"
66
66
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/epoch.h"
 
67
#include "drizzled/field/timestamp.h"
68
68
#include "drizzled/plugin/client.h"
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
316
void TransactionServices::registerResourceForStatement(Session *session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session.transaction.stmt;
333
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
348
void TransactionServices::registerResourceForStatement(Session *session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session.transaction.stmt;
366
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session.transaction.all;
387
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session.transaction.xid_state.xid.is_null())
404
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session.transaction.all;
417
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session.transaction.xid_state.xid.is_null())
434
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
435
435
 
436
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
457
{
458
 
  if (session.getXaId() == 0)
 
458
  if (session->getXaId() == 0)
459
459
  {
460
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
461
  }
462
462
 
463
 
  return session.getXaId();
 
463
  return session->getXaId();
464
464
}
465
465
 
466
 
int TransactionServices::commitTransaction(Session::reference session,
467
 
                                           bool normal_transaction)
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
468
481
{
469
482
  int error= 0, cookie= 0;
470
483
  /*
471
484
    'all' means that this is either an explicit commit issued by
472
485
    user, or an implicit commit issued by a DDL.
473
486
  */
474
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
475
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
476
489
 
477
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
478
491
 
479
492
  /*
480
493
    We must not commit the normal transaction if a statement
482
495
    flags will not get propagated to its normal transaction's
483
496
    counterpart.
484
497
  */
485
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
486
 
              trans == &session.transaction.stmt);
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
487
500
 
488
501
  if (resource_contexts.empty() == false)
489
502
  {
490
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
491
504
    {
492
505
      rollbackTransaction(session, normal_transaction);
493
506
      return 1;
518
531
 
519
532
        if (resource->participatesInXaTransaction())
520
533
        {
521
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
522
535
          {
523
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
524
537
            error= 1;
525
538
          }
526
539
          else
527
540
          {
528
 
            session.status_var.ha_prepare_count++;
 
541
            session->status_var.ha_prepare_count++;
529
542
          }
530
543
        }
531
544
      }
547
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
548
561
end:
549
562
    if (is_real_trans)
550
 
      session.startWaitingGlobalReadLock();
 
563
      session->startWaitingGlobalReadLock();
551
564
  }
552
565
  return error;
553
566
}
556
569
  @note
557
570
  This function does not care about global read lock. A caller should.
558
571
*/
559
 
int TransactionServices::commitPhaseOne(Session::reference session,
560
 
                                        bool normal_transaction)
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
561
573
{
562
574
  int error=0;
563
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
564
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
565
577
 
566
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
567
 
  bool all= normal_transaction;
568
 
 
569
 
  /* If we're in autocommit then we have a real transaction to commit
570
 
     (except if it's BEGIN)
571
 
  */
572
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
573
 
    all= true;
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
574
579
 
575
580
  if (resource_contexts.empty() == false)
576
581
  {
585
590
 
586
591
      if (resource->participatesInXaTransaction())
587
592
      {
588
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
589
594
        {
590
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591
596
          error= 1;
592
597
        }
593
598
        else if (normal_transaction)
594
599
        {
595
 
          session.status_var.ha_commit_count++;
 
600
          session->status_var.ha_commit_count++;
596
601
        }
597
602
      }
598
603
      else if (resource->participatesInSqlTransaction())
599
604
      {
600
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
601
606
        {
602
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
608
          error= 1;
604
609
        }
605
610
        else if (normal_transaction)
606
611
        {
607
 
          session.status_var.ha_commit_count++;
 
612
          session->status_var.ha_commit_count++;
608
613
        }
609
614
      }
610
615
      resource_context->reset(); /* keep it conveniently zero-filled */
611
616
    }
612
617
 
613
618
    if (is_real_trans)
614
 
      session.transaction.xid_state.xid.null();
 
619
      session->transaction.xid_state.xid.null();
615
620
 
616
621
    if (normal_transaction)
617
622
    {
618
 
      session.variables.tx_isolation= session.session_tx_isolation;
619
 
      session.transaction.cleanup();
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
620
625
    }
621
626
  }
622
627
  trans->reset();
623
628
  return error;
624
629
}
625
630
 
626
 
int TransactionServices::rollbackTransaction(Session::reference session,
627
 
                                             bool normal_transaction)
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
628
632
{
629
633
  int error= 0;
630
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
632
636
 
633
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
635
638
 
636
639
  /*
637
640
    We must not rollback the normal transaction if a statement
638
641
    transaction is pending.
639
642
  */
640
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
641
 
              trans == &session.transaction.stmt);
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
642
645
 
643
646
  if (resource_contexts.empty() == false)
644
647
  {
653
656
 
654
657
      if (resource->participatesInXaTransaction())
655
658
      {
656
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
657
660
        {
658
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
659
662
          error= 1;
660
663
        }
661
664
        else if (normal_transaction)
662
665
        {
663
 
          session.status_var.ha_rollback_count++;
 
666
          session->status_var.ha_rollback_count++;
664
667
        }
665
668
      }
666
669
      else if (resource->participatesInSqlTransaction())
667
670
      {
668
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
669
672
        {
670
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
671
674
          error= 1;
672
675
        }
673
676
        else if (normal_transaction)
674
677
        {
675
 
          session.status_var.ha_rollback_count++;
 
678
          session->status_var.ha_rollback_count++;
676
679
        }
677
680
      }
678
681
      resource_context->reset(); /* keep it conveniently zero-filled */
685
688
     * a rollback statement with the corresponding transaction ID
686
689
     * to rollback.
687
690
     */
688
 
    if (all)
 
691
    if (normal_transaction)
689
692
      rollbackTransactionMessage(session);
690
 
    else
691
 
      rollbackStatementMessage(session);
692
693
 
693
694
    if (is_real_trans)
694
 
      session.transaction.xid_state.xid.null();
 
695
      session->transaction.xid_state.xid.null();
695
696
    if (normal_transaction)
696
697
    {
697
 
      session.variables.tx_isolation=session.session_tx_isolation;
698
 
      session.transaction.cleanup();
 
698
      session->variables.tx_isolation=session->session_tx_isolation;
 
699
      session->transaction.cleanup();
699
700
    }
700
701
  }
701
702
  if (normal_transaction)
702
 
    session.transaction_rollback_request= false;
 
703
    session->transaction_rollback_request= false;
703
704
 
704
705
  /*
705
706
   * If a non-transactional table was updated, warn the user
706
707
   */
707
708
  if (is_real_trans &&
708
 
      session.transaction.all.hasModifiedNonTransData() &&
709
 
      session.getKilled() != Session::KILL_CONNECTION)
 
709
      session->transaction.all.hasModifiedNonTransData() &&
 
710
      session->getKilled() != Session::KILL_CONNECTION)
710
711
  {
711
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
712
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
712
713
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
713
714
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
714
715
  }
716
717
  return error;
717
718
}
718
719
 
719
 
int TransactionServices::autocommitOrRollback(Session::reference session,
720
 
                                              int error)
 
720
/**
 
721
  This is used to commit or rollback a single statement depending on
 
722
  the value of error.
 
723
 
 
724
  @note
 
725
    Note that if the autocommit is on, then the following call inside
 
726
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
727
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
728
    the user has used LOCK TABLES then that mechanism does not know to do the
 
729
    commit.
 
730
*/
 
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
721
732
{
722
 
  /* One GPB Statement message per SQL statement */
723
 
  message::Statement *statement= session.getStatementMessage();
724
 
  if ((statement != NULL) && (! error))
725
 
    finalizeStatementMessage(*statement, session);
726
733
 
727
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
734
  if (session->transaction.stmt.getResourceContexts().empty() == false)
728
735
  {
729
 
    TransactionContext *trans = &session.transaction.stmt;
 
736
    TransactionContext *trans = &session->transaction.stmt;
730
737
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
738
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
739
         it != resource_contexts.end();
734
741
    {
735
742
      ResourceContext *resource_context= *it;
736
743
 
737
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
 
744
      resource_context->getTransactionalStorageEngine()->endStatement(session);
738
745
    }
739
746
 
740
747
    if (! error)
745
752
    else
746
753
    {
747
754
      (void) rollbackTransaction(session, false);
748
 
      if (session.transaction_rollback_request)
749
 
      {
 
755
      if (session->transaction_rollback_request)
750
756
        (void) rollbackTransaction(session, true);
751
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
752
 
      }
753
757
    }
754
758
 
755
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
759
    session->variables.tx_isolation= session->session_tx_isolation;
756
760
  }
757
 
 
758
761
  return error;
759
762
}
760
763
 
768
771
  }
769
772
};
770
773
 
771
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
772
 
                                             NamedSavepoint &sv)
 
774
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
773
775
{
774
776
  int error= 0;
775
 
  TransactionContext *trans= &session.transaction.all;
 
777
  TransactionContext *trans= &session->transaction.all;
776
778
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
777
779
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
778
780
 
792
794
 
793
795
    if (resource->participatesInSqlTransaction())
794
796
    {
795
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
797
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
796
798
      {
797
799
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
798
800
        error= 1;
799
801
      }
800
802
      else
801
803
      {
802
 
        session.status_var.ha_savepoint_rollback_count++;
 
804
        session->status_var.ha_savepoint_rollback_count++;
803
805
      }
804
806
    }
805
807
    trans->no_2pc|= not resource->participatesInXaTransaction();
849
851
 
850
852
      if (resource->participatesInSqlTransaction())
851
853
      {
852
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
854
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
853
855
        {
854
856
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
855
857
          error= 1;
856
858
        }
857
859
        else
858
860
        {
859
 
          session.status_var.ha_rollback_count++;
 
861
          session->status_var.ha_rollback_count++;
860
862
        }
861
863
      }
862
864
      resource_context->reset(); /* keep it conveniently zero-filled */
879
881
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
880
882
      if (num_statements == 0)
881
883
      {    
882
 
        session.setStatementMessage(NULL);
 
884
        session->setStatementMessage(NULL);
883
885
      }    
884
886
      else 
885
887
      {
886
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
888
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
887
889
      }    
888
 
      session.setTransactionMessage(savepoint_transaction_copy);
 
890
      session->setTransactionMessage(savepoint_transaction_copy);
889
891
    }
890
892
  }
891
893
 
898
900
  section "4.33.4 SQL-statements and transaction states",
899
901
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
902
*/
901
 
int TransactionServices::setSavepoint(Session::reference session,
902
 
                                      NamedSavepoint &sv)
 
903
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
903
904
{
904
905
  int error= 0;
905
 
  TransactionContext *trans= &session.transaction.all;
 
906
  TransactionContext *trans= &session->transaction.all;
906
907
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
907
908
 
908
909
  if (resource_contexts.empty() == false)
918
919
 
919
920
      if (resource->participatesInSqlTransaction())
920
921
      {
921
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
922
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
922
923
        {
923
924
          my_error(ER_GET_ERRNO, MYF(0), err);
924
925
          error= 1;
925
926
        }
926
927
        else
927
928
        {
928
 
          session.status_var.ha_savepoint_count++;
 
929
          session->status_var.ha_savepoint_count++;
929
930
        }
930
931
      }
931
932
    }
937
938
 
938
939
  if (shouldConstructMessages())
939
940
  {
940
 
    message::Transaction *transaction= session.getTransactionMessage();
 
941
    message::Transaction *transaction= session->getTransactionMessage();
941
942
                  
942
943
    if (transaction != NULL)
943
944
    {
950
951
  return error;
951
952
}
952
953
 
953
 
int TransactionServices::releaseSavepoint(Session::reference session,
954
 
                                          NamedSavepoint &sv)
 
954
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
955
955
{
956
956
  int error= 0;
957
957
 
968
968
 
969
969
    if (resource->participatesInSqlTransaction())
970
970
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
971
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
972
972
      {
973
973
        my_error(ER_GET_ERRNO, MYF(0), err);
974
974
        error= 1;
985
985
  return replication_services.isActive();
986
986
}
987
987
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
989
 
                                                                       bool should_inc_trx_id)
 
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
990
989
{
991
 
  message::Transaction *transaction= session.getTransactionMessage();
 
990
  message::Transaction *transaction= in_session->getTransactionMessage();
992
991
 
993
992
  if (unlikely(transaction == NULL))
994
993
  {
998
997
     * deleting transaction message when done with it.
999
998
     */
1000
999
    transaction= new (nothrow) message::Transaction();
1001
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
 
    session.setTransactionMessage(transaction);
 
1000
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1001
    in_session->setTransactionMessage(transaction);
1003
1002
    return transaction;
1004
1003
  }
1005
1004
  else
1006
1005
    return transaction;
1007
1006
}
1008
1007
 
1009
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
 
                                                 Session::reference session,
 
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1009
                                                 Session *in_session,
1011
1010
                                                 bool should_inc_trx_id)
1012
1011
{
1013
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
 
  trx->set_server_id(session.getServerId());
 
1012
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1013
  trx->set_server_id(in_session->getServerId());
1015
1014
 
1016
1015
  if (should_inc_trx_id)
1017
1016
  {
1018
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1019
 
    session.setXaId(0);
1020
 
  }
 
1017
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1018
    in_session->setXaId(0);
 
1019
  }  
1021
1020
  else
1022
 
  {
1023
 
    /* trx and seg id will get set properly elsewhere */
 
1021
  { 
1024
1022
    trx->set_transaction_id(0);
1025
1023
  }
1026
1024
 
1027
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1028
 
  
1029
 
  /* segment info may get set elsewhere as needed */
1030
 
  transaction.set_segment_id(1);
1031
 
  transaction.set_end_segment(true);
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
 
                                                     Session::const_reference session)
1036
 
{
1037
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
 
                                                    Session::reference session)
1043
 
{
1044
 
  delete transaction;
1045
 
  session.setStatementMessage(NULL);
1046
 
  session.setTransactionMessage(NULL);
1047
 
  session.setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1025
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1026
}
 
1027
 
 
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1029
                                              Session *in_session)
 
1030
{
 
1031
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1032
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1033
}
 
1034
 
 
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1036
                                             Session *in_session)
 
1037
{
 
1038
  delete in_transaction;
 
1039
  in_session->setStatementMessage(NULL);
 
1040
  in_session->setTransactionMessage(NULL);
 
1041
}
 
1042
 
 
1043
int TransactionServices::commitTransactionMessage(Session *in_session)
1051
1044
{
1052
1045
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1046
  if (! replication_services.isActive())
1054
1047
    return 0;
1055
1048
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (session.getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= session.getStatementMessage();
 
1049
  /* If there is an active statement message, finalize it */
 
1050
  message::Statement *statement= in_session->getStatementMessage();
1065
1051
 
1066
1052
  if (statement != NULL)
1067
1053
  {
1068
 
    finalizeStatementMessage(*statement, session);
1069
 
  }
1070
 
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
1072
 
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
 
  finalizeTransactionMessage(*transaction, session);
1086
 
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1088
 
 
1089
 
  cleanupTransactionMessage(transaction, session);
 
1054
    finalizeStatementMessage(*statement, in_session);
 
1055
  }
 
1056
  else
 
1057
    return 0; /* No data modification occurred inside the transaction */
 
1058
  
 
1059
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1060
 
 
1061
  finalizeTransactionMessage(*transaction, in_session);
 
1062
  
 
1063
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1064
 
 
1065
  cleanupTransactionMessage(transaction, in_session);
1090
1066
 
1091
1067
  return static_cast<int>(result);
1092
1068
}
1093
1069
 
1094
1070
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                               message::Statement::Type type,
1096
 
                                               Session::const_reference session)
 
1071
                                        message::Statement::Type in_type,
 
1072
                                        Session *in_session)
1097
1073
{
1098
 
  statement.set_type(type);
1099
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
1100
 
 
1101
 
  if (session.variables.replicate_query)
1102
 
    statement.set_sql(session.getQueryString()->c_str());
 
1074
  statement.set_type(in_type);
 
1075
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1103
1076
}
1104
1077
 
1105
1078
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                                   Session::reference session)
 
1079
                                            Session *in_session)
1107
1080
{
1108
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1109
 
  session.setStatementMessage(NULL);
 
1081
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1082
  in_session->setStatementMessage(NULL);
1110
1083
}
1111
1084
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1085
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1113
1086
{
1114
1087
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1088
  if (! replication_services.isActive())
1116
1089
    return;
1117
1090
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1091
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1119
1092
 
1120
1093
  /*
1121
1094
   * OK, so there are two situations that we need to deal with here:
1138
1111
  {
1139
1112
    /* Remember the transaction ID so we can re-use it */
1140
1113
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
 
    uint32_t seg_id= transaction->segment_id();
1142
1114
 
1143
1115
    /*
1144
1116
     * Clear the transaction, create a Rollback statement message, 
1145
1117
     * attach it to the transaction, and push it to replicators.
1146
1118
     */
1147
1119
    transaction->Clear();
1148
 
    initTransactionMessage(*transaction, session, false);
 
1120
    initTransactionMessage(*transaction, in_session, false);
1149
1121
 
1150
1122
    /* Set the transaction ID to match the previous messages */
1151
1123
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
 
    transaction->set_segment_id(seg_id);
1153
 
    transaction->set_end_segment(true);
1154
1124
 
1155
1125
    message::Statement *statement= transaction->add_statement();
1156
1126
 
1157
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
 
    finalizeStatementMessage(*statement, session);
 
1127
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1128
    finalizeStatementMessage(*statement, in_session);
1159
1129
 
1160
 
    finalizeTransactionMessage(*transaction, session);
 
1130
    finalizeTransactionMessage(*transaction, in_session);
1161
1131
    
1162
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
1163
 
  }
1164
 
 
1165
 
  cleanupTransactionMessage(transaction, session);
1166
 
}
1167
 
 
1168
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
1169
 
{
1170
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1171
 
  if (! replication_services.isActive())
1172
 
    return;
1173
 
 
1174
 
  message::Statement *current_statement= session.getStatementMessage();
1175
 
 
1176
 
  /* If we never added a Statement message, nothing to undo. */
1177
 
  if (current_statement == NULL)
1178
 
    return;
1179
 
 
1180
 
  /*
1181
 
   * If the Statement has been segmented, then we've already pushed a portion
1182
 
   * of this Statement's row changes through the replication stream and we
1183
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
 
   * delete the current Statement message.
1185
 
   */
1186
 
  bool is_segmented= false;
1187
 
 
1188
 
  switch (current_statement->type())
1189
 
  {
1190
 
    case message::Statement::INSERT:
1191
 
      if (current_statement->insert_data().segment_id() > 1)
1192
 
        is_segmented= true;
1193
 
      break;
1194
 
 
1195
 
    case message::Statement::UPDATE:
1196
 
      if (current_statement->update_data().segment_id() > 1)
1197
 
        is_segmented= true;
1198
 
      break;
1199
 
 
1200
 
    case message::Statement::DELETE:
1201
 
      if (current_statement->delete_data().segment_id() > 1)
1202
 
        is_segmented= true;
1203
 
      break;
1204
 
 
1205
 
    default:
1206
 
      break;
1207
 
  }
1208
 
 
1209
 
  /*
1210
 
   * Remove the Statement message we've been working with (same as
1211
 
   * current_statement).
1212
 
   */
1213
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1214
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
 
  statements_in_txn= transaction->mutable_statement();
1216
 
  statements_in_txn->RemoveLast();
1217
 
  session.setStatementMessage(NULL);
1218
 
  
1219
 
  /*
1220
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
 
   * an indicator to cancel the previous Statement message which should have
1222
 
   * had its end_segment attribute set to false.
1223
 
   */
1224
 
  if (is_segmented)
1225
 
  {
1226
 
    current_statement= transaction->add_statement();
1227
 
    initStatementMessage(*current_statement,
1228
 
                         message::Statement::ROLLBACK_STATEMENT,
1229
 
                         session);
1230
 
    finalizeStatementMessage(*current_statement, session);
1231
 
  }
1232
 
}
1233
 
 
1234
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
 
                                                                     message::Transaction *transaction)
1236
 
{
1237
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
 
  uint32_t seg_id= transaction->segment_id();
1239
 
  
1240
 
  transaction->set_end_segment(false);
1241
 
  commitTransactionMessage(session);
1242
 
  transaction= getActiveTransactionMessage(session, false);
1243
 
  
1244
 
  /* Set the transaction ID to match the previous messages */
1245
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
 
  transaction->set_segment_id(seg_id + 1);
1247
 
  transaction->set_end_segment(true);
1248
 
 
1249
 
  return transaction;
1250
 
}
1251
 
 
1252
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1253
 
                                                            Table &table,
 
1132
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1133
  }
 
1134
  cleanupTransactionMessage(transaction, in_session);
 
1135
}
 
1136
 
 
1137
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1138
                                                            Table *in_table,
1254
1139
                                                            uint32_t *next_segment_id)
1255
1140
{
1256
 
  message::Statement *statement= session.getStatementMessage();
 
1141
  message::Statement *statement= in_session->getStatementMessage();
1257
1142
  message::Transaction *transaction= NULL;
1258
 
  
1259
 
  /*
1260
 
   * If statement is NULL, this is a new statement.
1261
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1262
 
   * This is because autocommitOrRollback() finalizes the statement so that
1263
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1264
 
   * share a single GPB message for multiple statements).
 
1143
 
 
1144
  /* 
 
1145
   * Check the type for the current Statement message, if it is anything
 
1146
   * other than INSERT we need to call finalize, this will ensure a 
 
1147
   * new InsertStatement is created. If it is of type INSERT check
 
1148
   * what table the INSERT belongs to, if it is a different table
 
1149
   * call finalize, so a new InsertStatement can be created. 
1265
1150
   */
1266
 
  if (statement == NULL)
1267
 
  {
1268
 
    transaction= getActiveTransactionMessage(session);
1269
 
 
1270
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1271
 
        transaction_message_threshold)
1272
 
    {
1273
 
      transaction= segmentTransactionMessage(session, transaction);
1274
 
    }
1275
 
 
1276
 
    statement= transaction->add_statement();
1277
 
    setInsertHeader(*statement, session, table);
1278
 
    session.setStatementMessage(statement);
1279
 
  }
1280
 
  else
1281
 
  {
1282
 
    transaction= getActiveTransactionMessage(session);
1283
 
    
 
1151
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1152
  {
 
1153
    finalizeStatementMessage(*statement, in_session);
 
1154
    statement= in_session->getStatementMessage();
 
1155
  } 
 
1156
  else if (statement != NULL)
 
1157
  {
 
1158
    transaction= getActiveTransactionMessage(in_session);
 
1159
 
1284
1160
    /*
1285
1161
     * If we've passed our threshold for the statement size (possible for
1286
1162
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1163
     * the Transaction will keep it from getting huge).
1288
1164
     */
1289
1165
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1290
 
        transaction_message_threshold)
 
1166
      in_session->variables.transaction_message_threshold)
1291
1167
    {
1292
1168
      /* Remember the transaction ID so we can re-use it */
1293
1169
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
 
      uint32_t seg_id= transaction->segment_id();
1295
 
      
 
1170
 
1296
1171
      message::InsertData *current_data= statement->mutable_insert_data();
1297
 
      
 
1172
 
1298
1173
      /* Caller should use this value when adding a new record */
1299
1174
      *next_segment_id= current_data->segment_id() + 1;
1300
 
      
 
1175
 
1301
1176
      current_data->set_end_segment(false);
1302
 
      transaction->set_end_segment(false);
1303
 
      
 
1177
 
1304
1178
      /* 
1305
1179
       * Send the trx message to replicators after finalizing the 
1306
1180
       * statement and transaction. This will also set the Transaction
1307
1181
       * and Statement objects in Session to NULL.
1308
1182
       */
1309
 
      commitTransactionMessage(session);
1310
 
      
 
1183
      commitTransactionMessage(in_session);
 
1184
 
1311
1185
      /*
1312
1186
       * Statement and Transaction should now be NULL, so new ones will get
1313
1187
       * created. We reuse the transaction id since we are segmenting
1314
1188
       * one transaction.
1315
1189
       */
1316
 
      transaction= getActiveTransactionMessage(session, false);
 
1190
      statement= in_session->getStatementMessage();
 
1191
      transaction= getActiveTransactionMessage(in_session, false);
1317
1192
      assert(transaction != NULL);
1318
1193
 
1319
 
      statement= transaction->add_statement();
1320
 
      setInsertHeader(*statement, session, table);
1321
 
      session.setStatementMessage(statement);
1322
 
            
1323
1194
      /* Set the transaction ID to match the previous messages */
1324
1195
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
 
      transaction->set_segment_id(seg_id + 1);
1326
 
      transaction->set_end_segment(true);
1327
1196
    }
1328
1197
    else
1329
1198
    {
1330
 
      /*
1331
 
       * Continuation of the same statement. Carry forward the existing
1332
 
       * segment id.
1333
 
       */
1334
 
      const message::InsertData &current_data= statement->insert_data();
1335
 
      *next_segment_id= current_data.segment_id();
 
1199
      const message::InsertHeader &insert_header= statement->insert_header();
 
1200
      string old_table_name= insert_header.table_metadata().table_name();
 
1201
     
 
1202
      string current_table_name;
 
1203
      (void) in_table->getShare()->getTableName(current_table_name);
 
1204
 
 
1205
      if (current_table_name.compare(old_table_name))
 
1206
      {
 
1207
        finalizeStatementMessage(*statement, in_session);
 
1208
        statement= in_session->getStatementMessage();
 
1209
      }
 
1210
      else
 
1211
      {
 
1212
        /* carry forward the existing segment id */
 
1213
        const message::InsertData &current_data= statement->insert_data();
 
1214
        *next_segment_id= current_data.segment_id();
 
1215
      }
1336
1216
    }
 
1217
  } 
 
1218
 
 
1219
  if (statement == NULL)
 
1220
  {
 
1221
    /*
 
1222
     * Transaction will be non-NULL only if we had to segment it due to
 
1223
     * transaction size above.
 
1224
     */
 
1225
    if (transaction == NULL)
 
1226
      transaction= getActiveTransactionMessage(in_session);
 
1227
 
 
1228
    /* 
 
1229
     * Transaction message initialized and set, but no statement created
 
1230
     * yet.  We construct one and initialize it, here, then return the
 
1231
     * message after attaching the new Statement message pointer to the 
 
1232
     * Session for easy retrieval later...
 
1233
     */
 
1234
    statement= transaction->add_statement();
 
1235
    setInsertHeader(*statement, in_session, in_table);
 
1236
    in_session->setStatementMessage(statement);
1337
1237
  }
1338
 
  
1339
1238
  return *statement;
1340
1239
}
1341
1240
 
1342
1241
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
 
                                          Session::const_reference session,
1344
 
                                          Table &table)
 
1242
                                          Session *in_session,
 
1243
                                          Table *in_table)
1345
1244
{
1346
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1245
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1347
1246
 
1348
1247
  /* 
1349
1248
   * Now we construct the specialized InsertHeader message inside
1354
1253
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1355
1254
 
1356
1255
  string schema_name;
1357
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1256
  (void) in_table->getShare()->getSchemaName(schema_name);
1358
1257
  string table_name;
1359
 
  (void) table.getShare()->getTableName(table_name);
 
1258
  (void) in_table->getShare()->getTableName(table_name);
1360
1259
 
1361
1260
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
1261
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1363
1262
 
1364
1263
  Field *current_field;
1365
 
  Field **table_fields= table.getFields();
 
1264
  Field **table_fields= in_table->getFields();
1366
1265
 
1367
1266
  message::FieldMetadata *field_metadata;
1368
1267
 
1369
1268
  /* We will read all the table's fields... */
1370
 
  table.setReadSet();
 
1269
  in_table->setReadSet();
1371
1270
 
1372
1271
  while ((current_field= *table_fields++) != NULL) 
1373
1272
  {
1377
1276
  }
1378
1277
}
1379
1278
 
1380
 
bool TransactionServices::insertRecord(Session::reference session,
1381
 
                                       Table &table)
 
1279
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1382
1280
{
1383
1281
  ReplicationServices &replication_services= ReplicationServices::singleton();
1384
1282
  if (! replication_services.isActive())
1385
1283
    return false;
1386
 
 
1387
1284
  /**
1388
1285
   * We do this check here because we don't want to even create a 
1389
1286
   * statement if there isn't a primary key on the table...
1392
1289
   *
1393
1290
   * Multi-column primary keys are handled how exactly?
1394
1291
   */
1395
 
  if (not table.getShare()->hasPrimaryKey())
 
1292
  if (not in_table->getShare()->hasPrimaryKey())
1396
1293
  {
1397
1294
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1398
1295
    return true;
1399
1296
  }
1400
1297
 
1401
1298
  uint32_t next_segment_id= 1;
1402
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1299
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1403
1300
 
1404
1301
  message::InsertData *data= statement.mutable_insert_data();
1405
1302
  data->set_segment_id(next_segment_id);
1407
1304
  message::InsertRecord *record= data->add_record();
1408
1305
 
1409
1306
  Field *current_field;
1410
 
  Field **table_fields= table.getFields();
 
1307
  Field **table_fields= in_table->getFields();
1411
1308
 
1412
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1309
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1413
1310
  string_value->set_charset(system_charset_info);
1414
1311
 
1415
1312
  /* We will read all the table's fields... */
1416
 
  table.setReadSet();
 
1313
  in_table->setReadSet();
1417
1314
 
1418
1315
  while ((current_field= *table_fields++) != NULL) 
1419
1316
  {
1424
1321
    } 
1425
1322
    else 
1426
1323
    {
1427
 
      string_value= current_field->val_str_internal(string_value);
 
1324
      string_value= current_field->val_str(string_value);
1428
1325
      record->add_is_null(false);
1429
1326
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1430
1327
      string_value->free();
1433
1330
  return false;
1434
1331
}
1435
1332
 
1436
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1437
 
                                                            Table &table,
 
1333
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1334
                                                            Table *in_table,
1438
1335
                                                            const unsigned char *old_record, 
1439
1336
                                                            const unsigned char *new_record,
1440
1337
                                                            uint32_t *next_segment_id)
1441
1338
{
1442
 
  message::Statement *statement= session.getStatementMessage();
 
1339
  message::Statement *statement= in_session->getStatementMessage();
1443
1340
  message::Transaction *transaction= NULL;
1444
1341
 
1445
1342
  /*
1446
 
   * If statement is NULL, this is a new statement.
1447
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1448
 
   * This is because autocommitOrRollback() finalizes the statement so that
1449
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1450
 
   * share a single GPB message for multiple statements).
 
1343
   * Check the type for the current Statement message, if it is anything
 
1344
   * other than UPDATE we need to call finalize, this will ensure a
 
1345
   * new UpdateStatement is created. If it is of type UPDATE check
 
1346
   * what table the UPDATE belongs to, if it is a different table
 
1347
   * call finalize, so a new UpdateStatement can be created.
1451
1348
   */
1452
 
  if (statement == NULL)
 
1349
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1453
1350
  {
1454
 
    transaction= getActiveTransactionMessage(session);
1455
 
    
1456
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
        transaction_message_threshold)
1458
 
    {
1459
 
      transaction= segmentTransactionMessage(session, transaction);
1460
 
    }
1461
 
    
1462
 
    statement= transaction->add_statement();
1463
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1464
 
    session.setStatementMessage(statement);
 
1351
    finalizeStatementMessage(*statement, in_session);
 
1352
    statement= in_session->getStatementMessage();
1465
1353
  }
1466
 
  else
 
1354
  else if (statement != NULL)
1467
1355
  {
1468
 
    transaction= getActiveTransactionMessage(session);
1469
 
    
 
1356
    transaction= getActiveTransactionMessage(in_session);
 
1357
 
1470
1358
    /*
1471
1359
     * If we've passed our threshold for the statement size (possible for
1472
1360
     * a bulk update), we'll finalize the Statement and Transaction (doing
1473
1361
     * the Transaction will keep it from getting huge).
1474
1362
     */
1475
1363
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1476
 
        transaction_message_threshold)
 
1364
      in_session->variables.transaction_message_threshold)
1477
1365
    {
1478
1366
      /* Remember the transaction ID so we can re-use it */
1479
1367
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
 
      uint32_t seg_id= transaction->segment_id();
1481
 
      
 
1368
 
1482
1369
      message::UpdateData *current_data= statement->mutable_update_data();
1483
 
      
 
1370
 
1484
1371
      /* Caller should use this value when adding a new record */
1485
1372
      *next_segment_id= current_data->segment_id() + 1;
1486
 
      
 
1373
 
1487
1374
      current_data->set_end_segment(false);
1488
 
      transaction->set_end_segment(false);
1489
 
      
1490
 
      /* 
 
1375
 
 
1376
      /*
1491
1377
       * Send the trx message to replicators after finalizing the 
1492
1378
       * statement and transaction. This will also set the Transaction
1493
1379
       * and Statement objects in Session to NULL.
1494
1380
       */
1495
 
      commitTransactionMessage(session);
1496
 
      
 
1381
      commitTransactionMessage(in_session);
 
1382
 
1497
1383
      /*
1498
1384
       * Statement and Transaction should now be NULL, so new ones will get
1499
1385
       * created. We reuse the transaction id since we are segmenting
1500
1386
       * one transaction.
1501
1387
       */
1502
 
      transaction= getActiveTransactionMessage(session, false);
 
1388
      statement= in_session->getStatementMessage();
 
1389
      transaction= getActiveTransactionMessage(in_session, false);
1503
1390
      assert(transaction != NULL);
1504
 
      
1505
 
      statement= transaction->add_statement();
1506
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1507
 
      session.setStatementMessage(statement);
1508
 
      
 
1391
 
1509
1392
      /* Set the transaction ID to match the previous messages */
1510
1393
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
 
      transaction->set_segment_id(seg_id + 1);
1512
 
      transaction->set_end_segment(true);
1513
1394
    }
1514
1395
    else
1515
1396
    {
1516
 
      /*
1517
 
       * Continuation of the same statement. Carry forward the existing
1518
 
       * segment id.
1519
 
       */
1520
 
      const message::UpdateData &current_data= statement->update_data();
1521
 
      *next_segment_id= current_data.segment_id();
 
1397
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1398
      {
 
1399
        /* carry forward the existing segment id */
 
1400
        const message::UpdateData &current_data= statement->update_data();
 
1401
        *next_segment_id= current_data.segment_id();
 
1402
      } 
 
1403
      else 
 
1404
      {
 
1405
        finalizeStatementMessage(*statement, in_session);
 
1406
        statement= in_session->getStatementMessage();
 
1407
      }
1522
1408
    }
1523
1409
  }
1524
 
  
 
1410
 
 
1411
  if (statement == NULL)
 
1412
  {
 
1413
    /*
 
1414
     * Transaction will be non-NULL only if we had to segment it due to
 
1415
     * transaction size above.
 
1416
     */
 
1417
    if (transaction == NULL)
 
1418
      transaction= getActiveTransactionMessage(in_session);
 
1419
 
 
1420
    /* 
 
1421
     * Transaction message initialized and set, but no statement created
 
1422
     * yet.  We construct one and initialize it, here, then return the
 
1423
     * message after attaching the new Statement message pointer to the 
 
1424
     * Session for easy retrieval later...
 
1425
     */
 
1426
    statement= transaction->add_statement();
 
1427
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1428
    in_session->setStatementMessage(statement);
 
1429
  }
1525
1430
  return *statement;
1526
1431
}
1527
1432
 
 
1433
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1434
                                                  Table *in_table,
 
1435
                                                  const unsigned char *old_record,
 
1436
                                                  const unsigned char *new_record)
 
1437
{
 
1438
  const message::UpdateHeader &update_header= statement.update_header();
 
1439
  string old_table_name= update_header.table_metadata().table_name();
 
1440
 
 
1441
  string current_table_name;
 
1442
  (void) in_table->getShare()->getTableName(current_table_name);
 
1443
  if (current_table_name.compare(old_table_name))
 
1444
  {
 
1445
    return false;
 
1446
  }
 
1447
  else
 
1448
  {
 
1449
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1450
     * match the updated fields in the new record, if they do not we must
 
1451
     * create a new UpdateHeader 
 
1452
     */
 
1453
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1454
 
 
1455
    Field *current_field;
 
1456
    Field **table_fields= in_table->getFields();
 
1457
    in_table->setReadSet();
 
1458
 
 
1459
    size_t num_calculated_updated_fields= 0;
 
1460
    bool found= false;
 
1461
    while ((current_field= *table_fields++) != NULL)
 
1462
    {
 
1463
      if (num_calculated_updated_fields > num_set_fields)
 
1464
      {
 
1465
        break;
 
1466
      }
 
1467
 
 
1468
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1469
      {
 
1470
        /* check that this field exists in the UpdateHeader record */
 
1471
        found= false;
 
1472
 
 
1473
        for (size_t x= 0; x < num_set_fields; ++x)
 
1474
        {
 
1475
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1476
          string name= field_metadata.name();
 
1477
          if (name.compare(current_field->field_name) == 0)
 
1478
          {
 
1479
            found= true;
 
1480
            ++num_calculated_updated_fields;
 
1481
            break;
 
1482
          } 
 
1483
        }
 
1484
        if (! found)
 
1485
        {
 
1486
          break;
 
1487
        } 
 
1488
      }
 
1489
    }
 
1490
 
 
1491
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1492
    {
 
1493
      return true;
 
1494
    } 
 
1495
    else 
 
1496
    {
 
1497
      return false;
 
1498
    }
 
1499
  }
 
1500
}  
 
1501
 
1528
1502
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
 
                                          Session::const_reference session,
1530
 
                                          Table &table,
 
1503
                                          Session *in_session,
 
1504
                                          Table *in_table,
1531
1505
                                          const unsigned char *old_record, 
1532
1506
                                          const unsigned char *new_record)
1533
1507
{
1534
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1508
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1535
1509
 
1536
1510
  /* 
1537
1511
   * Now we construct the specialized UpdateHeader message inside
1542
1516
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1543
1517
 
1544
1518
  string schema_name;
1545
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1519
  (void) in_table->getShare()->getSchemaName(schema_name);
1546
1520
  string table_name;
1547
 
  (void) table.getShare()->getTableName(table_name);
 
1521
  (void) in_table->getShare()->getTableName(table_name);
1548
1522
 
1549
1523
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1550
1524
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1551
1525
 
1552
1526
  Field *current_field;
1553
 
  Field **table_fields= table.getFields();
 
1527
  Field **table_fields= in_table->getFields();
1554
1528
 
1555
1529
  message::FieldMetadata *field_metadata;
1556
1530
 
1557
1531
  /* We will read all the table's fields... */
1558
 
  table.setReadSet();
 
1532
  in_table->setReadSet();
1559
1533
 
1560
1534
  while ((current_field= *table_fields++) != NULL) 
1561
1535
  {
1563
1537
     * We add the "key field metadata" -- i.e. the fields which form
1564
1538
     * the primary key for the table.
1565
1539
     */
1566
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1540
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1567
1541
    {
1568
1542
      field_metadata= header->add_key_field_metadata();
1569
1543
      field_metadata->set_name(current_field->field_name);
1570
1544
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1571
1545
    }
1572
1546
 
1573
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1547
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1574
1548
    {
1575
1549
      /* Field is changed from old to new */
1576
1550
      field_metadata= header->add_set_field_metadata();
1579
1553
    }
1580
1554
  }
1581
1555
}
1582
 
void TransactionServices::updateRecord(Session::reference session,
1583
 
                                       Table &table, 
 
1556
void TransactionServices::updateRecord(Session *in_session,
 
1557
                                       Table *in_table, 
1584
1558
                                       const unsigned char *old_record, 
1585
1559
                                       const unsigned char *new_record)
1586
1560
{
1589
1563
    return;
1590
1564
 
1591
1565
  uint32_t next_segment_id= 1;
1592
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1566
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1593
1567
 
1594
1568
  message::UpdateData *data= statement.mutable_update_data();
1595
1569
  data->set_segment_id(next_segment_id);
1597
1571
  message::UpdateRecord *record= data->add_record();
1598
1572
 
1599
1573
  Field *current_field;
1600
 
  Field **table_fields= table.getFields();
1601
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1574
  Field **table_fields= in_table->getFields();
 
1575
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
1576
  string_value->set_charset(system_charset_info);
1603
1577
 
1604
1578
  while ((current_field= *table_fields++) != NULL) 
1614
1588
     *
1615
1589
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1616
1590
     */
1617
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1591
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1618
1592
    {
1619
1593
      /* Store the original "read bit" for this field */
1620
1594
      bool is_read_set= current_field->isReadSet();
1621
1595
 
1622
1596
      /* We need to mark that we will "read" this field... */
1623
 
      table.setReadSet(current_field->position());
 
1597
      in_table->setReadSet(current_field->field_index);
1624
1598
 
1625
1599
      /* Read the string value of this field's contents */
1626
 
      string_value= current_field->val_str_internal(string_value);
 
1600
      string_value= current_field->val_str(string_value);
1627
1601
 
1628
1602
      /* 
1629
1603
       * Reset the read bit after reading field to its original state.  This 
1649
1623
     * primary key field value.  Replication only supports tables
1650
1624
     * with a primary key.
1651
1625
     */
1652
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1626
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1653
1627
    {
1654
1628
      /**
1655
1629
       * To say the below is ugly is an understatement. But it works.
1656
1630
       * 
1657
1631
       * @todo Move this crap into a real Record API.
1658
1632
       */
1659
 
      string_value= current_field->val_str_internal(string_value,
1660
 
                                                    old_record + 
1661
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
 
1633
      string_value= current_field->val_str(string_value,
 
1634
                                           old_record + 
 
1635
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1662
1636
      record->add_key_value(string_value->c_ptr(), string_value->length());
1663
1637
      string_value->free();
1664
1638
    }
1667
1641
}
1668
1642
 
1669
1643
bool TransactionServices::isFieldUpdated(Field *current_field,
1670
 
                                         Table &table,
 
1644
                                         Table *in_table,
1671
1645
                                         const unsigned char *old_record,
1672
1646
                                         const unsigned char *new_record)
1673
1647
{
1676
1650
   * we do this crazy pointer fiddling to figure out if the current field
1677
1651
   * has been updated in the supplied record raw byte pointers.
1678
1652
   */
1679
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1680
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1653
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1654
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1681
1655
 
1682
1656
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1683
1657
 
1707
1681
  return isUpdated;
1708
1682
}  
1709
1683
 
1710
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1711
 
                                                            Table &table,
 
1684
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1685
                                                            Table *in_table,
1712
1686
                                                            uint32_t *next_segment_id)
1713
1687
{
1714
 
  message::Statement *statement= session.getStatementMessage();
 
1688
  message::Statement *statement= in_session->getStatementMessage();
1715
1689
  message::Transaction *transaction= NULL;
1716
1690
 
1717
1691
  /*
1718
 
   * If statement is NULL, this is a new statement.
1719
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1720
 
   * This is because autocommitOrRollback() finalizes the statement so that
1721
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1722
 
   * share a single GPB message for multiple statements).
 
1692
   * Check the type for the current Statement message, if it is anything
 
1693
   * other than DELETE we need to call finalize, this will ensure a
 
1694
   * new DeleteStatement is created. If it is of type DELETE check
 
1695
   * what table the DELETE belongs to, if it is a different table
 
1696
   * call finalize, so a new DeleteStatement can be created.
1723
1697
   */
1724
 
  if (statement == NULL)
 
1698
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1725
1699
  {
1726
 
    transaction= getActiveTransactionMessage(session);
1727
 
    
1728
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1729
 
        transaction_message_threshold)
1730
 
    {
1731
 
      transaction= segmentTransactionMessage(session, transaction);
1732
 
    }
1733
 
    
1734
 
    statement= transaction->add_statement();
1735
 
    setDeleteHeader(*statement, session, table);
1736
 
    session.setStatementMessage(statement);
 
1700
    finalizeStatementMessage(*statement, in_session);
 
1701
    statement= in_session->getStatementMessage();
1737
1702
  }
1738
 
  else
 
1703
  else if (statement != NULL)
1739
1704
  {
1740
 
    transaction= getActiveTransactionMessage(session);
1741
 
    
 
1705
    transaction= getActiveTransactionMessage(in_session);
 
1706
 
1742
1707
    /*
1743
1708
     * If we've passed our threshold for the statement size (possible for
1744
1709
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1710
     * the Transaction will keep it from getting huge).
1746
1711
     */
1747
1712
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1748
 
        transaction_message_threshold)
 
1713
      in_session->variables.transaction_message_threshold)
1749
1714
    {
1750
1715
      /* Remember the transaction ID so we can re-use it */
1751
1716
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
 
      uint32_t seg_id= transaction->segment_id();
1753
 
      
 
1717
 
1754
1718
      message::DeleteData *current_data= statement->mutable_delete_data();
1755
 
      
 
1719
 
1756
1720
      /* Caller should use this value when adding a new record */
1757
1721
      *next_segment_id= current_data->segment_id() + 1;
1758
 
      
 
1722
 
1759
1723
      current_data->set_end_segment(false);
1760
 
      transaction->set_end_segment(false);
1761
 
      
 
1724
 
1762
1725
      /* 
1763
1726
       * Send the trx message to replicators after finalizing the 
1764
1727
       * statement and transaction. This will also set the Transaction
1765
1728
       * and Statement objects in Session to NULL.
1766
1729
       */
1767
 
      commitTransactionMessage(session);
1768
 
      
 
1730
      commitTransactionMessage(in_session);
 
1731
 
1769
1732
      /*
1770
1733
       * Statement and Transaction should now be NULL, so new ones will get
1771
1734
       * created. We reuse the transaction id since we are segmenting
1772
1735
       * one transaction.
1773
1736
       */
1774
 
      transaction= getActiveTransactionMessage(session, false);
 
1737
      statement= in_session->getStatementMessage();
 
1738
      transaction= getActiveTransactionMessage(in_session, false);
1775
1739
      assert(transaction != NULL);
1776
 
      
1777
 
      statement= transaction->add_statement();
1778
 
      setDeleteHeader(*statement, session, table);
1779
 
      session.setStatementMessage(statement);
1780
 
      
 
1740
 
1781
1741
      /* Set the transaction ID to match the previous messages */
1782
1742
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
 
      transaction->set_segment_id(seg_id + 1);
1784
 
      transaction->set_end_segment(true);
1785
1743
    }
1786
1744
    else
1787
1745
    {
1788
 
      /*
1789
 
       * Continuation of the same statement. Carry forward the existing
1790
 
       * segment id.
1791
 
       */
1792
 
      const message::DeleteData &current_data= statement->delete_data();
1793
 
      *next_segment_id= current_data.segment_id();
 
1746
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1747
      string old_table_name= delete_header.table_metadata().table_name();
 
1748
 
 
1749
      string current_table_name;
 
1750
      (void) in_table->getShare()->getTableName(current_table_name);
 
1751
      if (current_table_name.compare(old_table_name))
 
1752
      {
 
1753
        finalizeStatementMessage(*statement, in_session);
 
1754
        statement= in_session->getStatementMessage();
 
1755
      }
 
1756
      else
 
1757
      {
 
1758
        /* carry forward the existing segment id */
 
1759
        const message::DeleteData &current_data= statement->delete_data();
 
1760
        *next_segment_id= current_data.segment_id();
 
1761
      }
1794
1762
    }
1795
1763
  }
1796
 
  
 
1764
 
 
1765
  if (statement == NULL)
 
1766
  {
 
1767
    /*
 
1768
     * Transaction will be non-NULL only if we had to segment it due to
 
1769
     * transaction size above.
 
1770
     */
 
1771
    if (transaction == NULL)
 
1772
      transaction= getActiveTransactionMessage(in_session);
 
1773
 
 
1774
    /* 
 
1775
     * Transaction message initialized and set, but no statement created
 
1776
     * yet.  We construct one and initialize it, here, then return the
 
1777
     * message after attaching the new Statement message pointer to the 
 
1778
     * Session for easy retrieval later...
 
1779
     */
 
1780
    statement= transaction->add_statement();
 
1781
    setDeleteHeader(*statement, in_session, in_table);
 
1782
    in_session->setStatementMessage(statement);
 
1783
  }
1797
1784
  return *statement;
1798
1785
}
1799
1786
 
1800
1787
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
 
                                          Session::const_reference session,
1802
 
                                          Table &table)
 
1788
                                          Session *in_session,
 
1789
                                          Table *in_table)
1803
1790
{
1804
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1791
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1805
1792
 
1806
1793
  /* 
1807
1794
   * Now we construct the specialized DeleteHeader message inside
1811
1798
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1812
1799
 
1813
1800
  string schema_name;
1814
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1801
  (void) in_table->getShare()->getSchemaName(schema_name);
1815
1802
  string table_name;
1816
 
  (void) table.getShare()->getTableName(table_name);
 
1803
  (void) in_table->getShare()->getTableName(table_name);
1817
1804
 
1818
1805
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1819
1806
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1820
1807
 
1821
1808
  Field *current_field;
1822
 
  Field **table_fields= table.getFields();
 
1809
  Field **table_fields= in_table->getFields();
1823
1810
 
1824
1811
  message::FieldMetadata *field_metadata;
1825
1812
 
1830
1817
     * primary key field value.  Replication only supports tables
1831
1818
     * with a primary key.
1832
1819
     */
1833
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1820
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1834
1821
    {
1835
1822
      field_metadata= header->add_key_field_metadata();
1836
1823
      field_metadata->set_name(current_field->field_name);
1839
1826
  }
1840
1827
}
1841
1828
 
1842
 
void TransactionServices::deleteRecord(Session::reference session,
1843
 
                                       Table &table,
1844
 
                                       bool use_update_record)
 
1829
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1845
1830
{
1846
1831
  ReplicationServices &replication_services= ReplicationServices::singleton();
1847
1832
  if (! replication_services.isActive())
1848
1833
    return;
1849
1834
 
1850
1835
  uint32_t next_segment_id= 1;
1851
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1836
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1852
1837
 
1853
1838
  message::DeleteData *data= statement.mutable_delete_data();
1854
1839
  data->set_segment_id(next_segment_id);
1856
1841
  message::DeleteRecord *record= data->add_record();
1857
1842
 
1858
1843
  Field *current_field;
1859
 
  Field **table_fields= table.getFields();
1860
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1844
  Field **table_fields= in_table->getFields();
 
1845
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
1846
  string_value->set_charset(system_charset_info);
1862
1847
 
1863
1848
  while ((current_field= *table_fields++) != NULL) 
1867
1852
     * primary key field value.  Replication only supports tables
1868
1853
     * with a primary key.
1869
1854
     */
1870
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1855
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1871
1856
    {
1872
1857
      if (use_update_record)
1873
1858
      {
1879
1864
         * We are careful not to change anything in old_ptr.
1880
1865
         */
1881
1866
        const unsigned char *old_ptr= current_field->ptr;
1882
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1883
 
        string_value= current_field->val_str_internal(string_value);
 
1867
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1868
        string_value= current_field->val_str(string_value);
1884
1869
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1885
1870
      }
1886
1871
      else
1887
1872
      {
1888
 
        string_value= current_field->val_str_internal(string_value);
 
1873
        string_value= current_field->val_str(string_value);
1889
1874
        /**
1890
1875
         * @TODO Store optional old record value in the before data member
1891
1876
         */
1896
1881
  }
1897
1882
}
1898
1883
 
1899
 
void TransactionServices::createTable(Session::reference session,
 
1884
 
 
1885
/**
 
1886
 * Template for removing Statement records of different types.
 
1887
 *
 
1888
 * The code for removing records from different Statement message types
 
1889
 * is identical except for the class types that are embedded within the
 
1890
 * Statement.
 
1891
 *
 
1892
 * There are 3 scenarios we need to look for:
 
1893
 *   - We've been asked to remove more records than exist in the Statement
 
1894
 *   - We've been asked to remove less records than exist in the Statement
 
1895
 *   - We've been asked to remove ALL records that exist in the Statement
 
1896
 *
 
1897
 * If we are removing ALL records, then effectively we would be left with
 
1898
 * an empty Statement message, so we should just remove it and clean up
 
1899
 * message pointers in the Session object.
 
1900
 */
 
1901
template <class DataType, class RecordType>
 
1902
static bool removeStatementRecordsWithType(Session *session,
 
1903
                                           DataType *data,
 
1904
                                           uint32_t count)
 
1905
{
 
1906
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1907
 
 
1908
  /* If there aren't enough records to remove 'count' of them, error. */
 
1909
  if (num_avail_recs < count)
 
1910
    return false;
 
1911
 
 
1912
  /*
 
1913
   * If we are removing all of the data records, we'll just remove this
 
1914
   * entire Statement message.
 
1915
   */
 
1916
  if (num_avail_recs == count)
 
1917
  {
 
1918
    message::Transaction *transaction= session->getTransactionMessage();
 
1919
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1920
    statements->RemoveLast();
 
1921
 
 
1922
    /*
 
1923
     * Now need to set the Session Statement pointer to either the previous
 
1924
     * Statement, or NULL if there isn't one.
 
1925
     */
 
1926
    if (statements->size() == 0)
 
1927
    {
 
1928
      session->setStatementMessage(NULL);
 
1929
    }
 
1930
    else
 
1931
    {
 
1932
      /*
 
1933
       * There isn't a great way to get a pointer to the previous Statement
 
1934
       * message using the RepeatedPtrField object, so we'll just get to it
 
1935
       * using the Transaction message.
 
1936
       */
 
1937
      int last_stmt_idx= transaction->statement_size() - 1;
 
1938
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1939
    }
 
1940
  }
 
1941
  /* We only need to remove 'count' records */
 
1942
  else if (num_avail_recs > count)
 
1943
  {
 
1944
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1945
    while (count--)
 
1946
      records->RemoveLast();
 
1947
  }
 
1948
 
 
1949
  return true;
 
1950
}
 
1951
 
 
1952
 
 
1953
bool TransactionServices::removeStatementRecords(Session *session,
 
1954
                                                 uint32_t count)
 
1955
{
 
1956
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1957
  if (! replication_services.isActive())
 
1958
    return false;
 
1959
 
 
1960
  /* Get the most current Statement */
 
1961
  message::Statement *statement= session->getStatementMessage();
 
1962
 
 
1963
  /* Make sure we have work to do */
 
1964
  if (statement == NULL)
 
1965
    return false;
 
1966
 
 
1967
  bool retval= false;
 
1968
 
 
1969
  switch (statement->type())
 
1970
  {
 
1971
    case message::Statement::INSERT:
 
1972
    {
 
1973
      message::InsertData *data= statement->mutable_insert_data();
 
1974
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
1975
      break;
 
1976
    }
 
1977
 
 
1978
    case message::Statement::UPDATE:
 
1979
    {
 
1980
      message::UpdateData *data= statement->mutable_update_data();
 
1981
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
1982
      break;
 
1983
    }
 
1984
 
 
1985
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
1986
    {
 
1987
      message::DeleteData *data= statement->mutable_delete_data();
 
1988
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
1989
      break;
 
1990
    }
 
1991
 
 
1992
    default:
 
1993
      retval= false;
 
1994
      break;
 
1995
  }
 
1996
 
 
1997
  return retval;
 
1998
}
 
1999
 
 
2000
 
 
2001
void TransactionServices::createTable(Session *in_session,
1900
2002
                                      const message::Table &table)
1901
2003
{
1902
2004
  ReplicationServices &replication_services= ReplicationServices::singleton();
1903
2005
  if (! replication_services.isActive())
1904
2006
    return;
1905
2007
  
1906
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2008
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1907
2009
  message::Statement *statement= transaction->add_statement();
1908
2010
 
1909
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
2011
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1910
2012
 
1911
2013
  /* 
1912
2014
   * Construct the specialized CreateTableStatement message and attach
1916
2018
  message::Table *new_table_message= create_table_statement->mutable_table();
1917
2019
  *new_table_message= table;
1918
2020
 
1919
 
  finalizeStatementMessage(*statement, session);
 
2021
  finalizeStatementMessage(*statement, in_session);
1920
2022
 
1921
 
  finalizeTransactionMessage(*transaction, session);
 
2023
  finalizeTransactionMessage(*transaction, in_session);
1922
2024
  
1923
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2025
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1924
2026
 
1925
 
  cleanupTransactionMessage(transaction, session);
 
2027
  cleanupTransactionMessage(transaction, in_session);
1926
2028
 
1927
2029
}
1928
2030
 
1929
 
void TransactionServices::createSchema(Session::reference session,
 
2031
void TransactionServices::createSchema(Session *in_session,
1930
2032
                                       const message::Schema &schema)
1931
2033
{
1932
2034
  ReplicationServices &replication_services= ReplicationServices::singleton();
1933
2035
  if (! replication_services.isActive())
1934
2036
    return;
1935
2037
  
1936
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2038
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1937
2039
  message::Statement *statement= transaction->add_statement();
1938
2040
 
1939
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
2041
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1940
2042
 
1941
2043
  /* 
1942
2044
   * Construct the specialized CreateSchemaStatement message and attach
1946
2048
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
2049
  *new_schema_message= schema;
1948
2050
 
1949
 
  finalizeStatementMessage(*statement, session);
 
2051
  finalizeStatementMessage(*statement, in_session);
1950
2052
 
1951
 
  finalizeTransactionMessage(*transaction, session);
 
2053
  finalizeTransactionMessage(*transaction, in_session);
1952
2054
  
1953
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2055
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1954
2056
 
1955
 
  cleanupTransactionMessage(transaction, session);
 
2057
  cleanupTransactionMessage(transaction, in_session);
1956
2058
 
1957
2059
}
1958
2060
 
1959
 
void TransactionServices::dropSchema(Session::reference session,
1960
 
                                     identifier::Schema::const_reference identifier)
 
2061
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1961
2062
{
1962
2063
  ReplicationServices &replication_services= ReplicationServices::singleton();
1963
2064
  if (! replication_services.isActive())
1964
2065
    return;
1965
2066
  
1966
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2067
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1967
2068
  message::Statement *statement= transaction->add_statement();
1968
2069
 
1969
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
2070
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1970
2071
 
1971
2072
  /* 
1972
2073
   * Construct the specialized DropSchemaStatement message and attach
1974
2075
   */
1975
2076
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1976
2077
 
1977
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1978
 
 
1979
 
  finalizeStatementMessage(*statement, session);
1980
 
 
1981
 
  finalizeTransactionMessage(*transaction, session);
1982
 
  
1983
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1984
 
 
1985
 
  cleanupTransactionMessage(transaction, session);
1986
 
}
1987
 
 
1988
 
void TransactionServices::alterSchema(Session::reference session,
1989
 
                                      const message::schema::shared_ptr &old_schema,
1990
 
                                      const message::Schema &new_schema)
1991
 
{
1992
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1993
 
  if (! replication_services.isActive())
1994
 
    return;
1995
 
  
1996
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1997
 
  message::Statement *statement= transaction->add_statement();
1998
 
 
1999
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2000
 
 
2001
 
  /* 
2002
 
   * Construct the specialized AlterSchemaStatement message and attach
2003
 
   * it to the generic Statement message
2004
 
   */
2005
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2006
 
 
2007
 
  message::Schema *before= alter_schema_statement->mutable_before();
2008
 
  message::Schema *after= alter_schema_statement->mutable_after();
2009
 
 
2010
 
  *before= *old_schema;
2011
 
  *after= new_schema;
2012
 
 
2013
 
  finalizeStatementMessage(*statement, session);
2014
 
 
2015
 
  finalizeTransactionMessage(*transaction, session);
2016
 
  
2017
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2018
 
 
2019
 
  cleanupTransactionMessage(transaction, session);
2020
 
}
2021
 
 
2022
 
void TransactionServices::dropTable(Session::reference session,
2023
 
                                    const identifier::Table &table,
 
2078
  drop_schema_statement->set_schema_name(schema_name);
 
2079
 
 
2080
  finalizeStatementMessage(*statement, in_session);
 
2081
 
 
2082
  finalizeTransactionMessage(*transaction, in_session);
 
2083
  
 
2084
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2085
 
 
2086
  cleanupTransactionMessage(transaction, in_session);
 
2087
}
 
2088
 
 
2089
void TransactionServices::dropTable(Session *in_session,
 
2090
                                    const string &schema_name,
 
2091
                                    const string &table_name,
2024
2092
                                    bool if_exists)
2025
2093
{
2026
2094
  ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2095
  if (! replication_services.isActive())
2028
2096
    return;
2029
2097
  
2030
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2098
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2099
  message::Statement *statement= transaction->add_statement();
2032
2100
 
2033
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
2101
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2034
2102
 
2035
2103
  /* 
2036
2104
   * Construct the specialized DropTableStatement message and attach
2042
2110
 
2043
2111
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2044
2112
 
2045
 
  table_metadata->set_schema_name(table.getSchemaName());
2046
 
  table_metadata->set_table_name(table.getTableName());
2047
 
 
2048
 
  finalizeStatementMessage(*statement, session);
2049
 
 
2050
 
  finalizeTransactionMessage(*transaction, session);
 
2113
  table_metadata->set_schema_name(schema_name);
 
2114
  table_metadata->set_table_name(table_name);
 
2115
 
 
2116
  finalizeStatementMessage(*statement, in_session);
 
2117
 
 
2118
  finalizeTransactionMessage(*transaction, in_session);
2051
2119
  
2052
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2120
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2053
2121
 
2054
 
  cleanupTransactionMessage(transaction, session);
 
2122
  cleanupTransactionMessage(transaction, in_session);
2055
2123
}
2056
2124
 
2057
 
void TransactionServices::truncateTable(Session::reference session,
2058
 
                                        Table &table)
 
2125
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2059
2126
{
2060
2127
  ReplicationServices &replication_services= ReplicationServices::singleton();
2061
2128
  if (! replication_services.isActive())
2062
2129
    return;
2063
2130
  
2064
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2131
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2065
2132
  message::Statement *statement= transaction->add_statement();
2066
2133
 
2067
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
2134
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2068
2135
 
2069
2136
  /* 
2070
2137
   * Construct the specialized TruncateTableStatement message and attach
2074
2141
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2075
2142
 
2076
2143
  string schema_name;
2077
 
  (void) table.getShare()->getSchemaName(schema_name);
 
2144
  (void) in_table->getShare()->getSchemaName(schema_name);
2078
2145
  string table_name;
2079
 
  (void) table.getShare()->getTableName(table_name);
 
2146
  (void) in_table->getShare()->getTableName(table_name);
2080
2147
 
2081
2148
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2149
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2083
2150
 
2084
 
  finalizeStatementMessage(*statement, session);
 
2151
  finalizeStatementMessage(*statement, in_session);
2085
2152
 
2086
 
  finalizeTransactionMessage(*transaction, session);
 
2153
  finalizeTransactionMessage(*transaction, in_session);
2087
2154
  
2088
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2155
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2089
2156
 
2090
 
  cleanupTransactionMessage(transaction, session);
 
2157
  cleanupTransactionMessage(transaction, in_session);
2091
2158
}
2092
2159
 
2093
 
void TransactionServices::rawStatement(Session::reference session,
2094
 
                                       const string &query)
 
2160
void TransactionServices::rawStatement(Session *in_session, const string &query)
2095
2161
{
2096
2162
  ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2163
  if (! replication_services.isActive())
2098
2164
    return;
2099
 
 
2100
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2165
  
 
2166
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2167
  message::Statement *statement= transaction->add_statement();
2102
2168
 
2103
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
2169
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2170
  statement->set_sql(query);
2105
 
  finalizeStatementMessage(*statement, session);
 
2171
  finalizeStatementMessage(*statement, in_session);
2106
2172
 
2107
 
  finalizeTransactionMessage(*transaction, session);
 
2173
  finalizeTransactionMessage(*transaction, in_session);
2108
2174
  
2109
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2175
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2110
2176
 
2111
 
  cleanupTransactionMessage(transaction, session);
 
2177
  cleanupTransactionMessage(transaction, in_session);
2112
2178
}
2113
2179
 
2114
 
int TransactionServices::sendEvent(Session::reference session,
2115
 
                                   const message::Event &event)
 
2180
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2116
2181
{
2117
2182
  ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2183
  if (! replication_services.isActive())
2130
2195
 
2131
2196
  trx_event->CopyFrom(event);
2132
2197
 
2133
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
2198
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2134
2199
 
2135
2200
  delete transaction;
2136
2201
 
2137
2202
  return static_cast<int>(result);
2138
2203
}
2139
2204
 
2140
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2205
bool TransactionServices::sendStartupEvent(Session *session)
2141
2206
{
2142
2207
  message::Event event;
2143
2208
  event.set_type(message::Event::STARTUP);
2146
2211
  return true;
2147
2212
}
2148
2213
 
2149
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2214
bool TransactionServices::sendShutdownEvent(Session *session)
2150
2215
{
2151
2216
  message::Event event;
2152
2217
  event.set_type(message::Event::SHUTDOWN);