~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Lee Bieber
  • Date: 2011-03-13 16:37:38 UTC
  • mfrom: (2227.4.18 session2)
  • Revision ID: kalebral@gmail.com-20110313163738-7ti21zk40o2xi3ew
Merge Olaf - Refactor Session

Show diffs side-by-side

added

removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
5
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  This program is free software; you can redistribute it and/or modify
8
8
 *  it under the terms of the GNU General Public License as published by
47
47
 * plugins can understand.
48
48
 */
49
49
 
50
 
#include "config.h"
51
 
#include "drizzled/my_hash.h"
52
 
#include "drizzled/error.h"
53
 
#include "drizzled/gettext.h"
54
 
#include "drizzled/probes.h"
55
 
#include "drizzled/sql_parse.h"
56
 
#include "drizzled/session.h"
57
 
#include "drizzled/sql_base.h"
58
 
#include "drizzled/replication_services.h"
59
 
#include "drizzled/transaction_services.h"
60
 
#include "drizzled/transaction_context.h"
61
 
#include "drizzled/message/transaction.pb.h"
62
 
#include "drizzled/message/statement_transform.h"
63
 
#include "drizzled/resource_context.h"
64
 
#include "drizzled/lock.h"
65
 
#include "drizzled/item/int.h"
66
 
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/timestamp.h"
68
 
#include "drizzled/plugin/client.h"
69
 
#include "drizzled/plugin/monitored_in_transaction.h"
70
 
#include "drizzled/plugin/transactional_storage_engine.h"
71
 
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/plugin/xa_storage_engine.h"
73
 
#include "drizzled/internal/my_sys.h"
 
50
#include <config.h>
 
51
#include <drizzled/current_session.h>
 
52
#include <drizzled/error.h>
 
53
#include <drizzled/gettext.h>
 
54
#include <drizzled/probes.h>
 
55
#include <drizzled/sql_parse.h>
 
56
#include <drizzled/session.h>
 
57
#include <drizzled/sql_base.h>
 
58
#include <drizzled/replication_services.h>
 
59
#include <drizzled/transaction_services.h>
 
60
#include <drizzled/transaction_context.h>
 
61
#include <drizzled/message/transaction.pb.h>
 
62
#include <drizzled/message/statement_transform.h>
 
63
#include <drizzled/resource_context.h>
 
64
#include <drizzled/lock.h>
 
65
#include <drizzled/item/int.h>
 
66
#include <drizzled/item/empty_string.h>
 
67
#include <drizzled/field/epoch.h>
 
68
#include <drizzled/plugin/client.h>
 
69
#include <drizzled/plugin/monitored_in_transaction.h>
 
70
#include <drizzled/plugin/transactional_storage_engine.h>
 
71
#include <drizzled/plugin/xa_resource_manager.h>
 
72
#include <drizzled/plugin/xa_storage_engine.h>
 
73
#include <drizzled/internal/my_sys.h>
74
74
 
75
75
#include <vector>
76
76
#include <algorithm>
313
313
  }
314
314
}
315
315
 
316
 
void TransactionServices::registerResourceForStatement(Session *session,
 
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
                                                       plugin::MonitoredInTransaction *monitored,
318
318
                                                       plugin::TransactionalStorageEngine *engine)
319
319
{
320
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
320
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
321
  {
322
322
    /* 
323
323
     * Now we automatically register this resource manager for the
329
329
    registerResourceForTransaction(session, monitored, engine);
330
330
  }
331
331
 
332
 
  TransactionContext *trans= &session->transaction.stmt;
333
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
332
  TransactionContext *trans= &session.transaction.stmt;
 
333
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
334
334
 
335
335
  if (resource_context->isStarted())
336
336
    return; /* already registered, return */
345
345
  trans->no_2pc|= true;
346
346
}
347
347
 
348
 
void TransactionServices::registerResourceForStatement(Session *session,
 
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
                                                       plugin::MonitoredInTransaction *monitored,
350
350
                                                       plugin::TransactionalStorageEngine *engine,
351
351
                                                       plugin::XaResourceManager *resource_manager)
352
352
{
353
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
353
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
354
  {
355
355
    /* 
356
356
     * Now we automatically register this resource manager for the
362
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
363
  }
364
364
 
365
 
  TransactionContext *trans= &session->transaction.stmt;
366
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
 
365
  TransactionContext *trans= &session.transaction.stmt;
 
366
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
367
367
 
368
368
  if (resource_context->isStarted())
369
369
    return; /* already registered, return */
379
379
  trans->no_2pc|= false;
380
380
}
381
381
 
382
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
                                                         plugin::MonitoredInTransaction *monitored,
384
384
                                                         plugin::TransactionalStorageEngine *engine)
385
385
{
386
 
  TransactionContext *trans= &session->transaction.all;
387
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
386
  TransactionContext *trans= &session.transaction.all;
 
387
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
388
388
 
389
389
  if (resource_context->isStarted())
390
390
    return; /* already registered, return */
391
391
 
392
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
392
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
393
 
394
394
  trans->registerResource(resource_context);
395
395
 
400
400
  resource_context->setTransactionalStorageEngine(engine);
401
401
  trans->no_2pc|= true;
402
402
 
403
 
  if (session->transaction.xid_state.xid.is_null())
404
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
403
  if (session.transaction.xid_state.xid.is_null())
 
404
    session.transaction.xid_state.xid.set(session.getQueryId());
405
405
 
406
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
407
  if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
    registerResourceForStatement(session, monitored, engine);
409
409
}
410
410
 
411
 
void TransactionServices::registerResourceForTransaction(Session *session,
 
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
                                                         plugin::MonitoredInTransaction *monitored,
413
413
                                                         plugin::TransactionalStorageEngine *engine,
414
414
                                                         plugin::XaResourceManager *resource_manager)
415
415
{
416
 
  TransactionContext *trans= &session->transaction.all;
417
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
 
416
  TransactionContext *trans= &session.transaction.all;
 
417
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
418
418
 
419
419
  if (resource_context->isStarted())
420
420
    return; /* already registered, return */
421
421
 
422
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
422
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
423
 
424
424
  trans->registerResource(resource_context);
425
425
 
430
430
  resource_context->setTransactionalStorageEngine(engine);
431
431
  trans->no_2pc|= true;
432
432
 
433
 
  if (session->transaction.xid_state.xid.is_null())
434
 
    session->transaction.xid_state.xid.set(session->getQueryId());
 
433
  if (session.transaction.xid_state.xid.is_null())
 
434
    session.transaction.xid_state.xid.set(session.getQueryId());
435
435
 
436
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
436
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
437
 
438
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
 
439
  if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
441
}
442
442
 
453
453
  my_session->setXaId(xa_id);
454
454
}
455
455
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
457
457
{
458
 
  if (session->getXaId() == 0)
 
458
  if (session.getXaId() == 0)
459
459
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
460
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
461
461
  }
462
462
 
463
 
  return session->getXaId();
 
463
  return session.getXaId();
464
464
}
465
465
 
466
 
/**
467
 
  @retval
468
 
    0   ok
469
 
  @retval
470
 
    1   transaction was rolled back
471
 
  @retval
472
 
    2   error during commit, data may be inconsistent
473
 
 
474
 
  @todo
475
 
    Since we don't support nested statement transactions in 5.0,
476
 
    we can't commit or rollback stmt transactions while we are inside
477
 
    stored functions or triggers. So we simply do nothing now.
478
 
    TODO: This should be fixed in later ( >= 5.1) releases.
479
 
*/
480
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
 
466
int TransactionServices::commitTransaction(Session::reference session,
 
467
                                           bool normal_transaction)
481
468
{
482
469
  int error= 0, cookie= 0;
483
470
  /*
484
471
    'all' means that this is either an explicit commit issued by
485
472
    user, or an implicit commit issued by a DDL.
486
473
  */
487
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
474
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
476
 
490
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
477
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
491
478
 
492
479
  /*
493
480
    We must not commit the normal transaction if a statement
495
482
    flags will not get propagated to its normal transaction's
496
483
    counterpart.
497
484
  */
498
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
499
 
              trans == &session->transaction.stmt);
 
485
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
486
              trans == &session.transaction.stmt);
500
487
 
501
488
  if (resource_contexts.empty() == false)
502
489
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
490
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
504
491
    {
505
492
      rollbackTransaction(session, normal_transaction);
506
493
      return 1;
531
518
 
532
519
        if (resource->participatesInXaTransaction())
533
520
        {
534
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
 
521
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
535
522
          {
536
523
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
524
            error= 1;
538
525
          }
539
526
          else
540
527
          {
541
 
            session->status_var.ha_prepare_count++;
 
528
            session.status_var.ha_prepare_count++;
542
529
          }
543
530
        }
544
531
      }
560
547
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
548
end:
562
549
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
550
      session.startWaitingGlobalReadLock();
564
551
  }
565
552
  return error;
566
553
}
569
556
  @note
570
557
  This function does not care about global read lock. A caller should.
571
558
*/
572
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
 
