~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

Here, we do two main things:

* We alter the code path inside the transaction coordinator
  to push transaction messages to replicators in between the
  prepare and commit stages of a transaction
* We remove the prepare_commit_mutex in the InnoDB
  storage engine

Show diffs side-by-side

added added

removed removed

Lines of Context:
427
427
}
428
428
 
429
429
/**
430
 
  Check if we can skip the two-phase commit.
431
 
 
432
 
  A helper function to evaluate if two-phase commit is mandatory.
433
 
  As a side effect, propagates the read-only/read-write flags
434
 
  of the statement transaction to its enclosing normal transaction.
435
 
 
436
 
  @retval true   we must run a two-phase commit. Returned
437
 
                 if we have at least two engines with read-write changes.
438
 
  @retval false  Don't need two-phase commit. Even if we have two
439
 
                 transactional engines, we can run two independent
440
 
                 commits if changes in one of the engines are read-only.
441
 
*/
442
 
static
443
 
bool
444
 
ha_check_and_coalesce_trx_read_only(Session *session,
445
 
                                    TransactionContext::ResourceContexts &resource_contexts,
446
 
                                    bool normal_transaction)
447
 
{
448
 
  /* The number of storage engines that have actual changes. */
449
 
  unsigned num_resources_modified_data= 0;
450
 
  ResourceContext *resource_context;
451
 
 
452
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
 
       it != resource_contexts.end();
454
 
       ++it)
455
 
  {
456
 
    resource_context= *it;
457
 
    if (resource_context->hasModifiedData())
458
 
      ++num_resources_modified_data;
459
 
 
460
 
    if (! normal_transaction)
461
 
    {
462
 
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
 
      assert(resource_context != resource_context_normal);
464
 
      /*
465
 
        Merge read-only/read-write information about statement
466
 
        transaction to its enclosing normal transaction. Do this
467
 
        only if in a real transaction -- that is, if we know
468
 
        that resource_context_all is registered in session->transaction.all.
469
 
        Since otherwise we only clutter the normal transaction flags.
470
 
      */
471
 
      if (resource_context_normal->isStarted()) /* false if autocommit. */
472
 
        resource_context_normal->coalesceWith(resource_context);
473
 
    }
474
 
    else if (num_resources_modified_data > 1)
475
 
    {
476
 
      /*
477
 
        It is a normal transaction, so we don't need to merge read/write
478
 
        information up, and the need for two-phase commit has been
479
 
        already established. Break the loop prematurely.
480
 
      */
481
 
      break;
482
 
    }
483
 
  }
484
 
  return num_resources_modified_data > 1;
485
 
}
486
 
 
487
 
 
488
 
/**
489
430
  @retval
490
431
    0   ok
491
432
  @retval
522
463
 
523
464
  if (resource_contexts.empty() == false)
524
465
  {
525
 
    bool must_2pc;
526
 
 
527
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
528
467
    {
529
468
      rollbackTransaction(session, normal_transaction);
530
469
      return 1;
531
470
    }
532
471
 
533
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
534
 
 
535
 
    if (! trans->no_2pc && must_2pc)
 
472
    /*
 
473
     * If replication is on, we do a PREPARE on the resource managers, push the
 
474
     * Transaction message across the replication stream, and then COMMIT if the
 
475
     * replication stream returned successfully.
 
476
     */
 
477
    if (shouldConstructMessages())
536
478
    {
537
479
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
           it != resource_contexts.end() && ! error;
563
505
          }
564
506
        }
565
507
      }
 
508
      if (error == 0 && is_real_trans)
 
509
      {
 
510
        /*
 
511
         * Push the constructed Transaction messages across to
 
512
         * replicators and appliers.
 
513
         */
 
514
        error= commitTransactionMessage(session);
 
515
      }
566
516
      if (error)
567
517
      {
568
518
        rollbackTransaction(session, normal_transaction);
638
588
    }
639
589
  }
640
590
  trans->reset();
641
 
  if (error == 0)
642
 
  {
643
 
    if (is_real_trans)
644
 
    {
645
 
      /* 
646
 
       * We commit the normal transaction by finalizing the transaction message
647
 
       * and propogating the message to all registered replicators.
648
 
       */
649
 
      commitTransactionMessage(session);
650
 
    }
651
 
  }
652
591
  return error;
653
592
}
654
593
 
943
882
  return error;
944
883
}
945
884
 
 
885
inline bool TransactionServices::shouldConstructMessages()
 
886
{
 
887
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
888
  return replication_services.isActive();
 
889
}
 
890
 
946
891
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
947
892
{
948
893
  message::Transaction *transaction= in_session->getTransactionMessage();
987
932
  in_session->setTransactionMessage(NULL);
988
933
}
989
934
 
990
 
void TransactionServices::commitTransactionMessage(Session *in_session)
 
935
int TransactionServices::commitTransactionMessage(Session *in_session)
991
936
{
992
937
  ReplicationServices &replication_services= ReplicationServices::singleton();
993
938
  if (! replication_services.isActive())
994
 
    return;
 
939
    return 0;
995
940
 
996
941
  /* If there is an active statement message, finalize it */
997
942
  message::Statement *statement= in_session->getStatementMessage();
1001
946
    finalizeStatementMessage(*statement, in_session);
1002
947
  }
1003
948
  else
1004
 
    return; /* No data modification occurred inside the transaction */
 
949
    return 0; /* No data modification occurred inside the transaction */
1005
950
  
1006
951
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1007
952
 
1008
953
  finalizeTransactionMessage(*transaction, in_session);
1009
954
  
1010
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
955
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1011
956
 
1012
957
  cleanupTransactionMessage(transaction, in_session);
 
958
 
 
959
  return static_cast<int>(result);
1013
960
}
1014
961
 
1015
962
void TransactionServices::initStatementMessage(message::Statement &statement,