~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

Style changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
/**
22
22
 * @file Transaction processing code
 
23
 *
 
24
 * @note
 
25
 *
 
26
 * The TransactionServices component takes internal events (for instance the start of a 
 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
 
29
 * component and used during replication.
 
30
 *
 
31
 * The reason for this functionality is to encapsulate all communication
 
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
33
 * Instead of the plugin having to understand the (often fluidly changing)
 
34
 * mechanics of the kernel, all the plugin needs to understand is the message
 
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
36
 * these messages.
 
37
 *
 
38
 * @see /drizzled/message/transaction.proto
 
39
 *
 
40
 * @todo
 
41
 *
 
42
 * We really should store the raw bytes in the messages, not the
 
43
 * String value of the Field.  But, to do that, the
 
44
 * statement_transform library needs first to be updated
 
45
 * to include the transformation code to convert raw
 
46
 * Drizzle-internal Field byte representation into something
 
47
 * plugins can understand.
23
48
 */
24
49
 
25
50
#include "config.h"
33
58
#include "drizzled/replication_services.h"
34
59
#include "drizzled/transaction_services.h"
35
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
36
63
#include "drizzled/resource_context.h"
37
64
#include "drizzled/lock.h"
38
65
#include "drizzled/item/int.h"
619
646
       * We commit the normal transaction by finalizing the transaction message
620
647
       * and propogating the message to all registered replicators.
621
648
       */
622
 
      ReplicationServices &replication_services= ReplicationServices::singleton();
623
 
      replication_services.commitTransaction(session);
 
649
      commitTransactionMessage(session);
624
650
    }
625
651
  }
626
652
  return error;
686
712
     * a rollback statement with the corresponding transaction ID
687
713
     * to rollback.
688
714
     */
689
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
690
 
    replication_services.rollbackTransaction(session);
 
715
    rollbackTransactionMessage(session);
691
716
 
692
717
    if (is_real_trans)
693
718
      session->transaction.xid_state.xid.null();
963
988
  return error;
964
989
}
965
990
 
 
991
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
 
992
{
 
993
  message::Transaction *transaction= in_session->getTransactionMessage();
 
994
 
 
995
  if (unlikely(transaction == NULL))
 
996
  {
 
997
    /* 
 
998
     * Allocate and initialize a new transaction message 
 
999
     * for this Session object.  Session is responsible for
 
1000
     * deleting transaction message when done with it.
 
1001
     */
 
1002
    transaction= new (nothrow) message::Transaction();
 
1003
    initTransactionMessage(*transaction, in_session);
 
1004
    in_session->setTransactionMessage(transaction);
 
1005
    return transaction;
 
1006
  }
 
1007
  else
 
1008
    return transaction;
 
1009
}
 
1010
 
 
1011
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1012
                                          Session *in_session)
 
1013
{
 
1014
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1015
  trx->set_server_id(in_session->getServerId());
 
1016
  trx->set_transaction_id(in_session->getQueryId());
 
1017
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1018
}
 
1019
 
 
1020
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1021
                                              Session *in_session)
 
1022
{
 
1023
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1024
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1025
}
 
1026
 
 
1027
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1028
                                             Session *in_session)
 
1029
{
 
1030
  delete in_transaction;
 
1031
  in_session->setStatementMessage(NULL);
 
1032
  in_session->setTransactionMessage(NULL);
 
1033
}
 
1034
 
 
1035
void TransactionServices::commitTransactionMessage(Session *in_session)
 
1036
{
 
1037
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1038
  if (! replication_services.isActive())
 
1039
    return;
 
1040
 
 
1041
  /* If there is an active statement message, finalize it */
 
1042
  message::Statement *statement= in_session->getStatementMessage();
 
1043
 
 
1044
  if (statement != NULL)
 
1045
  {
 
1046
    finalizeStatementMessage(*statement, in_session);
 
1047
  }
 
1048
  else
 
1049
    return; /* No data modification occurred inside the transaction */
 
1050
  
 
1051
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1052
 
 
1053
  finalizeTransactionMessage(*transaction, in_session);
 
1054
  
 
1055
  replication_services.pushTransactionMessage(*transaction);
 
1056
 
 
1057
  cleanupTransactionMessage(transaction, in_session);
 
1058
}
 
1059
 
 
1060
void TransactionServices::initStatementMessage(message::Statement &statement,
 
1061
                                        message::Statement::Type in_type,
 
1062
                                        Session *in_session)
 