559
int TransactionServices::commitPhaseOne(Session::reference session,
 
560
                                        bool normal_transaction)
573
561
{
574
562
  int error=0;
575
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
563
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
565
 
578
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
566
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
567
  bool all= normal_transaction;
 
568
 
 
569
  /* If we're in autocommit then we have a real transaction to commit
 
570
     (except if it's BEGIN)
 
571
  */
 
572
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
573
    all= true;
579
574
 
580
575
  if (resource_contexts.empty() == false)
581
576
  {
590
585
 
591
586
      if (resource->participatesInXaTransaction())
592
587
      {
593
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
 
588
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
594
589
        {
595
590
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
596
591
          error= 1;
597
592
        }
598
593
        else if (normal_transaction)
599
594
        {
600
 
          session->status_var.ha_commit_count++;
 
595
          session.status_var.ha_commit_count++;
601
596
        }
602
597
      }
603
598
      else if (resource->participatesInSqlTransaction())
604
599
      {
605
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
 
600
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
606
601
        {
607
602
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
608
603
          error= 1;
609
604
        }
610
605
        else if (normal_transaction)
611
606
        {
612
 
          session->status_var.ha_commit_count++;
 
607
          session.status_var.ha_commit_count++;
613
608
        }
614
609
      }
615
610
      resource_context->reset(); /* keep it conveniently zero-filled */
616
611
    }
617
612
 
618
613
    if (is_real_trans)
619
 
      session->transaction.xid_state.xid.null();
 
614
      session.transaction.xid_state.xid.null();
620
615
 
621
616
    if (normal_transaction)
622
617
    {
623
 
      session->variables.tx_isolation= session->session_tx_isolation;
624
 
      session->transaction.cleanup();
 
618
      session.variables.tx_isolation= session.session_tx_isolation;
 
619
      session.transaction.cleanup();
625
620
    }
626
621
  }
627
622
  trans->reset();
628
623
  return error;
629
624
}
630
625
 
631
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
 
626
int TransactionServices::rollbackTransaction(Session::reference session,
 
627
                                             bool normal_transaction)
632
628
{
633
629
  int error= 0;
634
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
630
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
631
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
636
632
 
637
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
633
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
634
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
638
635
 
639
636
  /*
640
637
    We must not rollback the normal transaction if a statement
641
638
    transaction is pending.
642
639
  */
643
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
644
 
              trans == &session->transaction.stmt);
 
640
  assert(session.transaction.stmt.getResourceContexts().empty() ||
 
641
              trans == &session.transaction.stmt);
645
642
 
646
643
  if (resource_contexts.empty() == false)
647
644
  {
656
653
 
657
654
      if (resource->participatesInXaTransaction())
658
655
      {
659
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
 
656
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
660
657
        {
661
658
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
659
          error= 1;
663
660
        }
664
661
        else if (normal_transaction)
665
662
        {
666
 
          session->status_var.ha_rollback_count++;
 
663
          session.status_var.ha_rollback_count++;
667
664
        }
668
665
      }
669
666
      else if (resource->participatesInSqlTransaction())
670
667
      {
671
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
 
668
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
672
669
        {
673
670
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
671
          error= 1;
675
672
        }
676
673
        else if (normal_transaction)
677
674
        {
678
 
          session->status_var.ha_rollback_count++;
 
675
          session.status_var.ha_rollback_count++;
679
676
        }
680
677
      }
681
678
      resource_context->reset(); /* keep it conveniently zero-filled */
688
685
     * a rollback statement with the corresponding transaction ID
689
686
     * to rollback.
690
687
     */
691
 
    if (normal_transaction)
 
688
    if (all)
692
689
      rollbackTransactionMessage(session);
693
690
    else
694
691
      rollbackStatementMessage(session);
695
692
 
696
693
    if (is_real_trans)
697
 
      session->transaction.xid_state.xid.null();
 
694
      session.transaction.xid_state.xid.null();
698
695
    if (normal_transaction)
699
696
    {
700
 
      session->variables.tx_isolation=session->session_tx_isolation;
701
 
      session->transaction.cleanup();
 
697
      session.variables.tx_isolation=session.session_tx_isolation;
 
698
      session.transaction.cleanup();
702
699
    }
703
700
  }
704
701
  if (normal_transaction)
705
 
    session->transaction_rollback_request= false;
 
702
    session.transaction_rollback_request= false;
706
703
 
707
704
  /*
708
705
   * If a non-transactional table was updated, warn the user
709
706
   */
710
707
  if (is_real_trans &&
711
 
      session->transaction.all.hasModifiedNonTransData() &&
712
 
      session->getKilled() != Session::KILL_CONNECTION)
 
708
      session.transaction.all.hasModifiedNonTransData() &&
 
709
      session.getKilled() != Session::KILL_CONNECTION)
713
710
  {
714
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
711
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
712
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
713
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
717
714
  }
719
716
  return error;
720
717
}
721
718
 
722
 
/**
723
 
  This is used to commit or rollback a single statement depending on
724
 
  the value of error.
725
 
 
726
 
  @note
727
 
    Note that if the autocommit is on, then the following call inside
728
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
729
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
730
 
    the user has used LOCK TABLES then that mechanism does not know to do the
731
 
    commit.
732
 
*/
733
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
 
719
int TransactionServices::autocommitOrRollback(Session::reference session,
 
720
                                              int error)
734
721
{
735
722
  /* One GPB Statement message per SQL statement */
736
 
  message::Statement *statement= session->getStatementMessage();
 
723
  message::Statement *statement= session.getStatementMessage();
737
724
  if ((statement != NULL) && (! error))
738
725
    finalizeStatementMessage(*statement, session);
739
726
 
740
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
 
727
  if (session.transaction.stmt.getResourceContexts().empty() == false)
741
728
  {
742
 
    TransactionContext *trans = &session->transaction.stmt;
 
729
    TransactionContext *trans = &session.transaction.stmt;
743
730
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
731
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
732
         it != resource_contexts.end();
747
734
    {
748
735
      ResourceContext *resource_context= *it;
749
736
 
750
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
737
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
751
738
    }
752
739
 
753
740
    if (! error)
758
745
    else
759
746
    {
760
747
      (void) rollbackTransaction(session, false);
761
 
      if (session->transaction_rollback_request)
 
748
      if (session.transaction_rollback_request)
 
749
      {
762
750
        (void) rollbackTransaction(session, true);
 
751
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
 
752
      }
763
753
    }
764
754
 
765
 
    session->variables.tx_isolation= session->session_tx_isolation;
 
755
    session.variables.tx_isolation= session.session_tx_isolation;
766
756
  }
 
757
 
767
758
  return error;
768
759
}
769
760
 
777
768
  }
778
769
};
779
770
 
780
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
 
771
int TransactionServices::rollbackToSavepoint(Session::reference session,
 
772
                                             NamedSavepoint &sv)
781
773
{
782
774
  int error= 0;
783
 
  TransactionContext *trans= &session->transaction.all;
 
775
  TransactionContext *trans= &session.transaction.all;
784
776
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
785
777
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
786
778
 
800
792
 
801
793
    if (resource->participatesInSqlTransaction())
802
794
    {
803
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
 
795
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
804
796
      {
805
797
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
806
798
        error= 1;
807
799
      }
808
800
      else
809
801
      {
810
 
        session->status_var.ha_savepoint_rollback_count++;
 
802
        session.status_var.ha_savepoint_rollback_count++;
811
803
      }
812
804
    }
813
805
    trans->no_2pc|= not resource->participatesInXaTransaction();
857
849
 
858
850
      if (resource->participatesInSqlTransaction())
859
851
      {
860
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
 
852
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
861
853
        {
862
854
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
863
855
          error= 1;
864
856
        }
865
857
        else
866
858
        {
867
 
          session->status_var.ha_rollback_count++;
 
859
          session.status_var.ha_rollback_count++;
868
860
        }
869
861
      }
870
862
      resource_context->reset(); /* keep it conveniently zero-filled */
887
879
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
888
880
      if (num_statements == 0)
889
881
      {    
890
 
        session->setStatementMessage(NULL);
 
882
        session.setStatementMessage(NULL);
891
883
      }    
892
884
      else 
893
885
      {
894
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
886
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
895
887
      }    
896
 
      session->setTransactionMessage(savepoint_transaction_copy);
 
888
      session.setTransactionMessage(savepoint_transaction_copy);
897
889
    }
898
890
  }
899
891
 
906
898
  section "4.33.4 SQL-statements and transaction states",
907
899
  NamedSavepoint is *not* transaction-initiating SQL-statement
908
900
*/
909
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
 
901
int TransactionServices::setSavepoint(Session::reference session,
 
902
                                      NamedSavepoint &sv)
910
903
{
911
904
  int error= 0;
912
 
  TransactionContext *trans= &session->transaction.all;
 
905
  TransactionContext *trans= &session.transaction.all;
913
906
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
914
907
 
915
908
  if (resource_contexts.empty() == false)
925
918
 
926
919
      if (resource->participatesInSqlTransaction())
927
920
      {
928
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
 
921
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
929
922
        {
930
923
          my_error(ER_GET_ERRNO, MYF(0), err);
931
924
          error= 1;
932
925
        }
933
926
        else
934
927
        {
935
 
          session->status_var.ha_savepoint_count++;
 
928
          session.status_var.ha_savepoint_count++;
936
929
        }
937
930
      }
938
931
    }
