~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-11-19 20:09:58 UTC
  • mfrom: (1938.2.1 bug673579)
  • Revision ID: brian@tangent.org-20101119200958-anlloqi9va5gu4c7
Merge in Monty

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  This program is free software; you can redistribute it and/or modify
8
8
 *  it under the terms of the GNU General Public License as published by
64
64
#include "drizzled/lock.h"
65
65
#include "drizzled/item/int.h"
66
66
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/epoch.h"
 
67
#include "drizzled/field/timestamp.h"
68
68
#include "drizzled/plugin/client.h"
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
316
void TransactionServices::registerResourceForStatement(Session *session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session.transaction.stmt;
333
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session->transaction.stmt;
 
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
348
void TransactionServices::registerResourceForStatement(Session *session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session.transaction.stmt;
366
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session->transaction.stmt;
 
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
382
void TransactionServices::registerResourceForTransaction(Session *session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session.transaction.all;
387
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session->transaction.all;
 
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session.transaction.xid_state.xid.is_null())
404
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
403
  if (session->transaction.xid_state.xid.is_null())
 
404
    session->transaction.xid_state.xid.set(session->getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session.transaction.all;
417
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session->transaction.all;
 
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session.transaction.xid_state.xid.is_null())
434
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
433
  if (session->transaction.xid_state.xid.is_null())
 
434
    session->transaction.xid_state.xid.set(session->getQueryId());
435
435
 
436
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
457
{
458
 
  if (session.getXaId() == 0)
 
458
  if (session->getXaId() == 0)
459
459
  {
460
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
461
  }
462
462
 
463
 
  return session.getXaId();
 
463
  return session->getXaId();
464
464
}
465
465
 
466
 
int TransactionServices::commitTransaction(Session::reference session,
467
 
                                           bool normal_transaction)
 
466
/**
 
467
  @retval
 
468
    0   ok
 
469
  @retval
 
470
    1   transaction was rolled back
 
471
  @retval
 
472
    2   error during commit, data may be inconsistent
 
473
 
 
474
  @todo
 
475
    Since we don't support nested statement transactions in 5.0,
 
476
    we can't commit or rollback stmt transactions while we are inside
 
477
    stored functions or triggers. So we simply do nothing now.
 
478
    TODO: This should be fixed in later ( >= 5.1) releases.
 
479
*/
 
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
468
481
{
469
482
  int error= 0, cookie= 0;
470
483
  /*
471
484
    'all' means that this is either an explicit commit issued by
472
485
    user, or an implicit commit issued by a DDL.
473
486
  */
474
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
475
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
476
489
 
477
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
478
491
 
479
492
  /*
480
493
    We must not commit the normal transaction if a statement
482
495
    flags will not get propagated to its normal transaction's
483
496
    counterpart.
484
497
  */
485
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
486
 
              trans == &session.transaction.stmt);
 
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
499
              trans == &session->transaction.stmt);
487
500
 
488
501
  if (resource_contexts.empty() == false)
489
502
  {
490
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
491
504
    {
492
505
      rollbackTransaction(session, normal_transaction);
493
506
      return 1;
518
531
 
519
532
        if (resource->participatesInXaTransaction())
520
533
        {
521
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
522
535
          {
523
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
524
537
            error= 1;
525
538
          }
526
539
          else
527
540
          {
528
 
            session.status_var.ha_prepare_count++;
 
541
            session->status_var.ha_prepare_count++;
529
542
          }
530
543
        }
531
544
      }
547
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
548
561
end:
549
562
    if (is_real_trans)
550
 
      session.startWaitingGlobalReadLock();
 
563
      session->startWaitingGlobalReadLock();
551
564
  }
552
565
  return error;
553
566
}
556
569
  @note
557
570
  This function does not care about global read lock. A caller should.
558
571
*/
559
 
int TransactionServices::commitPhaseOne(Session::reference session,
560
 
                                        bool normal_transaction)
 
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
561
573
{
562
574
  int error=0;
563
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
564
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
565
577
 
566
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
567
 
  bool all= normal_transaction;
568
 
 
569
 
  /* If we're in autocommit then we have a real transaction to commit
570
 
     (except if it's BEGIN)
571
 
  */
572
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
573
 
    all= true;
 
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
574
579
 
575
580
  if (resource_contexts.empty() == false)
576
581
  {
585
590
 
586
591
      if (resource->participatesInXaTransaction())
587
592
      {
588
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
589
594
        {
590
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591
596
          error= 1;
592
597
        }
593
598
        else if (normal_transaction)
594
599
        {
595
 
          session.status_var.ha_commit_count++;
 
600
          session->status_var.ha_commit_count++;
596
601
        }
597
602
      }
598
603
      else if (resource->participatesInSqlTransaction())
599
604
      {
600
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
601
606
        {
602
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
608
          error= 1;
604
609
        }
605
610
        else if (normal_transaction)
606
611
        {
607
 
          session.status_var.ha_commit_count++;
 
612
          session->status_var.ha_commit_count++;
608
613
        }
609
614
      }
610
615
      resource_context->reset(); /* keep it conveniently zero-filled */
611
616
    }
612
617
 
613
618
    if (is_real_trans)
614
 
      session.transaction.xid_state.xid.null();
 
619
      session->transaction.xid_state.xid.null();
615
620
 
616
621
    if (normal_transaction)
617
622
    {
618
 
      session.variables.tx_isolation= session.session_tx_isolation;
619
 
      session.transaction.cleanup();
 
623
      session->variables.tx_isolation= session->session_tx_isolation;
 
624
      session->transaction.cleanup();
620
625
    }
621
626
  }
622
627
  trans->reset();
623
628
  return error;
624
629
}
625
630
 
626
 
int TransactionServices::rollbackTransaction(Session::reference session,
627
 
                                             bool normal_transaction)
 
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
628
632
{
629
633
  int error= 0;
630
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
632
636
 
633
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
635
638
 
636
639
  /*
637
640
    We must not rollback the normal transaction if a statement
638
641
    transaction is pending.
639
642
  */
640
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
641
 
              trans == &session.transaction.stmt);
 
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
644
              trans == &session->transaction.stmt);
642
645
 
643
646
  if (resource_contexts.empty() == false)
644
647
  {
653
656
 
654
657
      if (resource->participatesInXaTransaction())
655
658
      {
656
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
657
660
        {
658
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
659
662
          error= 1;
660
663
        }
661
664
        else if (normal_transaction)
662
665
        {
663
 
          session.status_var.ha_rollback_count++;
 
666
          session->status_var.ha_rollback_count++;
664
667
        }
665
668
      }
666
669
      else if (resource->participatesInSqlTransaction())
667
670
      {
668
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
669
672
        {
670
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
671
674
          error= 1;
672
675
        }
673
676
        else if (normal_transaction)
674
677
        {
675
 
          session.status_var.ha_rollback_count++;
 
678
          session->status_var.ha_rollback_count++;
676
679
        }
677
680
      }
678
681
      resource_context->reset(); /* keep it conveniently zero-filled */
685
688
     * a rollback statement with the corresponding transaction ID
686
689
     * to rollback.
687
690
     */
688
 
    if (all)
 
691
    if (normal_transaction)
689
692
      rollbackTransactionMessage(session);
690
 
    else
691
 
      rollbackStatementMessage(session);
692
693
 
693
694
    if (is_real_trans)
694
 
      session.transaction.xid_state.xid.null();
 
695
      session->transaction.xid_state.xid.null();
695
696
    if (normal_transaction)
696
697
    {
697
 
      session.variables.tx_isolation=session.session_tx_isolation;
698
 
      session.transaction.cleanup();
 
698
      session->variables.tx_isolation=session->session_tx_isolation;
 
699
      session->transaction.cleanup();
699
700
    }
700
701
  }
701
702
  if (normal_transaction)
702
 
    session.transaction_rollback_request= false;
 
703
    session->transaction_rollback_request= false;
703
704
 
704
705
  /*
705
706
   * If a non-transactional table was updated, warn the user
706
707
   */
707
708
  if (is_real_trans &&
708
 
      session.transaction.all.hasModifiedNonTransData() &&
709
 
      session.getKilled() != Session::KILL_CONNECTION)
 
709
      session->transaction.all.hasModifiedNonTransData() &&
 
710
      session->getKilled() != Session::KILL_CONNECTION)
710
711
  {
711
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
712
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
712
713
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
713
714
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
714
715
  }
716
717
  return error;
717
718
}
718
719
 
719
 
