586
591
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
593
598
else if (normal_transaction)
595
session.status_var.ha_commit_count++;
600
session->status_var.ha_commit_count++;
598
603
else if (resource->participatesInSqlTransaction())
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
605
610
else if (normal_transaction)
607
session.status_var.ha_commit_count++;
612
session->status_var.ha_commit_count++;
610
615
resource_context->reset(); /* keep it conveniently zero-filled */
613
618
if (is_real_trans)
614
session.transaction.xid_state.xid.null();
619
session->transaction.xid_state.xid.null();
616
621
if (normal_transaction)
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
637
640
We must not rollback the normal transaction if a statement
638
641
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
643
646
if (resource_contexts.empty() == false)
998
997
* deleting transaction message when done with it.
1000
999
transaction= new (nothrow) message::Transaction();
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1000
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1001
in_session->setTransactionMessage(transaction);
1003
1002
return transaction;
1006
1005
return transaction;
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
Session *in_session,
1011
1010
bool should_inc_trx_id)
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1012
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
trx->set_server_id(in_session->getServerId());
1016
1015
if (should_inc_trx_id)
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1017
trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
in_session->setXaId(0);
1023
/* trx and seg id will get set properly elsewhere */
1024
1022
trx->set_transaction_id(0);
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1025
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1029
Session *in_session)
1031
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1032
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1036
Session *in_session)
1038
delete in_transaction;
1039
in_session->setStatementMessage(NULL);
1040
in_session->setTransactionMessage(NULL);
1041
in_session->setXaId(0);
1044
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
1046
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1047
if (! replication_services.isActive())
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (session.getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1064
message::Statement *statement= session.getStatementMessage();
1050
/* If there is an active statement message, finalize it */
1051
message::Statement *statement= in_session->getStatementMessage();
1066
1053
if (statement != NULL)
1068
finalizeStatementMessage(*statement, session);
1071
message::Transaction* transaction= getActiveTransactionMessage(session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, session);
1085
finalizeTransactionMessage(*transaction, session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
cleanupTransactionMessage(transaction, session);
1055
finalizeStatementMessage(*statement, in_session);
1058
return 0; /* No data modification occurred inside the transaction */
1060
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1062
finalizeTransactionMessage(*transaction, in_session);
1064
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1066
cleanupTransactionMessage(transaction, in_session);
1091
1068
return static_cast<int>(result);
1094
1071
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type type,
1096
Session::const_reference session)
1072
message::Statement::Type in_type,
1073
Session *in_session)
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1075
statement.set_type(in_type);
1076
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1105
1079
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1080
Session *in_session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1082
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1083
in_session->setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1086
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
1088
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1089
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1092
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1121
1095
* OK, so there are two situations that we need to deal with here:
1139
1113
/* Remember the transaction ID so we can re-use it */
1140
1114
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1144
1117
* Clear the transaction, create a Rollback statement message,
1145
1118
* attach it to the transaction, and push it to replicators.
1147
1120
transaction->Clear();
1148
initTransactionMessage(*transaction, session, false);
1121
initTransactionMessage(*transaction, in_session, false);
1150
1123
/* Set the transaction ID to match the previous messages */
1151
1124
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1155
1126
message::Statement *statement= transaction->add_statement();
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1128
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1129
finalizeStatementMessage(*statement, in_session);
1160
finalizeTransactionMessage(*transaction, session);
1131
finalizeTransactionMessage(*transaction, in_session);
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1165
cleanupTransactionMessage(transaction, session);
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
ReplicationServices &replication_services= ReplicationServices::singleton();
1171
if (! replication_services.isActive())
1174
message::Statement *current_statement= session.getStatementMessage();
1176
/* If we never added a Statement message, nothing to undo. */
1177
if (current_statement == NULL)
1181
* If the Statement has been segmented, then we've already pushed a portion
1182
* of this Statement's row changes through the replication stream and we
1183
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
* delete the current Statement message.
1186
bool is_segmented= false;
1188
switch (current_statement->type())
1190
case message::Statement::INSERT:
1191
if (current_statement->insert_data().segment_id() > 1)
1195
case message::Statement::UPDATE:
1196
if (current_statement->update_data().segment_id() > 1)
1200
case message::Statement::DELETE:
1201
if (current_statement->delete_data().segment_id() > 1)
1210
* Remove the Statement message we've been working with (same as
1211
* current_statement).
1213
message::Transaction *transaction= getActiveTransactionMessage(session);
1214
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
statements_in_txn= transaction->mutable_statement();
1216
statements_in_txn->RemoveLast();
1217
session.setStatementMessage(NULL);
1220
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
* an indicator to cancel the previous Statement message which should have
1222
* had its end_segment attribute set to false.
1226
current_statement= transaction->add_statement();
1227
initStatementMessage(*current_statement,
1228
message::Statement::ROLLBACK_STATEMENT,
1230
finalizeStatementMessage(*current_statement, session);
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1133
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1135
cleanupTransactionMessage(transaction, in_session);
1138
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1254
1140
uint32_t *next_segment_id)
1256
message::Statement *statement= session.getStatementMessage();
1142
message::Statement *statement= in_session->getStatementMessage();
1257
1143
message::Transaction *transaction= NULL;
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this is a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1146
* Check the type for the current Statement message, if it is anything
1147
* other than INSERT we need to call finalize, this will ensure a
1148
* new InsertStatement is created. If it is of type INSERT check
1149
* what table the INSERT belongs to, if it is a different table
1150
* call finalize, so a new InsertStatement can be created.
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1152
if (statement != NULL && statement->type() != message::Statement::INSERT)
1154
finalizeStatementMessage(*statement, in_session);
1155
statement= in_session->getStatementMessage();
1157
else if (statement != NULL)
1159
transaction= getActiveTransactionMessage(in_session);
1285
1162
* If we've passed our threshold for the statement size (possible for
1286
1163
* a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1164
* the Transaction will keep it from getting huge).
1289
1166
if (static_cast<size_t>(transaction->ByteSize()) >=
1290
transaction_message_threshold)
1167
in_session->variables.transaction_message_threshold)
1292
1169
/* Remember the transaction ID so we can re-use it */
1293
1170
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1296
1172
message::InsertData *current_data= statement->mutable_insert_data();
1298
1174
/* Caller should use this value when adding a new record */
1299
1175
*next_segment_id= current_data->segment_id() + 1;
1301
1177
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1305
1180
* Send the trx message to replicators after finalizing the
1306
1181
* statement and transaction. This will also set the Transaction
1307
1182
* and Statement objects in Session to NULL.
1309
commitTransactionMessage(session);
1184
commitTransactionMessage(in_session);
1312
1187
* Statement and Transaction should now be NULL, so new ones will get
1313
1188
* created. We reuse the transaction id since we are segmenting
1314
1189
* one transaction.
1316
transaction= getActiveTransactionMessage(session, false);
1191
statement= in_session->getStatementMessage();
1192
transaction= getActiveTransactionMessage(in_session, false);
1317
1193
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1323
1195
/* Set the transaction ID to match the previous messages */
1324
1196
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData &current_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1200
const message::InsertHeader &insert_header= statement->insert_header();
1201
string old_table_name= insert_header.table_metadata().table_name();
1203
string current_table_name;
1204
(void) in_table->getShare()->getTableName(current_table_name);
1206
if (current_table_name.compare(old_table_name))
1208
finalizeStatementMessage(*statement, in_session);
1209
statement= in_session->getStatementMessage();
1213
/* carry forward the existing segment id */
1214
const message::InsertData &current_data= statement->insert_data();
1215
*next_segment_id= current_data.segment_id();
1220
if (statement == NULL)
1223
* Transaction will be non-NULL only if we had to segment it due to
1224
* transaction size above.
1226
if (transaction == NULL)
1227
transaction= getActiveTransactionMessage(in_session);
1230
* Transaction message initialized and set, but no statement created
1231
* yet. We construct one and initialize it, here, then return the
1232
* message after attaching the new Statement message pointer to the
1233
* Session for easy retrieval later...
1235
statement= transaction->add_statement();
1236
setInsertHeader(*statement, in_session, in_table);
1237
in_session->setStatementMessage(statement);
1339
1239
return *statement;
1342
1242
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
Session::const_reference session,
1243
Session *in_session,
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1246
initStatementMessage(statement, message::Statement::INSERT, in_session);
1349
1249
* Now we construct the specialized InsertHeader message inside
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1334
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1438
1336
const unsigned char *old_record,
1439
1337
const unsigned char *new_record,
1440
1338
uint32_t *next_segment_id)
1442
message::Statement *statement= session.getStatementMessage();
1340
message::Statement *statement= in_session->getStatementMessage();
1443
1341
message::Transaction *transaction= NULL;
1446
* If statement is NULL, this is a new statement.
1447
* If statement is NOT NULL, this is a continuation of the same statement.
1448
* This is because autocommitOrRollback() finalizes the statement so that
1449
* we guarantee only one Statement message per statement (i.e., we no longer
1450
* share a single GPB message for multiple statements).
1344
* Check the type for the current Statement message, if it is anything
1345
* other than UPDATE we need to call finalize, this will ensure a
1346
* new UpdateStatement is created. If it is of type UPDATE check
1347
* what table the UPDATE belongs to, if it is a different table
1348
* call finalize, so a new UpdateStatement can be created.
1452
if (statement == NULL)
1350
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1454
transaction= getActiveTransactionMessage(session);
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
transaction_message_threshold)
1459
transaction= segmentTransactionMessage(session, transaction);
1462
statement= transaction->add_statement();
1463
setUpdateHeader(*statement, session, table, old_record, new_record);
1464
session.setStatementMessage(statement);
1352
finalizeStatementMessage(*statement, in_session);
1353
statement= in_session->getStatementMessage();
1355
else if (statement != NULL)
1468
transaction= getActiveTransactionMessage(session);
1357
transaction= getActiveTransactionMessage(in_session);
1471
1360
* If we've passed our threshold for the statement size (possible for
1472
1361
* a bulk insert), we'll finalize the Statement and Transaction (doing
1473
1362
* the Transaction will keep it from getting huge).
1475
1364
if (static_cast<size_t>(transaction->ByteSize()) >=
1476
transaction_message_threshold)
1365
in_session->variables.transaction_message_threshold)
1478
1367
/* Remember the transaction ID so we can re-use it */
1479
1368
uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
uint32_t seg_id= transaction->segment_id();
1482
1370
message::UpdateData *current_data= statement->mutable_update_data();
1484
1372
/* Caller should use this value when adding a new record */
1485
1373
*next_segment_id= current_data->segment_id() + 1;
1487
1375
current_data->set_end_segment(false);
1488
transaction->set_end_segment(false);
1491
1378
* Send the trx message to replicators after finalizing the
1492
1379
* statement and transaction. This will also set the Transaction
1493
1380
* and Statement objects in Session to NULL.
1495
commitTransactionMessage(session);
1382
commitTransactionMessage(in_session);
1498
1385
* Statement and Transaction should now be NULL, so new ones will get
1499
1386
* created. We reuse the transaction id since we are segmenting
1500
1387
* one transaction.
1502
transaction= getActiveTransactionMessage(session, false);
1389
statement= in_session->getStatementMessage();
1390
transaction= getActiveTransactionMessage(in_session, false);
1503
1391
assert(transaction != NULL);
1505
statement= transaction->add_statement();
1506
setUpdateHeader(*statement, session, table, old_record, new_record);
1507
session.setStatementMessage(statement);
1509
1393
/* Set the transaction ID to match the previous messages */
1510
1394
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
transaction->set_segment_id(seg_id + 1);
1512
transaction->set_end_segment(true);
1517
* Continuation of the same statement. Carry forward the existing
1520
const message::UpdateData &current_data= statement->update_data();
1521
*next_segment_id= current_data.segment_id();
1398
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1400
/* carry forward the existing segment id */
1401
const message::UpdateData &current_data= statement->update_data();
1402
*next_segment_id= current_data.segment_id();
1406
finalizeStatementMessage(*statement, in_session);
1407
statement= in_session->getStatementMessage();
1412
if (statement == NULL)
1415
* Transaction will be non-NULL only if we had to segment it due to
1416
* transaction size above.
1418
if (transaction == NULL)
1419
transaction= getActiveTransactionMessage(in_session);
1422
* Transaction message initialized and set, but no statement created
1423
* yet. We construct one and initialize it, here, then return the
1424
* message after attaching the new Statement message pointer to the
1425
* Session for easy retrieval later...
1427
statement= transaction->add_statement();
1428
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1429
in_session->setStatementMessage(statement);
1525
1431
return *statement;
1434
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1436
const unsigned char *old_record,
1437
const unsigned char *new_record)
1439
const message::UpdateHeader &update_header= statement.update_header();
1440
string old_table_name= update_header.table_metadata().table_name();
1442
string current_table_name;
1443
(void) in_table->getShare()->getTableName(current_table_name);
1444
if (current_table_name.compare(old_table_name))
1450
/* Compare the set fields in the existing UpdateHeader and see if they
1451
* match the updated fields in the new record, if they do not we must
1452
* create a new UpdateHeader
1454
size_t num_set_fields= update_header.set_field_metadata_size();
1456
Field *current_field;
1457
Field **table_fields= in_table->getFields();
1458
in_table->setReadSet();
1460
size_t num_calculated_updated_fields= 0;
1462
while ((current_field= *table_fields++) != NULL)
1464
if (num_calculated_updated_fields > num_set_fields)
1469
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1471
/* check that this field exists in the UpdateHeader record */
1474
for (size_t x= 0; x < num_set_fields; ++x)
1476
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1477
string name= field_metadata.name();
1478
if (name.compare(current_field->field_name) == 0)
1481
++num_calculated_updated_fields;
1492
if ((num_calculated_updated_fields == num_set_fields) && found)
1528
1503
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
Session::const_reference session,
1504
Session *in_session,
1531
1506
const unsigned char *old_record,
1532
1507
const unsigned char *new_record)
1534
initStatementMessage(statement, message::Statement::UPDATE, session);
1509
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1537
1512
* Now we construct the specialized UpdateHeader message inside
1707
1682
return isUpdated;
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1685
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1712
1687
uint32_t *next_segment_id)
1714
message::Statement *statement= session.getStatementMessage();
1689
message::Statement *statement= in_session->getStatementMessage();
1715
1690
message::Transaction *transaction= NULL;
1718
* If statement is NULL, this is a new statement.
1719
* If statement is NOT NULL, this is a continuation of the same statement.
1720
* This is because autocommitOrRollback() finalizes the statement so that
1721
* we guarantee only one Statement message per statement (i.e., we no longer
1722
* share a single GPB message for multiple statements).
1693
* Check the type for the current Statement message, if it is anything
1694
* other than DELETE we need to call finalize, this will ensure a
1695
* new DeleteStatement is created. If it is of type DELETE check
1696
* what table the DELETE belongs to, if it is a different table
1697
* call finalize, so a new DeleteStatement can be created.
1724
if (statement == NULL)
1699
if (statement != NULL && statement->type() != message::Statement::DELETE)
1726
transaction= getActiveTransactionMessage(session);
1728
if (static_cast<size_t>(transaction->ByteSize()) >=
1729
transaction_message_threshold)
1731
transaction= segmentTransactionMessage(session, transaction);
1734
statement= transaction->add_statement();
1735
setDeleteHeader(*statement, session, table);
1736
session.setStatementMessage(statement);
1701
finalizeStatementMessage(*statement, in_session);
1702
statement= in_session->getStatementMessage();
1704
else if (statement != NULL)
1740
transaction= getActiveTransactionMessage(session);
1706
transaction= getActiveTransactionMessage(in_session);
1743
1709
* If we've passed our threshold for the statement size (possible for
1744
1710
* a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1711
* the Transaction will keep it from getting huge).
1747
1713
if (static_cast<size_t>(transaction->ByteSize()) >=
1748
transaction_message_threshold)
1714
in_session->variables.transaction_message_threshold)
1750
1716
/* Remember the transaction ID so we can re-use it */
1751
1717
uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
uint32_t seg_id= transaction->segment_id();
1754
1719
message::DeleteData *current_data= statement->mutable_delete_data();
1756
1721
/* Caller should use this value when adding a new record */
1757
1722
*next_segment_id= current_data->segment_id() + 1;
1759
1724
current_data->set_end_segment(false);
1760
transaction->set_end_segment(false);
1763
1727
* Send the trx message to replicators after finalizing the
1764
1728
* statement and transaction. This will also set the Transaction
1765
1729
* and Statement objects in Session to NULL.
1767
commitTransactionMessage(session);
1731
commitTransactionMessage(in_session);
1770
1734
* Statement and Transaction should now be NULL, so new ones will get
1771
1735
* created. We reuse the transaction id since we are segmenting
1772
1736
* one transaction.
1774
transaction= getActiveTransactionMessage(session, false);
1738
statement= in_session->getStatementMessage();
1739
transaction= getActiveTransactionMessage(in_session, false);
1775
1740
assert(transaction != NULL);
1777
statement= transaction->add_statement();
1778
setDeleteHeader(*statement, session, table);
1779
session.setStatementMessage(statement);
1781
1742
/* Set the transaction ID to match the previous messages */
1782
1743
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
transaction->set_segment_id(seg_id + 1);
1784
transaction->set_end_segment(true);
1789
* Continuation of the same statement. Carry forward the existing
1792
const message::DeleteData &current_data= statement->delete_data();
1793
*next_segment_id= current_data.segment_id();
1747
const message::DeleteHeader &delete_header= statement->delete_header();
1748
string old_table_name= delete_header.table_metadata().table_name();
1750
string current_table_name;
1751
(void) in_table->getShare()->getTableName(current_table_name);
1752
if (current_table_name.compare(old_table_name))
1754
finalizeStatementMessage(*statement, in_session);
1755
statement= in_session->getStatementMessage();
1759
/* carry forward the existing segment id */
1760
const message::DeleteData &current_data= statement->delete_data();
1761
*next_segment_id= current_data.segment_id();
1766
if (statement == NULL)
1769
* Transaction will be non-NULL only if we had to segment it due to
1770
* transaction size above.
1772
if (transaction == NULL)
1773
transaction= getActiveTransactionMessage(in_session);
1776
* Transaction message initialized and set, but no statement created
1777
* yet. We construct one and initialize it, here, then return the
1778
* message after attaching the new Statement message pointer to the
1779
* Session for easy retrieval later...
1781
statement= transaction->add_statement();
1782
setDeleteHeader(*statement, in_session, in_table);
1783
in_session->setStatementMessage(statement);
1797
1785
return *statement;
1800
1788
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
Session::const_reference session,
1789
Session *in_session,
1804
initStatementMessage(statement, message::Statement::DELETE, session);
1792
initStatementMessage(statement, message::Statement::DELETE, in_session);
1807
1795
* Now we construct the specialized DeleteHeader message inside
1899
void TransactionServices::createTable(Session::reference session,
1887
* Template for removing Statement records of different types.
1889
* The code for removing records from different Statement message types
1890
* is identical except for the class types that are embedded within the
1893
* There are 3 scenarios we need to look for:
1894
* - We've been asked to remove more records than exist in the Statement
1895
* - We've been asked to remove fewer records than exist in the Statement
1896
* - We've been asked to remove ALL records that exist in the Statement
1898
* If we are removing ALL records, then effectively we would be left with
1899
* an empty Statement message, so we should just remove it and clean up
1900
* message pointers in the Session object.
1902
template <class DataType, class RecordType>
1903
static bool removeStatementRecordsWithType(Session *session,
1907
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1909
/* If there aren't enough records to remove 'count' of them, error. */
1910
if (num_avail_recs < count)
1914
* If we are removing all of the data records, we'll just remove this
1915
* entire Statement message.
1917
if (num_avail_recs == count)
1919
message::Transaction *transaction= session->getTransactionMessage();
1920
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1921
statements->RemoveLast();
1924
* Now need to set the Session Statement pointer to either the previous
1925
* Statement, or NULL if there isn't one.
1927
if (statements->size() == 0)
1929
session->setStatementMessage(NULL);
1934
* There isn't a great way to get a pointer to the previous Statement
1935
* message using the RepeatedPtrField object, so we'll just get to it
1936
* using the Transaction message.
1938
int last_stmt_idx= transaction->statement_size() - 1;
1939
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1942
/* We only need to remove 'count' records */
1943
else if (num_avail_recs > count)
1945
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1947
records->RemoveLast();
1954
bool TransactionServices::removeStatementRecords(Session *session,
1957
ReplicationServices &replication_services= ReplicationServices::singleton();
1958
if (! replication_services.isActive())
1961
/* Get the most current Statement */
1962
message::Statement *statement= session->getStatementMessage();
1964
/* Make sure we have work to do */
1965
if (statement == NULL)
1970
switch (statement->type())
1972
case message::Statement::INSERT:
1974
message::InsertData *data= statement->mutable_insert_data();
1975
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1979
case message::Statement::UPDATE:
1981
message::UpdateData *data= statement->mutable_update_data();
1982
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1986
case message::Statement::DELETE: /* not sure if this one is possible... */
1988
message::DeleteData *data= statement->mutable_delete_data();
1989
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2002
void TransactionServices::createTable(Session *in_session,
1900
2003
const message::Table &table)
1902
2005
ReplicationServices &replication_services= ReplicationServices::singleton();
1903
2006
if (! replication_services.isActive())
1906
message::Transaction *transaction= getActiveTransactionMessage(session);
2009
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1907
2010
message::Statement *statement= transaction->add_statement();
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
2012
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1912
2015
* Construct the specialized CreateTableStatement message and attach
1975
2077
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1977
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1979
finalizeStatementMessage(*statement, session);
1981
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1985
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1990
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1996
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
message::Statement *statement= transaction->add_statement();
1999
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2002
* Construct the specialized AlterSchemaStatement message and attach
2003
* it to the generic Statement message
2005
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2007
message::Schema *before= alter_schema_statement->mutable_before();
2008
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
2013
finalizeStatementMessage(*statement, session);
2015
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
2019
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
2079
drop_schema_statement->set_schema_name(schema_name);
2081
finalizeStatementMessage(*statement, in_session);
2083
finalizeTransactionMessage(*transaction, in_session);
2085
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2087
cleanupTransactionMessage(transaction, in_session);
2090
void TransactionServices::dropTable(Session *in_session,
2091
const string &schema_name,
2092
const string &table_name,
2024
2093
bool if_exists)
2026
2095
ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2096
if (! replication_services.isActive())
2030
message::Transaction *transaction= getActiveTransactionMessage(session);
2099
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2100
message::Statement *statement= transaction->add_statement();
2033
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2102
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2036
2105
* Construct the specialized DropTableStatement message and attach
2074
2142
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2076
2144
string schema_name;
2077
(void) table.getShare()->getSchemaName(schema_name);
2145
(void) in_table->getShare()->getSchemaName(schema_name);
2078
2146
string table_name;
2079
(void) table.getShare()->getTableName(table_name);
2147
(void) in_table->getShare()->getTableName(table_name);
2081
2149
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2150
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2084
finalizeStatementMessage(*statement, session);
2152
finalizeStatementMessage(*statement, in_session);
2086
finalizeTransactionMessage(*transaction, session);
2154
finalizeTransactionMessage(*transaction, in_session);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2156
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2090
cleanupTransactionMessage(transaction, session);
2158
cleanupTransactionMessage(transaction, in_session);
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2161
void TransactionServices::rawStatement(Session *in_session, const string &query)
2096
2163
ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2164
if (! replication_services.isActive())
2100
message::Transaction *transaction= getActiveTransactionMessage(session);
2167
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2168
message::Statement *statement= transaction->add_statement();
2103
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2170
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2171
statement->set_sql(query);
2105
finalizeStatementMessage(*statement, session);
2172
finalizeStatementMessage(*statement, in_session);
2107
finalizeTransactionMessage(*transaction, session);
2174
finalizeTransactionMessage(*transaction, in_session);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2176
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2111
cleanupTransactionMessage(transaction, session);
2178
cleanupTransactionMessage(transaction, in_session);
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2181
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2117
2183
ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2184
if (! replication_services.isActive())