~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Andrew Hutchings
  • Date: 2010-11-01 22:14:18 UTC
  • mto: This revision was merged to the branch mainline in revision 1907.
  • Revision ID: andrew@linuxjedi.co.uk-20101101221418-9n9gmm4ms7fl8vo5
Fix copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
71
71
#include "drizzled/plugin/xa_resource_manager.h"
72
72
#include "drizzled/internal/my_sys.h"
73
73
 
74
 
using namespace std;
75
 
 
76
74
#include <vector>
77
75
#include <algorithm>
78
76
#include <functional>
 
77
#include <google/protobuf/repeated_field.h>
 
78
 
 
79
using namespace std;
 
80
using namespace google;
79
81
 
80
82
namespace drizzled
81
83
{
387
389
  if (session->transaction.xid_state.xid.is_null())
388
390
    session->transaction.xid_state.xid.set(session->getQueryId());
389
391
 
390
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
391
 
 
392
392
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
393
393
  if (! session->getResourceContext(monitored, 0)->isStarted())
394
394
    registerResourceForStatement(session, monitored, engine);
427
427
}
428
428
 
429
429
/**
430
 
  Check if we can skip the two-phase commit.
431
 
 
432
 
  A helper function to evaluate if two-phase commit is mandatory.
433
 
  As a side effect, propagates the read-only/read-write flags
434
 
  of the statement transaction to its enclosing normal transaction.
435
 
 
436
 
  @retval true   we must run a two-phase commit. Returned
437
 
                 if we have at least two engines with read-write changes.
438
 
  @retval false  Don't need two-phase commit. Even if we have two
439
 
                 transactional engines, we can run two independent
440
 
                 commits if changes in one of the engines are read-only.
441
 
*/
442
 
static
443
 
bool
444
 
ha_check_and_coalesce_trx_read_only(Session *session,
445
 
                                    TransactionContext::ResourceContexts &resource_contexts,
446
 
                                    bool normal_transaction)
447
 
{
448
 
  /* The number of storage engines that have actual changes. */
449
 
  unsigned num_resources_modified_data= 0;
450
 
  ResourceContext *resource_context;
451
 
 
452
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
 
       it != resource_contexts.end();
454
 
       ++it)
455
 
  {
456
 
    resource_context= *it;
457
 
    if (resource_context->hasModifiedData())
458
 
      ++num_resources_modified_data;
459
 
 
460
 
    if (! normal_transaction)
461
 
    {
462
 
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
 
      assert(resource_context != resource_context_normal);
464
 
      /*
465
 
        Merge read-only/read-write information about statement
466
 
        transaction to its enclosing normal transaction. Do this
467
 
        only if in a real transaction -- that is, if we know
468
 
        that resource_context_all is registered in session->transaction.all.
469
 
        Since otherwise we only clutter the normal transaction flags.
470
 
      */
471
 
      if (resource_context_normal->isStarted()) /* false if autocommit. */
472
 
        resource_context_normal->coalesceWith(resource_context);
473
 
    }
474
 
    else if (num_resources_modified_data > 1)
475
 
    {
476
 
      /*
477
 
        It is a normal transaction, so we don't need to merge read/write
478
 
        information up, and the need for two-phase commit has been
479
 
        already established. Break the loop prematurely.
480
 
      */
481
 
      break;
482
 
    }
483
 
  }
484
 
  return num_resources_modified_data > 1;
485
 
}
486
 
 
487
 
 
488
 
/**
489
430
  @retval
490
431
    0   ok
491
432
  @retval
499
440
    stored functions or triggers. So we simply do nothing now.
500
441
    TODO: This should be fixed in later ( >= 5.1) releases.
501
442
*/
502
 
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
 
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
503
444
{
504
445
  int error= 0, cookie= 0;
505
446
  /*
522
463
 
523
464
  if (resource_contexts.empty() == false)
524
465
  {
525
 
    bool must_2pc;
526
 
 
527
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
528
467
    {
529
 
      ha_rollback_trans(session, normal_transaction);
 
468
      rollbackTransaction(session, normal_transaction);
530
469
      return 1;
531
470
    }
532
471
 
533
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
534
 
 
535
 
    if (! trans->no_2pc && must_2pc)
 
472
    /*
 
473
     * If replication is on, we do a PREPARE on the resource managers, push the
 
474
     * Transaction message across the replication stream, and then COMMIT if the
 
475
     * replication stream returned successfully.
 
476
     */
 
477
    if (shouldConstructMessages())
536
478
    {
537
479
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
           it != resource_contexts.end() && ! error;
559
501
          }
560
502
          else
561
503
          {
562
 
            status_var_increment(session->status_var.ha_prepare_count);
 
504
            session->status_var.ha_prepare_count++;
563
505
          }
564
506
        }
565
507
      }
 
508
      if (error == 0 && is_real_trans)
 
509
      {
 
510
        /*
 
511
         * Push the constructed Transaction messages across to
 
512
         * replicators and appliers.
 
513
         */
 
514
        error= commitTransactionMessage(session);
 
515
      }
566
516
      if (error)
567
517
      {
568
 
        ha_rollback_trans(session, normal_transaction);
 
518
        rollbackTransaction(session, normal_transaction);
569
519
        error= 1;
570
520
        goto end;
571
521
      }
572
522
    }
573
 
    error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
 
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
574
524
end:
575
525
    if (is_real_trans)
576
526
      start_waiting_global_read_lock(session);
582
532
  @note
583
533
  This function does not care about global read lock. A caller should.
584
534
*/
585
 
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
 
535
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
586
536
{
587
537
  int error=0;
588
538
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
610
560
        }
611
561
        else if (normal_transaction)
612
562
        {
613
 
          status_var_increment(session->status_var.ha_commit_count);
 
563
          session->status_var.ha_commit_count++;
614
564
        }
615
565
      }
616
566
      else if (resource->participatesInSqlTransaction())
622
572
        }
623
573
        else if (normal_transaction)
624
574
        {
625
 
          status_var_increment(session->status_var.ha_commit_count);
 
575
          session->status_var.ha_commit_count++;
626
576
        }
627
577
      }