int TransactionServices::autocommitOrRollback(Session::reference session,
720
 
                                              int error)
 
720
/**
 
721
  This is used to commit or rollback a single statement depending on
 
722
  the value of error.
 
723
 
 
724
  @note
 
725
    Note that if the autocommit is on, then the following call inside
 
726
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
727
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
728
    the user has used LOCK TABLES then that mechanism does not know to do the
 
729
    commit.
 
730
*/
 
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
721
732
{
722
 
  /* One GPB Statement message per SQL statement */
723
 
  message::Statement *statement= session.getStatementMessage();
724
 
  if ((statement != NULL) && (! error))
725
 
    finalizeStatementMessage(*statement, session);
726
733
 
727
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
734
  if (session->transaction.stmt.getResourceContexts().empty() == false)
728
735
  {
729
 
    TransactionContext *trans = &session.transaction.stmt;
 
736
    TransactionContext *trans = &session->transaction.stmt;
730
737
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
738
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
739
         it != resource_contexts.end();
734
741
    {
735
742
      ResourceContext *resource_context= *it;
736
743
 
737
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
 
744
      resource_context->getTransactionalStorageEngine()->endStatement(session);
738
745
    }
739
746
 
740
747
    if (! error)
745
752
    else
746
753
    {
747
754
      (void) rollbackTransaction(session, false);
748
 
      if (session.transaction_rollback_request)
749
 
      {
 
755
      if (session->transaction_rollback_request)
750
756
        (void) rollbackTransaction(session, true);
751
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
752
 
      }
753
757
    }
754
758
 
755
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
759
    session->variables.tx_isolation= session->session_tx_isolation;
756
760
  }
757
 
 
758
761
  return error;
759
762
}
760
763
 
768
771
  }
769
772
};
770
773
 
771
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
772
 
                                             NamedSavepoint &sv)
 
774
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
773
775
{
774
776
  int error= 0;
775
 
  TransactionContext *trans= &session.transaction.all;
 
777
  TransactionContext *trans= &session->transaction.all;
776
778
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
777
779
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
778
780
 
792
794
 
793
795
    if (resource->participatesInSqlTransaction())
794
796
    {
795
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
797
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
796
798
      {
797
799
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
798
800
        error= 1;
799
801
      }
800
802
      else
801
803
      {
802
 
        session.status_var.ha_savepoint_rollback_count++;
 
804
        session->status_var.ha_savepoint_rollback_count++;
803
805
      }
804
806
    }
805
807
    trans->no_2pc|= not resource->participatesInXaTransaction();
849
851
 
850
852
      if (resource->participatesInSqlTransaction())
851
853
      {
852
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
854
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
853
855
        {
854
856
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
855
857
          error= 1;
856
858
        }
857
859
        else
858
860
        {
859
 
          session.status_var.ha_rollback_count++;
 
861
          session->status_var.ha_rollback_count++;
860
862
        }
861
863
      }
862
864
      resource_context->reset(); /* keep it conveniently zero-filled */
879
881
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
880
882
      if (num_statements == 0)
881
883
      {    
882
 
        session.setStatementMessage(NULL);
 
884
        session->setStatementMessage(NULL);
883
885
      }    
884
886
      else 
885
887
      {
886
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
888
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
887
889
      }    
888
 
      session.setTransactionMessage(savepoint_transaction_copy);
 
890
      session->setTransactionMessage(savepoint_transaction_copy);
889
891
    }
890
892
  }
891
893
 
898
900
  section "4.33.4 SQL-statements and transaction states",
899
901
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
902
*/
901
 
int TransactionServices::setSavepoint(Session::reference session,
902
 
                                      NamedSavepoint &sv)
 
903
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
903
904
{
904
905
  int error= 0;
905
 
  TransactionContext *trans= &session.transaction.all;
 
906
  TransactionContext *trans= &session->transaction.all;
906
907
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
907
908
 
908
909
  if (resource_contexts.empty() == false)
918
919
 
919
920
      if (resource->participatesInSqlTransaction())
920
921
      {
921
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
922
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
922
923
        {
923
924
          my_error(ER_GET_ERRNO, MYF(0), err);
924
925
          error= 1;
925
926
        }
926
927
        else
927
928
        {
928
 
          session.status_var.ha_savepoint_count++;
 
929
          session->status_var.ha_savepoint_count++;
929
930
        }
930
931
      }
931
932
    }
937
938
 
938
939
  if (shouldConstructMessages())
939
940
  {
940
 
    message::Transaction *transaction= session.getTransactionMessage();
 
941
    message::Transaction *transaction= session->getTransactionMessage();
941
942
                  
942
943
    if (transaction != NULL)
943
944
    {
950
951
  return error;
951
952
}
952
953
 
953
 
int TransactionServices::releaseSavepoint(Session::reference session,
954
 
                                          NamedSavepoint &sv)
 
954
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
955
955
{
956
956
  int error= 0;
957
957
 
968
968
 
969
969
    if (resource->participatesInSqlTransaction())
970
970
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
971
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
972
972
      {
973
973
        my_error(ER_GET_ERRNO, MYF(0), err);
974
974
        error= 1;
985
985
  return replication_services.isActive();
986
986
}
987
987
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
989
 
                                                                       bool should_inc_trx_id)
 
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
990
989
{
991
 
  message::Transaction *transaction= session.getTransactionMessage();
 
990
  message::Transaction *transaction= in_session->getTransactionMessage();
992
991
 
993
992
  if (unlikely(transaction == NULL))
994
993
  {
998
997
     * deleting transaction message when done with it.
999
998
     */
1000
999
    transaction= new (nothrow) message::Transaction();
1001
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
 
    session.setTransactionMessage(transaction);
 
1000
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
1001
    in_session->setTransactionMessage(transaction);
1003
1002
    return transaction;
1004
1003
  }
1005
1004
  else
1006
1005
    return transaction;
1007
1006
}
1008
1007
 
1009
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
 
                                                 Session::reference session,
 
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1009
                                                 Session *in_session,
1011
1010
                                                 bool should_inc_trx_id)
1012
1011
{
1013
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
 
  trx->set_server_id(session.getServerId());
 
1012
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1013
  trx->set_server_id(in_session->getServerId());
1015
1014
 
1016
1015
  if (should_inc_trx_id)
1017
1016
  {
1018
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1019
 
    session.setXaId(0);
1020
 
  }
 
1017
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1018
    in_session->setXaId(0);
 
1019
  }  
1021
1020
  else
1022
 
  {
1023
 
    /* trx and seg id will get set properly elsewhere */
 
1021
  { 
1024
1022
    trx->set_transaction_id(0);
1025
1023
  }
1026
1024
 
1027
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1028
 
  
1029
 
  /* segment info may get set elsewhere as needed */
1030
 
  transaction.set_segment_id(1);
1031
 
  transaction.set_end_segment(true);
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
 
                                                     Session::const_reference session)
1036
 
{
1037
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
 
                                                    Session::reference session)
1043
 
{
1044
 
  delete transaction;
1045
 
  session.setStatementMessage(NULL);
1046
 
  session.setTransactionMessage(NULL);
1047
 
  session.setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1025
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1026
}
 
1027
 
 
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1029
                                              Session *in_session)
 
1030
{
 
1031
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1032
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1033
}
 
1034
 
 
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1036
                                             Session *in_session)
 
1037
{
 
1038
  delete in_transaction;
 
1039
  in_session->setStatementMessage(NULL);
 
1040
  in_session->setTransactionMessage(NULL);
 
1041
  in_session->setXaId(0);
 
1042
}
 
1043
 
 
1044
int TransactionServices::commitTransactionMessage(Session *in_session)
1051
1045
{
1052
1046
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1047
  if (! replication_services.isActive())
1054
1048
    return 0;
1055
1049
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (session.getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= session.getStatementMessage();
 
1050
  /* If there is an active statement message, finalize it */
 
1051
  message::Statement *statement= in_session->getStatementMessage();
1065
1052
 
1066
1053
  if (statement != NULL)
1067
1054
  {
1068
 
    finalizeStatementMessage(*statement, session);
1069
 
  }
1070
 
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
1072
 
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
 
  finalizeTransactionMessage(*transaction, session);
1086
 
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1088
 
 
1089
 
  cleanupTransactionMessage(transaction, session);
 
1055
    finalizeStatementMessage(*statement, in_session);
 
1056
  }
 
1057
  else
 
1058
    return 0; /* No data modification occurred inside the transaction */
 
1059
  
 
1060
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1061
 
 
1062
  finalizeTransactionMessage(*transaction, in_session);
 
1063
  
 
1064
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1065
 
 
1066
  cleanupTransactionMessage(transaction, in_session);
1090
1067
 
1091
1068
  return static_cast<int>(result);
1092
1069
}
1093
1070
 