1063
{
 
1064
  statement.set_type(in_type);
 
1065
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1066
  /** @TODO Set sql string optionally */
 
1067
}
 
1068
 
 
1069
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
 
1070
                                            Session *in_session)
 
1071
{
 
1072
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1073
  in_session->setStatementMessage(NULL);
 
1074
}
 
1075
 
 
1076
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1077
{
 
1078
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1079
  if (! replication_services.isActive())
 
1080
    return;
 
1081
  
 
1082
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1083
 
 
1084
  /*
 
1085
   * OK, so there are two situations that we need to deal with here:
 
1086
   *
 
1087
   * 1) We receive an instruction to ROLLBACK the current transaction
 
1088
   *    and the currently-stored Transaction message is *self-contained*, 
 
1089
   *    meaning that no Statement messages in the Transaction message
 
1090
   *    contain a message having its segment_id member greater than 1.  If
 
1091
   *    no non-segment ID 1 members are found, we can simply clear the
 
1092
   *    current Transaction message and remove it from memory.
 
1093
   *
 
1094
   * 2) If the Transaction message does indeed have a non-end segment, that
 
1095
   *    means that a bulk update/delete/insert Transaction message segment
 
1096
   *    has previously been sent over the wire to replicators.  In this case, 
 
1097
   *    we need to package a Transaction with a Statement message of type
 
1098
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
1099
   *    messages must be un-applied.
 
1100
   */
 
1101
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
 
1102
  {
 
1103
    /*
 
1104
     * Clear the transaction, create a Rollback statement message, 
 
1105
     * attach it to the transaction, and push it to replicators.
 
1106
     */
 
1107
    transaction->Clear();
 
1108
    initTransactionMessage(*transaction, in_session);
 
1109
 
 
1110
    message::Statement *statement= transaction->add_statement();
 
1111
 
 
1112
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1113
    finalizeStatementMessage(*statement, in_session);
 
1114
 
 
1115
    finalizeTransactionMessage(*transaction, in_session);
 
1116
    
 
1117
    replication_services.pushTransactionMessage(*transaction);
 
1118
  }
 
1119
  cleanupTransactionMessage(transaction, in_session);
 
1120
}
 
1121
 
 
1122
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1123
                                                                 Table *in_table)
 
1124
{
 
1125
  message::Statement *statement= in_session->getStatementMessage();
 
1126
  /*
 
1127
   * We check to see if the current Statement message is of type INSERT.
 
1128
   * If it is not, we finalize the current Statement and ensure a new
 
1129
   * InsertStatement is created.
 
1130
   */
 
1131
  if (statement != NULL &&
 
1132
      statement->type() != message::Statement::INSERT)
 
1133
  {
 
1134
    finalizeStatementMessage(*statement, in_session);
 
1135
    statement= in_session->getStatementMessage();
 
1136
  }
 
1137
 
 
1138
  if (statement == NULL)
 
1139
  {
 
1140
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1141
    /* 
 
1142
     * Transaction message initialized and set, but no statement created
 
1143
     * yet.  We construct one and initialize it, here, then return the
 
1144
     * message after attaching the new Statement message pointer to the 
 
1145
     * Session for easy retrieval later...
 
1146
     */
 
1147
    statement= transaction->add_statement();
 
1148
    setInsertHeader(*statement, in_session, in_table);
 
1149
    in_session->setStatementMessage(statement);
 
1150
  }
 
1151
  return *statement;
 
1152
}
 
1153
 
 
1154
void TransactionServices::setInsertHeader(message::Statement &statement,
 
1155
                                          Session *in_session,
 
1156
                                          Table *in_table)
 
1157
{
 
1158
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1159
 
 
1160
  /* 
 
1161
   * Now we construct the specialized InsertHeader message inside
 
1162
   * the generalized message::Statement container...
 
1163
   */
 
1164
  /* Set up the insert header */
 
1165
  message::InsertHeader *header= statement.mutable_insert_header();
 
1166
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1167
 
 
1168
  string schema_name;
 
1169
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1170
  string table_name;
 
1171
  (void) in_table->getShare()->getTableName(table_name);
 
1172
 
 
1173
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1174
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1175
 
 
1176
  Field *current_field;
 
1177
  Field **table_fields= in_table->field;
 
1178
 
 
1179
  message::FieldMetadata *field_metadata;
 
1180
 
 
1181
  /* We will read all the table's fields... */
 
1182
  in_table->setReadSet();
 
1183
 
 
1184
  while ((current_field= *table_fields++) != NULL) 
 
1185
  {
 
1186
    field_metadata= header->add_field_metadata();
 
1187
    field_metadata->set_name(current_field->field_name);
 
1188
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1189
  }
 
1190
}
 