628
578
      resource_context->reset(); /* keep it conveniently zero-filled */
638
588
    }
639
589
  }
640
590
  trans->reset();
641
 
  if (error == 0)
642
 
  {
643
 
    if (is_real_trans)
644
 
    {
645
 
      /* 
646
 
       * We commit the normal transaction by finalizing the transaction message
647
 
       * and propogating the message to all registered replicators.
648
 
       */
649
 
      commitTransactionMessage(session);
650
 
    }
651
 
  }
652
591
  return error;
653
592
}
654
593
 
655
 
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
 
594
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
656
595
{
657
596
  int error= 0;
658
597
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
687
626
        }
688
627
        else if (normal_transaction)
689
628
        {
690
 
          status_var_increment(session->status_var.ha_rollback_count);
 
629
          session->status_var.ha_rollback_count++;
691
630
        }
692
631
      }
693
632
      else if (resource->participatesInSqlTransaction())
699
638
        }
700
639
        else if (normal_transaction)
701
640
        {
702
 
          status_var_increment(session->status_var.ha_rollback_count);
 
641
          session->status_var.ha_rollback_count++;
703
642
        }
704
643
      }
705
644
      resource_context->reset(); /* keep it conveniently zero-filled */
712
651
     * a rollback statement with the corresponding transaction ID
713
652
     * to rollback.
714
653
     */
715
 
    rollbackTransactionMessage(session);
 
654
    if (normal_transaction)
 
655
      rollbackTransactionMessage(session);
716
656
 
717
657
    if (is_real_trans)
718
658
      session->transaction.xid_state.xid.null();
751
691
    the user has used LOCK TABLES then that mechanism does not know to do the
752
692
    commit.
753
693
*/
754
 
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
 
694
int TransactionServices::autocommitOrRollback(Session *session, int error)
755
695
{
 
696
 
756
697
  if (session->transaction.stmt.getResourceContexts().empty() == false)
757
698
  {
 
699
    TransactionContext *trans = &session->transaction.stmt;
 
700
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
701
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
702
         it != resource_contexts.end();
 
703
         ++it)
 
704
    {
 
705
      ResourceContext *resource_context= *it;
 
706
 
 
707
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
708
    }
 
709
 
758
710
    if (! error)
759
711
    {
760
 
      if (ha_commit_trans(session, false))
 
712
      if (commitTransaction(session, false))
761
713
        error= 1;
762
714
    }
763
715
    else
764
716
    {
765
 
      (void) ha_rollback_trans(session, false);
 
717
      (void) rollbackTransaction(session, false);
766
718
      if (session->transaction_rollback_request)
767
 
        (void) ha_rollback_trans(session, true);
 
719
        (void) rollbackTransaction(session, true);
768
720
    }
769
721
 
770
722
    session->variables.tx_isolation= session->session_tx_isolation;
772
724
  return error;
773
725
}
774
726
 
775
 
/**
776
 
  return the list of XID's to a client, the same way SHOW commands do.
777
 
 
778
 
  @note
779
 
    I didn't find in XA specs that an RM cannot return the same XID twice,
780
 
    so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
 
    It can be easily fixed later, if necessary.
782
 
*/
783
 
bool TransactionServices::mysql_xa_recover(Session *session)
784
 
{
785
 
  List<Item> field_list;
786
 
  int i= 0;
787
 
  XID_STATE *xs;
788
 
 
789
 
  field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
 
  field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
 
  field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
 
  field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
793
 
 
794
 
  if (session->client->sendFields(&field_list))
795
 
    return 1;
796
 
 
797
 
  pthread_mutex_lock(&LOCK_xid_cache);
798
 
  while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
799
 
  {
800
 
    if (xs->xa_state==XA_PREPARED)
801
 
    {
802
 
      session->client->store((int64_t)xs->xid.formatID);
803
 
      session->client->store((int64_t)xs->xid.gtrid_length);
804
 
      session->client->store((int64_t)xs->xid.bqual_length);
805
 
      session->client->store(xs->xid.data,
806
 
                             xs->xid.gtrid_length+xs->xid.bqual_length);
807
 
      if (session->client->flush())
808
 
      {
809
 
        pthread_mutex_unlock(&LOCK_xid_cache);
810
 
        return 1;
811
 
      }
812
 
    }
813
 
  }
814
 
 
815
 
  pthread_mutex_unlock(&LOCK_xid_cache);
816
 
  session->my_eof();
817
 
  return 0;
818
 
}
819
 
 
820
727
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
821
728
{
822
729
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
827
734
  }
828
735
};
829
736
 
830
 
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
 
737
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
831
738
{
832
739
  int error= 0;
833
740
  TransactionContext *trans= &session->transaction.all;
857
764
      }
858
765
      else
859
766
      {
860
 
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
 
767
        session->status_var.ha_savepoint_rollback_count++;
861
768
      }
862
769
    }
863
770
    trans->no_2pc|= not resource->participatesInXaTransaction();
871
778
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
872
779
    TransactionContext::ResourceContexts set_difference_contexts;
873
780
 
 
781
    /* 
 
782
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
 
783
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
 
784
     * here
 
785
     */
 
786
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
 
787
 
874
788
    sort(sorted_tran_resource_contexts.begin(),
875
789
         sorted_tran_resource_contexts.end(),
876
790
         ResourceContextCompare());
907
821
        }
908
822
        else
909
823
        {
910
 
          status_var_increment(session->status_var.ha_rollback_count);
 
824
          session->status_var.ha_rollback_count++;
911
825
        }
912
826
      }
913
827
      resource_context->reset(); /* keep it conveniently zero-filled */
914
828
    }
915
829
  }
916
830
  trans->setResourceContexts(sv_resource_contexts);
 
831
 
 
832
  if (shouldConstructMessages())
 
833
  {
 
834
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
 
835
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
 
836
    if (savepoint_transaction != NULL)
 
837
    {
 
838
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
 
839
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
 
840
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
 
841
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
 
842
      */ 
 
843
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
 
844
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
 
845
      if (num_statements == 0)
 
846
      {    
 
847
        session->setStatementMessage(NULL);
 
848
      }    
 
849
      else 
 
850
      {
 
851
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
852
      }    
 
853
      session->setTransactionMessage(savepoint_transaction_copy);
 
854
    }
 
855
  }
 
