47
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/epoch.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
51
#include <drizzled/current_session.h>
52
#include <drizzled/error.h>
53
#include <drizzled/gettext.h>
54
#include <drizzled/probes.h>
55
#include <drizzled/sql_parse.h>
56
#include <drizzled/session.h>
57
#include <drizzled/session/times.h>
58
#include <drizzled/sql_base.h>
59
#include <drizzled/replication_services.h>
60
#include <drizzled/transaction_services.h>
61
#include <drizzled/transaction_context.h>
62
#include <drizzled/message/transaction.pb.h>
63
#include <drizzled/message/statement_transform.h>
64
#include <drizzled/resource_context.h>
65
#include <drizzled/lock.h>
66
#include <drizzled/item/int.h>
67
#include <drizzled/item/empty_string.h>
68
#include <drizzled/field/epoch.h>
69
#include <drizzled/plugin/client.h>
70
#include <drizzled/plugin/monitored_in_transaction.h>
71
#include <drizzled/plugin/transactional_storage_engine.h>
72
#include <drizzled/plugin/xa_resource_manager.h>
73
#include <drizzled/plugin/xa_storage_engine.h>
74
#include <drizzled/internal/my_sys.h>
75
#include <drizzled/statistics_variables.h>
76
#include <drizzled/system_variables.h>
77
#include <drizzled/session/transactions.h>
76
80
#include <algorithm>
300
303
* transaction after all DDLs, just like the statement transaction
301
304
* is always committed at the end of all statements.
303
TransactionServices::TransactionServices()
307
static plugin::XaStorageEngine& xa_storage_engine()
305
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
308
xa_storage_engine= (plugin::XaStorageEngine*)engine;
312
xa_storage_engine= NULL;
309
static plugin::XaStorageEngine& engine= static_cast<plugin::XaStorageEngine&>(*plugin::StorageEngine::findByName("InnoDB"));
316
void TransactionServices::registerResourceForStatement(Session::reference session,
313
void TransactionServices::registerResourceForStatement(Session& session,
317
314
plugin::MonitoredInTransaction *monitored,
318
315
plugin::TransactionalStorageEngine *engine)
329
326
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session.transaction.stmt;
333
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
329
TransactionContext& trans= session.transaction.stmt;
330
ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
335
if (resource_context->isStarted())
332
if (resource_context.isStarted())
336
333
return; /* already registered, return */
338
335
assert(monitored->participatesInSqlTransaction());
339
336
assert(not monitored->participatesInXaTransaction());
341
resource_context->setMonitored(monitored);
342
resource_context->setTransactionalStorageEngine(engine);
343
trans->registerResource(resource_context);
345
trans->no_2pc|= true;
338
resource_context.setMonitored(monitored);
339
resource_context.setTransactionalStorageEngine(engine);
340
trans.registerResource(&resource_context);
348
void TransactionServices::registerResourceForStatement(Session::reference session,
344
void TransactionServices::registerResourceForStatement(Session& session,
349
345
plugin::MonitoredInTransaction *monitored,
350
346
plugin::TransactionalStorageEngine *engine,
351
347
plugin::XaResourceManager *resource_manager)
362
358
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session.transaction.stmt;
366
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
361
TransactionContext& trans= session.transaction.stmt;
362
ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
368
if (resource_context->isStarted())
364
if (resource_context.isStarted())
369
365
return; /* already registered, return */
371
367
assert(monitored->participatesInXaTransaction());
372
368
assert(monitored->participatesInSqlTransaction());
374
resource_context->setMonitored(monitored);
375
resource_context->setTransactionalStorageEngine(engine);
376
resource_context->setXaResourceManager(resource_manager);
377
trans->registerResource(resource_context);
379
trans->no_2pc|= false;
370
resource_context.setMonitored(monitored);
371
resource_context.setTransactionalStorageEngine(engine);
372
resource_context.setXaResourceManager(resource_manager);
373
trans.registerResource(&resource_context);
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
376
void TransactionServices::registerResourceForTransaction(Session& session,
383
377
plugin::MonitoredInTransaction *monitored,
384
378
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session.transaction.all;
387
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
380
TransactionContext& trans= session.transaction.all;
381
ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
389
if (resource_context->isStarted())
383
if (resource_context.isStarted())
390
384
return; /* already registered, return */
392
386
session.server_status|= SERVER_STATUS_IN_TRANS;
394
trans->registerResource(resource_context);
388
trans.registerResource(&resource_context);
396
390
assert(monitored->participatesInSqlTransaction());
397
391
assert(not monitored->participatesInXaTransaction());
399
resource_context->setMonitored(monitored);
400
resource_context->setTransactionalStorageEngine(engine);
401
trans->no_2pc|= true;
393
resource_context.setMonitored(monitored);
394
resource_context.setTransactionalStorageEngine(engine);
403
397
if (session.transaction.xid_state.xid.is_null())
404
398
session.transaction.xid_state.xid.set(session.getQueryId());
406
400
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session.getResourceContext(monitored, 0)->isStarted())
401
if (not session.getResourceContext(*monitored, 0).isStarted())
408
402
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
405
void TransactionServices::registerResourceForTransaction(Session& session,
412
406
plugin::MonitoredInTransaction *monitored,
413
407
plugin::TransactionalStorageEngine *engine,
414
408
plugin::XaResourceManager *resource_manager)
416
410
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
411
ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
419
if (resource_context->isStarted())
413
if (resource_context.isStarted())
420
414
return; /* already registered, return */
422
416
session.server_status|= SERVER_STATUS_IN_TRANS;
424
trans->registerResource(resource_context);
418
trans->registerResource(&resource_context);
426
420
assert(monitored->participatesInSqlTransaction());
428
resource_context->setMonitored(monitored);
429
resource_context->setXaResourceManager(resource_manager);
430
resource_context->setTransactionalStorageEngine(engine);
431
trans->no_2pc|= true;
422
resource_context.setMonitored(monitored);
423
resource_context.setXaResourceManager(resource_manager);
424
resource_context.setTransactionalStorageEngine(engine);
433
427
if (session.transaction.xid_state.xid.is_null())
434
428
session.transaction.xid_state.xid.set(session.getQueryId());
436
430
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
432
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session.getResourceContext(monitored, 0)->isStarted())
433
if (! session.getResourceContext(*monitored, 0).isStarted())
440
434
registerResourceForStatement(session, monitored, engine, resource_manager);
443
437
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
439
if (! ReplicationServices::isActive())
451
444
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
445
uint64_t xa_id= xa_storage_engine().getNewTransactionId(my_session);
453
446
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
449
uint64_t TransactionServices::getCurrentTransactionId(Session& session)
458
451
if (session.getXaId() == 0)
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
453
session.setXaId(xa_storage_engine().getNewTransactionId(&session));
463
456
return session.getXaId();
466
int TransactionServices::commitTransaction(Session::reference session,
459
int TransactionServices::commitTransaction(Session& session,
467
460
bool normal_transaction)
469
462
int error= 0, cookie= 0;
575
566
if (resource_contexts.empty() == false)
577
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
578
it != resource_contexts.end();
568
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
582
ResourceContext *resource_context= *it;
584
570
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
586
572
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
574
if (int err= resource_context->getXaResourceManager()->xaCommit(&session, all))
590
576
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
637
623
We must not rollback the normal transaction if a statement
638
624
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
626
assert(session.transaction.stmt.getResourceContexts().empty() || trans == &session.transaction.stmt);
643
628
if (resource_contexts.empty() == false)
645
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
646
it != resource_contexts.end();
630
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
650
ResourceContext *resource_context= *it;
652
632
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
654
634
if (resource->participatesInXaTransaction())
656
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
636
if (int err= resource_context->getXaResourceManager()->xaRollback(&session, all))
658
638
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
781
757
rolling back to savepoint in all storage engines that were part of the
782
758
transaction when the savepoint was set
784
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
785
it != sv_resource_contexts.end();
760
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, sv_resource_contexts)
789
ResourceContext *resource_context= *it;
791
762
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
793
764
if (resource->participatesInSqlTransaction())
795
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
766
if (int err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv))
797
768
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
838
809
* savepoint's resource contexts.
841
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
842
it != set_difference_contexts.end();
812
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, set_difference_contexts)
845
ResourceContext *resource_context= *it;
848
814
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
850
816
if (resource->participatesInSqlTransaction())
852
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
818
if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, true))
854
820
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
908
874
if (resource_contexts.empty() == false)
910
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
911
it != resource_contexts.end();
876
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
914
ResourceContext *resource_context= *it;
917
878
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
919
880
if (resource->participatesInSqlTransaction())
921
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
882
if (int err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv))
923
884
my_error(ER_GET_ERRNO, MYF(0), err);
953
int TransactionServices::releaseSavepoint(Session::reference session,
914
int TransactionServices::releaseSavepoint(Session& session,
954
915
NamedSavepoint &sv)
958
919
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
960
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
961
it != resource_contexts.end();
921
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
965
ResourceContext *resource_context= *it;
967
923
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
969
925
if (resource->participatesInSqlTransaction())
971
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
927
if (int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv))
973
929
my_error(ER_GET_ERRNO, MYF(0), err);
1094
1046
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
1047
message::Statement::Type type,
1096
Session::const_reference session)
1048
const Session& session)
1098
1050
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1051
statement.set_start_timestamp(session.times.getCurrentTimestamp());
1101
1053
if (session.variables.replicate_query)
1102
1054
statement.set_sql(session.getQueryString()->c_str());
1105
1057
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1060
statement.set_end_timestamp(session.times.getCurrentTimestamp());
1109
1061
session.setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1064
void TransactionServices::rollbackTransactionMessage(Session& session)
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
if (! replication_services.isActive())
1066
if (! ReplicationServices::isActive())
1118
1069
message::Transaction *transaction= getActiveTransactionMessage(session);
1921
1864
finalizeTransactionMessage(*transaction, session);
1923
(void) replication_services.pushTransactionMessage(session, *transaction);
1866
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
1925
1868
cleanupTransactionMessage(transaction, session);
1929
void TransactionServices::createSchema(Session::reference session,
1872
void TransactionServices::createSchema(Session& session,
1930
1873
const message::Schema &schema)
1932
ReplicationServices &replication_services= ReplicationServices::singleton();
1933
if (! replication_services.isActive())
1875
if (! ReplicationServices::isActive())
1878
if (not message::is_replicated(schema))
1936
1881
message::Transaction *transaction= getActiveTransactionMessage(session);
1937
1882
message::Statement *statement= transaction->add_statement();
1951
1896
finalizeTransactionMessage(*transaction, session);
1953
(void) replication_services.pushTransactionMessage(session, *transaction);
1898
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
1955
1900
cleanupTransactionMessage(transaction, session);
1959
void TransactionServices::dropSchema(Session::reference session,
1960
identifier::Schema::const_reference identifier)
1904
void TransactionServices::dropSchema(Session& session,
1905
const identifier::Schema& identifier,
1906
message::schema::const_reference schema)
1962
ReplicationServices &replication_services= ReplicationServices::singleton();
1963
if (! replication_services.isActive())
1908
if (not ReplicationServices::isActive())
1911
if (not message::is_replicated(schema))
1966
1914
message::Transaction *transaction= getActiveTransactionMessage(session);
1967
1915
message::Statement *statement= transaction->add_statement();
1981
1929
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1931
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
1985
1933
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1936
void TransactionServices::alterSchema(Session& session,
1937
const message::Schema &old_schema,
1990
1938
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1940
if (! ReplicationServices::isActive())
1943
if (not message::is_replicated(old_schema))
1996
1946
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
1947
message::Statement *statement= transaction->add_statement();
2007
1957
message::Schema *before= alter_schema_statement->mutable_before();
2008
1958
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
1960
*before= old_schema;
2011
1961
*after= new_schema;
2013
1963
finalizeStatementMessage(*statement, session);
2015
1965
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
1967
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2019
1969
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
1972
void TransactionServices::dropTable(Session& session,
1973
const identifier::Table& identifier,
1974
message::table::const_reference table,
2024
1975
bool if_exists)
2026
ReplicationServices &replication_services= ReplicationServices::singleton();
2027
if (! replication_services.isActive())
1977
if (! ReplicationServices::isActive())
1980
if (not message::is_replicated(table))
2030
1983
message::Transaction *transaction= getActiveTransactionMessage(session);
2031
1984
message::Statement *statement= transaction->add_statement();
2043
1996
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2045
table_metadata->set_schema_name(table.getSchemaName());
2046
table_metadata->set_table_name(table.getTableName());
1998
table_metadata->set_schema_name(identifier.getSchemaName());
1999
table_metadata->set_table_name(identifier.getTableName());
2048
2001
finalizeStatementMessage(*statement, session);
2050
2003
finalizeTransactionMessage(*transaction, session);
2052
(void) replication_services.pushTransactionMessage(session, *transaction);
2005
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2054
2007
cleanupTransactionMessage(transaction, session);
2057
void TransactionServices::truncateTable(Session::reference session,
2010
void TransactionServices::truncateTable(Session& session, Table &table)
2060
ReplicationServices &replication_services= ReplicationServices::singleton();
2061
if (! replication_services.isActive())
2012
if (! ReplicationServices::isActive())
2015
if (not table.getShare()->is_replicated())
2064
2018
message::Transaction *transaction= getActiveTransactionMessage(session);
2065
2019
message::Statement *statement= transaction->add_statement();
2073
2027
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2074
2028
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2077
(void) table.getShare()->getSchemaName(schema_name);
2079
(void) table.getShare()->getTableName(table_name);
2081
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2030
table_metadata->set_schema_name(table.getShare()->getSchemaName());
2031
table_metadata->set_table_name(table.getShare()->getTableName());
2084
2033
finalizeStatementMessage(*statement, session);
2086
2035
finalizeTransactionMessage(*transaction, session);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2037
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2090
2039
cleanupTransactionMessage(transaction, session);
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2042
void TransactionServices::rawStatement(Session& session,
2043
const string &query,
2044
const string &schema)
2096
ReplicationServices &replication_services= ReplicationServices::singleton();
2097
if (! replication_services.isActive())
2046
if (! ReplicationServices::isActive())
2100
2049
message::Transaction *transaction= getActiveTransactionMessage(session);
2103
2052
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2104
2053
statement->set_sql(query);
2054
if (not schema.empty())
2055
statement->set_raw_sql_schema(schema);
2105
2056
finalizeStatementMessage(*statement, session);
2107
2058
finalizeTransactionMessage(*transaction, session);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2060
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2111
2062
cleanupTransactionMessage(transaction, session);
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2065
int TransactionServices::sendEvent(Session& session, const message::Event &event)
2117
ReplicationServices &replication_services= ReplicationServices::singleton();
2118
if (! replication_services.isActive())
2067
if (not ReplicationServices::isActive())
2121
message::Transaction *transaction= new (nothrow) message::Transaction();
2069
message::Transaction transaction;
2123
2071
// set server id, start timestamp
2124
initTransactionMessage(*transaction, session, true);
2072
initTransactionMessage(transaction, session, true);
2126
2074
// set end timestamp
2127
finalizeTransactionMessage(*transaction, session);
2129
message::Event *trx_event= transaction->mutable_event();
2075
finalizeTransactionMessage(transaction, session);
2077
message::Event *trx_event= transaction.mutable_event();
2131
2078
trx_event->CopyFrom(event);
2133
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2137
return static_cast<int>(result);
2079
plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, transaction);
2140
bool TransactionServices::sendStartupEvent(Session::reference session)
2083
bool TransactionServices::sendStartupEvent(Session& session)
2142
2085
message::Event event;
2143
2086
event.set_type(message::Event::STARTUP);
2144
if (sendEvent(session, event) != 0)
2087
return not sendEvent(session, event);
2149
bool TransactionServices::sendShutdownEvent(Session::reference session)
2090
bool TransactionServices::sendShutdownEvent(Session& session)
2151
2092
message::Event event;
2152
2093
event.set_type(message::Event::SHUTDOWN);
2153
if (sendEvent(session, event) != 0)
2094
return not sendEvent(session, event);
2158
2097
} /* namespace drizzled */