1094
1071
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                               message::Statement::Type type,
1096
 
                                               Session::const_reference session)
 
1072
                                        message::Statement::Type in_type,
 
1073
                                        Session *in_session)
1097
1074
{
1098
 
  statement.set_type(type);
1099
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
1100
 
 
1101
 
  if (session.variables.replicate_query)
1102
 
    statement.set_sql(session.getQueryString()->c_str());
 
1075
  statement.set_type(in_type);
 
1076
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1103
1077
}
1104
1078
 
1105
1079
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                                   Session::reference session)
 
1080
                                            Session *in_session)
1107
1081
{
1108
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1109
 
  session.setStatementMessage(NULL);
 
1082
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1083
  in_session->setStatementMessage(NULL);
1110
1084
}
1111
1085
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1086
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1113
1087
{
1114
1088
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1089
  if (! replication_services.isActive())
1116
1090
    return;
1117
1091
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1092
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1119
1093
 
1120
1094
  /*
1121
1095
   * OK, so there are two situations that we need to deal with here:
1138
1112
  {
1139
1113
    /* Remember the transaction ID so we can re-use it */
1140
1114
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
 
    uint32_t seg_id= transaction->segment_id();
1142
1115
 
1143
1116
    /*
1144
1117
     * Clear the transaction, create a Rollback statement message, 
1145
1118
     * attach it to the transaction, and push it to replicators.
1146
1119
     */
1147
1120
    transaction->Clear();
1148
 
    initTransactionMessage(*transaction, session, false);
 
1121
    initTransactionMessage(*transaction, in_session, false);
1149
1122
 
1150
1123
    /* Set the transaction ID to match the previous messages */
1151
1124
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
 
    transaction->set_segment_id(seg_id);
1153
 
    transaction->set_end_segment(true);
1154
1125
 
1155
1126
    message::Statement *statement= transaction->add_statement();
1156
1127
 
1157
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
 
    finalizeStatementMessage(*statement, session);
 
1128
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1129
    finalizeStatementMessage(*statement, in_session);
1159
1130
 
1160
 
    finalizeTransactionMessage(*transaction, session);
 
1131
    finalizeTransactionMessage(*transaction, in_session);
1161
1132
    
1162
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
1163
 
  }
1164
 
 
1165
 
  cleanupTransactionMessage(transaction, session);
1166
 
}
1167
 
 
1168
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
1169
 
{
1170
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1171
 
  if (! replication_services.isActive())
1172
 
    return;
1173
 
 
1174
 
  message::Statement *current_statement= session.getStatementMessage();
1175
 
 
1176
 
  /* If we never added a Statement message, nothing to undo. */
1177
 
  if (current_statement == NULL)
1178
 
    return;
1179
 
 
1180
 
  /*
1181
 
   * If the Statement has been segmented, then we've already pushed a portion
1182
 
   * of this Statement's row changes through the replication stream and we
1183
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
 
   * delete the current Statement message.
1185
 
   */
1186
 
  bool is_segmented= false;
1187
 
 
1188
 
  switch (current_statement->type())
1189
 
  {
1190
 
    case message::Statement::INSERT:
1191
 
      if (current_statement->insert_data().segment_id() > 1)
1192
 
        is_segmented= true;
1193
 
      break;
1194
 
 
1195
 
    case message::Statement::UPDATE:
1196
 
      if (current_statement->update_data().segment_id() > 1)
1197
 
        is_segmented= true;
1198
 
      break;
1199
 
 
1200
 
    case message::Statement::DELETE:
1201
 
      if (current_statement->delete_data().segment_id() > 1)
1202
 
        is_segmented= true;
1203
 
      break;
1204
 
 
1205
 
    default:
1206
 
      break;
1207
 
  }
1208
 
 
1209
 
  /*
1210
 
   * Remove the Statement message we've been working with (same as
1211
 
   * current_statement).
1212
 
   */
1213
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1214
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
 
  statements_in_txn= transaction->mutable_statement();
1216
 
  statements_in_txn->RemoveLast();
1217
 
  session.setStatementMessage(NULL);
1218
 
  
1219
 
  /*
1220
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
 
   * an indicator to cancel the previous Statement message which should have
1222
 
   * had its end_segment attribute set to false.
1223
 
   */
1224
 
  if (is_segmented)
1225
 
  {
1226
 
    current_statement= transaction->add_statement();
1227
 
    initStatementMessage(*current_statement,
1228
 
                         message::Statement::ROLLBACK_STATEMENT,
1229
 
                         session);
1230
 
    finalizeStatementMessage(*current_statement, session);
1231
 
  }
1232
 
}
1233
 
 
1234
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
 
                                                                     message::Transaction *transaction)
1236
 
{
1237
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
 
  uint32_t seg_id= transaction->segment_id();
1239
 
  
1240
 
  transaction->set_end_segment(false);
1241
 
  commitTransactionMessage(session);
1242
 
  transaction= getActiveTransactionMessage(session, false);
1243
 
  
1244
 
  /* Set the transaction ID to match the previous messages */
1245
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
 
  transaction->set_segment_id(seg_id + 1);
1247
 
  transaction->set_end_segment(true);
1248
 
 
1249
 
  return transaction;
1250
 
}
1251
 
 
1252
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1253
 
                                                            Table &table,
 
1133
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1134
  }
 
1135
  cleanupTransactionMessage(transaction, in_session);
 
1136
}
 
1137
 
 
1138
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1139
                                                            Table *in_table,
1254
1140
                                                            uint32_t *next_segment_id)
1255
1141
{
1256
 
  message::Statement *statement= session.getStatementMessage();
 
1142
  message::Statement *statement= in_session->getStatementMessage();
1257
1143
  message::Transaction *transaction= NULL;
1258
 
  
1259
 
  /*
1260
 
   * If statement is NULL, this is a new statement.
1261
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1262
 
   * This is because autocommitOrRollback() finalizes the statement so that
1263
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1264
 
   * share a single GPB message for multiple statements).
 
1144
 
 
1145
  /* 
 
1146
   * Check the type of the current Statement message. If it is anything
 
1147
   * other than INSERT, we need to call finalize; this will ensure a
 
1148
   * new InsertStatement is created. If it is of type INSERT, check
 
1149
   * which table the INSERT belongs to; if it is a different table,
 
1150
   * call finalize so that a new InsertStatement can be created.
1265
1151
   */
1266
 
  if (statement == NULL)
1267
 
  {
1268
 
    transaction= getActiveTransactionMessage(session);
1269
 
 
1270
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1271
 
        transaction_message_threshold)
1272
 
    {
1273
 
      transaction= segmentTransactionMessage(session, transaction);
1274
 
    }
1275
 
 
1276
 
    statement= transaction->add_statement();
1277
 
    setInsertHeader(*statement, session, table);
1278
 
    session.setStatementMessage(statement);
1279
 
  }
1280
 
  else
1281
 
  {
1282
 
    transaction= getActiveTransactionMessage(session);
1283
 
    
 
1152
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1153
  {
 
1154
    finalizeStatementMessage(*statement, in_session);
 
1155
    statement= in_session->getStatementMessage();
 
1156
  } 
 
1157
  else if (statement != NULL)
 
1158
  {
 
1159
    transaction= getActiveTransactionMessage(in_session);
 
1160
 
1284
1161
    /*
1285
1162
     * If we've passed our threshold for the statement size (possible for
1286
1163
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1164
     * the Transaction will keep it from getting huge).
1288
1165
     */
1289
1166
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1290
 
        transaction_message_threshold)
 
1167
      in_session->variables.transaction_message_threshold)