1191
 
 
1192
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1193
{
 
1194
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1195
  if (! replication_services.isActive())
 
1196
    return false;
 
1197
  /**
 
1198
   * We do this check here because we don't want to even create a 
 
1199
   * statement if there isn't a primary key on the table...
 
1200
   *
 
1201
   * @todo
 
1202
   *
 
1203
   * Multi-column primary keys are handled how exactly?
 
1204
   */
 
1205
  if (in_table->s->primary_key == MAX_KEY)
 
1206
  {
 
1207
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
 
1208
    return true;
 
1209
  }
 
1210
 
 
1211
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
1212
 
 
1213
  message::InsertData *data= statement.mutable_insert_data();
 
1214
  data->set_segment_id(1);
 
1215
  data->set_end_segment(true);
 
1216
  message::InsertRecord *record= data->add_record();
 
1217
 
 
1218
  Field *current_field;
 
1219
  Field **table_fields= in_table->field;
 
1220
 
 
1221
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1222
  string_value->set_charset(system_charset_info);
 
1223
 
 
1224
  /* We will read all the table's fields... */
 
1225
  in_table->setReadSet();
 
1226
 
 
1227
  while ((current_field= *table_fields++) != NULL) 
 
1228
  {
 
1229
    string_value= current_field->val_str(string_value);
 
1230
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1231
    string_value->free();
 
1232
  }
 
1233
  return false;
 
1234
}
 
1235
 
 
1236
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1237
                                                            Table *in_table,
 
1238
                                                            const unsigned char *old_record, 
 
1239
                                                            const unsigned char *new_record)
 
1240
{
 
1241
  message::Statement *statement= in_session->getStatementMessage();
 
1242
  /*
 
1243
   * We check to see if the current Statement message is of type UPDATE.
 
1244
   * If it is not, we finalize the current Statement and ensure a new
 
1245
   * UpdateStatement is created.
 
1246
   */
 
1247
  if (statement != NULL &&
 
1248
      statement->type() != message::Statement::UPDATE)
 
1249
  {
 
1250
    finalizeStatementMessage(*statement, in_session);
 
1251
    statement= in_session->getStatementMessage();
 
1252
  }
 
1253
 
 
1254
  if (statement == NULL)
 
1255
  {
 
1256
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1257
    /* 
 
1258
     * Transaction message initialized and set, but no statement created
 
1259
     * yet.  We construct one and initialize it, here, then return the
 
1260
     * message after attaching the new Statement message pointer to the 
 
1261
     * Session for easy retrieval later...
 
1262
     */
 
1263
    statement= transaction->add_statement();
 
1264
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1265
    in_session->setStatementMessage(statement);
 
1266
  }
 
1267
  return *statement;
 
1268
}
 
1269
 
 
1270
void TransactionServices::setUpdateHeader(message::Statement &statement,
 
1271
                                          Session *in_session,
 
1272
                                          Table *in_table,
 
1273
                                          const unsigned char *old_record, 
 
1274
                                          const unsigned char *new_record)
 
1275
{
 
1276
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1277
 
 
1278
  /* 
 
1279
   * Now we construct the specialized UpdateHeader message inside
 
1280
   * the generalized message::Statement container...
 
1281
   */
 
1282
  /* Set up the update header */
 
1283
  message::UpdateHeader *header= statement.mutable_update_header();
 
1284
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1285
 
 
1286
  string schema_name;
 
1287
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1288
  string table_name;
 
1289
  (void) in_table->getShare()->getTableName(table_name);
 
1290
 
 
1291
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1292
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1293
 
 
1294
  Field *current_field;
 
1295
  Field **table_fields= in_table->field;
 
1296
 
 
1297
  message::FieldMetadata *field_metadata;
 
1298
 
 
1299
  /* We will read all the table's fields... */
 
1300
  in_table->setReadSet();
 
1301
 
 
1302
  while ((current_field= *table_fields++) != NULL) 
 
1303
  {
 
1304
    /*
 
1305
     * We add the "key field metadata" -- i.e. the fields which is
 
1306
     * the primary key for the table.
 
1307
     */
 
1308
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1309
    {
 
1310
      field_metadata= header->add_key_field_metadata();
 
1311
      field_metadata->set_name(current_field->field_name);
 
1312
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1313
    }
 
1314
 
 
1315
    /*
 
1316
     * The below really should be moved into the Field API and Record API.  But for now
 
1317
     * we do this crazy pointer fiddling to figure out if the current field
 
1318
     * has been updated in the supplied record raw byte pointers.
 
1319
     */
 
1320
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1321
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1322
 
 
1323
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1324
 
 
1325
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1326
    {
 
1327
      /* Field is changed from old to new */
 
1328
      field_metadata= header->add_set_field_metadata();
 
1329
      field_metadata->set_name(current_field->field_name);
 
1330
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1331
    }
 
1332
  }
 
1333
}
 
