47
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
51
#include <drizzled/current_session.h>
52
#include <drizzled/error.h>
53
#include <drizzled/gettext.h>
54
#include <drizzled/probes.h>
55
#include <drizzled/sql_parse.h>
56
#include <drizzled/session.h>
57
#include <drizzled/sql_base.h>
58
#include <drizzled/replication_services.h>
59
#include <drizzled/transaction_services.h>
60
#include <drizzled/transaction_context.h>
61
#include <drizzled/message/transaction.pb.h>
62
#include <drizzled/message/statement_transform.h>
63
#include <drizzled/resource_context.h>
64
#include <drizzled/lock.h>
65
#include <drizzled/item/int.h>
66
#include <drizzled/item/empty_string.h>
67
#include <drizzled/field/epoch.h>
68
#include <drizzled/plugin/client.h>
69
#include <drizzled/plugin/monitored_in_transaction.h>
70
#include <drizzled/plugin/transactional_storage_engine.h>
71
#include <drizzled/plugin/xa_resource_manager.h>
72
#include <drizzled/plugin/xa_storage_engine.h>
73
#include <drizzled/internal/my_sys.h>
76
76
#include <algorithm>
400
400
resource_context->setTransactionalStorageEngine(engine);
401
401
trans->no_2pc|= true;
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
403
if (session.transaction.xid_state.xid.is_null())
404
session.transaction.xid_state.xid.set(session.getQueryId());
406
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session->getResourceContext(monitored, 0)->isStarted())
407
if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session *session,
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
plugin::MonitoredInTransaction *monitored,
413
413
plugin::TransactionalStorageEngine *engine,
414
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
416
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
419
419
if (resource_context->isStarted())
420
420
return; /* already registered, return */
422
session->server_status|= SERVER_STATUS_IN_TRANS;
422
session.server_status|= SERVER_STATUS_IN_TRANS;
424
424
trans->registerResource(resource_context);
453
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
if (session->getXaId() == 0)
458
if (session.getXaId() == 0)
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
463
return session->getXaId();
463
return session.getXaId();
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
466
int TransactionServices::commitTransaction(Session::reference session,
467
bool normal_transaction)
482
469
int error= 0, cookie= 0;
484
471
'all' means that this is either an explicit commit issued by
485
472
user, or an implicit commit issued by a DDL.
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
474
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
477
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
493
480
We must not commit the normal transaction if a statement
591
586
if (resource->participatesInXaTransaction())
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
595
590
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
598
593
else if (normal_transaction)
600
session->status_var.ha_commit_count++;
595
session.status_var.ha_commit_count++;
603
598
else if (resource->participatesInSqlTransaction())
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
607
602
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
610
605
else if (normal_transaction)
612
session->status_var.ha_commit_count++;
607
session.status_var.ha_commit_count++;
615
610
resource_context->reset(); /* keep it conveniently zero-filled */
618
613
if (is_real_trans)
619
session->transaction.xid_state.xid.null();
614
session.transaction.xid_state.xid.null();
621
616
if (normal_transaction)
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
631
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
640
637
We must not rollback the normal transaction if a statement
641
638
transaction is pending.
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
646
643
if (resource_contexts.empty() == false)
1003
998
* deleting transaction message when done with it.
1005
1000
transaction= new (nothrow) message::Transaction();
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1008
1003
return transaction;
1011
1006
return transaction;
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1016
1011
bool should_inc_trx_id)
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1021
1016
if (should_inc_trx_id)
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1023
/* trx and seg id will get set properly elsewhere */
1028
1024
trx->set_transaction_id(0);
1031
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
Session *in_session)
1037
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
Session *in_session)
1044
delete in_transaction;
1045
in_session->setStatementMessage(NULL);
1046
in_session->setTransactionMessage(NULL);
1047
in_session->setXaId(0);
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1052
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
1053
if (! replication_services.isActive())
1079
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, in_session);
1081
cleanupTransactionMessage(transaction, session);
1085
finalizeTransactionMessage(*transaction, in_session);
1085
finalizeTransactionMessage(*transaction, session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
cleanupTransactionMessage(transaction, in_session);
1089
cleanupTransactionMessage(transaction, session);
1091
1091
return static_cast<int>(result);
1094
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type in_type,
1096
Session *in_session)
1095
message::Statement::Type type,
1096
Session::const_reference session)
1098
statement.set_type(in_type);
1099
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1101
if (in_session->variables.replicate_query)
1102
statement.set_sql(in_session->getQueryString()->c_str());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1105
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session *in_session)
1106
Session::reference session)
1108
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
in_session->setStatementMessage(NULL);
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1114
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
1115
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1121
1121
* OK, so there are two situations that we need to deal with here:
1139
1139
/* Remember the transaction ID so we can re-use it */
1140
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1143
1144
* Clear the transaction, create a Rollback statement message,
1144
1145
* attach it to the transaction, and push it to replicators.
1146
1147
transaction->Clear();
1147
initTransactionMessage(*transaction, in_session, false);
1148
initTransactionMessage(*transaction, session, false);
1149
1150
/* Set the transaction ID to match the previous messages */
1150
1151
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1152
1155
message::Statement *statement= transaction->add_statement();
1154
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
finalizeStatementMessage(*statement, in_session);
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1157
finalizeTransactionMessage(*transaction, in_session);
1160
finalizeTransactionMessage(*transaction, session);
1159
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1161
cleanupTransactionMessage(transaction, in_session);
1165
cleanupTransactionMessage(transaction, session);
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1166
1170
ReplicationServices &replication_services= ReplicationServices::singleton();
1167
1171
if (! replication_services.isActive())
1170
message::Statement *current_statement= in_session->getStatementMessage();
1174
message::Statement *current_statement= session.getStatementMessage();
1172
1176
/* If we never added a Statement message, nothing to undo. */
1173
1177
if (current_statement == NULL)
1222
1226
current_statement= transaction->add_statement();
1223
1227
initStatementMessage(*current_statement,
1224
1228
message::Statement::ROLLBACK_STATEMENT,
1226
finalizeStatementMessage(*current_statement, in_session);
1230
finalizeStatementMessage(*current_statement, session);
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1232
1254
uint32_t *next_segment_id)
1234
message::Statement *statement= in_session->getStatementMessage();
1256
message::Statement *statement= session.getStatementMessage();
1235
1257
message::Transaction *transaction= NULL;
1238
* Check the type for the current Statement message, if it is anything
1239
* other than INSERT we need to call finalize, this will ensure a
1240
* new InsertStatement is created. If it is of type INSERT check
1241
* what table the INSERT belongs to, if it is a different table
1242
* call finalize, so a new InsertStatement can be created.
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this is a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1244
if (statement != NULL && statement->type() != message::Statement::INSERT)
1246
finalizeStatementMessage(*statement, in_session);
1247
statement= in_session->getStatementMessage();
1249
else if (statement != NULL)
1251
transaction= getActiveTransactionMessage(in_session);
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1254
1285
* If we've passed our threshold for the statement size (possible for
1255
1286
* a bulk insert), we'll finalize the Statement and Transaction (doing
1256
1287
* the Transaction will keep it from getting huge).
1258
1289
if (static_cast<size_t>(transaction->ByteSize()) >=
1259
in_session->variables.transaction_message_threshold)
1290
transaction_message_threshold)
1261
1292
/* Remember the transaction ID so we can re-use it */
1262
1293
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1264
1296
message::InsertData *current_data= statement->mutable_insert_data();
1266
1298
/* Caller should use this value when adding a new record */
1267
1299
*next_segment_id= current_data->segment_id() + 1;
1269
1301
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1272
1305
* Send the trx message to replicators after finalizing the
1273
1306
* statement and transaction. This will also set the Transaction
1274
1307
* and Statement objects in Session to NULL.
1276
commitTransactionMessage(in_session);
1309
commitTransactionMessage(session);
1279
1312
* Statement and Transaction should now be NULL, so new ones will get
1280
1313
* created. We reuse the transaction id since we are segmenting
1281
1314
* one transaction.
1283
statement= in_session->getStatementMessage();
1284
transaction= getActiveTransactionMessage(in_session, false);
1316
transaction= getActiveTransactionMessage(session, false);
1285
1317
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1287
1323
/* Set the transaction ID to match the previous messages */
1288
1324
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1292
const message::InsertHeader &insert_header= statement->insert_header();
1293
string old_table_name= insert_header.table_metadata().table_name();
1295
string current_table_name;
1296
(void) in_table->getShare()->getTableName(current_table_name);
1298
if (current_table_name.compare(old_table_name))
1300
finalizeStatementMessage(*statement, in_session);
1301
statement= in_session->getStatementMessage();
1305
/* carry forward the existing segment id */
1306
const message::InsertData &current_data= statement->insert_data();
1307
*next_segment_id= current_data.segment_id();
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData &current_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1312
if (statement == NULL)
1315
* Transaction will be non-NULL only if we had to segment it due to
1316
* transaction size above.
1318
if (transaction == NULL)
1319
transaction= getActiveTransactionMessage(in_session);
1322
* Transaction message initialized and set, but no statement created
1323
* yet. We construct one and initialize it, here, then return the
1324
* message after attaching the new Statement message pointer to the
1325
* Session for easy retrieval later...
1327
statement= transaction->add_statement();
1328
setInsertHeader(*statement, in_session, in_table);
1329
in_session->setStatementMessage(statement);
1331
1339
return *statement;
1334
1342
void TransactionServices::setInsertHeader(message::Statement &statement,
1335
Session *in_session,
1343
Session::const_reference session,
1338
initStatementMessage(statement, message::Statement::INSERT, in_session);
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1341
1349
* Now we construct the specialized InsertHeader message inside
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1439
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1428
1441
const unsigned char *old_record,
1429
1442
const unsigned char *new_record,
1430
1443
uint32_t *next_segment_id)
1432
message::Statement *statement= in_session->getStatementMessage();
1445
message::Statement *statement= session.getStatementMessage();
1433
1446
message::Transaction *transaction= NULL;
1436
* Check the type for the current Statement message, if it is anything
1437
* other than UPDATE we need to call finalize, this will ensure a
1438
* new UpdateStatement is created. If it is of type UPDATE check
1439
* what table the UPDATE belongs to, if it is a different table
1440
* call finalize, so a new UpdateStatement can be created.
1449
* If statement is NULL, this is a new statement.
1450
* If statement is NOT NULL, this is a continuation of the same statement.
1451
* This is because autocommitOrRollback() finalizes the statement so that
1452
* we guarantee only one Statement message per statement (i.e., we no longer
1453
* share a single GPB message for multiple statements).
1442
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1455
if (statement == NULL)
1444
finalizeStatementMessage(*statement, in_session);
1445
statement= in_session->getStatementMessage();
1457
transaction= getActiveTransactionMessage(session);
1459
if (static_cast<size_t>(transaction->ByteSize()) >=
1460
transaction_message_threshold)
1462
transaction= segmentTransactionMessage(session, transaction);
1465
statement= transaction->add_statement();
1466
setUpdateHeader(*statement, session, table, old_record, new_record);
1467
session.setStatementMessage(statement);
1447
else if (statement != NULL)
1449
transaction= getActiveTransactionMessage(in_session);
1471
transaction= getActiveTransactionMessage(session);
1452
1474
* If we've passed our threshold for the statement size (possible for
1453
1475
* a bulk insert), we'll finalize the Statement and Transaction (doing
1454
1476
* the Transaction will keep it from getting huge).
1456
1478
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
in_session->variables.transaction_message_threshold)
1479
transaction_message_threshold)
1459
1481
/* Remember the transaction ID so we can re-use it */
1460
1482
uint64_t trx_id= transaction->transaction_context().transaction_id();
1483
uint32_t seg_id= transaction->segment_id();
1462
1485
message::UpdateData *current_data= statement->mutable_update_data();
1464
1487
/* Caller should use this value when adding a new record */
1465
1488
*next_segment_id= current_data->segment_id() + 1;
1467
1490
current_data->set_end_segment(false);
1491
transaction->set_end_segment(false);
1470
1494
* Send the trx message to replicators after finalizing the
1471
1495
* statement and transaction. This will also set the Transaction
1472
1496
* and Statement objects in Session to NULL.
1474
commitTransactionMessage(in_session);
1498
commitTransactionMessage(session);
1477
1501
* Statement and Transaction should now be NULL, so new ones will get
1478
1502
* created. We reuse the transaction id since we are segmenting
1479
1503
* one transaction.
1481
statement= in_session->getStatementMessage();
1482
transaction= getActiveTransactionMessage(in_session, false);
1505
transaction= getActiveTransactionMessage(session, false);
1483
1506
assert(transaction != NULL);
1508
statement= transaction->add_statement();
1509
setUpdateHeader(*statement, session, table, old_record, new_record);
1510
session.setStatementMessage(statement);
1485
1512
/* Set the transaction ID to match the previous messages */
1486
1513
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1514
transaction->set_segment_id(seg_id + 1);
1515
transaction->set_end_segment(true);
1490
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1492
/* carry forward the existing segment id */
1493
const message::UpdateData &current_data= statement->update_data();
1494
*next_segment_id= current_data.segment_id();
1498
finalizeStatementMessage(*statement, in_session);
1499
statement= in_session->getStatementMessage();
1520
* Continuation of the same statement. Carry forward the existing
1523
const message::UpdateData &current_data= statement->update_data();
1524
*next_segment_id= current_data.segment_id();
1504
if (statement == NULL)
1507
* Transaction will be non-NULL only if we had to segment it due to
1508
* transaction size above.
1510
if (transaction == NULL)
1511
transaction= getActiveTransactionMessage(in_session);
1514
* Transaction message initialized and set, but no statement created
1515
* yet. We construct one and initialize it, here, then return the
1516
* message after attaching the new Statement message pointer to the
1517
* Session for easy retrieval later...
1519
statement= transaction->add_statement();
1520
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
in_session->setStatementMessage(statement);
1523
1528
return *statement;
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1528
const unsigned char *old_record,
1529
const unsigned char *new_record)
1531
const message::UpdateHeader &update_header= statement.update_header();
1532
string old_table_name= update_header.table_metadata().table_name();
1534
string current_table_name;
1535
(void) in_table->getShare()->getTableName(current_table_name);
1536
if (current_table_name.compare(old_table_name))
1542
/* Compare the set fields in the existing UpdateHeader and see if they
1543
* match the updated fields in the new record, if they do not we must
1544
* create a new UpdateHeader
1546
size_t num_set_fields= update_header.set_field_metadata_size();
1548
Field *current_field;
1549
Field **table_fields= in_table->getFields();
1550
in_table->setReadSet();
1552
size_t num_calculated_updated_fields= 0;
1554
while ((current_field= *table_fields++) != NULL)
1556
if (num_calculated_updated_fields > num_set_fields)
1561
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1563
/* check that this field exists in the UpdateHeader record */
1566
for (size_t x= 0; x < num_set_fields; ++x)
1568
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
string name= field_metadata.name();
1570
if (name.compare(current_field->field_name) == 0)
1573
++num_calculated_updated_fields;
1584
if ((num_calculated_updated_fields == num_set_fields) && found)
1595
1531
void TransactionServices::setUpdateHeader(message::Statement &statement,
1596
Session *in_session,
1532
Session::const_reference session,
1598
1534
const unsigned char *old_record,
1599
1535
const unsigned char *new_record)
1601
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1537
initStatementMessage(statement, message::Statement::UPDATE, session);
1604
1540
* Now we construct the specialized UpdateHeader message inside
1774
1714
return isUpdated;
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1717
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1779
1719
uint32_t *next_segment_id)
1781
message::Statement *statement= in_session->getStatementMessage();
1721
message::Statement *statement= session.getStatementMessage();
1782
1722
message::Transaction *transaction= NULL;
1785
* Check the type for the current Statement message, if it is anything
1786
* other than DELETE we need to call finalize, this will ensure a
1787
* new DeleteStatement is created. If it is of type DELETE check
1788
* what table the DELETE belongs to, if it is a different table
1789
* call finalize, so a new DeleteStatement can be created.
1725
* If statement is NULL, this is a new statement.
1726
* If statement is NOT NULL, this is a continuation of the same statement.
1727
* This is because autocommitOrRollback() finalizes the statement so that
1728
* we guarantee only one Statement message per statement (i.e., we no longer
1729
* share a single GPB message for multiple statements).
1791
if (statement != NULL && statement->type() != message::Statement::DELETE)
1731
if (statement == NULL)
1793
finalizeStatementMessage(*statement, in_session);
1794
statement= in_session->getStatementMessage();
1733
transaction= getActiveTransactionMessage(session);
1735
if (static_cast<size_t>(transaction->ByteSize()) >=
1736
transaction_message_threshold)
1738
transaction= segmentTransactionMessage(session, transaction);
1741
statement= transaction->add_statement();
1742
setDeleteHeader(*statement, session, table);
1743
session.setStatementMessage(statement);
1796
else if (statement != NULL)
1798
transaction= getActiveTransactionMessage(in_session);
1747
transaction= getActiveTransactionMessage(session);
1801
1750
* If we've passed our threshold for the statement size (possible for
1802
1751
* a bulk insert), we'll finalize the Statement and Transaction (doing
1803
1752
* the Transaction will keep it from getting huge).
1805
1754
if (static_cast<size_t>(transaction->ByteSize()) >=
1806
in_session->variables.transaction_message_threshold)
1755
transaction_message_threshold)
1808
1757
/* Remember the transaction ID so we can re-use it */
1809
1758
uint64_t trx_id= transaction->transaction_context().transaction_id();
1759
uint32_t seg_id= transaction->segment_id();
1811
1761
message::DeleteData *current_data= statement->mutable_delete_data();
1813
1763
/* Caller should use this value when adding a new record */
1814
1764
*next_segment_id= current_data->segment_id() + 1;
1816
1766
current_data->set_end_segment(false);
1767
transaction->set_end_segment(false);
1819
1770
* Send the trx message to replicators after finalizing the
1820
1771
* statement and transaction. This will also set the Transaction
1821
1772
* and Statement objects in Session to NULL.
1823
commitTransactionMessage(in_session);
1774
commitTransactionMessage(session);
1826
1777
* Statement and Transaction should now be NULL, so new ones will get
1827
1778
* created. We reuse the transaction id since we are segmenting
1828
1779
* one transaction.
1830
statement= in_session->getStatementMessage();
1831
transaction= getActiveTransactionMessage(in_session, false);
1781
transaction= getActiveTransactionMessage(session, false);
1832
1782
assert(transaction != NULL);
1784
statement= transaction->add_statement();
1785
setDeleteHeader(*statement, session, table);
1786
session.setStatementMessage(statement);
1834
1788
/* Set the transaction ID to match the previous messages */
1835
1789
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1790
transaction->set_segment_id(seg_id + 1);
1791
transaction->set_end_segment(true);
1839
const message::DeleteHeader &delete_header= statement->delete_header();
1840
string old_table_name= delete_header.table_metadata().table_name();
1842
string current_table_name;
1843
(void) in_table->getShare()->getTableName(current_table_name);
1844
if (current_table_name.compare(old_table_name))
1846
finalizeStatementMessage(*statement, in_session);
1847
statement= in_session->getStatementMessage();
1851
/* carry forward the existing segment id */
1852
const message::DeleteData &current_data= statement->delete_data();
1853
*next_segment_id= current_data.segment_id();
1796
* Continuation of the same statement. Carry forward the existing
1799
const message::DeleteData &current_data= statement->delete_data();
1800
*next_segment_id= current_data.segment_id();
1858
if (statement == NULL)
1861
* Transaction will be non-NULL only if we had to segment it due to
1862
* transaction size above.
1864
if (transaction == NULL)
1865
transaction= getActiveTransactionMessage(in_session);
1868
* Transaction message initialized and set, but no statement created
1869
* yet. We construct one and initialize it, here, then return the
1870
* message after attaching the new Statement message pointer to the
1871
* Session for easy retrieval later...
1873
statement= transaction->add_statement();
1874
setDeleteHeader(*statement, in_session, in_table);
1875
in_session->setStatementMessage(statement);
1877
1804
return *statement;
1880
1807
void TransactionServices::setDeleteHeader(message::Statement &statement,
1881
Session *in_session,
1808
Session::const_reference session,
1884
initStatementMessage(statement, message::Statement::DELETE, in_session);
1811
initStatementMessage(statement, message::Statement::DELETE, session);
1887
1814
* Now we construct the specialized DeleteHeader message inside
1994
1929
message::Table *new_table_message= create_table_statement->mutable_table();
1995
1930
*new_table_message= table;
1997
finalizeStatementMessage(*statement, in_session);
1932
finalizeStatementMessage(*statement, session);
1999
finalizeTransactionMessage(*transaction, in_session);
1934
finalizeTransactionMessage(*transaction, session);
2001
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1936
(void) replication_services.pushTransactionMessage(session, *transaction);
2003
cleanupTransactionMessage(transaction, in_session);
1938
cleanupTransactionMessage(transaction, session);
2007
void TransactionServices::createSchema(Session *in_session,
1942
void TransactionServices::createSchema(Session::reference session,
2008
1943
const message::Schema &schema)
2010
1945
ReplicationServices &replication_services= ReplicationServices::singleton();
2011
1946
if (! replication_services.isActive())
2014
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1949
if (not message::is_replicated(schema))
1952
message::Transaction *transaction= getActiveTransactionMessage(session);
2015
1953
message::Statement *statement= transaction->add_statement();
2017
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1955
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
2020
1958
* Construct the specialized CreateSchemaStatement message and attach
2024
1962
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2025
1963
*new_schema_message= schema;
2027
finalizeStatementMessage(*statement, in_session);
1965
finalizeStatementMessage(*statement, session);
2029
finalizeTransactionMessage(*transaction, in_session);
1967
finalizeTransactionMessage(*transaction, session);
2031
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1969
(void) replication_services.pushTransactionMessage(session, *transaction);
2033
cleanupTransactionMessage(transaction, in_session);
1971
cleanupTransactionMessage(transaction, session);
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1975
void TransactionServices::dropSchema(Session::reference session,
1976
identifier::Schema::const_reference identifier,
1977
message::schema::const_reference schema)
2039
1979
ReplicationServices &replication_services= ReplicationServices::singleton();
2040
if (! replication_services.isActive())
2043
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1980
if (not replication_services.isActive())
1983
if (not message::is_replicated(schema))
1986
message::Transaction *transaction= getActiveTransactionMessage(session);
2044
1987
message::Statement *statement= transaction->add_statement();
2046
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1989
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
2049
1992
* Construct the specialized DropSchemaStatement message and attach
2052
1995
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2054
drop_schema_statement->set_schema_name(schema_name);
2056
finalizeStatementMessage(*statement, in_session);
2058
finalizeTransactionMessage(*transaction, in_session);
2060
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2062
cleanupTransactionMessage(transaction, in_session);
2065
void TransactionServices::dropTable(Session *in_session,
2066
const string &schema_name,
2067
const string &table_name,
1997
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1999
finalizeStatementMessage(*statement, session);
2001
finalizeTransactionMessage(*transaction, session);
2003
(void) replication_services.pushTransactionMessage(session, *transaction);
2005
cleanupTransactionMessage(transaction, session);
2008
void TransactionServices::alterSchema(Session::reference session,
2009
const message::Schema &old_schema,
2010
const message::Schema &new_schema)
2012
ReplicationServices &replication_services= ReplicationServices::singleton();
2013
if (! replication_services.isActive())
2016
if (not message::is_replicated(old_schema))
2019
message::Transaction *transaction= getActiveTransactionMessage(session);
2020
message::Statement *statement= transaction->add_statement();
2022
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2025
* Construct the specialized AlterSchemaStatement message and attach
2026
* it to the generic Statement message
2028
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2030
message::Schema *before= alter_schema_statement->mutable_before();
2031
message::Schema *after= alter_schema_statement->mutable_after();
2033
*before= old_schema;
2036
finalizeStatementMessage(*statement, session);
2038
finalizeTransactionMessage(*transaction, session);
2040
(void) replication_services.pushTransactionMessage(session, *transaction);
2042
cleanupTransactionMessage(transaction, session);
2045
void TransactionServices::dropTable(Session::reference session,
2046
identifier::Table::const_reference identifier,
2047
message::table::const_reference table,
2068
2048
bool if_exists)
2070
2050
ReplicationServices &replication_services= ReplicationServices::singleton();
2071
2051
if (! replication_services.isActive())
2074
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2054
if (not message::is_replicated(table))
2057
message::Transaction *transaction= getActiveTransactionMessage(session);
2075
2058
message::Statement *statement= transaction->add_statement();
2077
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2060
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2080
2063
* Construct the specialized DropTableStatement message and attach
2087
2070
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2089
table_metadata->set_schema_name(schema_name);
2090
table_metadata->set_table_name(table_name);
2092
finalizeStatementMessage(*statement, in_session);
2094
finalizeTransactionMessage(*transaction, in_session);
2072
table_metadata->set_schema_name(identifier.getSchemaName());
2073
table_metadata->set_table_name(identifier.getTableName());
2075
finalizeStatementMessage(*statement, session);
2077
finalizeTransactionMessage(*transaction, session);
2096
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2079
(void) replication_services.pushTransactionMessage(session, *transaction);
2098
cleanupTransactionMessage(transaction, in_session);
2081
cleanupTransactionMessage(transaction, session);
2101
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2084
void TransactionServices::truncateTable(Session::reference session,
2103
2087
ReplicationServices &replication_services= ReplicationServices::singleton();
2104
2088
if (! replication_services.isActive())
2107
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2091
if (not table.getShare()->is_replicated())
2094
message::Transaction *transaction= getActiveTransactionMessage(session);
2108
2095
message::Statement *statement= transaction->add_statement();
2110
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2097
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2113
2100
* Construct the specialized TruncateTableStatement message and attach
2117
2104
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2119
2106
string schema_name;
2120
(void) in_table->getShare()->getSchemaName(schema_name);
2107
(void) table.getShare()->getSchemaName(schema_name);
2121
2109
string table_name;
2122
(void) in_table->getShare()->getTableName(table_name);
2110
(void) table.getShare()->getTableName(table_name);
2124
2112
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2125
2113
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2127
finalizeStatementMessage(*statement, in_session);
2115
finalizeStatementMessage(*statement, session);
2129
finalizeTransactionMessage(*transaction, in_session);
2117
finalizeTransactionMessage(*transaction, session);
2131
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2119
(void) replication_services.pushTransactionMessage(session, *transaction);
2133
cleanupTransactionMessage(transaction, in_session);
2121
cleanupTransactionMessage(transaction, session);
2136
void TransactionServices::rawStatement(Session *in_session, const string &query)
2124
void TransactionServices::rawStatement(Session::reference session,
2125
const string &query,
2126
const string &schema)
2138
2128
ReplicationServices &replication_services= ReplicationServices::singleton();
2139
2129
if (! replication_services.isActive())
2142
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2132
message::Transaction *transaction= getActiveTransactionMessage(session);
2143
2133
message::Statement *statement= transaction->add_statement();
2145
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2135
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2146
2136
statement->set_sql(query);
2147
finalizeStatementMessage(*statement, in_session);
2137
if (not schema.empty())
2138
statement->set_raw_sql_schema(schema);
2139
finalizeStatementMessage(*statement, session);
2149
finalizeTransactionMessage(*transaction, in_session);
2141
finalizeTransactionMessage(*transaction, session);
2151
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2143
(void) replication_services.pushTransactionMessage(session, *transaction);
2153
cleanupTransactionMessage(transaction, in_session);
2145
cleanupTransactionMessage(transaction, session);
2156
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2148
int TransactionServices::sendEvent(Session::reference session,
2149
const message::Event &event)
2158
2151
ReplicationServices &replication_services= ReplicationServices::singleton();
2159
2152
if (! replication_services.isActive())