~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2010-10-19 21:58:26 UTC
  • mfrom: (1861.3.7 ld-version-script)
  • mto: This revision was merged to the branch mainline in revision 1863.
  • Revision ID: mordred@inaugust.com-20101019215826-ofh15co6vp47vndb
Merge Monty - Fixed the valgrind errors, made it so that haildb works in trunk with no extra magic.

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
71
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/plugin/xa_storage_engine.h"
73
72
#include "drizzled/internal/my_sys.h"
74
73
 
 
74
using namespace std;
 
75
 
75
76
#include <vector>
76
77
#include <algorithm>
77
78
#include <functional>
78
 
#include <google/protobuf/repeated_field.h>
79
 
 
80
 
using namespace std;
81
 
using namespace google;
82
79
 
83
80
namespace drizzled
84
81
{
300
297
 * transaction after all DDLs, just like the statement transaction
301
298
 * is always committed at the end of all statements.
302
299
 */
303
 
TransactionServices::TransactionServices()
304
 
{
305
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
 
  if (engine)
307
 
  {
308
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
 
  }
310
 
  else 
311
 
  {
312
 
    xa_storage_engine= NULL;
313
 
  }
314
 
}
315
 
 
316
300
void TransactionServices::registerResourceForStatement(Session *session,
317
301
                                                       plugin::MonitoredInTransaction *monitored,
318
302
                                                       plugin::TransactionalStorageEngine *engine)
403
387
  if (session->transaction.xid_state.xid.is_null())
404
388
    session->transaction.xid_state.xid.set(session->getQueryId());
405
389
 
 
390
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
 
391
 
406
392
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
393
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
394
    registerResourceForStatement(session, monitored, engine);
440
426
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
427
}
442
428
 
443
 
void TransactionServices::allocateNewTransactionId()
444
 
{
445
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
 
  if (! replication_services.isActive())
447
 
  {
448
 
    return;
449
 
  }
450
 
 
451
 
  Session *my_session= current_session;
452
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
 
  my_session->setXaId(xa_id);
454
 
}
455
 
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
 
{
458
 
  if (session->getXaId() == 0)
459
 
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
 
  }
462
 
 
463
 
  return session->getXaId();
464
 
}
465
 
 
466
429
/**
467
430
  @retval
468
431
    0   ok
500
463
 
501
464
  if (resource_contexts.empty() == false)
502
465
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
 
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
504
467
    {
505
468
      rollbackTransaction(session, normal_transaction);
506
469
      return 1;
560
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
524
end:
562
525
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
 
526
      start_waiting_global_read_lock(session);
564
527
  }
565
528
  return error;
566
529
}
690
653
     */
691
654
    if (normal_transaction)
692
655
      rollbackTransactionMessage(session);
693
 
    else
694
 
      rollbackStatementMessage(session);
695
656
 
696
657
    if (is_real_trans)
697
658
      session->transaction.xid_state.xid.null();
709
670
   */
710
671
  if (is_real_trans &&
711
672
      session->transaction.all.hasModifiedNonTransData() &&
712
 
      session->getKilled() != Session::KILL_CONNECTION)
 
673
      session->killed != Session::KILL_CONNECTION)