1291
1168
    {
1292
1169
      /* Remember the transaction ID so we can re-use it */
1293
1170
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
 
      uint32_t seg_id= transaction->segment_id();
1295
 
      
 
1171
 
1296
1172
      message::InsertData *current_data= statement->mutable_insert_data();
1297
 
      
 
1173
 
1298
1174
      /* Caller should use this value when adding a new record */
1299
1175
      *next_segment_id= current_data->segment_id() + 1;
1300
 
      
 
1176
 
1301
1177
      current_data->set_end_segment(false);
1302
 
      transaction->set_end_segment(false);
1303
 
      
 
1178
 
1304
1179
      /* 
1305
1180
       * Send the trx message to replicators after finalizing the 
1306
1181
       * statement and transaction. This will also set the Transaction
1307
1182
       * and Statement objects in Session to NULL.
1308
1183
       */
1309
 
      commitTransactionMessage(session);
1310
 
      
 
1184
      commitTransactionMessage(in_session);
 
1185
 
1311
1186
      /*
1312
1187
       * Statement and Transaction should now be NULL, so new ones will get
1313
1188
       * created. We reuse the transaction id since we are segmenting
1314
1189
       * one transaction.
1315
1190
       */
1316
 
      transaction= getActiveTransactionMessage(session, false);
 
1191
      statement= in_session->getStatementMessage();
 
1192
      transaction= getActiveTransactionMessage(in_session, false);
1317
1193
      assert(transaction != NULL);
1318
1194
 
1319
 
      statement= transaction->add_statement();
1320
 
      setInsertHeader(*statement, session, table);
1321
 
      session.setStatementMessage(statement);
1322
 
            
1323
1195
      /* Set the transaction ID to match the previous messages */
1324
1196
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
 
      transaction->set_segment_id(seg_id + 1);
1326
 
      transaction->set_end_segment(true);
1327
1197
    }
1328
1198
    else
1329
1199
    {
1330
 
      /*
1331
 
       * Continuation of the same statement. Carry forward the existing
1332
 
       * segment id.
1333
 
       */
1334
 
      const message::InsertData &current_data= statement->insert_data();
1335
 
      *next_segment_id= current_data.segment_id();
 
1200
      const message::InsertHeader &insert_header= statement->insert_header();
 
1201
      string old_table_name= insert_header.table_metadata().table_name();
 
1202
     
 
1203
      string current_table_name;
 
1204
      (void) in_table->getShare()->getTableName(current_table_name);
 
1205
 
 
1206
      if (current_table_name.compare(old_table_name))
 
1207
      {
 
1208
        finalizeStatementMessage(*statement, in_session);
 
1209
        statement= in_session->getStatementMessage();
 
1210
      }
 
1211
      else
 
1212
      {
 
1213
        /* carry forward the existing segment id */
 
1214
        const message::InsertData &current_data= statement->insert_data();
 
1215
        *next_segment_id= current_data.segment_id();
 
1216
      }
1336
1217
    }
 
1218
  } 
 
1219
 
 
1220
  if (statement == NULL)
 
1221
  {
 
1222
    /*
 
1223
     * Transaction will be non-NULL only if we had to segment it due to
 
1224
     * transaction size above.
 
1225
     */
 
1226
    if (transaction == NULL)
 
1227
      transaction= getActiveTransactionMessage(in_session);
 
1228
 
 
1229
    /* 
 
1230
     * Transaction message initialized and set, but no statement created
 
1231
     * yet.  We construct one and initialize it, here, then return the
 
1232
     * message after attaching the new Statement message pointer to the 
 
1233
     * Session for easy retrieval later...
 
1234
     */
 
1235
    statement= transaction->add_statement();
 
1236
    setInsertHeader(*statement, in_session, in_table);
 
1237
    in_session->setStatementMessage(statement);
1337
1238
  }
1338
 
  
1339
1239
  return *statement;
1340
1240
}
1341
1241
 
1342
1242
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
 
                                          Session::const_reference session,
1344
 
                                          Table &table)
 
1243
                                          Session *in_session,
 
1244
                                          Table *in_table)
1345
1245
{
1346
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1246
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1347
1247
 
1348
1248
  /* 
1349
1249
   * Now we construct the specialized InsertHeader message inside
1354
1254
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1355
1255
 
1356
1256
  string schema_name;
1357
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1257
  (void) in_table->getShare()->getSchemaName(schema_name);
1358
1258
  string table_name;
1359
 
  (void) table.getShare()->getTableName(table_name);
 
1259
  (void) in_table->getShare()->getTableName(table_name);
1360
1260
 
1361
1261
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
1262
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1363
1263
 
1364
1264
  Field *current_field;
1365
 
  Field **table_fields= table.getFields();
 
1265
  Field **table_fields= in_table->getFields();
1366
1266
 
1367
1267
  message::FieldMetadata *field_metadata;
1368
1268
 
1369
1269
  /* We will read all the table's fields... */
1370
 
  table.setReadSet();
 
1270
  in_table->setReadSet();
1371
1271
 
1372
1272
  while ((current_field= *table_fields++) != NULL) 
1373
1273
  {
1377
1277
  }
1378
1278
}
1379
1279
 
1380
 
bool TransactionServices::insertRecord(Session::reference session,
1381
 
                                       Table &table)
 
1280
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1382
1281
{
1383
1282
  ReplicationServices &replication_services= ReplicationServices::singleton();
1384
1283
  if (! replication_services.isActive())
1385
1284
    return false;
1386
 
 
1387
1285
  /**
1388
1286
   * We do this check here because we don't want to even create a 
1389
1287
   * statement if there isn't a primary key on the table...
1392
1290
   *
1393
1291
   * Multi-column primary keys are handled how exactly?
1394
1292
   */
1395
 
  if (not table.getShare()->hasPrimaryKey())
 
1293
  if (not in_table->getShare()->hasPrimaryKey())
1396
1294
  {
1397
1295
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1398
1296
    return true;
1399
1297
  }
1400
1298
 
1401
1299
  uint32_t next_segment_id= 1;
1402
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1300
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1403
1301
 
1404
1302
  message::InsertData *data= statement.mutable_insert_data();
1405
1303
  data->set_segment_id(next_segment_id);
1407
1305
  message::InsertRecord *record= data->add_record();
1408
1306
 
1409
1307
  Field *current_field;
1410
 
  Field **table_fields= table.getFields();
 
1308
  Field **table_fields= in_table->getFields();
1411
1309
 
1412
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1310
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1413
1311
  string_value->set_charset(system_charset_info);
1414
1312
 
1415
1313
  /* We will read all the table's fields... */
1416
 
  table.setReadSet();
 
1314
  in_table->setReadSet();
1417
1315
 
1418
1316
  while ((current_field= *table_fields++) != NULL) 
1419
1317
  {
1424
1322
    } 
1425
1323
    else 
1426
1324
    {
1427
 
      string_value= current_field->val_str_internal(string_value);
 
1325
      string_value= current_field->val_str(string_value);
1428
1326
      record->add_is_null(false);
1429
1327
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1430
1328
      string_value->free();
1433
1331
  return false;
1434
1332
}
1435
1333
 
1436
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1437
 
                                                            Table &table,
 
1334
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1335
                                                            Table *in_table,
1438
1336
                                                            const unsigned char *old_record, 
1439
1337
                                                            const unsigned char *new_record,
1440
1338
                                                            uint32_t *next_segment_id)
1441
1339
{
1442
 
  message::Statement *statement= session.getStatementMessage();
 
1340
  message::Statement *statement= in_session->getStatementMessage();
1443
1341
  message::Transaction *transaction= NULL;
1444
1342
 
1445
1343
  /*
1446
 
   * If statement is NULL, this is a new statement.
1447
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1448
 
   * This is because autocommitOrRollback() finalizes the statement so that
1449
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1450
 
   * share a single GPB message for multiple statements).
 
1344
   * Check the type of the current Statement message. If it is anything
 
1345
   * other than UPDATE, we need to call finalize; this will ensure a
 
1346
   * new UpdateStatement is created. If it is of type UPDATE, check
 
1347
   * which table the UPDATE belongs to; if it is a different table,
 
1348
   * call finalize so that a new UpdateStatement can be created.
1451
1349
   */
1452
 
  if (statement == NULL)
 
1350
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1453
1351
  {
1454
 
    transaction= getActiveTransactionMessage(session);
1455
 
    
1456
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
        transaction_message_threshold)
1458
 
    {
1459
 
      transaction= segmentTransactionMessage(session, transaction);
1460
 
    }
1461
 
    
1462
 
    statement= transaction->add_statement();
1463
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1464
 
    session.setStatementMessage(statement);
 
1352
    finalizeStatementMessage(*statement, in_session);
 
1353
    statement= in_session->getStatementMessage();
1465
1354
  }
1466
 
  else
 
1355
  else if (statement != NULL)
1467
1356
  {
1468
 
    transaction= getActiveTransactionMessage(session);
1469
 
    
 
1357
    transaction= getActiveTransactionMessage(in_session);
 
1358
 
1470
1359
    /*
1471
1360
     * If we've passed our threshold for the statement size (possible for
1472
1361
     * a bulk update), we'll finalize the Statement and Transaction (doing
1473
1362
     * the Transaction will keep it from getting huge).
1474
1363
     */
1475
1364
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1476
 
        transaction_message_threshold)
 