856
 
917
857
  return error;
918
858
}
919
859
 
923
863
  section "4.33.4 SQL-statements and transaction states",
924
864
  NamedSavepoint is *not* transaction-initiating SQL-statement
925
865
*/
926
 
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
 
866
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
927
867
{
928
868
  int error= 0;
929
869
  TransactionContext *trans= &session->transaction.all;
949
889
        }
950
890
        else
951
891
        {
952
 
          status_var_increment(session->status_var.ha_savepoint_count);
 
892
          session->status_var.ha_savepoint_count++;
953
893
        }
954
894
      }
955
895
    }
958
898
    Remember the list of registered storage engines.
959
899
  */
960
900
  sv.setResourceContexts(resource_contexts);
 
901
 
 
902
  if (shouldConstructMessages())
 
903
  {
 
904
    message::Transaction *transaction= session->getTransactionMessage();
 
905
                  
 
906
    if (transaction != NULL)
 
907
    {
 
908
      message::Transaction *transaction_savepoint= 
 
909
        new message::Transaction(*transaction);
 
910
      sv.setTransactionMessage(transaction_savepoint);
 
911
    }
 
912
  } 
 
913
 
961
914
  return error;
962
915
}
963
916
 
964
 
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
 
917
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
965
918
{
966
919
  int error= 0;
967
920
 
985
938
      }
986
939
    }
987
940
  }
 
941
  
988
942
  return error;
989
943
}
990
944
 
991
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
 
945
bool TransactionServices::shouldConstructMessages()
 
946
{
 
947
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
948
  return replication_services.isActive();
 
949
}
 
950
 
 
951
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
992
952
{
993
953
  message::Transaction *transaction= in_session->getTransactionMessage();
994
954
 
1000
960
     * deleting transaction message when done with it.
1001
961
     */
1002
962
    transaction= new (nothrow) message::Transaction();
1003
 
    initTransactionMessage(*transaction, in_session);
 
963
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1004
964
    in_session->setTransactionMessage(transaction);
1005
965
    return transaction;
1006
966
  }
1009
969
}
1010
970
 
1011
971
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1012
 
                                          Session *in_session)
 
972
                                                 Session *in_session,
 
973
                                                 bool should_inc_trx_id)
1013
974
{
1014
975
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
976
  trx->set_server_id(in_session->getServerId());
1016
 
  trx->set_transaction_id(in_session->getQueryId());
 
977
 
 
978
  if (should_inc_trx_id)
 
979
    trx->set_transaction_id(getNextTransactionId());
 
980
 
1017
981
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1018
982
}
1019
983
 
1032
996
  in_session->setTransactionMessage(NULL);
1033
997
}
1034
998
 
1035
 
void TransactionServices::commitTransactionMessage(Session *in_session)
 
999
int TransactionServices::commitTransactionMessage(Session *in_session)
1036
1000
{
1037
1001
  ReplicationServices &replication_services= ReplicationServices::singleton();
1038
1002
  if (! replication_services.isActive())
1039
 
    return;
 
1003
    return 0;
1040
1004
 
1041
1005
  /* If there is an active statement message, finalize it */
1042
1006
  message::Statement *statement= in_session->getStatementMessage();
1046
1010
    finalizeStatementMessage(*statement, in_session);
1047
1011
  }
1048
1012
  else
1049
 
    return; /* No data modification occurred inside the transaction */
 
1013
    return 0; /* No data modification occurred inside the transaction */
1050
1014
  
1051
1015
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1052
1016
 
1053
1017
  finalizeTransactionMessage(*transaction, in_session);
1054
1018
  
1055
 
  replication_services.pushTransactionMessage(*transaction);
 
1019
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1056
1020
 
1057
1021
  cleanupTransactionMessage(transaction, in_session);
 
1022
 
 
1023
  return static_cast<int>(result);
1058
1024
}
1059
1025
 
1060
1026
void TransactionServices::initStatementMessage(message::Statement &statement,
1100
1066
   */
1101
1067
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1102
1068
  {
 
1069
    /* Remember the transaction ID so we can re-use it */
 
1070
    uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1071
 
1103
1072
    /*
1104
1073
     * Clear the transaction, create a Rollback statement message, 
1105
1074
     * attach it to the transaction, and push it to replicators.
1106
1075
     */
1107
1076
    transaction->Clear();
1108
 
    initTransactionMessage(*transaction, in_session);
 
1077
    initTransactionMessage(*transaction, in_session, false);
 
1078
 
 
1079
    /* Set the transaction ID to match the previous messages */
 
1080
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1109
1081
 
1110
1082
    message::Statement *statement= transaction->add_statement();
1111
1083
 
1114
1086
 
1115
1087
    finalizeTransactionMessage(*transaction, in_session);
1116
1088
    
1117
 
    replication_services.pushTransactionMessage(*transaction);
 
1089
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1118
1090
  }
1119
1091
  cleanupTransactionMessage(transaction, in_session);
1120
1092
}
1121
1093
 
1122
1094
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1123
 
                                                                 Table *in_table)
 
1095
                                                            Table *in_table,
 
1096
                                                            uint32_t *next_segment_id)
1124
1097
{
1125
1098
  message::Statement *statement= in_session->getStatementMessage();
1126
 
  /*
1127
 
   * We check to see if the current Statement message is of type INSERT.
1128
 
   * If it is not, we finalize the current Statement and ensure a new
1129
 
   * InsertStatement is created.
 
1099
  message::Transaction *transaction= NULL;
 
1100
 
 
1101
  /* 
 
1102
   * Check the type for the current Statement message, if it is anything
 
1103
   * other then INSERT we need to call finalize, this will ensure a 
 
1104
   * new InsertStatement is created. If it is of type INSERT check
 
1105
   * what table the INSERT belongs to, if it is a different table
 
1106
   * call finalize, so a new InsertStatement can be created. 
1130
1107
   */
1131
 
  if (statement != NULL &&
1132
 
      statement->type() != message::Statement::INSERT)
 
1108
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1133
1109
  {
1134
1110
    finalizeStatementMessage(*statement, in_session);
1135
1111
    statement= in_session->getStatementMessage();
1136
 
  }
 
1112
  } 
 
