~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-12-20 19:20:57 UTC
  • mto: This revision was merged to the branch mainline in revision 2016.
  • Revision ID: brian@tangent.org-20101220192057-1ch4b9uo008d8rje
Merge in additional fixes for sign, plus alter table, plus TIME on
processlist.

Show diffs side-by-side

added added

removed removed

Lines of Context:
690
690
     */
691
691
    if (normal_transaction)
692
692
      rollbackTransactionMessage(session);
 
693
    else
 
694
      rollbackStatementMessage(session);
693
695
 
694
696
    if (is_real_trans)
695
697
      session->transaction.xid_state.xid.null();
730
732
*/
731
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
732
734
{
 
735
  /* One GPB Statement message per SQL statement */
 
736
  message::Statement *statement= session->getStatementMessage();
 
737
  if ((statement != NULL) && (! error))
 
738
    finalizeStatementMessage(*statement, session);
733
739
 
734
740
  if (session->transaction.stmt.getResourceContexts().empty() == false)
735
741
  {
1047
1053
  if (! replication_services.isActive())
1048
1054
    return 0;
1049
1055
 
1050
 
  /* If there is an active statement message, finalize it */
 
1056
  /*
 
1057
   * If no Transaction message was ever created, then no data modification
 
1058
   * occurred inside the transaction, so nothing to do.
 
1059
   */
 
1060
  if (in_session->getTransactionMessage() == NULL)
 
1061
    return 0;
 
1062
  
 
1063
  /* If there is an active statement message, finalize it. */
1051
1064
  message::Statement *statement= in_session->getStatementMessage();
1052
1065
 
1053
1066
  if (statement != NULL)
1054
1067
  {
1055
1068
    finalizeStatementMessage(*statement, in_session);
1056
1069
  }
1057
 
  else
1058
 
    return 0; /* No data modification occurred inside the transaction */
1059
 
  
 
1070
 
1060
1071
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1061
1072
 
 
1073
  /*
 
1074
   * It is possible that we could have a Transaction without any Statements
 
1075
   * if we had created a Statement but had to roll it back due to it failing
 
1076
   * mid-execution, and no subsequent Statements were added to the Transaction
 
1077
   * message. In this case, we simply clean up the message and not push it.
 
1078
   */
 
1079
  if (transaction->statement_size() == 0)
 
1080
  {
 
1081
    cleanupTransactionMessage(transaction, in_session);
 
1082
    return 0;
 
1083
  }
 
1084
  
1062
1085
  finalizeTransactionMessage(*transaction, in_session);
1063
1086
  
1064
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1138
1161
  cleanupTransactionMessage(transaction, in_session);
1139
1162
}
1140
1163
 
 
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
 
1165
{
 
1166
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1167
  if (! replication_services.isActive())
 
1168
    return;
 
1169
 
 
1170
  message::Statement *current_statement= in_session->getStatementMessage();
 
1171
 
 
1172
  /* If we never added a Statement message, nothing to undo. */
 
1173
  if (current_statement == NULL)
 
1174
    return;
 
1175
 
 
1176
  /*
 
1177
   * If the Statement has been segmented, then we've already pushed a portion
 
1178
   * of this Statement's row changes through the replication stream and we
 
1179
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
 
1180
   * delete the current Statement message.
 
1181
   */
 
1182
  bool is_segmented= false;
 
1183
 
 
1184
  switch (current_statement->type())
 
1185
  {
 
1186
    case message::Statement::INSERT:
 
1187
      if (current_statement->insert_data().segment_id() > 1)
 
1188
        is_segmented= true;
 
1189
      break;
 
1190
 
 
1191
    case message::Statement::UPDATE:
 
1192
      if (current_statement->update_data().segment_id() > 1)
 
1193
        is_segmented= true;
 
1194
      break;
 
1195
 
 
1196
    case message::Statement::DELETE:
 
1197
      if (current_statement->delete_data().segment_id() > 1)
 
1198
        is_segmented= true;
 
1199
      break;
 
1200
 
 
1201
    default:
 
1202
      break;
 
1203
  }
 
1204
 
 
1205
  /*
 
1206
   * Remove the Statement message we've been working with (same as
 
1207
   * current_statement).
 
1208
   */
 
1209
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1210
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
 
1211
  statements_in_txn= transaction->mutable_statement();
 
1212
  statements_in_txn->RemoveLast();
 
1213
  in_session->setStatementMessage(NULL);
 
1214
  
 
1215
  /*
 
1216
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
 
1217
   * an indicator to cancel the previous Statement message which should have
 
1218
   * had its end_segment attribute set to false.
 
1219
   */
 
1220
  if (is_segmented)
 
1221
  {
 
1222
    current_statement= transaction->add_statement();
 
1223
    initStatementMessage(*current_statement,
 
1224
                         message::Statement::ROLLBACK_STATEMENT,
 
1225
                         in_session);
 
1226
    finalizeStatementMessage(*current_statement, in_session);
 
1227
  }
 
1228
}
 
1229
  
1141
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1142
1231
                                                            Table *in_table,
1143
1232
                                                            uint32_t *next_segment_id)