1334
void TransactionServices::updateRecord(Session *in_session,
 
1335
                                       Table *in_table, 
 
1336
                                       const unsigned char *old_record, 
 
1337
                                       const unsigned char *new_record)
 
1338
{
 
1339
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1340
  if (! replication_services.isActive())
 
1341
    return;
 
1342
 
 
1343
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
1344
 
 
1345
  message::UpdateData *data= statement.mutable_update_data();
 
1346
  data->set_segment_id(1);
 
1347
  data->set_end_segment(true);
 
1348
  message::UpdateRecord *record= data->add_record();
 
1349
 
 
1350
  Field *current_field;
 
1351
  Field **table_fields= in_table->field;
 
1352
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1353
  string_value->set_charset(system_charset_info);
 
1354
 
 
1355
  while ((current_field= *table_fields++) != NULL) 
 
1356
  {
 
1357
    /*
 
1358
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
1359
     * but then realized that an UPDATE statement could potentially have different values for
 
1360
     * the SET field.  For instance, imagine this SQL scenario:
 
1361
     *
 
1362
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
1363
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
1364
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
1365
     *
 
1366
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1367
     *
 
1368
     * The below really should be moved into the Field API and Record API.  But for now
 
1369
     * we do this crazy pointer fiddling to figure out if the current field
 
1370
     * has been updated in the supplied record raw byte pointers.
 
1371
     */
 
1372
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1373
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1374
 
 
1375
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1376
 
 
1377
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1378
    {
 
1379
      /* Store the original "read bit" for this field */
 
1380
      bool is_read_set= current_field->isReadSet();
 
1381
 
 
1382
      /* We need to mark that we will "read" this field... */
 
1383
      in_table->setReadSet(current_field->field_index);
 
1384
 
 
1385
      /* Read the string value of this field's contents */
 
1386
      string_value= current_field->val_str(string_value);
 
1387
 
 
1388
      /* 
 
1389
       * Reset the read bit after reading field to its original state.  This 
 
1390
       * prevents the field from being included in the WHERE clause
 
1391
       */
 
1392
      current_field->setReadSet(is_read_set);
 
1393
 
 
1394
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
1395
      string_value->free();
 
1396
    }
 
1397
 
 
1398
    /* 
 
1399
     * Add the WHERE clause values now...for now, this means the
 
1400
     * primary key field value.  Replication only supports tables
 
1401
     * with a primary key.
 
1402
     */
 
1403
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1404
    {
 
1405
      /**
 
1406
       * To say the below is ugly is an understatement. But it works.
 
1407
       * 
 
1408
       * @todo Move this crap into a real Record API.
 
1409
       */
 
1410
      string_value= current_field->val_str(string_value,
 
1411
                                           old_record + 
 
1412
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1413
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1414
      string_value->free();
 
1415
    }
 
1416
 
 
1417
  }
 
1418
}
 
1419
 
 
1420
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1421
                                                            Table *in_table)
 
1422
{
 
1423
  message::Statement *statement= in_session->getStatementMessage();
 
1424
  /*
 
1425
   * We check to see if the current Statement message is of type DELETE.
 
1426
   * If it is not, we finalize the current Statement and ensure a new
 
1427
   * DeleteStatement is created.
 
1428
   */
 
1429
  if (statement != NULL &&
 
1430
      statement->type() != message::Statement::DELETE)
 
1431
  {
 
1432
    finalizeStatementMessage(*statement, in_session);
 
1433
    statement= in_session->getStatementMessage();
 
1434
  }
 
1435
 
 
1436
  if (statement == NULL)
 
1437
  {
 
1438
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1439
    /* 
 
1440
     * Transaction message initialized and set, but no statement created
 
1441
     * yet.  We construct one and initialize it, here, then return the
 
1442
     * message after attaching the new Statement message pointer to the 
 
1443
     * Session for easy retrieval later...
 
1444
     */
 
1445
    statement= transaction->add_statement();
 
1446
    setDeleteHeader(*statement, in_session, in_table);
 
1447
    in_session->setStatementMessage(statement);
 
1448
  }
 
1449
  return *statement;
 
1450
}
 
