~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

This patch adds the following functionality:

* Removes the need to manually enable replicators in order
  for an applier to work.
* Removes the enabled/disabled setting of both transaction
  applier plugins and transaction replicator plugins
* Pairs a replicator with an applier into a "ReplicationStream"
  and removes all checks for "enabled" replicators and appliers
* Allows modules that implement a TransactionApplier (such as
  the transaction_log module) to specify which replicator to
  use via a configuration variable.  For instance, the transaction
  log module now has --transaction-log-use-replicator=[default|filtered..]
  instead of the user having to do --default-replicator-enable and such
* Adds a new data dictionary table for REPLICATION_STREAMS, which
  allows querying of activated replication-to-applier streams
  managed by drizzled::ReplicationServices

Show diffs side-by-side

added added

removed removed

Lines of Context:
427
427
}
428
428
 
429
429
/**
430
 
  Check if we can skip the two-phase commit.
431
 
 
432
 
  A helper function to evaluate if two-phase commit is mandatory.
433
 
  As a side effect, propagates the read-only/read-write flags
434
 
  of the statement transaction to its enclosing normal transaction.
435
 
 
436
 
  @retval true   we must run a two-phase commit. Returned
437
 
                 if we have at least two engines with read-write changes.
438
 
  @retval false  Don't need two-phase commit. Even if we have two
439
 
                 transactional engines, we can run two independent
440
 
                 commits if changes in one of the engines are read-only.
441
 
*/
442
 
static
443
 
bool
444
 
ha_check_and_coalesce_trx_read_only(Session *session,
445
 
                                    TransactionContext::ResourceContexts &resource_contexts,
446
 
                                    bool normal_transaction)
447
 
{
448
 
  /* The number of storage engines that have actual changes. */
449
 
  unsigned num_resources_modified_data= 0;
450
 
  ResourceContext *resource_context;
451
 
 
452
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
 
       it != resource_contexts.end();
454
 
       ++it)
455
 
  {
456
 
    resource_context= *it;
457
 
    if (resource_context->hasModifiedData())
458
 
      ++num_resources_modified_data;
459
 
 
460
 
    if (! normal_transaction)
461
 
    {
462
 
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
 
      assert(resource_context != resource_context_normal);
464
 
      /*
465
 
        Merge read-only/read-write information about statement
466
 
        transaction to its enclosing normal transaction. Do this
467
 
        only if in a real transaction -- that is, if we know
468
 
        that resource_context_all is registered in session->transaction.all.
469
 
        Since otherwise we only clutter the normal transaction flags.
470
 
      */
471
 
      if (resource_context_normal->isStarted()) /* false if autocommit. */
472
 
        resource_context_normal->coalesceWith(resource_context);
473
 
    }
474
 
    else if (num_resources_modified_data > 1)
475
 
    {
476
 
      /*
477
 
        It is a normal transaction, so we don't need to merge read/write
478
 
        information up, and the need for two-phase commit has been
479
 
        already established. Break the loop prematurely.
480
 
      */
481
 
      break;
482
 
    }
483
 
  }
484
 
  return num_resources_modified_data > 1;
485
 
}
486
 
 
487
 
 
488
 
/**
489
430
  @retval
490
431
    0   ok
491
432
  @retval
499
440
    stored functions or triggers. So we simply do nothing now.
500
441
    TODO: This should be fixed in later ( >= 5.1) releases.
501
442
*/
502
 
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
 
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
503
444
{
504
445
  int error= 0, cookie= 0;
505
446
  /*
522
463
 
523
464
  if (resource_contexts.empty() == false)
524
465
  {
525
 
    bool must_2pc;
526
 
 
527
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
528
467
    {
529
 
      ha_rollback_trans(session, normal_transaction);
 
468
      rollbackTransaction(session, normal_transaction);
530
469
      return 1;
531
470
    }
532
471
 
533
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
534
 
 
535
 
    if (! trans->no_2pc && must_2pc)
 
472
    /*
 
473
     * If replication is on, we do a PREPARE on the resource managers, push the
 
474
     * Transaction message across the replication stream, and then COMMIT if the
 
475
     * replication stream returned successfully.
 
476
     */
 
477
    if (shouldConstructMessages())
536
478
    {
537
479
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
           it != resource_contexts.end() && ! error;
563
505
          }
564
506
        }
565
507
      }
 
