69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
72
#include "drizzled/internal/my_sys.h"
76
77
#include <algorithm>
77
78
#include <functional>
78
#include <google/protobuf/repeated_field.h>
81
using namespace google;
300
297
* transaction after all DDLs, just like the statement transaction
301
298
* is always committed at the end of all statements.
303
TransactionServices::TransactionServices()
305
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
308
xa_storage_engine= (plugin::XaStorageEngine*)engine;
312
xa_storage_engine= NULL;
316
300
void TransactionServices::registerResourceForStatement(Session *session,
317
301
plugin::MonitoredInTransaction *monitored,
318
302
plugin::TransactionalStorageEngine *engine)
403
387
if (session->transaction.xid_state.xid.is_null())
404
388
session->transaction.xid_state.xid.set(session->getQueryId());
390
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
406
392
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
393
if (! session->getResourceContext(monitored, 0)->isStarted())
408
394
registerResourceForStatement(session, monitored, engine);
440
426
registerResourceForStatement(session, monitored, engine, resource_manager);
443
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
451
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
458
if (session->getXaId() == 0)
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
463
return session->getXaId();
501
464
if (resource_contexts.empty() == false)
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
466
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
505
468
rollbackTransaction(session, normal_transaction);
710
671
if (is_real_trans &&
711
672
session->transaction.all.hasModifiedNonTransData() &&
712
session->getKilled() != Session::KILL_CONNECTION)
673
session->killed != Session::KILL_CONNECTION)
714
675
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
676
ER_WARNING_NOT_COMPLETE_ROLLBACK,
733
694
int TransactionServices::autocommitOrRollback(Session *session, int error)
735
/* One GPB Statement message per SQL statement */
736
message::Statement *statement= session->getStatementMessage();
737
if ((statement != NULL) && (! error))
738
finalizeStatementMessage(*statement, session);
740
696
if (session->transaction.stmt.getResourceContexts().empty() == false)
742
TransactionContext *trans = &session->transaction.stmt;
743
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
it != resource_contexts.end();
748
ResourceContext *resource_context= *it;
750
resource_context->getTransactionalStorageEngine()->endStatement(session);
755
700
if (commitTransaction(session, false))
1019
964
trx->set_server_id(in_session->getServerId());
1021
966
if (should_inc_trx_id)
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1028
trx->set_transaction_id(0);
967
trx->set_transaction_id(getNextTransactionId());
1031
969
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1053
990
if (! replication_services.isActive())
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (in_session->getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
993
/* If there is an active statement message, finalize it */
1064
994
message::Statement *statement= in_session->getStatementMessage();
1066
996
if (statement != NULL)
1068
998
finalizeStatementMessage(*statement, in_session);
1001
return 0; /* No data modification occurred inside the transaction */
1071
1003
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, in_session);
1085
1005
finalizeTransactionMessage(*transaction, in_session);
1087
1007
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1098
1018
statement.set_type(in_type);
1099
1019
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1101
if (in_session->variables.replicate_query)
1102
statement.set_sql(in_session->getQueryString()->c_str());
1020
/** @TODO Set sql string optionally */
1105
1023
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1161
1079
cleanupTransactionMessage(transaction, in_session);
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1167
if (! replication_services.isActive())
1170
message::Statement *current_statement= in_session->getStatementMessage();
1172
/* If we never added a Statement message, nothing to undo. */
1173
if (current_statement == NULL)
1177
* If the Statement has been segmented, then we've already pushed a portion
1178
* of this Statement's row changes through the replication stream and we
1179
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
* delete the current Statement message.
1182
bool is_segmented= false;
1184
switch (current_statement->type())
1186
case message::Statement::INSERT:
1187
if (current_statement->insert_data().segment_id() > 1)
1191
case message::Statement::UPDATE:
1192
if (current_statement->update_data().segment_id() > 1)
1196
case message::Statement::DELETE:
1197
if (current_statement->delete_data().segment_id() > 1)
1206
* Remove the Statement message we've been working with (same as
1207
* current_statement).
1209
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
statements_in_txn= transaction->mutable_statement();
1212
statements_in_txn->RemoveLast();
1213
in_session->setStatementMessage(NULL);
1216
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
* an indicator to cancel the previous Statement message which should have
1218
* had its end_segment attribute set to false.
1222
current_statement= transaction->add_statement();
1223
initStatementMessage(*current_statement,
1224
message::Statement::ROLLBACK_STATEMENT,
1226
finalizeStatementMessage(*current_statement, in_session);
1230
1082
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1231
1083
Table *in_table,
1232
1084
uint32_t *next_segment_id)
1417
string_value= current_field->val_str_internal(string_value);
1269
string_value= current_field->val_str(string_value);
1418
1270
record->add_is_null(false);
1419
1271
record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
1272
string_value->free();
1687
1539
bool is_read_set= current_field->isReadSet();
1689
1541
/* We need to mark that we will "read" this field... */
1690
in_table->setReadSet(current_field->position());
1542
in_table->setReadSet(current_field->field_index);
1692
1544
/* Read the string value of this field's contents */
1693
string_value= current_field->val_str_internal(string_value);
1545
string_value= current_field->val_str(string_value);
1696
1548
* Reset the read bit after reading field to its original state. This
1724
1576
* @todo Move this crap into a real Record API.
1726
string_value= current_field->val_str_internal(string_value,
1728
current_field->offset(const_cast<unsigned char *>(new_record)));
1578
string_value= current_field->val_str(string_value,
1580
current_field->offset(const_cast<unsigned char *>(new_record)));
1729
1581
record->add_key_value(string_value->c_ptr(), string_value->length());
1730
1582
string_value->free();
1959
1811
const unsigned char *old_ptr= current_field->ptr;
1960
1812
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
string_value= current_field->val_str_internal(string_value);
1813
string_value= current_field->val_str(string_value);
1962
1814
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1966
string_value= current_field->val_str_internal(string_value);
1818
string_value= current_field->val_str(string_value);
1968
1820
* @TODO Store optional old record value in the before data member