~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Jay Pipes
  • Date: 2010-03-16 21:30:44 UTC
  • mto: This revision was merged to the branch mainline in revision 1351.
  • Revision ID: jpipes@serialcoder-20100316213044-e4f0xc0aga34l1es
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices.  Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
/**
22
22
 * @file Transaction processing code
 
23
 *
 
24
 * @note
 
25
 *
 
26
 * The TransactionServices component takes internal events (for instance the start of a 
 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
 
29
 * component and used during replication.
 
30
 *
 
31
 * The reason for this functionality is to encapsulate all communication
 
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
33
 * Instead of the plugin having to understand the (often fluidly changing)
 
34
 * mechanics of the kernel, all the plugin needs to understand is the message
 
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
36
 * these messages.
 
37
 *
 
38
 * @see /drizzled/message/transaction.proto
 
39
 *
 
40
 * @todo
 
41
 *
 
42
 * We really should store the raw bytes in the messages, not the
 
43
 * String value of the Field.  But, to do that, the
 
44
 * statement_transform library needs first to be updated
 
45
 * to include the transformation code to convert raw
 
46
 * Drizzle-internal Field byte representation into something
 
47
 * plugins can understand.
23
48
 */
24
49
 
25
50
#include "config.h"
33
58
#include "drizzled/replication_services.h"
34
59
#include "drizzled/transaction_services.h"
35
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
36
63
#include "drizzled/resource_context.h"
37
64
#include "drizzled/lock.h"
38
65
#include "drizzled/item/int.h"
619
646
       * We commit the normal transaction by finalizing the transaction message
620
647
       * and propogating the message to all registered replicators.
621
648
       */
622
 
      ReplicationServices &replication_services= ReplicationServices::singleton();
623
 
      replication_services.commitTransaction(session);
 
649
      commitTransactionMessage(session);
624
650
    }
625
651
  }
626
652
  return error;
686
712
     * a rollback statement with the corresponding transaction ID
687
713
     * to rollback.
688
714
     */
689
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
690
 
    replication_services.rollbackTransaction(session);
 
715
    rollbackTransactionMessage(session);
691
716
 
692
717
    if (is_real_trans)
693
718
      session->transaction.xid_state.xid.null();
963
988
  return error;
964
989
}
965
990
 
 
991
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
 
992
{
 
993
  message::Transaction *transaction= in_session->getTransactionMessage();
 
994
 
 
995
  if (unlikely(transaction == NULL))
 
996
  {
 
997
    /* 
 
998
     * Allocate and initialize a new transaction message 
 
999
     * for this Session object.  Session is responsible for
 
1000
     * deleting transaction message when done with it.
 
1001
     */
 
1002
    transaction= new (nothrow) message::Transaction();
 
1003
    initTransactionMessage(*transaction, in_session);
 
1004
    in_session->setTransactionMessage(transaction);
 
1005
    return transaction;
 
1006
  }
 
1007
  else
 
1008
    return transaction;
 
1009
}
 
1010
 
 
1011
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
1012
                                          Session *in_session)
 
1013
{
 
1014
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1015
  trx->set_server_id(in_session->getServerId());
 
1016
  trx->set_transaction_id(in_session->getQueryId());
 
1017
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
1018
}
 
1019
 
 
1020
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
1021
                                              Session *in_session)
 
1022
{
 
1023
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
1024
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
1025
}
 
1026
 
 
1027
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
1028
                                             Session *in_session)
 
1029
{
 
1030
  delete in_transaction;
 
1031
  in_session->setStatementMessage(NULL);
 
1032
  in_session->setTransactionMessage(NULL);
 
1033
}
 
1034
 
 
1035
void TransactionServices::commitTransactionMessage(Session *in_session)
 
