907
910
* deleting transaction message when done with it.
909
912
transaction= new (nothrow) message::Transaction();
910
initTransactionMessage(*transaction, in_session);
913
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
911
914
in_session->setTransactionMessage(transaction);
912
915
return transaction;
918
921
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
923
bool should_inc_trx_id)
921
925
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
922
926
trx->set_server_id(in_session->getServerId());
923
trx->set_transaction_id(getNextTransactionId());
928
if (should_inc_trx_id)
929
trx->set_transaction_id(getNextTransactionId());
931
trx->set_transaction_id(getCurrentTransactionId());
924
933
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1014
1023
* attach it to the transaction, and push it to replicators.
1016
1025
transaction->Clear();
1017
initTransactionMessage(*transaction, in_session);
1026
initTransactionMessage(*transaction, in_session, true);
1019
1028
message::Statement *statement= transaction->add_statement();
1031
1040
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1042
uint32_t *next_segment_id)
1034
1044
message::Statement *statement= in_session->getStatementMessage();
1045
message::Transaction *transaction= NULL;
1037
1048
* Check the type for the current Statement message, if it is anything
1048
1059
else if (statement != NULL)
1050
const message::InsertHeader &insert_header= statement->insert_header();
1051
string old_table_name= insert_header.table_metadata().table_name();
1062
* If we've passed our threshold for the statement size (possible for
1063
* a bulk insert), we'll finalize the Statement and Transaction (doing
1064
* the Transaction will keep it from getting huge).
1066
if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1068
message::InsertData *current_data= statement->mutable_insert_data();
1070
/* Caller should use this value when adding a new record */
1071
*next_segment_id= current_data->segment_id() + 1;
1073
current_data->set_end_segment(false);
1076
* Send the trx message to replicators after finalizing the
1077
* statement and transaction. This will also set the Transaction
1078
* and Statement objects in Session to NULL.
1080
commitTransactionMessage(in_session);
1083
* Statement and Transaction should now be NULL, so new ones will get
1084
* created. We reuse the transaction id since we are segmenting
1087
statement= in_session->getStatementMessage();
1088
transaction= getActiveTransactionMessage(in_session, false);
1092
const message::InsertHeader &insert_header= statement->insert_header();
1093
string old_table_name= insert_header.table_metadata().table_name();
1053
string current_table_name;
1054
(void) in_table->getShare()->getTableName(current_table_name);
1055
if (current_table_name.compare(old_table_name))
1057
finalizeStatementMessage(*statement, in_session);
1058
statement= in_session->getStatementMessage();
1095
string current_table_name;
1096
(void) in_table->getShare()->getTableName(current_table_name);
1097
if (current_table_name.compare(old_table_name))
1099
finalizeStatementMessage(*statement, in_session);
1100
statement= in_session->getStatementMessage();
1062
1105
if (statement == NULL)
1064
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1108
* Transaction will be non-NULL only if we had to segment it due to
1109
* transaction size above.
1111
if (transaction == NULL)
1112
transaction= getActiveTransactionMessage(in_session);
1066
1115
* Transaction message initialized and set, but no statement created
1067
1116
* yet. We construct one and initialize it, here, then return the
1135
message::Statement &statement= getInsertStatement(in_session, in_table);
1184
uint32_t next_segment_id= 1;
1185
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1137
1187
message::InsertData *data= statement.mutable_insert_data();
1138
data->set_segment_id(1);
1188
data->set_segment_id(next_segment_id);
1139
1189
data->set_end_segment(true);
1140
1190
message::InsertRecord *record= data->add_record();
1169
1219
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1170
1220
Table *in_table,
1171
1221
const unsigned char *old_record,
1172
const unsigned char *new_record)
1222
const unsigned char *new_record,
1223
uint32_t *next_segment_id)
1174
1225
message::Statement *statement= in_session->getStatementMessage();
1226
message::Transaction *transaction= NULL;
1177
1229
* Check the type for the current Statement message, if it is anything
1188
1240
else if (statement != NULL)
1190
const message::UpdateHeader &update_header= statement->update_header();
1191
string old_table_name= update_header.table_metadata().table_name();
1193
string current_table_name;
1194
(void) in_table->getShare()->getTableName(current_table_name);
1195
if (current_table_name.compare(old_table_name))
1243
* If we've passed our threshold for the statement size (possible for
1244
* a bulk insert), we'll finalize the Statement and Transaction (doing
1245
* the Transaction will keep it from getting huge).
1247
if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1197
finalizeStatementMessage(*statement, in_session);
1249
message::UpdateData *current_data= statement->mutable_update_data();
1251
/* Caller should use this value when adding a new record */
1252
*next_segment_id= current_data->segment_id() + 1;
1254
current_data->set_end_segment(false);
1257
* Send the trx message to replicators after finalizing the
1258
* statement and transaction. This will also set the Transaction
1259
* and Statement objects in Session to NULL.
1261
commitTransactionMessage(in_session);
1264
* Statement and Transaction should now be NULL, so new ones will get
1265
* created. We reuse the transaction id since we are segmenting
1198
1268
statement= in_session->getStatementMessage();
1269
transaction= getActiveTransactionMessage(in_session, false);
1273
const message::UpdateHeader &update_header= statement->update_header();
1274
string old_table_name= update_header.table_metadata().table_name();
1276
string current_table_name;
1277
(void) in_table->getShare()->getTableName(current_table_name);
1278
if (current_table_name.compare(old_table_name))
1280
finalizeStatementMessage(*statement, in_session);
1281
statement= in_session->getStatementMessage();
1202
1286
if (statement == NULL)
1204
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1289
* Transaction will be non-NULL only if we had to segment it due to
1290
* transaction size above.
1292
if (transaction == NULL)
1293
transaction= getActiveTransactionMessage(in_session);
1206
1296
* Transaction message initialized and set, but no statement created
1207
1297
* yet. We construct one and initialize it, here, then return the
1288
1378
if (! replication_services.isActive())
1291
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1381
uint32_t next_segment_id= 1;
1382
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1293
1384
message::UpdateData *data= statement.mutable_update_data();
1294
data->set_segment_id(1);
1385
data->set_segment_id(next_segment_id);
1295
1386
data->set_end_segment(true);
1296
1387
message::UpdateRecord *record= data->add_record();
1377
1468
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1470
uint32_t *next_segment_id)
1380
1472
message::Statement *statement= in_session->getStatementMessage();
1473
message::Transaction *transaction= NULL;
1383
1476
* Check the type for the current Statement message, if it is anything
1394
1487
else if (statement != NULL)
1396
const message::DeleteHeader &delete_header= statement->delete_header();
1397
string old_table_name= delete_header.table_metadata().table_name();
1399
string current_table_name;
1400
(void) in_table->getShare()->getTableName(current_table_name);
1401
if (current_table_name.compare(old_table_name))
1490
* If we've passed our threshold for the statement size (possible for
1491
* a bulk insert), we'll finalize the Statement and Transaction (doing
1492
* the Transaction will keep it from getting huge).
1494
if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1403
finalizeStatementMessage(*statement, in_session);
1496
message::DeleteData *current_data= statement->mutable_delete_data();
1498
/* Caller should use this value when adding a new record */
1499
*next_segment_id= current_data->segment_id() + 1;
1501
current_data->set_end_segment(false);
1504
* Send the trx message to replicators after finalizing the
1505
* statement and transaction. This will also set the Transaction
1506
* and Statement objects in Session to NULL.
1508
commitTransactionMessage(in_session);
1511
* Statement and Transaction should now be NULL, so new ones will get
1512
* created. We reuse the transaction id since we are segmenting
1404
1515
statement= in_session->getStatementMessage();
1516
transaction= getActiveTransactionMessage(in_session, false);
1520
const message::DeleteHeader &delete_header= statement->delete_header();
1521
string old_table_name= delete_header.table_metadata().table_name();
1523
string current_table_name;
1524
(void) in_table->getShare()->getTableName(current_table_name);
1525
if (current_table_name.compare(old_table_name))
1527
finalizeStatementMessage(*statement, in_session);
1528
statement= in_session->getStatementMessage();
1408
1533
if (statement == NULL)
1410
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1536
* Transaction will be non-NULL only if we had to segment it due to
1537
* transaction size above.
1539
if (transaction == NULL)
1540
transaction= getActiveTransactionMessage(in_session);
1412
1543
* Transaction message initialized and set, but no statement created
1413
1544
* yet. We construct one and initialize it, here, then return the
1466
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1597
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1468
1599
ReplicationServices &replication_services= ReplicationServices::singleton();
1469
1600
if (! replication_services.isActive())
1472
message::Statement &statement= getDeleteStatement(in_session, in_table);
1603
uint32_t next_segment_id= 1;
1604
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1474
1606
message::DeleteData *data= statement.mutable_delete_data();
1475
data->set_segment_id(1);
1607
data->set_segment_id(next_segment_id);
1476
1608
data->set_end_segment(true);
1477
1609
message::DeleteRecord *record= data->add_record();
1491
1623
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1493
string_value= current_field->val_str(string_value);
1625
if (use_update_record)
1628
* Temporarily point to the update record to get its value.
1629
* This is pretty much a hack in order to get the PK value from
1630
* the update record rather than the insert record. Field::val_str()
1631
* should not change anything in Field::ptr, so this should be safe.
1632
* We are careful not to change anything in old_ptr.
1634
const unsigned char *old_ptr= current_field->ptr;
1635
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1636
string_value= current_field->val_str(string_value);
1637
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1641
string_value= current_field->val_str(string_value);
1643
* @TODO Store optional old record value in the before data member
1494
1646
record->add_key_value(string_value->c_ptr(), string_value->length());
1496
* @TODO Store optional old record value in the before data member
1498
1647
string_value->free();