~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-08-09 18:04:12 UTC
  • mfrom: (1689.3.7 staging)
  • Revision ID: brian@gaz-20100809180412-olurwh51ojllev6p
Merge in the heap conversion and the case-insensitive patch, and remove the need for
M_HASH in session.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  This program is free software; you can redistribute it and/or modify
8
8
 *  it under the terms of the GNU General Public License as published by
47
47
 * plugins can understand.
48
48
 */
49
49
 
50
 
#include <config.h>
51
 
#include <drizzled/current_session.h>
52
 
#include <drizzled/my_hash.h>
53
 
#include <drizzled/error.h>
54
 
#include <drizzled/gettext.h>
55
 
#include <drizzled/probes.h>
56
 
#include <drizzled/sql_parse.h>
57
 
#include <drizzled/session.h>
58
 
#include <drizzled/sql_base.h>
59
 
#include <drizzled/replication_services.h>
60
 
#include <drizzled/transaction_services.h>
61
 
#include <drizzled/transaction_context.h>
62
 
#include <drizzled/message/transaction.pb.h>
63
 
#include <drizzled/message/statement_transform.h>
64
 
#include <drizzled/resource_context.h>
65
 
#include <drizzled/lock.h>
66
 
#include <drizzled/item/int.h>
67
 
#include <drizzled/item/empty_string.h>
68
 
#include <drizzled/field/epoch.h>
69
 
#include <drizzled/plugin/client.h>
70
 
#include <drizzled/plugin/monitored_in_transaction.h>
71
 
#include <drizzled/plugin/transactional_storage_engine.h>
72
 
#include <drizzled/plugin/xa_resource_manager.h>
73
 
#include <drizzled/plugin/xa_storage_engine.h>
74
 
#include <drizzled/internal/my_sys.h>
 
50
#include "config.h"
 
51
#include "drizzled/my_hash.h"
 
52
#include "drizzled/error.h"
 
53
#include "drizzled/gettext.h"
 
54
#include "drizzled/probes.h"
 
55
#include "drizzled/sql_parse.h"
 
56
#include "drizzled/session.h"
 
57
#include "drizzled/sql_base.h"
 
58
#include "drizzled/replication_services.h"
 
59
#include "drizzled/transaction_services.h"
 
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
 
63
#include "drizzled/resource_context.h"
 
64
#include "drizzled/lock.h"
 
65
#include "drizzled/item/int.h"
 
66
#include "drizzled/item/empty_string.h"
 
67
#include "drizzled/field/timestamp.h"
 
68
#include "drizzled/plugin/client.h"
 
69
#include "drizzled/plugin/monitored_in_transaction.h"
 
70
#include "drizzled/plugin/transactional_storage_engine.h"
 
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/internal/my_sys.h"
 
73
 
 
74
using namespace std;
75
75
 
76
76
#include <vector>
77
77
#include <algorithm>
78
78
#include <functional>
79
 
#include <google/protobuf/repeated_field.h>
80
 
 
81
 
using namespace std;
82
 
using namespace google;
83
79
 
84
80
namespace drizzled
85
81
{
301
297
 * transaction after all DDLs, just like the statement transaction
302
298
 * is always committed at the end of all statements.
303
299
 */
304
 
TransactionServices::TransactionServices()
305
 
{
306
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
307
 
  if (engine)
308
 
  {
309
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
310
 
  }
311
 
  else 
312
 
  {
313
 
    xa_storage_engine= NULL;
314
 
  }
315
 
}
316
 
 
317
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
300
void TransactionServices::registerResourceForStatement(Session *session,
318
301
                                                       plugin::MonitoredInTransaction *monitored,
319
302
                                                       plugin::TransactionalStorageEngine *engine)
320
303
{
321
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
304
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
322
305
  {
323
306
    /* 
324
307
     * Now we automatically register this resource manager for the
330
313
    registerResourceForTransaction(session, monitored, engine);
331
314
  }
332
315
 
333
 
  TransactionContext *trans= &session.transaction.stmt;
334
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
316
  TransactionContext *trans= &session->transaction.stmt;
 
317
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
335
318
 
336
319
  if (resource_context->isStarted())
337
320
    return; /* already registered, return */
346
329
  trans->no_2pc|= true;
347
330
}
348
331
 
349
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
332
void TransactionServices::registerResourceForStatement(Session *session,
350
333
                                                       plugin::MonitoredInTransaction *monitored,
351
334
                                                       plugin::TransactionalStorageEngine *engine,
352
335
                                                       plugin::XaResourceManager *resource_manager)
353
336
{
354
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
337
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
355
338
  {
356
339
    /* 
357
340
     * Now we automatically register this resource manager for the
363
346
    registerResourceForTransaction(session, monitored, engine, resource_manager);
364
347
  }
365
348
 
366
 
  TransactionContext *trans= &session.transaction.stmt;
367
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
349
  TransactionContext *trans= &session->transaction.stmt;
 
350
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
368
351
 
369
352
  if (resource_context->isStarted())
370
353
    return; /* already registered, return */
380
363
  trans->no_2pc|= false;
381
364
}
382
365
 
383
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
366
void TransactionServices::registerResourceForTransaction(Session *session,
384
367
                                                         plugin::MonitoredInTransaction *monitored,
385
368
                                                         plugin::TransactionalStorageEngine *engine)
386
369
{
387
 
  TransactionContext *trans= &session.transaction.all;
388
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
370
  TransactionContext *trans= &session->transaction.all;
 
371
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
389
372
 
390
373
  if (resource_context->isStarted())
391
374
    return; /* already registered, return */
392
375
 
393
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
376
  session->server_status|= SERVER_STATUS_IN_TRANS;
394
377
 
395
378
  trans->registerResource(resource_context);
396
379
 
401
384
  resource_context->setTransactionalStorageEngine(engine);
402
385
  trans->no_2pc|= true;
403
386
 
404
 
  if (session.transaction.xid_state.xid.is_null())
405
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
387
  if (session->transaction.xid_state.xid.is_null())
 
388
    session->transaction.xid_state.xid.set(session->getQueryId());
 
389
 
 
390
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
406
391
 
407
392
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
393
  if (! session->getResourceContext(monitored, 0)->isStarted())
409
394
    registerResourceForStatement(session, monitored, engine);
410
395
}
411
396
 
412
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
397
void TransactionServices::registerResourceForTransaction(Session *session,
413
398
                                                         plugin::MonitoredInTransaction *monitored,
414
399
                                                         plugin::TransactionalStorageEngine *engine,
415
400
                                                         plugin::XaResourceManager *resource_manager)
416
401
{
417
 
  TransactionContext *trans= &session.transaction.all;
418
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
402
  TransactionContext *trans= &session->transaction.all;
 
403
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
404
 
420
405
  if (resource_context->isStarted())
421
406
    return; /* already registered, return */
422
407
 
423
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
 
408
  session->server_status|= SERVER_STATUS_IN_TRANS;
424
409
 
425
410
  trans->registerResource(resource_context);
426
411
 
431
416
  resource_context->setTransactionalStorageEngine(engine);
432
417
  trans->no_2pc|= true;
433
418
 
434
 
  if (session.transaction.xid_state.xid.is_null())
435
 
    session.transaction.xid_state.xid.set(session.getQueryId());
 
419
  if (session->transaction.xid_state.xid.is_null())
 
420
    session->transaction.xid_state.xid.set(session->getQueryId());
436
421
 
437
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
 
422
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
438
423
 
439
424
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
425
  if (! session->getResourceContext(monitored, 0)->isStarted())
441
426
    registerResourceForStatement(session, monitored, engine, resource_manager);
442
427
}
443
428
 
444
 
void TransactionServices::allocateNewTransactionId()
445
 
{
446
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
447
 
  if (! replication_services.isActive())
448
 
  {
449
 
    return;
450
 
  }
451
 
 
452
 
  Session *my_session= current_session;
453
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
454
 
  my_session->setXaId(xa_id);
455
 
}
456
 
 
457
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
 
{
459
 
  if (session.getXaId() == 0)
460
 
  {
461
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
462
 
  }
463
 
 
464
 
  return session.getXaId();
465
 
}
466
 
 
467
 
int TransactionServices::commitTransaction(Session::reference session,
468
 
                                           bool normal_transaction)
 
429
/**
 
430
  @retval
 
431
    0   ok
 
432
  @retval
 
433
    1   transaction was rolled back
 
434
  @retval
 
435
    2   error during commit, data may be inconsistent
 
436
 
 
437
  @todo
 
438
    Since we don't support nested statement transactions in 5.0,
 
439
    we can't commit or rollback stmt transactions while we are inside
 
440
    stored functions or triggers. So we simply do nothing now.
 
441
    TODO: This should be fixed in later ( >= 5.1) releases.
 
442
*/
 
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
469
444
{
470
445
  int error= 0, cookie= 0;
471
446
  /*
472
447
    'all' means that this is either an explicit commit issued by
473
448
    user, or an implicit commit issued by a DDL.
474
449
  */
475
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
450
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
451
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
452
 
478
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
 
453
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
479
454
 
480
455
  /*
481
456
    We must not commit the normal transaction if a statement
483
458
    flags will not get propagated to its normal transaction's
484
459
    counterpart.
485
460
  */
486
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
487
 
              trans == &session.transaction.stmt);
 
461
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
462
              trans == &session->transaction.stmt);
488
463
 
489
464
  if (resource_contexts.empty() == false)
490
465
  {
491
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
 
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
492
467
    {
493
468
      rollbackTransaction(session, normal_transaction);
494
469
      return 1;
519
494
 
520
495
        if (resource->participatesInXaTransaction())
521
496
        {
522
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
497
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
523
498
          {
524
499
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
525
500
            error= 1;
526
501
          }
527
502
          else
528
503
          {
529
 
            session.status_var.ha_prepare_count++;
 
504
            status_var_increment(session->status_var.ha_prepare_count);
530
505
          }
531
506
        }
532
507
      }
548
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
549
524
end:
550
525
    if (is_real_trans)
551
 
      session.startWaitingGlobalReadLock();
 
526
      start_waiting_global_read_lock(session);
552
527
  }
553
528
  return error;
554
529
}
557
532
  @note
558
533
  This function does not care about global read lock. A caller should.
559
534
*/
560
 
int TransactionServices::commitPhaseOne(Session::reference session,
561
 
                                        bool normal_transaction)
 
535
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
562
536
{
563
537
  int error=0;
564
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
538
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
565
539
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
566
540
 
567
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
 
  bool all= normal_transaction;
569
 
 
570
 
  /* If we're in autocommit then we have a real transaction to commit
571
 
     (except if it's BEGIN)
572
 
  */
573
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
574
 
    all= true;
 
541
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
575
542
 
576
543
  if (resource_contexts.empty() == false)
577
544
  {
586
553
 
587
554
      if (resource->participatesInXaTransaction())
588
555
      {
589
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
556
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
557
        {
591
558
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
592
559
          error= 1;
593
560
        }
594
561
        else if (normal_transaction)
595
562
        {
596
 
          session.status_var.ha_commit_count++;
 
563
          status_var_increment(session->status_var.ha_commit_count);
597
564
        }
598
565
      }
599
566
      else if (resource->participatesInSqlTransaction())
600
567
      {
601
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
568
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
569
        {
603
570
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
604
571
          error= 1;
605
572
        }
606
573
        else if (normal_transaction)
607
574
        {
608
 
          session.status_var.ha_commit_count++;
 
575
          status_var_increment(session->status_var.ha_commit_count);
609
576
        }
610
577
      }
611
578
      resource_context->reset(); /* keep it conveniently zero-filled */
612
579
    }
613
580
 
614
581
    if (is_real_trans)
615
 
      session.transaction.xid_state.xid.null();
 
582
      session->transaction.xid_state.xid.null();
616
583
 
617
584
    if (normal_transaction)
618
585
    {
619
 
      session.variables.tx_isolation= session.session_tx_isolation;
620
 
      session.transaction.cleanup();
 
586
      session->variables.tx_isolation= session->session_tx_isolation;
 
587
      session->transaction.cleanup();
621
588
    }
622
589
  }
623
590
  trans->reset();
624
591
  return error;
625
592
}
626
593
 
