~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-07-16 05:40:08 UTC
  • mto: This revision was merged to the branch mainline in revision 1661.
  • Revision ID: brian@gaz-20100716054008-cfcvyu0akpwqnwqd
Encapsulate Table in field

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
71
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/plugin/xa_storage_engine.h"
73
72
#include "drizzled/internal/my_sys.h"
74
73
 
 
74
using namespace std;
 
75
 
75
76
#include <vector>
76
77
#include <algorithm>
77
78
#include <functional>
78
 
#include <google/protobuf/repeated_field.h>
79
 
 
80
 
using namespace std;
81
 
using namespace google;
82
79
 
83
80
namespace drizzled
84
81
{
300
297
 * transaction after all DDLs, just like the statement transaction
301
298
 * is always committed at the end of all statements.
302
299
 */
303
 
TransactionServices::TransactionServices()
304
 
{
305
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
 
  if (engine)
307
 
  {
308
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
 
  }
310
 
  else 
311
 
  {
312
 
    xa_storage_engine= NULL;
313
 
  }
314
 
}
315
 
 
316
300
void TransactionServices::registerResourceForStatement(Session *session,
317
301
                                                       plugin::MonitoredInTransaction *monitored,
318
302
                                                       plugin::TransactionalStorageEngine *engine)
403
387
  if (session->transaction.xid_state.xid.is_null())
404
388
    session->transaction.xid_state.xid.set(session->getQueryId());
405
389
 
 
390
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
391
 
406
392
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
393
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
394
    registerResourceForStatement(session, monitored, engine);
440
426
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
427
}
442
428
 
443
 
void TransactionServices::allocateNewTransactionId()
444
 
{
445
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
 
  if (! replication_services.isActive())
447
 
  {
448
 
    return;
449
 
  }
450
 
 
451
 
  Session *my_session= current_session;
452
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
 
  my_session->setXaId(xa_id);
454
 
}
455
 
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
 
{
458
 
  if (session->getXaId() == 0)
459
 
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
 
  }
462
 
 
463
 
  return session->getXaId();
464
 
}
465
 
 
466
429
/**
467
430
  @retval
468
431
    0   ok
500
463
 
501
464
  if (resource_contexts.empty() == false)
502
465
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
504
467
    {
505
468
      rollbackTransaction(session, normal_transaction);
506
469
      return 1;
538
501
          }
539
502
          else
540
503
          {
541
 
            session->status_var.ha_prepare_count++;
 
504
            status_var_increment(session->status_var.ha_prepare_count);
542
505
          }
543
506
        }
544
507
      }
560
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
524
end:
562
525
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
526
      start_waiting_global_read_lock(session);
564
527
  }
565
528
  return error;
566
529
}
597
560
        }
598
561
        else if (normal_transaction)
599
562
        {
600
 
          session->status_var.ha_commit_count++;
 
563
          status_var_increment(session->status_var.ha_commit_count);
601
564
        }
602
565
      }
603
566
      else if (resource->participatesInSqlTransaction())
609
572
        }
610
573
        else if (normal_transaction)
611
574
        {
612
 
          session->status_var.ha_commit_count++;
 
575
          status_var_increment(session->status_var.ha_commit_count);
613
576
        }
614
577
      }
615
578
      resource_context->reset(); /* keep it conveniently zero-filled */
663
626
        }
664
627
        else if (normal_transaction)
665
628
        {
666
 
          session->status_var.ha_rollback_count++;
 
629
          status_var_increment(session->status_var.ha_rollback_count);
667
630
        }
668
631
      }
669
632
      else if (resource->participatesInSqlTransaction())
675
638
        }
676
639
        else if (normal_transaction)
677
640
        {
678
 
          session->status_var.ha_rollback_count++;
 
641
          status_var_increment(session->status_var.ha_rollback_count);
679
642
        }
680
643
      }
681
644
      resource_context->reset(); /* keep it conveniently zero-filled */
688
651
     * a rollback statement with the corresponding transaction ID
689
652
     * to rollback.
690
653
     */
691
 
    if (normal_transaction)
692
 
      rollbackTransactionMessage(session);
 
654
    rollbackTransactionMessage(session);
693
655
 
694
656
    if (is_real_trans)
695
657
      session->transaction.xid_state.xid.null();
707
669
   */
708
670
  if (is_real_trans &&
709
671
      session->transaction.all.hasModifiedNonTransData() &&
710
 
      session->getKilled() != Session::KILL_CONNECTION)
 
672
      session->killed != Session::KILL_CONNECTION)