1036
{
 
1037
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1038
  if (! replication_services.isActive())
 
1039
    return;
 
1040
 
 
1041
  /* If there is an active statement message, finalize it */
 
1042
  message::Statement *statement= in_session->getStatementMessage();
 
1043
 
 
1044
  if (statement != NULL)
 
1045
  {
 
1046
    finalizeStatementMessage(*statement, in_session);
 
1047
  }
 
1048
  else
 
1049
    return; /* No data modification occurred inside the transaction */
 
1050
  
 
1051
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1052
 
 
1053
  finalizeTransactionMessage(*transaction, in_session);
 
1054
  
 
1055
  replication_services.pushTransactionMessage(*transaction);
 
1056
 
 
1057
  cleanupTransactionMessage(transaction, in_session);
 
1058
}
 
1059
 
 
1060
void TransactionServices::initStatementMessage(message::Statement &statement,
 
1061
                                        message::Statement::Type in_type,
 
1062
                                        Session *in_session)
 
1063
{
 
1064
  statement.set_type(in_type);
 
1065
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1066
  /** @TODO Set sql string optionally */
 
1067
}
 
1068
 
 
1069
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
 
1070
                                            Session *in_session)
 
1071
{
 
1072
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1073
  in_session->setStatementMessage(NULL);
 
1074
}
 
1075
 
 
1076
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1077
{
 
1078
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1079
  if (! replication_services.isActive())
 
1080
    return;
 
1081
  
 
1082
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1083
 
 
1084
  /*
 
1085
   * OK, so there are two situations that we need to deal with here:
 
1086
   *
 
1087
   * 1) We receive an instruction to ROLLBACK the current transaction
 
1088
   *    and the currently-stored Transaction message is *self-contained*, 
 
1089
   *    meaning that no Statement messages in the Transaction message
 
1090
   *    contain a message having its segment_id member greater than 1.  If
 
1091
   *    no non-segment ID 1 members are found, we can simply clear the
 
1092
   *    current Transaction message and remove it from memory.
 
1093
   *
 
1094
   * 2) If the Transaction message does indeed have a non-end segment, that
 
1095
   *    means that a bulk update/delete/insert Transaction message segment
 
1096
   *    has previously been sent over the wire to replicators.  In this case, 
 
1097
   *    we need to package a Transaction with a Statement message of type
 
1098
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
1099
   *    messages must be un-applied.
 
1100
   */
 
1101
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
 
1102
  {
 
1103
    /*
 
1104
     * Clear the transaction, create a Rollback statement message, 
 
1105
     * attach it to the transaction, and push it to replicators.
 
1106
     */
 
1107
    transaction->Clear();
 
1108
    initTransactionMessage(*transaction, in_session);
 
1109
 
 
1110
    message::Statement *statement= transaction->add_statement();
 
1111
 
 
1112
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1113
    finalizeStatementMessage(*statement, in_session);
 
1114
 
 
1115
    finalizeTransactionMessage(*transaction, in_session);
 
1116
    
 
1117
    replication_services.pushTransactionMessage(*transaction);
 
1118
  }
 
1119
  cleanupTransactionMessage(transaction, in_session);
 
1120
}
 
1121
 
 
1122
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1123
                                                                 Table *in_table)
 
1124
{
 
1125
  message::Statement *statement= in_session->getStatementMessage();
 
1126
  /*
 
1127
   * We check to see if the current Statement message is of type INSERT.
 
1128
   * If it is not, we finalize the current Statement and ensure a new
 
1129
   * InsertStatement is created.
 
1130
   */
 
1131
  if (statement != NULL &&
 
1132
      statement->type() != message::Statement::INSERT)
 
1133
  {
 
1134
    finalizeStatementMessage(*statement, in_session);
 
1135
    statement= in_session->getStatementMessage();
 
1136
  }
 
1137
 
 
1138
  if (statement == NULL)
 
1139
  {
 
1140
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1141
    /* 
 
1142
     * Transaction message initialized and set, but no statement created
 
1143
     * yet.  We construct one and initialize it, here, then return the
 
1144
     * message after attaching the new Statement message pointer to the 
 
1145
     * Session for easy retrieval later...
 
1146
     */
 
1147
    statement= transaction->add_statement();
 
1148
    setInsertHeader(*statement, in_session, in_table);
 
1149
    in_session->setStatementMessage(statement);
 
1150
  }
 
1151
  return *statement;
 
1152
}
 