627
 
int TransactionServices::rollbackTransaction(Session::reference session,
628
 
                                             bool normal_transaction)
 
594
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
629
595
{
630
596
  int error= 0;
631
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
 
597
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
598
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
599
 
634
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
 
600
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
636
601
 
637
602
  /*
638
603
    We must not rollback the normal transaction if a statement
639
604
    transaction is pending.
640
605
  */
641
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
642
 
              trans == &session.transaction.stmt);
 
606
  assert(session->transaction.stmt.getResourceContexts().empty() ||
 
607
              trans == &session->transaction.stmt);
643
608
 
644
609
  if (resource_contexts.empty() == false)
645
610
  {
654
619
 
655
620
      if (resource->participatesInXaTransaction())
656
621
      {
657
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
622
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
658
623
        {
659
624
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
660
625
          error= 1;
661
626
        }
662
627
        else if (normal_transaction)
663
628
        {
664
 
          session.status_var.ha_rollback_count++;
 
629
          status_var_increment(session->status_var.ha_rollback_count);
665
630
        }
666
631
      }
667
632
      else if (resource->participatesInSqlTransaction())
668
633
      {
669
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
634
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
670
635
        {
671
636
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
672
637
          error= 1;
673
638
        }
674
639
        else if (normal_transaction)
675
640
        {
676
 
          session.status_var.ha_rollback_count++;
 
641
          status_var_increment(session->status_var.ha_rollback_count);
677
642
        }
678
643
      }
679
644
      resource_context->reset(); /* keep it conveniently zero-filled */
686
651
     * a rollback statement with the corresponding transaction ID
687
652
     * to rollback.
688
653
     */
689
 
    if (all)
690
 
      rollbackTransactionMessage(session);
691
 
    else
692
 
      rollbackStatementMessage(session);
 
654
    rollbackTransactionMessage(session);
693
655
 
694
656
    if (is_real_trans)
695
 
      session.transaction.xid_state.xid.null();
 
657
      session->transaction.xid_state.xid.null();
696
658
    if (normal_transaction)
697
659
    {
698
 
      session.variables.tx_isolation=session.session_tx_isolation;
699
 
      session.transaction.cleanup();
 
660
      session->variables.tx_isolation=session->session_tx_isolation;
 
661
      session->transaction.cleanup();
700
662
    }
701
663
  }
702
664
  if (normal_transaction)
703
 
    session.transaction_rollback_request= false;
 
665
    session->transaction_rollback_request= false;
704
666
 
705
667
  /*
706
668
   * If a non-transactional table was updated, warn the user
707
669
   */
708
670
  if (is_real_trans &&
709
 
      session.transaction.all.hasModifiedNonTransData() &&
710
 
      session.getKilled() != Session::KILL_CONNECTION)
 
671
      session->transaction.all.hasModifiedNonTransData() &&
 
672
      session->killed != Session::KILL_CONNECTION)
711
673
  {
712
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
674
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
675
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
676
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
715
677
  }
717
679
  return error;
718
680
}
719
681
 
720
 
int TransactionServices::autocommitOrRollback(Session::reference session,
721
 
                                              int error)
 
682
/**
 
683
  This is used to commit or rollback a single statement depending on
 
684
  the value of error.
 
685
 
 
686
  @note
 
687
    Note that if the autocommit is on, then the following call inside
 
688
    InnoDB will commit or rollback the whole transaction (= the statement). The
 
689
    autocommit mechanism built into InnoDB is based on counting locks, but if
 
690
    the user has used LOCK TABLES then that mechanism does not know to do the
 
691
    commit.
 
692
*/
 
693
int TransactionServices::autocommitOrRollback(Session *session, int error)
722
694
{
723
 
  /* One GPB Statement message per SQL statement */
724
 
  message::Statement *statement= session.getStatementMessage();
725
 
  if ((statement != NULL) && (! error))
726
 
    finalizeStatementMessage(*statement, session);
727
 
 
728
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
 
695
  if (session->transaction.stmt.getResourceContexts().empty() == false)
729
696
  {
730
 
    TransactionContext *trans = &session.transaction.stmt;
731
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
 
         it != resource_contexts.end();
734
 
         ++it)
735
 
    {
736
 
      ResourceContext *resource_context= *it;
737
 
 
738
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
739
 
    }
740
 
 
741
697
    if (! error)
742
698
    {
743
699
      if (commitTransaction(session, false))
746
702
    else
747
703
    {
748
704
      (void) rollbackTransaction(session, false);
749
 
      if (session.transaction_rollback_request)
750
 
      {
 
705
      if (session->transaction_rollback_request)
751
706
        (void) rollbackTransaction(session, true);
752
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
753
 
      }
754
707
    }
755
708
 
756
 
    session.variables.tx_isolation= session.session_tx_isolation;
 
709
    session->variables.tx_isolation= session->session_tx_isolation;
757
710
  }
758
 
 
759
711
  return error;
760
712
}
761
713
 
769
721
  }
770
722
};
771
723
 
772
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
773
 
                                             NamedSavepoint &sv)
 
724
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
774
725
{
775
726
  int error= 0;
776
 
  TransactionContext *trans= &session.transaction.all;
 
727
  TransactionContext *trans= &session->transaction.all;
777
728
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
729
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
779
730
 
793
744
 
794
745
    if (resource->participatesInSqlTransaction())
795
746
    {
796
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
747
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
797
748
      {
798
749
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
799
750
        error= 1;
800
751
      }
801
752
      else
802
753
      {
803
 
        session.status_var.ha_savepoint_rollback_count++;
 
754
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
804
755
      }
805
756
    }
806
757
    trans->no_2pc|= not resource->participatesInXaTransaction();
850
801
 
851
802
      if (resource->participatesInSqlTransaction())
852
803
      {
853
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
804
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
854
805
        {
855
806
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
856
807
          error= 1;
857
808
        }
858
809
        else
859
810
        {
860
 
          session.status_var.ha_rollback_count++;
 
811
          status_var_increment(session->status_var.ha_rollback_count);
861
812
        }
862
813
      }
863
814
      resource_context->reset(); /* keep it conveniently zero-filled */
864
815
    }
865
816
  }
866
817
  trans->setResourceContexts(sv_resource_contexts);
867
 
 
868
 
  if (shouldConstructMessages())