1451
 
 
1452
void TransactionServices::setDeleteHeader(message::Statement &statement,
 
1453
                                          Session *in_session,
 
1454
                                          Table *in_table)
 
1455
{
 
1456
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1457
 
 
1458
  /* 
 
1459
   * Now we construct the specialized DeleteHeader message inside
 
1460
   * the generalized message::Statement container...
 
1461
   */
 
1462
  message::DeleteHeader *header= statement.mutable_delete_header();
 
1463
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1464
 
 
1465
  string schema_name;
 
1466
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1467
  string table_name;
 
1468
  (void) in_table->getShare()->getTableName(table_name);
 
1469
 
 
1470
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1471
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1472
 
 
1473
  Field *current_field;
 
1474
  Field **table_fields= in_table->field;
 
1475
 
 
1476
  message::FieldMetadata *field_metadata;
 
1477
 
 
1478
  while ((current_field= *table_fields++) != NULL) 
 
1479
  {
 
1480
    /* 
 
1481
     * Add the WHERE clause values now...for now, this means the
 
1482
     * primary key field value.  Replication only supports tables
 
1483
     * with a primary key.
 
1484
     */
 
1485
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1486
    {
 
1487
      field_metadata= header->add_key_field_metadata();
 
1488
      field_metadata->set_name(current_field->field_name);
 
1489
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1490
    }
 
1491
  }
 
1492
}
 
1493
 
 
1494
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
1495
{
 
1496
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1497
  if (! replication_services.isActive())
 
1498
    return;
 
1499
 
 
1500
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
1501
 
 
1502
  message::DeleteData *data= statement.mutable_delete_data();
 
1503
  data->set_segment_id(1);
 
1504
  data->set_end_segment(true);
 
1505
  message::DeleteRecord *record= data->add_record();
 
1506
 
 
1507
  Field *current_field;
 
1508
  Field **table_fields= in_table->field;
 
1509
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1510
  string_value->set_charset(system_charset_info);
 
1511
 
 
1512
  while ((current_field= *table_fields++) != NULL) 
 
1513
  {
 
1514
    /* 
 
1515
     * Add the WHERE clause values now...for now, this means the
 
1516
     * primary key field value.  Replication only supports tables
 
1517
     * with a primary key.
 
1518
     */
 
1519
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1520
    {
 
1521
      string_value= current_field->val_str(string_value);
 
1522
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1523
      /**
 
1524
       * @TODO Store optional old record value in the before data member
 
1525
       */
 
1526
      string_value->free();
 
1527
    }
 
1528
  }
 
1529
}
 
1530
 
 
1531
void TransactionServices::createTable(Session *in_session,
 
1532
                                      const message::Table &table)
 
1533
{
 
1534
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1535
  if (! replication_services.isActive())
 
1536
    return;
 
1537
  
 
1538
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1539
  message::Statement *statement= transaction->add_statement();
 
1540
 
 
1541
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1542
 
 
1543
  /* 
 
1544
   * Construct the specialized CreateTableStatement message and attach
 
1545
   * it to the generic Statement message
 
1546
   */
 
1547
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
 
1548
  message::Table *new_table_message= create_table_statement->mutable_table();
 
1549
  *new_table_message= table;
 
1550
 
 
1551
  finalizeStatementMessage(*statement, in_session);
 
1552
 
 
1553
  finalizeTransactionMessage(*transaction, in_session);
 
1554
  
 
1555
  replication_services.pushTransactionMessage(*transaction);
 
1556
 
 
1557
  cleanupTransactionMessage(transaction, in_session);
 
1558
 
 
1559
}
 
1560
 
 
1561
void TransactionServices::createSchema(Session *in_session,
 
1562
                                       const message::Schema &schema)
 
