430
Check if we can skip the two-phase commit.
432
A helper function to evaluate if two-phase commit is mandatory.
433
As a side effect, propagates the read-only/read-write flags
434
of the statement transaction to its enclosing normal transaction.
436
@retval true We must run a two-phase commit. Returned
437
if we have at least two engines with read-write changes.
438
@retval false Don't need two-phase commit. Even if we have two
439
transactional engines, we can run two independent
440
commits if changes in one of the engines are read-only.
444
ha_check_and_coalesce_trx_read_only(Session *session,
445
TransactionContext::ResourceContexts &resource_contexts,
446
bool normal_transaction)
448
/* The number of storage engines that have actual changes. */
449
unsigned num_resources_modified_data= 0;
450
ResourceContext *resource_context;
452
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
it != resource_contexts.end();
456
resource_context= *it;
457
if (resource_context->hasModifiedData())
458
++num_resources_modified_data;
460
if (! normal_transaction)
462
ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
assert(resource_context != resource_context_normal);
465
Merge read-only/read-write information about statement
466
transaction to its enclosing normal transaction. Do this
467
only if in a real transaction -- that is, if we know
468
that resource_context_normal is registered in session->transaction.all.
469
Otherwise we would only clutter the normal transaction flags.
471
if (resource_context_normal->isStarted()) /* false if autocommit. */
472
resource_context_normal->coalesceWith(resource_context);
474
else if (num_resources_modified_data > 1)
477
It is a normal transaction, so we don't need to merge read/write
478
information up, and the need for two-phase commit has been
479
already established. Break the loop prematurely.
484
return num_resources_modified_data > 1;
523
464
if (resource_contexts.empty() == false)
527
466
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
529
468
rollbackTransaction(session, normal_transaction);
533
must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
535
if (! trans->no_2pc && must_2pc)
473
* If replication is on, we do a PREPARE on the resource managers, push the
474
* Transaction message across the replication stream, and then COMMIT if the
475
* replication stream returned successfully.
477
if (shouldConstructMessages())
537
479
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
it != resource_contexts.end() && ! error;
508
if (error == 0 && is_real_trans)
511
* Push the constructed Transaction messages across to
512
* replicators and appliers.
514
error= commitTransactionMessage(session);
568
518
rollbackTransaction(session, normal_transaction);
885
inline bool TransactionServices::shouldConstructMessages()
887
ReplicationServices &replication_services= ReplicationServices::singleton();
888
return replication_services.isActive();
946
891
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
948
893
message::Transaction *transaction= in_session->getTransactionMessage();
987
932
in_session->setTransactionMessage(NULL);
990
void TransactionServices::commitTransactionMessage(Session *in_session)
935
int TransactionServices::commitTransactionMessage(Session *in_session)
992
937
ReplicationServices &replication_services= ReplicationServices::singleton();
993
938
if (! replication_services.isActive())
996
941
/* If there is an active statement message, finalize it */
997
942
message::Statement *statement= in_session->getStatementMessage();
1001
946
finalizeStatementMessage(*statement, in_session);
1004
return; /* No data modification occurred inside the transaction */
949
return 0; /* No data modification occurred inside the transaction */
1006
951
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1008
953
finalizeTransactionMessage(*transaction, in_session);
1010
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
955
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1012
957
cleanupTransactionMessage(transaction, in_session);
959
return static_cast<int>(result);
1015
962
void TransactionServices::initStatementMessage(message::Statement &statement,