~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-12-07 09:12:12 UTC
  • mto: This revision was merged to the branch mainline in revision 1985.
  • Revision ID: brian@tangent.org-20101207091212-1m0w20tck6z7632m
This is a fix for bug lp:686197

Show diffs side-by-side

added added

removed removed

Lines of Context:
690
690
     */
691
691
    if (normal_transaction)
692
692
      rollbackTransactionMessage(session);
693
 
    else
694
 
      rollbackStatementMessage(session);
695
693
 
696
694
    if (is_real_trans)
697
695
      session->transaction.xid_state.xid.null();
732
730
*/
733
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
734
732
{
735
 
  /* One GPB Statement message per SQL statement */
736
 
  message::Statement *statement= session->getStatementMessage();
737
 
  if ((statement != NULL) && (! error))
738
 
    finalizeStatementMessage(*statement, session);
739
733
 
740
734
  if (session->transaction.stmt.getResourceContexts().empty() == false)
741
735
  {
1053
1047
  if (! replication_services.isActive())
1054
1048
    return 0;
1055
1049
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (in_session->getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
 
1050
  /* If there is an active statement message, finalize it */
1064
1051
  message::Statement *statement= in_session->getStatementMessage();
1065
1052
 
1066
1053
  if (statement != NULL)
1067
1054
  {
1068
1055
    finalizeStatementMessage(*statement, in_session);
1069
1056
  }
1070
 
 
 
1057
  else
 
1058
    return 0; /* No data modification occurred inside the transaction */
 
1059
  
1071
1060
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1072
1061
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, in_session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
1062
  finalizeTransactionMessage(*transaction, in_session);
1086
1063
  
1087
1064
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1161
1138
  cleanupTransactionMessage(transaction, in_session);
1162
1139
}
1163
1140
 
1164
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
1165
 
{
1166
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1167
 
  if (! replication_services.isActive())
1168
 
    return;
1169
 
 
1170
 
  message::Statement *current_statement= in_session->getStatementMessage();
1171
 
 
1172
 
  /* If we never added a Statement message, nothing to undo. */
1173
 
  if (current_statement == NULL)
1174
 
    return;
1175
 
 
1176
 
  /*
1177
 
   * If the Statement has been segmented, then we've already pushed a portion
1178
 
   * of this Statement's row changes through the replication stream and we
1179
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
 
   * delete the current Statement message.
1181
 
   */
1182
 
  bool is_segmented= false;
1183
 
 
1184
 
  switch (current_statement->type())
1185
 
  {
1186
 
    case message::Statement::INSERT:
1187
 
      if (current_statement->insert_data().segment_id() > 1)
1188
 
        is_segmented= true;
1189
 
      break;
1190
 
 
1191
 
    case message::Statement::UPDATE:
1192
 
      if (current_statement->update_data().segment_id() > 1)
1193
 
        is_segmented= true;
1194
 
      break;
1195
 
 
1196
 
    case message::Statement::DELETE:
1197
 
      if (current_statement->delete_data().segment_id() > 1)
1198
 
        is_segmented= true;
1199
 
      break;
1200
 
 
1201
 
    default:
1202
 
      break;
1203
 
  }
1204
 
 
1205
 
  /*
1206
 
   * Remove the Statement message we've been working with (same as
1207
 
   * current_statement).
1208
 
   */
1209
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
 
  statements_in_txn= transaction->mutable_statement();
1212
 
  statements_in_txn->RemoveLast();
1213
 
  in_session->setStatementMessage(NULL);
1214
 
  
1215
 
  /*
1216
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
 
   * an indicator to cancel the previous Statement message which should have
1218
 
   * had its end_segment attribute set to false.
1219
 
   */
1220
 
  if (is_segmented)
1221
 
  {
1222
 
    current_statement= transaction->add_statement();
1223
 
    initStatementMessage(*current_statement,
1224
 
                         message::Statement::ROLLBACK_STATEMENT,
1225
 
                         in_session);
1226
 
    finalizeStatementMessage(*current_statement, in_session);
1227
 
  }
1228
 
}
1229
 
  
1230
1141
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1231
1142
                                                            Table *in_table,
1232
1143
                                                            uint32_t *next_segment_id)
1302
1213
      }
