47
47
* plugins can understand.
51
#include <drizzled/current_session.h>
52
#include <drizzled/my_hash.h>
53
#include <drizzled/error.h>
54
#include <drizzled/gettext.h>
55
#include <drizzled/probes.h>
56
#include <drizzled/sql_parse.h>
57
#include <drizzled/session.h>
58
#include <drizzled/sql_base.h>
59
#include <drizzled/replication_services.h>
60
#include <drizzled/transaction_services.h>
61
#include <drizzled/transaction_context.h>
62
#include <drizzled/message/transaction.pb.h>
63
#include <drizzled/message/statement_transform.h>
64
#include <drizzled/resource_context.h>
65
#include <drizzled/lock.h>
66
#include <drizzled/item/int.h>
67
#include <drizzled/item/empty_string.h>
68
#include <drizzled/field/epoch.h>
69
#include <drizzled/plugin/client.h>
70
#include <drizzled/plugin/monitored_in_transaction.h>
71
#include <drizzled/plugin/transactional_storage_engine.h>
72
#include <drizzled/plugin/xa_resource_manager.h>
73
#include <drizzled/plugin/xa_storage_engine.h>
74
#include <drizzled/internal/my_sys.h>
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/internal/my_sys.h"
77
77
#include <algorithm>
78
78
#include <functional>
79
#include <google/protobuf/repeated_field.h>
82
using namespace google;
431
416
resource_context->setTransactionalStorageEngine(engine);
432
417
trans->no_2pc|= true;
434
if (session.transaction.xid_state.xid.is_null())
435
session.transaction.xid_state.xid.set(session.getQueryId());
419
if (session->transaction.xid_state.xid.is_null())
420
session->transaction.xid_state.xid.set(session->getQueryId());
437
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
422
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
439
424
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
if (! session.getResourceContext(monitored, 0)->isStarted())
425
if (! session->getResourceContext(monitored, 0)->isStarted())
441
426
registerResourceForStatement(session, monitored, engine, resource_manager);
444
void TransactionServices::allocateNewTransactionId()
446
ReplicationServices &replication_services= ReplicationServices::singleton();
447
if (! replication_services.isActive())
452
Session *my_session= current_session;
453
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
454
my_session->setXaId(xa_id);
457
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
459
if (session.getXaId() == 0)
461
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
464
return session.getXaId();
467
int TransactionServices::commitTransaction(Session::reference session,
468
bool normal_transaction)
433
1 transaction was rolled back
435
2 error during commit, data may be inconsistent
438
Since we don't support nested statement transactions in 5.0,
439
we can't commit or rollback stmt transactions while we are inside
440
stored functions or triggers. So we simply do nothing now.
441
TODO: This should be fixed in later ( >= 5.1) releases.
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
470
445
int error= 0, cookie= 0;
472
447
'all' means that this is either an explicit commit issued by
473
448
user, or an implicit commit issued by a DDL.
475
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
450
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
451
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
478
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
453
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
481
456
We must not commit the normal transaction if a statement
587
554
if (resource->participatesInXaTransaction())
589
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
556
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
591
558
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
594
561
else if (normal_transaction)
596
session.status_var.ha_commit_count++;
563
status_var_increment(session->status_var.ha_commit_count);
599
566
else if (resource->participatesInSqlTransaction())
601
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
568
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
603
570
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
606
573
else if (normal_transaction)
608
session.status_var.ha_commit_count++;
575
status_var_increment(session->status_var.ha_commit_count);
611
578
resource_context->reset(); /* keep it conveniently zero-filled */
614
581
if (is_real_trans)
615
session.transaction.xid_state.xid.null();
582
session->transaction.xid_state.xid.null();
617
584
if (normal_transaction)
619
session.variables.tx_isolation= session.session_tx_isolation;
620
session.transaction.cleanup();
586
session->variables.tx_isolation= session->session_tx_isolation;
587
session->transaction.cleanup();
627
int TransactionServices::rollbackTransaction(Session::reference session,
628
bool normal_transaction)
594
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
631
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
597
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
598
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
634
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
600
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
638
603
We must not rollback the normal transaction if a statement
639
604
transaction is pending.
641
assert(session.transaction.stmt.getResourceContexts().empty() ||
642
trans == &session.transaction.stmt);
606
assert(session->transaction.stmt.getResourceContexts().empty() ||
607
trans == &session->transaction.stmt);
644
609
if (resource_contexts.empty() == false)
999
907
* deleting transaction message when done with it.
1001
909
transaction= new (nothrow) message::Transaction();
1002
initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
session.setTransactionMessage(transaction);
1010
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
Session::reference session,
1012
bool should_inc_trx_id)
1014
message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
trx->set_server_id(session.getServerId());
1017
if (should_inc_trx_id)
1019
trx->set_transaction_id(getCurrentTransactionId(session));
1024
/* trx and seg id will get set properly elsewhere */
1025
trx->set_transaction_id(0);
1028
trx->set_start_timestamp(session.getCurrentTimestamp());
1030
/* segment info may get set elsewhere as needed */
1031
transaction.set_segment_id(1);
1032
transaction.set_end_segment(true);
1035
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
Session::const_reference session)
1038
message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
trx->set_end_timestamp(session.getCurrentTimestamp());
1042
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
Session::reference session)
1046
session.setStatementMessage(NULL);
1047
session.setTransactionMessage(NULL);
1051
int TransactionServices::commitTransactionMessage(Session::reference session)
910
initTransactionMessage(*transaction, in_session);
911
in_session->setTransactionMessage(transaction);
918
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
921
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
922
trx->set_server_id(in_session->getServerId());
923
trx->set_transaction_id(getNextTransactionId());
924
trx->set_start_timestamp(in_session->getCurrentTimestamp());
927
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
930
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
931
trx->set_end_timestamp(in_session->getCurrentTimestamp());
934
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
937
delete in_transaction;
938
in_session->setStatementMessage(NULL);
939
in_session->setTransactionMessage(NULL);
942
int TransactionServices::commitTransactionMessage(Session *in_session)
1053
944
ReplicationServices &replication_services= ReplicationServices::singleton();
1054
945
if (! replication_services.isActive())
1058
* If no Transaction message was ever created, then no data modification
1059
* occurred inside the transaction, so nothing to do.
1061
if (session.getTransactionMessage() == NULL)
1064
/* If there is an active statement message, finalize it. */
1065
message::Statement *statement= session.getStatementMessage();
948
/* If there is an active statement message, finalize it */
949
message::Statement *statement= in_session->getStatementMessage();
1067
951
if (statement != NULL)
1069
finalizeStatementMessage(*statement, session);
1072
message::Transaction* transaction= getActiveTransactionMessage(session);
1075
* It is possible that we could have a Transaction without any Statements
1076
* if we had created a Statement but had to roll it back due to it failing
1077
* mid-execution, and no subsequent Statements were added to the Transaction
1078
* message. In this case, we simply clean up the message and not push it.
1080
if (transaction->statement_size() == 0)
1082
cleanupTransactionMessage(transaction, session);
1086
finalizeTransactionMessage(*transaction, session);
1088
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1090
cleanupTransactionMessage(transaction, session);
953
finalizeStatementMessage(*statement, in_session);
956
return 0; /* No data modification occurred inside the transaction */
958
message::Transaction* transaction= getActiveTransactionMessage(in_session);
960
finalizeTransactionMessage(*transaction, in_session);
962
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
964
cleanupTransactionMessage(transaction, in_session);
1092
966
return static_cast<int>(result);
1095
969
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
message::Statement::Type type,
1097
Session::const_reference session)
970
message::Statement::Type in_type,
1099
statement.set_type(type);
1100
statement.set_start_timestamp(session.getCurrentTimestamp());
1102
if (session.variables.replicate_query)
1103
statement.set_sql(session.getQueryString()->c_str());
973
statement.set_type(in_type);
974
statement.set_start_timestamp(in_session->getCurrentTimestamp());
975
/** @TODO Set sql string optionally */
1106
978
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
Session::reference session)
1109
statement.set_end_timestamp(session.getCurrentTimestamp());
1110
session.setStatementMessage(NULL);
981
statement.set_end_timestamp(in_session->getCurrentTimestamp());
982
in_session->setStatementMessage(NULL);
1113
void TransactionServices::rollbackTransactionMessage(Session::reference session)
985
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1115
987
ReplicationServices &replication_services= ReplicationServices::singleton();
1116
988
if (! replication_services.isActive())
1119
message::Transaction *transaction= getActiveTransactionMessage(session);
991
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1122
994
* OK, so there are two situations that we need to deal with here:
1138
1010
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1140
/* Remember the transaction ID so we can re-use it */
1141
uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
uint32_t seg_id= transaction->segment_id();
1145
1013
* Clear the transaction, create a Rollback statement message,
1146
1014
* attach it to the transaction, and push it to replicators.
1148
1016
transaction->Clear();
1149
initTransactionMessage(*transaction, session, false);
1151
/* Set the transaction ID to match the previous messages */
1152
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
transaction->set_segment_id(seg_id);
1154
transaction->set_end_segment(true);
1017
initTransactionMessage(*transaction, in_session);
1156
1019
message::Statement *statement= transaction->add_statement();
1158
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
finalizeStatementMessage(*statement, session);
1021
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1022
finalizeStatementMessage(*statement, in_session);
1161
finalizeTransactionMessage(*transaction, session);
1024
finalizeTransactionMessage(*transaction, in_session);
1163
(void) replication_services.pushTransactionMessage(session, *transaction);
1166
cleanupTransactionMessage(transaction, session);
1169
void TransactionServices::rollbackStatementMessage(Session::reference session)
1171
ReplicationServices &replication_services= ReplicationServices::singleton();
1172
if (! replication_services.isActive())
1175
message::Statement *current_statement= session.getStatementMessage();
1177
/* If we never added a Statement message, nothing to undo. */
1178
if (current_statement == NULL)
1182
* If the Statement has been segmented, then we've already pushed a portion
1183
* of this Statement's row changes through the replication stream and we
1184
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1185
* delete the current Statement message.
1187
bool is_segmented= false;
1189
switch (current_statement->type())
1191
case message::Statement::INSERT:
1192
if (current_statement->insert_data().segment_id() > 1)
1196
case message::Statement::UPDATE:
1197
if (current_statement->update_data().segment_id() > 1)
1201
case message::Statement::DELETE:
1202
if (current_statement->delete_data().segment_id() > 1)
1211
* Remove the Statement message we've been working with (same as
1212
* current_statement).
1214
message::Transaction *transaction= getActiveTransactionMessage(session);
1215
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
statements_in_txn= transaction->mutable_statement();
1217
statements_in_txn->RemoveLast();
1218
session.setStatementMessage(NULL);
1221
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1222
* an indicator to cancel the previous Statement message which should have
1223
* had its end_segment attribute set to false.
1227
current_statement= transaction->add_statement();
1228
initStatementMessage(*current_statement,
1229
message::Statement::ROLLBACK_STATEMENT,
1231
finalizeStatementMessage(*current_statement, session);
1235
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
message::Transaction *transaction)
1238
uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
uint32_t seg_id= transaction->segment_id();
1241
transaction->set_end_segment(false);
1242
commitTransactionMessage(session);
1243
transaction= getActiveTransactionMessage(session, false);
1245
/* Set the transaction ID to match the previous messages */
1246
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
transaction->set_segment_id(seg_id + 1);
1248
transaction->set_end_segment(true);
1253
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1255
uint32_t *next_segment_id)
1257
message::Statement *statement= session.getStatementMessage();
1258
message::Transaction *transaction= NULL;
1261
* If statement is NULL, this is a new statement.
1262
* If statement is NOT NULL, this is a continuation of the same statement.
1263
* This is because autocommitOrRollback() finalizes the statement so that
1264
* we guarantee only one Statement message per statement (i.e., we no longer
1265
* share a single GPB message for multiple statements).
1026
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1028
cleanupTransactionMessage(transaction, in_session);
1031
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1034
message::Statement *statement= in_session->getStatementMessage();
1037
* Check the type for the current Statement message, if it is anything
1038
* other than INSERT we need to call finalize, this will ensure a
1039
* new InsertStatement is created. If it is of type INSERT check
1040
* what table the INSERT belongs to, if it is a different table
1041
* call finalize, so a new InsertStatement can be created.
1043
if (statement != NULL && statement->type() != message::Statement::INSERT)
1045
finalizeStatementMessage(*statement, in_session);
1046
statement= in_session->getStatementMessage();
1048
else if (statement != NULL)
1050
const message::InsertHeader &insert_header= statement->insert_header();
1051
string old_table_name= insert_header.table_metadata().table_name();
1053
string current_table_name;
1054
(void) in_table->getShare()->getTableName(current_table_name);
1055
if (current_table_name.compare(old_table_name))
1057
finalizeStatementMessage(*statement, in_session);
1058
statement= in_session->getStatementMessage();
1267
1062
if (statement == NULL)
1269
transaction= getActiveTransactionMessage(session);
1271
if (static_cast<size_t>(transaction->ByteSize()) >=
1272
transaction_message_threshold)
1274
transaction= segmentTransactionMessage(session, transaction);
1064
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1066
* Transaction message initialized and set, but no statement created
1067
* yet. We construct one and initialize it, here, then return the
1068
* message after attaching the new Statement message pointer to the
1069
* Session for easy retrieval later...
1277
1071
statement= transaction->add_statement();
1278
setInsertHeader(*statement, session, table);
1279
session.setStatementMessage(statement);
1283
transaction= getActiveTransactionMessage(session);
1286
* If we've passed our threshold for the statement size (possible for
1287
* a bulk insert), we'll finalize the Statement and Transaction (doing
1288
* the Transaction will keep it from getting huge).
1290
if (static_cast<size_t>(transaction->ByteSize()) >=
1291
transaction_message_threshold)
1293
/* Remember the transaction ID so we can re-use it */
1294
uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
uint32_t seg_id= transaction->segment_id();
1297
message::InsertData *current_data= statement->mutable_insert_data();
1299
/* Caller should use this value when adding a new record */
1300
*next_segment_id= current_data->segment_id() + 1;
1302
current_data->set_end_segment(false);
1303
transaction->set_end_segment(false);
1306
* Send the trx message to replicators after finalizing the
1307
* statement and transaction. This will also set the Transaction
1308
* and Statement objects in Session to NULL.
1310
commitTransactionMessage(session);
1313
* Statement and Transaction should now be NULL, so new ones will get
1314
* created. We reuse the transaction id since we are segmenting
1317
transaction= getActiveTransactionMessage(session, false);
1318
assert(transaction != NULL);
1320
statement= transaction->add_statement();
1321
setInsertHeader(*statement, session, table);
1322
session.setStatementMessage(statement);
1324
/* Set the transaction ID to match the previous messages */
1325
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
transaction->set_segment_id(seg_id + 1);
1327
transaction->set_end_segment(true);
1332
* Continuation of the same statement. Carry forward the existing
1335
const message::InsertData &current_data= statement->insert_data();
1336
*next_segment_id= current_data.segment_id();
1072
setInsertHeader(*statement, in_session, in_table);
1073
in_session->setStatementMessage(statement);
1340
1075
return *statement;
1343
1078
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
Session::const_reference session,
1079
Session *in_session,
1347
initStatementMessage(statement, message::Statement::INSERT, session);
1082
initStatementMessage(statement, message::Statement::INSERT, in_session);
1350
1085
* Now we construct the specialized InsertHeader message inside
1440
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1169
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1442
1171
const unsigned char *old_record,
1443
const unsigned char *new_record,
1444
uint32_t *next_segment_id)
1172
const unsigned char *new_record)
1446
message::Statement *statement= session.getStatementMessage();
1447
message::Transaction *transaction= NULL;
1174
message::Statement *statement= in_session->getStatementMessage();
1450
* If statement is NULL, this is a new statement.
1451
* If statement is NOT NULL, this is a continuation of the same statement.
1452
* This is because autocommitOrRollback() finalizes the statement so that
1453
* we guarantee only one Statement message per statement (i.e., we no longer
1454
* share a single GPB message for multiple statements).
1177
* Check the type for the current Statement message, if it is anything
1178
* other than UPDATE we need to call finalize, this will ensure a
1179
* new UpdateStatement is created. If it is of type UPDATE check
1180
* what table the UPDATE belongs to, if it is a different table
1181
* call finalize, so a new UpdateStatement can be created.
1183
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1185
finalizeStatementMessage(*statement, in_session);
1186
statement= in_session->getStatementMessage();
1188
else if (statement != NULL)
1190
const message::UpdateHeader &update_header= statement->update_header();
1191
string old_table_name= update_header.table_metadata().table_name();
1193
string current_table_name;
1194
(void) in_table->getShare()->getTableName(current_table_name);
1195
if (current_table_name.compare(old_table_name))
1197
finalizeStatementMessage(*statement, in_session);
1198
statement= in_session->getStatementMessage();
1456
1202
if (statement == NULL)
1458
transaction= getActiveTransactionMessage(session);
1460
if (static_cast<size_t>(transaction->ByteSize()) >=
1461
transaction_message_threshold)
1463
transaction= segmentTransactionMessage(session, transaction);
1204
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1206
* Transaction message initialized and set, but no statement created
1207
* yet. We construct one and initialize it, here, then return the
1208
* message after attaching the new Statement message pointer to the
1209
* Session for easy retrieval later...
1466
1211
statement= transaction->add_statement();
1467
setUpdateHeader(*statement, session, table, old_record, new_record);
1468
session.setStatementMessage(statement);
1472
transaction= getActiveTransactionMessage(session);
1475
* If we've passed our threshold for the statement size (possible for
1476
* a bulk insert), we'll finalize the Statement and Transaction (doing
1477
* the Transaction will keep it from getting huge).
1479
if (static_cast<size_t>(transaction->ByteSize()) >=
1480
transaction_message_threshold)
1482
/* Remember the transaction ID so we can re-use it */
1483
uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
uint32_t seg_id= transaction->segment_id();
1486
message::UpdateData *current_data= statement->mutable_update_data();
1488
/* Caller should use this value when adding a new record */
1489
*next_segment_id= current_data->segment_id() + 1;
1491
current_data->set_end_segment(false);
1492
transaction->set_end_segment(false);
1495
* Send the trx message to replicators after finalizing the
1496
* statement and transaction. This will also set the Transaction
1497
* and Statement objects in Session to NULL.
1499
commitTransactionMessage(session);
1502
* Statement and Transaction should now be NULL, so new ones will get
1503
* created. We reuse the transaction id since we are segmenting
1506
transaction= getActiveTransactionMessage(session, false);
1507
assert(transaction != NULL);
1509
statement= transaction->add_statement();
1510
setUpdateHeader(*statement, session, table, old_record, new_record);
1511
session.setStatementMessage(statement);
1513
/* Set the transaction ID to match the previous messages */
1514
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
transaction->set_segment_id(seg_id + 1);
1516
transaction->set_end_segment(true);
1521
* Continuation of the same statement. Carry forward the existing
1524
const message::UpdateData &current_data= statement->update_data();
1525
*next_segment_id= current_data.segment_id();
1212
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1213
in_session->setStatementMessage(statement);
1529
1215
return *statement;
1532
1218
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
Session::const_reference session,
1219
Session *in_session,
1535
1221
const unsigned char *old_record,
1536
1222
const unsigned char *new_record)
1538
initStatementMessage(statement, message::Statement::UPDATE, session);
1224
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1541
1227
* Now we construct the specialized UpdateHeader message inside
1677
bool TransactionServices::isFieldUpdated(Field *current_field,
1679
const unsigned char *old_record,
1680
const unsigned char *new_record)
1683
* The below really should be moved into the Field API and Record API. But for now
1684
* we do this crazy pointer fiddling to figure out if the current field
1685
* has been updated in the supplied record raw byte pointers.
1687
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1690
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1692
bool old_value_is_null= current_field->is_null_in_record(old_record);
1693
bool new_value_is_null= current_field->is_null_in_record(new_record);
1695
bool isUpdated= false;
1696
if (old_value_is_null != new_value_is_null)
1698
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1702
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1710
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1718
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1720
uint32_t *next_segment_id)
1722
message::Statement *statement= session.getStatementMessage();
1723
message::Transaction *transaction= NULL;
1726
* If statement is NULL, this is a new statement.
1727
* If statement is NOT NULL, this is a continuation of the same statement.
1728
* This is because autocommitOrRollback() finalizes the statement so that
1729
* we guarantee only one Statement message per statement (i.e., we no longer
1730
* share a single GPB message for multiple statements).
1377
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1380
message::Statement *statement= in_session->getStatementMessage();
1383
* Check the type for the current Statement message, if it is anything
1384
* other than DELETE we need to call finalize, this will ensure a
1385
* new DeleteStatement is created. If it is of type DELETE check
1386
* what table the DELETE belongs to, if it is a different table
1387
* call finalize, so a new DeleteStatement can be created.
1389
if (statement != NULL && statement->type() != message::Statement::DELETE)
1391
finalizeStatementMessage(*statement, in_session);
1392
statement= in_session->getStatementMessage();
1394
else if (statement != NULL)
1396
const message::DeleteHeader &delete_header= statement->delete_header();
1397
string old_table_name= delete_header.table_metadata().table_name();
1399
string current_table_name;
1400
(void) in_table->getShare()->getTableName(current_table_name);
1401
if (current_table_name.compare(old_table_name))
1403
finalizeStatementMessage(*statement, in_session);
1404
statement= in_session->getStatementMessage();
1732
1408
if (statement == NULL)
1734
transaction= getActiveTransactionMessage(session);
1736
if (static_cast<size_t>(transaction->ByteSize()) >=
1737
transaction_message_threshold)
1739
transaction= segmentTransactionMessage(session, transaction);
1410
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1412
* Transaction message initialized and set, but no statement created
1413
* yet. We construct one and initialize it, here, then return the
1414
* message after attaching the new Statement message pointer to the
1415
* Session for easy retrieval later...
1742
1417
statement= transaction->add_statement();
1743
setDeleteHeader(*statement, session, table);
1744
session.setStatementMessage(statement);
1748
transaction= getActiveTransactionMessage(session);
1751
* If we've passed our threshold for the statement size (possible for
1752
* a bulk insert), we'll finalize the Statement and Transaction (doing
1753
* the Transaction will keep it from getting huge).
1755
if (static_cast<size_t>(transaction->ByteSize()) >=
1756
transaction_message_threshold)
1758
/* Remember the transaction ID so we can re-use it */
1759
uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
uint32_t seg_id= transaction->segment_id();
1762
message::DeleteData *current_data= statement->mutable_delete_data();
1764
/* Caller should use this value when adding a new record */
1765
*next_segment_id= current_data->segment_id() + 1;
1767
current_data->set_end_segment(false);
1768
transaction->set_end_segment(false);
1771
* Send the trx message to replicators after finalizing the
1772
* statement and transaction. This will also set the Transaction
1773
* and Statement objects in Session to NULL.
1775
commitTransactionMessage(session);
1778
* Statement and Transaction should now be NULL, so new ones will get
1779
* created. We reuse the transaction id since we are segmenting
1782
transaction= getActiveTransactionMessage(session, false);
1783
assert(transaction != NULL);
1785
statement= transaction->add_statement();
1786
setDeleteHeader(*statement, session, table);
1787
session.setStatementMessage(statement);
1789
/* Set the transaction ID to match the previous messages */
1790
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
transaction->set_segment_id(seg_id + 1);
1792
transaction->set_end_segment(true);
1797
* Continuation of the same statement. Carry forward the existing
1800
const message::DeleteData &current_data= statement->delete_data();
1801
*next_segment_id= current_data.segment_id();
1418
setDeleteHeader(*statement, in_session, in_table);
1419
in_session->setStatementMessage(statement);
1805
1421
return *statement;
1808
1424
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
Session::const_reference session,
1425
Session *in_session,
1812
initStatementMessage(statement, message::Statement::DELETE, session);
1428
initStatementMessage(statement, message::Statement::DELETE, in_session);
1815
1431
* Now we construct the specialized DeleteHeader message inside
1878
1488
* primary key field value. Replication only supports tables
1879
1489
* with a primary key.
1881
if (table.getShare()->fieldInPrimaryKey(current_field))
1491
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1883
if (use_update_record)
1886
* Temporarily point to the update record to get its value.
1887
* This is pretty much a hack in order to get the PK value from
1888
* the update record rather than the insert record. Field::val_str()
1889
* should not change anything in Field::ptr, so this should be safe.
1890
* We are careful not to change anything in old_ptr.
1892
const unsigned char *old_ptr= current_field->ptr;
1893
current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1894
string_value= current_field->val_str_internal(string_value);
1895
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1899
string_value= current_field->val_str_internal(string_value);
1901
* @TODO Store optional old record value in the before data member
1493
string_value= current_field->val_str(string_value);
1904
1494
record->add_key_value(string_value->c_ptr(), string_value->length());
1496
* @TODO Store optional old record value in the before data member
1905
1498
string_value->free();
1910
void TransactionServices::createTable(Session::reference session,
1503
void TransactionServices::createTable(Session *in_session,
1911
1504
const message::Table &table)
1913
1506
ReplicationServices &replication_services= ReplicationServices::singleton();
1914
1507
if (! replication_services.isActive())
1917
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1920
message::Transaction *transaction= getActiveTransactionMessage(session);
1510
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1921
1511
message::Statement *statement= transaction->add_statement();
1923
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1513
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1926
1516
* Construct the specialized CreateTableStatement message and attach
1996
1578
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1998
drop_schema_statement->set_schema_name(identifier.getSchemaName());
2000
finalizeStatementMessage(*statement, session);
2002
finalizeTransactionMessage(*transaction, session);
2004
(void) replication_services.pushTransactionMessage(session, *transaction);
2006
cleanupTransactionMessage(transaction, session);
2009
void TransactionServices::alterSchema(Session::reference session,
2010
const message::Schema &old_schema,
2011
const message::Schema &new_schema)
2013
ReplicationServices &replication_services= ReplicationServices::singleton();
2014
if (! replication_services.isActive())
2017
if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2020
message::Transaction *transaction= getActiveTransactionMessage(session);
2021
message::Statement *statement= transaction->add_statement();
2023
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2026
* Construct the specialized AlterSchemaStatement message and attach
2027
* it to the generic Statement message
2029
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2031
message::Schema *before= alter_schema_statement->mutable_before();
2032
message::Schema *after= alter_schema_statement->mutable_after();
2034
*before= old_schema;
2037
finalizeStatementMessage(*statement, session);
2039
finalizeTransactionMessage(*transaction, session);
2041
(void) replication_services.pushTransactionMessage(session, *transaction);
2043
cleanupTransactionMessage(transaction, session);
2046
void TransactionServices::dropTable(Session::reference session,
2047
identifier::Table::const_reference identifier,
2048
message::table::const_reference table,
1580
drop_schema_statement->set_schema_name(schema_name);
1582
finalizeStatementMessage(*statement, in_session);
1584
finalizeTransactionMessage(*transaction, in_session);
1586
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1588
cleanupTransactionMessage(transaction, in_session);
1591
void TransactionServices::dropTable(Session *in_session,
1592
const string &schema_name,
1593
const string &table_name,
2049
1594
bool if_exists)
2051
1596
ReplicationServices &replication_services= ReplicationServices::singleton();
2052
1597
if (! replication_services.isActive())
2055
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2058
message::Transaction *transaction= getActiveTransactionMessage(session);
1600
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2059
1601
message::Statement *statement= transaction->add_statement();
2061
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
1603
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2064
1606
* Construct the specialized DropTableStatement message and attach
2071
1613
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2073
table_metadata->set_schema_name(identifier.getSchemaName());
2074
table_metadata->set_table_name(identifier.getTableName());
2076
finalizeStatementMessage(*statement, session);
2078
finalizeTransactionMessage(*transaction, session);
1615
table_metadata->set_schema_name(schema_name);
1616
table_metadata->set_table_name(table_name);
1618
finalizeStatementMessage(*statement, in_session);
1620
finalizeTransactionMessage(*transaction, in_session);
2080
(void) replication_services.pushTransactionMessage(session, *transaction);
1622
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2082
cleanupTransactionMessage(transaction, session);
1624
cleanupTransactionMessage(transaction, in_session);
2085
void TransactionServices::truncateTable(Session::reference session,
1627
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2088
1629
ReplicationServices &replication_services= ReplicationServices::singleton();
2089
1630
if (! replication_services.isActive())
2092
if (not table.getShare()->replicate())
2095
message::Transaction *transaction= getActiveTransactionMessage(session);
1633
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
1634
message::Statement *statement= transaction->add_statement();
2098
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
1636
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2101
1639
* Construct the specialized TruncateTableStatement message and attach
2105
1643
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2107
1645
string schema_name;
2108
(void) table.getShare()->getSchemaName(schema_name);
1646
(void) in_table->getShare()->getSchemaName(schema_name);
2110
1647
string table_name;
2111
(void) table.getShare()->getTableName(table_name);
1648
(void) in_table->getShare()->getTableName(table_name);
2113
1650
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
1651
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2116
finalizeStatementMessage(*statement, session);
1653
finalizeStatementMessage(*statement, in_session);
2118
finalizeTransactionMessage(*transaction, session);
1655
finalizeTransactionMessage(*transaction, in_session);
2120
(void) replication_services.pushTransactionMessage(session, *transaction);
1657
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2122
cleanupTransactionMessage(transaction, session);
1659
cleanupTransactionMessage(transaction, in_session);
2125
void TransactionServices::rawStatement(Session::reference session,
2126
const string &query)
1662
void TransactionServices::rawStatement(Session *in_session, const string &query)
2128
1664
ReplicationServices &replication_services= ReplicationServices::singleton();
2129
1665
if (! replication_services.isActive())
2132
message::Transaction *transaction= getActiveTransactionMessage(session);
1668
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
1669
message::Statement *statement= transaction->add_statement();
2135
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
1671
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
1672
statement->set_sql(query);
2137
finalizeStatementMessage(*statement, session);
1673
finalizeStatementMessage(*statement, in_session);
2139
finalizeTransactionMessage(*transaction, session);
1675
finalizeTransactionMessage(*transaction, in_session);
2141
(void) replication_services.pushTransactionMessage(session, *transaction);
2143
cleanupTransactionMessage(transaction, session);
2146
int TransactionServices::sendEvent(Session::reference session,
2147
const message::Event &event)
2149
ReplicationServices &replication_services= ReplicationServices::singleton();
2150
if (! replication_services.isActive())
2153
message::Transaction *transaction= new (nothrow) message::Transaction();
2155
// set server id, start timestamp
2156
initTransactionMessage(*transaction, session, true);
2158
// set end timestamp
2159
finalizeTransactionMessage(*transaction, session);
2161
message::Event *trx_event= transaction->mutable_event();
2163
trx_event->CopyFrom(event);
2165
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2169
return static_cast<int>(result);
2172
bool TransactionServices::sendStartupEvent(Session::reference session)
2174
message::Event event;
2175
event.set_type(message::Event::STARTUP);
2176
if (sendEvent(session, event) != 0)
2181
bool TransactionServices::sendShutdownEvent(Session::reference session)
2183
message::Event event;
2184
event.set_type(message::Event::SHUTDOWN);
2185
if (sendEvent(session, event) != 0)
1677
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1679
cleanupTransactionMessage(transaction, in_session);
2190
1682
} /* namespace drizzled */