869
 
  {
870
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
871
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
872
 
    if (savepoint_transaction != NULL)
873
 
    {
874
 
      /* Make a copy of the savepoint transaction; this is necessary to ensure proper cleanup.
875
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
876
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
877
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
878
 
      */ 
879
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
880
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
 
      if (num_statements == 0)
882
 
      {    
883
 
        session.setStatementMessage(NULL);
884
 
      }    
885
 
      else 
886
 
      {
887
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
888
 
      }    
889
 
      session.setTransactionMessage(savepoint_transaction_copy);
890
 
    }
891
 
  }
892
 
 
893
818
  return error;
894
819
}
895
820
 
899
824
  section "4.33.4 SQL-statements and transaction states",
900
825
  NamedSavepoint is *not* transaction-initiating SQL-statement
901
826
*/
902
 
int TransactionServices::setSavepoint(Session::reference session,
903
 
                                      NamedSavepoint &sv)
 
827
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
904
828
{
905
829
  int error= 0;
906
 
  TransactionContext *trans= &session.transaction.all;
 
830
  TransactionContext *trans= &session->transaction.all;
907
831
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
832
 
909
833
  if (resource_contexts.empty() == false)
919
843
 
920
844
      if (resource->participatesInSqlTransaction())
921
845
      {
922
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
846
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
923
847
        {
924
848
          my_error(ER_GET_ERRNO, MYF(0), err);
925
849
          error= 1;
926
850
        }
927
851
        else
928
852
        {
929
 
          session.status_var.ha_savepoint_count++;
 
853
          status_var_increment(session->status_var.ha_savepoint_count);
930
854
        }
931
855
      }
932
856
    }
935
859
    Remember the list of registered storage engines.
936
860
  */
937
861
  sv.setResourceContexts(resource_contexts);
938
 
 
939
 
  if (shouldConstructMessages())
940
 
  {
941
 
    message::Transaction *transaction= session.getTransactionMessage();
942
 
                  
943
 
    if (transaction != NULL)
944
 
    {
945
 
      message::Transaction *transaction_savepoint= 
946
 
        new message::Transaction(*transaction);
947
 
      sv.setTransactionMessage(transaction_savepoint);
948
 
    }
949
 
  } 
950
 
 
951
862
  return error;
952
863
}
953
864
 
954
 
int TransactionServices::releaseSavepoint(Session::reference session,
955
 
                                          NamedSavepoint &sv)
 
865
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
956
866
{
957
867
  int error= 0;
958
868
 
969
879
 
970
880
    if (resource->participatesInSqlTransaction())
971
881
    {
972
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
882
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
973
883
      {
974
884
        my_error(ER_GET_ERRNO, MYF(0), err);
975
885
        error= 1;
976
886
      }
977
887
    }
978
888
  }
979
 
  
980
889
  return error;
981
890
}
982
891
 
986
895
  return replication_services.isActive();
987
896
}
988
897
 
989
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
 
                                                                       bool should_inc_trx_id)
 
898
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
991
899
{
992
 
  message::Transaction *transaction= session.getTransactionMessage();
 
900
  message::Transaction *transaction= in_session->getTransactionMessage();
993
901
 
994
902
  if (unlikely(transaction == NULL))
995
903
  {
999
907
     * deleting transaction message when done with it.
1000
908
     */
1001
909
    transaction= new (nothrow) message::Transaction();
1002
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
 
    session.setTransactionMessage(transaction);
1004
 
    return transaction;
1005
 
  }
1006
 
  else
1007
 
    return transaction;
1008
 
}
1009
 
 
1010
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
 
                                                 Session::reference session,
1012
 
                                                 bool should_inc_trx_id)
1013
 
{
1014
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
 
  trx->set_server_id(session.getServerId());
1016
 
 
1017
 
  if (should_inc_trx_id)
1018
 
  {
1019
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1020
 
    session.setXaId(0);
1021
 
  }
1022
 
  else
1023
 
  {
1024
 
    /* trx and seg id will get set properly elsewhere */
1025
 
    trx->set_transaction_id(0);
1026
 
  }
1027
 
 
1028
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1029
 
  
1030
 
  /* segment info may get set elsewhere as needed */
1031
 
  transaction.set_segment_id(1);
1032
 
  transaction.set_end_segment(true);
1033
 
}
1034
 
 
1035
 
/**
 * Stamps a Transaction message with its end timestamp.
 *
 * @param transaction  Message whose TransactionContext is updated.
 * @param session      Session whose current timestamp is recorded as the
 *                     end timestamp.
 */
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
 
                                                     Session::const_reference session)
1037
 
{
1038
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1040
 
}
1041
 
 
1042
 
/**
 * Deletes a Transaction message and resets the session's message state.
 *
 * After the message is deleted, the session's statement-message and
 * transaction-message pointers are set to NULL and its XA id is reset to 0.
 *
 * @param transaction  Message to delete; must not be used after this call.
 * @param session      Session whose message pointers and XA id are reset.
 */
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
 
                                                    Session::reference session)
1044
 
{
1045
 
  delete transaction;
1046
 
  /*
   * NOTE(review): statements are obtained via transaction->add_statement()
   * elsewhere in this file, so the session's statement pointer presumably
   * refers into the message just deleted -- confirm.
   */
  session.setStatementMessage(NULL);
1047
 
  session.setTransactionMessage(NULL);
1048
 
  session.setXaId(0);
1049
 
}
1050
 
 
1051
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
910
    initTransactionMessage(*transaction, in_session);
 
911
    in_session->setTransactionMessage(transaction);
 
912
    return transaction;
 
913
  }
 
914
  else
 
915
    return transaction;
 
916
}
 
917
 
 
918
/**
 * Fills in the TransactionContext of a Transaction message.
 *
 * The server id and start timestamp are copied from the session; the
 * transaction id is whatever getNextTransactionId() returns.
 *
 * @param in_transaction  Message to initialize.
 * @param in_session      Session supplying the server id and timestamp.
 */
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
919
                                          Session *in_session)
 
920
{
 
921
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
922
  trx->set_server_id(in_session->getServerId());
 
923
  trx->set_transaction_id(getNextTransactionId());
 
924
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
925
}
 
926
 
 
927
/**
 * Stamps a Transaction message with its end timestamp.
 *
 * @param in_transaction  Message whose TransactionContext is updated.
 * @param in_session      Session whose current timestamp is recorded as the
 *                        end timestamp.
 */
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
928
                                              Session *in_session)
 
929
{
 
930
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
931
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
932
}
 
933
 
 
934
/**
 * Deletes a Transaction message and clears the session's pointers to the
 * current statement and transaction messages.
 *
 * @param in_transaction  Message to delete; must not be used after this call.
 * @param in_session      Session whose statement-message and
 *                        transaction-message pointers are set to NULL.
 */
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
935
                                             Session *in_session)
 
936
{
 
937
  delete in_transaction;
 
938
  in_session->setStatementMessage(NULL);
 
939
  in_session->setTransactionMessage(NULL);
 
940
}
 
941
 
 
942
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
943
{
1053
944
  ReplicationServices &replication_services= ReplicationServices::singleton();
1054
945
  if (! replication_services.isActive())
1055
946
    return 0;
1056
947
 
1057
 
  /*
1058
 
   * If no Transaction message was ever created, then no data modification
1059
 
   * occurred inside the transaction, so nothing to do.
1060
 
   */
1061
 
  if (session.getTransactionMessage() == NULL)
1062
 
    return 0;
1063
 
  
1064
 
  /* If there is an active statement message, finalize it. */
1065
 
  message::Statement *statement= session.getStatementMessage();
 
948
  /* If there is an active statement message, finalize it */
 
949
  message::Statement *statement= in_session->getStatementMessage();
1066
950
 
1067
951
  if (statement != NULL)
1068
952
  {
1069
 
    finalizeStatementMessage(*statement, session);
1070
 
  }
1071
 
 
1072
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
1073
 
 
1074
 
  /*
1075
 
   * It is possible that we could have a Transaction without any Statements
1076
 
   * if we had created a Statement but had to roll it back due to it failing
1077
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1078
 
   * message. In this case, we simply clean up the message and not push it.
1079
 
   */
1080
 
  if (transaction->statement_size() == 0)
1081
 
  {
1082
 
    cleanupTransactionMessage(transaction, session);
1083
 
    return 0;
1084
 
  }
1085
 
  
1086
 
  finalizeTransactionMessage(*transaction, session);
1087
 
  
1088
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
 
 
1090
 
  cleanupTransactionMessage(transaction, session);
 
953
    finalizeStatementMessage(*statement, in_session);
 
954
  }
 
955
  else
 
956
    return 0; /* No data modification occurred inside the transaction */
 
957
  
 
958
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
959
 
 
960
  finalizeTransactionMessage(*transaction, in_session);
 
961
  
 
962
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
963
 
 
964
  cleanupTransactionMessage(transaction, in_session);
1091
965
 
1092
966
  return static_cast<int>(result);
1093
967
}
1094
968
 
1095
969
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
 
                                               message::Statement::Type type,
1097
 
                                               Session::const_reference session)
 
970
                                        message::Statement::Type in_type,
 
971
                                        Session *in_session)
1098
972
{
1099
 
  statement.set_type(type);
1100
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
1101
 
 
1102
 
  if (session.variables.replicate_query)
1103
 
    statement.set_sql(session.getQueryString()->c_str());
 
973
  statement.set_type(in_type);
 
974
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
975
  /** @TODO Set sql string optionally */
1104
976
}
1105
977
 