1153
 
 
1154
void TransactionServices::setInsertHeader(message::Statement &statement,
 
1155
                                          Session *in_session,
 
1156
                                          Table *in_table)
 
1157
{
 
1158
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1159
 
 
1160
  /* 
 
1161
   * Now we construct the specialized InsertHeader message inside
 
1162
   * the generalized message::Statement container...
 
1163
   */
 
1164
  /* Set up the insert header */
 
1165
  message::InsertHeader *header= statement.mutable_insert_header();
 
1166
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1167
 
 
1168
  const char *schema_name= in_table->getShare()->db.str;
 
1169
  const char *table_name= in_table->getShare()->table_name.str;
 
1170
 
 
1171
  table_metadata->set_schema_name(schema_name);
 
1172
  table_metadata->set_table_name(table_name);
 
1173
 
 
1174
  Field *current_field;
 
1175
  Field **table_fields= in_table->field;
 
1176
 
 
1177
  message::FieldMetadata *field_metadata;
 
1178
 
 
1179
  /* We will read all the table's fields... */
 
1180
  in_table->setReadSet();
 
1181
 
 
1182
  while ((current_field= *table_fields++) != NULL) 
 
1183
  {
 
1184
    field_metadata= header->add_field_metadata();
 
1185
    field_metadata->set_name(current_field->field_name);
 
1186
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1187
  }
 
1188
}
 
1189
 
 
1190
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1191
{
 
1192
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1193
  if (! replication_services.isActive())
 
1194
    return false;
 
1195
  /**
 
1196
   * We do this check here because we don't want to even create a 
 
1197
   * statement if there isn't a primary key on the table...
 
1198
   *
 
1199
   * @todo
 
1200
   *
 
1201
   * Multi-column primary keys are handled how exactly?
 
1202
   */
 
1203
  if (in_table->s->primary_key == MAX_KEY)
 
1204
  {
 
1205
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
 
1206
    return true;
 
1207
  }
 
1208
 
 
1209
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
1210
 
 
1211
  message::InsertData *data= statement.mutable_insert_data();
 
1212
  data->set_segment_id(1);
 
1213
  data->set_end_segment(true);
 
1214
  message::InsertRecord *record= data->add_record();
 
1215
 
 
1216
  Field *current_field;
 
1217
  Field **table_fields= in_table->field;
 
1218
 
 
1219
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1220
  string_value->set_charset(system_charset_info);
 
1221
 
 
1222
  /* We will read all the table's fields... */
 
1223
  in_table->setReadSet();
 
1224
 
 
1225
  while ((current_field= *table_fields++) != NULL) 
 
1226
  {
 
1227
    string_value= current_field->val_str(string_value);
 
1228
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1229
    string_value->free();
 
1230
  }
 
1231
  return false;
 
1232
}
 
1233
 
 
1234
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1235
                                                            Table *in_table,
 
1236
                                                            const unsigned char *old_record, 
 
1237
                                                            const unsigned char *new_record)
 
1238
{
 
1239
  message::Statement *statement= in_session->getStatementMessage();
 
1240
  /*
 
1241
   * We check to see if the current Statement message is of type UPDATE.
 
1242
   * If it is not, we finalize the current Statement and ensure a new
 
1243
   * UpdateStatement is created.
 
1244
   */
 
1245
  if (statement != NULL &&
 
1246
      statement->type() != message::Statement::UPDATE)
 
1247
  {
 
1248
    finalizeStatementMessage(*statement, in_session);
 
1249
    statement= in_session->getStatementMessage();
 
1250
  }
 
1251
 
 
1252
  if (statement == NULL)
 
1253
  {
 
1254
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1255
    /* 
 
1256
     * Transaction message initialized and set, but no statement created
 
1257
     * yet.  We construct one and initialize it, here, then return the
 
1258
     * message after attaching the new Statement message pointer to the 
 
1259
     * Session for easy retrieval later...
 
1260
     */
 
1261
    statement= transaction->add_statement();
 
1262
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1263
    in_session->setStatementMessage(statement);
 
1264
  }
 
1265
  return *statement;
 
1266
}
 
