47
47
* plugins can understand.
51
#include <drizzled/current_session.h>
52
#include <drizzled/my_hash.h>
53
#include <drizzled/error.h>
54
#include <drizzled/gettext.h>
55
#include <drizzled/probes.h>
56
#include <drizzled/sql_parse.h>
57
#include <drizzled/session.h>
58
#include <drizzled/sql_base.h>
59
#include <drizzled/replication_services.h>
60
#include <drizzled/transaction_services.h>
61
#include <drizzled/transaction_context.h>
62
#include <drizzled/message/transaction.pb.h>
63
#include <drizzled/message/statement_transform.h>
64
#include <drizzled/resource_context.h>
65
#include <drizzled/lock.h>
66
#include <drizzled/item/int.h>
67
#include <drizzled/item/empty_string.h>
68
#include <drizzled/field/epoch.h>
69
#include <drizzled/plugin/client.h>
70
#include <drizzled/plugin/monitored_in_transaction.h>
71
#include <drizzled/plugin/transactional_storage_engine.h>
72
#include <drizzled/plugin/xa_resource_manager.h>
73
#include <drizzled/plugin/xa_storage_engine.h>
74
#include <drizzled/internal/my_sys.h>
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/epoch.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
77
76
#include <algorithm>
401
400
resource_context->setTransactionalStorageEngine(engine);
402
401
trans->no_2pc|= true;
404
if (session.transaction.xid_state.xid.is_null())
405
session.transaction.xid_state.xid.set(session.getQueryId());
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
407
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
if (! session.getResourceContext(monitored, 0)->isStarted())
407
if (! session->getResourceContext(monitored, 0)->isStarted())
409
408
registerResourceForStatement(session, monitored, engine);
412
void TransactionServices::registerResourceForTransaction(Session::reference session,
411
void TransactionServices::registerResourceForTransaction(Session *session,
413
412
plugin::MonitoredInTransaction *monitored,
414
413
plugin::TransactionalStorageEngine *engine,
415
414
plugin::XaResourceManager *resource_manager)
417
TransactionContext *trans= &session.transaction.all;
418
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
420
419
if (resource_context->isStarted())
421
420
return; /* already registered, return */
423
session.server_status|= SERVER_STATUS_IN_TRANS;
422
session->server_status|= SERVER_STATUS_IN_TRANS;
425
424
trans->registerResource(resource_context);
454
453
my_session->setXaId(xa_id);
457
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
459
if (session.getXaId() == 0)
458
if (session->getXaId() == 0)
461
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
464
return session.getXaId();
463
return session->getXaId();
467
int TransactionServices::commitTransaction(Session::reference session,
468
bool normal_transaction)
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
470
482
int error= 0, cookie= 0;
472
484
'all' means that this is either an explicit commit issued by
473
485
user, or an implicit commit issued by a DDL.
475
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
488
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
478
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
481
493
We must not commit the normal transaction if a statement
587
591
if (resource->participatesInXaTransaction())
589
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
591
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
594
598
else if (normal_transaction)
596
session.status_var.ha_commit_count++;
600
session->status_var.ha_commit_count++;
599
603
else if (resource->participatesInSqlTransaction())
601
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
603
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
606
610
else if (normal_transaction)
608
session.status_var.ha_commit_count++;
612
session->status_var.ha_commit_count++;
611
615
resource_context->reset(); /* keep it conveniently zero-filled */
614
618
if (is_real_trans)
615
session.transaction.xid_state.xid.null();
619
session->transaction.xid_state.xid.null();
617
621
if (normal_transaction)
619
session.variables.tx_isolation= session.session_tx_isolation;
620
session.transaction.cleanup();
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
627
int TransactionServices::rollbackTransaction(Session::reference session,
628
bool normal_transaction)
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
631
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
634
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
638
640
We must not rollback the normal transaction if a statement
639
641
transaction is pending.
641
assert(session.transaction.stmt.getResourceContexts().empty() ||
642
trans == &session.transaction.stmt);
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
644
646
if (resource_contexts.empty() == false)
999
1003
* deleting transaction message when done with it.
1001
1005
transaction= new (nothrow) message::Transaction();
1002
initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
session.setTransactionMessage(transaction);
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
1004
1008
return transaction;
1007
1011
return transaction;
1010
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
Session::reference session,
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
1012
1016
bool should_inc_trx_id)
1014
message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
trx->set_server_id(session.getServerId());
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1017
1021
if (should_inc_trx_id)
1019
trx->set_transaction_id(getCurrentTransactionId(session));
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1024
/* trx and seg id will get set properly elsewhere */
1025
1028
trx->set_transaction_id(0);
1028
trx->set_start_timestamp(session.getCurrentTimestamp());
1030
/* segment info may get set elsewhere as needed */
1031
transaction.set_segment_id(1);
1032
transaction.set_end_segment(true);
1035
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
Session::const_reference session)
1038
message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
trx->set_end_timestamp(session.getCurrentTimestamp());
1042
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
Session::reference session)
1046
session.setStatementMessage(NULL);
1047
session.setTransactionMessage(NULL);
1051
int TransactionServices::commitTransactionMessage(Session::reference session)
1031
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
Session *in_session)
1037
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
Session *in_session)
1044
delete in_transaction;
1045
in_session->setStatementMessage(NULL);
1046
in_session->setTransactionMessage(NULL);
1047
in_session->setXaId(0);
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1053
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1054
1053
if (! replication_services.isActive())
1080
1079
if (transaction->statement_size() == 0)
1082
cleanupTransactionMessage(transaction, session);
1081
cleanupTransactionMessage(transaction, in_session);
1086
finalizeTransactionMessage(*transaction, session);
1085
finalizeTransactionMessage(*transaction, in_session);
1088
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1090
cleanupTransactionMessage(transaction, session);
1089
cleanupTransactionMessage(transaction, in_session);
1092
1091
return static_cast<int>(result);
1095
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
message::Statement::Type type,
1097
Session::const_reference session)
1095
message::Statement::Type in_type,
1096
Session *in_session)
1099
statement.set_type(type);
1100
statement.set_start_timestamp(session.getCurrentTimestamp());
1098
statement.set_type(in_type);
1099
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1102
if (session.variables.replicate_query)
1103
statement.set_sql(session.getQueryString()->c_str());
1101
if (in_session->variables.replicate_query)
1102
statement.set_sql(in_session->getQueryString()->c_str());
1106
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
Session::reference session)
1106
Session *in_session)
1109
statement.set_end_timestamp(session.getCurrentTimestamp());
1110
session.setStatementMessage(NULL);
1108
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
in_session->setStatementMessage(NULL);
1113
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1115
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1116
1115
if (! replication_services.isActive())
1119
message::Transaction *transaction= getActiveTransactionMessage(session);
1118
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1122
1121
* OK, so there are two situations that we need to deal with here:
1140
1139
/* Remember the transaction ID so we can re-use it */
1141
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
uint32_t seg_id= transaction->segment_id();
1145
1143
* Clear the transaction, create a Rollback statement message,
1146
1144
* attach it to the transaction, and push it to replicators.
1148
1146
transaction->Clear();
1149
initTransactionMessage(*transaction, session, false);
1147
initTransactionMessage(*transaction, in_session, false);
1151
1149
/* Set the transaction ID to match the previous messages */
1152
1150
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
transaction->set_segment_id(seg_id);
1154
transaction->set_end_segment(true);
1156
1152
message::Statement *statement= transaction->add_statement();
1158
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
finalizeStatementMessage(*statement, session);
1154
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
finalizeStatementMessage(*statement, in_session);
1161
finalizeTransactionMessage(*transaction, session);
1157
finalizeTransactionMessage(*transaction, in_session);
1163
(void) replication_services.pushTransactionMessage(session, *transaction);
1159
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1166
cleanupTransactionMessage(transaction, session);
1161
cleanupTransactionMessage(transaction, in_session);
1169
void TransactionServices::rollbackStatementMessage(Session::reference session)
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1171
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1172
1167
if (! replication_services.isActive())
1175
message::Statement *current_statement= session.getStatementMessage();
1170
message::Statement *current_statement= in_session->getStatementMessage();
1177
1172
/* If we never added a Statement message, nothing to undo. */
1178
1173
if (current_statement == NULL)
1227
1222
current_statement= transaction->add_statement();
1228
1223
initStatementMessage(*current_statement,
1229
1224
message::Statement::ROLLBACK_STATEMENT,
1231
finalizeStatementMessage(*current_statement, session);
1226
finalizeStatementMessage(*current_statement, in_session);
1235
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
message::Transaction *transaction)
1238
uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
uint32_t seg_id= transaction->segment_id();
1241
transaction->set_end_segment(false);
1242
commitTransactionMessage(session);
1243
transaction= getActiveTransactionMessage(session, false);
1245
/* Set the transaction ID to match the previous messages */
1246
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
transaction->set_segment_id(seg_id + 1);
1248
transaction->set_end_segment(true);
1253
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1255
1232
uint32_t *next_segment_id)
1257
message::Statement *statement= session.getStatementMessage();
1234
message::Statement *statement= in_session->getStatementMessage();
1258
1235
message::Transaction *transaction= NULL;
1261
* If statement is NULL, this is a new statement.
1262
* If statement is NOT NULL, this is a continuation of the same statement.
1263
* This is because autocommitOrRollback() finalizes the statement so that
1264
* we guarantee only one Statement message per statement (i.e., we no longer
1265
* share a single GPB message for multiple statements).
1238
* Check the type for the current Statement message, if it is anything
1239
* other than INSERT we need to call finalize, this will ensure a
1240
* new InsertStatement is created. If it is of type INSERT check
1241
* what table the INSERT belongs to, if it is a different table
1242
* call finalize, so a new InsertStatement can be created.
1267
if (statement == NULL)
1269
transaction= getActiveTransactionMessage(session);
1271
if (static_cast<size_t>(transaction->ByteSize()) >=
1272
transaction_message_threshold)
1274
transaction= segmentTransactionMessage(session, transaction);
1277
statement= transaction->add_statement();
1278
setInsertHeader(*statement, session, table);
1279
session.setStatementMessage(statement);
1283
transaction= getActiveTransactionMessage(session);
1244
if (statement != NULL && statement->type() != message::Statement::INSERT)
1246
finalizeStatementMessage(*statement, in_session);
1247
statement= in_session->getStatementMessage();
1249
else if (statement != NULL)
1251
transaction= getActiveTransactionMessage(in_session);
1286
1254
* If we've passed our threshold for the statement size (possible for
1287
1255
* a bulk insert), we'll finalize the Statement and Transaction (doing
1288
1256
* the Transaction will keep it from getting huge).
1290
1258
if (static_cast<size_t>(transaction->ByteSize()) >=
1291
transaction_message_threshold)
1259
in_session->variables.transaction_message_threshold)
1293
1261
/* Remember the transaction ID so we can re-use it */
1294
1262
uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
uint32_t seg_id= transaction->segment_id();
1297
1264
message::InsertData *current_data= statement->mutable_insert_data();
1299
1266
/* Caller should use this value when adding a new record */
1300
1267
*next_segment_id= current_data->segment_id() + 1;
1302
1269
current_data->set_end_segment(false);
1303
transaction->set_end_segment(false);
1306
1272
* Send the trx message to replicators after finalizing the
1307
1273
* statement and transaction. This will also set the Transaction
1308
1274
* and Statement objects in Session to NULL.
1310
commitTransactionMessage(session);
1276
commitTransactionMessage(in_session);
1313
1279
* Statement and Transaction should now be NULL, so new ones will get
1314
1280
* created. We reuse the transaction id since we are segmenting
1315
1281
* one transaction.
1317
transaction= getActiveTransactionMessage(session, false);
1283
statement= in_session->getStatementMessage();
1284
transaction= getActiveTransactionMessage(in_session, false);
1318
1285
assert(transaction != NULL);
1320
statement= transaction->add_statement();
1321
setInsertHeader(*statement, session, table);
1322
session.setStatementMessage(statement);
1324
1287
/* Set the transaction ID to match the previous messages */
1325
1288
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
transaction->set_segment_id(seg_id + 1);
1327
transaction->set_end_segment(true);
1332
* Continuation of the same statement. Carry forward the existing
1335
const message::InsertData ¤t_data= statement->insert_data();
1336
*next_segment_id= current_data.segment_id();
1292
const message::InsertHeader &insert_header= statement->insert_header();
1293
string old_table_name= insert_header.table_metadata().table_name();
1295
string current_table_name;
1296
(void) in_table->getShare()->getTableName(current_table_name);
1298
if (current_table_name.compare(old_table_name))
1300
finalizeStatementMessage(*statement, in_session);
1301
statement= in_session->getStatementMessage();
1305
/* carry forward the existing segment id */
1306
const message::InsertData ¤t_data= statement->insert_data();
1307
*next_segment_id= current_data.segment_id();
1312
if (statement == NULL)
1315
* Transaction will be non-NULL only if we had to segment it due to
1316
* transaction size above.
1318
if (transaction == NULL)
1319
transaction= getActiveTransactionMessage(in_session);
1322
* Transaction message initialized and set, but no statement created
1323
* yet. We construct one and initialize it, here, then return the
1324
* message after attaching the new Statement message pointer to the
1325
* Session for easy retrieval later...
1327
statement= transaction->add_statement();
1328
setInsertHeader(*statement, in_session, in_table);
1329
in_session->setStatementMessage(statement);
1340
1331
return *statement;
1343
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
Session::const_reference session,
1335
Session *in_session,
1347
initStatementMessage(statement, message::Statement::INSERT, session);
1338
initStatementMessage(statement, message::Statement::INSERT, in_session);
1350
1341
* Now we construct the specialized InsertHeader message inside
1440
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1442
1428
const unsigned char *old_record,
1443
1429
const unsigned char *new_record,
1444
1430
uint32_t *next_segment_id)
1446
message::Statement *statement= session.getStatementMessage();
1432
message::Statement *statement= in_session->getStatementMessage();
1447
1433
message::Transaction *transaction= NULL;
1450
* If statement is NULL, this is a new statement.
1451
* If statement is NOT NULL, this is a continuation of the same statement.
1452
* This is because autocommitOrRollback() finalizes the statement so that
1453
* we guarantee only one Statement message per statement (i.e., we no longer
1454
* share a single GPB message for multiple statements).
1436
* Check the type for the current Statement message, if it is anything
1437
* other than UPDATE we need to call finalize, this will ensure a
1438
* new UpdateStatement is created. If it is of type UPDATE check
1439
* what table the UPDATE belongs to, if it is a different table
1440
* call finalize, so a new UpdateStatement can be created.
1456
if (statement == NULL)
1442
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1458
transaction= getActiveTransactionMessage(session);
1460
if (static_cast<size_t>(transaction->ByteSize()) >=
1461
transaction_message_threshold)
1463
transaction= segmentTransactionMessage(session, transaction);
1466
statement= transaction->add_statement();
1467
setUpdateHeader(*statement, session, table, old_record, new_record);
1468
session.setStatementMessage(statement);
1444
finalizeStatementMessage(*statement, in_session);
1445
statement= in_session->getStatementMessage();
1447
else if (statement != NULL)
1472
transaction= getActiveTransactionMessage(session);
1449
transaction= getActiveTransactionMessage(in_session);
1475
1452
* If we've passed our threshold for the statement size (possible for
1476
1453
* a bulk insert), we'll finalize the Statement and Transaction (doing
1477
1454
* the Transaction will keep it from getting huge).
1479
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1480
transaction_message_threshold)
1457
in_session->variables.transaction_message_threshold)
1482
1459
/* Remember the transaction ID so we can re-use it */
1483
1460
uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
uint32_t seg_id= transaction->segment_id();
1486
1462
message::UpdateData *current_data= statement->mutable_update_data();
1488
1464
/* Caller should use this value when adding a new record */
1489
1465
*next_segment_id= current_data->segment_id() + 1;
1491
1467
current_data->set_end_segment(false);
1492
transaction->set_end_segment(false);
1495
1470
* Send the trx message to replicators after finalizing the
1496
1471
* statement and transaction. This will also set the Transaction
1497
1472
* and Statement objects in Session to NULL.
1499
commitTransactionMessage(session);
1474
commitTransactionMessage(in_session);
1502
1477
* Statement and Transaction should now be NULL, so new ones will get
1503
1478
* created. We reuse the transaction id since we are segmenting
1504
1479
* one transaction.
1506
transaction= getActiveTransactionMessage(session, false);
1481
statement= in_session->getStatementMessage();
1482
transaction= getActiveTransactionMessage(in_session, false);
1507
1483
assert(transaction != NULL);
1509
statement= transaction->add_statement();
1510
setUpdateHeader(*statement, session, table, old_record, new_record);
1511
session.setStatementMessage(statement);
1513
1485
/* Set the transaction ID to match the previous messages */
1514
1486
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
transaction->set_segment_id(seg_id + 1);
1516
transaction->set_end_segment(true);
1521
* Continuation of the same statement. Carry forward the existing
1524
const message::UpdateData ¤t_data= statement->update_data();
1525
*next_segment_id= current_data.segment_id();
1490
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1492
/* carry forward the existing segment id */
1493
const message::UpdateData ¤t_data= statement->update_data();
1494
*next_segment_id= current_data.segment_id();
1498
finalizeStatementMessage(*statement, in_session);
1499
statement= in_session->getStatementMessage();
1504
if (statement == NULL)
1507
* Transaction will be non-NULL only if we had to segment it due to
1508
* transaction size above.
1510
if (transaction == NULL)
1511
transaction= getActiveTransactionMessage(in_session);
1514
* Transaction message initialized and set, but no statement created
1515
* yet. We construct one and initialize it, here, then return the
1516
* message after attaching the new Statement message pointer to the
1517
* Session for easy retrieval later...
1519
statement= transaction->add_statement();
1520
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
in_session->setStatementMessage(statement);
1529
1523
return *statement;
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1528
const unsigned char *old_record,
1529
const unsigned char *new_record)
1531
const message::UpdateHeader &update_header= statement.update_header();
1532
string old_table_name= update_header.table_metadata().table_name();
1534
string current_table_name;
1535
(void) in_table->getShare()->getTableName(current_table_name);
1536
if (current_table_name.compare(old_table_name))
1542
/* Compare the set fields in the existing UpdateHeader and see if they
1543
* match the updated fields in the new record, if they do not we must
1544
* create a new UpdateHeader
1546
size_t num_set_fields= update_header.set_field_metadata_size();
1548
Field *current_field;
1549
Field **table_fields= in_table->getFields();
1550
in_table->setReadSet();
1552
size_t num_calculated_updated_fields= 0;
1554
while ((current_field= *table_fields++) != NULL)
1556
if (num_calculated_updated_fields > num_set_fields)
1561
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1563
/* check that this field exists in the UpdateHeader record */
1566
for (size_t x= 0; x < num_set_fields; ++x)
1568
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
string name= field_metadata.name();
1570
if (name.compare(current_field->field_name) == 0)
1573
++num_calculated_updated_fields;
1584
if ((num_calculated_updated_fields == num_set_fields) && found)
1532
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
Session::const_reference session,
1596
Session *in_session,
1535
1598
const unsigned char *old_record,
1536
1599
const unsigned char *new_record)
1538
initStatementMessage(statement, message::Statement::UPDATE, session);
1601
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1541
1604
* Now we construct the specialized UpdateHeader message inside
1715
1774
return isUpdated;
1718
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1720
1779
uint32_t *next_segment_id)
1722
message::Statement *statement= session.getStatementMessage();
1781
message::Statement *statement= in_session->getStatementMessage();
1723
1782
message::Transaction *transaction= NULL;
1726
* If statement is NULL, this is a new statement.
1727
* If statement is NOT NULL, this is a continuation of the same statement.
1728
* This is because autocommitOrRollback() finalizes the statement so that
1729
* we guarantee only one Statement message per statement (i.e., we no longer
1730
* share a single GPB message for multiple statements).
1785
* Check the type for the current Statement message, if it is anything
1786
* other than DELETE we need to call finalize, this will ensure a
1787
* new DeleteStatement is created. If it is of type DELETE check
1788
* what table the DELETE belongs to, if it is a different table
1789
* call finalize, so a new DeleteStatement can be created.
1732
if (statement == NULL)
1791
if (statement != NULL && statement->type() != message::Statement::DELETE)
1734
transaction= getActiveTransactionMessage(session);
1736
if (static_cast<size_t>(transaction->ByteSize()) >=
1737
transaction_message_threshold)
1739
transaction= segmentTransactionMessage(session, transaction);
1742
statement= transaction->add_statement();
1743
setDeleteHeader(*statement, session, table);
1744
session.setStatementMessage(statement);
1793
finalizeStatementMessage(*statement, in_session);
1794
statement= in_session->getStatementMessage();
1796
else if (statement != NULL)
1748
transaction= getActiveTransactionMessage(session);
1798
transaction= getActiveTransactionMessage(in_session);
1751
1801
* If we've passed our threshold for the statement size (possible for
1752
1802
* a bulk insert), we'll finalize the Statement and Transaction (doing
1753
1803
* the Transaction will keep it from getting huge).
1755
1805
if (static_cast<size_t>(transaction->ByteSize()) >=
1756
transaction_message_threshold)
1806
in_session->variables.transaction_message_threshold)
1758
1808
/* Remember the transaction ID so we can re-use it */
1759
1809
uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
uint32_t seg_id= transaction->segment_id();
1762
1811
message::DeleteData *current_data= statement->mutable_delete_data();
1764
1813
/* Caller should use this value when adding a new record */
1765
1814
*next_segment_id= current_data->segment_id() + 1;
1767
1816
current_data->set_end_segment(false);
1768
transaction->set_end_segment(false);
1771
1819
* Send the trx message to replicators after finalizing the
1772
1820
* statement and transaction. This will also set the Transaction
1773
1821
* and Statement objects in Session to NULL.
1775
commitTransactionMessage(session);
1823
commitTransactionMessage(in_session);
1778
1826
* Statement and Transaction should now be NULL, so new ones will get
1779
1827
* created. We reuse the transaction id since we are segmenting
1780
1828
* one transaction.
1782
transaction= getActiveTransactionMessage(session, false);
1830
statement= in_session->getStatementMessage();
1831
transaction= getActiveTransactionMessage(in_session, false);
1783
1832
assert(transaction != NULL);
1785
statement= transaction->add_statement();
1786
setDeleteHeader(*statement, session, table);
1787
session.setStatementMessage(statement);
1789
1834
/* Set the transaction ID to match the previous messages */
1790
1835
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
transaction->set_segment_id(seg_id + 1);
1792
transaction->set_end_segment(true);
1797
* Continuation of the same statement. Carry forward the existing
1800
const message::DeleteData &current_data= statement->delete_data();
1801
*next_segment_id= current_data.segment_id();
1839
const message::DeleteHeader &delete_header= statement->delete_header();
1840
string old_table_name= delete_header.table_metadata().table_name();
1842
string current_table_name;
1843
(void) in_table->getShare()->getTableName(current_table_name);
1844
if (current_table_name.compare(old_table_name))
1846
finalizeStatementMessage(*statement, in_session);
1847
statement= in_session->getStatementMessage();
1851
/* carry forward the existing segment id */
1852
const message::DeleteData &current_data= statement->delete_data();
1853
*next_segment_id= current_data.segment_id();
1858
if (statement == NULL)
1861
* Transaction will be non-NULL only if we had to segment it due to
1862
* transaction size above.
1864
if (transaction == NULL)
1865
transaction= getActiveTransactionMessage(in_session);
1868
* Transaction message initialized and set, but no statement created
1869
* yet. We construct one and initialize it, here, then return the
1870
* message after attaching the new Statement message pointer to the
1871
* Session for easy retrieval later...
1873
statement= transaction->add_statement();
1874
setDeleteHeader(*statement, in_session, in_table);
1875
in_session->setStatementMessage(statement);
1805
1877
return *statement;
1808
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
Session::const_reference session,
1881
Session *in_session,
1812
initStatementMessage(statement, message::Statement::DELETE, session);
1884
initStatementMessage(statement, message::Statement::DELETE, in_session);
1815
1887
* Now we construct the specialized DeleteHeader message inside
1930
1994
message::Table *new_table_message= create_table_statement->mutable_table();
1931
1995
*new_table_message= table;
1933
finalizeStatementMessage(*statement, session);
1997
finalizeStatementMessage(*statement, in_session);
1935
finalizeTransactionMessage(*transaction, session);
1999
finalizeTransactionMessage(*transaction, in_session);
1937
(void) replication_services.pushTransactionMessage(session, *transaction);
2001
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1939
cleanupTransactionMessage(transaction, session);
2003
cleanupTransactionMessage(transaction, in_session);
1943
void TransactionServices::createSchema(Session::reference session,
2007
void TransactionServices::createSchema(Session *in_session,
1944
2008
const message::Schema &schema)
1946
2010
ReplicationServices &replication_services= ReplicationServices::singleton();
1947
2011
if (! replication_services.isActive())
1950
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1953
message::Transaction *transaction= getActiveTransactionMessage(session);
2014
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
2015
message::Statement *statement= transaction->add_statement();
1956
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
2017
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1959
2020
* Construct the specialized CreateSchemaStatement message and attach
1963
2024
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
2025
*new_schema_message= schema;
1966
finalizeStatementMessage(*statement, session);
2027
finalizeStatementMessage(*statement, in_session);
1968
finalizeTransactionMessage(*transaction, session);
2029
finalizeTransactionMessage(*transaction, in_session);
1970
(void) replication_services.pushTransactionMessage(session, *transaction);
2031
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1972
cleanupTransactionMessage(transaction, session);
2033
cleanupTransactionMessage(transaction, in_session);
1976
void TransactionServices::dropSchema(Session::reference session,
1977
identifier::Schema::const_reference identifier,
1978
message::schema::const_reference schema)
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1980
2039
ReplicationServices &replication_services= ReplicationServices::singleton();
1981
2040
if (! replication_services.isActive())
1984
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1987
message::Transaction *transaction= getActiveTransactionMessage(session);
2043
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1988
2044
message::Statement *statement= transaction->add_statement();
1990
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
2046
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1993
2049
* Construct the specialized DropSchemaStatement message and attach
1996
2052
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1998
drop_schema_statement->set_schema_name(identifier.getSchemaName());
2000
finalizeStatementMessage(*statement, session);
2002
finalizeTransactionMessage(*transaction, session);
2004
(void) replication_services.pushTransactionMessage(session, *transaction);
2006
cleanupTransactionMessage(transaction, session);
2009
void TransactionServices::alterSchema(Session::reference session,
2010
const message::Schema &old_schema,
2011
const message::Schema &new_schema)
2013
ReplicationServices &replication_services= ReplicationServices::singleton();
2014
if (! replication_services.isActive())
2017
if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2020
message::Transaction *transaction= getActiveTransactionMessage(session);
2021
message::Statement *statement= transaction->add_statement();
2023
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2026
* Construct the specialized AlterSchemaStatement message and attach
2027
* it to the generic Statement message
2029
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2031
message::Schema *before= alter_schema_statement->mutable_before();
2032
message::Schema *after= alter_schema_statement->mutable_after();
2034
*before= old_schema;
2037
finalizeStatementMessage(*statement, session);
2039
finalizeTransactionMessage(*transaction, session);
2041
(void) replication_services.pushTransactionMessage(session, *transaction);
2043
cleanupTransactionMessage(transaction, session);
2046
void TransactionServices::dropTable(Session::reference session,
2047
identifier::Table::const_reference identifier,
2048
message::table::const_reference table,
2051
ReplicationServices &replication_services= ReplicationServices::singleton();
2052
if (! replication_services.isActive())
2055
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2058
message::Transaction *transaction= getActiveTransactionMessage(session);
2059
message::Statement *statement= transaction->add_statement();
2061
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2054
drop_schema_statement->set_schema_name(schema_name);
2056
finalizeStatementMessage(*statement, in_session);
2058
finalizeTransactionMessage(*transaction, in_session);
2060
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2062
cleanupTransactionMessage(transaction, in_session);
2065
void TransactionServices::dropTable(Session *in_session,
2066
const string &schema_name,
2067
const string &table_name)
2069
ReplicationServices &replication_services= ReplicationServices::singleton();
2070
if (! replication_services.isActive())
2073
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2074
message::Statement *statement= transaction->add_statement();
2076
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2064
2079
* Construct the specialized DropTableStatement message and attach
2067
2082
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2069
drop_table_statement->set_if_exists_clause(if_exists);
2071
2084
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2073
table_metadata->set_schema_name(identifier.getSchemaName());
2074
table_metadata->set_table_name(identifier.getTableName());
2076
finalizeStatementMessage(*statement, session);
2078
finalizeTransactionMessage(*transaction, session);
2086
table_metadata->set_schema_name(schema_name);
2087
table_metadata->set_table_name(table_name);
2089
finalizeStatementMessage(*statement, in_session);
2091
finalizeTransactionMessage(*transaction, in_session);
2080
(void) replication_services.pushTransactionMessage(session, *transaction);
2093
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2082
cleanupTransactionMessage(transaction, session);
2095
cleanupTransactionMessage(transaction, in_session);
2085
void TransactionServices::truncateTable(Session::reference session,
2098
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2088
2100
ReplicationServices &replication_services= ReplicationServices::singleton();
2089
2101
if (! replication_services.isActive())
2092
if (not table.getShare()->replicate())
2095
message::Transaction *transaction= getActiveTransactionMessage(session);
2104
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
2105
message::Statement *statement= transaction->add_statement();
2098
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2107
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2101
2110
* Construct the specialized TruncateTableStatement message and attach
2105
2114
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2107
2116
string schema_name;
2108
(void) table.getShare()->getSchemaName(schema_name);
2117
(void) in_table->getShare()->getSchemaName(schema_name);
2110
2118
string table_name;
2111
(void) table.getShare()->getTableName(table_name);
2119
(void) in_table->getShare()->getTableName(table_name);
2113
2121
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
2122
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2116
finalizeStatementMessage(*statement, session);
2124
finalizeStatementMessage(*statement, in_session);
2118
finalizeTransactionMessage(*transaction, session);
2126
finalizeTransactionMessage(*transaction, in_session);
2120
(void) replication_services.pushTransactionMessage(session, *transaction);
2128
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2122
cleanupTransactionMessage(transaction, session);
2130
cleanupTransactionMessage(transaction, in_session);
2125
void TransactionServices::rawStatement(Session::reference session,
2126
const string &query)
2133
void TransactionServices::rawStatement(Session *in_session, const string &query)
2128
2135
ReplicationServices &replication_services= ReplicationServices::singleton();
2129
2136
if (! replication_services.isActive())
2132
message::Transaction *transaction= getActiveTransactionMessage(session);
2139
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
2140
message::Statement *statement= transaction->add_statement();
2135
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2142
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
2143
statement->set_sql(query);
2137
finalizeStatementMessage(*statement, session);
2144
finalizeStatementMessage(*statement, in_session);
2139
finalizeTransactionMessage(*transaction, session);
2146
finalizeTransactionMessage(*transaction, in_session);
2141
(void) replication_services.pushTransactionMessage(session, *transaction);
2148
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2143
cleanupTransactionMessage(transaction, session);
2150
cleanupTransactionMessage(transaction, in_session);
2146
int TransactionServices::sendEvent(Session::reference session,
2147
const message::Event &event)
2153
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2149
2155
ReplicationServices &replication_services= ReplicationServices::singleton();
2150
2156
if (! replication_services.isActive())