1106
978
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
 
                                                   Session::reference session)
 
979
                                            Session *in_session)
1108
980
{
1109
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1110
 
  session.setStatementMessage(NULL);
 
981
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
982
  in_session->setStatementMessage(NULL);
1111
983
}
1112
984
 
1113
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
985
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
986
{
1115
987
  ReplicationServices &replication_services= ReplicationServices::singleton();
1116
988
  if (! replication_services.isActive())
1117
989
    return;
1118
990
  
1119
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
991
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1120
992
 
1121
993
  /*
1122
994
   * OK, so there are two situations that we need to deal with here:
1137
1009
   */
1138
1010
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1139
1011
  {
1140
 
    /* Remember the transaction ID so we can re-use it */
1141
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
 
    uint32_t seg_id= transaction->segment_id();
1143
 
 
1144
1012
    /*
1145
1013
     * Clear the transaction, create a Rollback statement message, 
1146
1014
     * attach it to the transaction, and push it to replicators.
1147
1015
     */
1148
1016
    transaction->Clear();
1149
 
    initTransactionMessage(*transaction, session, false);
1150
 
 
1151
 
    /* Set the transaction ID to match the previous messages */
1152
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
 
    transaction->set_segment_id(seg_id);
1154
 
    transaction->set_end_segment(true);
 
1017
    initTransactionMessage(*transaction, in_session);
1155
1018
 
1156
1019
    message::Statement *statement= transaction->add_statement();
1157
1020
 
1158
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
 
    finalizeStatementMessage(*statement, session);
 
1021
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1022
    finalizeStatementMessage(*statement, in_session);
1160
1023
 
1161
 
    finalizeTransactionMessage(*transaction, session);
 
1024
    finalizeTransactionMessage(*transaction, in_session);
1162
1025
    
1163
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
1164
 
  }
1165
 
 
1166
 
  cleanupTransactionMessage(transaction, session);
1167
 
}
1168
 
 
1169
 
/**
 * Undoes the Statement message currently attached to the session.
 *
 * Does nothing when replication is inactive or when the session has no
 * current Statement message. Otherwise the last Statement is removed from
 * the active Transaction message and the session's statement pointer is
 * cleared. If the removed statement was an INSERT, UPDATE or DELETE whose
 * data segment id was greater than 1, a ROLLBACK_STATEMENT Statement is
 * appended to the Transaction message in its place.
 *
 * @param session  Session whose in-progress statement is rolled back.
 */
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
 
{
1171
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1172
 
  if (! replication_services.isActive())
1173
 
    return;
1174
 
 
1175
 
  message::Statement *current_statement= session.getStatementMessage();
1176
 
 
1177
 
  /* If we never added a Statement message, nothing to undo. */
1178
 
  if (current_statement == NULL)
1179
 
    return;
1180
 
 
1181
 
  /*
1182
 
   * If the Statement has been segmented, then we've already pushed a portion
1183
 
   * of this Statement's row changes through the replication stream and we
1184
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1185
 
   * delete the current Statement message.
1186
 
   */
1187
 
  bool is_segmented= false;
1188
 
 
1189
 
  /* Must be evaluated before the statement is removed from the transaction below. */
  switch (current_statement->type())
1190
 
  {
1191
 
    case message::Statement::INSERT:
1192
 
      if (current_statement->insert_data().segment_id() > 1)
1193
 
        is_segmented= true;
1194
 
      break;
1195
 
 
1196
 
    case message::Statement::UPDATE:
1197
 
      if (current_statement->update_data().segment_id() > 1)
1198
 
        is_segmented= true;
1199
 
      break;
1200
 
 
1201
 
    case message::Statement::DELETE:
1202
 
      if (current_statement->delete_data().segment_id() > 1)
1203
 
        is_segmented= true;
1204
 
      break;
1205
 
 
1206
 
    default:
1207
 
      break;
1208
 
  }
1209
 
 
1210
 
  /*
1211
 
   * Remove the Statement message we've been working with (same as
1212
 
   * current_statement).
1213
 
   */
1214
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1215
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
 
  statements_in_txn= transaction->mutable_statement();
1217
 
  /*
   * NOTE(review): RemoveLast() drops the final entry of the repeated field,
   * so this relies on current_statement being that final entry -- confirm.
   */
  statements_in_txn->RemoveLast();
1218
 
  session.setStatementMessage(NULL);
1219
 
  
1220
 
  /*
1221
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1222
 
   * an indicator to cancel the previous Statement message which should have
1223
 
   * had its end_segment attribute set to false.
1224
 
   */
1225
 
  if (is_segmented)
1226
 
  {
1227
 
    current_statement= transaction->add_statement();
1228
 
    initStatementMessage(*current_statement,
1229
 
                         message::Statement::ROLLBACK_STATEMENT,
1230
 
                         session);
1231
 
    finalizeStatementMessage(*current_statement, session);
1232
 
  }
1233
 
}
1234
 
 
1235
 
/**
 * Closes the current segment of a Transaction message and starts the next.
 *
 * The transaction id and segment id of the given message are remembered,
 * the message is marked as not being the end segment, and
 * commitTransactionMessage() is called for the session. The session's
 * active Transaction message is then fetched again and given the remembered
 * transaction id, the next segment id (previous + 1) and end_segment = true.
 *
 * @param session      Session that owns the transaction message.
 * @param transaction  Message to segment. Callers should continue with the
 *                     returned pointer rather than this one.
 * @return The Transaction message to use for the next segment.
 */
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
 
                                                                     message::Transaction *transaction)
1237
 
{
1238
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
 
  uint32_t seg_id= transaction->segment_id();
1240
 
  
1241
 
  transaction->set_end_segment(false);
1242
 
  /* The return value of commitTransactionMessage() is not checked here. */
  commitTransactionMessage(session);
1243
 
  /* false is forwarded as should_inc_trx_id if a fresh message has to be created. */
  transaction= getActiveTransactionMessage(session, false);
1244
 
  
1245
 
  /* Set the transaction ID to match the previous messages */
1246
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
 
  transaction->set_segment_id(seg_id + 1);
1248
 
  transaction->set_end_segment(true);
1249
 
 
1250
 
  return transaction;
1251
 
}
1252
 
 
1253
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1254
 
                                                            Table &table,
1255
 
                                                            uint32_t *next_segment_id)
1256
 
{
1257
 
  message::Statement *statement= session.getStatementMessage();
1258
 
  message::Transaction *transaction= NULL;
1259
 
  
1260
 
  /*
1261
 
   * If statement is NULL, this is a new statement.
1262
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1263
 
   * This is because autocommitOrRollback() finalizes the statement so that
1264
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1265
 
   * share a single GPB message for multiple statements).
1266
 
   */
 
1026
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1027
  }
 
1028
  cleanupTransactionMessage(transaction, in_session);
 
1029
}
 
1030
 
 
1031
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1032
                                                                 Table *in_table)
 
1033
{
 
1034
  message::Statement *statement= in_session->getStatementMessage();
 
1035
 
 
1036
  /* 
 
1037
   * Check the type for the current Statement message, if it is anything
 
1038
   * other than INSERT we need to call finalize, this will ensure a 
 
1039
   * new InsertStatement is created. If it is of type INSERT check
 
1040
   * what table the INSERT belongs to, if it is a different table
 
1041
   * call finalize, so a new InsertStatement can be created. 
 
1042
   */
 
1043
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1044
  {
 
1045
    finalizeStatementMessage(*statement, in_session);
 
1046
    statement= in_session->getStatementMessage();
 
1047
  } 
 
1048
  else if (statement != NULL)
 
1049
  {
 
1050
    const message::InsertHeader &insert_header= statement->insert_header();
 
1051
    string old_table_name= insert_header.table_metadata().table_name();
 
1052
     
 
1053
    string current_table_name;
 
1054
    (void) in_table->getShare()->getTableName(current_table_name);
 
1055
    if (current_table_name.compare(old_table_name))
 
1056
    {
 
1057
      finalizeStatementMessage(*statement, in_session);
 
1058
      statement= in_session->getStatementMessage();
 
1059
    }
 
1060
  } 
 
1061
 
1267
1062
  if (statement == NULL)
1268
1063
  {
1269
 
    transaction= getActiveTransactionMessage(session);
1270
 
 
1271
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1272
 
        transaction_message_threshold)
1273
 
    {
1274
 
      transaction= segmentTransactionMessage(session, transaction);
1275
 
    }
1276
 
 
 
1064
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1065
    /* 
 
1066
     * Transaction message initialized and set, but no statement created
 
1067
     * yet.  We construct one and initialize it, here, then return the
 
1068
     * message after attaching the new Statement message pointer to the 
 
1069
     * Session for easy retrieval later...
 
1070
     */
1277
1071
    statement= transaction->add_statement();
1278
 
    setInsertHeader(*statement, session, table);
1279
 
    session.setStatementMessage(statement);
1280
 
  }
1281
 
  else
1282
 
  {
1283
 
    transaction= getActiveTransactionMessage(session);
1284
 
    
1285
 
    /*
1286
 
     * If we've passed our threshold for the statement size (possible for
1287
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1288
 
     * the Transaction will keep it from getting huge).
1289
 
     */
1290
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1291
 
        transaction_message_threshold)
