991
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
993
message::Transaction *transaction= in_session->getTransactionMessage();
995
if (unlikely(transaction == NULL))
998
* Allocate and initialize a new transaction message
999
* for this Session object. Session is responsible for
1000
* deleting transaction message when done with it.
1002
transaction= new (nothrow) message::Transaction();
1003
initTransactionMessage(*transaction, in_session);
1004
in_session->setTransactionMessage(transaction);
1011
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1012
Session *in_session)
1014
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
trx->set_server_id(in_session->getServerId());
1016
trx->set_transaction_id(in_session->getQueryId());
1017
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1020
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1021
Session *in_session)
1023
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1024
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1027
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1028
Session *in_session)
1030
delete in_transaction;
1031
in_session->setStatementMessage(NULL);
1032
in_session->setTransactionMessage(NULL);
1035
void TransactionServices::commitTransactionMessage(Session *in_session)
1037
ReplicationServices &replication_services= ReplicationServices::singleton();
1038
if (! replication_services.isActive())
1041
/* If there is an active statement message, finalize it */
1042
message::Statement *statement= in_session->getStatementMessage();
1044
if (statement != NULL)
1046
finalizeStatementMessage(*statement, in_session);
1049
return; /* No data modification occurred inside the transaction */
1051
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1053
finalizeTransactionMessage(*transaction, in_session);
1055
replication_services.pushTransactionMessage(*transaction);
1057
cleanupTransactionMessage(transaction, in_session);
1060
void TransactionServices::initStatementMessage(message::Statement &statement,
1061
message::Statement::Type in_type,
1062
Session *in_session)
1064
statement.set_type(in_type);
1065
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1066
/** @TODO Set sql string optionally */
1069
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1070
Session *in_session)
1072
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1073
in_session->setStatementMessage(NULL);
1076
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1078
ReplicationServices &replication_services= ReplicationServices::singleton();
1079
if (! replication_services.isActive())
1082
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1085
* OK, so there are two situations that we need to deal with here:
1087
* 1) We receive an instruction to ROLLBACK the current transaction
1088
* and the currently-stored Transaction message is *self-contained*,
1089
* meaning that no Statement messages in the Transaction message
1090
* contain a message having its segment_id member greater than 1. If
1091
* no non-segment ID 1 members are found, we can simply clear the
1092
* current Transaction message and remove it from memory.
1094
* 2) If the Transaction message does indeed have a non-end segment, that
1095
* means that a bulk update/delete/insert Transaction message segment
1096
* has previously been sent over the wire to replicators. In this case,
1097
* we need to package a Transaction with a Statement message of type
1098
* ROLLBACK to indicate to replicators that previously-transmitted
1099
* messages must be un-applied.
1101
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1104
* Clear the transaction, create a Rollback statement message,
1105
* attach it to the transaction, and push it to replicators.
1107
transaction->Clear();
1108
initTransactionMessage(*transaction, in_session);
1110
message::Statement *statement= transaction->add_statement();
1112
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1113
finalizeStatementMessage(*statement, in_session);
1115
finalizeTransactionMessage(*transaction, in_session);
1117
replication_services.pushTransactionMessage(*transaction);
1119
cleanupTransactionMessage(transaction, in_session);
1122
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1125
message::Statement *statement= in_session->getStatementMessage();
1127
* We check to see if the current Statement message is of type INSERT.
1128
* If it is not, we finalize the current Statement and ensure a new
1129
* InsertStatement is created.
1131
if (statement != NULL &&
1132
statement->type() != message::Statement::INSERT)
1134
finalizeStatementMessage(*statement, in_session);
1135
statement= in_session->getStatementMessage();
1138
if (statement == NULL)
1140
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1142
* Transaction message initialized and set, but no statement created
1143
* yet. We construct one and initialize it, here, then return the
1144
* message after attaching the new Statement message pointer to the
1145
* Session for easy retrieval later...
1147
statement= transaction->add_statement();
1148
setInsertHeader(*statement, in_session, in_table);
1149
in_session->setStatementMessage(statement);
1154
void TransactionServices::setInsertHeader(message::Statement &statement,
1155
Session *in_session,
1158
initStatementMessage(statement, message::Statement::INSERT, in_session);
1161
* Now we construct the specialized InsertHeader message inside
1162
* the generalized message::Statement container...
1164
/* Set up the insert header */
1165
message::InsertHeader *header= statement.mutable_insert_header();
1166
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1168
const char *schema_name= in_table->getShare()->db.str;
1169
const char *table_name= in_table->getShare()->table_name.str;
1171
table_metadata->set_schema_name(schema_name);
1172
table_metadata->set_table_name(table_name);
1174
Field *current_field;
1175
Field **table_fields= in_table->field;
1177
message::FieldMetadata *field_metadata;
1179
/* We will read all the table's fields... */
1180
in_table->setReadSet();
1182
while ((current_field= *table_fields++) != NULL)
1184
field_metadata= header->add_field_metadata();
1185
field_metadata->set_name(current_field->field_name);
1186
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1190
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1192
ReplicationServices &replication_services= ReplicationServices::singleton();
1193
if (! replication_services.isActive())
1196
* We do this check here because we don't want to even create a
1197
* statement if there isn't a primary key on the table...
1201
* Multi-column primary keys are handled how exactly?
1203
if (in_table->s->primary_key == MAX_KEY)
1205
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1209
message::Statement &statement= getInsertStatement(in_session, in_table);
1211
message::InsertData *data= statement.mutable_insert_data();
1212
data->set_segment_id(1);
1213
data->set_end_segment(true);
1214
message::InsertRecord *record= data->add_record();
1216
Field *current_field;
1217
Field **table_fields= in_table->field;
1219
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1220
string_value->set_charset(system_charset_info);
1222
/* We will read all the table's fields... */
1223
in_table->setReadSet();
1225
while ((current_field= *table_fields++) != NULL)
1227
string_value= current_field->val_str(string_value);
1228
record->add_insert_value(string_value->c_ptr(), string_value->length());
1229
string_value->free();
1234
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1236
const unsigned char *old_record,
1237
const unsigned char *new_record)
1239
message::Statement *statement= in_session->getStatementMessage();
1241
* We check to see if the current Statement message is of type UPDATE.
1242
* If it is not, we finalize the current Statement and ensure a new
1243
* UpdateStatement is created.
1245
if (statement != NULL &&
1246
statement->type() != message::Statement::UPDATE)
1248
finalizeStatementMessage(*statement, in_session);
1249
statement= in_session->getStatementMessage();
1252
if (statement == NULL)
1254
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1256
* Transaction message initialized and set, but no statement created
1257
* yet. We construct one and initialize it, here, then return the
1258
* message after attaching the new Statement message pointer to the
1259
* Session for easy retrieval later...
1261
statement= transaction->add_statement();
1262
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1263
in_session->setStatementMessage(statement);
1268
void TransactionServices::setUpdateHeader(message::Statement &statement,
1269
Session *in_session,
1271
const unsigned char *old_record,
1272
const unsigned char *new_record)
1274
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1277
* Now we construct the specialized UpdateHeader message inside
1278
* the generalized message::Statement container...
1280
/* Set up the update header */
1281
message::UpdateHeader *header= statement.mutable_update_header();
1282
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1284
const char *schema_name= in_table->getShare()->db.str;
1285
const char *table_name= in_table->getShare()->table_name.str;
1287
table_metadata->set_schema_name(schema_name);
1288
table_metadata->set_table_name(table_name);
1290
Field *current_field;
1291
Field **table_fields= in_table->field;
1293
message::FieldMetadata *field_metadata;
1295
/* We will read all the table's fields... */
1296
in_table->setReadSet();
1298
while ((current_field= *table_fields++) != NULL)
1301
* We add the "key field metadata" -- i.e. the fields which is
1302
* the primary key for the table.
1304
if (in_table->s->fieldInPrimaryKey(current_field))
1306
field_metadata= header->add_key_field_metadata();
1307
field_metadata->set_name(current_field->field_name);
1308
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1312
* The below really should be moved into the Field API and Record API. But for now
1313
* we do this crazy pointer fiddling to figure out if the current field
1314
* has been updated in the supplied record raw byte pointers.
1316
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1317
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1319
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1321
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1323
/* Field is changed from old to new */
1324
field_metadata= header->add_set_field_metadata();
1325
field_metadata->set_name(current_field->field_name);
1326
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1330
void TransactionServices::updateRecord(Session *in_session,
1332
const unsigned char *old_record,
1333
const unsigned char *new_record)
1335
ReplicationServices &replication_services= ReplicationServices::singleton();
1336
if (! replication_services.isActive())
1339
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1341
message::UpdateData *data= statement.mutable_update_data();
1342
data->set_segment_id(1);
1343
data->set_end_segment(true);
1344
message::UpdateRecord *record= data->add_record();
1346
Field *current_field;
1347
Field **table_fields= in_table->field;
1348
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1349
string_value->set_charset(system_charset_info);
1351
while ((current_field= *table_fields++) != NULL)
1354
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1355
* but then realized that an UPDATE statement could potentially have different values for
1356
* the SET field. For instance, imagine this SQL scenario:
1358
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1359
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1360
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1362
* We will generate two UpdateRecord messages with different set_value byte arrays.
1364
* The below really should be moved into the Field API and Record API. But for now
1365
* we do this crazy pointer fiddling to figure out if the current field
1366
* has been updated in the supplied record raw byte pointers.
1368
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1369
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1371
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1373
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1375
/* Store the original "read bit" for this field */
1376
bool is_read_set= current_field->isReadSet();
1378
/* We need to mark that we will "read" this field... */
1379
in_table->setReadSet(current_field->field_index);
1381
/* Read the string value of this field's contents */
1382
string_value= current_field->val_str(string_value);
1385
* Reset the read bit after reading field to its original state. This
1386
* prevents the field from being included in the WHERE clause
1388
current_field->setReadSet(is_read_set);
1390
record->add_after_value(string_value->c_ptr(), string_value->length());
1391
string_value->free();
1395
* Add the WHERE clause values now...for now, this means the
1396
* primary key field value. Replication only supports tables
1397
* with a primary key.
1399
if (in_table->s->fieldInPrimaryKey(current_field))
1402
* To say the below is ugly is an understatement. But it works.
1404
* @todo Move this crap into a real Record API.
1406
string_value= current_field->val_str(string_value,
1408
current_field->offset(const_cast<unsigned char *>(new_record)));
1409
record->add_key_value(string_value->c_ptr(), string_value->length());
1410
string_value->free();
1416
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1419
message::Statement *statement= in_session->getStatementMessage();
1421
* We check to see if the current Statement message is of type DELETE.
1422
* If it is not, we finalize the current Statement and ensure a new
1423
* DeleteStatement is created.
1425
if (statement != NULL &&
1426
statement->type() != message::Statement::DELETE)
1428
finalizeStatementMessage(*statement, in_session);
1429
statement= in_session->getStatementMessage();
1432
if (statement == NULL)
1434
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1436
* Transaction message initialized and set, but no statement created
1437
* yet. We construct one and initialize it, here, then return the
1438
* message after attaching the new Statement message pointer to the
1439
* Session for easy retrieval later...
1441
statement= transaction->add_statement();
1442
setDeleteHeader(*statement, in_session, in_table);
1443
in_session->setStatementMessage(statement);
1448
void TransactionServices::setDeleteHeader(message::Statement &statement,
1449
Session *in_session,
1452
initStatementMessage(statement, message::Statement::DELETE, in_session);
1455
* Now we construct the specialized DeleteHeader message inside
1456
* the generalized message::Statement container...
1458
message::DeleteHeader *header= statement.mutable_delete_header();
1459
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1461
const char *schema_name= in_table->getShare()->db.str;
1462
const char *table_name= in_table->getShare()->table_name.str;
1464
table_metadata->set_schema_name(schema_name);
1465
table_metadata->set_table_name(table_name);
1467
Field *current_field;
1468
Field **table_fields= in_table->field;
1470
message::FieldMetadata *field_metadata;
1472
while ((current_field= *table_fields++) != NULL)
1475
* Add the WHERE clause values now...for now, this means the
1476
* primary key field value. Replication only supports tables
1477
* with a primary key.
1479
if (in_table->s->fieldInPrimaryKey(current_field))
1481
field_metadata= header->add_key_field_metadata();
1482
field_metadata->set_name(current_field->field_name);
1483
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1488
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1490
ReplicationServices &replication_services= ReplicationServices::singleton();
1491
if (! replication_services.isActive())
1494
message::Statement &statement= getDeleteStatement(in_session, in_table);
1496
message::DeleteData *data= statement.mutable_delete_data();
1497
data->set_segment_id(1);
1498
data->set_end_segment(true);
1499
message::DeleteRecord *record= data->add_record();
1501
Field *current_field;
1502
Field **table_fields= in_table->field;
1503
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1504
string_value->set_charset(system_charset_info);
1506
while ((current_field= *table_fields++) != NULL)
1509
* Add the WHERE clause values now...for now, this means the
1510
* primary key field value. Replication only supports tables
1511
* with a primary key.
1513
if (in_table->s->fieldInPrimaryKey(current_field))
1515
string_value= current_field->val_str(string_value);
1516
record->add_key_value(string_value->c_ptr(), string_value->length());
1518
* @TODO Store optional old record value in the before data member
1520
string_value->free();
1525
void TransactionServices::createTable(Session *in_session,
1526
const message::Table &table)
1528
ReplicationServices &replication_services= ReplicationServices::singleton();
1529
if (! replication_services.isActive())
1532
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1533
message::Statement *statement= transaction->add_statement();
1535
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1538
* Construct the specialized CreateTableStatement message and attach
1539
* it to the generic Statement message
1541
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1542
message::Table *new_table_message= create_table_statement->mutable_table();
1543
*new_table_message= table;
1545
finalizeStatementMessage(*statement, in_session);
1547
finalizeTransactionMessage(*transaction, in_session);
1549
replication_services.pushTransactionMessage(*transaction);
1551
cleanupTransactionMessage(transaction, in_session);
1555
void TransactionServices::createSchema(Session *in_session,
1556
const message::Schema &schema)
1558
ReplicationServices &replication_services= ReplicationServices::singleton();
1559
if (! replication_services.isActive())
1562
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1563
message::Statement *statement= transaction->add_statement();
1565
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1568
* Construct the specialized CreateSchemaStatement message and attach
1569
* it to the generic Statement message
1571
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1572
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1573
*new_schema_message= schema;
1575
finalizeStatementMessage(*statement, in_session);
1577
finalizeTransactionMessage(*transaction, in_session);
1579
replication_services.pushTransactionMessage(*transaction);
1581
cleanupTransactionMessage(transaction, in_session);
1585
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1587
ReplicationServices &replication_services= ReplicationServices::singleton();
1588
if (! replication_services.isActive())
1591
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1592
message::Statement *statement= transaction->add_statement();
1594
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1597
* Construct the specialized DropSchemaStatement message and attach
1598
* it to the generic Statement message
1600
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1602
drop_schema_statement->set_schema_name(schema_name);
1604
finalizeStatementMessage(*statement, in_session);
1606
finalizeTransactionMessage(*transaction, in_session);
1608
replication_services.pushTransactionMessage(*transaction);
1610
cleanupTransactionMessage(transaction, in_session);
1613
void TransactionServices::dropTable(Session *in_session,
1614
const string &schema_name,
1615
const string &table_name,
1618
ReplicationServices &replication_services= ReplicationServices::singleton();
1619
if (! replication_services.isActive())
1622
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1623
message::Statement *statement= transaction->add_statement();
1625
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1628
* Construct the specialized DropTableStatement message and attach
1629
* it to the generic Statement message
1631
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1633
drop_table_statement->set_if_exists_clause(if_exists);
1635
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1637
table_metadata->set_schema_name(schema_name);
1638
table_metadata->set_table_name(table_name);
1640
finalizeStatementMessage(*statement, in_session);
1642
finalizeTransactionMessage(*transaction, in_session);
1644
replication_services.pushTransactionMessage(*transaction);
1646
cleanupTransactionMessage(transaction, in_session);
1649
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
1651
ReplicationServices &replication_services= ReplicationServices::singleton();
1652
if (! replication_services.isActive())
1655
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1656
message::Statement *statement= transaction->add_statement();
1658
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
1661
* Construct the specialized TruncateTableStatement message and attach
1662
* it to the generic Statement message
1664
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
1665
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
1667
const char *schema_name= in_table->getShare()->db.str;
1668
const char *table_name= in_table->getShare()->table_name.str;
1670
table_metadata->set_schema_name(schema_name);
1671
table_metadata->set_table_name(table_name);
1673
finalizeStatementMessage(*statement, in_session);
1675
finalizeTransactionMessage(*transaction, in_session);
1677
replication_services.pushTransactionMessage(*transaction);
1679
cleanupTransactionMessage(transaction, in_session);
1682
void TransactionServices::rawStatement(Session *in_session, const string &query)
1684
ReplicationServices &replication_services= ReplicationServices::singleton();
1685
if (! replication_services.isActive())
1688
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1689
message::Statement *statement= transaction->add_statement();
1691
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
1692
statement->set_sql(query);
1693
finalizeStatementMessage(*statement, in_session);
1695
finalizeTransactionMessage(*transaction, in_session);
1697
replication_services.pushTransactionMessage(*transaction);
1699
cleanupTransactionMessage(transaction, in_session);
966
1702
} /* namespace drizzled */