711
673
  {
712
674
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
675
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
730
692
*/
731
693
int TransactionServices::autocommitOrRollback(Session *session, int error)
732
694
{
733
 
 
734
695
  if (session->transaction.stmt.getResourceContexts().empty() == false)
735
696
  {
736
 
    TransactionContext *trans = &session->transaction.stmt;
737
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
738
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
739
 
         it != resource_contexts.end();
740
 
         ++it)
741
 
    {
742
 
      ResourceContext *resource_context= *it;
743
 
 
744
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
745
 
    }
746
 
 
747
697
    if (! error)
748
698
    {
749
699
      if (commitTransaction(session, false))
801
751
      }
802
752
      else
803
753
      {
804
 
        session->status_var.ha_savepoint_rollback_count++;
 
754
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
805
755
      }
806
756
    }
807
757
    trans->no_2pc|= not resource->participatesInXaTransaction();
858
808
        }
859
809
        else
860
810
        {
861
 
          session->status_var.ha_rollback_count++;
 
811
          status_var_increment(session->status_var.ha_rollback_count);
862
812
        }
863
813
      }
864
814
      resource_context->reset(); /* keep it conveniently zero-filled */
865
815
    }
866
816
  }
867
817
  trans->setResourceContexts(sv_resource_contexts);
868
 
 
869
 
  if (shouldConstructMessages())
870
 
  {
871
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
872
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
873
 
    if (savepoint_transaction != NULL)
874
 
    {
875
 
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
876
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
877
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
878
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
879
 
      */ 
880
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
881
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
882
 
      if (num_statements == 0)
883
 
      {    
884
 
        session->setStatementMessage(NULL);
885
 
      }    
886
 
      else 
887
 
      {
888
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
889
 
      }    
890
 
      session->setTransactionMessage(savepoint_transaction_copy);
891
 
    }
892
 
  }
893
 
 
894
818
  return error;
895
819
}
896
820
 
926
850
        }
927
851
        else
928
852
        {
929
 
          session->status_var.ha_savepoint_count++;
 
853
          status_var_increment(session->status_var.ha_savepoint_count);
930
854
        }
931
855
      }
932
856
    }
935
859
    Remember the list of registered storage engines.
936
860
  */
937
861
  sv.setResourceContexts(resource_contexts);
938
 
 
939
 
  if (shouldConstructMessages())
940
 
  {
941
 
    message::Transaction *transaction= session->getTransactionMessage();
942
 
                  
943
 
    if (transaction != NULL)
944
 
    {
945
 
      message::Transaction *transaction_savepoint= 
946
 
        new message::Transaction(*transaction);
947
 
      sv.setTransactionMessage(transaction_savepoint);
948
 
    }
949
 
  } 
950
 
 
951
862
  return error;
952
863
}
953
864
 
975
886
      }
976
887
    }
977
888
  }
978
 
  
979
889
  return error;
980
890
}
981
891
 
985
895
  return replication_services.isActive();
986
896
}
987
897
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
898
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
989
899
{
990
900
  message::Transaction *transaction= in_session->getTransactionMessage();
991
901
 
997
907
     * deleting transaction message when done with it.
998
908
     */
999
909
    transaction= new (nothrow) message::Transaction();
1000
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
910
    initTransactionMessage(*transaction, in_session);
1001
911
    in_session->setTransactionMessage(transaction);
1002
912
    return transaction;
1003
913
  }
1006
916
}
1007
917
 
1008
918
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
 
                                                 Session *in_session,
1010
 
                                                 bool should_inc_trx_id)
 
919
                                          Session *in_session)
1011
920
{
1012
921
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
922
  trx->set_server_id(in_session->getServerId());
1014
 
 
1015
 
  if (should_inc_trx_id)
1016
 
  {
1017
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
 
    in_session->setXaId(0);
1019
 
  }  
1020
 
  else
1021
 
  { 
1022
 
    trx->set_transaction_id(0);
1023
 
  }
1024
 
 
 
923
  trx->set_transaction_id(getNextTransactionId());
1025
924
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1026
925
}
1027
926
 
1038
937
  delete in_transaction;
1039
938
  in_session->setStatementMessage(NULL);
1040
939
  in_session->setTransactionMessage(NULL);
1041
 
  in_session->setXaId(0);
1042
940
}
1043
941
 
1044
942
int TransactionServices::commitTransactionMessage(Session *in_session)
1074
972
{
1075
973
  statement.set_type(in_type);
1076
974
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1077
 
 
1078
 
  if (in_session->variables.replicate_query)
1079
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
975
  /** @TODO Set sql string optionally */
1080
976
}
1081
977
 
1082
978
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1113
1009
   */