1267
 
 
1268
void TransactionServices::setUpdateHeader(message::Statement &statement,
 
1269
                                          Session *in_session,
 
1270
                                          Table *in_table,
 
1271
                                          const unsigned char *old_record, 
 
1272
                                          const unsigned char *new_record)
 
1273
{
 
1274
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1275
 
 
1276
  /* 
 
1277
   * Now we construct the specialized UpdateHeader message inside
 
1278
   * the generalized message::Statement container...
 
1279
   */
 
1280
  /* Set up the update header */
 
1281
  message::UpdateHeader *header= statement.mutable_update_header();
 
1282
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1283
 
 
1284
  const char *schema_name= in_table->getShare()->db.str;
 
1285
  const char *table_name= in_table->getShare()->table_name.str;
 
1286
 
 
1287
  table_metadata->set_schema_name(schema_name);
 
1288
  table_metadata->set_table_name(table_name);
 
1289
 
 
1290
  Field *current_field;
 
1291
  Field **table_fields= in_table->field;
 
1292
 
 
1293
  message::FieldMetadata *field_metadata;
 
1294
 
 
1295
  /* We will read all the table's fields... */
 
1296
  in_table->setReadSet();
 
1297
 
 
1298
  while ((current_field= *table_fields++) != NULL) 
 
1299
  {
 
1300
    /*
 
1301
     * We add the "key field metadata" -- i.e. the fields which is
 
1302
     * the primary key for the table.
 
1303
     */
 
1304
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1305
    {
 
1306
      field_metadata= header->add_key_field_metadata();
 
1307
      field_metadata->set_name(current_field->field_name);
 
1308
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1309
    }
 
1310
 
 
1311
    /*
 
1312
     * The below really should be moved into the Field API and Record API.  But for now
 
1313
     * we do this crazy pointer fiddling to figure out if the current field
 
1314
     * has been updated in the supplied record raw byte pointers.
 
1315
     */
 
1316
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1317
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1318
 
 
1319
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1320
 
 
1321
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1322
    {
 
1323
      /* Field is changed from old to new */
 
1324
      field_metadata= header->add_set_field_metadata();
 
1325
      field_metadata->set_name(current_field->field_name);
 
1326
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1327
    }
 
1328
  }
 
1329
}
 
1330
void TransactionServices::updateRecord(Session *in_session,
 
1331
                                       Table *in_table, 
 
1332
                                       const unsigned char *old_record, 
 
1333
                                       const unsigned char *new_record)
 
