430
Check if we can skip the two-phase commit.
432
A helper function to evaluate if two-phase commit is mandatory.
433
As a side effect, propagates the read-only/read-write flags
434
of the statement transaction to its enclosing normal transaction.
436
@retval true we must run a two-phase commit. Returned
437
if we have at least two engines with read-write changes.
438
@retval false Don't need two-phase commit. Even if we have two
439
transactional engines, we can run two independent
440
commits if changes in one of the engines are read-only.
444
ha_check_and_coalesce_trx_read_only(Session *session,
445
TransactionContext::ResourceContexts &resource_contexts,
446
bool normal_transaction)
448
/* The number of storage engines that have actual changes. */
449
unsigned num_resources_modified_data= 0;
450
ResourceContext *resource_context;
452
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
it != resource_contexts.end();
456
resource_context= *it;
457
if (resource_context->hasModifiedData())
458
++num_resources_modified_data;
460
if (! normal_transaction)
462
ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
assert(resource_context != resource_context_normal);
465
Merge read-only/read-write information about statement
466
transaction to its enclosing normal transaction. Do this
467
only if in a real transaction -- that is, if we know
468
that resource_context_all is registered in session->transaction.all.
469
Since otherwise we only clutter the normal transaction flags.
471
if (resource_context_normal->isStarted()) /* false if autocommit. */
472
resource_context_normal->coalesceWith(resource_context);
474
else if (num_resources_modified_data > 1)
477
It is a normal transaction, so we don't need to merge read/write
478
information up, and the need for two-phase commit has been
479
already established. Break the loop prematurely.
484
return num_resources_modified_data > 1;
499
440
stored functions or triggers. So we simply do nothing now.
500
441
TODO: This should be fixed in later ( >= 5.1) releases.
502
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
504
445
int error= 0, cookie= 0;
523
464
if (resource_contexts.empty() == false)
527
466
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
529
ha_rollback_trans(session, normal_transaction);
468
rollbackTransaction(session, normal_transaction);
533
must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
535
if (! trans->no_2pc && must_2pc)
473
* If replication is on, we do a PREPARE on the resource managers, push the
474
* Transaction message across the replication stream, and then COMMIT if the
475
* replication stream returned successfully.
477
if (shouldConstructMessages())
537
479
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
it != resource_contexts.end() && ! error;
508
if (error == 0 && is_real_trans)
511
* Push the constructed Transaction messages across to
512
* replicators and appliers.
514
error= commitTransactionMessage(session);
568
ha_rollback_trans(session, normal_transaction);
518
rollbackTransaction(session, normal_transaction);
573
error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
523
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
575
525
if (is_real_trans)
576
526
start_waiting_global_read_lock(session);
583
533
This function does not care about global read lock. A caller should.
585
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
535
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
588
538
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
646
* We commit the normal transaction by finalizing the transaction message
647
* and propogating the message to all registered replicators.
649
commitTransactionMessage(session);
655
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
594
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
658
597
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
751
690
the user has used LOCK TABLES then that mechanism does not know to do the
754
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
693
int TransactionServices::autocommitOrRollback(Session *session, int error)
756
695
if (session->transaction.stmt.getResourceContexts().empty() == false)
760
if (ha_commit_trans(session, false))
699
if (commitTransaction(session, false))
765
(void) ha_rollback_trans(session, false);
704
(void) rollbackTransaction(session, false);
766
705
if (session->transaction_rollback_request)
767
(void) ha_rollback_trans(session, true);
706
(void) rollbackTransaction(session, true);
770
709
session->variables.tx_isolation= session->session_tx_isolation;
776
return the list of XID's to a client, the same way SHOW commands do.
779
I didn't find in XA specs that an RM cannot return the same XID twice,
780
so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
It can be easily fixed later, if necessary.
783
bool TransactionServices::mysql_xa_recover(Session *session)
785
List<Item> field_list;
789
field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
794
if (session->client->sendFields(&field_list))
797
pthread_mutex_lock(&LOCK_xid_cache);
798
while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
800
if (xs->xa_state==XA_PREPARED)
802
session->client->store((int64_t)xs->xid.formatID);
803
session->client->store((int64_t)xs->xid.gtrid_length);
804
session->client->store((int64_t)xs->xid.bqual_length);
805
session->client->store(xs->xid.data,
806
xs->xid.gtrid_length+xs->xid.bqual_length);
807
if (session->client->flush())
809
pthread_mutex_unlock(&LOCK_xid_cache);
815
pthread_mutex_unlock(&LOCK_xid_cache);
820
714
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
822
716
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
923
817
section "4.33.4 SQL-statements and transaction states",
924
818
NamedSavepoint is *not* transaction-initiating SQL-statement
926
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
820
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
929
823
TransactionContext *trans= &session->transaction.all;
885
bool TransactionServices::shouldConstructMessages()
887
ReplicationServices &replication_services= ReplicationServices::singleton();
888
return replication_services.isActive();
991
891
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
993
893
message::Transaction *transaction= in_session->getTransactionMessage();
1014
914
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
915
trx->set_server_id(in_session->getServerId());
1016
trx->set_transaction_id(in_session->getQueryId());
916
trx->set_transaction_id(getNextTransactionId());
1017
917
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1032
932
in_session->setTransactionMessage(NULL);
1035
void TransactionServices::commitTransactionMessage(Session *in_session)
935
int TransactionServices::commitTransactionMessage(Session *in_session)
1037
937
ReplicationServices &replication_services= ReplicationServices::singleton();
1038
938
if (! replication_services.isActive())
1041
941
/* If there is an active statement message, finalize it */
1042
942
message::Statement *statement= in_session->getStatementMessage();
1046
946
finalizeStatementMessage(*statement, in_session);
1049
return; /* No data modification occurred inside the transaction */
949
return 0; /* No data modification occurred inside the transaction */
1051
951
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1053
953
finalizeTransactionMessage(*transaction, in_session);
1055
replication_services.pushTransactionMessage(*transaction);
955
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1057
957
cleanupTransactionMessage(transaction, in_session);
959
return static_cast<int>(result);
1060
962
void TransactionServices::initStatementMessage(message::Statement &statement,
1115
1017
finalizeTransactionMessage(*transaction, in_session);
1117
replication_services.pushTransactionMessage(*transaction);
1019
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1119
1021
cleanupTransactionMessage(transaction, in_session);
1553
1455
finalizeTransactionMessage(*transaction, in_session);
1555
replication_services.pushTransactionMessage(*transaction);
1457
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1557
1459
cleanupTransactionMessage(transaction, in_session);
1583
1485
finalizeTransactionMessage(*transaction, in_session);
1585
replication_services.pushTransactionMessage(*transaction);
1487
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1587
1489
cleanupTransactionMessage(transaction, in_session);
1612
1514
finalizeTransactionMessage(*transaction, in_session);
1614
replication_services.pushTransactionMessage(*transaction);
1516
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1616
1518
cleanupTransactionMessage(transaction, in_session);
1648
1550
finalizeTransactionMessage(*transaction, in_session);
1650
replication_services.pushTransactionMessage(*transaction);
1552
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1652
1554
cleanupTransactionMessage(transaction, in_session);
1683
1585
finalizeTransactionMessage(*transaction, in_session);
1685
replication_services.pushTransactionMessage(*transaction);
1587
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1687
1589
cleanupTransactionMessage(transaction, in_session);
1703
1605
finalizeTransactionMessage(*transaction, in_session);
1705
replication_services.pushTransactionMessage(*transaction);
1607
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1707
1609
cleanupTransactionMessage(transaction, in_session);