1114
1010
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1115
1011
  {
1116
 
    /* Remember the transaction ID so we can re-use it */
1117
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1118
 
 
1119
1012
    /*
1120
1013
     * Clear the transaction, create a Rollback statement message, 
1121
1014
     * attach it to the transaction, and push it to replicators.
1122
1015
     */
1123
1016
    transaction->Clear();
1124
 
    initTransactionMessage(*transaction, in_session, false);
1125
 
 
1126
 
    /* Set the transaction ID to match the previous messages */
1127
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1017
    initTransactionMessage(*transaction, in_session);
1128
1018
 
1129
1019
    message::Statement *statement= transaction->add_statement();
1130
1020
 
1139
1029
}
1140
1030
 
1141
1031
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1142
 
                                                            Table *in_table,
1143
 
                                                            uint32_t *next_segment_id)
 
1032
                                                                 Table *in_table)
1144
1033
{
1145
1034
  message::Statement *statement= in_session->getStatementMessage();
1146
 
  message::Transaction *transaction= NULL;
1147
 
 
1148
 
  /* 
1149
 
   * Check the type for the current Statement message, if it is anything
1150
 
   * other then INSERT we need to call finalize, this will ensure a 
1151
 
   * new InsertStatement is created. If it is of type INSERT check
1152
 
   * what table the INSERT belongs to, if it is a different table
1153
 
   * call finalize, so a new InsertStatement can be created. 
 
1035
  /*
 
1036
   * We check to see if the current Statement message is of type INSERT.
 
1037
   * If it is not, we finalize the current Statement and ensure a new
 
1038
   * InsertStatement is created.
1154
1039
   */
1155
 
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1040
  if (statement != NULL &&
 
1041
      statement->type() != message::Statement::INSERT)
1156
1042
  {
1157
1043
    finalizeStatementMessage(*statement, in_session);
1158
1044
    statement= in_session->getStatementMessage();
1159
 
  } 
1160
 
  else if (statement != NULL)
1161
 
  {
1162
 
    transaction= getActiveTransactionMessage(in_session);
1163
 
 
1164
 
    /*
1165
 
     * If we've passed our threshold for the statement size (possible for
1166
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1167
 
     * the Transaction will keep it from getting huge).
1168
 
     */
1169
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1170
 
      in_session->variables.transaction_message_threshold)
1171
 
    {
1172
 
      /* Remember the transaction ID so we can re-use it */
1173
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1174
 
 
1175
 
      message::InsertData *current_data= statement->mutable_insert_data();
1176
 
 
1177
 
      /* Caller should use this value when adding a new record */
1178
 
      *next_segment_id= current_data->segment_id() + 1;
1179
 
 
1180
 
      current_data->set_end_segment(false);
1181
 
 
1182
 
      /* 
1183
 
       * Send the trx message to replicators after finalizing the 
1184
 
       * statement and transaction. This will also set the Transaction
1185
 
       * and Statement objects in Session to NULL.
1186
 
       */
1187
 
      commitTransactionMessage(in_session);
1188
 
 
1189
 
      /*
1190
 
       * Statement and Transaction should now be NULL, so new ones will get
1191
 
       * created. We reuse the transaction id since we are segmenting
1192
 
       * one transaction.
1193
 
       */
1194
 
      statement= in_session->getStatementMessage();
1195
 
      transaction= getActiveTransactionMessage(in_session, false);
1196
 
      assert(transaction != NULL);
1197
 
 
1198
 
      /* Set the transaction ID to match the previous messages */
1199
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1200
 
    }
1201
 
    else
1202
 
    {
1203
 
      const message::InsertHeader &insert_header= statement->insert_header();
1204
 
      string old_table_name= insert_header.table_metadata().table_name();
1205
 
     
1206
 
      string current_table_name;
1207
 
      (void) in_table->getShare()->getTableName(current_table_name);
1208
 
 
1209
 
      if (current_table_name.compare(old_table_name))
1210
 
      {
1211
 
        finalizeStatementMessage(*statement, in_session);
1212
 
        statement= in_session->getStatementMessage();
1213
 
      }
1214
 
      else
1215
 
      {
1216
 
        /* carry forward the existing segment id */
1217
 
        const message::InsertData &current_data= statement->insert_data();
1218
 
        *next_segment_id= current_data.segment_id();
1219
 
      }
1220
 
    }
1221
 
  } 
 
1045
  }
1222
1046
 
1223
1047
  if (statement == NULL)