1334
{
 
1335
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1336
  if (! replication_services.isActive())
 
1337
    return;
 
1338
 
 
1339
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
1340
 
 
1341
  message::UpdateData *data= statement.mutable_update_data();
 
1342
  data->set_segment_id(1);
 
1343
  data->set_end_segment(true);
 
1344
  message::UpdateRecord *record= data->add_record();
 
1345
 
 
1346
  Field *current_field;
 
1347
  Field **table_fields= in_table->field;
 
1348
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1349
  string_value->set_charset(system_charset_info);
 
1350
 
 
1351
  while ((current_field= *table_fields++) != NULL) 
 
1352
  {
 
1353
    /*
 
1354
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
1355
     * but then realized that an UPDATE statement could potentially have different values for
 
1356
     * the SET field.  For instance, imagine this SQL scenario:
 
1357
     *
 
1358
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
1359
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
1360
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
1361
     *
 
1362
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1363
     *
 
1364
     * The below really should be moved into the Field API and Record API.  But for now
 
1365
     * we do this crazy pointer fiddling to figure out if the current field
 
1366
     * has been updated in the supplied record raw byte pointers.
 
1367
     */
 
1368
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1369
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
1370
 
 
1371
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1372
 
 
1373
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1374
    {
 
1375
      /* Store the original "read bit" for this field */
 
1376
      bool is_read_set= current_field->isReadSet();
 
1377
 
 
1378
      /* We need to mark that we will "read" this field... */
 
1379
      in_table->setReadSet(current_field->field_index);
 
1380
 
 
1381
      /* Read the string value of this field's contents */
 
1382
      string_value= current_field->val_str(string_value);
 
1383
 
 
1384
      /* 
 
1385
       * Reset the read bit after reading field to its original state.  This 
 
1386
       * prevents the field from being included in the WHERE clause
 
1387
       */
 
1388
      current_field->setReadSet(is_read_set);
 
1389
 
 
1390
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
1391
      string_value->free();
 
1392
    }
 
1393
 
 
1394
    /* 
 
1395
     * Add the WHERE clause values now...for now, this means the
 
1396
     * primary key field value.  Replication only supports tables
 
1397
     * with a primary key.
 
1398
     */
 
1399
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1400
    {
 
1401
      /**
 
1402
       * To say the below is ugly is an understatement. But it works.
 
1403
       * 
 
1404
       * @todo Move this crap into a real Record API.
 
1405
       */
 
1406
      string_value= current_field->val_str(string_value,
 
1407
                                           old_record + 
 
1408
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1409
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1410
      string_value->free();
 
1411
    }
 
1412
 
 
1413
  }
 
1414
}
 
1415
 
 
1416
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1417
                                                            Table *in_table)
 
1418
{
 
1419
  message::Statement *statement= in_session->getStatementMessage();
 
1420
  /*
 
1421
   * We check to see if the current Statement message is of type DELETE.
 
1422
   * If it is not, we finalize the current Statement and ensure a new
 
1423
   * DeleteStatement is created.
 
1424
   */
 
1425
  if (statement != NULL &&
 
1426
      statement->type() != message::Statement::DELETE)
 
1427
  {
 
1428
    finalizeStatementMessage(*statement, in_session);
 
1429
    statement= in_session->getStatementMessage();
 
1430
  }
 
1431
 
 
1432
  if (statement == NULL)
 
1433
  {
 
1434
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1435
    /* 
 
1436
     * Transaction message initialized and set, but no statement created
 
1437
     * yet.  We construct one and initialize it, here, then return the
 
1438
     * message after attaching the new Statement message pointer to the 
 
1439
     * Session for easy retrieval later...
 
1440
     */
 
1441
    statement= transaction->add_statement();
 
1442
    setDeleteHeader(*statement, in_session, in_table);
 
1443
    in_session->setStatementMessage(statement);
 
1444
  }
 
1445
  return *statement;
 
1446
}
 
1447
 
 
1448
void TransactionServices::setDeleteHeader(message::Statement &statement,
 
1449
                                          Session *in_session,
 
1450
                                          Table *in_table)
 
1451
{
 
1452
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1453
 
 
1454
  /* 
 
1455
   * Now we construct the specialized DeleteHeader message inside
 
1456
   * the generalized message::Statement container...
 
1457
   */
 
1458
  message::DeleteHeader *header= statement.mutable_delete_header();
 
1459
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1460
 
 
1461
  const char *schema_name= in_table->getShare()->db.str;
 
1462
  const char *table_name= in_table->getShare()->table_name.str;
 
1463
 
 
1464
  table_metadata->set_schema_name(schema_name);
 
1465
  table_metadata->set_table_name(table_name);
 
1466
 
 
1467
  Field *current_field;
 
1468
  Field **table_fields= in_table->field;
 
1469
 
 
1470
  message::FieldMetadata *field_metadata;
 
1471
 
 
1472
  while ((current_field= *table_fields++) != NULL) 
 
1473
  {
 
1474
    /* 
 
1475
     * Add the WHERE clause values now...for now, this means the
 
1476
     * primary key field value.  Replication only supports tables
 
1477
     * with a primary key.
 
1478
     */
 
1479
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1480
    {
 
1481
      field_metadata= header->add_key_field_metadata();
 
1482
      field_metadata->set_name(current_field->field_name);
 
1483
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1484
    }
 
1485
  }
 
1486
}
 