1365
      in_session->variables.transaction_message_threshold)
1477
1366
    {
1478
1367
      /* Remember the transaction ID so we can re-use it */
1479
1368
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
 
      uint32_t seg_id= transaction->segment_id();
1481
 
      
 
1369
 
1482
1370
      message::UpdateData *current_data= statement->mutable_update_data();
1483
 
      
 
1371
 
1484
1372
      /* Caller should use this value when adding a new record */
1485
1373
      *next_segment_id= current_data->segment_id() + 1;
1486
 
      
 
1374
 
1487
1375
      current_data->set_end_segment(false);
1488
 
      transaction->set_end_segment(false);
1489
 
      
1490
 
      /* 
 
1376
 
 
1377
      /*
1491
1378
       * Send the trx message to replicators after finalizing the 
1492
1379
       * statement and transaction. This will also set the Transaction
1493
1380
       * and Statement objects in Session to NULL.
1494
1381
       */
1495
 
      commitTransactionMessage(session);
1496
 
      
 
1382
      commitTransactionMessage(in_session);
 
1383
 
1497
1384
      /*
1498
1385
       * Statement and Transaction should now be NULL, so new ones will get
1499
1386
       * created. We reuse the transaction id since we are segmenting
1500
1387
       * one transaction.
1501
1388
       */
1502
 
      transaction= getActiveTransactionMessage(session, false);
 
1389
      statement= in_session->getStatementMessage();
 
1390
      transaction= getActiveTransactionMessage(in_session, false);
1503
1391
      assert(transaction != NULL);
1504
 
      
1505
 
      statement= transaction->add_statement();
1506
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1507
 
      session.setStatementMessage(statement);
1508
 
      
 
1392
 
1509
1393
      /* Set the transaction ID to match the previous messages */
1510
1394
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
 
      transaction->set_segment_id(seg_id + 1);
1512
 
      transaction->set_end_segment(true);
1513
1395
    }
1514
1396
    else
1515
1397
    {
1516
 
      /*
1517
 
       * Continuation of the same statement. Carry forward the existing
1518
 
       * segment id.
1519
 
       */
1520
 
      const message::UpdateData &current_data= statement->update_data();
1521
 
      *next_segment_id= current_data.segment_id();
 
1398
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1399
      {
 
1400
        /* carry forward the existing segment id */
 
1401
        const message::UpdateData &current_data= statement->update_data();
 
1402
        *next_segment_id= current_data.segment_id();
 
1403
      } 
 
1404
      else 
 
1405
      {
 
1406
        finalizeStatementMessage(*statement, in_session);
 
1407
        statement= in_session->getStatementMessage();
 
1408
      }
1522
1409
    }
1523
1410
  }
1524
 
  
 
1411
 
 
1412
  if (statement == NULL)
 
1413
  {
 
1414
    /*
 
1415
     * Transaction will be non-NULL only if we had to segment it due to
 
1416
     * transaction size above.
 
1417
     */
 
1418
    if (transaction == NULL)
 
1419
      transaction= getActiveTransactionMessage(in_session);
 
1420
 
 
1421
    /* 
 
1422
     * Transaction message initialized and set, but no statement created
 
1423
     * yet.  We construct one and initialize it, here, then return the
 
1424
     * message after attaching the new Statement message pointer to the 
 
1425
     * Session for easy retrieval later...
 
1426
     */
 
1427
    statement= transaction->add_statement();
 
1428
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1429
    in_session->setStatementMessage(statement);
 
1430
  }
1525
1431
  return *statement;
1526
1432
}
1527
1433
 
 
1434
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1435
                                                  Table *in_table,
 
1436
                                                  const unsigned char *old_record,
 
1437
                                                  const unsigned char *new_record)
 
1438
{
 
1439
  const message::UpdateHeader &update_header= statement.update_header();
 
1440
  string old_table_name= update_header.table_metadata().table_name();
 
1441
 
 
1442
  string current_table_name;
 
1443
  (void) in_table->getShare()->getTableName(current_table_name);
 
1444
  if (current_table_name.compare(old_table_name))
 
1445
  {
 
1446
    return false;
 
1447
  }
 
1448
  else
 
1449
  {
 
1450
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1451
     * match the updated fields in the new record; if they do not, we must
 
1452
     * create a new UpdateHeader 
 
1453
     */
 
1454
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1455
 
 
1456
    Field *current_field;
 
1457
    Field **table_fields= in_table->getFields();
 
1458
    in_table->setReadSet();
 
1459
 
 
1460
    size_t num_calculated_updated_fields= 0;
 
1461
    bool found= false;
 
1462
    while ((current_field= *table_fields++) != NULL)
 
1463
    {
 
1464
      if (num_calculated_updated_fields > num_set_fields)
 
1465
      {
 
1466
        break;
 
1467
      }
 
1468
 
 
1469
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1470
      {
 
1471
        /* check that this field exists in the UpdateHeader record */
 
1472
        found= false;
 
1473
 
 
1474
        for (size_t x= 0; x < num_set_fields; ++x)
 
1475
        {
 
1476
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1477
          string name= field_metadata.name();
 
1478
          if (name.compare(current_field->field_name) == 0)
 
1479
          {
 
1480
            found= true;
 
1481
            ++num_calculated_updated_fields;
 
1482
            break;
 
1483
          } 
 
1484
        }
 
1485
        if (! found)
 
1486
        {
 
1487
          break;
 
1488
        } 
 
1489
      }
 
1490
    }
 
1491
 
 
1492
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1493
    {
 
1494
      return true;
 
1495
    } 
 
1496
    else 
 
1497
    {
 
1498
      return false;
 
1499
    }
 
1500
  }
 
1501
}  
 
1502
 
1528
1503
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
 
                                          Session::const_reference session,
1530
 
                                          Table &table,
 
1504
                                          Session *in_session,
 
1505
                                          Table *in_table,
1531
1506
                                          const unsigned char *old_record, 
1532
1507
                                          const unsigned char *new_record)
1533
1508
{
1534
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1509
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1535
1510
 
1536
1511
  /* 
1537
1512
   * Now we construct the specialized UpdateHeader message inside
1542
1517
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1543
1518
 
1544
1519
  string schema_name;
1545
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1520
  (void) in_table->getShare()->getSchemaName(schema_name);
1546
1521
  string table_name;
1547
 
  (void) table.getShare()->getTableName(table_name);
 
1522
  (void) in_table->getShare()->getTableName(table_name);
1548
1523
 
1549
1524
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1550
1525
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1551
1526
 
1552
1527
  Field *current_field;
1553
 
  Field **table_fields= table.getFields();
 
1528
  Field **table_fields= in_table->getFields();
1554
1529
 
1555
1530
  message::FieldMetadata *field_metadata;
1556
1531
 
1557
1532
  /* We will read all the table's fields... */
1558
 
  table.setReadSet();
 
1533
  in_table->setReadSet();
1559
1534
 
1560
1535
  while ((current_field= *table_fields++) != NULL) 
1561
1536
  {
1563
1538
     * We add the "key field metadata" -- i.e. the fields which make up
1564
1539
     * the primary key for the table.
1565
1540
     */
1566
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1541
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1567
1542
    {
1568
1543
      field_metadata= header->add_key_field_metadata();
1569
1544
      field_metadata->set_name(current_field->field_name);
1570
1545
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1571
1546
    }
1572
1547
 
1573
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1548
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1574
1549
    {
1575
1550
      /* Field is changed from old to new */
1576
1551
      field_metadata= header->add_set_field_metadata();
1579
1554
    }
1580
1555
  }
1581
1556
}
1582
 