1224
1048
  {
1225
 
    /*
1226
 
     * Transaction will be non-NULL only if we had to segment it due to
1227
 
     * transaction size above.
1228
 
     */
1229
 
    if (transaction == NULL)
1230
 
      transaction= getActiveTransactionMessage(in_session);
1231
 
 
 
1049
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1232
1050
    /* 
1233
1051
     * Transaction message initialized and set, but no statement created
1234
1052
     * yet.  We construct one and initialize it, here, then return the
1299
1117
    return true;
1300
1118
  }
1301
1119
 
1302
 
  uint32_t next_segment_id= 1;
1303
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1120
  message::Statement &statement= getInsertStatement(in_session, in_table);
1304
1121
 
1305
1122
  message::InsertData *data= statement.mutable_insert_data();
1306
 
  data->set_segment_id(next_segment_id);
 
1123
  data->set_segment_id(1);
1307
1124
  data->set_end_segment(true);
1308
1125
  message::InsertRecord *record= data->add_record();
1309
1126
 
1318
1135
 
1319
1136
  while ((current_field= *table_fields++) != NULL) 
1320
1137
  {
1321
 
    if (current_field->is_null())
1322
 
    {
1323
 
      record->add_is_null(true);
1324
 
      record->add_insert_value("", 0);
1325
 
    } 
1326
 
    else 
1327
 
    {
1328
 
      string_value= current_field->val_str(string_value);
1329
 
      record->add_is_null(false);
1330
 
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1331
 
      string_value->free();
1332
 
    }
 
1138
    string_value= current_field->val_str(string_value);
 
1139
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1140
    string_value->free();
1333
1141
  }
1334
1142
  return false;
1335
1143
}
1337
1145
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1338
1146
                                                            Table *in_table,
1339
1147
                                                            const unsigned char *old_record, 
1340
 
                                                            const unsigned char *new_record,
1341
 
                                                            uint32_t *next_segment_id)
 
1148
                                                            const unsigned char *new_record)
1342
1149
{
1343
1150
  message::Statement *statement= in_session->getStatementMessage();
1344
 
  message::Transaction *transaction= NULL;
1345
 
 
1346
1151
  /*
1347
 
   * Check the type for the current Statement message, if it is anything
1348
 
   * other then UPDATE we need to call finalize, this will ensure a
1349
 
   * new UpdateStatement is created. If it is of type UPDATE check
1350
 
   * what table the UPDATE belongs to, if it is a different table
1351
 
   * call finalize, so a new UpdateStatement can be created.
 
1152
   * We check to see if the current Statement message is of type UPDATE.
 
1153
   * If it is not, we finalize the current Statement and ensure a new
 
1154
   * UpdateStatement is created.
1352
1155
   */
1353
 
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1156
  if (statement != NULL &&
 
1157
      statement->type() != message::Statement::UPDATE)
1354
1158
  {
1355
1159
    finalizeStatementMessage(*statement, in_session);
1356
1160
    statement= in_session->getStatementMessage();
1357
1161
  }
1358
 
  else if (statement != NULL)
1359
 
  {
1360
 
    transaction= getActiveTransactionMessage(in_session);
1361
 
 
1362
 
    /*
1363
 
     * If we've passed our threshold for the statement size (possible for
1364
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1365
 
     * the Transaction will keep it from getting huge).
1366
 
     */
1367
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1368
 
      in_session->variables.transaction_message_threshold)
1369
 
    {
1370
 
      /* Remember the transaction ID so we can re-use it */
1371
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1372
 
 
1373
 
      message::UpdateData *current_data= statement->mutable_update_data();
1374
 
 
1375
 
      /* Caller should use this value when adding a new record */
1376
 
      *next_segment_id= current_data->segment_id() + 1;
1377
 
 
1378
 
      current_data->set_end_segment(false);
1379
 
 
1380
 
      /*
1381
 
       * Send the trx message to replicators after finalizing the 
1382
 
       * statement and transaction. This will also set the Transaction
1383
 
       * and Statement objects in Session to NULL.
1384
 
       */
1385
 
      commitTransactionMessage(in_session);
1386
 
 
1387
 
      /*
1388
 
       * Statement and Transaction should now be NULL, so new ones will get
1389
 
       * created. We reuse the transaction id since we are segmenting
1390
 
       * one transaction.
1391
 
       */
1392
 
      statement= in_session->getStatementMessage();
1393
 
      transaction= getActiveTransactionMessage(in_session, false);
1394
 
      assert(transaction != NULL);
1395
 
 
1396
 
      /* Set the transaction ID to match the previous messages */
1397
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1398
 
    }
1399
 
    else
1400
 
    {
1401
 
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1402
 
      {
1403
 
        /* carry forward the existing segment id */
1404
 
        const message::UpdateData &current_data= statement->update_data();
1405
 
        *next_segment_id= current_data.segment_id();
1406
 
      } 
1407
 
      else 
1408
 
      {
1409
 
        finalizeStatementMessage(*statement, in_session);
1410
 
        statement= in_session->getStatementMessage();
1411
 
      }
1412
 
    }
1413
 
  }