1113
  else if (statement != NULL)
 
1114
  {
 
1115
    transaction= getActiveTransactionMessage(in_session);
 
1116
 
 
1117
    /*
 
1118
     * If we've passed our threshold for the statement size (possible for
 
1119
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1120
     * the Transaction will keep it from getting huge).
 
1121
     */
 
1122
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1123
      in_session->variables.transaction_message_threshold)
 
1124
    {
 
1125
      /* Remember the transaction ID so we can re-use it */
 
1126
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1127
 
 
1128
      message::InsertData *current_data= statement->mutable_insert_data();
 
1129
 
 
1130
      /* Caller should use this value when adding a new record */
 
1131
      *next_segment_id= current_data->segment_id() + 1;
 
1132
 
 
1133
      current_data->set_end_segment(false);
 
1134
 
 
1135
      /* 
 
1136
       * Send the trx message to replicators after finalizing the 
 
1137
       * statement and transaction. This will also set the Transaction
 
1138
       * and Statement objects in Session to NULL.
 
1139
       */
 
1140
      commitTransactionMessage(in_session);
 
1141
 
 
1142
      /*
 
1143
       * Statement and Transaction should now be NULL, so new ones will get
 
1144
       * created. We reuse the transaction id since we are segmenting
 
1145
       * one transaction.
 
1146
       */
 
1147
      statement= in_session->getStatementMessage();
 
1148
      transaction= getActiveTransactionMessage(in_session, false);
 
1149
      assert(transaction != NULL);
 
1150
 
 
1151
      /* Set the transaction ID to match the previous messages */
 
1152
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1153
    }
 
1154
    else
 
1155
    {
 
1156
      const message::InsertHeader &insert_header= statement->insert_header();
 
1157
      string old_table_name= insert_header.table_metadata().table_name();
 
1158
     
 
1159
      string current_table_name;
 
1160
      (void) in_table->getShare()->getTableName(current_table_name);
 
1161
 
 
1162
      if (current_table_name.compare(old_table_name))
 
1163
      {
 
1164
        finalizeStatementMessage(*statement, in_session);
 
1165
        statement= in_session->getStatementMessage();
 
1166
      }
 
1167
      else
 
1168
      {
 
1169
        /* carry forward the existing segment id */
 
1170
        const message::InsertData &current_data= statement->insert_data();
 
1171
        *next_segment_id= current_data.segment_id();
 
1172
      }
 
1173
    }
 
1174
  } 
1137
1175
 
1138
1176
  if (statement == NULL)
1139
1177
  {
1140
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1178
    /*
 
1179
     * Transaction will be non-NULL only if we had to segment it due to
 
1180
     * transaction size above.
 
1181
     */
 
1182
    if (transaction == NULL)
 
1183
      transaction= getActiveTransactionMessage(in_session);
 
1184
 
1141
1185
    /* 
1142
1186
     * Transaction message initialized and set, but no statement created
1143
1187
     * yet.  We construct one and initialize it, here, then return the
1174
1218
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1175
1219
 
1176
1220
  Field *current_field;
1177
 
  Field **table_fields= in_table->field;
 
1221
  Field **table_fields= in_table->getFields();
1178
1222
 
1179
1223
  message::FieldMetadata *field_metadata;
1180
1224
 
1202
1246
   *
1203
1247
   * Multi-column primary keys are handled how exactly?
1204
1248
   */
1205
 
  if (in_table->s->primary_key == MAX_KEY)
 
1249
  if (not in_table->getShare()->hasPrimaryKey())
1206
1250
  {
1207
1251
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1208
1252
    return true;
1209
1253
  }
1210
1254
 
1211
 
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
1255
  uint32_t next_segment_id= 1;
 
1256
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1212
1257
 
1213
1258
  message::InsertData *data= statement.mutable_insert_data();
1214
 
  data->set_segment_id(1);
 
1259
  data->set_segment_id(next_segment_id);
1215
1260
  data->set_end_segment(true);
1216
1261
  message::InsertRecord *record= data->add_record();
1217
1262
 
1218
1263
  Field *current_field;
1219
 
  Field **table_fields= in_table->field;
 
1264
  Field **table_fields= in_table->getFields();
1220
1265
 
1221
1266
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1222
1267
  string_value->set_charset(system_charset_info);
1226
1271
 
1227
1272
  while ((current_field= *table_fields++) != NULL) 
1228
1273
  {
1229
 
    string_value= current_field->val_str(string_value);
1230
 
    record->add_insert_value(string_value->c_ptr(), string_value->length());
1231
 
    string_value->free();
 
1274
    if (current_field->is_null())
 
1275
    {
 
1276
      record->add_is_null(true);
 
1277
      record->add_insert_value("", 0);
 
1278
    } 
 
1279
    else 
 
1280
    {
 
1281
      string_value= current_field->val_str(string_value);
 
1282
      record->add_is_null(false);
 
1283
      record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1284
      string_value->free();
 
1285
    }
1232
1286
  }
1233
1287
  return false;
1234
1288
}
1236
1290
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1237
1291
                                                            Table *in_table,
1238
1292
                                                            const unsigned char *old_record, 
1239
 
                                                            const unsigned char *new_record)
 
1293
                                                            const unsigned char *new_record,
 
1294
                                                            uint32_t *next_segment_id)