1292
 
    {
1293
 
      /* Remember the transaction ID so we can re-use it */
1294
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
 
      uint32_t seg_id= transaction->segment_id();
1296
 
      
1297
 
      message::InsertData *current_data= statement->mutable_insert_data();
1298
 
      
1299
 
      /* Caller should use this value when adding a new record */
1300
 
      *next_segment_id= current_data->segment_id() + 1;
1301
 
      
1302
 
      current_data->set_end_segment(false);
1303
 
      transaction->set_end_segment(false);
1304
 
      
1305
 
      /* 
1306
 
       * Send the trx message to replicators after finalizing the 
1307
 
       * statement and transaction. This will also set the Transaction
1308
 
       * and Statement objects in Session to NULL.
1309
 
       */
1310
 
      commitTransactionMessage(session);
1311
 
      
1312
 
      /*
1313
 
       * Statement and Transaction should now be NULL, so new ones will get
1314
 
       * created. We reuse the transaction id since we are segmenting
1315
 
       * one transaction.
1316
 
       */
1317
 
      transaction= getActiveTransactionMessage(session, false);
1318
 
      assert(transaction != NULL);
1319
 
 
1320
 
      statement= transaction->add_statement();
1321
 
      setInsertHeader(*statement, session, table);
1322
 
      session.setStatementMessage(statement);
1323
 
            
1324
 
      /* Set the transaction ID to match the previous messages */
1325
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
 
      transaction->set_segment_id(seg_id + 1);
1327
 
      transaction->set_end_segment(true);
1328
 
    }
1329
 
    else
1330
 
    {
1331
 
      /*
1332
 
       * Continuation of the same statement. Carry forward the existing
1333
 
       * segment id.
1334
 
       */
1335
 
      const message::InsertData &current_data= statement->insert_data();
1336
 
      *next_segment_id= current_data.segment_id();
1337
 
    }
1338
 
  }
1339
 
  
 
1072
    setInsertHeader(*statement, in_session, in_table);
 
1073
    in_session->setStatementMessage(statement);
 
1074
  }
1340
1075
  return *statement;
1341
1076
}
1342
1077
 
1343
1078
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
 
                                          Session::const_reference session,
1345
 
                                          Table &table)
 
1079
                                          Session *in_session,
 
1080
                                          Table *in_table)
1346
1081
{
1347
 
  initStatementMessage(statement, message::Statement::INSERT, session);
 
1082
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1348
1083
 
1349
1084
  /* 
1350
1085
   * Now we construct the specialized InsertHeader message inside
1355
1090
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1356
1091
 
1357
1092
  string schema_name;
1358
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1093
  (void) in_table->getShare()->getSchemaName(schema_name);
1359
1094
  string table_name;
1360
 
  (void) table.getShare()->getTableName(table_name);
 
1095
  (void) in_table->getShare()->getTableName(table_name);
1361
1096
 
1362
1097
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
1098
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1364
1099
 
1365
1100
  Field *current_field;
1366
 
  Field **table_fields= table.getFields();
 
1101
  Field **table_fields= in_table->getFields();
1367
1102
 
1368
1103
  message::FieldMetadata *field_metadata;
1369
1104
 
1370
1105
  /* We will read all the table's fields... */
1371
 
  table.setReadSet();
 
1106
  in_table->setReadSet();
1372
1107
 
1373
1108
  while ((current_field= *table_fields++) != NULL) 
1374
1109
  {
1378
1113
  }
1379
1114
}
1380
1115
 
1381
 
bool TransactionServices::insertRecord(Session::reference session,
1382
 
                                       Table &table)
 
1116
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1383
1117
{
1384
1118
  ReplicationServices &replication_services= ReplicationServices::singleton();
1385
1119
  if (! replication_services.isActive())
1386
1120
    return false;
1387
 
 
1388
 
  if (not table.getShare()->replicate())
1389
 
    return false;
1390
 
 
1391
1121
  /**
1392
1122
   * We do this check here because we don't want to even create a 
1393
1123
   * statement if there isn't a primary key on the table...
1396
1126
   *
1397
1127
   * Multi-column primary keys are handled how exactly?
1398
1128
   */
1399
 
  if (not table.getShare()->hasPrimaryKey())
 
1129
  if (not in_table->getShare()->hasPrimaryKey())
1400
1130
  {
1401
1131
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1402
1132
    return true;
1403
1133
  }
1404
1134
 
1405
 
  uint32_t next_segment_id= 1;
1406
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
 
1135
  message::Statement &statement= getInsertStatement(in_session, in_table);
1407
1136
 
1408
1137
  message::InsertData *data= statement.mutable_insert_data();
1409
 
  data->set_segment_id(next_segment_id);
 
1138
  data->set_segment_id(1);
1410
1139
  data->set_end_segment(true);
1411
1140
  message::InsertRecord *record= data->add_record();
1412
1141
 
1413
1142
  Field *current_field;
1414
 
  Field **table_fields= table.getFields();
 
1143
  Field **table_fields= in_table->getFields();
1415
1144
 
1416
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1145
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
1146
  string_value->set_charset(system_charset_info);
1418
1147
 
1419
1148
  /* We will read all the table's fields... */
1420
 
  table.setReadSet();
 
1149
  in_table->setReadSet();
1421
1150
 
1422
1151
  while ((current_field= *table_fields++) != NULL) 
1423
1152
  {
1428
1157
    } 
1429
1158
    else 
1430
1159
    {
1431
 
      string_value= current_field->val_str_internal(string_value);
 
1160
      string_value= current_field->val_str(string_value);
1432
1161
      record->add_is_null(false);
1433
1162
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1434
1163
      string_value->free();
1437
1166
  return false;
1438
1167
}
1439
1168
 
1440
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1441
 
                                                            Table &table,
 
1169
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1170
                                                            Table *in_table,
1442
1171
                                                            const unsigned char *old_record, 
1443
 
                                                            const unsigned char *new_record,
1444
 
                                                            uint32_t *next_segment_id)
 
1172
                                                            const unsigned char *new_record)
1445
1173
{
1446
 
  message::Statement *statement= session.getStatementMessage();
1447
 
  message::Transaction *transaction= NULL;
 
1174
  message::Statement *statement= in_session->getStatementMessage();
1448
1175
 
1449
1176
  /*
1450
 
   * If statement is NULL, this is a new statement.
1451
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1452
 
   * This is because autocommitOrRollback() finalizes the statement so that
1453
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1454
 
   * share a single GPB message for multiple statements).
 
1177
   * Check the type for the current Statement message, if it is anything
 
1178
   * other than UPDATE we need to call finalize, this will ensure a
 
1179
   * new UpdateStatement is created. If it is of type UPDATE check
 
1180
   * what table the UPDATE belongs to, if it is a different table
 
1181
   * call finalize, so a new UpdateStatement can be created.
1455
1182
   */
 
1183
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1184
  {
 
1185
    finalizeStatementMessage(*statement, in_session);
 
1186
    statement= in_session->getStatementMessage();
 
1187
  }
 
1188
  else if (statement != NULL)
 
1189
  {
 
1190
    const message::UpdateHeader &update_header= statement->update_header();
 
1191
    string old_table_name= update_header.table_metadata().table_name();
 
1192
 
 
1193
    string current_table_name;
 
1194
    (void) in_table->getShare()->getTableName(current_table_name);
 
1195
    if (current_table_name.compare(old_table_name))
 
1196
    {
 
1197
      finalizeStatementMessage(*statement, in_session);
 
1198
      statement= in_session->getStatementMessage();
 
1199
    }
 
1200
  }
 
1201
 
1456
1202
  if (statement == NULL)
1457
1203
  {
1458
 
    transaction= getActiveTransactionMessage(session);
1459
 
    
1460
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1461
 
        transaction_message_threshold)
1462
 
    {
1463
 
      transaction= segmentTransactionMessage(session, transaction);
1464
 
    }
1465
 
    
 
1204
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1205
    /* 
 
1206
     * Transaction message initialized and set, but no statement created
 
1207
     * yet.  We construct one and initialize it, here, then return the
 
1208
     * message after attaching the new Statement message pointer to the 
 
1209
     * Session for easy retrieval later...
 
1210
     */
1466
1211
    statement= transaction->add_statement();
1467
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1468
 
    session.setStatementMessage(statement);
1469
 
  }
1470
 
  else
1471
 
  {
1472
 
    transaction= getActiveTransactionMessage(session);
1473
 
    
1474
 
    /*
1475
 
     * If we've passed our threshold for the statement size (possible for
1476
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1477
 
     * the Transaction will keep it from getting huge).
1478
 
     */
1479
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1480
 
        transaction_message_threshold)