1414
1162
 
1415
1163
  if (statement == NULL)
1416
1164
  {
1417
 
    /*
1418
 
     * Transaction will be non-NULL only if we had to segment it due to
1419
 
     * transaction size above.
1420
 
     */
1421
 
    if (transaction == NULL)
1422
 
      transaction= getActiveTransactionMessage(in_session);
1423
 
 
 
1165
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1424
1166
    /* 
1425
1167
     * Transaction message initialized and set, but no statement created
1426
1168
     * yet.  We construct one and initialize it, here, then return the
1434
1176
  return *statement;
1435
1177
}
1436
1178
 
1437
 
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1438
 
                                                  Table *in_table,
1439
 
                                                  const unsigned char *old_record,
1440
 
                                                  const unsigned char *new_record)
1441
 
{
1442
 
  const message::UpdateHeader &update_header= statement.update_header();
1443
 
  string old_table_name= update_header.table_metadata().table_name();
1444
 
 
1445
 
  string current_table_name;
1446
 
  (void) in_table->getShare()->getTableName(current_table_name);
1447
 
  if (current_table_name.compare(old_table_name))
1448
 
  {
1449
 
    return false;
1450
 
  }
1451
 
  else
1452
 
  {
1453
 
    /* Compare the set fields in the existing UpdateHeader and see if they
1454
 
     * match the updated fields in the new record, if they do not we must
1455
 
     * create a new UpdateHeader 
1456
 
     */
1457
 
    size_t num_set_fields= update_header.set_field_metadata_size();
1458
 
 
1459
 
    Field *current_field;
1460
 
    Field **table_fields= in_table->getFields();
1461
 
    in_table->setReadSet();
1462
 
 
1463
 
    size_t num_calculated_updated_fields= 0;
1464
 
    bool found= false;
1465
 
    while ((current_field= *table_fields++) != NULL)
1466
 
    {
1467
 
      if (num_calculated_updated_fields > num_set_fields)
1468
 
      {
1469
 
        break;
1470
 
      }
1471
 
 
1472
 
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1473
 
      {
1474
 
        /* check that this field exists in the UpdateHeader record */
1475
 
        found= false;
1476
 
 
1477
 
        for (size_t x= 0; x < num_set_fields; ++x)
1478
 
        {
1479
 
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1480
 
          string name= field_metadata.name();
1481
 
          if (name.compare(current_field->field_name) == 0)
1482
 
          {
1483
 
            found= true;
1484
 
            ++num_calculated_updated_fields;
1485
 
            break;
1486
 
          } 
1487
 
        }
1488
 
        if (! found)
1489
 
        {
1490
 
          break;
1491
 
        } 
1492
 
      }
1493
 
    }
1494
 
 
1495
 
    if ((num_calculated_updated_fields == num_set_fields) && found)
1496
 
    {
1497
 
      return true;
1498
 
    } 
1499
 
    else 
1500
 
    {
1501
 
      return false;
1502
 
    }
1503
 
  }
1504
 
}  
1505
 
 
1506
1179
void TransactionServices::setUpdateHeader(message::Statement &statement,
1507
1180
                                          Session *in_session,
1508
1181
                                          Table *in_table,
1548
1221
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1549
1222
    }
1550
1223
 
1551
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1224
    /*
 
1225
     * The below really should be moved into the Field API and Record API.  But for now
 
1226
     * we do this crazy pointer fiddling to figure out if the current field
 
1227
     * has been updated in the supplied record raw byte pointers.
 
1228
     */
 
1229
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1230
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1231
 
 
1232
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1233
 
 
1234
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1552
1235
    {
1553
1236
      /* Field is changed from old to new */
1554
1237
      field_metadata= header->add_set_field_metadata();
1566
1249
  if (! replication_services.isActive())
1567
1250
    return;
1568
1251
 
1569
 
  uint32_t next_segment_id= 1;
1570
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1252
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1571
1253
 
1572
1254
  message::UpdateData *data= statement.mutable_update_data();
1573
 
  data->set_segment_id(next_segment_id);
 
1255
  data->set_segment_id(1);
1574
1256
  data->set_end_segment(true);
1575
1257
  message::UpdateRecord *record= data->add_record();
1576
1258
 
1591
1273
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1592
1274
     *
1593
1275
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1276
     *
 
1277
     * The below really should be moved into the Field API and Record API.  But for now
 
1278
     * we do this crazy pointer fiddling to figure out if the current field
 
1279
     * has been updated in the supplied record raw byte pointers.
1594
1280
     */
1595
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1281
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1282
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1283
 
 
1284
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1285
 
 
1286
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1596
1287
    {
1597
1288
      /* Store the original "read bit" for this field */
1598
1289
      bool is_read_set= current_field->isReadSet();
1609
1300
       */
1610
1301
      current_field->setReadSet(is_read_set);
1611
1302
 
1612
 
      if (current_field->is_null())
1613
 
      {
1614
 
        record->add_is_null(true);
1615
 
        record->add_after_value("", 0);
1616
 
      }
1617
 
      else
1618
 
      {
1619
 
        record->add_is_null(false);
1620
 
        record->add_after_value(string_value->c_ptr(), string_value->length());
1621
 
      }
 
1303
      record->add_after_value(string_value->c_ptr(), string_value->length());
1622
1304
      string_value->free();
1623
1305
    }
1624
1306
 
1644
1326
  }