1563
{
 
1564
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1565
  if (! replication_services.isActive())
 
1566
    return;
 
1567
  
 
1568
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1569
  message::Statement *statement= transaction->add_statement();
 
1570
 
 
1571
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1572
 
 
1573
  /* 
 
1574
   * Construct the specialized CreateSchemaStatement message and attach
 
1575
   * it to the generic Statement message
 
1576
   */
 
1577
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
 
1578
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
 
1579
  *new_schema_message= schema;
 
1580
 
 
1581
  finalizeStatementMessage(*statement, in_session);
 
1582
 
 
1583
  finalizeTransactionMessage(*transaction, in_session);
 
1584
  
 
1585
  replication_services.pushTransactionMessage(*transaction);
 
1586
 
 
1587
  cleanupTransactionMessage(transaction, in_session);
 
1588
 
 
1589
}
 
1590
 
 
1591
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
1592
{
 
1593
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1594
  if (! replication_services.isActive())
 
1595
    return;
 
1596
  
 
1597
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1598
  message::Statement *statement= transaction->add_statement();
 
1599
 
 
1600
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1601
 
 
1602
  /* 
 
1603
   * Construct the specialized DropSchemaStatement message and attach
 
1604
   * it to the generic Statement message
 
1605
   */
 
1606
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
 
1607
 
 
1608
  drop_schema_statement->set_schema_name(schema_name);
 
1609
 
 
1610
  finalizeStatementMessage(*statement, in_session);
 
1611
 
 
1612
  finalizeTransactionMessage(*transaction, in_session);
 
1613
  
 
1614
  replication_services.pushTransactionMessage(*transaction);
 
1615
 
 
1616
  cleanupTransactionMessage(transaction, in_session);
 
1617
}
 
1618
 
 
1619
void TransactionServices::dropTable(Session *in_session,
 
1620
                                    const string &schema_name,
 
1621
                                    const string &table_name,
 
1622
                                    bool if_exists)
 
1623
{
 
1624
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1625
  if (! replication_services.isActive())
 
1626
    return;
 
1627
  
 
1628
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1629
  message::Statement *statement= transaction->add_statement();
 
1630
 
 
1631
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
1632
 
 
1633
  /* 
 
1634
   * Construct the specialized DropTableStatement message and attach
 
1635
   * it to the generic Statement message
 
1636
   */
 
1637
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
 
1638
 
 
1639
  drop_table_statement->set_if_exists_clause(if_exists);
 
1640
 
 
1641
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
 
1642
 
 
1643
  table_metadata->set_schema_name(schema_name);
 
1644
  table_metadata->set_table_name(table_name);
 
1645
 
 
1646
  finalizeStatementMessage(*statement, in_session);
 
1647
 
 
1648
  finalizeTransactionMessage(*transaction, in_session);
 
1649
  
 
1650
  replication_services.pushTransactionMessage(*transaction);
 
1651
 
 
1652
  cleanupTransactionMessage(transaction, in_session);
 
1653
}
 
1654
 
 
1655
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
1656
{
 
1657
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1658
  if (! replication_services.isActive())
 
1659
    return;
 
1660
  
 
1661
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1662
  message::Statement *statement= transaction->add_statement();
 
1663
 
 
1664
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
1665
 
 
1666
  /* 
 
1667
   * Construct the specialized TruncateTableStatement message and attach
 
1668
   * it to the generic Statement message
 
1669
   */
 
1670
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
1671
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
1672
 
 
1673
  string schema_name;
 
1674
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1675
  string table_name;
 
1676
  (void) in_table->getShare()->getTableName(table_name);
 
1677
 
 
1678
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1679
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1680
 
 
1681
  finalizeStatementMessage(*statement, in_session);
 
1682
 
 
1683
  finalizeTransactionMessage(*transaction, in_session);
 
1684
  
 
1685
  replication_services.pushTransactionMessage(*transaction);
 
1686
 
 
1687
  cleanupTransactionMessage(transaction, in_session);
 
1688
}
 
1689
 
 
1690
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
1691
{
 
1692
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1693
  if (! replication_services.isActive())
 
1694
    return;
 
1695
  
 
1696
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1697
  message::Statement *statement= transaction->add_statement();
 
1698
 
 
1699
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
1700
  statement->set_sql(query);
 
1701
  finalizeStatementMessage(*statement, in_session);
 
1702
 
 
1703
  finalizeTransactionMessage(*transaction, in_session);
 
1704
  
 
1705
  replication_services.pushTransactionMessage(*transaction);
 
1706
 
 
1707
  cleanupTransactionMessage(transaction, in_session);
 
1708
}
 
1709
 
966
1710
} /* namespace drizzled */