944
937
 
945
938
  if (shouldConstructMessages())
946
939
  {
947
 
    message::Transaction *transaction= session->getTransactionMessage();
 
940
    message::Transaction *transaction= session.getTransactionMessage();
948
941
                  
949
942
    if (transaction != NULL)
950
943
    {
957
950
  return error;
958
951
}
959
952
 
960
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
 
953
int TransactionServices::releaseSavepoint(Session::reference session,
 
954
                                          NamedSavepoint &sv)
961
955
{
962
956
  int error= 0;
963
957
 
974
968
 
975
969
    if (resource->participatesInSqlTransaction())
976
970
    {
977
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
 
971
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
978
972
      {
979
973
        my_error(ER_GET_ERRNO, MYF(0), err);
980
974
        error= 1;
991
985
  return replication_services.isActive();
992
986
}
993
987
 
994
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
 
989
                                                                       bool should_inc_trx_id)
995
990
{
996
 
  message::Transaction *transaction= in_session->getTransactionMessage();
 
991
  message::Transaction *transaction= session.getTransactionMessage();
997
992
 
998
993
  if (unlikely(transaction == NULL))
999
994
  {
1003
998
     * deleting transaction message when done with it.
1004
999
     */
1005
1000
    transaction= new (nothrow) message::Transaction();
1006
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
 
    in_session->setTransactionMessage(transaction);
 
1001
    initTransactionMessage(*transaction, session, should_inc_trx_id);
 
1002
    session.setTransactionMessage(transaction);
1008
1003
    return transaction;
1009
1004
  }
1010
1005
  else
1011
1006
    return transaction;
1012
1007
}
1013
1008
 
1014
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
 
                                                 Session *in_session,
 
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
 
1010
                                                 Session::reference session,
1016
1011
                                                 bool should_inc_trx_id)
1017
1012
{
1018
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
 
  trx->set_server_id(in_session->getServerId());
 
1013
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1014
  trx->set_server_id(session.getServerId());
1020
1015
 
1021
1016
  if (should_inc_trx_id)
1022
1017
  {
1023
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
 
    in_session->setXaId(0);
1025
 
  }  
 
1018
    trx->set_transaction_id(getCurrentTransactionId(session));
 
1019
    session.setXaId(0);
 
1020
  }
1026
1021
  else
1027
 
  { 
 
1022
  {
 
1023
    /* trx and seg id will get set properly elsewhere */
1028
1024
    trx->set_transaction_id(0);
1029
1025
  }
1030
1026
 
1031
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
 
                                              Session *in_session)
1036
 
{
1037
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
 
                                             Session *in_session)
1043
 
{
1044
 
  delete in_transaction;
1045
 
  in_session->setStatementMessage(NULL);
1046
 
  in_session->setTransactionMessage(NULL);
1047
 
  in_session->setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session *in_session)
 
1027
  trx->set_start_timestamp(session.getCurrentTimestamp());
 
1028
  
 
1029
  /* segment info may get set elsewhere as needed */
 
1030
  transaction.set_segment_id(1);
 
1031
  transaction.set_end_segment(true);
 
1032
}
 
1033
 
 
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
 
1035
                                                     Session::const_reference session)
 
1036
{
 
1037
  message::TransactionContext *trx= transaction.mutable_transaction_context();
 
1038
  trx->set_end_timestamp(session.getCurrentTimestamp());
 
1039
}
 
1040
 
 
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
 
1042
                                                    Session::reference session)
 
1043
{
 
1044
  delete transaction;
 
1045
  session.setStatementMessage(NULL);
 
1046
  session.setTransactionMessage(NULL);
 
1047
  session.setXaId(0);
 
1048
}
 
1049
 
 
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1051
1051
{
1052
1052
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1053
  if (! replication_services.isActive())
1057
1057
   * If no Transaction message was ever created, then no data modification
1058
1058
   * occurred inside the transaction, so nothing to do.
1059
1059
   */
1060
 
  if (in_session->getTransactionMessage() == NULL)
 
1060
  if (session.getTransactionMessage() == NULL)
1061
1061
    return 0;
1062
1062
  
1063
1063
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= in_session->getStatementMessage();
 
1064
  message::Statement *statement= session.getStatementMessage();
1065
1065
 
1066
1066
  if (statement != NULL)
1067
1067
  {
1068
 
    finalizeStatementMessage(*statement, in_session);
 
1068
    finalizeStatementMessage(*statement, session);
1069
1069
  }
1070
1070
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1071
  message::Transaction* transaction= getActiveTransactionMessage(session);
1072
1072
 
1073
1073
  /*
1074
1074
   * It is possible that we could have a Transaction without any Statements
1078
1078
   */
1079
1079
  if (transaction->statement_size() == 0)
1080
1080
  {
1081
 
    cleanupTransactionMessage(transaction, in_session);
 
1081
    cleanupTransactionMessage(transaction, session);
1082
1082
    return 0;
1083
1083
  }
1084
1084
  
1085
 
  finalizeTransactionMessage(*transaction, in_session);
 
1085
  finalizeTransactionMessage(*transaction, session);
1086
1086
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1088
1088
 
1089
 
  cleanupTransactionMessage(transaction, in_session);
 
1089
  cleanupTransactionMessage(transaction, session);
1090
1090
 
1091
1091
  return static_cast<int>(result);
1092
1092
}
1093
1093
 
1094
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                        message::Statement::Type in_type,
1096
 
                                        Session *in_session)
 
1095
                                               message::Statement::Type type,
 
1096
                                               Session::const_reference session)
1097
1097
{
1098
 
  statement.set_type(in_type);
1099
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1098
  statement.set_type(type);
 
1099
  statement.set_start_timestamp(session.getCurrentTimestamp());
1100
1100
 
1101
 
  if (in_session->variables.replicate_query)
1102
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
1101
  if (session.variables.replicate_query)
 
1102
    statement.set_sql(session.getQueryString()->c_str());
1103
1103
}
1104
1104
 
1105
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                            Session *in_session)
 
1106
                                                   Session::reference session)
1107
1107
{
1108
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
 
  in_session->setStatementMessage(NULL);
 
1108
  statement.set_end_timestamp(session.getCurrentTimestamp());
 
1109
  session.setStatementMessage(NULL);
1110
1110
}
1111
1111
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1113
1113
{
1114
1114
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1115
  if (! replication_services.isActive())
1116
1116
    return;
1117
1117
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1118
  message::Transaction *transaction= getActiveTransactionMessage(session);
1119
1119
 
1120
1120
  /*
1121
1121
   * OK, so there are two situations that we need to deal with here:
1138
1138
  {
1139
1139
    /* Remember the transaction ID so we can re-use it */
1140
1140
    uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1141
    uint32_t seg_id= transaction->segment_id();
1141
1142
 
1142
1143
    /*
1143
1144
     * Clear the transaction, create a Rollback statement message, 
1144
1145
     * attach it to the transaction, and push it to replicators.
1145
1146
     */
1146
1147
    transaction->Clear();
1147
 
    initTransactionMessage(*transaction, in_session, false);
 
1148
    initTransactionMessage(*transaction, session, false);
1148
1149
 
1149
1150
    /* Set the transaction ID to match the previous messages */
1150
1151
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1152
    transaction->set_segment_id(seg_id);
 
1153
    transaction->set_end_segment(true);
1151
1154
 
1152
1155
    message::Statement *statement= transaction->add_statement();
1153
1156
 
1154
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
 
    finalizeStatementMessage(*statement, in_session);
 
1157
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
 
1158
    finalizeStatementMessage(*statement, session);
1156
1159
 
1157
 
    finalizeTransactionMessage(*transaction, in_session);
 
1160
    finalizeTransactionMessage(*transaction, session);
1158
1161
    
1159
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1162
    (void) replication_services.pushTransactionMessage(session, *transaction);
1160
1163
  }
1161
 
  cleanupTransactionMessage(transaction, in_session);
 
1164
 
 
1165
  cleanupTransactionMessage(transaction, session);
1162
1166
}
1163
1167
 
1164
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
 
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1165
1169
{
1166
1170
  ReplicationServices &replication_services= ReplicationServices::singleton();
1167
1171
  if (! replication_services.isActive())
1168
1172
    return;
1169
1173
 
1170
 
  message::Statement *current_statement= in_session->getStatementMessage();
 
1174
  message::Statement *current_statement= session.getStatementMessage();
1171
1175
 
1172
1176
  /* If we never added a Statement message, nothing to undo. */
1173
1177
  if (current_statement == NULL)
1206
1210
   * Remove the Statement message we've been working with (same as
1207
1211
   * current_statement).
1208
1212
   */
1209
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1213
  message::Transaction *transaction= getActiveTransactionMessage(session);