void TransactionServices::updateRecord(Session::reference session,
1583
 
                                       Table &table, 
 
1557
void TransactionServices::updateRecord(Session *in_session,
 
1558
                                       Table *in_table, 
1584
1559
                                       const unsigned char *old_record, 
1585
1560
                                       const unsigned char *new_record)
1586
1561
{
1589
1564
    return;
1590
1565
 
1591
1566
  uint32_t next_segment_id= 1;
1592
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1567
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1593
1568
 
1594
1569
  message::UpdateData *data= statement.mutable_update_data();
1595
1570
  data->set_segment_id(next_segment_id);
1597
1572
  message::UpdateRecord *record= data->add_record();
1598
1573
 
1599
1574
  Field *current_field;
1600
 
  Field **table_fields= table.getFields();
1601
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1575
  Field **table_fields= in_table->getFields();
 
1576
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
1577
  string_value->set_charset(system_charset_info);
1603
1578
 
1604
1579
  while ((current_field= *table_fields++) != NULL) 
1614
1589
     *
1615
1590
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1616
1591
     */
1617
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1592
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1618
1593
    {
1619
1594
      /* Store the original "read bit" for this field */
1620
1595
      bool is_read_set= current_field->isReadSet();
1621
1596
 
1622
1597
      /* We need to mark that we will "read" this field... */
1623
 
      table.setReadSet(current_field->position());
 
1598
      in_table->setReadSet(current_field->field_index);
1624
1599
 
1625
1600
      /* Read the string value of this field's contents */
1626
 
      string_value= current_field->val_str_internal(string_value);
 
1601
      string_value= current_field->val_str(string_value);
1627
1602
 
1628
1603
      /* 
1629
1604
       * Reset the read bit after reading field to its original state.  This 
1649
1624
     * primary key field value.  Replication only supports tables
1650
1625
     * with a primary key.
1651
1626
     */
1652
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1627
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1653
1628
    {
1654
1629
      /**
1655
1630
       * To say the below is ugly is an understatement. But it works.
1656
1631
       * 
1657
1632
       * @todo Move this crap into a real Record API.
1658
1633
       */
1659
 
      string_value= current_field->val_str_internal(string_value,
1660
 
                                                    old_record + 
1661
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
 
1634
      string_value= current_field->val_str(string_value,
 
1635
                                           old_record + 
 
1636
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1662
1637
      record->add_key_value(string_value->c_ptr(), string_value->length());
1663
1638
      string_value->free();
1664
1639
    }
1667
1642
}
1668
1643
 
1669
1644
bool TransactionServices::isFieldUpdated(Field *current_field,
1670
 
                                         Table &table,
 
1645
                                         Table *in_table,
1671
1646
                                         const unsigned char *old_record,
1672
1647
                                         const unsigned char *new_record)
1673
1648
{
1676
1651
   * we do this crazy pointer fiddling to figure out if the current field
1677
1652
   * has been updated in the supplied record raw byte pointers.
1678
1653
   */
1679
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1680
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1654
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1655
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1681
1656
 
1682
1657
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1683
1658
 
1707
1682
  return isUpdated;
1708
1683
}  
1709
1684
 
1710
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1711
 
                                                            Table &table,
 
1685
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1686
                                                            Table *in_table,
1712
1687
                                                            uint32_t *next_segment_id)
1713
1688
{
1714
 
  message::Statement *statement= session.getStatementMessage();
 
1689
  message::Statement *statement= in_session->getStatementMessage();
1715
1690
  message::Transaction *transaction= NULL;
1716
1691
 
1717
1692
  /*
1718
 
   * If statement is NULL, this is a new statement.
1719
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1720
 
   * This is because autocommitOrRollback() finalizes the statement so that
1721
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1722
 
   * share a single GPB message for multiple statements).
 
1693
   * Check the type for the current Statement message, if it is anything
 
1694
   * other than DELETE we need to call finalize; this will ensure a
 
1695
   * new DeleteStatement is created. If it is of type DELETE check
 
1696
   * what table the DELETE belongs to, if it is a different table
 
1697
   * call finalize, so a new DeleteStatement can be created.
1723
1698
   */
1724
 
  if (statement == NULL)
 
1699
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1725
1700
  {
1726
 
    transaction= getActiveTransactionMessage(session);
1727
 
    
1728
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1729
 
        transaction_message_threshold)
1730
 
    {
1731
 
      transaction= segmentTransactionMessage(session, transaction);
1732
 
    }
1733
 
    
1734
 
    statement= transaction->add_statement();
1735
 
    setDeleteHeader(*statement, session, table);
1736
 
    session.setStatementMessage(statement);
 
1701
    finalizeStatementMessage(*statement, in_session);
 
1702
    statement= in_session->getStatementMessage();
1737
1703
  }
1738
 
  else
 
1704
  else if (statement != NULL)
1739
1705
  {
1740
 
    transaction= getActiveTransactionMessage(session);
1741
 
    
 
1706
    transaction= getActiveTransactionMessage(in_session);
 
1707
 
1742
1708
    /*
1743
1709
     * If we've passed our threshold for the statement size (possible for
1744
1710
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1711
     * the Transaction will keep it from getting huge).
1746
1712
     */
1747
1713
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1748
 
        transaction_message_threshold)
 
1714
      in_session->variables.transaction_message_threshold)
1749
1715
    {
1750
1716
      /* Remember the transaction ID so we can re-use it */
1751
1717
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
 
      uint32_t seg_id= transaction->segment_id();
1753
 
      
 
1718
 
1754
1719
      message::DeleteData *current_data= statement->mutable_delete_data();
1755
 
      
 
1720
 
1756
1721
      /* Caller should use this value when adding a new record */
1757
1722
      *next_segment_id= current_data->segment_id() + 1;
1758
 
      
 
1723
 
1759
1724
      current_data->set_end_segment(false);
1760
 
      transaction->set_end_segment(false);
1761
 
      
 
1725
 
1762
1726
      /* 
1763
1727
       * Send the trx message to replicators after finalizing the 
1764
1728
       * statement and transaction. This will also set the Transaction
1765
1729
       * and Statement objects in Session to NULL.
1766
1730
       */
1767
 
      commitTransactionMessage(session);
1768
 
      
 
1731
      commitTransactionMessage(in_session);
 
1732
 
1769
1733
      /*
1770
1734
       * Statement and Transaction should now be NULL, so new ones will get
1771
1735
       * created. We reuse the transaction id since we are segmenting
1772
1736
       * one transaction.
1773
1737
       */
1774
 
      transaction= getActiveTransactionMessage(session, false);
 
1738
      statement= in_session->getStatementMessage();
 
1739
      transaction= getActiveTransactionMessage(in_session, false);
1775
1740
      assert(transaction != NULL);
1776
 
      
1777
 
      statement= transaction->add_statement();
1778
 
      setDeleteHeader(*statement, session, table);
1779
 
      session.setStatementMessage(statement);
1780
 
      
 
1741
 
1781
1742
      /* Set the transaction ID to match the previous messages */
1782
1743
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
 
      transaction->set_segment_id(seg_id + 1);
1784
 
      transaction->set_end_segment(true);
1785
1744
    }
1786
1745
    else
1787
1746
    {
1788
 
      /*
1789
 
       * Continuation of the same statement. Carry forward the existing
1790
 
       * segment id.
1791
 
       */
1792
 
      const message::DeleteData &current_data= statement->delete_data();
1793
 
      *next_segment_id= current_data.segment_id();
 
1747
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1748
      string old_table_name= delete_header.table_metadata().table_name();
 
1749
 
 
1750
      string current_table_name;
 
1751
      (void) in_table->getShare()->getTableName(current_table_name);
 
1752
      if (current_table_name.compare(old_table_name))
 
1753
      {
 
1754
        finalizeStatementMessage(*statement, in_session);
 
1755
        statement= in_session->getStatementMessage();
 
1756
      }
 
1757
      else
 
1758
      {
 
1759
        /* carry forward the existing segment id */
 
1760
        const message::DeleteData &current_data= statement->delete_data();
 
1761
        *next_segment_id= current_data.segment_id();
 
1762
      }
1794
1763
    }
1795
1764
  }
1796
 
  
 
1765
 
 
1766
  if (statement == NULL)
 
1767
  {
 
1768
    /*
 
1769
     * Transaction will be non-NULL only if we had to segment it due to
 
1770
     * transaction size above.
 
1771
     */
 
1772
    if (transaction == NULL)
 
1773
      transaction= getActiveTransactionMessage(in_session);
 
1774
 
 
1775
    /* 
 
1776
     * Transaction message initialized and set, but no statement created
 
1777
     * yet.  We construct one and initialize it, here, then return the
 
1778
     * message after attaching the new Statement message pointer to the 
 
1779
     * Session for easy retrieval later...
 
1780
     */
 
1781
    statement= transaction->add_statement();
 
1782
    setDeleteHeader(*statement, in_session, in_table);
 
1783
    in_session->setStatementMessage(statement);
 
1784
  }
1797
1785
  return *statement;
1798
1786
}
1799
1787
 
1800
1788
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
 
                                          Session::const_reference session,
1802
 
                                          Table &table)
 