1240
1295
{
1241
1296
  message::Statement *statement= in_session->getStatementMessage();
 
1297
  message::Transaction *transaction= NULL;
 
1298
 
1242
1299
  /*
1243
 
   * We check to see if the current Statement message is of type UPDATE.
1244
 
   * If it is not, we finalize the current Statement and ensure a new
1245
 
   * UpdateStatement is created.
 
1300
   * Check the type for the current Statement message, if it is anything
 
1301
   * other then UPDATE we need to call finalize, this will ensure a
 
1302
   * new UpdateStatement is created. If it is of type UPDATE check
 
1303
   * what table the UPDATE belongs to, if it is a different table
 
1304
   * call finalize, so a new UpdateStatement can be created.
1246
1305
   */
1247
 
  if (statement != NULL &&
1248
 
      statement->type() != message::Statement::UPDATE)
 
1306
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1249
1307
  {
1250
1308
    finalizeStatementMessage(*statement, in_session);
1251
1309
    statement= in_session->getStatementMessage();
1252
1310
  }
 
1311
  else if (statement != NULL)
 
1312
  {
 
1313
    transaction= getActiveTransactionMessage(in_session);
 
1314
 
 
1315
    /*
 
1316
     * If we've passed our threshold for the statement size (possible for
 
1317
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1318
     * the Transaction will keep it from getting huge).
 
1319
     */
 
1320
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1321
      in_session->variables.transaction_message_threshold)
 
1322
    {
 
1323
      /* Remember the transaction ID so we can re-use it */
 
1324
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1325
 
 
1326
      message::UpdateData *current_data= statement->mutable_update_data();
 
1327
 
 
1328
      /* Caller should use this value when adding a new record */
 
1329
      *next_segment_id= current_data->segment_id() + 1;
 
1330
 
 
1331
      current_data->set_end_segment(false);
 
1332
 
 
1333
      /*
 
1334
       * Send the trx message to replicators after finalizing the 
 
1335
       * statement and transaction. This will also set the Transaction
 
1336
       * and Statement objects in Session to NULL.
 
1337
       */
 
1338
      commitTransactionMessage(in_session);
 
1339
 
 
1340
      /*
 
1341
       * Statement and Transaction should now be NULL, so new ones will get
 
1342
       * created. We reuse the transaction id since we are segmenting
 
1343
       * one transaction.
 
1344
       */
 
1345
      statement= in_session->getStatementMessage();
 
1346
      transaction= getActiveTransactionMessage(in_session, false);
 
1347
      assert(transaction != NULL);
 
1348
 
 
1349
      /* Set the transaction ID to match the previous messages */
 
1350
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1351
    }
 
1352
    else
 
1353
    {
 
1354
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1355
      {
 
1356
        /* carry forward the existing segment id */
 
1357
        const message::UpdateData &current_data= statement->update_data();
 
1358
        *next_segment_id= current_data.segment_id();
 
1359
      } 
 
1360
      else 
 
1361
      {
 
1362
        finalizeStatementMessage(*statement, in_session);
 
1363
        statement= in_session->getStatementMessage();
 
1364
      }
 
1365
    }
 
1366
  }
1253
1367
 
1254
1368
  if (statement == NULL)
1255
1369
  {
1256
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1370
    /*
 
1371
     * Transaction will be non-NULL only if we had to segment it due to
 
1372
     * transaction size above.
 
1373
     */
 
1374
    if (transaction == NULL)
 
1375
      transaction= getActiveTransactionMessage(in_session);
 
1376
 
1257
1377
    /* 
1258
1378
     * Transaction message initialized and set, but no statement created
1259
1379
     * yet.  We construct one and initialize it, here, then return the
1267
1387
  return *statement;
1268
1388
}
1269
1389
 
 
1390
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1391
                                                  Table *in_table,
 
1392
                                                  const unsigned char *old_record,
 
1393
                                                  const unsigned char *new_record)
 
1394
{
 
1395
  const message::UpdateHeader &update_header= statement.update_header();
 
1396
  string old_table_name= update_header.table_metadata().table_name();
 
1397
 
 
1398
  string current_table_name;
 
1399
  (void) in_table->getShare()->getTableName(current_table_name);
 
1400
  if (current_table_name.compare(old_table_name))
 
1401
  {
 
1402
    return false;
 
1403
  }
 
1404
  else
 
1405
  {
 
1406
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1407
     * match the updated fields in the new record, if they do not we must
 
1408
     * create a new UpdateHeader 
 
1409
     */
 
1410
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1411
 
 
1412
    Field *current_field;
 
1413
    Field **table_fields= in_table->getFields();
 
1414
    in_table->setReadSet();
 
1415
 
 
1416
    size_t num_calculated_updated_fields= 0;
 
1417
    bool found= false;
 
1418
    while ((current_field= *table_fields++) != NULL)
 
1419
    {
 
1420
      if (num_calculated_updated_fields > num_set_fields)
 
1421
      {
 
1422
        break;
 
1423
      }
 
1424
 
 
1425
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1426
      {
 
1427
        /* check that this field exists in the UpdateHeader record */
 
1428
        found= false;
 
1429
 
 
1430
        for (size_t x= 0; x < num_set_fields; ++x)
 
1431
        {
 
1432
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1433
          string name= field_metadata.name();
 
1434
          if (name.compare(current_field->field_name) == 0)
 
1435
          {
 
1436
            found= true;
 
1437
            ++num_calculated_updated_fields;
 
1438
            break;
 
1439
          } 
 
1440
        }
 
1441
        if (! found)
 
1442
        {
 
1443
          break;
 
1444
        } 
 
1445
      }
 
1446
    }
 
1447
 
 
1448
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1449
    {
 
1450
      return true;
 
1451
    } 
 
1452
    else 
 
1453
    {
 
1454
      return false;
 
1455
    }
 
1456
  }
 
1457
}  
 
1458
 