1481
 
    {
1482
 
      /* Remember the transaction ID so we can re-use it */
1483
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
 
      uint32_t seg_id= transaction->segment_id();
1485
 
      
1486
 
      message::UpdateData *current_data= statement->mutable_update_data();
1487
 
      
1488
 
      /* Caller should use this value when adding a new record */
1489
 
      *next_segment_id= current_data->segment_id() + 1;
1490
 
      
1491
 
      current_data->set_end_segment(false);
1492
 
      transaction->set_end_segment(false);
1493
 
      
1494
 
      /* 
1495
 
       * Send the trx message to replicators after finalizing the 
1496
 
       * statement and transaction. This will also set the Transaction
1497
 
       * and Statement objects in Session to NULL.
1498
 
       */
1499
 
      commitTransactionMessage(session);
1500
 
      
1501
 
      /*
1502
 
       * Statement and Transaction should now be NULL, so new ones will get
1503
 
       * created. We reuse the transaction id since we are segmenting
1504
 
       * one transaction.
1505
 
       */
1506
 
      transaction= getActiveTransactionMessage(session, false);
1507
 
      assert(transaction != NULL);
1508
 
      
1509
 
      statement= transaction->add_statement();
1510
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1511
 
      session.setStatementMessage(statement);
1512
 
      
1513
 
      /* Set the transaction ID to match the previous messages */
1514
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
 
      transaction->set_segment_id(seg_id + 1);
1516
 
      transaction->set_end_segment(true);
1517
 
    }
1518
 
    else
1519
 
    {
1520
 
      /*
1521
 
       * Continuation of the same statement. Carry forward the existing
1522
 
       * segment id.
1523
 
       */
1524
 
      const message::UpdateData &current_data= statement->update_data();
1525
 
      *next_segment_id= current_data.segment_id();
1526
 
    }
1527
 
  }
1528
 
  
 
1212
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1213
    in_session->setStatementMessage(statement);
 
1214
  }
1529
1215
  return *statement;
1530
1216
}
1531
1217
 
1532
1218
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
 
                                          Session::const_reference session,
1534
 
                                          Table &table,
 
1219
                                          Session *in_session,
 
1220
                                          Table *in_table,
1535
1221
                                          const unsigned char *old_record, 
1536
1222
                                          const unsigned char *new_record)
1537
1223
{
1538
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
 
1224
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1539
1225
 
1540
1226
  /* 
1541
1227
   * Now we construct the specialized UpdateHeader message inside
1546
1232
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1547
1233
 
1548
1234
  string schema_name;
1549
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1235
  (void) in_table->getShare()->getSchemaName(schema_name);
1550
1236
  string table_name;
1551
 
  (void) table.getShare()->getTableName(table_name);
 
1237
  (void) in_table->getShare()->getTableName(table_name);
1552
1238
 
1553
1239
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
1240
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1555
1241
 
1556
1242
  Field *current_field;
1557
 
  Field **table_fields= table.getFields();
 
1243
  Field **table_fields= in_table->getFields();
1558
1244
 
1559
1245
  message::FieldMetadata *field_metadata;
1560
1246
 
1561
1247
  /* We will read all the table's fields... */
1562
 
  table.setReadSet();
 
1248
  in_table->setReadSet();
1563
1249
 
1564
1250
  while ((current_field= *table_fields++) != NULL) 
1565
1251
  {
1567
1253
     * We add the "key field metadata" -- i.e. the fields which form
1568
1254
     * the primary key for the table.
1569
1255
     */
1570
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1256
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1571
1257
    {
1572
1258
      field_metadata= header->add_key_field_metadata();
1573
1259
      field_metadata->set_name(current_field->field_name);
1574
1260
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1575
1261
    }
1576
1262
 
1577
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1263
    /*
 
1264
     * The below really should be moved into the Field API and Record API.  But for now
 
1265
     * we do this crazy pointer fiddling to figure out if the current field
 
1266
     * has been updated in the supplied record raw byte pointers.
 
1267
     */
 
1268
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
 
1269
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
 
1270
 
 
1271
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1272
 
 
1273
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1578
1274
    {
1579
1275
      /* Field is changed from old to new */
1580
1276
      field_metadata= header->add_set_field_metadata();
1583
1279
    }
1584
1280
  }
1585
1281
}
1586
 
 
1587
 
void TransactionServices::updateRecord(Session::reference session,
1588
 
                                       Table &table, 
 
1282
void TransactionServices::updateRecord(Session *in_session,
 
1283
                                       Table *in_table, 
1589
1284
                                       const unsigned char *old_record, 
1590
1285
                                       const unsigned char *new_record)
1591
1286
{
1593
1288
  if (! replication_services.isActive())
1594
1289
    return;
1595
1290
 
1596
 
  if (not table.getShare()->replicate())
1597
 
    return;
1598
 
 
1599
 
  uint32_t next_segment_id= 1;
1600
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
 
1291
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1601
1292
 
1602
1293
  message::UpdateData *data= statement.mutable_update_data();
1603
 
  data->set_segment_id(next_segment_id);
 
1294
  data->set_segment_id(1);
1604
1295
  data->set_end_segment(true);
1605
1296
  message::UpdateRecord *record= data->add_record();
1606
1297
 
1607
1298
  Field *current_field;
1608
 
  Field **table_fields= table.getFields();
1609
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1299
  Field **table_fields= in_table->getFields();
 
1300
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
1301
  string_value->set_charset(system_charset_info);
1611
1302
 
1612
1303
  while ((current_field= *table_fields++) != NULL) 
1621
1312
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1622
1313
     *
1623
1314
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1315
     *
 
1316
     * The below really should be moved into the Field API and Record API.  But for now
 
1317
     * we do this crazy pointer fiddling to figure out if the current field
 
1318
     * has been updated in the supplied record raw byte pointers.
1624
1319
     */
1625
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
 
1320
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
 
1321
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
 
1322
 
 
1323
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1324
 
 
1325
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1626
1326
    {
1627
1327
      /* Store the original "read bit" for this field */
1628
1328
      bool is_read_set= current_field->isReadSet();
1629
1329
 
1630
1330
      /* We need to mark that we will "read" this field... */
1631
 
      table.setReadSet(current_field->position());
 
1331
      in_table->setReadSet(current_field->field_index);
1632
1332
 
1633
1333
      /* Read the string value of this field's contents */
1634
 
      string_value= current_field->val_str_internal(string_value);
 
1334
      string_value= current_field->val_str(string_value);
1635
1335
 
1636
1336
      /* 
1637
1337
       * Reset the read bit after reading field to its original state.  This 
1657
1357
     * primary key field value.  Replication only supports tables
1658
1358
     * with a primary key.
1659
1359
     */
1660
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1360
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1661
1361
    {
1662
1362
      /**
1663
1363
       * To say the below is ugly is an understatement. But it works.
1664
1364
       * 
1665
1365
       * @todo Move this crap into a real Record API.
1666
1366
       */
1667
 
      string_value= current_field->val_str_internal(string_value,
1668
 
                                                    old_record + 
1669
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
 
1367
      string_value= current_field->val_str(string_value,
 
1368
                                           old_record + 
 
1369
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1670
1370
      record->add_key_value(string_value->c_ptr(), string_value->length());
1671
1371
      string_value->free();
1672
1372
    }
1674
1374
  }
1675
1375
}
1676
1376
 
1677
 
bool TransactionServices::isFieldUpdated(Field *current_field,
1678
 
                                         Table &table,
1679
 
                                         const unsigned char *old_record,
1680
 
                                         const unsigned char *new_record)
1681
 
{
1682
 
  /*
1683
 
   * The below really should be moved into the Field API and Record API.  But for now
1684
 
   * we do this crazy pointer fiddling to figure out if the current field
1685
 
   * has been updated in the supplied record raw byte pointers.
1686
 
   */
1687
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1689
 
 
1690
 
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1691
 
 
1692
 
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1693
 
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1694
 
 
1695
 
  bool isUpdated= false;
1696
 
  if (old_value_is_null != new_value_is_null)
1697
 
  {
1698
 
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1699
 
    {
1700
 
      isUpdated= true;
1701
 
    }
1702
 
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1703
 
    {
1704
 
      isUpdated= true;
1705
 
    }
1706
 
  }
1707
 
 
1708
 
  if (! isUpdated)
1709
 
  {
1710
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1711
 
    {
1712
 
      isUpdated= true;
1713
 
    }
1714
 
  }
1715
 
  return isUpdated;
1716
 
}  
1717
 
 
1718
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1719
 
                                                            Table &table,
1720
 
                                                            uint32_t *next_segment_id)
1721
 
{
1722
 
  message::Statement *statement= session.getStatementMessage();
1723
 
  message::Transaction *transaction= NULL;
1724
 
 
1725
 
  /*
1726
 
   * If statement is NULL, this is a new statement.
1727
 
   * If statement is NOT NULL, this is a continuation of the same statement.
1728
 
   * This is because autocommitOrRollback() finalizes the statement so that
1729
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1730
 
   * share a single GPB message for multiple statements).
1731
 
   */
 
1377
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1378
                                                            Table *in_table)
 
1379
{
 
1380
  message::Statement *statement= in_session->getStatementMessage();
 
1381
 
 
1382
  /*
 
1383
   * Check the type for the current Statement message, if it is anything
 
1384
   * other than DELETE we need to call finalize, this will ensure a
 
1385
   * new DeleteStatement is created. If it is of type DELETE check
 
1386
   * what table the DELETE belongs to, if it is a different table
 
1387
   * call finalize, so a new DeleteStatement can be created.
 
1388
   */
 
1389
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1390
  {
 
1391
    finalizeStatementMessage(*statement, in_session);
 
1392
    statement= in_session->getStatementMessage();
 
1393
  }
 
1394
  else if (statement != NULL)
 
1395
  {
 
1396
    const message::DeleteHeader &delete_header= statement->delete_header();
 
1397
    string old_table_name= delete_header.table_metadata().table_name();
 
1398
 
 
1399
    string current_table_name;
 
1400
    (void) in_table->getShare()->getTableName(current_table_name);
 
1401
    if (current_table_name.compare(old_table_name))
 
1402
    {
 
1403
      finalizeStatementMessage(*statement, in_session);
 
1404
      statement= in_session->getStatementMessage();
 
1405
    }
 
1406
  }
 
1407
 
1732
1408
  if (statement == NULL)
1733
1409
  {
1734
 
    transaction= getActiveTransactionMessage(session);
1735
 
    
1736
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1737
 
        transaction_message_threshold)
1738
 
    {
1739
 
      transaction= segmentTransactionMessage(session, transaction);
1740
 
    }
1741
 
    
 
1410
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1411
    /* 
 
1412
     * Transaction message initialized and set, but no statement created
 
1413
     * yet.  We construct one and initialize it, here, then return the
 
1414
     * message after attaching the new Statement message pointer to the 
 
1415
     * Session for easy retrieval later...
 
1416
     */
1742
1417
    statement= transaction->add_statement();
1743
 
    setDeleteHeader(*statement, session, table);
1744
 
    session.setStatementMessage(statement);
1745
 
  }
1746
 
  else
1747
 
  {
1748
 
    transaction= getActiveTransactionMessage(session);
1749
 
    
1750
 
    /*
1751
 
     * If we've passed our threshold for the statement size (possible for
1752
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1753
 
     * the Transaction will keep it from getting huge).
1754
 
     */
1755
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1756
 
        transaction_message_threshold)
1757
 
    {
1758
 
      /* Remember the transaction ID so we can re-use it */
1759
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
 
      uint32_t seg_id= transaction->segment_id();
1761
 
      
1762
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1763
 
      
1764
 
      /* Caller should use this value when adding a new record */
1765
 
      *next_segment_id= current_data->segment_id() + 1;
1766
 
      
1767
 
      current_data->set_end_segment(false);
1768
 
      transaction->set_end_segment(false);
1769
 
      
1770
 
      /* 
1771
 
       * Send the trx message to replicators after finalizing the 
1772
 
       * statement and transaction. This will also set the Transaction
1773
 
       * and Statement objects in Session to NULL.
1774
 
       */
1775
 
      commitTransactionMessage(session);
1776
 
      
1777
 
      /*
1778
 
       * Statement and Transaction should now be NULL, so new ones will get
1779
 
       * created. We reuse the transaction id since we are segmenting
1780
 
       * one transaction.
1781
 
       */
1782
 
      transaction= getActiveTransactionMessage(session, false);
1783
 
      assert(transaction != NULL);
1784
 
      
1785
 
      statement= transaction->add_statement();
1786
 
      setDeleteHeader(*statement, session, table);
1787
 
      session.setStatementMessage(statement);
1788
 
      
1789
 
      /* Set the transaction ID to match the previous messages */
1790
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
 
      transaction->set_segment_id(seg_id + 1);
1792
 
      transaction->set_end_segment(true);
1793
 
    }
1794
 
    else
1795
 
    {
1796
 
      /*
1797
 
       * Continuation of the same statement. Carry forward the existing
1798
 
       * segment id.
1799
 
       */
1800
 
      const message::DeleteData &current_data= statement->delete_data();
1801
 
      *next_segment_id= current_data.segment_id();
1802
 
    }
1803
 
  }
1804
 
  
 
1418
    setDeleteHeader(*statement, in_session, in_table);
 
1419
    in_session->setStatementMessage(statement);
 
1420
  }
