586
591
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
593
598
else if (normal_transaction)
595
session.status_var.ha_commit_count++;
600
session->status_var.ha_commit_count++;
598
603
else if (resource->participatesInSqlTransaction())
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
605
610
else if (normal_transaction)
607
session.status_var.ha_commit_count++;
612
session->status_var.ha_commit_count++;
610
615
resource_context->reset(); /* keep it conveniently zero-filled */
613
618
if (is_real_trans)
614
session.transaction.xid_state.xid.null();
619
session->transaction.xid_state.xid.null();
616
621
if (normal_transaction)
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
637
640
We must not rollback the normal transaction if a statement
638
641
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
643
646
if (resource_contexts.empty() == false)
998
997
* deleting transaction message when done with it.
1000
999
transaction= new (nothrow) message::Transaction();
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1000
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1001
in_session->setTransactionMessage(transaction);
1003
1002
return transaction;
1006
1005
return transaction;
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
Session *in_session,
1011
1010
bool should_inc_trx_id)
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1012
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
trx->set_server_id(in_session->getServerId());
1016
1015
if (should_inc_trx_id)
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1017
trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
in_session->setXaId(0);
1023
/* trx and seg id will get set properly elsewhere */
1024
1022
trx->set_transaction_id(0);
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1025
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1029
Session *in_session)
1031
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1032
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1036
Session *in_session)
1038
delete in_transaction;
1039
in_session->setStatementMessage(NULL);
1040
in_session->setTransactionMessage(NULL);
1043
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
1045
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1046
if (! replication_services.isActive())
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (session.getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1064
message::Statement *statement= session.getStatementMessage();
1049
/* If there is an active statement message, finalize it */
1050
message::Statement *statement= in_session->getStatementMessage();
1066
1052
if (statement != NULL)
1068
finalizeStatementMessage(*statement, session);
1071
message::Transaction* transaction= getActiveTransactionMessage(session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and do not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, session);
1085
finalizeTransactionMessage(*transaction, session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
cleanupTransactionMessage(transaction, session);
1054
finalizeStatementMessage(*statement, in_session);
1057
return 0; /* No data modification occurred inside the transaction */
1059
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1061
finalizeTransactionMessage(*transaction, in_session);
1063
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1065
cleanupTransactionMessage(transaction, in_session);
1091
1067
return static_cast<int>(result);
1094
1070
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type type,
1096
Session::const_reference session)
1071
message::Statement::Type in_type,
1072
Session *in_session)
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1074
statement.set_type(in_type);
1075
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1105
1078
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1079
Session *in_session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1081
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1082
in_session->setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1085
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
1087
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1088
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1091
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1121
1094
* OK, so there are two situations that we need to deal with here:
1139
1112
/* Remember the transaction ID so we can re-use it */
1140
1113
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1144
1116
* Clear the transaction, create a Rollback statement message,
1145
1117
* attach it to the transaction, and push it to replicators.
1147
1119
transaction->Clear();
1148
initTransactionMessage(*transaction, session, false);
1120
initTransactionMessage(*transaction, in_session, false);
1150
1122
/* Set the transaction ID to match the previous messages */
1151
1123
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1155
1125
message::Statement *statement= transaction->add_statement();
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1127
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1128
finalizeStatementMessage(*statement, in_session);
1160
finalizeTransactionMessage(*transaction, session);
1130
finalizeTransactionMessage(*transaction, in_session);
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1165
cleanupTransactionMessage(transaction, session);
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
ReplicationServices &replication_services= ReplicationServices::singleton();
1171
if (! replication_services.isActive())
1174
message::Statement *current_statement= session.getStatementMessage();
1176
/* If we never added a Statement message, nothing to undo. */
1177
if (current_statement == NULL)
1181
* If the Statement has been segmented, then we've already pushed a portion
1182
* of this Statement's row changes through the replication stream and we
1183
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
* delete the current Statement message.
1186
bool is_segmented= false;
1188
switch (current_statement->type())
1190
case message::Statement::INSERT:
1191
if (current_statement->insert_data().segment_id() > 1)
1195
case message::Statement::UPDATE:
1196
if (current_statement->update_data().segment_id() > 1)
1200
case message::Statement::DELETE:
1201
if (current_statement->delete_data().segment_id() > 1)
1210
* Remove the Statement message we've been working with (same as
1211
* current_statement).
1213
message::Transaction *transaction= getActiveTransactionMessage(session);
1214
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
statements_in_txn= transaction->mutable_statement();
1216
statements_in_txn->RemoveLast();
1217
session.setStatementMessage(NULL);
1220
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
* an indicator to cancel the previous Statement message which should have
1222
* had its end_segment attribute set to false.
1226
current_statement= transaction->add_statement();
1227
initStatementMessage(*current_statement,
1228
message::Statement::ROLLBACK_STATEMENT,
1230
finalizeStatementMessage(*current_statement, session);
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1132
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1134
cleanupTransactionMessage(transaction, in_session);
1137
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1254
1139
uint32_t *next_segment_id)
1256
message::Statement *statement= session.getStatementMessage();
1141
message::Statement *statement= in_session->getStatementMessage();
1257
1142
message::Transaction *transaction= NULL;
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this is a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1145
* Check the type for the current Statement message, if it is anything
1146
* other than INSERT we need to call finalize, this will ensure a
1147
* new InsertStatement is created. If it is of type INSERT check
1148
* what table the INSERT belongs to, if it is a different table
1149
* call finalize, so a new InsertStatement can be created.
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1151
if (statement != NULL && statement->type() != message::Statement::INSERT)
1153
finalizeStatementMessage(*statement, in_session);
1154
statement= in_session->getStatementMessage();
1156
else if (statement != NULL)
1158
transaction= getActiveTransactionMessage(in_session);
1285
1161
* If we've passed our threshold for the statement size (possible for
1286
1162
* a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1163
* the Transaction will keep it from getting huge).
1289
1165
if (static_cast<size_t>(transaction->ByteSize()) >=
1290
transaction_message_threshold)
1166
in_session->variables.transaction_message_threshold)
1292
1168
/* Remember the transaction ID so we can re-use it */
1293
1169
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1296
1171
message::InsertData *current_data= statement->mutable_insert_data();
1298
1173
/* Caller should use this value when adding a new record */
1299
1174
*next_segment_id= current_data->segment_id() + 1;
1301
1176
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1305
1179
* Send the trx message to replicators after finalizing the
1306
1180
* statement and transaction. This will also set the Transaction
1307
1181
* and Statement objects in Session to NULL.
1309
commitTransactionMessage(session);
1183
commitTransactionMessage(in_session);
1312
1186
* Statement and Transaction should now be NULL, so new ones will get
1313
1187
* created. We reuse the transaction id since we are segmenting
1314
1188
* one transaction.
1316
transaction= getActiveTransactionMessage(session, false);
1190
statement= in_session->getStatementMessage();
1191
transaction= getActiveTransactionMessage(in_session, false);
1317
1192
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1323
1194
/* Set the transaction ID to match the previous messages */
1324
1195
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData &current_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1199
const message::InsertHeader &insert_header= statement->insert_header();
1200
string old_table_name= insert_header.table_metadata().table_name();
1202
string current_table_name;
1203
(void) in_table->getShare()->getTableName(current_table_name);
1205
if (current_table_name.compare(old_table_name))
1207
finalizeStatementMessage(*statement, in_session);
1208
statement= in_session->getStatementMessage();
1212
/* carry forward the existing segment id */
1213
const message::InsertData &current_data= statement->insert_data();
1214
*next_segment_id= current_data.segment_id();
1219
if (statement == NULL)
1222
* Transaction will be non-NULL only if we had to segment it due to
1223
* transaction size above.
1225
if (transaction == NULL)
1226
transaction= getActiveTransactionMessage(in_session);
1229
* Transaction message initialized and set, but no statement created
1230
* yet. We construct one and initialize it, here, then return the
1231
* message after attaching the new Statement message pointer to the
1232
* Session for easy retrieval later...
1234
statement= transaction->add_statement();
1235
setInsertHeader(*statement, in_session, in_table);
1236
in_session->setStatementMessage(statement);
1339
1238
return *statement;
1342
1241
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
Session::const_reference session,
1242
Session *in_session,
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1245
initStatementMessage(statement, message::Statement::INSERT, in_session);
1349
1248
* Now we construct the specialized InsertHeader message inside
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1333
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1438
1335
const unsigned char *old_record,
1439
1336
const unsigned char *new_record,
1440
1337
uint32_t *next_segment_id)
1442
message::Statement *statement= session.getStatementMessage();
1339
message::Statement *statement= in_session->getStatementMessage();
1443
1340
message::Transaction *transaction= NULL;
1446
* If statement is NULL, this is a new statement.
1447
* If statement is NOT NULL, this is a continuation of the same statement.
1448
* This is because autocommitOrRollback() finalizes the statement so that
1449
* we guarantee only one Statement message per statement (i.e., we no longer
1450
* share a single GPB message for multiple statements).
1343
* Check the type for the current Statement message, if it is anything
1344
* other than UPDATE we need to call finalize, this will ensure a
1345
* new UpdateStatement is created. If it is of type UPDATE check
1346
* what table the UPDATE belongs to, if it is a different table
1347
* call finalize, so a new UpdateStatement can be created.
1452
if (statement == NULL)
1349
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1454
transaction= getActiveTransactionMessage(session);
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
transaction_message_threshold)
1459
transaction= segmentTransactionMessage(session, transaction);
1462
statement= transaction->add_statement();
1463
setUpdateHeader(*statement, session, table, old_record, new_record);
1464
session.setStatementMessage(statement);
1351
finalizeStatementMessage(*statement, in_session);
1352
statement= in_session->getStatementMessage();
1354
else if (statement != NULL)
1468
transaction= getActiveTransactionMessage(session);
1356
transaction= getActiveTransactionMessage(in_session);
1471
1359
* If we've passed our threshold for the statement size (possible for
1472
1360
* a bulk insert), we'll finalize the Statement and Transaction (doing
1473
1361
* the Transaction will keep it from getting huge).
1475
1363
if (static_cast<size_t>(transaction->ByteSize()) >=
1476
transaction_message_threshold)
1364
in_session->variables.transaction_message_threshold)
1478
1366
/* Remember the transaction ID so we can re-use it */
1479
1367
uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
uint32_t seg_id= transaction->segment_id();
1482
1369
message::UpdateData *current_data= statement->mutable_update_data();
1484
1371
/* Caller should use this value when adding a new record */
1485
1372
*next_segment_id= current_data->segment_id() + 1;
1487
1374
current_data->set_end_segment(false);
1488
transaction->set_end_segment(false);
1491
1377
* Send the trx message to replicators after finalizing the
1492
1378
* statement and transaction. This will also set the Transaction
1493
1379
* and Statement objects in Session to NULL.
1495
commitTransactionMessage(session);
1381
commitTransactionMessage(in_session);
1498
1384
* Statement and Transaction should now be NULL, so new ones will get
1499
1385
* created. We reuse the transaction id since we are segmenting
1500
1386
* one transaction.
1502
transaction= getActiveTransactionMessage(session, false);
1388
statement= in_session->getStatementMessage();
1389
transaction= getActiveTransactionMessage(in_session, false);
1503
1390
assert(transaction != NULL);
1505
statement= transaction->add_statement();
1506
setUpdateHeader(*statement, session, table, old_record, new_record);
1507
session.setStatementMessage(statement);
1509
1392
/* Set the transaction ID to match the previous messages */
1510
1393
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
transaction->set_segment_id(seg_id + 1);
1512
transaction->set_end_segment(true);
1517
* Continuation of the same statement. Carry forward the existing
1520
const message::UpdateData &current_data= statement->update_data();
1521
*next_segment_id= current_data.segment_id();
1397
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1399
/* carry forward the existing segment id */
1400
const message::UpdateData &current_data= statement->update_data();
1401
*next_segment_id= current_data.segment_id();
1405
finalizeStatementMessage(*statement, in_session);
1406
statement= in_session->getStatementMessage();
1411
if (statement == NULL)
1414
* Transaction will be non-NULL only if we had to segment it due to
1415
* transaction size above.
1417
if (transaction == NULL)
1418
transaction= getActiveTransactionMessage(in_session);
1421
* Transaction message initialized and set, but no statement created
1422
* yet. We construct one and initialize it, here, then return the
1423
* message after attaching the new Statement message pointer to the
1424
* Session for easy retrieval later...
1426
statement= transaction->add_statement();
1427
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1428
in_session->setStatementMessage(statement);
1525
1430
return *statement;
1433
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1435
const unsigned char *old_record,
1436
const unsigned char *new_record)
1438
const message::UpdateHeader &update_header= statement.update_header();
1439
string old_table_name= update_header.table_metadata().table_name();
1441
string current_table_name;
1442
(void) in_table->getShare()->getTableName(current_table_name);
1443
if (current_table_name.compare(old_table_name))
1449
/* Compare the set fields in the existing UpdateHeader and see if they
1450
* match the updated fields in the new record, if they do not we must
1451
* create a new UpdateHeader
1453
size_t num_set_fields= update_header.set_field_metadata_size();
1455
Field *current_field;
1456
Field **table_fields= in_table->getFields();
1457
in_table->setReadSet();
1459
size_t num_calculated_updated_fields= 0;
1461
while ((current_field= *table_fields++) != NULL)
1463
if (num_calculated_updated_fields > num_set_fields)
1468
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1470
/* check that this field exists in the UpdateHeader record */
1473
for (size_t x= 0; x < num_set_fields; ++x)
1475
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1476
string name= field_metadata.name();
1477
if (name.compare(current_field->field_name) == 0)
1480
++num_calculated_updated_fields;
1491
if ((num_calculated_updated_fields == num_set_fields) && found)
1528
1502
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
Session::const_reference session,
1503
Session *in_session,
1531
1505
const unsigned char *old_record,
1532
1506
const unsigned char *new_record)
1534
initStatementMessage(statement, message::Statement::UPDATE, session);
1508
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1537
1511
* Now we construct the specialized UpdateHeader message inside
1707
1681
return isUpdated;
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1684
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1712
1686
uint32_t *next_segment_id)
1714
message::Statement *statement= session.getStatementMessage();
1688
message::Statement *statement= in_session->getStatementMessage();
1715
1689
message::Transaction *transaction= NULL;
1718
* If statement is NULL, this is a new statement.
1719
* If statement is NOT NULL, this is a continuation of the same statement.
1720
* This is because autocommitOrRollback() finalizes the statement so that
1721
* we guarantee only one Statement message per statement (i.e., we no longer
1722
* share a single GPB message for multiple statements).
1692
* Check the type for the current Statement message, if it is anything
1693
* other than DELETE we need to call finalize, this will ensure a
1694
* new DeleteStatement is created. If it is of type DELETE check
1695
* what table the DELETE belongs to, if it is a different table
1696
* call finalize, so a new DeleteStatement can be created.
1724
if (statement == NULL)
1698
if (statement != NULL && statement->type() != message::Statement::DELETE)
1726
transaction= getActiveTransactionMessage(session);
1728
if (static_cast<size_t>(transaction->ByteSize()) >=
1729
transaction_message_threshold)
1731
transaction= segmentTransactionMessage(session, transaction);
1734
statement= transaction->add_statement();
1735
setDeleteHeader(*statement, session, table);
1736
session.setStatementMessage(statement);
1700
finalizeStatementMessage(*statement, in_session);
1701
statement= in_session->getStatementMessage();
1703
else if (statement != NULL)
1740
transaction= getActiveTransactionMessage(session);
1705
transaction= getActiveTransactionMessage(in_session);
1743
1708
* If we've passed our threshold for the statement size (possible for
1744
1709
* a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1710
* the Transaction will keep it from getting huge).
1747
1712
if (static_cast<size_t>(transaction->ByteSize()) >=
1748
transaction_message_threshold)
1713
in_session->variables.transaction_message_threshold)
1750
1715
/* Remember the transaction ID so we can re-use it */
1751
1716
uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
uint32_t seg_id= transaction->segment_id();
1754
1718
message::DeleteData *current_data= statement->mutable_delete_data();
1756
1720
/* Caller should use this value when adding a new record */
1757
1721
*next_segment_id= current_data->segment_id() + 1;
1759
1723
current_data->set_end_segment(false);
1760
transaction->set_end_segment(false);
1763
1726
* Send the trx message to replicators after finalizing the
1764
1727
* statement and transaction. This will also set the Transaction
1765
1728
* and Statement objects in Session to NULL.
1767
commitTransactionMessage(session);
1730
commitTransactionMessage(in_session);
1770
1733
* Statement and Transaction should now be NULL, so new ones will get
1771
1734
* created. We reuse the transaction id since we are segmenting
1772
1735
* one transaction.
1774
transaction= getActiveTransactionMessage(session, false);
1737
statement= in_session->getStatementMessage();
1738
transaction= getActiveTransactionMessage(in_session, false);
1775
1739
assert(transaction != NULL);
1777
statement= transaction->add_statement();
1778
setDeleteHeader(*statement, session, table);
1779
session.setStatementMessage(statement);
1781
1741
/* Set the transaction ID to match the previous messages */
1782
1742
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
transaction->set_segment_id(seg_id + 1);
1784
transaction->set_end_segment(true);
1789
* Continuation of the same statement. Carry forward the existing
1792
const message::DeleteData &current_data= statement->delete_data();
1793
*next_segment_id= current_data.segment_id();
1746
const message::DeleteHeader &delete_header= statement->delete_header();
1747
string old_table_name= delete_header.table_metadata().table_name();
1749
string current_table_name;
1750
(void) in_table->getShare()->getTableName(current_table_name);
1751
if (current_table_name.compare(old_table_name))
1753
finalizeStatementMessage(*statement, in_session);
1754
statement= in_session->getStatementMessage();
1758
/* carry forward the existing segment id */
1759
const message::DeleteData &current_data= statement->delete_data();
1760
*next_segment_id= current_data.segment_id();
1765
if (statement == NULL)
1768
* Transaction will be non-NULL only if we had to segment it due to
1769
* transaction size above.
1771
if (transaction == NULL)
1772
transaction= getActiveTransactionMessage(in_session);
1775
* Transaction message initialized and set, but no statement created
1776
* yet. We construct one and initialize it, here, then return the
1777
* message after attaching the new Statement message pointer to the
1778
* Session for easy retrieval later...
1780
statement= transaction->add_statement();
1781
setDeleteHeader(*statement, in_session, in_table);
1782
in_session->setStatementMessage(statement);
1797
1784
return *statement;
1800
1787
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
Session::const_reference session,
1788
Session *in_session,
1804
initStatementMessage(statement, message::Statement::DELETE, session);
1791
initStatementMessage(statement, message::Statement::DELETE, in_session);
1807
1794
* Now we construct the specialized DeleteHeader message inside
1899
void TransactionServices::createTable(Session::reference session,
1886
* Template for removing Statement records of different types.
1888
* The code for removing records from different Statement message types
1889
* is identical except for the class types that are embedded within the
1892
* There are 3 scenarios we need to look for:
1893
* - We've been asked to remove more records than exist in the Statement
1894
* - We've been asked to remove fewer records than exist in the Statement
1895
* - We've been asked to remove ALL records that exist in the Statement
1897
* If we are removing ALL records, then effectively we would be left with
1898
* an empty Statement message, so we should just remove it and clean up
1899
* message pointers in the Session object.
1901
template <class DataType, class RecordType>
1902
static bool removeStatementRecordsWithType(Session *session,
1906
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1908
/* If there aren't enough records to remove 'count' of them, error. */
1909
if (num_avail_recs < count)
1913
* If we are removing all of the data records, we'll just remove this
1914
* entire Statement message.
1916
if (num_avail_recs == count)
1918
message::Transaction *transaction= session->getTransactionMessage();
1919
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1920
statements->RemoveLast();
1923
* Now need to set the Session Statement pointer to either the previous
1924
* Statement, or NULL if there isn't one.
1926
if (statements->size() == 0)
1928
session->setStatementMessage(NULL);
1933
* There isn't a great way to get a pointer to the previous Statement
1934
* message using the RepeatedPtrField object, so we'll just get to it
1935
* using the Transaction message.
1937
int last_stmt_idx= transaction->statement_size() - 1;
1938
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1941
/* We only need to remove 'count' records */
1942
else if (num_avail_recs > count)
1944
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1946
records->RemoveLast();
1953
bool TransactionServices::removeStatementRecords(Session *session,
1956
ReplicationServices &replication_services= ReplicationServices::singleton();
1957
if (! replication_services.isActive())
1960
/* Get the most current Statement */
1961
message::Statement *statement= session->getStatementMessage();
1963
/* Make sure we have work to do */
1964
if (statement == NULL)
1969
switch (statement->type())
1971
case message::Statement::INSERT:
1973
message::InsertData *data= statement->mutable_insert_data();
1974
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1978
case message::Statement::UPDATE:
1980
message::UpdateData *data= statement->mutable_update_data();
1981
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1985
case message::Statement::DELETE: /* not sure if this one is possible... */
1987
message::DeleteData *data= statement->mutable_delete_data();
1988
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2001
void TransactionServices::createTable(Session *in_session,
1900
2002
const message::Table &table)
1902
2004
ReplicationServices &replication_services= ReplicationServices::singleton();
1903
2005
if (! replication_services.isActive())
1906
message::Transaction *transaction= getActiveTransactionMessage(session);
2008
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1907
2009
message::Statement *statement= transaction->add_statement();
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
2011
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1912
2014
* Construct the specialized CreateTableStatement message and attach
1975
2076
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1977
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1979
finalizeStatementMessage(*statement, session);
1981
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1985
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1990
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1996
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
message::Statement *statement= transaction->add_statement();
1999
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2002
* Construct the specialized AlterSchemaStatement message and attach
2003
* it to the generic Statement message
2005
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2007
message::Schema *before= alter_schema_statement->mutable_before();
2008
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
2013
finalizeStatementMessage(*statement, session);
2015
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
2019
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
2078
drop_schema_statement->set_schema_name(schema_name);
2080
finalizeStatementMessage(*statement, in_session);
2082
finalizeTransactionMessage(*transaction, in_session);
2084
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2086
cleanupTransactionMessage(transaction, in_session);
2089
void TransactionServices::dropTable(Session *in_session,
2090
const string &schema_name,
2091
const string &table_name,
2024
2092
bool if_exists)
2026
2094
ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2095
if (! replication_services.isActive())
2030
message::Transaction *transaction= getActiveTransactionMessage(session);
2098
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2099
message::Statement *statement= transaction->add_statement();
2033
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2101
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2036
2104
* Construct the specialized DropTableStatement message and attach
2074
2141
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2076
2143
string schema_name;
2077
(void) table.getShare()->getSchemaName(schema_name);
2144
(void) in_table->getShare()->getSchemaName(schema_name);
2078
2145
string table_name;
2079
(void) table.getShare()->getTableName(table_name);
2146
(void) in_table->getShare()->getTableName(table_name);
2081
2148
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2149
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2084
finalizeStatementMessage(*statement, session);
2151
finalizeStatementMessage(*statement, in_session);
2086
finalizeTransactionMessage(*transaction, session);
2153
finalizeTransactionMessage(*transaction, in_session);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2155
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2090
cleanupTransactionMessage(transaction, session);
2157
cleanupTransactionMessage(transaction, in_session);
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2160
void TransactionServices::rawStatement(Session *in_session, const string &query)
2096
2162
ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2163
if (! replication_services.isActive())
2100
message::Transaction *transaction= getActiveTransactionMessage(session);
2166
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2167
message::Statement *statement= transaction->add_statement();
2103
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2169
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2170
statement->set_sql(query);
2105
finalizeStatementMessage(*statement, session);
2171
finalizeStatementMessage(*statement, in_session);
2107
finalizeTransactionMessage(*transaction, session);
2173
finalizeTransactionMessage(*transaction, in_session);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2175
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2111
cleanupTransactionMessage(transaction, session);
2177
cleanupTransactionMessage(transaction, in_session);
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2180
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2117
2182
ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2183
if (! replication_services.isActive())