1210
1214
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
1215
  statements_in_txn= transaction->mutable_statement();
1212
1216
  statements_in_txn->RemoveLast();
1213
 
  in_session->setStatementMessage(NULL);
 
1217
  session.setStatementMessage(NULL);
1214
1218
  
1215
1219
  /*
1216
1220
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1222
1226
    current_statement= transaction->add_statement();
1223
1227
    initStatementMessage(*current_statement,
1224
1228
                         message::Statement::ROLLBACK_STATEMENT,
1225
 
                         in_session);
1226
 
    finalizeStatementMessage(*current_statement, in_session);
 
1229
                         session);
 
1230
    finalizeStatementMessage(*current_statement, session);
1227
1231
  }
1228
1232
}
1229
 
  
1230
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1231
 
                                                            Table *in_table,
 
1233
 
 
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
 
1235
                                                                     message::Transaction *transaction)
 
1236
{
 
1237
  uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1238
  uint32_t seg_id= transaction->segment_id();
 
1239
  
 
1240
  transaction->set_end_segment(false);
 
1241
  commitTransactionMessage(session);
 
1242
  transaction= getActiveTransactionMessage(session, false);
 
1243
  
 
1244
  /* Set the transaction ID to match the previous messages */
 
1245
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1246
  transaction->set_segment_id(seg_id + 1);
 
1247
  transaction->set_end_segment(true);
 
1248
 
 
1249
  return transaction;
 
1250
}
 
1251
 
 
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
 
1253
                                                            Table &table,
1232
1254
                                                            uint32_t *next_segment_id)
1233
1255
{
1234
 
  message::Statement *statement= in_session->getStatementMessage();
 
1256
  message::Statement *statement= session.getStatementMessage();
1235
1257
  message::Transaction *transaction= NULL;
1236
 
 
1237
 
  /* 
1238
 
   * Check the type for the current Statement message, if it is anything
1239
 
   * other than INSERT we need to call finalize, this will ensure a 
1240
 
   * new InsertStatement is created. If it is of type INSERT check
1241
 
   * what table the INSERT belongs to, if it is a different table
1242
 
   * call finalize, so a new InsertStatement can be created. 
 
1258
  
 
1259
  /*
 
1260
   * If statement is NULL, this is a new statement.
 
1261
   * If statement is NOT NULL, this is a continuation of the same statement.
 
1262
   * This is because autocommitOrRollback() finalizes the statement so that
 
1263
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1264
   * share a single GPB message for multiple statements).
1243
1265
   */
1244
 
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1245
 
  {
1246
 
    finalizeStatementMessage(*statement, in_session);
1247
 
    statement= in_session->getStatementMessage();
1248
 
  } 
1249
 
  else if (statement != NULL)
1250
 
  {
1251
 
    transaction= getActiveTransactionMessage(in_session);
1252
 
 
 
1266
  if (statement == NULL)
 
1267
  {
 
1268
    transaction= getActiveTransactionMessage(session);
 
1269
 
 
1270
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1271
        transaction_message_threshold)
 
1272
    {
 
1273
      transaction= segmentTransactionMessage(session, transaction);
 
1274
    }
 
1275
 
 
1276
    statement= transaction->add_statement();
 
1277
    setInsertHeader(*statement, session, table);
 
1278
    session.setStatementMessage(statement);
 
1279
  }
 
1280
  else
 
1281
  {
 
1282
    transaction= getActiveTransactionMessage(session);
 
1283
    
1253
1284
    /*
1254
1285
     * If we've passed our threshold for the statement size (possible for
1255
1286
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1256
1287
     * the Transaction will keep it from getting huge).
1257
1288
     */
1258
1289
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1259
 
      in_session->variables.transaction_message_threshold)
 
1290
        transaction_message_threshold)
1260
1291
    {
1261
1292
      /* Remember the transaction ID so we can re-use it */
1262
1293
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1263
 
 
 
1294
      uint32_t seg_id= transaction->segment_id();
 
1295
      
1264
1296
      message::InsertData *current_data= statement->mutable_insert_data();
1265
 
 
 
1297
      
1266
1298
      /* Caller should use this value when adding a new record */
1267
1299
      *next_segment_id= current_data->segment_id() + 1;
1268
 
 
 
1300
      
1269
1301
      current_data->set_end_segment(false);
1270
 
 
 
1302
      transaction->set_end_segment(false);
 
1303
      
1271
1304
      /* 
1272
1305
       * Send the trx message to replicators after finalizing the 
1273
1306
       * statement and transaction. This will also set the Transaction
1274
1307
       * and Statement objects in Session to NULL.
1275
1308
       */
1276
 
      commitTransactionMessage(in_session);
1277
 
 
 
1309
      commitTransactionMessage(session);
 
1310
      
1278
1311
      /*
1279
1312
       * Statement and Transaction should now be NULL, so new ones will get
1280
1313
       * created. We reuse the transaction id since we are segmenting
1281
1314
       * one transaction.
1282
1315
       */
1283
 
      statement= in_session->getStatementMessage();
1284
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1316
      transaction= getActiveTransactionMessage(session, false);
1285
1317
      assert(transaction != NULL);
1286
1318
 
 
1319
      statement= transaction->add_statement();
 
1320
      setInsertHeader(*statement, session, table);
 
1321
      session.setStatementMessage(statement);
 
1322
            
1287
1323
      /* Set the transaction ID to match the previous messages */
1288
1324
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1325
      transaction->set_segment_id(seg_id + 1);
 
1326
      transaction->set_end_segment(true);
1289
1327
    }
1290
1328
    else
1291
1329
    {
1292
 
      const message::InsertHeader &insert_header= statement->insert_header();
1293
 
      string old_table_name= insert_header.table_metadata().table_name();
1294
 
     
1295
 
      string current_table_name;
1296
 
      (void) in_table->getShare()->getTableName(current_table_name);
1297
 
 
1298
 
      if (current_table_name.compare(old_table_name))
1299
 
      {
1300
 
        finalizeStatementMessage(*statement, in_session);
1301
 
        statement= in_session->getStatementMessage();
1302
 
      }
1303
 
      else
1304
 
      {
1305
 
        /* carry forward the existing segment id */
1306
 
        const message::InsertData &current_data= statement->insert_data();
1307
 
        *next_segment_id= current_data.segment_id();
1308
 
      }
 
1330
      /*
 
1331
       * Continuation of the same statement. Carry forward the existing
 
1332
       * segment id.
 
1333
       */
 
1334
      const message::InsertData &current_data= statement->insert_data();
 
1335
      *next_segment_id= current_data.segment_id();
1309
1336
    }
1310
 
  } 
1311
 
 
1312
 
  if (statement == NULL)
1313
 
  {
1314
 
    /*
1315
 
     * Transaction will be non-NULL only if we had to segment it due to
1316
 
     * transaction size above.
1317
 
     */
1318
 
    if (transaction == NULL)
1319
 
      transaction= getActiveTransactionMessage(in_session);
1320
 
 
1321
 
    /* 
1322
 
     * Transaction message initialized and set, but no statement created
1323
 
     * yet.  We construct one and initialize it, here, then return the
1324
 
     * message after attaching the new Statement message pointer to the 
1325
 
     * Session for easy retrieval later...
1326
 
     */
1327
 
    statement= transaction->add_statement();
1328
 
    setInsertHeader(*statement, in_session, in_table);
1329
 
    in_session->setStatementMessage(statement);
1330
1337
  }
 
1338
  
1331
1339
  return *statement;
1332
1340
}
1333
1341
 
1334
1342
void TransactionServices::setInsertHeader(message::Statement &statement,
1335
 
                                          Session *in_session,
1336
 
                                          Table *in_table)
 
1343
                                          Session::const_reference session,
 
1344
                                          Table &table)
1337
1345
{
1338
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1346
  initStatementMessage(statement, message::Statement::INSERT, session);
1339
1347
 
1340
1348
  /* 
1341
1349
   * Now we construct the specialized InsertHeader message inside
1346
1354
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1347
1355
 
1348
1356
  string schema_name;
1349
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1357
  (void) table.getShare()->getSchemaName(schema_name);
1350
1358
  string table_name;
1351
 
  (void) in_table->getShare()->getTableName(table_name);
 
1359
  (void) table.getShare()->getTableName(table_name);
1352
1360
 
1353
1361
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1354
1362
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1355
1363
 
1356
1364
  Field *current_field;
1357
 
  Field **table_fields= in_table->getFields();
 
1365
  Field **table_fields= table.getFields();
1358
1366
 
1359
1367
  message::FieldMetadata *field_metadata;
1360
1368
 
1361
1369
  /* We will read all the table's fields... */
1362
 
  in_table->setReadSet();
 
1370
  table.setReadSet();
1363
1371
 
1364
1372
  while ((current_field= *table_fields++) != NULL) 
1365
1373
  {
1369
1377
  }
1370
1378
}
1371
1379
 
1372
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1380
bool TransactionServices::insertRecord(Session::reference session,
 
1381
                                       Table &table)