1805
1421
  return *statement;
1806
1422
}
1807
1423
 
1808
1424
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
 
                                          Session::const_reference session,
1810
 
                                          Table &table)
 
1425
                                          Session *in_session,
 
1426
                                          Table *in_table)
1811
1427
{
1812
 
  initStatementMessage(statement, message::Statement::DELETE, session);
 
1428
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1813
1429
 
1814
1430
  /* 
1815
1431
   * Now we construct the specialized DeleteHeader message inside
1819
1435
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1820
1436
 
1821
1437
  string schema_name;
1822
 
  (void) table.getShare()->getSchemaName(schema_name);
 
1438
  (void) in_table->getShare()->getSchemaName(schema_name);
1823
1439
  string table_name;
1824
 
  (void) table.getShare()->getTableName(table_name);
 
1440
  (void) in_table->getShare()->getTableName(table_name);
1825
1441
 
1826
1442
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
1443
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1828
1444
 
1829
1445
  Field *current_field;
1830
 
  Field **table_fields= table.getFields();
 
1446
  Field **table_fields= in_table->getFields();
1831
1447
 
1832
1448
  message::FieldMetadata *field_metadata;
1833
1449
 
1838
1454
     * primary key field value.  Replication only supports tables
1839
1455
     * with a primary key.
1840
1456
     */
1841
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1457
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1842
1458
    {
1843
1459
      field_metadata= header->add_key_field_metadata();
1844
1460
      field_metadata->set_name(current_field->field_name);
1847
1463
  }
1848
1464
}
1849
1465
 
1850
 
void TransactionServices::deleteRecord(Session::reference session,
1851
 
                                       Table &table,
1852
 
                                       bool use_update_record)
 
1466
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1853
1467
{
1854
1468
  ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1469
  if (! replication_services.isActive())
1856
1470
    return;
1857
1471
 
1858
 
  if (not table.getShare()->replicate())
1859
 
    return;
1860
 
 
1861
 
  uint32_t next_segment_id= 1;
1862
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
 
1472
  message::Statement &statement= getDeleteStatement(in_session, in_table);
1863
1473
 
1864
1474
  message::DeleteData *data= statement.mutable_delete_data();
1865
 
  data->set_segment_id(next_segment_id);
 
1475
  data->set_segment_id(1);
1866
1476
  data->set_end_segment(true);
1867
1477
  message::DeleteRecord *record= data->add_record();
1868
1478
 
1869
1479
  Field *current_field;
1870
 
  Field **table_fields= table.getFields();
1871
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1480
  Field **table_fields= in_table->getFields();
 
1481
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
1482
  string_value->set_charset(system_charset_info);
1873
1483
 
1874
1484
  while ((current_field= *table_fields++) != NULL) 
1878
1488
     * primary key field value.  Replication only supports tables
1879
1489
     * with a primary key.
1880
1490
     */
1881
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
 
1491
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1882
1492
    {
1883
 
      if (use_update_record)
1884
 
      {
1885
 
        /*
1886
 
         * Temporarily point to the update record to get its value.
1887
 
         * This is pretty much a hack in order to get the PK value from
1888
 
         * the update record rather than the insert record. Field::val_str()
1889
 
         * should not change anything in Field::ptr, so this should be safe.
1890
 
         * We are careful not to change anything in old_ptr.
1891
 
         */
1892
 
        const unsigned char *old_ptr= current_field->ptr;
1893
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1894
 
        string_value= current_field->val_str_internal(string_value);
1895
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1896
 
      }
1897
 
      else
1898
 
      {
1899
 
        string_value= current_field->val_str_internal(string_value);
1900
 
        /**
1901
 
         * @TODO Store optional old record value in the before data member
1902
 
         */
1903
 
      }
 
1493
      string_value= current_field->val_str(string_value);
1904
1494
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1495
      /**
 
1496
       * @TODO Store optional old record value in the before data member
 
1497
       */
1905
1498
      string_value->free();
1906
1499
    }
1907
1500
  }
1908
1501
}
1909
1502
 
1910
 