1789
                                          Session *in_session,
 
1790
                                          Table *in_table)
1803
1791
{
1804
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1792
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1805
1793
 
1806
1794
  /* 
1807
1795
   * Now we construct the specialized DeleteHeader message inside
1811
1799
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1812
1800
 
1813
1801
  string schema_name;
1814
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1802
  (void) in_table->getShare()->getSchemaName(schema_name);
1815
1803
  string table_name;
1816
 
  (void) table.getShare()->getTableName(table_name);
 
1804
  (void) in_table->getShare()->getTableName(table_name);
1817
1805
 
1818
1806
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1819
1807
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1820
1808
 
1821
1809
  Field *current_field;
1822
 
  Field **table_fields= table.getFields();
 
1810
  Field **table_fields= in_table->getFields();
1823
1811
 
1824
1812
  message::FieldMetadata *field_metadata;
1825
1813
 
1830
1818
     * primary key field value.  Replication only supports tables
1831
1819
     * with a primary key.
1832
1820
     */
1833
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1821
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1834
1822
    {
1835
1823
      field_metadata= header->add_key_field_metadata();
1836
1824
      field_metadata->set_name(current_field->field_name);
1839
1827
  }
1840
1828
}
1841
1829
 
1842
 
void TransactionServices::deleteRecord(Session::reference session,
1843
 
                                       Table &table,
1844
 
                                       bool use_update_record)
 