1303
1214
      else
1304
1215
      {
 
1216
        /* append this INSERT query string */
 
1217
        if (in_session->variables.replicate_query)
 
1218
        {
 
1219
          string s(statement->sql());
 
1220
          if (not s.empty())
 
1221
          {
 
1222
            s.append(" ; ");
 
1223
            s.append(in_session->getQueryString()->c_str());
 
1224
            statement->set_sql(s);
 
1225
          }
 
1226
          else
 
1227
            statement->set_sql(in_session->getQueryString()->c_str());
 
1228
        }
 
1229
 
1305
1230
        /* carry forward the existing segment id */
1306
1231
        const message::InsertData &current_data= statement->insert_data();
1307
1232
        *next_segment_id= current_data.segment_id();
1414
1339
    } 
1415
1340
    else 
1416
1341
    {
1417
 
      string_value= current_field->val_str_internal(string_value);
 
1342
      string_value= current_field->val_str(string_value);
1418
1343
      record->add_is_null(false);
1419
1344
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
1345
      string_value->free();
1489
1414
    {
1490
1415
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1491
1416
      {
 
1417
        /* append this UPDATE query string */
 
1418
        if (in_session->variables.replicate_query)
 
1419
        {
 
1420
          string s(statement->sql());
 
1421
          if (not s.empty())
 
1422
          {
 
1423
            s.append(" ; ");
 
1424
            s.append(in_session->getQueryString()->c_str());
 
1425
            statement->set_sql(s);
 
1426
          }
 
1427
          else
 
1428
            statement->set_sql(in_session->getQueryString()->c_str());
 
1429
        }
 
1430
 
1492
1431
        /* carry forward the existing segment id */
1493
1432
        const message::UpdateData &current_data= statement->update_data();
1494
1433
        *next_segment_id= current_data.segment_id();
1687
1626
      bool is_read_set= current_field->isReadSet();
1688
1627
 
1689
1628
      /* We need to mark that we will "read" this field... */
1690
 
      in_table->setReadSet(current_field->position());
 
1629
      in_table->setReadSet(current_field->field_index);
1691
1630
 
1692
1631
      /* Read the string value of this field's contents */
1693
 
      string_value= current_field->val_str_internal(string_value);
 
1632
      string_value= current_field->val_str(string_value);
1694
1633
 
1695
1634
      /* 
1696
1635
       * Reset the read bit after reading field to its original state.  This 
1723
1662
       * 
1724
1663
       * @todo Move this crap into a real Record API.
1725
1664
       */
1726
 
      string_value= current_field->val_str_internal(string_value,
1727
 
                                                    old_record + 
1728
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
 
1665
      string_value= current_field->val_str(string_value,
 
1666
                                           old_record + 
 
1667
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1729
1668
      record->add_key_value(string_value->c_ptr(), string_value->length());
1730
1669
      string_value->free();
1731
1670
    }
1848
1787
      }
1849
1788
      else
1850
1789
      {
 
1790
        /* append this DELETE query string */
 
1791
        if (in_session->variables.replicate_query)
 
1792
        {
 
1793
          string s(statement->sql());
 
1794
          if (not s.empty())
 
1795
          {
 
1796
            s.append(" ; ");
 
1797
            s.append(in_session->getQueryString()->c_str());
 
1798
            statement->set_sql(s);
 
1799
          }
 
1800
          else
 
1801
            statement->set_sql(in_session->getQueryString()->c_str());
 
1802
        }
 
1803
 
1851
1804
        /* carry forward the existing segment id */
1852
1805
        const message::DeleteData &current_data= statement->delete_data();
1853
1806
        *next_segment_id= current_data.segment_id();
1958
1911
         */
1959
1912
        const unsigned char *old_ptr= current_field->ptr;
1960
1913
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
 
        string_value= current_field->val_str_internal(string_value);
 
1914
        string_value= current_field->val_str(string_value);
1962
1915
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1963
1916
      }
1964
1917
      else
1965
1918
      {
1966
 
        string_value= current_field->val_str_internal(string_value);
 
1919
        string_value= current_field->val_str(string_value);
1967
1920
        /**
1968
1921
         * @TODO Store optional old record value in the before data member
1969
1922
         */
1974
1927
  }
1975
1928
}
1976
1929
 
 
1930
 
 
1931
/**
 
1932
 * Template for removing Statement records of different types.
 
1933
 *
 
1934
 * The code for removing records from different Statement message types
 
1935
 * is identical except for the class types that are embedded within the
 
1936
 * Statement.
 
1937
 *
 
1938
 * There are 3 scenarios we need to look for:
 
1939
 *   - We've been asked to remove more records than exist in the Statement
 
1940
 *   - We've been asked to remove less records than exist in the Statement
 
1941
 *   - We've been asked to remove ALL records that exist in the Statement
 
1942
 *
 
1943
 * If we are removing ALL records, then effectively we would be left with
 
1944
 * an empty Statement message, so we should just remove it and clean up
 
1945
 * message pointers in the Session object.
 
1946
 */
 
1947
template <class DataType, class RecordType>
 
1948
static bool removeStatementRecordsWithType(Session *session,
 
1949
                                           DataType *data,
 
1950
                                           uint32_t count)
 
1951
{
 
1952
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1953
 
 
1954
  /* If there aren't enough records to remove 'count' of them, error. */
 
1955
  if (num_avail_recs < count)
 
1956
    return false;
 
1957
 
 
1958
  /*
 
1959
   * If we are removing all of the data records, we'll just remove this
 
1960
   * entire Statement message.
 
1961
   */
 
1962
  if (num_avail_recs == count)
 
1963
  {
 
1964
    message::Transaction *transaction= session->getTransactionMessage();
 
1965
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1966
    statements->RemoveLast();
 
1967
 
 
1968
    /*
 
1969
     * Now need to set the Session Statement pointer to either the previous
 
1970
     * Statement, or NULL if there isn't one.
 
1971
     */
 
1972
    if (statements->size() == 0)
 
1973
    {
 
1974
      session->setStatementMessage(NULL);
 
1975
    }
 
1976
    else
 
1977
    {
 
1978
      /*
 
1979
       * There isn't a great way to get a pointer to the previous Statement
 
1980
       * message using the RepeatedPtrField object, so we'll just get to it
 
1981
       * using the Transaction message.
 
1982
       */
 
1983
      int last_stmt_idx= transaction->statement_size() - 1;
 
1984
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1985
    }
 
1986
  }
 
1987
  /* We only need to remove 'count' records */
 
1988
  else if (num_avail_recs > count)
 
1989
  {
 
1990
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1991
    while (count--)
 
1992
      records->RemoveLast();
 
1993
  }
 
1994
 
 
1995
  return true;
 
1996
}
 
1997
 
 
1998
 
 
1999
bool TransactionServices::removeStatementRecords(Session *session,
 
2000
                                                 uint32_t count)
 
2001
{
 
2002
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2003
  if (! replication_services.isActive())
 
2004
    return false;
 
2005
 
 
2006
  /* Get the most current Statement */
 
2007
  message::Statement *statement= session->getStatementMessage();
 
2008
 
 
2009
  /* Make sure we have work to do */
 
2010
  if (statement == NULL)
 
2011
    return false;
 
2012
 
 
2013
  bool retval= false;
 
2014
 
 
2015
  switch (statement->type())
 
2016
  {
 
2017
    case message::Statement::INSERT:
 
2018
    {
 
2019
      message::InsertData *data= statement->mutable_insert_data();
 
2020
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
2021
      break;
 
2022
    }
 
2023
 
 
2024
    case message::Statement::UPDATE:
 
2025
    {
 
2026
      message::UpdateData *data= statement->mutable_update_data();
 
2027
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
2028
      break;
 
2029
    }
 
2030
 
 
2031
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
2032
    {
 
2033
      message::DeleteData *data= statement->mutable_delete_data();
 
2034
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
2035
      break;
 
2036
    }
 
2037
 
 
2038
    default:
 
2039
      retval= false;
 
2040
      break;
 
2041
  }
 
2042
 
 
2043
  return retval;
 
2044
}
 
2045
 
 
2046
 
1977
2047
void TransactionServices::createTable(Session *in_session,
1978
2048
                                      const message::Table &table)
1979
2049
{