1270
1459
void TransactionServices::setUpdateHeader(message::Statement &statement,
1271
1460
                                          Session *in_session,
1272
1461
                                          Table *in_table,
1292
1481
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1293
1482
 
1294
1483
  Field *current_field;
1295
 
  Field **table_fields= in_table->field;
 
1484
  Field **table_fields= in_table->getFields();
1296
1485
 
1297
1486
  message::FieldMetadata *field_metadata;
1298
1487
 
1305
1494
     * We add the "key field metadata" -- i.e. the fields which is
1306
1495
     * the primary key for the table.
1307
1496
     */
1308
 
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1497
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1309
1498
    {
1310
1499
      field_metadata= header->add_key_field_metadata();
1311
1500
      field_metadata->set_name(current_field->field_name);
1312
1501
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1313
1502
    }
1314
1503
 
1315
 
    /*
1316
 
     * The below really should be moved into the Field API and Record API.  But for now
1317
 
     * we do this crazy pointer fiddling to figure out if the current field
1318
 
     * has been updated in the supplied record raw byte pointers.
1319
 
     */
1320
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1321
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1322
 
 
1323
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1324
 
 
1325
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1504
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1326
1505
    {
1327
1506
      /* Field is changed from old to new */
1328
1507
      field_metadata= header->add_set_field_metadata();
1340
1519
  if (! replication_services.isActive())
1341
1520
    return;
1342
1521
 
1343
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
1522
  uint32_t next_segment_id= 1;
 
1523
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1344
1524
 
1345
1525
  message::UpdateData *data= statement.mutable_update_data();
1346
 
  data->set_segment_id(1);
 
1526
  data->set_segment_id(next_segment_id);
1347
1527
  data->set_end_segment(true);
1348
1528
  message::UpdateRecord *record= data->add_record();
1349
1529
 
1350
1530
  Field *current_field;
1351
 
  Field **table_fields= in_table->field;
 
1531
  Field **table_fields= in_table->getFields();
1352
1532
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1353
1533
  string_value->set_charset(system_charset_info);
1354
1534
 
1364
1544
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1365
1545
     *
1366
1546
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1367
 
     *
1368
 
     * The below really should be moved into the Field API and Record API.  But for now
1369
 
     * we do this crazy pointer fiddling to figure out if the current field
1370
 
     * has been updated in the supplied record raw byte pointers.
1371
1547
     */
1372
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1373
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1374
 
 
1375
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1376
 
 
1377
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1548
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1378
1549
    {
1379
1550
      /* Store the original "read bit" for this field */
1380
1551
      bool is_read_set= current_field->isReadSet();
1391
1562
       */
1392
1563
      current_field->setReadSet(is_read_set);
1393
1564
 
1394
 
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
1565
      if (current_field->is_null())
 
1566
      {
 
1567
        record->add_is_null(true);
 
1568
        record->add_after_value("", 0);
 
1569
      }
 
1570
      else
 
1571
      {
 
1572
        record->add_is_null(false);
 
1573
        record->add_after_value(string_value->c_ptr(), string_value->length());
 
1574
      }
1395
1575
      string_value->free();
1396
1576
    }
1397
1577
 
1400
1580
     * primary key field value.  Replication only supports tables
1401
1581
     * with a primary key.
1402
1582
     */
1403
 
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1583
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1404
1584
    {
1405
1585
      /**
1406
1586
       * To say the below is ugly is an understatement. But it works.
1417
1597
  }
1418
1598
}
1419
1599
 
 
1600
bool TransactionServices::isFieldUpdated(Field *current_field,
 
1601
                                         Table *in_table,
 
1602
                                         const unsigned char *old_record,
 
1603
                                         const unsigned char *new_record)
 
1604
{
 
1605
  /*
 
1606
   * The below really should be moved into the Field API and Record API.  But for now
 
1607
   * we do this crazy pointer fiddling to figure out if the current field
 
1608
   * has been updated in the supplied record raw byte pointers.
 
1609
   */
 
1610
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1611
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1612
 
 
1613
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1614
 
 
1615
  bool old_value_is_null= current_field->is_null_in_record(old_record);
 
1616
  bool new_value_is_null= current_field->is_null_in_record(new_record);
 
1617
 
 
1618
  bool isUpdated= false;
 
1619
  if (old_value_is_null != new_value_is_null)
 
1620
  {
 
1621
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
 
1622
    {
 
1623
      isUpdated= true;
 
1624
    }
 
1625
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
 
1626
    {
 
1627
      isUpdated= true;
 
1628
    }
 
1629
  }
 
1630
 
 
1631
  if (! isUpdated)
 
1632
  {
 
1633
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1634
    {
 
1635
      isUpdated= true;
 
1636
    }
 
1637
  }
 
1638
  return isUpdated;
 
1639
}  
 
1640
 
1420
1641
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1421
 
                                                            Table *in_table)
 
1642
                                                            Table *in_table,
 
1643
                                                            uint32_t *next_segment_id)
1422
1644
{
1423
1645
  message::Statement *statement= in_session->getStatementMessage();
 
1646
  message::Transaction *transaction= NULL;
 
1647
 
1424
1648
  /*
1425
 
   * We check to see if the current Statement message is of type DELETE.
1426
 
   * If it is not, we finalize the current Statement and ensure a new
1427
 
   * DeleteStatement is created.
 
1649
   * Check the type for the current Statement message, if it is anything
 
1650
   * other then DELETE we need to call finalize, this will ensure a
 
1651
   * new DeleteStatement is created. If it is of type DELETE check
 
1652
   * what table the DELETE belongs to, if it is a different table
 
1653
   * call finalize, so a new DeleteStatement can be created.
1428
1654
   */
1429
 
  if (statement != NULL &&
1430
 
      statement->type() != message::Statement::DELETE)
 
1655
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1431
1656
  {
1432
1657
    finalizeStatementMessage(*statement, in_session);
1433
1658
    statement= in_session->getStatementMessage();
1434
1659
  }
 
1660
  else if (statement != NULL)
 
1661
  {
 
1662
    transaction= getActiveTransactionMessage(in_session);
 
1663
 
 
1664
    /*
 
1665
     * If we've passed our threshold for the statement size (possible for
 
1666
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1667
     * the Transaction will keep it from getting huge).
 
1668
     */
 
1669
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1670
      in_session->variables.transaction_message_threshold)
 
1671
    {
 
1672
      /* Remember the transaction ID so we can re-use it */
 
1673
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1674
 
 
1675
      message::DeleteData *current_data= statement->mutable_delete_data();
 
1676
 
 
1677
      /* Caller should use this value when adding a new record */
 
1678
      *next_segment_id= current_data->segment_id() + 1;
 
1679
 
 
1680
      current_data->set_end_segment(false);
 
1681
 
 
1682
      /* 
 
1683
       * Send the trx message to replicators after finalizing the 
 
1684
       * statement and transaction. This will also set the Transaction
 
1685
       * and Statement objects in Session to NULL.
 
1686
       */
 
1687
      commitTransactionMessage(in_session);
 
1688
 
 
1689
      /*
 
1690
       * Statement and Transaction should now be NULL, so new ones will get
 
1691
       * created. We reuse the transaction id since we are segmenting
 
1692
       * one transaction.
 
1693
       */
 
1694
      statement= in_session->getStatementMessage();
 
1695
      transaction= getActiveTransactionMessage(in_session, false);
 
1696
      assert(transaction != NULL);
 
1697
 
 
1698
      /* Set the transaction ID to match the previous messages */
 
1699
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1700
    }
 
1701
    else
 
1702
    {
 
1703
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1704
      string old_table_name= delete_header.table_metadata().table_name();
 
1705
 
 
1706
      string current_table_name;
 
1707
      (void) in_table->getShare()->getTableName(current_table_name);
 
1708
      if (current_table_name.compare(old_table_name))
 
1709
      {
 
1710
        finalizeStatementMessage(*statement, in_session);
 
1711
        statement= in_session->getStatementMessage();
 
1712
      }
 
1713
      else
 
1714
      {
 
1715
        /* carry forward the existing segment id */
 
1716
        const message::DeleteData &current_data= statement->delete_data();
 
1717
        *next_segment_id= current_data.segment_id();
 
1718
      }
 
1719
    }
 
1720
  }