1213
1302
      }
1214
1303
      else
1215
1304
      {
1216
 
        /* append this INSERT query string */
1217
 
        if (in_session->variables.replicate_query)
1218
 
        {
1219
 
          string s(statement->sql());
1220
 
          if (not s.empty())
1221
 
          {
1222
 
            s.append(" ; ");
1223
 
            s.append(in_session->getQueryString()->c_str());
1224
 
            statement->set_sql(s);
1225
 
          }
1226
 
          else
1227
 
            statement->set_sql(in_session->getQueryString()->c_str());
1228
 
        }
1229
 
 
1230
1305
        /* carry forward the existing segment id */
1231
1306
        const message::InsertData &current_data= statement->insert_data();
1232
1307
        *next_segment_id= current_data.segment_id();
1339
1414
    } 
1340
1415
    else 
1341
1416
    {
1342
 
      string_value= current_field->val_str(string_value);
 
1417
      string_value= current_field->val_str_internal(string_value);
1343
1418
      record->add_is_null(false);
1344
1419
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1345
1420
      string_value->free();
1414
1489
    {
1415
1490
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1416
1491
      {
1417
 
        /* append this UPDATE query string */
1418
 
        if (in_session->variables.replicate_query)
1419
 
        {
1420
 
          string s(statement->sql());
1421
 
          if (not s.empty())
1422
 
          {
1423
 
            s.append(" ; ");
1424
 
            s.append(in_session->getQueryString()->c_str());
1425
 
            statement->set_sql(s);
1426
 
          }
1427
 
          else
1428
 
            statement->set_sql(in_session->getQueryString()->c_str());
1429
 
        }
1430
 
 
1431
1492
        /* carry forward the existing segment id */
1432
1493
        const message::UpdateData &current_data= statement->update_data();
1433
1494
        *next_segment_id= current_data.segment_id();
1626
1687
      bool is_read_set= current_field->isReadSet();
1627
1688
 
1628
1689
      /* We need to mark that we will "read" this field... */
1629
 
      in_table->setReadSet(current_field->field_index);
 
1690
      in_table->setReadSet(current_field->position());
1630
1691
 
1631
1692
      /* Read the string value of this field's contents */
1632
 
      string_value= current_field->val_str(string_value);
 
1693
      string_value= current_field->val_str_internal(string_value);
1633
1694
 
1634
1695
      /* 
1635
1696
       * Reset the read bit after reading field to its original state.  This 
1662
1723
       * 
1663
1724
       * @todo Move this crap into a real Record API.
1664
1725
       */
1665
 
      string_value= current_field->val_str(string_value,
1666
 
                                           old_record + 
1667
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1726
      string_value= current_field->val_str_internal(string_value,
 
1727
                                                    old_record + 
 
1728
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
1668
1729
      record->add_key_value(string_value->c_ptr(), string_value->length());
1669
1730
      string_value->free();
1670
1731
    }
1787
1848
      }
1788
1849
      else
1789
1850
      {
1790
 
        /* append this DELETE query string */
1791
 
        if (in_session->variables.replicate_query)
1792
 
        {
1793
 
          string s(statement->sql());
1794
 
          if (not s.empty())
1795
 
          {
1796
 
            s.append(" ; ");
1797
 
            s.append(in_session->getQueryString()->c_str());
1798
 
            statement->set_sql(s);
1799
 
          }
1800
 
          else
1801
 
            statement->set_sql(in_session->getQueryString()->c_str());
1802
 
        }
1803
 
 
1804
1851
        /* carry forward the existing segment id */
1805
1852
        const message::DeleteData &current_data= statement->delete_data();
1806
1853
        *next_segment_id= current_data.segment_id();
1911
1958
         */
1912
1959
        const unsigned char *old_ptr= current_field->ptr;
1913
1960
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1914
 
        string_value= current_field->val_str(string_value);
 
1961
        string_value= current_field->val_str_internal(string_value);
1915
1962
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1916
1963
      }
1917
1964
      else
1918
1965
      {
1919
 
        string_value= current_field->val_str(string_value);
 
1966
        string_value= current_field->val_str_internal(string_value);
1920
1967
        /**
1921
1968
         * @TODO Store optional old record value in the before data member
1922
1969
         */
1927
1974
  }
1928
1975
}
1929
1976
 
1930
 
 
1931
 
/**
1932
 
 * Template for removing Statement records of different types.
1933
 
 *
1934
 
 * The code for removing records from different Statement message types
1935
 
 * is identical except for the class types that are embedded within the
1936
 
 * Statement.
1937
 
 *
1938
 
 * There are 3 scenarios we need to look for:
1939
 
 *   - We've been asked to remove more records than exist in the Statement
1940
 
 *   - We've been asked to remove less records than exist in the Statement
1941
 
 *   - We've been asked to remove ALL records that exist in the Statement
1942
 
 *
1943
 
 * If we are removing ALL records, then effectively we would be left with
1944
 
 * an empty Statement message, so we should just remove it and clean up
1945
 
 * message pointers in the Session object.
1946
 
 */
1947
 
template <class DataType, class RecordType>
1948
 
static bool removeStatementRecordsWithType(Session *session,
1949
 
                                           DataType *data,
1950
 
                                           uint32_t count)
1951
 
{
1952
 
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1953
 
 
1954
 
  /* If there aren't enough records to remove 'count' of them, error. */
1955
 
  if (num_avail_recs < count)
1956
 
    return false;
1957
 
 
1958
 
  /*
1959
 
   * If we are removing all of the data records, we'll just remove this
1960
 
   * entire Statement message.
1961
 
   */
1962
 
  if (num_avail_recs == count)
1963
 
  {
1964
 
    message::Transaction *transaction= session->getTransactionMessage();
1965
 
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1966
 
    statements->RemoveLast();
1967
 
 
1968
 
    /*
1969
 
     * Now need to set the Session Statement pointer to either the previous
1970
 
     * Statement, or NULL if there isn't one.
1971
 
     */
1972
 
    if (statements->size() == 0)
1973
 
    {
1974
 
      session->setStatementMessage(NULL);
1975
 
    }
1976
 
    else
1977
 
    {
1978
 
      /*
1979
 
       * There isn't a great way to get a pointer to the previous Statement
1980
 
       * message using the RepeatedPtrField object, so we'll just get to it
1981
 
       * using the Transaction message.
1982
 
       */
1983
 
      int last_stmt_idx= transaction->statement_size() - 1;
1984
 
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1985
 
    }
1986
 
  }
1987
 
  /* We only need to remove 'count' records */
1988
 
  else if (num_avail_recs > count)
1989
 
  {
1990
 
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1991
 
    while (count--)
1992
 
      records->RemoveLast();
1993
 
  }
1994
 
 
1995
 
  return true;
1996
 
}
1997
 
 
1998
 
 
1999
 
bool TransactionServices::removeStatementRecords(Session *session,
2000
 
                                                 uint32_t count)
2001
 
{
2002
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2003
 
  if (! replication_services.isActive())
2004
 
    return false;
2005
 
 
2006
 
  /* Get the most current Statement */
2007
 
  message::Statement *statement= session->getStatementMessage();
2008
 
 
2009
 
  /* Make sure we have work to do */
2010
 
  if (statement == NULL)
2011
 
    return false;
2012
 
 
2013
 
  bool retval= false;
2014
 
 
2015
 
  switch (statement->type())
2016
 
  {
2017
 
    case message::Statement::INSERT:
2018
 
    {
2019
 
      message::InsertData *data= statement->mutable_insert_data();
2020
 
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
2021
 
      break;
2022
 
    }
2023
 
 
2024
 
    case message::Statement::UPDATE:
2025
 
    {
2026
 
      message::UpdateData *data= statement->mutable_update_data();
2027
 
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
2028
 
      break;
2029
 
    }
2030
 
 
2031
 
    case message::Statement::DELETE:  /* not sure if this one is possible... */
2032
 
    {
2033
 
      message::DeleteData *data= statement->mutable_delete_data();
2034
 
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2035
 
      break;
2036
 
    }
2037
 
 
2038
 
    default:
2039
 
      retval= false;
2040
 
      break;
2041
 
  }
2042
 
 
2043
 
  return retval;
2044
 
}
2045
 
 
2046
 
 
2047
1977
void TransactionServices::createTable(Session *in_session,
2048
1978
                                      const message::Table &table)
2049
1979
{