400
400
resource_context->setTransactionalStorageEngine(engine);
401
401
trans->no_2pc|= true;
403
if (session.transaction.xid_state.xid.is_null())
404
session.transaction.xid_state.xid.set(session.getQueryId());
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
406
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session.getResourceContext(monitored, 0)->isStarted())
407
if (! session->getResourceContext(monitored, 0)->isStarted())
408
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
412
plugin::MonitoredInTransaction *monitored,
413
413
plugin::TransactionalStorageEngine *engine,
414
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
419
if (resource_context->isStarted())
420
420
return; /* already registered, return */
422
session.server_status|= SERVER_STATUS_IN_TRANS;
422
session->server_status|= SERVER_STATUS_IN_TRANS;
424
424
trans->registerResource(resource_context);
453
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
458
if (session.getXaId() == 0)
458
if (session->getXaId() == 0)
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
463
return session.getXaId();
463
return session->getXaId();
466
int TransactionServices::commitTransaction(Session::reference session,
467
bool normal_transaction)
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
469
482
int error= 0, cookie= 0;
471
484
'all' means that this is either an explicit commit issued by
472
485
user, or an implicit commit issued by a DDL.
474
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
475
488
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
480
493
We must not commit the normal transaction if a statement
586
591
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
590
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
593
598
else if (normal_transaction)
595
session.status_var.ha_commit_count++;
600
session->status_var.ha_commit_count++;
598
603
else if (resource->participatesInSqlTransaction())
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
602
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
605
610
else if (normal_transaction)
607
session.status_var.ha_commit_count++;
612
session->status_var.ha_commit_count++;
610
615
resource_context->reset(); /* keep it conveniently zero-filled */
613
618
if (is_real_trans)
614
session.transaction.xid_state.xid.null();
619
session->transaction.xid_state.xid.null();
616
621
if (normal_transaction)
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
631
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
637
640
We must not rollback the normal transaction if a statement
638
641
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
643
646
if (resource_contexts.empty() == false)
998
1003
* deleting transaction message when done with it.
1000
1005
transaction= new (nothrow) message::Transaction();
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
1003
1008
return transaction;
1006
1011
return transaction;
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
1011
1016
bool should_inc_trx_id)
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1016
1021
if (should_inc_trx_id)
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1023
/* trx and seg id will get set properly elsewhere */
1024
1028
trx->set_transaction_id(0);
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1031
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
Session *in_session)
1037
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
Session *in_session)
1044
delete in_transaction;
1045
in_session->setStatementMessage(NULL);
1046
in_session->setTransactionMessage(NULL);
1047
in_session->setXaId(0);
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1053
if (! replication_services.isActive())
1079
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, session);
1081
cleanupTransactionMessage(transaction, in_session);
1085
finalizeTransactionMessage(*transaction, session);
1085
finalizeTransactionMessage(*transaction, in_session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1089
cleanupTransactionMessage(transaction, session);
1089
cleanupTransactionMessage(transaction, in_session);
1091
1091
return static_cast<int>(result);
1094
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type type,
1096
Session::const_reference session)
1095
message::Statement::Type in_type,
1096
Session *in_session)
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1098
statement.set_type(in_type);
1099
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1101
if (in_session->variables.replicate_query)
1102
statement.set_sql(in_session->getQueryString()->c_str());
1105
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1106
Session *in_session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1108
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
in_session->setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1115
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1118
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1121
1121
* OK, so there are two situations that we need to deal with here:
1139
1139
/* Remember the transaction ID so we can re-use it */
1140
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1144
1143
* Clear the transaction, create a Rollback statement message,
1145
1144
* attach it to the transaction, and push it to replicators.
1147
1146
transaction->Clear();
1148
initTransactionMessage(*transaction, session, false);
1147
initTransactionMessage(*transaction, in_session, false);
1150
1149
/* Set the transaction ID to match the previous messages */
1151
1150
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1155
1152
message::Statement *statement= transaction->add_statement();
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1154
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
finalizeStatementMessage(*statement, in_session);
1160
finalizeTransactionMessage(*transaction, session);
1157
finalizeTransactionMessage(*transaction, in_session);
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1159
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1165
cleanupTransactionMessage(transaction, session);
1161
cleanupTransactionMessage(transaction, in_session);
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1170
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1171
1167
if (! replication_services.isActive())
1174
message::Statement *current_statement= session.getStatementMessage();
1170
message::Statement *current_statement= in_session->getStatementMessage();
1176
1172
/* If we never added a Statement message, nothing to undo. */
1177
1173
if (current_statement == NULL)
1226
1222
current_statement= transaction->add_statement();
1227
1223
initStatementMessage(*current_statement,
1228
1224
message::Statement::ROLLBACK_STATEMENT,
1230
finalizeStatementMessage(*current_statement, session);
1226
finalizeStatementMessage(*current_statement, in_session);
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1254
1232
uint32_t *next_segment_id)
1256
message::Statement *statement= session.getStatementMessage();
1234
message::Statement *statement= in_session->getStatementMessage();
1257
1235
message::Transaction *transaction= NULL;
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1238
* Check the type for the current Statement message, if it is anything
1239
* other then INSERT we need to call finalize, this will ensure a
1240
* new InsertStatement is created. If it is of type INSERT check
1241
* what table the INSERT belongs to, if it is a different table
1242
* call finalize, so a new InsertStatement can be created.
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1244
if (statement != NULL && statement->type() != message::Statement::INSERT)
1246
finalizeStatementMessage(*statement, in_session);
1247
statement= in_session->getStatementMessage();
1249
else if (statement != NULL)
1251
transaction= getActiveTransactionMessage(in_session);
1285
1254
* If we've passed our threshold for the statement size (possible for
1286
1255
* a bulk insert), we'll finalize the Statement and Transaction (doing
1287
1256
* the Transaction will keep it from getting huge).
1289
1258
if (static_cast<size_t>(transaction->ByteSize()) >=
1290
transaction_message_threshold)
1259
in_session->variables.transaction_message_threshold)
1292
1261
/* Remember the transaction ID so we can re-use it */
1293
1262
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1296
1264
message::InsertData *current_data= statement->mutable_insert_data();
1298
1266
/* Caller should use this value when adding a new record */
1299
1267
*next_segment_id= current_data->segment_id() + 1;
1301
1269
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1305
1272
* Send the trx message to replicators after finalizing the
1306
1273
* statement and transaction. This will also set the Transaction
1307
1274
* and Statement objects in Session to NULL.
1309
commitTransactionMessage(session);
1276
commitTransactionMessage(in_session);
1312
1279
* Statement and Transaction should now be NULL, so new ones will get
1313
1280
* created. We reuse the transaction id since we are segmenting
1314
1281
* one transaction.
1316
transaction= getActiveTransactionMessage(session, false);
1283
statement= in_session->getStatementMessage();
1284
transaction= getActiveTransactionMessage(in_session, false);
1317
1285
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1323
1287
/* Set the transaction ID to match the previous messages */
1324
1288
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData ¤t_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1292
const message::InsertHeader &insert_header= statement->insert_header();
1293
string old_table_name= insert_header.table_metadata().table_name();
1295
string current_table_name;
1296
(void) in_table->getShare()->getTableName(current_table_name);
1298
if (current_table_name.compare(old_table_name))
1300
finalizeStatementMessage(*statement, in_session);
1301
statement= in_session->getStatementMessage();
1305
/* carry forward the existing segment id */
1306
const message::InsertData ¤t_data= statement->insert_data();
1307
*next_segment_id= current_data.segment_id();
1312
if (statement == NULL)
1315
* Transaction will be non-NULL only if we had to segment it due to
1316
* transaction size above.
1318
if (transaction == NULL)
1319
transaction= getActiveTransactionMessage(in_session);
1322
* Transaction message initialized and set, but no statement created
1323
* yet. We construct one and initialize it, here, then return the
1324
* message after attaching the new Statement message pointer to the
1325
* Session for easy retrieval later...
1327
statement= transaction->add_statement();
1328
setInsertHeader(*statement, in_session, in_table);
1329
in_session->setStatementMessage(statement);
1339
1331
return *statement;
1342
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
Session::const_reference session,
1335
Session *in_session,
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1338
initStatementMessage(statement, message::Statement::INSERT, in_session);
1349
1341
* Now we construct the specialized InsertHeader message inside
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1438
1428
const unsigned char *old_record,
1439
1429
const unsigned char *new_record,
1440
1430
uint32_t *next_segment_id)
1442
message::Statement *statement= session.getStatementMessage();
1432
message::Statement *statement= in_session->getStatementMessage();
1443
1433
message::Transaction *transaction= NULL;
1446
* If statement is NULL, this is a new statement.
1447
* If statement is NOT NULL, this a continuation of the same statement.
1448
* This is because autocommitOrRollback() finalizes the statement so that
1449
* we guarantee only one Statement message per statement (i.e., we no longer
1450
* share a single GPB message for multiple statements).
1436
* Check the type for the current Statement message, if it is anything
1437
* other then UPDATE we need to call finalize, this will ensure a
1438
* new UpdateStatement is created. If it is of type UPDATE check
1439
* what table the UPDATE belongs to, if it is a different table
1440
* call finalize, so a new UpdateStatement can be created.
1452
if (statement == NULL)
1442
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1454
transaction= getActiveTransactionMessage(session);
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
transaction_message_threshold)
1459
transaction= segmentTransactionMessage(session, transaction);
1462
statement= transaction->add_statement();
1463
setUpdateHeader(*statement, session, table, old_record, new_record);
1464
session.setStatementMessage(statement);
1444
finalizeStatementMessage(*statement, in_session);
1445
statement= in_session->getStatementMessage();
1447
else if (statement != NULL)
1468
transaction= getActiveTransactionMessage(session);
1449
transaction= getActiveTransactionMessage(in_session);
1471
1452
* If we've passed our threshold for the statement size (possible for
1472
1453
* a bulk insert), we'll finalize the Statement and Transaction (doing
1473
1454
* the Transaction will keep it from getting huge).
1475
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1476
transaction_message_threshold)
1457
in_session->variables.transaction_message_threshold)
1478
1459
/* Remember the transaction ID so we can re-use it */
1479
1460
uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
uint32_t seg_id= transaction->segment_id();
1482
1462
message::UpdateData *current_data= statement->mutable_update_data();
1484
1464
/* Caller should use this value when adding a new record */
1485
1465
*next_segment_id= current_data->segment_id() + 1;
1487
1467
current_data->set_end_segment(false);
1488
transaction->set_end_segment(false);
1491
1470
* Send the trx message to replicators after finalizing the
1492
1471
* statement and transaction. This will also set the Transaction
1493
1472
* and Statement objects in Session to NULL.
1495
commitTransactionMessage(session);
1474
commitTransactionMessage(in_session);
1498
1477
* Statement and Transaction should now be NULL, so new ones will get
1499
1478
* created. We reuse the transaction id since we are segmenting
1500
1479
* one transaction.
1502
transaction= getActiveTransactionMessage(session, false);
1481
statement= in_session->getStatementMessage();
1482
transaction= getActiveTransactionMessage(in_session, false);
1503
1483
assert(transaction != NULL);
1505
statement= transaction->add_statement();
1506
setUpdateHeader(*statement, session, table, old_record, new_record);
1507
session.setStatementMessage(statement);
1509
1485
/* Set the transaction ID to match the previous messages */
1510
1486
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
transaction->set_segment_id(seg_id + 1);
1512
transaction->set_end_segment(true);
1517
* Continuation of the same statement. Carry forward the existing
1520
const message::UpdateData ¤t_data= statement->update_data();
1521
*next_segment_id= current_data.segment_id();
1490
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1492
/* carry forward the existing segment id */
1493
const message::UpdateData ¤t_data= statement->update_data();
1494
*next_segment_id= current_data.segment_id();
1498
finalizeStatementMessage(*statement, in_session);
1499
statement= in_session->getStatementMessage();
1504
if (statement == NULL)
1507
* Transaction will be non-NULL only if we had to segment it due to
1508
* transaction size above.
1510
if (transaction == NULL)
1511
transaction= getActiveTransactionMessage(in_session);
1514
* Transaction message initialized and set, but no statement created
1515
* yet. We construct one and initialize it, here, then return the
1516
* message after attaching the new Statement message pointer to the
1517
* Session for easy retrieval later...
1519
statement= transaction->add_statement();
1520
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
in_session->setStatementMessage(statement);
1525
1523
return *statement;
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1528
const unsigned char *old_record,
1529
const unsigned char *new_record)
1531
const message::UpdateHeader &update_header= statement.update_header();
1532
string old_table_name= update_header.table_metadata().table_name();
1534
string current_table_name;
1535
(void) in_table->getShare()->getTableName(current_table_name);
1536
if (current_table_name.compare(old_table_name))
1542
/* Compare the set fields in the existing UpdateHeader and see if they
1543
* match the updated fields in the new record, if they do not we must
1544
* create a new UpdateHeader
1546
size_t num_set_fields= update_header.set_field_metadata_size();
1548
Field *current_field;
1549
Field **table_fields= in_table->getFields();
1550
in_table->setReadSet();
1552
size_t num_calculated_updated_fields= 0;
1554
while ((current_field= *table_fields++) != NULL)
1556
if (num_calculated_updated_fields > num_set_fields)
1561
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1563
/* check that this field exists in the UpdateHeader record */
1566
for (size_t x= 0; x < num_set_fields; ++x)
1568
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
string name= field_metadata.name();
1570
if (name.compare(current_field->field_name) == 0)
1573
++num_calculated_updated_fields;
1584
if ((num_calculated_updated_fields == num_set_fields) && found)
1528
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
Session::const_reference session,
1596
Session *in_session,
1531
1598
const unsigned char *old_record,
1532
1599
const unsigned char *new_record)
1534
initStatementMessage(statement, message::Statement::UPDATE, session);
1601
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1537
1604
* Now we construct the specialized UpdateHeader message inside
1707
1774
return isUpdated;
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1712
1779
uint32_t *next_segment_id)
1714
message::Statement *statement= session.getStatementMessage();
1781
message::Statement *statement= in_session->getStatementMessage();
1715
1782
message::Transaction *transaction= NULL;
1718
* If statement is NULL, this is a new statement.
1719
* If statement is NOT NULL, this a continuation of the same statement.
1720
* This is because autocommitOrRollback() finalizes the statement so that
1721
* we guarantee only one Statement message per statement (i.e., we no longer
1722
* share a single GPB message for multiple statements).
1785
* Check the type for the current Statement message, if it is anything
1786
* other then DELETE we need to call finalize, this will ensure a
1787
* new DeleteStatement is created. If it is of type DELETE check
1788
* what table the DELETE belongs to, if it is a different table
1789
* call finalize, so a new DeleteStatement can be created.
1724
if (statement == NULL)
1791
if (statement != NULL && statement->type() != message::Statement::DELETE)
1726
transaction= getActiveTransactionMessage(session);
1728
if (static_cast<size_t>(transaction->ByteSize()) >=
1729
transaction_message_threshold)
1731
transaction= segmentTransactionMessage(session, transaction);
1734
statement= transaction->add_statement();
1735
setDeleteHeader(*statement, session, table);
1736
session.setStatementMessage(statement);
1793
finalizeStatementMessage(*statement, in_session);
1794
statement= in_session->getStatementMessage();
1796
else if (statement != NULL)
1740
transaction= getActiveTransactionMessage(session);
1798
transaction= getActiveTransactionMessage(in_session);
1743
1801
* If we've passed our threshold for the statement size (possible for
1744
1802
* a bulk insert), we'll finalize the Statement and Transaction (doing
1745
1803
* the Transaction will keep it from getting huge).
1747
1805
if (static_cast<size_t>(transaction->ByteSize()) >=
1748
transaction_message_threshold)
1806
in_session->variables.transaction_message_threshold)
1750
1808
/* Remember the transaction ID so we can re-use it */
1751
1809
uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
uint32_t seg_id= transaction->segment_id();
1754
1811
message::DeleteData *current_data= statement->mutable_delete_data();
1756
1813
/* Caller should use this value when adding a new record */
1757
1814
*next_segment_id= current_data->segment_id() + 1;
1759
1816
current_data->set_end_segment(false);
1760
transaction->set_end_segment(false);
1763
1819
* Send the trx message to replicators after finalizing the
1764
1820
* statement and transaction. This will also set the Transaction
1765
1821
* and Statement objects in Session to NULL.
1767
commitTransactionMessage(session);
1823
commitTransactionMessage(in_session);
1770
1826
* Statement and Transaction should now be NULL, so new ones will get
1771
1827
* created. We reuse the transaction id since we are segmenting
1772
1828
* one transaction.
1774
transaction= getActiveTransactionMessage(session, false);
1830
statement= in_session->getStatementMessage();
1831
transaction= getActiveTransactionMessage(in_session, false);
1775
1832
assert(transaction != NULL);
1777
statement= transaction->add_statement();
1778
setDeleteHeader(*statement, session, table);
1779
session.setStatementMessage(statement);
1781
1834
/* Set the transaction ID to match the previous messages */
1782
1835
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
transaction->set_segment_id(seg_id + 1);
1784
transaction->set_end_segment(true);
1789
* Continuation of the same statement. Carry forward the existing
1792
const message::DeleteData ¤t_data= statement->delete_data();
1793
*next_segment_id= current_data.segment_id();
1839
const message::DeleteHeader &delete_header= statement->delete_header();
1840
string old_table_name= delete_header.table_metadata().table_name();
1842
string current_table_name;
1843
(void) in_table->getShare()->getTableName(current_table_name);
1844
if (current_table_name.compare(old_table_name))
1846
finalizeStatementMessage(*statement, in_session);
1847
statement= in_session->getStatementMessage();
1851
/* carry forward the existing segment id */
1852
const message::DeleteData ¤t_data= statement->delete_data();
1853
*next_segment_id= current_data.segment_id();
1858
if (statement == NULL)
1861
* Transaction will be non-NULL only if we had to segment it due to
1862
* transaction size above.
1864
if (transaction == NULL)
1865
transaction= getActiveTransactionMessage(in_session);
1868
* Transaction message initialized and set, but no statement created
1869
* yet. We construct one and initialize it, here, then return the
1870
* message after attaching the new Statement message pointer to the
1871
* Session for easy retrieval later...
1873
statement= transaction->add_statement();
1874
setDeleteHeader(*statement, in_session, in_table);
1875
in_session->setStatementMessage(statement);
1797
1877
return *statement;
1800
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
Session::const_reference session,
1881
Session *in_session,
1804
initStatementMessage(statement, message::Statement::DELETE, session);
1884
initStatementMessage(statement, message::Statement::DELETE, in_session);
1807
1887
* Now we construct the specialized DeleteHeader message inside
1916
1994
message::Table *new_table_message= create_table_statement->mutable_table();
1917
1995
*new_table_message= table;
1919
finalizeStatementMessage(*statement, session);
1997
finalizeStatementMessage(*statement, in_session);
1921
finalizeTransactionMessage(*transaction, session);
1999
finalizeTransactionMessage(*transaction, in_session);
1923
(void) replication_services.pushTransactionMessage(session, *transaction);
2001
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1925
cleanupTransactionMessage(transaction, session);
2003
cleanupTransactionMessage(transaction, in_session);
1929
void TransactionServices::createSchema(Session::reference session,
2007
void TransactionServices::createSchema(Session *in_session,
1930
2008
const message::Schema &schema)
1932
2010
ReplicationServices &replication_services= ReplicationServices::singleton();
1933
2011
if (! replication_services.isActive())
1936
message::Transaction *transaction= getActiveTransactionMessage(session);
2014
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1937
2015
message::Statement *statement= transaction->add_statement();
1939
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
2017
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1942
2020
* Construct the specialized CreateSchemaStatement message and attach
1946
2024
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
2025
*new_schema_message= schema;
1949
finalizeStatementMessage(*statement, session);
2027
finalizeStatementMessage(*statement, in_session);
1951
finalizeTransactionMessage(*transaction, session);
2029
finalizeTransactionMessage(*transaction, in_session);
1953
(void) replication_services.pushTransactionMessage(session, *transaction);
2031
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1955
cleanupTransactionMessage(transaction, session);
2033
cleanupTransactionMessage(transaction, in_session);
1959
void TransactionServices::dropSchema(Session::reference session,
1960
identifier::Schema::const_reference identifier)
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1962
2039
ReplicationServices &replication_services= ReplicationServices::singleton();
1963
2040
if (! replication_services.isActive())
1966
message::Transaction *transaction= getActiveTransactionMessage(session);
2043
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1967
2044
message::Statement *statement= transaction->add_statement();
1969
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
2046
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1972
2049
* Construct the specialized DropSchemaStatement message and attach
1975
2052
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1977
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1979
finalizeStatementMessage(*statement, session);
1981
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1985
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1990
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1996
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
message::Statement *statement= transaction->add_statement();
1999
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2002
* Construct the specialized AlterSchemaStatement message and attach
2003
* it to the generic Statement message
2005
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2007
message::Schema *before= alter_schema_statement->mutable_before();
2008
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
2013
finalizeStatementMessage(*statement, session);
2015
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
2019
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
2054
drop_schema_statement->set_schema_name(schema_name);
2056
finalizeStatementMessage(*statement, in_session);
2058
finalizeTransactionMessage(*transaction, in_session);
2060
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2062
cleanupTransactionMessage(transaction, in_session);
2065
void TransactionServices::dropTable(Session *in_session,
2066
const string &schema_name,
2067
const string &table_name,
2024
2068
bool if_exists)
2026
2070
ReplicationServices &replication_services= ReplicationServices::singleton();
2027
2071
if (! replication_services.isActive())
2030
message::Transaction *transaction= getActiveTransactionMessage(session);
2074
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2031
2075
message::Statement *statement= transaction->add_statement();
2033
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2077
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2036
2080
* Construct the specialized DropTableStatement message and attach
2043
2087
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2045
table_metadata->set_schema_name(table.getSchemaName());
2046
table_metadata->set_table_name(table.getTableName());
2048
finalizeStatementMessage(*statement, session);
2050
finalizeTransactionMessage(*transaction, session);
2089
table_metadata->set_schema_name(schema_name);
2090
table_metadata->set_table_name(table_name);
2092
finalizeStatementMessage(*statement, in_session);
2094
finalizeTransactionMessage(*transaction, in_session);
2052
(void) replication_services.pushTransactionMessage(session, *transaction);
2096
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2054
cleanupTransactionMessage(transaction, session);
2098
cleanupTransactionMessage(transaction, in_session);
2057
void TransactionServices::truncateTable(Session::reference session,
2101
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2060
2103
ReplicationServices &replication_services= ReplicationServices::singleton();
2061
2104
if (! replication_services.isActive())
2064
message::Transaction *transaction= getActiveTransactionMessage(session);
2107
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2065
2108
message::Statement *statement= transaction->add_statement();
2067
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2110
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2070
2113
* Construct the specialized TruncateTableStatement message and attach
2074
2117
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2076
2119
string schema_name;
2077
(void) table.getShare()->getSchemaName(schema_name);
2120
(void) in_table->getShare()->getSchemaName(schema_name);
2078
2121
string table_name;
2079
(void) table.getShare()->getTableName(table_name);
2122
(void) in_table->getShare()->getTableName(table_name);
2081
2124
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
2125
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2084
finalizeStatementMessage(*statement, session);
2127
finalizeStatementMessage(*statement, in_session);
2086
finalizeTransactionMessage(*transaction, session);
2129
finalizeTransactionMessage(*transaction, in_session);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2131
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2090
cleanupTransactionMessage(transaction, session);
2133
cleanupTransactionMessage(transaction, in_session);
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2136
void TransactionServices::rawStatement(Session *in_session, const string &query)
2096
2138
ReplicationServices &replication_services= ReplicationServices::singleton();
2097
2139
if (! replication_services.isActive())
2100
message::Transaction *transaction= getActiveTransactionMessage(session);
2142
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2101
2143
message::Statement *statement= transaction->add_statement();
2103
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2145
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2104
2146
statement->set_sql(query);
2105
finalizeStatementMessage(*statement, session);
2147
finalizeStatementMessage(*statement, in_session);
2107
finalizeTransactionMessage(*transaction, session);
2149
finalizeTransactionMessage(*transaction, in_session);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2151
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2111
cleanupTransactionMessage(transaction, session);
2153
cleanupTransactionMessage(transaction, in_session);
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2156
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2117
2158
ReplicationServices &replication_services= ReplicationServices::singleton();
2118
2159
if (! replication_services.isActive())