1435
1721
 
1436
1722
  if (statement == NULL)
1437
1723
  {
1438
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1724
    /*
 
1725
     * Transaction will be non-NULL only if we had to segment it due to
 
1726
     * transaction size above.
 
1727
     */
 
1728
    if (transaction == NULL)
 
1729
      transaction= getActiveTransactionMessage(in_session);
 
1730
 
1439
1731
    /* 
1440
1732
     * Transaction message initialized and set, but no statement created
1441
1733
     * yet.  We construct one and initialize it, here, then return the
1471
1763
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1472
1764
 
1473
1765
  Field *current_field;
1474
 
  Field **table_fields= in_table->field;
 
1766
  Field **table_fields= in_table->getFields();
1475
1767
 
1476
1768
  message::FieldMetadata *field_metadata;
1477
1769
 
1482
1774
     * primary key field value.  Replication only supports tables
1483
1775
     * with a primary key.
1484
1776
     */
1485
 
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1777
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1486
1778
    {
1487
1779
      field_metadata= header->add_key_field_metadata();
1488
1780
      field_metadata->set_name(current_field->field_name);
1491
1783
  }
1492
1784
}
1493
1785
 
1494
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
1786
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1495
1787
{
1496
1788
  ReplicationServices &replication_services= ReplicationServices::singleton();
1497
1789
  if (! replication_services.isActive())
1498
1790
    return;
1499
1791
 
1500
 
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
1792
  uint32_t next_segment_id= 1;
 
1793
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1501
1794
 
1502
1795
  message::DeleteData *data= statement.mutable_delete_data();
1503
 
  data->set_segment_id(1);
 
1796
  data->set_segment_id(next_segment_id);
1504
1797
  data->set_end_segment(true);
1505
1798
  message::DeleteRecord *record= data->add_record();
1506
1799
 
1507
1800
  Field *current_field;
1508
 
  Field **table_fields= in_table->field;
 
1801
  Field **table_fields= in_table->getFields();
1509
1802
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1510
1803
  string_value->set_charset(system_charset_info);
1511
1804
 
1516
1809
     * primary key field value.  Replication only supports tables
1517
1810
     * with a primary key.
1518
1811
     */
1519
 
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1812
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1520
1813
    {
1521
 
      string_value= current_field->val_str(string_value);
 
1814
      if (use_update_record)
 
1815
      {
 
1816
        /*
 
1817
         * Temporarily point to the update record to get its value.
 
1818
         * This is pretty much a hack in order to get the PK value from
 
1819
         * the update record rather than the insert record. Field::val_str()
 
1820
         * should not change anything in Field::ptr, so this should be safe.
 
1821
         * We are careful not to change anything in old_ptr.
 
1822
         */
 
1823
        const unsigned char *old_ptr= current_field->ptr;
 
1824
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1825
        string_value= current_field->val_str(string_value);
 
1826
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
 
1827
      }
 
1828
      else
 
1829
      {
 
1830
        string_value= current_field->val_str(string_value);
 
1831
        /**
 
1832
         * @TODO Store optional old record value in the before data member
 
1833
         */
 
1834
      }
1522
1835
      record->add_key_value(string_value->c_ptr(), string_value->length());
1523
 
      /**
1524
 
       * @TODO Store optional old record value in the before data member
 
1836
      string_value->free();
 
1837
    }
 
1838
  }
 
1839
}
 
1840
 
 
1841
 
 
1842
/**
 
1843
 * Template for removing Statement records of different types.
 
1844
 *
 
1845
 * The code for removing records from different Statement message types
 
1846
 * is identical except for the class types that are embedded within the
 
1847
 * Statement.
 
1848
 *
 
1849
 * There are 3 scenarios we need to look for:
 
1850
 *   - We've been asked to remove more records than exist in the Statement
 
1851
 *   - We've been asked to remove less records than exist in the Statement
 
1852
 *   - We've been asked to remove ALL records that exist in the Statement
 
1853
 *
 
1854
 * If we are removing ALL records, then effectively we would be left with
 
1855
 * an empty Statement message, so we should just remove it and clean up
 
1856
 * message pointers in the Session object.
 
1857
 */
 
1858
template <class DataType, class RecordType>
 
1859
static bool removeStatementRecordsWithType(Session *session,
 
1860
                                           DataType *data,
 
1861
                                           uint32_t count)
 
1862
{
 
1863
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1864
 
 
1865
  /* If there aren't enough records to remove 'count' of them, error. */
 
1866
  if (num_avail_recs < count)
 
1867
    return false;
 
1868
 
 
1869
  /*
 
1870
   * If we are removing all of the data records, we'll just remove this
 
1871
   * entire Statement message.
 
1872
   */
 
1873
  if (num_avail_recs == count)
 
1874
  {
 
1875
    message::Transaction *transaction= session->getTransactionMessage();
 
1876
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1877
    statements->RemoveLast();
 
1878
 
 
1879
    /*
 
1880
     * Now need to set the Session Statement pointer to either the previous
 
1881
     * Statement, or NULL if there isn't one.
 
1882
     */
 
1883
    if (statements->size() == 0)
 
1884
    {
 
1885
      session->setStatementMessage(NULL);
 
1886
    }
 
1887
    else
 
1888
    {
 
1889
      /*
 
1890
       * There isn't a great way to get a pointer to the previous Statement
 
1891
       * message using the RepeatedPtrField object, so we'll just get to it
 
1892
       * using the Transaction message.
1525
1893
       */
1526
 
      string_value->free();
1527
 
    }
1528
 
  }
1529
 
}
 
1894
      int last_stmt_idx= transaction->statement_size() - 1;
 
1895
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1896
    }
 
1897
  }
 