1645
1327
}
1646
1328
 
1647
 
bool TransactionServices::isFieldUpdated(Field *current_field,
1648
 
                                         Table *in_table,
1649
 
                                         const unsigned char *old_record,
1650
 
                                         const unsigned char *new_record)
1651
 
{
1652
 
  /*
1653
 
   * The below really should be moved into the Field API and Record API.  But for now
1654
 
   * we do this crazy pointer fiddling to figure out if the current field
1655
 
   * has been updated in the supplied record raw byte pointers.
1656
 
   */
1657
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1658
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1659
 
 
1660
 
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1661
 
 
1662
 
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1663
 
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1664
 
 
1665
 
  bool isUpdated= false;
1666
 
  if (old_value_is_null != new_value_is_null)
1667
 
  {
1668
 
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1669
 
    {
1670
 
      isUpdated= true;
1671
 
    }
1672
 
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1673
 
    {
1674
 
      isUpdated= true;
1675
 
    }
1676
 
  }
1677
 
 
1678
 
  if (! isUpdated)
1679
 
  {
1680
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1681
 
    {
1682
 
      isUpdated= true;
1683
 
    }
1684
 
  }
1685
 
  return isUpdated;
1686
 
}  
1687
 
 
1688
1329
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1689
 
                                                            Table *in_table,
1690
 
                                                            uint32_t *next_segment_id)
 
1330
                                                            Table *in_table)
1691
1331
{
1692
1332
  message::Statement *statement= in_session->getStatementMessage();
1693
 
  message::Transaction *transaction= NULL;
1694
 
 
1695
1333
  /*
1696
 
   * Check the type for the current Statement message, if it is anything
1697
 
   * other then DELETE we need to call finalize, this will ensure a
1698
 
   * new DeleteStatement is created. If it is of type DELETE check
1699
 
   * what table the DELETE belongs to, if it is a different table
1700
 
   * call finalize, so a new DeleteStatement can be created.
 
1334
   * We check to see if the current Statement message is of type DELETE.
 
1335
   * If it is not, we finalize the current Statement and ensure a new
 
1336
   * DeleteStatement is created.
1701
1337
   */
1702
 
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1338
  if (statement != NULL &&
 
1339
      statement->type() != message::Statement::DELETE)
1703
1340
  {
1704
1341
    finalizeStatementMessage(*statement, in_session);
1705
1342
    statement= in_session->getStatementMessage();
1706
1343
  }
1707
 
  else if (statement != NULL)
1708
 
  {
1709
 
    transaction= getActiveTransactionMessage(in_session);
1710
 
 
1711
 
    /*
1712
 
     * If we've passed our threshold for the statement size (possible for
1713
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1714
 
     * the Transaction will keep it from getting huge).
1715
 
     */
1716
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1717
 
      in_session->variables.transaction_message_threshold)
1718
 
    {
1719
 
      /* Remember the transaction ID so we can re-use it */
1720
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1721
 
 
1722
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1723
 
 
1724
 
      /* Caller should use this value when adding a new record */
1725
 
      *next_segment_id= current_data->segment_id() + 1;
1726
 
 
1727
 
      current_data->set_end_segment(false);
1728
 
 
1729
 
      /* 
1730
 
       * Send the trx message to replicators after finalizing the 
1731
 
       * statement and transaction. This will also set the Transaction
1732
 
       * and Statement objects in Session to NULL.
1733
 
       */
1734
 
      commitTransactionMessage(in_session);
1735
 
 
1736
 
      /*
1737
 
       * Statement and Transaction should now be NULL, so new ones will get
1738
 
       * created. We reuse the transaction id since we are segmenting
1739
 
       * one transaction.
1740
 
       */
1741
 
      statement= in_session->getStatementMessage();
1742
 
      transaction= getActiveTransactionMessage(in_session, false);
1743
 
      assert(transaction != NULL);
1744
 
 
1745
 
      /* Set the transaction ID to match the previous messages */
1746
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1747
 
    }
1748
 
    else
1749
 
    {
1750
 
      const message::DeleteHeader &delete_header= statement->delete_header();
1751
 
      string old_table_name= delete_header.table_metadata().table_name();
1752
 
 
1753
 
      string current_table_name;
1754
 
      (void) in_table->getShare()->getTableName(current_table_name);
1755
 
      if (current_table_name.compare(old_table_name))
1756
 
      {
1757
 
        finalizeStatementMessage(*statement, in_session);
1758
 
        statement= in_session->getStatementMessage();
1759
 
      }
1760
 
      else
1761
 
      {
1762
 
        /* carry forward the existing segment id */
1763
 
        const message::DeleteData &current_data= statement->delete_data();
1764
 
        *next_segment_id= current_data.segment_id();
1765
 
      }
1766
 
    }
1767
 
  }