1373
1382
{
1374
1383
  ReplicationServices &replication_services= ReplicationServices::singleton();
1375
1384
  if (! replication_services.isActive())
1376
1385
    return false;
 
1386
 
 
1387
  if (not table.getShare()->is_replicated())
 
1388
    return false;
 
1389
 
1377
1390
  /**
1378
1391
   * We do this check here because we don't want to even create a 
1379
1392
   * statement if there isn't a primary key on the table...
1382
1395
   *
1383
1396
   * Multi-column primary keys are handled how exactly?
1384
1397
   */
1385
 
  if (not in_table->getShare()->hasPrimaryKey())
 
1398
  if (not table.getShare()->hasPrimaryKey())
1386
1399
  {
1387
1400
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1388
1401
    return true;
1389
1402
  }
1390
1403
 
1391
1404
  uint32_t next_segment_id= 1;
1392
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1405
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1393
1406
 
1394
1407
  message::InsertData *data= statement.mutable_insert_data();
1395
1408
  data->set_segment_id(next_segment_id);
1397
1410
  message::InsertRecord *record= data->add_record();
1398
1411
 
1399
1412
  Field *current_field;
1400
 
  Field **table_fields= in_table->getFields();
 
1413
  Field **table_fields= table.getFields();
1401
1414
 
1402
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1415
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1403
1416
  string_value->set_charset(system_charset_info);
1404
1417
 
1405
1418
  /* We will read all the table's fields... */
1406
 
  in_table->setReadSet();
 
1419
  table.setReadSet();
1407
1420
 
1408
1421
  while ((current_field= *table_fields++) != NULL) 
1409
1422
  {
1423
1436
  return false;
1424
1437
}
1425
1438
 
1426
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1427
 
                                                            Table *in_table,
 
1439
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
 
1440
                                                            Table &table,
1428
1441
                                                            const unsigned char *old_record, 
1429
1442
                                                            const unsigned char *new_record,
1430
1443
                                                            uint32_t *next_segment_id)
1431
1444
{
1432
 
  message::Statement *statement= in_session->getStatementMessage();
 
1445
  message::Statement *statement= session.getStatementMessage();
1433
1446
  message::Transaction *transaction= NULL;
1434
1447
 
1435
1448
  /*
1436
 
   * Check the type for the current Statement message, if it is anything
1437
 
   * other than UPDATE we need to call finalize, this will ensure a
1438
 
   * new UpdateStatement is created. If it is of type UPDATE check
1439
 
   * what table the UPDATE belongs to, if it is a different table
1440
 
   * call finalize, so a new UpdateStatement can be created.
 
1449
   * If statement is NULL, this is a new statement.
 
1450
   * If statement is NOT NULL, this is a continuation of the same statement.
 
1451
   * This is because autocommitOrRollback() finalizes the statement so that
 
1452
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1453
   * share a single GPB message for multiple statements).
1441
1454
   */
1442
 
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1455
  if (statement == NULL)
1443
1456
  {
1444
 
    finalizeStatementMessage(*statement, in_session);
1445
 
    statement= in_session->getStatementMessage();
 
1457
    transaction= getActiveTransactionMessage(session);
 
1458
    
 
1459
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1460
        transaction_message_threshold)
 
1461
    {
 
1462
      transaction= segmentTransactionMessage(session, transaction);
 
1463
    }
 
1464
    
 
1465
    statement= transaction->add_statement();
 
1466
    setUpdateHeader(*statement, session, table, old_record, new_record);
 
1467
    session.setStatementMessage(statement);
1446
1468
  }
1447
 
  else if (statement != NULL)
 
1469
  else
1448
1470
  {
1449
 
    transaction= getActiveTransactionMessage(in_session);
1450
 
 
 
1471
    transaction= getActiveTransactionMessage(session);
 
1472
    
1451
1473
    /*
1452
1474
     * If we've passed our threshold for the statement size (possible for
1453
1475
     * a bulk update), we'll finalize the Statement and Transaction (doing
1454
1476
     * the Transaction will keep it from getting huge).
1455
1477
     */
1456
1478
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
      in_session->variables.transaction_message_threshold)
 
1479
        transaction_message_threshold)
1458
1480
    {
1459
1481
      /* Remember the transaction ID so we can re-use it */
1460
1482
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1461
 
 
 
1483
      uint32_t seg_id= transaction->segment_id();
 
1484
      
1462
1485
      message::UpdateData *current_data= statement->mutable_update_data();
1463
 
 
 
1486
      
1464
1487
      /* Caller should use this value when adding a new record */
1465
1488
      *next_segment_id= current_data->segment_id() + 1;
1466
 
 
 
1489
      
1467
1490
      current_data->set_end_segment(false);
1468
 
 
1469
 
      /*
 
1491
      transaction->set_end_segment(false);
 
1492
      
 
1493
      /* 
1470
1494
       * Send the trx message to replicators after finalizing the 
1471
1495
       * statement and transaction. This will also set the Transaction
1472
1496
       * and Statement objects in Session to NULL.
1473
1497
       */
1474
 
      commitTransactionMessage(in_session);
1475
 
 
 
1498
      commitTransactionMessage(session);
 
1499
      
1476
1500
      /*
1477
1501
       * Statement and Transaction should now be NULL, so new ones will get
1478
1502
       * created. We reuse the transaction id since we are segmenting
1479
1503
       * one transaction.
1480
1504
       */
1481
 
      statement= in_session->getStatementMessage();
1482
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1505
      transaction= getActiveTransactionMessage(session, false);
1483
1506
      assert(transaction != NULL);
1484
 
 
 
1507
      
 
1508
      statement= transaction->add_statement();
 
1509
      setUpdateHeader(*statement, session, table, old_record, new_record);
 
1510
      session.setStatementMessage(statement);
 
1511
      
1485
1512
      /* Set the transaction ID to match the previous messages */
1486
1513
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1514
      transaction->set_segment_id(seg_id + 1);
 
1515
      transaction->set_end_segment(true);
1487
1516
    }
1488
1517
    else
1489
1518
    {
1490
 
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1491
 
      {
1492
 
        /* carry forward the existing segment id */
1493
 
        const message::UpdateData &current_data= statement->update_data();
1494
 
        *next_segment_id= current_data.segment_id();
1495
 
      } 
1496
 
      else 
1497
 
      {
1498
 
        finalizeStatementMessage(*statement, in_session);
1499
 
        statement= in_session->getStatementMessage();
1500
 
      }
 
1519
      /*
 
1520
       * Continuation of the same statement. Carry forward the existing
 
1521
       * segment id.
 
1522
       */
 
1523
      const message::UpdateData &current_data= statement->update_data();
 
1524
      *next_segment_id= current_data.segment_id();
1501
1525
    }
1502
1526
  }
1503
 
 
1504
 
  if (statement == NULL)
1505
 
  {
1506
 
    /*
1507
 
     * Transaction will be non-NULL only if we had to segment it due to
1508
 
     * transaction size above.
1509
 
     */
1510
 
    if (transaction == NULL)
1511
 
      transaction= getActiveTransactionMessage(in_session);
1512
 
 
1513
 
    /* 
1514
 
     * Transaction message initialized and set, but no statement created
1515
 
     * yet.  We construct one and initialize it, here, then return the
1516
 
     * message after attaching the new Statement message pointer to the 
1517
 
     * Session for easy retrieval later...
1518
 
     */
1519
 
    statement= transaction->add_statement();
1520
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
 
    in_session->setStatementMessage(statement);
1522
 
  }
 
1527
  
1523
1528
  return *statement;
1524
1529
}
1525
1530
 
1526
 
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1527
 
                                                  Table *in_table,
1528
 
                                                  const unsigned char *old_record,
1529
 
                                                  const unsigned char *new_record)
1530
 
{
1531
 
  const message::UpdateHeader &update_header= statement.update_header();
1532
 
  string old_table_name= update_header.table_metadata().table_name();
1533
 
 
1534
 
  string current_table_name;
1535
 
  (void) in_table->getShare()->getTableName(current_table_name);
1536
 
  if (current_table_name.compare(old_table_name))
1537
 
  {
1538
 
    return false;
1539
 
  }
1540
 
  else
1541
 
  {
1542
 
    /* Compare the set fields in the existing UpdateHeader and see if they
1543
 
     * match the updated fields in the new record, if they do not we must
1544
 
     * create a new UpdateHeader 
1545
 
     */
1546
 
    size_t num_set_fields= update_header.set_field_metadata_size();
1547
 
 
1548
 
    Field *current_field;
1549
 
    Field **table_fields= in_table->getFields();
1550
 
    in_table->setReadSet();
1551
 
 
1552
 
    size_t num_calculated_updated_fields= 0;
1553
 
    bool found= false;
1554
 
    while ((current_field= *table_fields++) != NULL)
1555
 
    {
1556
 
      if (num_calculated_updated_fields > num_set_fields)
1557
 
      {
1558
 
        break;
1559
 
      }
1560
 
 
1561
 
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1562
 
      {
1563
 
        /* check that this field exists in the UpdateHeader record */
1564
 
        found= false;
1565
 
 
1566
 
        for (size_t x= 0; x < num_set_fields; ++x)
1567
 
        {
1568
 
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
 
          string name= field_metadata.name();
1570
 
          if (name.compare(current_field->field_name) == 0)
1571
 
          {
1572
 
            found= true;
1573
 
            ++num_calculated_updated_fields;
1574
 
            break;
1575
 
          } 
1576
 
        }
1577
 
        if (! found)
1578
 
        {
1579
 
          break;
1580
 
        } 
1581
 
      }
1582
 
    }
1583
 
 
1584
 
    if ((num_calculated_updated_fields == num_set_fields) && found)
1585
 
    {
1586
 
      return true;
1587
 
    } 
1588
 
    else 
1589
 
    {
1590
 
      return false;
1591
 
    }
1592
 
  }
1593
 
}  
1594
 
 
1595
1531
void TransactionServices::setUpdateHeader(message::Statement &statement,
1596
 
                                          Session *in_session,
1597
 
                                          Table *in_table,
 
1532
                                          Session::const_reference session,
 
1533
                                          Table &table,
1598
1534
                                          const unsigned char *old_record, 
1599
1535
                                          const unsigned char *new_record)