1487
 
 
1488
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
1489
{
 
1490
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1491
  if (! replication_services.isActive())
 
1492
    return;
 
1493
 
 
1494
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
1495
 
 
1496
  message::DeleteData *data= statement.mutable_delete_data();
 
1497
  data->set_segment_id(1);
 
1498
  data->set_end_segment(true);
 
1499
  message::DeleteRecord *record= data->add_record();
 
1500
 
 
1501
  Field *current_field;
 
1502
  Field **table_fields= in_table->field;
 
1503
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1504
  string_value->set_charset(system_charset_info);
 
1505
 
 
1506
  while ((current_field= *table_fields++) != NULL) 
 
1507
  {
 
1508
    /* 
 
1509
     * Add the WHERE clause values now...for now, this means the
 
1510
     * primary key field value.  Replication only supports tables
 
1511
     * with a primary key.
 
1512
     */
 
1513
    if (in_table->s->fieldInPrimaryKey(current_field))
 
1514
    {
 
1515
      string_value= current_field->val_str(string_value);
 
1516
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1517
      /**
 
1518
       * @TODO Store optional old record value in the before data member
 
1519
       */
 
1520
      string_value->free();
 
1521
    }
 
1522
  }
 
1523
}
 
1524
 
 
1525
void TransactionServices::createTable(Session *in_session,
 
1526
                                      const message::Table &table)
 
1527
{
 
1528
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1529
  if (! replication_services.isActive())
 
1530
    return;
 
1531
  
 
1532
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1533
  message::Statement *statement= transaction->add_statement();
 
1534
 
 
1535
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1536
 
 
1537
  /* 
 
1538
   * Construct the specialized CreateTableStatement message and attach
 
1539
   * it to the generic Statement message
 
1540
   */
 
1541
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
 
1542
  message::Table *new_table_message= create_table_statement->mutable_table();
 
1543
  *new_table_message= table;
 
1544
 
 
1545
  finalizeStatementMessage(*statement, in_session);
 
1546
 
 
1547
  finalizeTransactionMessage(*transaction, in_session);
 
1548
  
 
1549
  replication_services.pushTransactionMessage(*transaction);
 
1550
 
 
1551
  cleanupTransactionMessage(transaction, in_session);
 
1552
 
 
1553
}
 
1554
 
 
1555
void TransactionServices::createSchema(Session *in_session,
 
1556
                                       const message::Schema &schema)
 
1557
{
 
1558
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1559
  if (! replication_services.isActive())
 
1560
    return;
 
1561
  
 
1562
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1563
  message::Statement *statement= transaction->add_statement();
 
1564
 
 
1565
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1566
 
 
1567
  /* 
 
1568
   * Construct the specialized CreateSchemaStatement message and attach
 
1569
   * it to the generic Statement message
 
1570
   */
 
1571
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
 
1572
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
 
1573
  *new_schema_message= schema;
 
1574
 
 
1575
  finalizeStatementMessage(*statement, in_session);
 
1576
 
 
1577
  finalizeTransactionMessage(*transaction, in_session);
 
1578
  
 
1579
  replication_services.pushTransactionMessage(*transaction);
 
1580
 
 
1581
  cleanupTransactionMessage(transaction, in_session);
 
1582
 
 
1583
}
 