void TransactionServices::createTable(Session::reference session,
 
1503
void TransactionServices::createTable(Session *in_session,
1911
1504
                                      const message::Table &table)
1912
1505
{
1913
1506
  ReplicationServices &replication_services= ReplicationServices::singleton();
1914
1507
  if (! replication_services.isActive())
1915
1508
    return;
1916
 
 
1917
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1918
 
    return;
1919
 
 
1920
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1509
  
 
1510
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1921
1511
  message::Statement *statement= transaction->add_statement();
1922
1512
 
1923
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
 
1513
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1924
1514
 
1925
1515
  /* 
1926
1516
   * Construct the specialized CreateTableStatement message and attach
1930
1520
  message::Table *new_table_message= create_table_statement->mutable_table();
1931
1521
  *new_table_message= table;
1932
1522
 
1933
 
  finalizeStatementMessage(*statement, session);
 
1523
  finalizeStatementMessage(*statement, in_session);
1934
1524
 
1935
 
  finalizeTransactionMessage(*transaction, session);
 
1525
  finalizeTransactionMessage(*transaction, in_session);
1936
1526
  
1937
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1527
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1938
1528
 
1939
 
  cleanupTransactionMessage(transaction, session);
 
1529
  cleanupTransactionMessage(transaction, in_session);
1940
1530
 
1941
1531
}
1942
1532
 
1943
 
void TransactionServices::createSchema(Session::reference session,
 
1533
void TransactionServices::createSchema(Session *in_session,
1944
1534
                                       const message::Schema &schema)
1945
1535
{
1946
1536
  ReplicationServices &replication_services= ReplicationServices::singleton();
1947
1537
  if (! replication_services.isActive())
1948
1538
    return;
1949
 
 
1950
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1951
 
    return;
1952
 
 
1953
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1539
  
 
1540
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
1541
  message::Statement *statement= transaction->add_statement();
1955
1542
 
1956
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
 
1543
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1957
1544
 
1958
1545
  /* 
1959
1546
   * Construct the specialized CreateSchemaStatement message and attach
1963
1550
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
1551
  *new_schema_message= schema;
1965
1552
 
1966
 
  finalizeStatementMessage(*statement, session);
 
1553
  finalizeStatementMessage(*statement, in_session);
1967
1554
 
1968
 
  finalizeTransactionMessage(*transaction, session);
 
1555
  finalizeTransactionMessage(*transaction, in_session);
1969
1556
  
1970
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1557
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1971
1558
 
1972
 
  cleanupTransactionMessage(transaction, session);
 
1559
  cleanupTransactionMessage(transaction, in_session);
1973
1560
 
1974
1561
}
1975
1562
 
1976
 
void TransactionServices::dropSchema(Session::reference session,
1977
 
                                     identifier::Schema::const_reference identifier,
1978
 
                                     message::schema::const_reference schema)
 
1563
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1979
1564
{
1980
1565
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
1566
  if (! replication_services.isActive())
1982
1567
    return;
1983
 
 
1984
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1985
 
    return;
1986
 
 
1987
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1568
  
 
1569
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1988
1570
  message::Statement *statement= transaction->add_statement();
1989
1571
 
1990
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
 
1572
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1991
1573
 
1992
1574
  /* 
1993
1575
   * Construct the specialized DropSchemaStatement message and attach
1995
1577
   */
1996
1578
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1997
1579
 
1998
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1999
 
 
2000
 
  finalizeStatementMessage(*statement, session);
2001
 
 
2002
 
  finalizeTransactionMessage(*transaction, session);
2003
 
  
2004
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2005
 
 
2006
 
  cleanupTransactionMessage(transaction, session);
2007
 
}
2008
 
 
2009
 
void TransactionServices::alterSchema(Session::reference session,
2010
 
                                      const message::Schema &old_schema,
2011
 
                                      const message::Schema &new_schema)
2012
 
{
2013
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2014
 
  if (! replication_services.isActive())
2015
 
    return;
2016
 
 
2017
 
  if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2018
 
    return;
2019
 
 
2020
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2021
 
  message::Statement *statement= transaction->add_statement();
2022
 
 
2023
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2024
 
 
2025
 
  /* 
2026
 
   * Construct the specialized AlterSchemaStatement message and attach
2027
 
   * it to the generic Statement message
2028
 
   */
2029
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2030
 
 
2031
 
  message::Schema *before= alter_schema_statement->mutable_before();
2032
 
  message::Schema *after= alter_schema_statement->mutable_after();
2033
 
 
2034
 
  *before= old_schema;
2035
 
  *after= new_schema;
2036
 
 
2037
 
  finalizeStatementMessage(*statement, session);
2038
 
 
2039
 
  finalizeTransactionMessage(*transaction, session);
2040
 
  
2041
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2042
 
 
2043
 
  cleanupTransactionMessage(transaction, session);
2044
 
}
2045
 
 
2046
 
void TransactionServices::dropTable(Session::reference session,
2047
 
                                    identifier::Table::const_reference identifier,
2048
 
                                    message::table::const_reference table,
 
1580
  drop_schema_statement->set_schema_name(schema_name);
 
1581
 
 
1582
  finalizeStatementMessage(*statement, in_session);
 
1583
 
 
1584
  finalizeTransactionMessage(*transaction, in_session);
 
1585
  
 
1586
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1587
 
 
1588
  cleanupTransactionMessage(transaction, in_session);
 
1589
}
 
1590
 
 
1591
void TransactionServices::dropTable(Session *in_session,
 
1592
                                    const string &schema_name,
 
1593
                                    const string &table_name,
2049
1594
                                    bool if_exists)
2050
1595
{
2051
1596
  ReplicationServices &replication_services= ReplicationServices::singleton();
2052
1597
  if (! replication_services.isActive())
2053
1598
    return;
2054
 
 
2055
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2056
 
    return;
2057
 
 
2058
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1599
  
 
1600
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2059
1601
  message::Statement *statement= transaction->add_statement();
2060
1602
 
2061
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
 
1603
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2062
1604
 
2063
1605
  /* 
2064
1606
   * Construct the specialized DropTableStatement message and attach
2070
1612
 
2071
1613
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2072
1614
 
2073
 
  table_metadata->set_schema_name(identifier.getSchemaName());
2074
 
  table_metadata->set_table_name(identifier.getTableName());
2075
 
 
2076
 
  finalizeStatementMessage(*statement, session);
2077
 
 
2078
 
  finalizeTransactionMessage(*transaction, session);
 
1615
  table_metadata->set_schema_name(schema_name);
 
1616
  table_metadata->set_table_name(table_name);
 
1617
 
 
1618
  finalizeStatementMessage(*statement, in_session);
 
1619
 
 
1620
  finalizeTransactionMessage(*transaction, in_session);
2079
1621
  
2080
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1622
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2081
1623
 
2082
 
  cleanupTransactionMessage(transaction, session);
 
1624
  cleanupTransactionMessage(transaction, in_session);
2083
1625
}
2084
1626
 
2085
 
void TransactionServices::truncateTable(Session::reference session,
2086
 
                                        Table &table)
 
1627
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2087
1628
{
2088
1629
  ReplicationServices &replication_services= ReplicationServices::singleton();
2089
1630
  if (! replication_services.isActive())
2090
1631
    return;
2091
 
 
2092
 
  if (not table.getShare()->replicate())
2093
 
    return;
2094
 
 
2095
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1632
  
 
1633
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
1634
  message::Statement *statement= transaction->add_statement();
2097
1635
 
2098
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
 
1636
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2099
1637
 
2100
1638
  /* 
2101
1639
   * Construct the specialized TruncateTableStatement message and attach
2105
1643
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2106
1644
 
2107
1645
  string schema_name;
2108
 
  (void) table.getShare()->getSchemaName(schema_name);
2109
 
 
 
1646
  (void) in_table->getShare()->getSchemaName(schema_name);
2110
1647
  string table_name;
2111
 
  (void) table.getShare()->getTableName(table_name);
 
1648
  (void) in_table->getShare()->getTableName(table_name);
2112
1649
 
2113
1650
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
1651
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2115
1652
 
2116
 
  finalizeStatementMessage(*statement, session);
 
1653
  finalizeStatementMessage(*statement, in_session);
2117
1654
 
2118
 
  finalizeTransactionMessage(*transaction, session);
 
1655
  finalizeTransactionMessage(*transaction, in_session);
2119
1656
  
2120
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1657
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2121
1658
 
2122
 
  cleanupTransactionMessage(transaction, session);
 
1659
  cleanupTransactionMessage(transaction, in_session);
2123
1660
}
2124
1661
 
2125
 
void TransactionServices::rawStatement(Session::reference session,
2126
 
                                       const string &query)
 
1662
void TransactionServices::rawStatement(Session *in_session, const string &query)
2127
1663
{
2128
1664
  ReplicationServices &replication_services= ReplicationServices::singleton();
2129
1665
  if (! replication_services.isActive())
2130
1666
    return;
2131
 
 
2132
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
 
1667
  
 
1668
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
1669
  message::Statement *statement= transaction->add_statement();
2134
1670
 
2135
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
 
1671
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
1672
  statement->set_sql(query);
2137
 
  finalizeStatementMessage(*statement, session);
 
1673
  finalizeStatementMessage(*statement, in_session);
2138
1674
 
2139
 
  finalizeTransactionMessage(*transaction, session);
 
1675
  finalizeTransactionMessage(*transaction, in_session);
2140
1676
  
2141
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2142
 
 
2143
 
  cleanupTransactionMessage(transaction, session);
2144
 
}
2145
 
 
2146
 
int TransactionServices::sendEvent(Session::reference session,
2147
 
                                   const message::Event &event)
2148
 
{
2149
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2150
 
  if (! replication_services.isActive())
2151
 
    return 0;
2152
 
 
2153
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2154
 
 
2155
 
  // set server id, start timestamp
2156
 
  initTransactionMessage(*transaction, session, true);
2157
 
 
2158
 
  // set end timestamp
2159
 
  finalizeTransactionMessage(*transaction, session);
2160
 
 
2161
 
  message::Event *trx_event= transaction->mutable_event();
2162
 
 
2163
 
  trx_event->CopyFrom(event);
2164
 
 
2165
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2166
 
 
2167
 
  delete transaction;
2168
 
 
2169
 
  return static_cast<int>(result);
2170
 
}
2171
 
 
2172
 
bool TransactionServices::sendStartupEvent(Session::reference session)
2173
 
{
2174
 
  message::Event event;
2175
 
  event.set_type(message::Event::STARTUP);
2176
 
  if (sendEvent(session, event) != 0)
2177
 
    return false;
2178
 
  return true;
2179
 
}
2180
 
 
2181
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
2182
 
{
2183
 
  message::Event event;
2184
 
  event.set_type(message::Event::SHUTDOWN);
2185
 
  if (sendEvent(session, event) != 0)
2186
 
    return false;
2187
 
  return true;
 
1677
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1678
 
 
1679
  cleanupTransactionMessage(transaction, in_session);
2188
1680
}
2189
1681
 
2190
1682
} /* namespace drizzled */