1600
1536
{
1601
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1537
  initStatementMessage(statement, message::Statement::UPDATE, session);
1602
1538
 
1603
1539
  /* 
1604
1540
   * Now we construct the specialized UpdateHeader message inside
1609
1545
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1610
1546
 
1611
1547
  string schema_name;
1612
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1548
  (void) table.getShare()->getSchemaName(schema_name);
1613
1549
  string table_name;
1614
 
  (void) in_table->getShare()->getTableName(table_name);
 
1550
  (void) table.getShare()->getTableName(table_name);
1615
1551
 
1616
1552
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1617
1553
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1618
1554
 
1619
1555
  Field *current_field;
1620
 
  Field **table_fields= in_table->getFields();
 
1556
  Field **table_fields= table.getFields();
1621
1557
 
1622
1558
  message::FieldMetadata *field_metadata;
1623
1559
 
1624
1560
  /* We will read all the table's fields... */
1625
 
  in_table->setReadSet();
 
1561
  table.setReadSet();
1626
1562
 
1627
1563
  while ((current_field= *table_fields++) != NULL) 
1628
1564
  {
1630
1566
     * We add the "key field metadata" -- i.e. the fields which form
1631
1567
     * the primary key for the table.
1632
1568
     */
1633
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1569
    if (table.getShare()->fieldInPrimaryKey(current_field))
1634
1570
    {
1635
1571
      field_metadata= header->add_key_field_metadata();
1636
1572
      field_metadata->set_name(current_field->field_name);
1637
1573
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1638
1574
    }
1639
1575
 
1640
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1576
    if (isFieldUpdated(current_field, table, old_record, new_record))
1641
1577
    {
1642
1578
      /* Field is changed from old to new */
1643
1579
      field_metadata= header->add_set_field_metadata();
1646
1582
    }
1647
1583
  }
1648
1584
}
1649
 
void TransactionServices::updateRecord(Session *in_session,
1650
 
                                       Table *in_table, 
 
1585
 
 
1586
void TransactionServices::updateRecord(Session::reference session,
 
1587
                                       Table &table, 
1651
1588
                                       const unsigned char *old_record, 
1652
1589
                                       const unsigned char *new_record)
1653
1590
{
1655
1592
  if (! replication_services.isActive())
1656
1593
    return;
1657
1594
 
 
1595
  if (not table.getShare()->is_replicated())
 
1596
    return;
 
1597
 
1658
1598
  uint32_t next_segment_id= 1;
1659
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1599
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1660
1600
 
1661
1601
  message::UpdateData *data= statement.mutable_update_data();
1662
1602
  data->set_segment_id(next_segment_id);
1664
1604
  message::UpdateRecord *record= data->add_record();
1665
1605
 
1666
1606
  Field *current_field;
1667
 
  Field **table_fields= in_table->getFields();
1668
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1607
  Field **table_fields= table.getFields();
 
1608
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1669
1609
  string_value->set_charset(system_charset_info);
1670
1610
 
1671
1611
  while ((current_field= *table_fields++) != NULL) 
1681
1621
     *
1682
1622
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1683
1623
     */
1684
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1624
    if (isFieldUpdated(current_field, table, old_record, new_record))
1685
1625
    {
1686
1626
      /* Store the original "read bit" for this field */
1687
1627
      bool is_read_set= current_field->isReadSet();
1688
1628
 
1689
1629
      /* We need to mark that we will "read" this field... */
1690
 
      in_table->setReadSet(current_field->position());
 
1630
      table.setReadSet(current_field->position());
1691
1631
 
1692
1632
      /* Read the string value of this field's contents */
1693
1633
      string_value= current_field->val_str_internal(string_value);
1716
1656
     * primary key field value.  Replication only supports tables
1717
1657
     * with a primary key.
1718
1658
     */
1719
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1659
    if (table.getShare()->fieldInPrimaryKey(current_field))
1720
1660
    {
1721
1661
      /**
1722
1662
       * To say the below is ugly is an understatement. But it works.
1734
1674
}
1735
1675
 
1736
1676
bool TransactionServices::isFieldUpdated(Field *current_field,
1737
 
                                         Table *in_table,
 
1677
                                         Table &table,
1738
1678
                                         const unsigned char *old_record,
1739
1679
                                         const unsigned char *new_record)
1740
1680
{
1743
1683
   * we do this crazy pointer fiddling to figure out if the current field
1744
1684
   * has been updated in the supplied record raw byte pointers.
1745
1685
   */
1746
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1747
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1686
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
 
1687
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1748
1688
 
1749
1689
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1750
1690
 
1774
1714
  return isUpdated;
1775
1715
}  
1776
1716
 
1777
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1778
 
                                                            Table *in_table,
 
1717
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
 
1718
                                                            Table &table,
1779
1719
                                                            uint32_t *next_segment_id)
1780
1720
{
1781
 
  message::Statement *statement= in_session->getStatementMessage();
 
1721
  message::Statement *statement= session.getStatementMessage();
1782
1722
  message::Transaction *transaction= NULL;
1783
1723
 
1784
1724
  /*
1785
 
   * Check the type for the current Statement message, if it is anything
1786
 
   * other than DELETE we need to call finalize, this will ensure a
1787
 
   * new DeleteStatement is created. If it is of type DELETE check
1788
 
   * what table the DELETE belongs to, if it is a different table
1789
 
   * call finalize, so a new DeleteStatement can be created.
 
1725
   * If statement is NULL, this is a new statement.
 
1726
   * If statement is NOT NULL, this is a continuation of the same statement.
 
1727
   * This is because autocommitOrRollback() finalizes the statement so that
 
1728
   * we guarantee only one Statement message per statement (i.e., we no longer
 
1729
   * share a single GPB message for multiple statements).
1790
1730
   */
1791
 
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1731
  if (statement == NULL)
1792
1732
  {
1793
 
    finalizeStatementMessage(*statement, in_session);
1794
 
    statement= in_session->getStatementMessage();
 
1733
    transaction= getActiveTransactionMessage(session);
 
1734
    
 
1735
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1736
        transaction_message_threshold)
 
1737
    {
 
1738
      transaction= segmentTransactionMessage(session, transaction);
 
1739
    }
 
1740
    
 
1741
    statement= transaction->add_statement();
 
1742
    setDeleteHeader(*statement, session, table);
 
1743
    session.setStatementMessage(statement);
1795
1744
  }
1796
 
  else if (statement != NULL)
 
1745
  else
1797
1746
  {
1798
 
    transaction= getActiveTransactionMessage(in_session);
1799
 
 
 
1747
    transaction= getActiveTransactionMessage(session);
 
1748
    
1800
1749
    /*
1801
1750
     * If we've passed our threshold for the statement size (possible for
1802
1751
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1803
1752
     * the Transaction will keep it from getting huge).
1804
1753
     */
1805
1754
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1806
 
      in_session->variables.transaction_message_threshold)
 
1755
        transaction_message_threshold)
1807
1756
    {
1808
1757
      /* Remember the transaction ID so we can re-use it */
1809
1758
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1810
 
 
 
1759
      uint32_t seg_id= transaction->segment_id();
 
1760
      
1811
1761
      message::DeleteData *current_data= statement->mutable_delete_data();
1812
 
 
 
1762
      
1813
1763
      /* Caller should use this value when adding a new record */
1814
1764
      *next_segment_id= current_data->segment_id() + 1;
1815
 
 
 
1765
      
1816
1766
      current_data->set_end_segment(false);
1817
 
 
 
1767
      transaction->set_end_segment(false);
 
1768
      
1818
1769
      /* 
1819
1770
       * Send the trx message to replicators after finalizing the 
1820
1771
       * statement and transaction. This will also set the Transaction
1821
1772
       * and Statement objects in Session to NULL.
1822
1773
       */
1823
 
      commitTransactionMessage(in_session);
1824
 
 
 
1774
      commitTransactionMessage(session);
 
1775
      
1825
1776
      /*
1826
1777
       * Statement and Transaction should now be NULL, so new ones will get
1827
1778
       * created. We reuse the transaction id since we are segmenting
1828
1779
       * one transaction.
1829
1780
       */
1830
 
      statement= in_session->getStatementMessage();
1831
 
      transaction= getActiveTransactionMessage(in_session, false);
 
1781
      transaction= getActiveTransactionMessage(session, false);
1832
1782
      assert(transaction != NULL);
1833
 
 
 
1783
      
 
1784
      statement= transaction->add_statement();
 
1785
      setDeleteHeader(*statement, session, table);
 
1786
      session.setStatementMessage(statement);
 
1787
      
1834
1788
      /* Set the transaction ID to match the previous messages */
1835
1789
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1790
      transaction->set_segment_id(seg_id + 1);
 
1791
      transaction->set_end_segment(true);
1836
1792
    }
1837
1793
    else
1838
1794
    {
1839
 
      const message::DeleteHeader &delete_header= statement->delete_header();
1840
 
      string old_table_name= delete_header.table_metadata().table_name();
1841
 
 
1842
 
      string current_table_name;
1843
 
      (void) in_table->getShare()->getTableName(current_table_name);
1844
 
      if (current_table_name.compare(old_table_name))
1845
 
      {
1846
 
        finalizeStatementMessage(*statement, in_session);
1847
 
        statement= in_session->getStatementMessage();
1848
 
      }
1849
 
      else
1850
 
      {
1851
 
        /* carry forward the existing segment id */
1852
 
        const message::DeleteData &current_data= statement->delete_data();
1853
 
        *next_segment_id= current_data.segment_id();
1854
 
      }
 
1795
      /*
 
1796
       * Continuation of the same statement. Carry forward the existing
 
1797
       * segment id.
 
1798
       */
 
1799
      const message::DeleteData &current_data= statement->delete_data();
 
1800
      *next_segment_id= current_data.segment_id();
1855
1801
    }
1856
1802
  }