713
674
  {
714
675
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
676
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
732
693
*/
733
694
int TransactionServices::autocommitOrRollback(Session *session, int error)
734
695
{
735
 
  /* One GPB Statement message per SQL statement */
736
 
  message::Statement *statement= session->getStatementMessage();
737
 
  if ((statement != NULL) && (! error))
738
 
    finalizeStatementMessage(*statement, session);
739
 
 
740
696
  if (session->transaction.stmt.getResourceContexts().empty() == false)
741
697
  {
742
 
    TransactionContext *trans = &session->transaction.stmt;
743
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
 
         it != resource_contexts.end();
746
 
         ++it)
747
 
    {
748
 
      ResourceContext *resource_context= *it;
749
 
 
750
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
751
 
    }
752
 
 
753
698
    if (! error)
754
699
    {
755
700
      if (commitTransaction(session, false))
1019
964
  trx->set_server_id(in_session->getServerId());
1020
965
 
1021
966
  if (should_inc_trx_id)
1022
 
  {
1023
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
 
    in_session->setXaId(0);
1025
 
  }  
1026
 
  else
1027
 
  { 
1028
 
    trx->set_transaction_id(0);
1029
 
  }
 
967
    trx->set_transaction_id(getNextTransactionId());
1030
968
 
1031
969
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1032
970
}
1044
982
  delete in_transaction;
1045
983
  in_session->setStatementMessage(NULL);
1046
984
  in_session->setTransactionMessage(NULL);
1047
 
  in_session->setXaId(0);
1048
985
}
1049
986
 
1050
987
int TransactionServices::commitTransactionMessage(Session *in_session)
1053
990
  if (! replication_services.isActive())
1054
991
    return 0;
1055
992
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (in_session->getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
 
993
  /* If there is an active statement message, finalize it */
1064
994
  message::Statement *statement= in_session->getStatementMessage();
1065
995
 
1066
996
  if (statement != NULL)
1067
997
  {
1068
998
    finalizeStatementMessage(*statement, in_session);
1069
999
  }
1070
 
 
 
1000
  else
 
1001
    return 0; /* No data modification occurred inside the transaction */
 
1002
  
1071
1003
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1072
1004
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and do not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, in_session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
1005
  finalizeTransactionMessage(*transaction, in_session);
1086
1006
  
1087
1007
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1097
1017
{
1098
1018
  statement.set_type(in_type);
1099
1019
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1100
 
 
1101
 
  if (in_session->variables.replicate_query)
1102
 
    statement.set_sql(in_session->getQueryString()->c_str());
 
1020
  /** @TODO Set sql string optionally */
1103
1021
}
1104
1022
 
1105
1023
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1161
1079
  cleanupTransactionMessage(transaction, in_session);
1162
1080
}
1163
1081
 
1164
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
1165
 
{
1166
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1167
 
  if (! replication_services.isActive())
1168
 
    return;
1169
 
 
1170
 
  message::Statement *current_statement= in_session->getStatementMessage();
1171
 
 
1172
 
  /* If we never added a Statement message, nothing to undo. */
1173
 
  if (current_statement == NULL)
1174
 
    return;
1175
 
 
1176
 
  /*
1177
 
   * If the Statement has been segmented, then we've already pushed a portion
1178
 
   * of this Statement's row changes through the replication stream and we
1179
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
 
   * delete the current Statement message.
1181
 
   */
1182
 
  bool is_segmented= false;
1183
 
 
1184
 
  switch (current_statement->type())
1185
 
  {
1186
 
    case message::Statement::INSERT:
1187
 
      if (current_statement->insert_data().segment_id() > 1)
1188
 
        is_segmented= true;
1189
 
      break;
1190
 
 
1191
 
    case message::Statement::UPDATE:
1192
 
      if (current_statement->update_data().segment_id() > 1)
1193
 
        is_segmented= true;
1194
 
      break;
1195
 
 
1196
 
    case message::Statement::DELETE:
1197
 
      if (current_statement->delete_data().segment_id() > 1)
1198
 
        is_segmented= true;
1199
 
      break;
1200
 
 
1201
 
    default:
1202
 
      break;
1203
 
  }
1204
 
 
1205
 
  /*
1206
 
   * Remove the Statement message we've been working with (same as
1207
 
   * current_statement).
1208
 
   */
1209
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
 
  statements_in_txn= transaction->mutable_statement();
1212
 
  statements_in_txn->RemoveLast();
1213
 
  in_session->setStatementMessage(NULL);
1214
 
  
1215
 
  /*
1216
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
 
   * an indicator to cancel the previous Statement message which should have
1218
 
   * had its end_segment attribute set to false.
1219
 
   */
1220
 
  if (is_segmented)
1221
 
  {
1222
 
    current_statement= transaction->add_statement();
1223
 
    initStatementMessage(*current_statement,
1224
 
                         message::Statement::ROLLBACK_STATEMENT,
1225
 
                         in_session);
1226
 
    finalizeStatementMessage(*current_statement, in_session);
1227
 
  }
1228
 
}
1229
 
  
1230
1082
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1231
1083
                                                            Table *in_table,
1232
1084
                                                            uint32_t *next_segment_id)
1414
1266
    } 
1415
1267
    else 
1416
1268
    {
1417
 
      string_value= current_field->val_str_internal(string_value);
 
1269
      string_value= current_field->val_str(string_value);
1418
1270
      record->add_is_null(false);
1419
1271
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
1272
      string_value->free();
1687
1539
      bool is_read_set= current_field->isReadSet();
1688
1540
 
1689
1541
      /* We need to mark that we will "read" this field... */
1690
 
      in_table->setReadSet(current_field->position());
 
1542
      in_table->setReadSet(current_field->field_index);
1691
1543
 
1692
1544
      /* Read the string value of this field's contents */
1693
 
      string_value= current_field->val_str_internal(string_value);
 
1545
      string_value= current_field->val_str(string_value);
1694
1546
 
1695
1547
      /* 
1696
1548
       * Reset the read bit after reading field to its original state.  This 
1723
1575
       * 
1724
1576
       * @todo Move this crap into a real Record API.
1725
1577
       */
1726
 
      string_value= current_field->val_str_internal(string_value,
1727
 
                                                    old_record + 
1728
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
 
1578
      string_value= current_field->val_str(string_value,
 
1579
                                           old_record + 
 
1580
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1729
1581
      record->add_key_value(string_value->c_ptr(), string_value->length());
1730
1582
      string_value->free();
1731
1583
    }
1958
1810
         */
1959
1811
        const unsigned char *old_ptr= current_field->ptr;
1960
1812
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
 
        string_value= current_field->val_str_internal(string_value);
 
1813
        string_value= current_field->val_str(string_value);
1962
1814
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1963
1815
      }
1964
1816
      else
1965
1817
      {
1966
 
        string_value= current_field->val_str_internal(string_value);
 
1818
        string_value= current_field->val_str(string_value);
1967
1819
        /**
1968
1820
         * @TODO Store optional old record value in the before data member
1969
1821
         */
2138
1990
  ReplicationServices &replication_services= ReplicationServices::singleton();
2139
1991
  if (! replication_services.isActive())
2140
1992
    return;
2141
 
 
 
1993
  
2142
1994
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2143
1995
  message::Statement *statement= transaction->add_statement();
2144
1996