508
      if (error == 0 && is_real_trans)
 
509
      {
 
510
        /*
 
511
         * Push the constructed Transaction messages across to
 
512
         * replicators and appliers.
 
513
         */
 
514
        error= commitTransactionMessage(session);
 
515
      }
566
516
      if (error)
567
517
      {
568
 
        ha_rollback_trans(session, normal_transaction);
 
518
        rollbackTransaction(session, normal_transaction);
569
519
        error= 1;
570
520
        goto end;
571
521
      }
572
522
    }
573
 
    error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
 
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
574
524
end:
575
525
    if (is_real_trans)
576
526
      start_waiting_global_read_lock(session);
582
532
  @note
583
533
  This function does not care about global read lock. A caller should.
584
534
*/
585
 
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
 
535
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
586
536
{
587
537
  int error=0;
588
538
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
638
588
    }
639
589
  }
640
590
  trans->reset();
641
 
  if (error == 0)
642
 
  {
643
 
    if (is_real_trans)
644
 
    {
645
 
      /* 
646
 
       * We commit the normal transaction by finalizing the transaction message
647
 
       * and propogating the message to all registered replicators.
648
 
       */
649
 
      commitTransactionMessage(session);
650
 
    }
651
 
  }
652
591
  return error;
653
592
}
654
593
 
655
 
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
 
594
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
656
595
{
657
596
  int error= 0;
658
597
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
751
690
    the user has used LOCK TABLES then that mechanism does not know to do the
752
691
    commit.
753
692
*/
754
 
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
 
693
int TransactionServices::autocommitOrRollback(Session *session, int error)
755
694
{
756
695
  if (session->transaction.stmt.getResourceContexts().empty() == false)
757
696
  {
758
697
    if (! error)
759
698
    {
760
 
      if (ha_commit_trans(session, false))
 
699
      if (commitTransaction(session, false))
761
700
        error= 1;
762
701
    }
763
702
    else
764
703
    {
765
 
      (void) ha_rollback_trans(session, false);
 
704
      (void) rollbackTransaction(session, false);
766
705
      if (session->transaction_rollback_request)
767
 
        (void) ha_rollback_trans(session, true);
 
706
        (void) rollbackTransaction(session, true);
768
707
    }
769
708
 
770
709
    session->variables.tx_isolation= session->session_tx_isolation;
772
711
  return error;
773
712
}
774
713
 
775
 
/**
776
 
  return the list of XID's to a client, the same way SHOW commands do.
777
 
 
778
 
  @note
779
 
    I didn't find in XA specs that an RM cannot return the same XID twice,
780
 
    so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
 
    It can be easily fixed later, if necessary.
782
 
*/
783
 
bool TransactionServices::mysql_xa_recover(Session *session)
784
 
{
785
 
  List<Item> field_list;
786
 
  int i= 0;
787
 
  XID_STATE *xs;
788
 
 
789
 
  field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
 
  field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
 
  field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
 
  field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
793
 
 
794
 
  if (session->client->sendFields(&field_list))
795
 
    return 1;
796
 
 
797
 
  pthread_mutex_lock(&LOCK_xid_cache);
798
 
  while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
799
 
  {
800
 
    if (xs->xa_state==XA_PREPARED)
801
 
    {
802
 
      session->client->store((int64_t)xs->xid.formatID);
803
 
      session->client->store((int64_t)xs->xid.gtrid_length);
804
 
      session->client->store((int64_t)xs->xid.bqual_length);
805
 
      session->client->store(xs->xid.data,
806
 
                             xs->xid.gtrid_length+xs->xid.bqual_length);
807
 
      if (session->client->flush())
808
 
      {
809
 
        pthread_mutex_unlock(&LOCK_xid_cache);
810
 
        return 1;
811
 
      }
812
 
    }
813
 
  }
814
 
 
815
 
  pthread_mutex_unlock(&LOCK_xid_cache);
816
 
  session->my_eof();
817
 
  return 0;
818
 
}
819
 
 
820
714
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
821
715
{
822
716
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
827
721
  }
828
722
};
829
723
 
830
 
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
 
724
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
831
725
{
832
726
  int error= 0;
833
727
  TransactionContext *trans= &session->transaction.all;
923
817
  section "4.33.4 SQL-statements and transaction states",
924
818
  NamedSavepoint is *not* transaction-initiating SQL-statement
925
819
*/
926
 
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
 
820
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
927
821
{
928
822
  int error= 0;
929
823
  TransactionContext *trans= &session->transaction.all;
961
855
  return error;
962
856
}
963
857
 
964
 
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
 
858
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
965
859
{
966
860
  int error= 0;
967
861
 
988
882
  return error;
989
883
}
990
884
 
 
885
bool TransactionServices::shouldConstructMessages()
 
886
{
 
887
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
888
  return replication_services.isActive();
 
889
}
 
890
 
991
891
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
992
892
{
993
893
  message::Transaction *transaction= in_session->getTransactionMessage();
1013
913
{
1014
914
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
915
  trx->set_server_id(in_session->getServerId());
1016
 
  trx->set_transaction_id(in_session->getQueryId());
 
916
  trx->set_transaction_id(getNextTransactionId());
1017
917
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1018
918
}
1019
919
 
1032
932
  in_session->setTransactionMessage(NULL);
1033
933
}
1034
934
 
1035
 
void TransactionServices::commitTransactionMessage(Session *in_session)
 
935
int TransactionServices::commitTransactionMessage(Session *in_session)
1036
936
{
1037
937
  ReplicationServices &replication_services= ReplicationServices::singleton();
1038
938
  if (! replication_services.isActive())
1039
 
    return;
 
939
    return 0;
1040
940
 
1041
941
  /* If there is an active statement message, finalize it */
1042
942
  message::Statement *statement= in_session->getStatementMessage();
1046
946
    finalizeStatementMessage(*statement, in_session);
1047
947
  }
1048
948
  else
1049
 
    return; /* No data modification occurred inside the transaction */
 
949
    return 0; /* No data modification occurred inside the transaction */
1050
950
  
1051
951
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1052
952
 
1053
953
  finalizeTransactionMessage(*transaction, in_session);
1054
954
  
1055
 
  replication_services.pushTransactionMessage(*transaction);
 
955
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1056
956
 
1057
957
  cleanupTransactionMessage(transaction, in_session);
 
958
 
 
959
  return static_cast<int>(result);
1058
960
}
1059
961
 
1060
962
void TransactionServices::initStatementMessage(message::Statement &statement,
1114
1016
 
1115
1017
    finalizeTransactionMessage(*transaction, in_session);
1116
1018
    
1117
 
    replication_services.pushTransactionMessage(*transaction);
 
1019
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1118
1020
  }
1119
1021
  cleanupTransactionMessage(transaction, in_session);
1120
1022
}
1552
1454
 
