~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/plugin/xa_storage_engine.h"
72
73
#include "drizzled/internal/my_sys.h"
73
74
 
74
 
using namespace std;
75
 
 
76
75
#include <vector>
77
76
#include <algorithm>
78
77
#include <functional>
 
78
#include <google/protobuf/repeated_field.h>
 
79
 
 
80
using namespace std;
 
81
using namespace google;
79
82
 
80
83
namespace drizzled
81
84
{
297
300
 * transaction after all DDLs, just like the statement transaction
298
301
 * is always committed at the end of all statements.
299
302
 */
 
303
TransactionServices::TransactionServices()
 
304
{
 
305
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
 
306
  if (engine)
 
307
  {
 
308
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
 
309
  }
 
310
  else 
 
311
  {
 
312
    xa_storage_engine= NULL;
 
313
  }
 
314
}
 
315
 
300
316
void TransactionServices::registerResourceForStatement(Session *session,
301
317
                                                       plugin::MonitoredInTransaction *monitored,
302
318
                                                       plugin::TransactionalStorageEngine *engine)
424
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
425
441
}
426
442
 
 
443
void TransactionServices::allocateNewTransactionId()
 
444
{
 
445
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
446
  if (! replication_services.isActive())
 
447
  {
 
448
    return;
 
449
  }
 
450
 
 
451
  Session *my_session= current_session;
 
452
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
 
453
  my_session->setXaId(xa_id);
 
454
}
 
455
 
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
457
{
 
458
  if (session->getXaId() == 0)
 
459
  {
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
461
  }
 
462
 
 
463
  return session->getXaId();
 
464
}
 
465
 
427
466
/**
428
467
  @retval
429
468
    0   ok
974
1013
  trx->set_server_id(in_session->getServerId());
975
1014
 
976
1015
  if (should_inc_trx_id)
977
 
    trx->set_transaction_id(getNextTransactionId());
 
1016
  {
 
1017
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1018
    in_session->setXaId(0);
 
1019
  }  
 
1020
  else
 
1021
  { 
 
1022
    trx->set_transaction_id(0);
 
1023
  }
978
1024
 
979
1025
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
980
1026
}
1027
1073
{
1028
1074
  statement.set_type(in_type);
1029
1075
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1030
 
  /** @TODO Set sql string optionally */
1031
1076
}
1032
1077
 
1033
1078
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1836
1881
  }
1837
1882
}
1838
1883
 
 
1884
 
 
1885
/**
 
1886
 * Template for removing Statement records of different types.
 
1887
 *
 
1888
 * The code for removing records from different Statement message types
 
1889
 * is identical except for the class types that are embedded within the
 
1890
 * Statement.
 
1891
 *
 
1892
 * There are 3 scenarios we need to look for:
 
1893
 *   - We've been asked to remove more records than exist in the Statement
 
1894
 *   - We've been asked to remove less records than exist in the Statement
 
1895
 *   - We've been asked to remove ALL records that exist in the Statement
 
1896
 *
 
1897
 * If we are removing ALL records, then effectively we would be left with
 
1898
 * an empty Statement message, so we should just remove it and clean up
 
1899
 * message pointers in the Session object.
 
1900
 */
 
1901
template <class DataType, class RecordType>
 
1902
static bool removeStatementRecordsWithType(Session *session,
 
1903
                                           DataType *data,
 
1904
                                           uint32_t count)
 
1905
{
 
1906
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1907
 
 
1908
  /* If there aren't enough records to remove 'count' of them, error. */
 
1909
  if (num_avail_recs < count)
 
1910
    return false;
 
1911
 
 
1912
  /*
 
1913
   * If we are removing all of the data records, we'll just remove this
 
1914
   * entire Statement message.
 
1915
   */
 
1916
  if (num_avail_recs == count)
 
1917
  {
 
1918
    message::Transaction *transaction= session->getTransactionMessage();
 
1919
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1920
    statements->RemoveLast();
 
1921
 
 
1922
    /*
 
1923
     * Now need to set the Session Statement pointer to either the previous
 
1924
     * Statement, or NULL if there isn't one.
 
1925
     */
 
1926
    if (statements->size() == 0)
 
1927
    {
 
1928
      session->setStatementMessage(NULL);
 
1929
    }
 
1930
    else
 
1931
    {
 
1932
      /*
 
1933
       * There isn't a great way to get a pointer to the previous Statement
 
1934
       * message using the RepeatedPtrField object, so we'll just get to it
 
1935
       * using the Transaction message.
 
1936
       */
 
1937
      int last_stmt_idx= transaction->statement_size() - 1;
 
1938
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1939
    }
 
1940
  }
 
1941
  /* We only need to remove 'count' records */
 
1942
  else if (num_avail_recs > count)
 
1943
  {
 
1944
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1945
    while (count--)
 
1946
      records->RemoveLast();
 
1947
  }
 
1948
 
 
1949
  return true;
 
1950
}
 
1951
 
 
1952
 
 
1953
bool TransactionServices::removeStatementRecords(Session *session,
 
1954
                                                 uint32_t count)
 
1955
{
 
1956
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1957
  if (! replication_services.isActive())
 
1958
    return false;
 
1959
 
 
1960
  /* Get the most current Statement */
 
1961
  message::Statement *statement= session->getStatementMessage();
 
1962
 
 
1963
  /* Make sure we have work to do */
 
1964
  if (statement == NULL)
 
1965
    return false;
 
1966
 
 
1967
  bool retval= false;
 
1968
 
 
1969
  switch (statement->type())
 
1970
  {
 
1971
    case message::Statement::INSERT:
 
1972
    {
 
1973
      message::InsertData *data= statement->mutable_insert_data();
 
1974
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
1975
      break;
 
1976
    }
 
1977
 
 
1978
    case message::Statement::UPDATE:
 
1979
    {
 
1980
      message::UpdateData *data= statement->mutable_update_data();
 
1981
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
1982
      break;
 
1983
    }
 
1984
 
 
1985
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
1986
    {
 
1987
      message::DeleteData *data= statement->mutable_delete_data();
 
1988
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
1989
      break;
 
1990
    }
 
1991
 
 
1992
    default:
 
1993
      retval= false;
 
1994
      break;
 
1995
  }
 
1996
 
 
1997
  return retval;
 
1998
}
 
1999
 
 
2000
 
1839
2001
void TransactionServices::createTable(Session *in_session,
1840
2002
                                      const message::Table &table)
1841
2003
{