1768
1344
 
1769
1345
  if (statement == NULL)
1770
1346
  {
1771
 
    /*
1772
 
     * Transaction will be non-NULL only if we had to segment it due to
1773
 
     * transaction size above.
1774
 
     */
1775
 
    if (transaction == NULL)
1776
 
      transaction= getActiveTransactionMessage(in_session);
1777
 
 
 
1347
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
1778
1348
    /* 
1779
1349
     * Transaction message initialized and set, but no statement created
1780
1350
     * yet.  We construct one and initialize it, here, then return the
1830
1400
  }
1831
1401
}
1832
1402
 
1833
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1403
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1834
1404
{
1835
1405
  ReplicationServices &replication_services= ReplicationServices::singleton();
1836
1406
  if (! replication_services.isActive())
1837
1407
    return;
1838
1408
 
1839
 
  uint32_t next_segment_id= 1;
1840
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1409
  message::Statement &statement= getDeleteStatement(in_session, in_table);
1841
1410
 
1842
1411
  message::DeleteData *data= statement.mutable_delete_data();
1843
 
  data->set_segment_id(next_segment_id);
 
1412
  data->set_segment_id(1);
1844
1413
  data->set_end_segment(true);
1845
1414
  message::DeleteRecord *record= data->add_record();
1846
1415
 
1858
1427
     */
1859
1428
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1860
1429
    {
1861
 
      if (use_update_record)
1862
 
      {
1863
 
        /*
1864
 
         * Temporarily point to the update record to get its value.
1865
 
         * This is pretty much a hack in order to get the PK value from
1866
 
         * the update record rather than the insert record. Field::val_str()
1867
 
         * should not change anything in Field::ptr, so this should be safe.
1868
 
         * We are careful not to change anything in old_ptr.
1869
 
         */
1870
 
        const unsigned char *old_ptr= current_field->ptr;
1871
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1872
 
        string_value= current_field->val_str(string_value);
1873
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1874
 
      }
1875
 
      else
1876
 
      {
1877
 
        string_value= current_field->val_str(string_value);
1878
 
        /**
1879
 
         * @TODO Store optional old record value in the before data member
1880
 
         */
1881
 
      }
 
1430
      string_value= current_field->val_str(string_value);
1882
1431
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1432
      /**
 
1433
       * @TODO Store optional old record value in the before data member
 
1434
       */
1883
1435
      string_value->free();
1884
1436
    }
1885
1437
  }
1886
1438
}
1887
1439
 
1888
 
 
1889
 
/**
1890
 
 * Template for removing Statement records of different types.
1891
 
 *
1892
 
 * The code for removing records from different Statement message types
1893
 
 * is identical except for the class types that are embedded within the
1894
 
 * Statement.
1895
 
 *
1896
 
 * There are 3 scenarios we need to look for:
1897
 
 *   - We've been asked to remove more records than exist in the Statement
1898
 
 *   - We've been asked to remove less records than exist in the Statement
1899
 
 *   - We've been asked to remove ALL records that exist in the Statement
1900
 
 *
1901
 
 * If we are removing ALL records, then effectively we would be left with
1902
 
 * an empty Statement message, so we should just remove it and clean up
1903
 
 * message pointers in the Session object.
1904
 
 */
1905
 
template <class DataType, class RecordType>
1906
 
static bool removeStatementRecordsWithType(Session *session,
1907
 
                                           DataType *data,
1908
 
                                           uint32_t count)
1909
 