1830
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1845
1831
{
1846
1832
  ReplicationServices &replication_services= ReplicationServices::singleton();
1847
1833
  if (! replication_services.isActive())
1848
1834
    return;
1849
1835
 
1850
1836
  uint32_t next_segment_id= 1;
1851
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1837
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1852
1838
 
1853
1839
  message::DeleteData *data= statement.mutable_delete_data();
1854
1840
  data->set_segment_id(next_segment_id);
1856
1842
  message::DeleteRecord *record= data->add_record();
1857
1843
 
1858
1844
  Field *current_field;
1859
 
  Field **table_fields= table.getFields();
1860
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1845
  Field **table_fields= in_table->getFields();
 
1846
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
1847
  string_value->set_charset(system_charset_info);
1862
1848
 
1863
1849
  while ((current_field= *table_fields++) != NULL) 
1867
1853
     * primary key field value.  Replication only supports tables
1868
1854
     * with a primary key.
1869
1855
     */
1870
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1856
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1871
1857
    {
1872
1858
      if (use_update_record)
1873
1859
      {
1879
1865
         * We are careful not to change anything in old_ptr.
1880
1866
         */
1881
1867
        const unsigned char *old_ptr= current_field->ptr;
1882
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1883
 
        string_value= current_field->val_str_internal(string_value);
 
1868
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1869
        string_value= current_field->val_str(string_value);
1884
1870
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1885
1871
      }
1886
1872
      else
1887
1873
      {
1888
 
        string_value= current_field->val_str_internal(string_value);
 
1874
        string_value= current_field->val_str(string_value);
1889
1875
        /**
1890
1876
         * @TODO Store optional old record value in the before data member
1891
1877
         */
1896
1882
  }
1897
1883
}
1898
1884
 
1899
 
void TransactionServices::createTable(Session::reference session,
 
1885
 
 
1886
/**
 
1887
 * Template for removing Statement records of different types.
 
1888
 *
 
1889
 * The code for removing records from different Statement message types
 
1890
 * is identical except for the class types that are embedded within the
 
1891
 * Statement.
 
1892
 *
 
1893
 * There are 3 scenarios we need to look for:
 
1894
 *   - We've been asked to remove more records than exist in the Statement
 
1895
 *   - We've been asked to remove less records than exist in the Statement
 
1896
 *   - We've been asked to remove ALL records that exist in the Statement
 
1897
 *
 
1898
 * If we are removing ALL records, then effectively we would be left with
 
1899
 * an empty Statement message, so we should just remove it and clean up
 
1900
 * message pointers in the Session object.
 
1901
 */
 
1902
template <class DataType, class RecordType>
 
1903
static bool removeStatementRecordsWithType(Session *session,
 
1904
                                           DataType *data,
 
1905
                                           uint32_t count)
 
1906
{
 
1907
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1908
 
 
1909
  /* If there aren't enough records to remove 'count' of them, error. */
 
1910
  if (num_avail_recs < count)
 
1911
    return false;
 
1912
 
 
1913
  /*
 
1914
   * If we are removing all of the data records, we'll just remove this
 
1915
   * entire Statement message.
 
1916
   */
 
1917
  if (num_avail_recs == count)
 
1918
  {
 
1919
    message::Transaction *transaction= session->getTransactionMessage();
 
1920
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1921
    statements->RemoveLast();
 
1922
 
 
1923
    /*
 
1924
     * Now need to set the Session Statement pointer to either the previous
 
1925
     * Statement, or NULL if there isn't one.
 
1926
     */
 
1927
    if (statements->size() == 0)
 
1928
    {
 
1929
      session->setStatementMessage(NULL);
 
1930
    }
 
1931
    else
 
1932
    {
 
1933
      /*
 
1934
       * There isn't a great way to get a pointer to the previous Statement
 
1935
       * message using the RepeatedPtrField object, so we'll just get to it
 
1936
       * using the Transaction message.
 
1937
       */
 
1938
      int last_stmt_idx= transaction->statement_size() - 1;
 
1939
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1940
    }
 
1941
  }
 
1942
  /* We only need to remove 'count' records */
 
1943
  else if (num_avail_recs > count)
 
1944
  {
 
1945
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1946
    while (count--)
 
1947
      records->RemoveLast();
 
1948
  }
 
1949
 
 
1950
  return true;
 
1951
}
 
1952
 
 
1953
 
 
1954
bool TransactionServices::removeStatementRecords(Session *session,
 
1955
                                                 uint32_t count)
 
1956
{
 
1957
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1958
  if (! replication_services.isActive())
 
1959
    return false;
 
1960
 
 
1961
  /* Get the most current Statement */
 
1962
  message::Statement *statement= session->getStatementMessage();
 
1963
 
 
1964
  /* Make sure we have work to do */
 
1965
  if (statement == NULL)
 
1966
    return false;
 
1967
 
 
1968
  bool retval= false;
 
1969
 
 
1970
  switch (statement->type())
 
1971
  {
 
1972
    case message::Statement::INSERT:
 
1973
    {
 
1974
      message::InsertData *data= statement->mutable_insert_data();
 
1975
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
1976
      break;
 
1977
    }
 
1978
 
 
1979
    case message::Statement::UPDATE:
 
1980
    {
 
1981
      message::UpdateData *data= statement->mutable_update_data();
 
1982
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
1983
      break;
 
1984
    }
 
1985
 
 
1986
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
1987
    {
 
1988
      message::DeleteData *data= statement->mutable_delete_data();
 
1989
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
1990
      break;
 
1991
    }
 
1992
 
 
1993
    default:
 
1994
      retval= false;
 
1995
      break;
 
1996
  }
 
1997
 
 
1998
  return retval;
 
1999
}
 
2000
 
 
2001
 
 
2002
void TransactionServices::createTable(Session *in_session,
1900
2003
                                      const message::Table &table)
1901
2004
{
1902
2005
  ReplicationServices &replication_services= ReplicationServices::singleton();
1903
2006
  if (! replication_services.isActive())
1904
2007
    return;
1905
2008
  
1906
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2009
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1907
2010
  message::Statement *statement= transaction->add_statement();
1908
2011
 
1909
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
2012
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1910
2013
 
1911
2014
  /* 
1912
2015
   * Construct the specialized CreateTableStatement message and attach
1916
2019
  message::Table *new_table_message= create_table_statement->mutable_table();
1917
2020
  *new_table_message= table;
1918
2021
 
1919
 
  finalizeStatementMessage(*statement, session);
 
2022
  finalizeStatementMessage(*statement, in_session);
1920
2023
 
1921
 
  finalizeTransactionMessage(*transaction, session);
 
2024
  finalizeTransactionMessage(*transaction, in_session);
1922
2025
  
1923
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2026
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1924
2027
 
1925
 
  cleanupTransactionMessage(transaction, session);
 
2028
  cleanupTransactionMessage(transaction, in_session);
1926
2029
 
1927
2030
}
1928
2031
 
1929
 
void TransactionServices::createSchema(Session::reference session,
 
2032
void TransactionServices::createSchema(Session *in_session,
1930
2033
                                       const message::Schema &schema)
1931
2034
{
1932
2035
  ReplicationServices &replication_services= ReplicationServices::singleton();
1933
2036
  if (! replication_services.isActive())
1934
2037
    return;
1935
2038
  
1936
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2039
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1937
2040
  message::Statement *statement= transaction->add_statement();
1938
2041
 
1939
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
2042
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1940
2043
 
1941
2044
  /* 
1942
2045
   * Construct the specialized CreateSchemaStatement message and attach
1946
2049
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
2050
  *new_schema_message= schema;
1948
2051
 
1949
 
  finalizeStatementMessage(*statement, session);
 
2052
  finalizeStatementMessage(*statement, in_session);
1950
2053
 
1951
 
  finalizeTransactionMessage(*transaction, session);
 
2054
  finalizeTransactionMessage(*transaction, in_session);
1952
2055
  
1953
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2056
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1954
2057
 
1955
 
  cleanupTransactionMessage(transaction, session);
 
2058
  cleanupTransactionMessage(transaction, in_session);
1956
2059
 
1957
2060
}
1958
2061
 
1959
 
void TransactionServices::dropSchema(Session::reference session,
1960
 
                                     identifier::Schema::const_reference identifier)
 
2062
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1961
2063
{
1962
2064
  ReplicationServices &replication_services= ReplicationServices::singleton();
1963
2065
  if (! replication_services.isActive())
1964
2066
    return;
1965
2067
  
1966
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2068
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1967
2069
  message::Statement *statement= transaction->add_statement();
1968
2070
 
1969
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
2071
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1970
2072
 
1971
2073
  /* 
1972
2074
   * Construct the specialized DropSchemaStatement message and attach
1974
2076
   */
1975
2077
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1976
2078
 
1977
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1978
 
 
1979
 
  finalizeStatementMessage(*statement, session);
1980
 
 
1981
 
  finalizeTransactionMessage(*transaction, session);
1982
 
  
1983
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1984
 
 
1985
 
  cleanupTransactionMessage(transaction, session);
1986
 
}
1987
 
 
1988
 
void TransactionServices::alterSchema(Session::reference session,
1989
 
                                      const message::schema::shared_ptr &old_schema,
1990
 
                                      const message::Schema &new_schema)
1991
 
{
1992
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1993
 
  if (! replication_services.isActive())
1994
 
    return;
1995
 
  
1996
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1997
 
  message::Statement *statement= transaction->add_statement();
1998
 
 
1999
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2000
 
 
2001
 
  /* 
2002
 
   * Construct the specialized AlterSchemaStatement message and attach
2003
 
   * it to the generic Statement message
2004
 
   */
2005
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2006
 
 
2007
 
  message::Schema *before= alter_schema_statement->mutable_before();
2008
 
  message::Schema *after= alter_schema_statement->mutable_after();
2009
 
 
2010
 
  *before= *old_schema;
2011
 
  *after= new_schema;
2012
 
 
2013
 
  finalizeStatementMessage(*statement, session);
2014
 
 
2015
 
  finalizeTransactionMessage(*transaction, session);
2016
 
  
2017
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2018
 
 
2019
 
  cleanupTransactionMessage(transaction, session);
2020
 
}
2021
 
 
2022
 
void TransactionServices::dropTable(Session::reference session,
2023
 
                                    const identifier::Table &table,
 
2079
  drop_schema_statement->set_schema_name(schema_name);
 
2080
 
 
2081
  finalizeStatementMessage(*statement, in_session);
 
2082
 
 
2083
  finalizeTransactionMessage(*transaction, in_session);
 
2084
  
 
2085
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2086
 
 
2087
  cleanupTransactionMessage(transaction, in_session);
 
2088
}
 
2089
 
 
2090
void TransactionServices::dropTable(Session *in_session,
 
2091
                                    const string &schema_name,
 
2092
                                    const string &table_name,
2024
2093
                                    bool if_exists)
2025
2094
{
2026
2095
  ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2096
  if (! replication_services.isActive())
2028
2097
    return;
2029
2098
  
2030
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2099
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2100
  message::Statement *statement= transaction->add_statement();
2032
2101
 
2033
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
2102
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2034
2103
 
2035
2104
  /* 
2036
2105
   * Construct the specialized DropTableStatement message and attach
2042
2111
 
2043
2112
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2044
2113
 
2045
 
  table_metadata->set_schema_name(table.getSchemaName());
2046
 
  table_metadata->set_table_name(table.getTableName());
2047
 
 
2048
 
  finalizeStatementMessage(*statement, session);
2049
 
 
2050
 
  finalizeTransactionMessage(*transaction, session);
 
2114
  table_metadata->set_schema_name(schema_name);
 
2115
  table_metadata->set_table_name(table_name);
 
2116
 
 
2117
  finalizeStatementMessage(*statement, in_session);
 
2118
 
 
2119
  finalizeTransactionMessage(*transaction, in_session);
2051
2120
  
2052
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2121
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2053
2122
 
2054
 
  cleanupTransactionMessage(transaction, session);
 
2123
  cleanupTransactionMessage(transaction, in_session);
2055
2124
}
2056
2125
 
2057
 
void TransactionServices::truncateTable(Session::reference session,
2058
 
                                        Table &table)
 
2126
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2059
2127
{
2060
2128
  ReplicationServices &replication_services= ReplicationServices::singleton();
2061
2129
  if (! replication_services.isActive())
2062
2130
    return;
2063
2131
  
2064
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2132
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2065
2133
  message::Statement *statement= transaction->add_statement();
2066
2134
 
2067
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
2135
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2068
2136
 
2069
2137
  /* 
2070
2138
   * Construct the specialized TruncateTableStatement message and attach
2074
2142
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2075
2143
 
2076
2144
  string schema_name;
2077
 
  (void) table.getShare()->getSchemaName(schema_name);
 
2145
  (void) in_table->getShare()->getSchemaName(schema_name);
2078
2146
  string table_name;
2079
 
  (void) table.getShare()->getTableName(table_name);
 
2147
  (void) in_table->getShare()->getTableName(table_name);
2080
2148
 
2081
2149
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2150
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2083
2151
 
2084
 
  finalizeStatementMessage(*statement, session);
 
2152
  finalizeStatementMessage(*statement, in_session);
2085
2153
 
2086
 
  finalizeTransactionMessage(*transaction, session);
 
2154
  finalizeTransactionMessage(*transaction, in_session);
2087
2155
  
2088
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2156
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2089
2157
 
2090
 
  cleanupTransactionMessage(transaction, session);
 
2158
  cleanupTransactionMessage(transaction, in_session);
2091
2159
}
2092
2160
 
2093
 
void TransactionServices::rawStatement(Session::reference session,
2094
 
                                       const string &query)
 
2161
void TransactionServices::rawStatement(Session *in_session, const string &query)
2095
2162
{
2096
2163
  ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2164
  if (! replication_services.isActive())
2098
2165
    return;
2099
2166
 
2100
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2167
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2168
  message::Statement *statement= transaction->add_statement();
2102
2169
 
2103
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
2170
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2171
  statement->set_sql(query);
2105
 
  finalizeStatementMessage(*statement, session);
 
2172
  finalizeStatementMessage(*statement, in_session);
2106
2173
 
2107
 
  finalizeTransactionMessage(*transaction, session);
 
2174
  finalizeTransactionMessage(*transaction, in_session);
2108
2175
  
2109
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2176
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2110
2177
 
2111
 
  cleanupTransactionMessage(transaction, session);
 
2178
  cleanupTransactionMessage(transaction, in_session);
2112
2179
}
2113
2180
 
2114
 
int TransactionServices::sendEvent(Session::reference session,
2115
 
                                   const message::Event &event)
 
2181
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2116
2182
{
2117
2183
  ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2184
  if (! replication_services.isActive())
2130
2196
 
2131
2197
  trx_event->CopyFrom(event);
2132
2198
 
2133
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
2199
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2134
2200
 
2135
2201
  delete transaction;
2136
2202
 
2137
2203
  return static_cast<int>(result);
2138
2204
}
2139
2205
 
2140
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2206
bool TransactionServices::sendStartupEvent(Session *session)
2141
2207
{
2142
2208
  message::Event event;
2143
2209
  event.set_type(message::Event::STARTUP);
2146
2212
  return true;
2147
2213
}
2148
2214
 
2149
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2215
bool TransactionServices::sendShutdownEvent(Session *session)
2150
2216
{
2151
2217
  message::Event event;
2152
2218
  event.set_type(message::Event::SHUTDOWN);