591
586
if (resource->participatesInXaTransaction())
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
595
590
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
598
593
else if (normal_transaction)
600
session->status_var.ha_commit_count++;
595
session.status_var.ha_commit_count++;
603
598
else if (resource->participatesInSqlTransaction())
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
607
602
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
610
605
else if (normal_transaction)
612
session->status_var.ha_commit_count++;
607
session.status_var.ha_commit_count++;
615
610
resource_context->reset(); /* keep it conveniently zero-filled */
618
613
if (is_real_trans)
619
session->transaction.xid_state.xid.null();
614
session.transaction.xid_state.xid.null();
621
616
if (normal_transaction)
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
631
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
640
637
We must not rollback the normal transaction if a statement
641
638
transaction is pending.
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
646
643
if (resource_contexts.empty() == false)
997
998
* deleting transaction message when done with it.
999
1000
transaction= new (nothrow) message::Transaction();
1000
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1001
in_session->setTransactionMessage(transaction);
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1002
1003
return transaction;
1005
1006
return transaction;
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
Session *in_session,
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1010
1011
bool should_inc_trx_id)
1012
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
trx->set_server_id(in_session->getServerId());
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1015
1016
if (should_inc_trx_id)
1017
trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
in_session->setXaId(0);
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1023
/* trx and seg id will get set properly elsewhere */
1022
1024
trx->set_transaction_id(0);
1025
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1029
Session *in_session)
1031
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1032
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1036
Session *in_session)
1038
delete in_transaction;
1039
in_session->setStatementMessage(NULL);
1040
in_session->setTransactionMessage(NULL);
1041
in_session->setXaId(0);
1044
int TransactionServices::commitTransactionMessage(Session *in_session)
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1046
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1047
1053
if (! replication_services.isActive())
1050
/* If there is an active statement message, finalize it */
1051
message::Statement *statement= in_session->getStatementMessage();
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (session.getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1064
message::Statement *statement= session.getStatementMessage();
1053
1066
if (statement != NULL)
1055
finalizeStatementMessage(*statement, in_session);
1058
return 0; /* No data modification occurred inside the transaction */
1060
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1062
finalizeTransactionMessage(*transaction, in_session);
1064
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1066
cleanupTransactionMessage(transaction, in_session);
1068
finalizeStatementMessage(*statement, session);
1071
message::Transaction* transaction= getActiveTransactionMessage(session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and do not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, session);
1085
finalizeTransactionMessage(*transaction, session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
cleanupTransactionMessage(transaction, session);
1068
1091
return static_cast<int>(result);
1071
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1072
message::Statement::Type in_type,
1073
Session *in_session)
1095
message::Statement::Type type,
1096
Session::const_reference session)
1075
statement.set_type(in_type);
1076
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1078
if (in_session->variables.replicate_query)
1079
statement.set_sql(in_session->getQueryString()->c_str());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1082
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1083
Session *in_session)
1106
Session::reference session)
1085
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1086
in_session->setStatementMessage(NULL);
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1089
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1091
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1092
1115
if (! replication_services.isActive())
1095
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1098
1121
* OK, so there are two situations that we need to deal with here:
1116
1139
/* Remember the transaction ID so we can re-use it */
1117
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1120
1144
* Clear the transaction, create a Rollback statement message,
1121
1145
* attach it to the transaction, and push it to replicators.
1123
1147
transaction->Clear();
1124
initTransactionMessage(*transaction, in_session, false);
1148
initTransactionMessage(*transaction, session, false);
1126
1150
/* Set the transaction ID to match the previous messages */
1127
1151
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1129
1155
message::Statement *statement= transaction->add_statement();
1131
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1132
finalizeStatementMessage(*statement, in_session);
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1134
finalizeTransactionMessage(*transaction, in_session);
1160
finalizeTransactionMessage(*transaction, session);
1136
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1138
cleanupTransactionMessage(transaction, in_session);
1141
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1165
cleanupTransactionMessage(transaction, session);
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
ReplicationServices &replication_services= ReplicationServices::singleton();
1171
if (! replication_services.isActive())
1174
message::Statement *current_statement= session.getStatementMessage();
1176
/* If we never added a Statement message, nothing to undo. */
1177
if (current_statement == NULL)
1181
* If the Statement has been segmented, then we've already pushed a portion
1182
* of this Statement's row changes through the replication stream and we
1183
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
* delete the current Statement message.
1186
bool is_segmented= false;
1188
switch (current_statement->type())
1190
case message::Statement::INSERT:
1191
if (current_statement->insert_data().segment_id() > 1)
1195
case message::Statement::UPDATE:
1196
if (current_statement->update_data().segment_id() > 1)
1200
case message::Statement::DELETE:
1201
if (current_statement->delete_data().segment_id() > 1)
1210
* Remove the Statement message we've been working with (same as
1211
* current_statement).
1213
message::Transaction *transaction= getActiveTransactionMessage(session);
1214
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
statements_in_txn= transaction->mutable_statement();
1216
statements_in_txn->RemoveLast();
1217
session.setStatementMessage(NULL);
1220
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
* an indicator to cancel the previous Statement message which should have
1222
* had its end_segment attribute set to false.
1226
current_statement= transaction->add_statement();
1227
initStatementMessage(*current_statement,
1228
message::Statement::ROLLBACK_STATEMENT,
1230
finalizeStatementMessage(*current_statement, session);
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1143
1254
uint32_t *next_segment_id)
1145
message::Statement *statement= in_session->getStatementMessage();
1256
message::Statement *statement= session.getStatementMessage();
1146
1257
message::Transaction *transaction= NULL;
1149
* Check the type for the current Statement message, if it is anything
1150
* other than INSERT we need to call finalize, this will ensure a
1151
* new InsertStatement is created. If it is of type INSERT check
1152
* what table the INSERT belongs to, if it is a different table
1153
* call finalize, so a new InsertStatement can be created.
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this is a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1155
if (statement != NULL && statement->type() != message::Statement::INSERT)
1157
finalizeStatementMessage(*statement, in_session);
1158
statement= in_session->getStatementMessage();
1160
else if (statement != NULL)
1162
transaction= getActiveTransactionMessage(in_session);
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1165
1285
* If we've passed our threshold for the statement size (possible for
1166
1286
* a bulk insert), we'll finalize the Statement and Transaction (doing
1167
1287
* the Transaction will keep it from getting huge).
1169
1289
if (static_cast<size_t>(transaction->ByteSize()) >=
1170
in_session->variables.transaction_message_threshold)
1290
transaction_message_threshold)
1172
1292
/* Remember the transaction ID so we can re-use it */
1173
1293
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1175
1296
message::InsertData *current_data= statement->mutable_insert_data();
1177
1298
/* Caller should use this value when adding a new record */
1178
1299
*next_segment_id= current_data->segment_id() + 1;
1180
1301
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1183
1305
* Send the trx message to replicators after finalizing the
1184
1306
* statement and transaction. This will also set the Transaction
1185
1307
* and Statement objects in Session to NULL.
1187
commitTransactionMessage(in_session);
1309
commitTransactionMessage(session);
1190
1312
* Statement and Transaction should now be NULL, so new ones will get
1191
1313
* created. We reuse the transaction id since we are segmenting
1192
1314
* one transaction.
1194
statement= in_session->getStatementMessage();
1195
transaction= getActiveTransactionMessage(in_session, false);
1316
transaction= getActiveTransactionMessage(session, false);
1196
1317
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1198
1323
/* Set the transaction ID to match the previous messages */
1199
1324
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1203
const message::InsertHeader &insert_header= statement->insert_header();
1204
string old_table_name= insert_header.table_metadata().table_name();
1206
string current_table_name;
1207
(void) in_table->getShare()->getTableName(current_table_name);
1209
if (current_table_name.compare(old_table_name))
1211
finalizeStatementMessage(*statement, in_session);
1212
statement= in_session->getStatementMessage();
1216
/* carry forward the existing segment id */
1217
const message::InsertData &current_data= statement->insert_data();
1218
*next_segment_id= current_data.segment_id();
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData &current_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1223
if (statement == NULL)
1226
* Transaction will be non-NULL only if we had to segment it due to
1227
* transaction size above.
1229
if (transaction == NULL)
1230
transaction= getActiveTransactionMessage(in_session);
1233
* Transaction message initialized and set, but no statement created
1234
* yet. We construct one and initialize it, here, then return the
1235
* message after attaching the new Statement message pointer to the
1236
* Session for easy retrieval later...
1238
statement= transaction->add_statement();
1239
setInsertHeader(*statement, in_session, in_table);
1240
in_session->setStatementMessage(statement);
1242
1339
return *statement;
1245
1342
void TransactionServices::setInsertHeader(message::Statement &statement,
1246
Session *in_session,
1343
Session::const_reference session,
1249
initStatementMessage(statement, message::Statement::INSERT, in_session);
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1252
1349
* Now we construct the specialized InsertHeader message inside
1337
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1339
1438
const unsigned char *old_record,
1340
1439
const unsigned char *new_record,
1341
1440
uint32_t *next_segment_id)
1343
message::Statement *statement= in_session->getStatementMessage();
1442
message::Statement *statement= session.getStatementMessage();
1344
1443
message::Transaction *transaction= NULL;
1347
* Check the type for the current Statement message, if it is anything
1348
* other than UPDATE we need to call finalize, this will ensure a
1349
* new UpdateStatement is created. If it is of type UPDATE check
1350
* what table the UPDATE belongs to, if it is a different table
1351
* call finalize, so a new UpdateStatement can be created.
1446
* If statement is NULL, this is a new statement.
1447
* If statement is NOT NULL, this is a continuation of the same statement.
1448
* This is because autocommitOrRollback() finalizes the statement so that
1449
* we guarantee only one Statement message per statement (i.e., we no longer
1450
* share a single GPB message for multiple statements).
1353
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1452
if (statement == NULL)
1355
finalizeStatementMessage(*statement, in_session);
1356
statement= in_session->getStatementMessage();
1454
transaction= getActiveTransactionMessage(session);
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
transaction_message_threshold)
1459
transaction= segmentTransactionMessage(session, transaction);
1462
statement= transaction->add_statement();
1463
setUpdateHeader(*statement, session, table, old_record, new_record);
1464
session.setStatementMessage(statement);
1358
else if (statement != NULL)
1360
transaction= getActiveTransactionMessage(in_session);
1468
transaction= getActiveTransactionMessage(session);
1363
1471
* If we've passed our threshold for the statement size (possible for
1364
1472
* a bulk insert), we'll finalize the Statement and Transaction (doing
1365
1473
* the Transaction will keep it from getting huge).
1367
1475
if (static_cast<size_t>(transaction->ByteSize()) >=
1368
in_session->variables.transaction_message_threshold)
1476
transaction_message_threshold)
1370
1478
/* Remember the transaction ID so we can re-use it */
1371
1479
uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
uint32_t seg_id= transaction->segment_id();
1373
1482
message::UpdateData *current_data= statement->mutable_update_data();
1375
1484
/* Caller should use this value when adding a new record */
1376
1485
*next_segment_id= current_data->segment_id() + 1;
1378
1487
current_data->set_end_segment(false);
1488
transaction->set_end_segment(false);
1381
1491
* Send the trx message to replicators after finalizing the
1382
1492
* statement and transaction. This will also set the Transaction
1383
1493
* and Statement objects in Session to NULL.
1385
commitTransactionMessage(in_session);
1495
commitTransactionMessage(session);
1388
1498
* Statement and Transaction should now be NULL, so new ones will get
1389
1499
* created. We reuse the transaction id since we are segmenting
1390
1500
* one transaction.
1392
statement= in_session->getStatementMessage();
1393
transaction= getActiveTransactionMessage(in_session, false);
1502
transaction= getActiveTransactionMessage(session, false);
1394
1503
assert(transaction != NULL);
1505
statement= transaction->add_statement();
1506
setUpdateHeader(*statement, session, table, old_record, new_record);
1507
session.setStatementMessage(statement);
1396
1509
/* Set the transaction ID to match the previous messages */
1397
1510
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
transaction->set_segment_id(seg_id + 1);
1512
transaction->set_end_segment(true);
1401
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1403
/* carry forward the existing segment id */
1404
const message::UpdateData &current_data= statement->update_data();
1405
*next_segment_id= current_data.segment_id();
1409
finalizeStatementMessage(*statement, in_session);
1410
statement= in_session->getStatementMessage();
1517
* Continuation of the same statement. Carry forward the existing
1520
const message::UpdateData &current_data= statement->update_data();
1521
*next_segment_id= current_data.segment_id();
1415
if (statement == NULL)
1418
* Transaction will be non-NULL only if we had to segment it due to
1419
* transaction size above.
1421
if (transaction == NULL)
1422
transaction= getActiveTransactionMessage(in_session);
1425
* Transaction message initialized and set, but no statement created
1426
* yet. We construct one and initialize it, here, then return the
1427
* message after attaching the new Statement message pointer to the
1428
* Session for easy retrieval later...
1430
statement= transaction->add_statement();
1431
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1432
in_session->setStatementMessage(statement);
1434
1525
return *statement;
1437
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1439
const unsigned char *old_record,
1440
const unsigned char *new_record)
1442
const message::UpdateHeader &update_header= statement.update_header();
1443
string old_table_name= update_header.table_metadata().table_name();
1445
string current_table_name;
1446
(void) in_table->getShare()->getTableName(current_table_name);
1447
if (current_table_name.compare(old_table_name))
1453
/* Compare the set fields in the existing UpdateHeader and see if they
1454
* match the updated fields in the new record, if they do not we must
1455
* create a new UpdateHeader
1457
size_t num_set_fields= update_header.set_field_metadata_size();
1459
Field *current_field;
1460
Field **table_fields= in_table->getFields();
1461
in_table->setReadSet();
1463
size_t num_calculated_updated_fields= 0;
1465
while ((current_field= *table_fields++) != NULL)
1467
if (num_calculated_updated_fields > num_set_fields)
1472
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1474
/* check that this field exists in the UpdateHeader record */
1477
for (size_t x= 0; x < num_set_fields; ++x)
1479
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1480
string name= field_metadata.name();
1481
if (name.compare(current_field->field_name) == 0)
1484
++num_calculated_updated_fields;
1495
if ((num_calculated_updated_fields == num_set_fields) && found)
1506
1528
void TransactionServices::setUpdateHeader(message::Statement &statement,
1507
Session *in_session,
1529
Session::const_reference session,
1509
1531
const unsigned char *old_record,
1510
1532
const unsigned char *new_record)
1512
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1534
initStatementMessage(statement, message::Statement::UPDATE, session);
1515
1537
* Now we construct the specialized UpdateHeader message inside
1685
1707
return isUpdated;
1688
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1690
1712
uint32_t *next_segment_id)
1692
message::Statement *statement= in_session->getStatementMessage();
1714
message::Statement *statement= session.getStatementMessage();
1693
1715
message::Transaction *transaction= NULL;
1696
* Check the type for the current Statement message, if it is anything
1697
* other than DELETE we need to call finalize, this will ensure a
1698
* new DeleteStatement is created. If it is of type DELETE check
1699
* what table the DELETE belongs to, if it is a different table
1700
* call finalize, so a new DeleteStatement can be created.
1718
* If statement is NULL, this is a new statement.
1719
* If statement is NOT NULL, this is a continuation of the same statement.
1720
* This is because autocommitOrRollback() finalizes the statement so that
1721
* we guarantee only one Statement message per statement (i.e., we no longer
1722
* share a single GPB message for multiple statements).
1702
if (statement != NULL && statement->type() != message::Statement::DELETE)
1724
if (statement == NULL)
1704
finalizeStatementMessage(*statement, in_session);
1705
statement= in_session->getStatementMessage();
1726
transaction= getActiveTransactionMessage(session);
1728
if (static_cast<size_t>(transaction->ByteSize()) >=
1729
transaction_message_threshold)
1731
transaction= segmentTransactionMessage(session, transaction);
1734
statement= transaction->add_statement();
1735
setDeleteHeader(*statement, session, table);
1736
session.setStatementMessage(statement);
1707
else if (statement != NULL)
1709
transaction= getActiveTransactionMessage(in_session);
1740
transaction= getActiveTransactionMessage(session);
1712
1743
* If we've passed our threshold for the statement size (possible for
1713
1744
* a bulk insert), we'll finalize the Statement and Transaction (doing
1714
1745
* the Transaction will keep it from getting huge).
1716
1747
if (static_cast<size_t>(transaction->ByteSize()) >=
1717
in_session->variables.transaction_message_threshold)
1748
transaction_message_threshold)
1719
1750
/* Remember the transaction ID so we can re-use it */
1720
1751
uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
uint32_t seg_id= transaction->segment_id();
1722
1754
message::DeleteData *current_data= statement->mutable_delete_data();
1724
1756
/* Caller should use this value when adding a new record */
1725
1757
*next_segment_id= current_data->segment_id() + 1;
1727
1759
current_data->set_end_segment(false);
1760
transaction->set_end_segment(false);
1730
1763
* Send the trx message to replicators after finalizing the
1731
1764
* statement and transaction. This will also set the Transaction
1732
1765
* and Statement objects in Session to NULL.
1734
commitTransactionMessage(in_session);
1767
commitTransactionMessage(session);
1737
1770
* Statement and Transaction should now be NULL, so new ones will get
1738
1771
* created. We reuse the transaction id since we are segmenting
1739
1772
* one transaction.
1741
statement= in_session->getStatementMessage();
1742
transaction= getActiveTransactionMessage(in_session, false);
1774
transaction= getActiveTransactionMessage(session, false);
1743
1775
assert(transaction != NULL);
1777
statement= transaction->add_statement();
1778
setDeleteHeader(*statement, session, table);
1779
session.setStatementMessage(statement);
1745
1781
/* Set the transaction ID to match the previous messages */
1746
1782
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
transaction->set_segment_id(seg_id + 1);
1784
transaction->set_end_segment(true);
1750
const message::DeleteHeader &delete_header= statement->delete_header();
1751
string old_table_name= delete_header.table_metadata().table_name();
1753
string current_table_name;
1754
(void) in_table->getShare()->getTableName(current_table_name);
1755
if (current_table_name.compare(old_table_name))
1757
finalizeStatementMessage(*statement, in_session);
1758
statement= in_session->getStatementMessage();
1762
/* carry forward the existing segment id */
1763
const message::DeleteData &current_data= statement->delete_data();
1764
*next_segment_id= current_data.segment_id();
1789
* Continuation of the same statement. Carry forward the existing
1792
const message::DeleteData &current_data= statement->delete_data();
1793
*next_segment_id= current_data.segment_id();
1769
if (statement == NULL)
1772
* Transaction will be non-NULL only if we had to segment it due to
1773
* transaction size above.
1775
if (transaction == NULL)
1776
transaction= getActiveTransactionMessage(in_session);
1779
* Transaction message initialized and set, but no statement created
1780
* yet. We construct one and initialize it, here, then return the
1781
* message after attaching the new Statement message pointer to the
1782
* Session for easy retrieval later...
1784
statement= transaction->add_statement();
1785
setDeleteHeader(*statement, in_session, in_table);
1786
in_session->setStatementMessage(statement);
1788
1797
return *statement;
1791
1800
void TransactionServices::setDeleteHeader(message::Statement &statement,
1792
Session *in_session,
1801
Session::const_reference session,
1795
initStatementMessage(statement, message::Statement::DELETE, in_session);
1804
initStatementMessage(statement, message::Statement::DELETE, session);
1798
1807
* Now we construct the specialized DeleteHeader message inside
1890
* Template for removing Statement records of different types.
1892
* The code for removing records from different Statement message types
1893
* is identical except for the class types that are embedded within the
1896
* There are 3 scenarios we need to look for:
1897
* - We've been asked to remove more records than exist in the Statement
1898
* - We've been asked to remove fewer records than exist in the Statement
1899
* - We've been asked to remove ALL records that exist in the Statement
1901
* If we are removing ALL records, then effectively we would be left with
1902
* an empty Statement message, so we should just remove it and clean up
1903
* message pointers in the Session object.
1905
template <class DataType, class RecordType>
1906
static bool removeStatementRecordsWithType(Session *session,
1910
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1912
/* If there aren't enough records to remove 'count' of them, error. */
1913
if (num_avail_recs < count)
1917
* If we are removing all of the data records, we'll just remove this
1918
* entire Statement message.
1920
if (num_avail_recs == count)
1922
message::Transaction *transaction= session->getTransactionMessage();
1923
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1924
statements->RemoveLast();
1927
* Now need to set the Session Statement pointer to either the previous
1928
* Statement, or NULL if there isn't one.
1930
if (statements->size() == 0)
1932
session->setStatementMessage(NULL);
1937
* There isn't a great way to get a pointer to the previous Statement
1938
* message using the RepeatedPtrField object, so we'll just get to it
1939
* using the Transaction message.
1941
int last_stmt_idx= transaction->statement_size() - 1;
1942
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1945
/* We only need to remove 'count' records */
1946
else if (num_avail_recs > count)
1948
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1950
records->RemoveLast();
1957
bool TransactionServices::removeStatementRecords(Session *session,
1960
ReplicationServices &replication_services= ReplicationServices::singleton();
1961
if (! replication_services.isActive())
1964
/* Get the most current Statement */
1965
message::Statement *statement= session->getStatementMessage();
1967
/* Make sure we have work to do */
1968
if (statement == NULL)
1973
switch (statement->type())
1975
case message::Statement::INSERT:
1977
message::InsertData *data= statement->mutable_insert_data();
1978
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1982
case message::Statement::UPDATE:
1984
message::UpdateData *data= statement->mutable_update_data();
1985
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1989
case message::Statement::DELETE: /* not sure if this one is possible... */
1991
message::DeleteData *data= statement->mutable_delete_data();
1992
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2005
void TransactionServices::createTable(Session *in_session,
1899
void TransactionServices::createTable(Session::reference session,
2006
1900
const message::Table &table)
2008
1902
ReplicationServices &replication_services= ReplicationServices::singleton();
2009
1903
if (! replication_services.isActive())
2012
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1906
message::Transaction *transaction= getActiveTransactionMessage(session);
2013
1907
message::Statement *statement= transaction->add_statement();
2015
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
2018
1912
* Construct the specialized CreateTableStatement message and attach
2080
1975
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2082
drop_schema_statement->set_schema_name(schema_name);
2084
finalizeStatementMessage(*statement, in_session);
2086
finalizeTransactionMessage(*transaction, in_session);
2088
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2090
cleanupTransactionMessage(transaction, in_session);
2093
void TransactionServices::dropTable(Session *in_session,
2094
const string &schema_name,
2095
const string &table_name,
1977
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1979
finalizeStatementMessage(*statement, session);
1981
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1985
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1990
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1996
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
message::Statement *statement= transaction->add_statement();
1999
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2002
* Construct the specialized AlterSchemaStatement message and attach
2003
* it to the generic Statement message
2005
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2007
message::Schema *before= alter_schema_statement->mutable_before();
2008
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
2013
finalizeStatementMessage(*statement, session);
2015
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
2019
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
2096
2024
bool if_exists)
2098
2026
ReplicationServices &replication_services= ReplicationServices::singleton();
2099
2027
if (! replication_services.isActive())
2102
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2030
message::Transaction *transaction= getActiveTransactionMessage(session);
2103
2031
message::Statement *statement= transaction->add_statement();
2105
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2033
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2108
2036
* Construct the specialized DropTableStatement message and attach
2145
2074
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2147
2076
string schema_name;
2148
(void) in_table->getShare()->getSchemaName(schema_name);
2077
(void) table.getShare()->getSchemaName(schema_name);
2149
2078
string table_name;
2150
(void) in_table->getShare()->getTableName(table_name);
2079
(void) table.getShare()->getTableName(table_name);
2152
2081
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2153
2082
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2155
finalizeStatementMessage(*statement, in_session);
2084
finalizeStatementMessage(*statement, session);
2157
finalizeTransactionMessage(*transaction, in_session);
2086
finalizeTransactionMessage(*transaction, session);
2159
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2161
cleanupTransactionMessage(transaction, in_session);
2090
cleanupTransactionMessage(transaction, session);
2164
void TransactionServices::rawStatement(Session *in_session, const string &query)
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2166
2096
ReplicationServices &replication_services= ReplicationServices::singleton();
2167
2097
if (! replication_services.isActive())
2170
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2100
message::Transaction *transaction= getActiveTransactionMessage(session);
2171
2101
message::Statement *statement= transaction->add_statement();
2173
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2103
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2174
2104
statement->set_sql(query);
2175
finalizeStatementMessage(*statement, in_session);
2105
finalizeStatementMessage(*statement, session);
2177
finalizeTransactionMessage(*transaction, in_session);
2107
finalizeTransactionMessage(*transaction, session);
2179
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2181
cleanupTransactionMessage(transaction, in_session);
2111
cleanupTransactionMessage(transaction, session);
2184
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2186
2117
ReplicationServices &replication_services= ReplicationServices::singleton();
2187
2118
if (! replication_services.isActive())