1857
 
 
1858
 
  if (statement == NULL)
1859
 
  {
1860
 
    /*
1861
 
     * Transaction will be non-NULL only if we had to segment it due to
1862
 
     * transaction size above.
1863
 
     */
1864
 
    if (transaction == NULL)
1865
 
      transaction= getActiveTransactionMessage(in_session);
1866
 
 
1867
 
    /* 
1868
 
     * Transaction message initialized and set, but no statement created
1869
 
     * yet.  We construct one and initialize it, here, then return the
1870
 
     * message after attaching the new Statement message pointer to the 
1871
 
     * Session for easy retrieval later...
1872
 
     */
1873
 
    statement= transaction->add_statement();
1874
 
    setDeleteHeader(*statement, in_session, in_table);
1875
 
    in_session->setStatementMessage(statement);
1876
 
  }
 
1803
  
1877
1804
  return *statement;
1878
1805
}
1879
1806
 
1880
1807
void TransactionServices::setDeleteHeader(message::Statement &statement,
1881
 
                                          Session *in_session,
1882
 
                                          Table *in_table)
 
1808
                                          Session::const_reference session,
 
1809
                                          Table &table)
1883
1810
{
1884
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1811
  initStatementMessage(statement, message::Statement::DELETE, session);
1885
1812
 
1886
1813
  /* 
1887
1814
   * Now we construct the specialized DeleteHeader message inside
1891
1818
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1892
1819
 
1893
1820
  string schema_name;
1894
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1821
  (void) table.getShare()->getSchemaName(schema_name);
1895
1822
  string table_name;
1896
 
  (void) in_table->getShare()->getTableName(table_name);
 
1823
  (void) table.getShare()->getTableName(table_name);
1897
1824
 
1898
1825
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1899
1826
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1900
1827
 
1901
1828
  Field *current_field;
1902
 
  Field **table_fields= in_table->getFields();
 
1829
  Field **table_fields= table.getFields();
1903
1830
 
1904
1831
  message::FieldMetadata *field_metadata;
1905
1832
 
1910
1837
     * primary key field value.  Replication only supports tables
1911
1838
     * with a primary key.
1912
1839
     */
1913
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1840
    if (table.getShare()->fieldInPrimaryKey(current_field))
1914
1841
    {
1915
1842
      field_metadata= header->add_key_field_metadata();
1916
1843
      field_metadata->set_name(current_field->field_name);
1919
1846
  }
1920
1847
}
1921
1848
 
1922
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1849
void TransactionServices::deleteRecord(Session::reference session,
 
1850
                                       Table &table,
 
1851
                                       bool use_update_record)
1923
1852
{
1924
1853
  ReplicationServices &replication_services= ReplicationServices::singleton();
1925
1854
  if (! replication_services.isActive())
1926
1855
    return;
1927
1856
 
 
1857
  if (not table.getShare()->is_replicated())
 
1858
    return;
 
1859
 
1928
1860
  uint32_t next_segment_id= 1;
1929
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1861
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1930
1862
 
1931
1863
  message::DeleteData *data= statement.mutable_delete_data();
1932
1864
  data->set_segment_id(next_segment_id);
1934
1866
  message::DeleteRecord *record= data->add_record();
1935
1867
 
1936
1868
  Field *current_field;
1937
 
  Field **table_fields= in_table->getFields();
1938
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1869
  Field **table_fields= table.getFields();
 
1870
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1939
1871
  string_value->set_charset(system_charset_info);
1940
1872
 
1941
1873
  while ((current_field= *table_fields++) != NULL) 
1945
1877
     * primary key field value.  Replication only supports tables
1946
1878
     * with a primary key.
1947
1879
     */
1948
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1880
    if (table.getShare()->fieldInPrimaryKey(current_field))
1949
1881
    {
1950
1882
      if (use_update_record)
1951
1883
      {
1957
1889
         * We are careful not to change anything in old_ptr.
1958
1890
         */
1959
1891
        const unsigned char *old_ptr= current_field->ptr;
1960
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1892
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1961
1893
        string_value= current_field->val_str_internal(string_value);
1962
1894
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1963
1895
      }
1974
1906
  }
1975
1907
}
1976
1908
 
1977
 