{
1910
 
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1911
 
 
1912
 
  /* If there aren't enough records to remove 'count' of them, error. */
1913
 
  if (num_avail_recs < count)
1914
 
    return false;
1915
 
 
1916
 
  /*
1917
 
   * If we are removing all of the data records, we'll just remove this
1918
 
   * entire Statement message.
1919
 
   */
1920
 
  if (num_avail_recs == count)
1921
 
  {
1922
 
    message::Transaction *transaction= session->getTransactionMessage();
1923
 
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1924
 
    statements->RemoveLast();
1925
 
 
1926
 
    /*
1927
 
     * Now need to set the Session Statement pointer to either the previous
1928
 
     * Statement, or NULL if there isn't one.
1929
 
     */
1930
 
    if (statements->size() == 0)
1931
 
    {
1932
 
      session->setStatementMessage(NULL);
1933
 
    }
1934
 
    else
1935
 
    {
1936
 
      /*
1937
 
       * There isn't a great way to get a pointer to the previous Statement
1938
 
       * message using the RepeatedPtrField object, so we'll just get to it
1939
 
       * using the Transaction message.
1940
 
       */
1941
 
      int last_stmt_idx= transaction->statement_size() - 1;
1942
 
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1943
 
    }
1944
 
  }
1945
 
  /* We only need to remove 'count' records */
1946
 
  else if (num_avail_recs > count)
1947
 
  {
1948
 
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1949
 
    while (count--)
1950
 
      records->RemoveLast();
1951
 
  }
1952
 
 
1953
 
  return true;
1954
 
}
1955
 
 
1956
 
 
1957
 
bool TransactionServices::removeStatementRecords(Session *session,
1958
 
                                                 uint32_t count)
1959
 
{
1960
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1961
 
  if (! replication_services.isActive())
1962
 
    return false;
1963
 
 
1964
 
  /* Get the most current Statement */
1965
 
  message::Statement *statement= session->getStatementMessage();
1966
 
 
1967
 
  /* Make sure we have work to do */
1968
 
  if (statement == NULL)
1969
 
    return false;
1970
 
 
1971
 
  bool retval= false;
1972
 
 
1973
 
  switch (statement->type())
1974
 
  {
1975
 
    case message::Statement::INSERT:
1976
 
    {
1977
 
      message::InsertData *data= statement->mutable_insert_data();
1978
 
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1979
 
      break;
1980
 
    }
1981
 
 
1982
 
    case message::Statement::UPDATE:
1983
 
    {
1984
 
      message::UpdateData *data= statement->mutable_update_data();
1985
 
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1986
 
      break;
1987
 
    }
1988
 
 
1989
 
    case message::Statement::DELETE:  /* not sure if this one is possible... */
1990
 
    {
1991
 
      message::DeleteData *data= statement->mutable_delete_data();
1992
 
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
1993
 
      break;
1994
 
    }
1995
 
 
1996
 
    default:
1997
 
      retval= false;
1998
 
      break;
1999
 
  }
2000
 
 
2001
 
  return retval;
2002
 
}
2003
 
 
2004
 
 
2005
1440
void TransactionServices::createTable(Session *in_session,
2006
1441
                                      const message::Table &table)
2007
1442
{
2166
1601
  ReplicationServices &replication_services= ReplicationServices::singleton();
2167
1602
  if (! replication_services.isActive())
2168
1603
    return;
2169
 
 
 
1604
  
2170
1605
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2171
1606
  message::Statement *statement= transaction->add_statement();
2172
1607
 
2181
1616
  cleanupTransactionMessage(transaction, in_session);
2182
1617
}
2183
1618
 
2184
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2185
 
{
2186
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2187
 
  if (! replication_services.isActive())
2188
 
    return 0;
2189
 
 
2190
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2191
 
 
2192
 
  // set server id, start timestamp
2193
 
  initTransactionMessage(*transaction, session, true);
2194
 
 
2195
 
  // set end timestamp
2196
 
  finalizeTransactionMessage(*transaction, session);
2197
 
 
2198
 
  message::Event *trx_event= transaction->mutable_event();
2199
 
 
2200
 
  trx_event->CopyFrom(event);
2201
 
 
2202
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2203
 
 
2204
 
  delete transaction;
2205
 
 
2206
 
  return static_cast<int>(result);
2207
 
}
2208
 
 
2209
 
bool TransactionServices::sendStartupEvent(Session *session)
2210
 
{
2211
 
  message::Event event;
2212
 
  event.set_type(message::Event::STARTUP);
2213
 
  if (sendEvent(session, event) != 0)
2214
 
    return false;
2215
 
  return true;
2216
 
}
2217
 
 
2218
 
bool TransactionServices::sendShutdownEvent(Session *session)
2219
 
{
2220
 
  message::Event event;
2221
 
  event.set_type(message::Event::SHUTDOWN);
2222
 
  if (sendEvent(session, event) != 0)
2223
 
    return false;
2224
 
  return true;
2225
 
}
2226
 
 
2227
1619
} /* namespace drizzled */