1553
1455
  finalizeTransactionMessage(*transaction, in_session);
1554
1456
  
1555
 
  replication_services.pushTransactionMessage(*transaction);
 
1457
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1556
1458
 
1557
1459
  cleanupTransactionMessage(transaction, in_session);
1558
1460
 
1582
1484
 
1583
1485
  finalizeTransactionMessage(*transaction, in_session);
1584
1486
  
1585
 
  replication_services.pushTransactionMessage(*transaction);
 
1487
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1586
1488
 
1587
1489
  cleanupTransactionMessage(transaction, in_session);
1588
1490
 
1611
1513
 
1612
1514
  finalizeTransactionMessage(*transaction, in_session);
1613
1515
  
1614
 
  replication_services.pushTransactionMessage(*transaction);
 
1516
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1615
1517
 
1616
1518
  cleanupTransactionMessage(transaction, in_session);
1617
1519
}
1647
1549
 
1648
1550
  finalizeTransactionMessage(*transaction, in_session);
1649
1551
  
1650
 
  replication_services.pushTransactionMessage(*transaction);
 
1552
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1651
1553
 
1652
1554
  cleanupTransactionMessage(transaction, in_session);
1653
1555
}
1682
1584
 
1683
1585
  finalizeTransactionMessage(*transaction, in_session);
1684
1586
  
1685
 
  replication_services.pushTransactionMessage(*transaction);
 
1587
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1686
1588
 
1687
1589
  cleanupTransactionMessage(transaction, in_session);
1688
1590
}
1702
1604
 
1703
1605
  finalizeTransactionMessage(*transaction, in_session);
1704
1606
  
1705
 
  replication_services.pushTransactionMessage(*transaction);
 
1607
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1706
1608
 
1707
1609
  cleanupTransactionMessage(transaction, in_session);
1708
1610
}