1584
 
 
1585
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
1586
{
 
1587
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1588
  if (! replication_services.isActive())
 
1589
    return;
 
1590
  
 
1591
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1592
  message::Statement *statement= transaction->add_statement();
 
1593
 
 
1594
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1595
 
 
1596
  /* 
 
1597
   * Construct the specialized DropSchemaStatement message and attach
 
1598
   * it to the generic Statement message
 
1599
   */
 
1600
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
 
1601
 
 
1602
  drop_schema_statement->set_schema_name(schema_name);
 
1603
 
 
1604
  finalizeStatementMessage(*statement, in_session);
 
1605
 
 
1606
  finalizeTransactionMessage(*transaction, in_session);
 
1607
  
 
1608
  replication_services.pushTransactionMessage(*transaction);
 
1609
 
 
1610
  cleanupTransactionMessage(transaction, in_session);
 
1611
}
 
1612
 
 
1613
void TransactionServices::dropTable(Session *in_session,
 
1614
                                    const string &schema_name,
 
1615
                                    const string &table_name,
 
1616
                                    bool if_exists)
 
1617
{
 
1618
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1619
  if (! replication_services.isActive())
 
1620
    return;
 
1621
  
 
1622
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1623
  message::Statement *statement= transaction->add_statement();
 
1624
 
 
1625
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
1626
 
 
1627
  /* 
 
1628
   * Construct the specialized DropTableStatement message and attach
 
1629
   * it to the generic Statement message
 
1630
   */
 
1631
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
 
1632
 
 
1633
  drop_table_statement->set_if_exists_clause(if_exists);
 
1634
 
 
1635
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
 
1636
 
 
1637
  table_metadata->set_schema_name(schema_name);
 
1638
  table_metadata->set_table_name(table_name);
 
1639
 
 
1640
  finalizeStatementMessage(*statement, in_session);
 
1641
 
 
1642
  finalizeTransactionMessage(*transaction, in_session);
 
1643
  
 
1644
  replication_services.pushTransactionMessage(*transaction);
 
1645
 
 
1646
  cleanupTransactionMessage(transaction, in_session);
 
1647
}
 
1648
 
 
1649
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
1650
{
 
1651
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1652
  if (! replication_services.isActive())
 
1653
    return;
 
1654
  
 
1655
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1656
  message::Statement *statement= transaction->add_statement();
 
1657
 
 
1658
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
1659
 
 
1660
  /* 
 
1661
   * Construct the specialized TruncateTableStatement message and attach
 
1662
   * it to the generic Statement message
 
1663
   */
 
1664
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
1665
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
1666
 
 
1667
  const char *schema_name= in_table->getShare()->db.str;
 
1668
  const char *table_name= in_table->getShare()->table_name.str;
 
1669
 
 
1670
  table_metadata->set_schema_name(schema_name);
 
1671
  table_metadata->set_table_name(table_name);
 
1672
 
 
1673
  finalizeStatementMessage(*statement, in_session);
 
1674
 
 
1675
  finalizeTransactionMessage(*transaction, in_session);
 
1676
  
 
1677
  replication_services.pushTransactionMessage(*transaction);
 
1678
 
 
1679
  cleanupTransactionMessage(transaction, in_session);
 
1680
}
 
1681
 
 
1682
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
1683
{
 
1684
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1685
  if (! replication_services.isActive())
 
1686
    return;
 
1687
  
 
1688
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1689
  message::Statement *statement= transaction->add_statement();
 
1690
 
 
1691
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
1692
  statement->set_sql(query);
 
1693
  finalizeStatementMessage(*statement, in_session);
 
1694
 
 
1695
  finalizeTransactionMessage(*transaction, in_session);
 
1696
  
 
1697
  replication_services.pushTransactionMessage(*transaction);
 
1698
 
 
1699
  cleanupTransactionMessage(transaction, in_session);
 
1700
}
 
1701
 
966
1702
} /* namespace drizzled */