void TransactionServices::createTable(Session *in_session,
 
1909
void TransactionServices::createTable(Session::reference session,
1978
1910
                                      const message::Table &table)
1979
1911
{
1980
1912
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
 
  if (! replication_services.isActive())
1982
 
    return;
1983
 
  
1984
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1913
  if (not replication_services.isActive())
 
1914
    return;
 
1915
 
 
1916
  if (not message::is_replicated(table))
 
1917
    return;
 
1918
 
 
1919
  message::Transaction *transaction= getActiveTransactionMessage(session);
1985
1920
  message::Statement *statement= transaction->add_statement();
1986
1921
 
1987
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1922
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1988
1923
 
1989
1924
  /* 
1990
1925
   * Construct the specialized CreateTableStatement message and attach
1994
1929
  message::Table *new_table_message= create_table_statement->mutable_table();
1995
1930
  *new_table_message= table;
1996
1931
 
1997
 
  finalizeStatementMessage(*statement, in_session);
 
1932
  finalizeStatementMessage(*statement, session);
1998
1933
 
1999
 
  finalizeTransactionMessage(*transaction, in_session);
 
1934
  finalizeTransactionMessage(*transaction, session);
2000
1935
  
2001
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1936
  (void) replication_services.pushTransactionMessage(session, *transaction);
2002
1937
 
2003
 
  cleanupTransactionMessage(transaction, in_session);
 
1938
  cleanupTransactionMessage(transaction, session);
2004
1939
 
2005
1940
}
2006
1941
 
2007
 
void TransactionServices::createSchema(Session *in_session,
 
1942
void TransactionServices::createSchema(Session::reference session,
2008
1943
                                       const message::Schema &schema)
2009
1944
{
2010
1945
  ReplicationServices &replication_services= ReplicationServices::singleton();
2011
1946
  if (! replication_services.isActive())
2012
1947
    return;
2013
 
  
2014
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1948
 
 
1949
  if (not message::is_replicated(schema))
 
1950
    return;
 
1951
 
 
1952
  message::Transaction *transaction= getActiveTransactionMessage(session);
2015
1953
  message::Statement *statement= transaction->add_statement();
2016
1954
 
2017
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1955
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
2018
1956
 
2019
1957
  /* 
2020
1958
   * Construct the specialized CreateSchemaStatement message and attach
2024
1962
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2025
1963
  *new_schema_message= schema;
2026
1964
 
2027
 
  finalizeStatementMessage(*statement, in_session);
 
1965
  finalizeStatementMessage(*statement, session);
2028
1966
 
2029
 
  finalizeTransactionMessage(*transaction, in_session);
 
1967
  finalizeTransactionMessage(*transaction, session);
2030
1968
  
2031
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1969
  (void) replication_services.pushTransactionMessage(session, *transaction);
2032
1970
 
2033
 
  cleanupTransactionMessage(transaction, in_session);
 
1971
  cleanupTransactionMessage(transaction, session);
2034
1972
 
2035
1973
}
2036
1974
 
2037
 
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
1975
void TransactionServices::dropSchema(Session::reference session,
 
1976
                                     identifier::Schema::const_reference identifier,
 
1977
                                     message::schema::const_reference schema)
2038
1978
{
2039
1979
  ReplicationServices &replication_services= ReplicationServices::singleton();
2040
 
  if (! replication_services.isActive())
2041
 
    return;
2042
 
  
2043
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1980
  if (not replication_services.isActive())
 
1981
    return;
 
1982
 
 
1983
  if (not message::is_replicated(schema))
 
1984
    return;
 
1985
 
 
1986
  message::Transaction *transaction= getActiveTransactionMessage(session);
2044
1987
  message::Statement *statement= transaction->add_statement();
2045
1988
 
2046
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1989
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
2047
1990
 
2048
1991
  /* 
2049
1992
   * Construct the specialized DropSchemaStatement message and attach
2051
1994
   */
2052
1995
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2053
1996
 
2054
 
  drop_schema_statement->set_schema_name(schema_name);
2055
 
 
2056
 
  finalizeStatementMessage(*statement, in_session);
2057
 
 
2058
 
  finalizeTransactionMessage(*transaction, in_session);
2059
 
  
2060
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2061
 
 
2062
 
  cleanupTransactionMessage(transaction, in_session);
2063
 
}
2064
 
 
2065
 
void TransactionServices::dropTable(Session *in_session,
2066
 
                                    const string &schema_name,
2067
 
                                    const string &table_name,
 
1997
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
 
1998
 
 
1999
  finalizeStatementMessage(*statement, session);
 
2000
 
 
2001
  finalizeTransactionMessage(*transaction, session);
 
2002
  
 
2003
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2004
 
 
2005
  cleanupTransactionMessage(transaction, session);
 
2006
}
 
2007
 
 
2008
void TransactionServices::alterSchema(Session::reference session,
 
2009
                                      const message::Schema &old_schema,
 
2010
                                      const message::Schema &new_schema)
 
2011
{
 
2012
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2013
  if (! replication_services.isActive())
 
2014
    return;
 
2015
 
 
2016
  if (not message::is_replicated(old_schema))
 
2017
    return;
 
2018
 
 
2019
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
2020
  message::Statement *statement= transaction->add_statement();
 
2021
 
 
2022
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
 
2023
 
 
2024
  /* 
 
2025
   * Construct the specialized AlterSchemaStatement message and attach
 
2026
   * it to the generic Statement message
 
2027
   */
 
2028
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
 
2029
 
 
2030
  message::Schema *before= alter_schema_statement->mutable_before();
 
2031
  message::Schema *after= alter_schema_statement->mutable_after();
 
2032
 
 
2033
  *before= old_schema;
 
2034
  *after= new_schema;
 
2035
 
 
2036
  finalizeStatementMessage(*statement, session);
 
2037
 
 
2038
  finalizeTransactionMessage(*transaction, session);
 
2039
  
 
2040
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2041
 
 
2042
  cleanupTransactionMessage(transaction, session);
 
2043
}
 
2044
 
 
2045
void TransactionServices::dropTable(Session::reference session,
 
2046
                                    identifier::Table::const_reference identifier,
 
2047
                                    message::table::const_reference table,
2068
2048
                                    bool if_exists)
2069
2049
{
2070
2050
  ReplicationServices &replication_services= ReplicationServices::singleton();
2071
2051
  if (! replication_services.isActive())
2072
2052
    return;
2073
 
  
2074
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2053
 
 
2054
  if (not message::is_replicated(table))
 
2055
    return;
 
2056
 
 
2057
  message::Transaction *transaction= getActiveTransactionMessage(session);
2075
2058
  message::Statement *statement= transaction->add_statement();
2076
2059
 
2077
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
2060
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2078
2061
 
2079
2062
  /* 
2080
2063
   * Construct the specialized DropTableStatement message and attach
2086
2069
 
2087
2070
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2088
2071
 
2089
 
  table_metadata->set_schema_name(schema_name);
2090
 
  table_metadata->set_table_name(table_name);
2091
 
 
2092
 
  finalizeStatementMessage(*statement, in_session);
2093
 
 
2094
 
  finalizeTransactionMessage(*transaction, in_session);
 
2072
  table_metadata->set_schema_name(identifier.getSchemaName());
 
2073
  table_metadata->set_table_name(identifier.getTableName());
 
2074
 
 
2075
  finalizeStatementMessage(*statement, session);
 
2076
 
 
2077
  finalizeTransactionMessage(*transaction, session);
2095
2078
  
2096
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2079
  (void) replication_services.pushTransactionMessage(session, *transaction);
2097
2080
 
2098
 
  cleanupTransactionMessage(transaction, in_session);
 
2081
  cleanupTransactionMessage(transaction, session);
2099
2082
}
2100
2083
 
2101
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
2084
void TransactionServices::truncateTable(Session::reference session,
 
2085
                                        Table &table)
2102
2086
{
2103
2087
  ReplicationServices &replication_services= ReplicationServices::singleton();
2104
2088
  if (! replication_services.isActive())
2105
2089
    return;
2106
 
  
2107
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2090
 
 
2091
  if (not table.getShare()->is_replicated())
 
2092
    return;
 
2093
 
 
2094
  message::Transaction *transaction= getActiveTransactionMessage(session);
2108
2095
  message::Statement *statement= transaction->add_statement();
2109
2096
 
2110
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
2097
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2111
2098
 
2112
2099
  /* 
2113
2100
   * Construct the specialized TruncateTableStatement message and attach
2117
2104
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2118
2105
 
2119
2106
  string schema_name;
2120
 
  (void) in_table->getShare()->getSchemaName(schema_name);
 
2107
  (void) table.getShare()->getSchemaName(schema_name);
 
2108
 
2121
2109
  string table_name;
2122
 
  (void) in_table->getShare()->getTableName(table_name);
 
2110
  (void) table.getShare()->getTableName(table_name);
2123
2111
 
2124
2112
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2125
2113
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2126
2114
 
2127
 
  finalizeStatementMessage(*statement, in_session);
 
2115
  finalizeStatementMessage(*statement, session);
2128
2116
 
2129
 
  finalizeTransactionMessage(*transaction, in_session);
 
2117
  finalizeTransactionMessage(*transaction, session);
2130
2118
  
2131
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2119
  (void) replication_services.pushTransactionMessage(session, *transaction);
2132
2120
 
2133
 
  cleanupTransactionMessage(transaction, in_session);
 
2121
  cleanupTransactionMessage(transaction, session);
2134
2122
}
2135
2123
 
2136
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
2124
void TransactionServices::rawStatement(Session::reference session,
 
2125
                                       const string &query,
 
2126
                                       const string &schema)
2137
2127
{
2138
2128
  ReplicationServices &replication_services= ReplicationServices::singleton();
2139
2129
  if (! replication_services.isActive())
2140
2130
    return;
2141
2131
 
2142
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
2132
  message::Transaction *transaction= getActiveTransactionMessage(session);
2143
2133
  message::Statement *statement= transaction->add_statement();
2144
2134
 
2145
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
2135
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2146
2136
  statement->set_sql(query);
2147
 
  finalizeStatementMessage(*statement, in_session);
 
2137
  if (not schema.empty())
 
2138
    statement->set_raw_sql_schema(schema);
 
2139
  finalizeStatementMessage(*statement, session);
2148
2140
 
2149
 
  finalizeTransactionMessage(*transaction, in_session);
 
2141
  finalizeTransactionMessage(*transaction, session);
2150
2142
  
2151
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
2143
  (void) replication_services.pushTransactionMessage(session, *transaction);
2152
2144
 
2153
 
  cleanupTransactionMessage(transaction, in_session);
 
2145
  cleanupTransactionMessage(transaction, session);
2154
2146
}
2155
2147
 
2156
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2148
int TransactionServices::sendEvent(Session::reference session,
 
2149
                                   const message::Event &event)
2157
2150
{
2158
2151
  ReplicationServices &replication_services= ReplicationServices::singleton();
2159
2152
  if (! replication_services.isActive())
2171
2164
 
2172
2165
  trx_event->CopyFrom(event);
2173
2166
 
2174
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2167
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2175
2168
 
2176
2169
  delete transaction;
2177
2170
 
2178
2171
  return static_cast<int>(result);
2179
2172
}
2180
2173
 
2181
 
bool TransactionServices::sendStartupEvent(Session *session)
 
2174
bool TransactionServices::sendStartupEvent(Session::reference session)
2182
2175
{
2183
2176
  message::Event event;
2184
2177
  event.set_type(message::Event::STARTUP);
2187
2180
  return true;
2188
2181
}
2189
2182
 
2190
 
bool TransactionServices::sendShutdownEvent(Session *session)
 
2183
bool TransactionServices::sendShutdownEvent(Session::reference session)
2191
2184
{
2192
2185
  message::Event event;
2193
2186
  event.set_type(message::Event::SHUTDOWN);