1898
  /* We only need to remove 'count' records */
 
1899
  else if (num_avail_recs > count)
 
1900
  {
 
1901
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1902
    while (count--)
 
1903
      records->RemoveLast();
 
1904
  }
 
1905
 
 
1906
  return true;
 
1907
}
 
1908
 
 
1909
 
 
1910
bool TransactionServices::removeStatementRecords(Session *session,
 
1911
                                                 uint32_t count)
 
1912
{
 
1913
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1914
  if (! replication_services.isActive())
 
1915
    return false;
 
1916
 
 
1917
  /* Get the most current Statement */
 
1918
  message::Statement *statement= session->getStatementMessage();
 
1919
 
 
1920
  /* Make sure we have work to do */
 
1921
  if (statement == NULL)
 
1922
    return false;
 
1923
 
 
1924
  bool retval= false;
 
1925
 
 
1926
  switch (statement->type())
 
1927
  {
 
1928
    case message::Statement::INSERT:
 
1929
    {
 
1930
      message::InsertData *data= statement->mutable_insert_data();
 
1931
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
1932
      break;
 
1933
    }
 
1934
 
 
1935
    case message::Statement::UPDATE:
 
1936
    {
 
1937
      message::UpdateData *data= statement->mutable_update_data();
 
1938
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
1939
      break;
 
1940
    }
 
1941
 
 
1942
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
1943
    {
 
1944
      message::DeleteData *data= statement->mutable_delete_data();
 
1945
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
1946
      break;
 
1947
    }
 
1948
 
 
1949
    default:
 
1950
      retval= false;
 
1951
      break;
 
1952
  }
 
1953
 
 
1954
  return retval;
 
1955
}
 
1956
 
1530
1957
 
1531
1958
void TransactionServices::createTable(Session *in_session,
1532
1959
                                      const message::Table &table)
1552
1979
 
1553
1980
  finalizeTransactionMessage(*transaction, in_session);
1554
1981
  
1555
 
  replication_services.pushTransactionMessage(*transaction);
 
1982
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1556
1983
 
1557
1984
  cleanupTransactionMessage(transaction, in_session);
1558
1985
 
1582
2009
 
1583
2010
  finalizeTransactionMessage(*transaction, in_session);
1584
2011
  
1585
 
  replication_services.pushTransactionMessage(*transaction);
 
2012
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1586
2013
 
1587
2014
  cleanupTransactionMessage(transaction, in_session);
1588
2015
 
1611
2038
 
1612
2039
  finalizeTransactionMessage(*transaction, in_session);
1613
2040
  
1614
 
  replication_services.pushTransactionMessage(*transaction);
 
2041
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1615
2042
 
1616
2043
  cleanupTransactionMessage(transaction, in_session);
1617
2044
}
1647
2074
 
1648
2075
  finalizeTransactionMessage(*transaction, in_session);
1649
2076
  
1650
 
  replication_services.pushTransactionMessage(*transaction);
 
2077
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1651
2078
 
1652
2079
  cleanupTransactionMessage(transaction, in_session);
1653
2080
}
1682
2109
 
1683
2110
  finalizeTransactionMessage(*transaction, in_session);
1684
2111
  
1685
 
  replication_services.pushTransactionMessage(*transaction);
 
2112
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1686
2113
 
1687
2114
  cleanupTransactionMessage(transaction, in_session);
1688
2115
}
1702
2129
 
1703
2130
  finalizeTransactionMessage(*transaction, in_session);
1704
2131
  
1705
 
  replication_services.pushTransactionMessage(*transaction);
 
2132
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1706
2133
 
1707
2134
  cleanupTransactionMessage(transaction, in_session);
1708
2135
}
1709
2136
 
 
2137
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2138
{
 
2139
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2140
  if (! replication_services.isActive())
 
2141
    return 0;
 
2142
 
 
2143
  message::Transaction *transaction= new (nothrow) message::Transaction();
 
2144
 
 
2145
  // set server id, start timestamp
 
2146
  initTransactionMessage(*transaction, session, true);
 
2147
 
 
2148
  // set end timestamp
 
2149
  finalizeTransactionMessage(*transaction, session);
 
2150
 
 
2151
  message::Event *trx_event= transaction->mutable_event();
 
2152
 
 
2153
  trx_event->CopyFrom(event);
 
2154
 
 
2155
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2156
 
 
2157
  delete transaction;
 
2158
 
 
2159
  return static_cast<int>(result);
 
2160
}
 
2161
 
 
2162
bool TransactionServices::sendStartupEvent(Session *session)
 
2163
{
 
2164
  message::Event event;
 
2165
  event.set_type(message::Event::STARTUP);
 
2166
  if (sendEvent(session, event) != 0)
 
2167
    return false;
 
2168
  return true;
 
2169
}
 
2170
 
 
2171
bool TransactionServices::sendShutdownEvent(Session *session)
 
2172
{
 
2173
  message::Event event;
 
2174
  event.set_type(message::Event::SHUTDOWN);
 
2175
  if (sendEvent(session, event) != 0)
 
2176
    return false;
 
2177
  return true;
 
2178
}
 
2179
 
1710
2180
} /* namespace drizzled */