991
/**
 * Returns the Transaction message attached to the supplied Session,
 * creating, initializing and attaching a fresh one when the Session
 * has none.
 *
 * @param in_session Session whose transaction message is wanted
 *
 * @return Pointer to the Session's Transaction message, or NULL when a
 *         new message was required but could not be allocated.
 */
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
{
  message::Transaction *transaction= in_session->getTransactionMessage();

  if (unlikely(transaction == NULL))
  {
    /*
     * Allocate and initialize a new transaction message for this
     * Session object. The pointer is stored on the Session so that
     * later calls find the same message.
     */
    transaction= new (nothrow) message::Transaction();

    /*
     * A nothrow allocation reports failure by returning NULL. Bail out
     * before the message is dereferenced or registered on the Session.
     */
    if (transaction == NULL)
      return NULL;

    initTransactionMessage(*transaction, in_session);
    in_session->setTransactionMessage(transaction);
  }

  return transaction;
}
1011
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1012
Session *in_session)
1014
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
trx->set_server_id(in_session->getServerId());
1016
trx->set_transaction_id(in_session->getQueryId());
1017
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1020
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1021
Session *in_session)
1023
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1024
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1027
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1028
Session *in_session)
1030
delete in_transaction;
1031
in_session->setStatementMessage(NULL);
1032
in_session->setTransactionMessage(NULL);
1035
/**
 * Finalizes the Session's open Statement and Transaction messages,
 * pushes the Transaction message to the replication services and then
 * discards it.
 *
 * Does nothing when replication is inactive or when the Session has no
 * open Statement message.
 *
 * @param in_session Session whose transaction is being committed
 */
void TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Statement *open_statement= in_session->getStatementMessage();
  if (open_statement == NULL)
    return; /* No data modification occurred inside the transaction */

  /* There is an active statement message, so close it out first */
  finalizeStatementMessage(*open_statement, in_session);

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  finalizeTransactionMessage(*transaction, in_session);
  replicator.pushTransactionMessage(*transaction);
  cleanupTransactionMessage(transaction, in_session);
}
1060
void TransactionServices::initStatementMessage(message::Statement &statement,
1061
message::Statement::Type in_type,
1062
Session *in_session)
1064
statement.set_type(in_type);
1065
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1066
/** @TODO Set sql string optionally */
1069
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1070
Session *in_session)
1072
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1073
in_session->setStatementMessage(NULL);
1076
/**
 * Handles a ROLLBACK of the Session's current transaction message.
 *
 * There are two situations:
 *
 * 1) The Transaction message is self-contained, i.e.
 *    message::transactionContainsBulkSegment() reports false. The
 *    message is simply discarded.
 *
 * 2) The Transaction message contains a bulk segment, which means an
 *    earlier segment of a bulk insert/update/delete has already been
 *    sent to replicators. The message is cleared and re-initialized,
 *    given a single Statement of type ROLLBACK, and pushed so that
 *    replicators know the previously transmitted messages must be
 *    un-applied. It is then discarded.
 *
 * Does nothing when replication is inactive.
 *
 * @param in_session Session whose transaction is being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);

  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
  {
    /* Start over with an empty message carrying only the rollback marker */
    transaction->Clear();
    initTransactionMessage(*transaction, in_session);

    message::Statement *rollback_marker= transaction->add_statement();
    initStatementMessage(*rollback_marker, message::Statement::ROLLBACK, in_session);
    finalizeStatementMessage(*rollback_marker, in_session);

    finalizeTransactionMessage(*transaction, in_session);
    replicator.pushTransactionMessage(*transaction);
  }

  cleanupTransactionMessage(transaction, in_session);
}
1122
/**
 * Returns the Statement message that INSERT records for the Session
 * should be appended to.
 *
 * If the Session's current Statement message is not of type INSERT it
 * is finalized first. If no Statement message is then available, a new
 * one is added to the active Transaction message, given an insert
 * header and stored on the Session.
 *
 * @param in_session Session performing the insert
 * @param in_table   Table being inserted into
 *
 * @return Reference to the Session's INSERT Statement message
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
                                                            Table *in_table)
{
  message::Statement *current= in_session->getStatementMessage();

  /* A pending statement of a different type has to be closed out first */
  if (current != NULL && current->type() != message::Statement::INSERT)
  {
    finalizeStatementMessage(*current, in_session);
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * No usable statement yet: create one inside the active transaction
   * message and remember it on the Session for easy retrieval later.
   */
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  current= transaction->add_statement();
  setInsertHeader(*current, in_session, in_table);
  in_session->setStatementMessage(current);

  return *current;
}
1154
void TransactionServices::setInsertHeader(message::Statement &statement,
1155
Session *in_session,
1158
initStatementMessage(statement, message::Statement::INSERT, in_session);
1161
* Now we construct the specialized InsertHeader message inside
1162
* the generalized message::Statement container...
1164
/* Set up the insert header */
1165
message::InsertHeader *header= statement.mutable_insert_header();
1166
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1169
(void) in_table->getShare()->getSchemaName(schema_name);
1171
(void) in_table->getShare()->getTableName(table_name);
1173
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1174
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1176
Field *current_field;
1177
Field **table_fields= in_table->field;
1179
message::FieldMetadata *field_metadata;
1181
/* We will read all the table's fields... */
1182
in_table->setReadSet();
1184
while ((current_field= *table_fields++) != NULL)
1186
field_metadata= header->add_field_metadata();
1187
field_metadata->set_name(current_field->field_name);
1188
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1192
/*
 * Appends one InsertRecord, holding the string value of every field of
 * in_table, to the Session's INSERT Statement message.
 *
 * Replication must be active (see the isActive() check) and the table
 * must have a primary key; when primary_key equals MAX_KEY the error
 * ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE is raised through my_error().
 *
 * The insert data is marked as segment 1 and as the end segment.
 *
 * NOTE(review): this listing has lost its short lines (braces and the
 * bare return statements), so the bool values returned on each path
 * are not visible here -- confirm them against the complete file
 * before changing this function.
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1194
ReplicationServices &replication_services= ReplicationServices::singleton();
1195
if (! replication_services.isActive())
1198
* We do this check here because we don't want to even create a
1199
* statement if there isn't a primary key on the table...
1203
* Multi-column primary keys are handled how exactly?
1205
if (in_table->s->primary_key == MAX_KEY)
1207
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1211
message::Statement &statement= getInsertStatement(in_session, in_table);
1213
message::InsertData *data= statement.mutable_insert_data();
1214
data->set_segment_id(1);
1215
data->set_end_segment(true);
1216
message::InsertRecord *record= data->add_record();
1218
Field *current_field;
1219
Field **table_fields= in_table->field;
1221
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1222
string_value->set_charset(system_charset_info);
1224
/* We will read all the table's fields... */
1225
in_table->setReadSet();
1227
/* Walk the NULL-terminated field array, adding one insert value per field */
while ((current_field= *table_fields++) != NULL)
1229
string_value= current_field->val_str(string_value);
1230
record->add_insert_value(string_value->c_ptr(), string_value->length());
1231
string_value->free();
1236
/**
 * Returns the Statement message that UPDATE records for the Session
 * should be appended to.
 *
 * If the Session's current Statement message is not of type UPDATE it
 * is finalized first. If no Statement message is then available, a new
 * one is added to the active Transaction message, given an update
 * header and stored on the Session.
 *
 * @param in_session Session performing the update
 * @param in_table   Table being updated
 * @param old_record Raw bytes of the record before the update
 * @param new_record Raw bytes of the record after the update
 *
 * @return Reference to the Session's UPDATE Statement message
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
                                                            Table *in_table,
                                                            const unsigned char *old_record,
                                                            const unsigned char *new_record)
{
  message::Statement *current= in_session->getStatementMessage();

  /* A pending statement of a different type has to be closed out first */
  if (current != NULL && current->type() != message::Statement::UPDATE)
  {
    finalizeStatementMessage(*current, in_session);
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * No usable statement yet: create one inside the active transaction
   * message and remember it on the Session for easy retrieval later.
   */
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  current= transaction->add_statement();
  setUpdateHeader(*current, in_session, in_table, old_record, new_record);
  in_session->setStatementMessage(current);

  return *current;
}
1270
void TransactionServices::setUpdateHeader(message::Statement &statement,
1271
Session *in_session,
1273
const unsigned char *old_record,
1274
const unsigned char *new_record)
1276
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1279
* Now we construct the specialized UpdateHeader message inside
1280
* the generalized message::Statement container...
1282
/* Set up the update header */
1283
message::UpdateHeader *header= statement.mutable_update_header();
1284
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1287
(void) in_table->getShare()->getSchemaName(schema_name);
1289
(void) in_table->getShare()->getTableName(table_name);
1291
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1292
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1294
Field *current_field;
1295
Field **table_fields= in_table->field;
1297
message::FieldMetadata *field_metadata;
1299
/* We will read all the table's fields... */
1300
in_table->setReadSet();
1302
while ((current_field= *table_fields++) != NULL)
1305
* We add the "key field metadata" -- i.e. the fields which is
1306
* the primary key for the table.
1308
if (in_table->s->fieldInPrimaryKey(current_field))
1310
field_metadata= header->add_key_field_metadata();
1311
field_metadata->set_name(current_field->field_name);
1312
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1316
* The below really should be moved into the Field API and Record API. But for now
1317
* we do this crazy pointer fiddling to figure out if the current field
1318
* has been updated in the supplied record raw byte pointers.
1320
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1321
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1323
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1325
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1327
/* Field is changed from old to new */
1328
field_metadata= header->add_set_field_metadata();
1329
field_metadata->set_name(current_field->field_name);
1330
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1334
/*
 * Appends one UpdateRecord to the Session's UPDATE Statement message.
 *
 * For every field of in_table:
 *  - if the field's raw bytes differ between old_record and new_record,
 *    its string value is added as an "after value";
 *  - if the field is part of the primary key, its string value is added
 *    as a "key value".
 *
 * The update data is marked as segment 1 and as the end segment.
 *
 * NOTE(review): this listing has lost its short lines. The parameter
 * list below is missing the in_table parameter that the body uses, the
 * statement guarded by the isActive() check is not visible, and the
 * val_str() call near the end of the loop appears to be missing part of
 * its second argument -- confirm all three against the complete file.
 */
void TransactionServices::updateRecord(Session *in_session,
1336
const unsigned char *old_record,
1337
const unsigned char *new_record)
1339
ReplicationServices &replication_services= ReplicationServices::singleton();
1340
if (! replication_services.isActive())
1343
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1345
message::UpdateData *data= statement.mutable_update_data();
1346
data->set_segment_id(1);
1347
data->set_end_segment(true);
1348
message::UpdateRecord *record= data->add_record();
1350
Field *current_field;
1351
Field **table_fields= in_table->field;
1352
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1353
string_value->set_charset(system_charset_info);
1355
/* Walk the NULL-terminated field array */
while ((current_field= *table_fields++) != NULL)
1358
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1359
* but then realized that an UPDATE statement could potentially have different values for
1360
* the SET field. For instance, imagine this SQL scenario:
1362
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1363
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1364
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1366
* We will generate two UpdateRecord messages with different set_value byte arrays.
1368
* The below really should be moved into the Field API and Record API. But for now
1369
* we do this crazy pointer fiddling to figure out if the current field
1370
* has been updated in the supplied record raw byte pointers.
1372
/* Apply the field's offset within record[0] to each raw record pointer */
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1373
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1375
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1377
/* A byte-wise difference over pack_length() bytes marks the field as changed */
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1379
/* Store the original "read bit" for this field */
1380
bool is_read_set= current_field->isReadSet();
1382
/* We need to mark that we will "read" this field... */
1383
in_table->setReadSet(current_field->field_index);
1385
/* Read the string value of this field's contents */
1386
string_value= current_field->val_str(string_value);
1389
* Reset the read bit after reading field to its original state. This
1390
* prevents the field from being included in the WHERE clause
1392
current_field->setReadSet(is_read_set);
1394
record->add_after_value(string_value->c_ptr(), string_value->length());
1395
string_value->free();
1399
* Add the WHERE clause values now...for now, this means the
1400
* primary key field value. Replication only supports tables
1401
* with a primary key.
1403
if (in_table->s->fieldInPrimaryKey(current_field))
1406
* To say the below is ugly is an understatement. But it works.
1408
* @todo Move this crap into a real Record API.
1410
/* NOTE(review): original line 1411 is absent between the next two lines */
string_value= current_field->val_str(string_value,
1412
current_field->offset(const_cast<unsigned char *>(new_record)));
1413
record->add_key_value(string_value->c_ptr(), string_value->length());
1414
string_value->free();
1420
/**
 * Returns the Statement message that DELETE records for the Session
 * should be appended to.
 *
 * If the Session's current Statement message is not of type DELETE it
 * is finalized first. If no Statement message is then available, a new
 * one is added to the active Transaction message, given a delete
 * header and stored on the Session.
 *
 * @param in_session Session performing the delete
 * @param in_table   Table being deleted from
 *
 * @return Reference to the Session's DELETE Statement message
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
                                                            Table *in_table)
{
  message::Statement *current= in_session->getStatementMessage();

  /* A pending statement of a different type has to be closed out first */
  if (current != NULL && current->type() != message::Statement::DELETE)
  {
    finalizeStatementMessage(*current, in_session);
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * No usable statement yet: create one inside the active transaction
   * message and remember it on the Session for easy retrieval later.
   */
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  current= transaction->add_statement();
  setDeleteHeader(*current, in_session, in_table);
  in_session->setStatementMessage(current);

  return *current;
}
1452
void TransactionServices::setDeleteHeader(message::Statement &statement,
1453
Session *in_session,
1456
initStatementMessage(statement, message::Statement::DELETE, in_session);
1459
* Now we construct the specialized DeleteHeader message inside
1460
* the generalized message::Statement container...
1462
message::DeleteHeader *header= statement.mutable_delete_header();
1463
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1466
(void) in_table->getShare()->getSchemaName(schema_name);
1468
(void) in_table->getShare()->getTableName(table_name);
1470
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1471
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1473
Field *current_field;
1474
Field **table_fields= in_table->field;
1476
message::FieldMetadata *field_metadata;
1478
while ((current_field= *table_fields++) != NULL)
1481
* Add the WHERE clause values now...for now, this means the
1482
* primary key field value. Replication only supports tables
1483
* with a primary key.
1485
if (in_table->s->fieldInPrimaryKey(current_field))
1487
field_metadata= header->add_key_field_metadata();
1488
field_metadata->set_name(current_field->field_name);
1489
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1494
/**
 * Appends one DeleteRecord, holding the string value of each primary
 * key field of in_table, to the Session's DELETE Statement message.
 *
 * The delete data is marked as segment 1 and as the end segment. Does
 * nothing when replication is inactive.
 *
 * @param in_session Session performing the delete
 * @param in_table   Table being deleted from
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Statement &statement= getDeleteStatement(in_session, in_table);

  message::DeleteData *data= statement.mutable_delete_data();
  data->set_segment_id(1);
  data->set_end_segment(true);
  message::DeleteRecord *record= data->add_record();

  Field **cursor= in_table->field;
  String *buffer= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  buffer->set_charset(system_charset_info);

  /* The field array is terminated by a NULL entry */
  for (; *cursor != NULL; ++cursor)
  {
    Field *field= *cursor;

    /*
     * Only primary key values are recorded; they identify the row to
     * delete. Replication only supports tables with a primary key.
     */
    if (! in_table->s->fieldInPrimaryKey(field))
      continue;

    buffer= field->val_str(buffer);
    record->add_key_value(buffer->c_ptr(), buffer->length());
    /** @TODO Store optional old record value in the before data member */
    buffer->free();
  }
}
1531
void TransactionServices::createTable(Session *in_session,
1532
const message::Table &table)
1534
ReplicationServices &replication_services= ReplicationServices::singleton();
1535
if (! replication_services.isActive())
1538
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1539
message::Statement *statement= transaction->add_statement();
1541
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1544
* Construct the specialized CreateTableStatement message and attach
1545
* it to the generic Statement message
1547
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1548
message::Table *new_table_message= create_table_statement->mutable_table();
1549
*new_table_message= table;
1551
finalizeStatementMessage(*statement, in_session);
1553
finalizeTransactionMessage(*transaction, in_session);
1555
replication_services.pushTransactionMessage(*transaction);
1557
cleanupTransactionMessage(transaction, in_session);
1561
void TransactionServices::createSchema(Session *in_session,
1562
const message::Schema &schema)
1564
ReplicationServices &replication_services= ReplicationServices::singleton();
1565
if (! replication_services.isActive())
1568
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1569
message::Statement *statement= transaction->add_statement();
1571
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1574
* Construct the specialized CreateSchemaStatement message and attach
1575
* it to the generic Statement message
1577
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1578
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1579
*new_schema_message= schema;
1581
finalizeStatementMessage(*statement, in_session);
1583
finalizeTransactionMessage(*transaction, in_session);
1585
replication_services.pushTransactionMessage(*transaction);
1587
cleanupTransactionMessage(transaction, in_session);
1591
/**
 * Builds a DROP_SCHEMA Statement naming the supplied schema, pushes the
 * containing Transaction message to the replication services and
 * discards it.
 *
 * Does nothing when replication is inactive.
 *
 * @param in_session  Session dropping the schema
 * @param schema_name Name of the schema being dropped
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *ddl= transaction->add_statement();
  initStatementMessage(*ddl, message::Statement::DROP_SCHEMA, in_session);

  /* Attach the specialized DropSchemaStatement to the generic Statement */
  message::DropSchemaStatement *drop_schema= ddl->mutable_drop_schema_statement();
  drop_schema->set_schema_name(schema_name);

  finalizeStatementMessage(*ddl, in_session);
  finalizeTransactionMessage(*transaction, in_session);
  replicator.pushTransactionMessage(*transaction);
  cleanupTransactionMessage(transaction, in_session);
}
1619
void TransactionServices::dropTable(Session *in_session,
1620
const string &schema_name,
1621
const string &table_name,
1624
ReplicationServices &replication_services= ReplicationServices::singleton();
1625
if (! replication_services.isActive())
1628
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1629
message::Statement *statement= transaction->add_statement();
1631
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1634
* Construct the specialized DropTableStatement message and attach
1635
* it to the generic Statement message
1637
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1639
drop_table_statement->set_if_exists_clause(if_exists);
1641
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1643
table_metadata->set_schema_name(schema_name);
1644
table_metadata->set_table_name(table_name);
1646
finalizeStatementMessage(*statement, in_session);
1648
finalizeTransactionMessage(*transaction, in_session);
1650
replication_services.pushTransactionMessage(*transaction);
1652
cleanupTransactionMessage(transaction, in_session);
1655
/**
 * Builds a TRUNCATE_TABLE Statement naming the schema and table of
 * in_table, pushes the containing Transaction message to the
 * replication services and discards it.
 *
 * Does nothing when replication is inactive.
 *
 * @param in_session Session truncating the table
 * @param in_table   Table being truncated
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *ddl= transaction->add_statement();
  initStatementMessage(*ddl, message::Statement::TRUNCATE_TABLE, in_session);

  /* Attach the specialized TruncateTableStatement to the generic Statement */
  message::TruncateTableStatement *truncate= ddl->mutable_truncate_table_statement();
  message::TableMetadata *target= truncate->mutable_table_metadata();

  string schema_name;
  (void) in_table->getShare()->getSchemaName(schema_name);
  string table_name;
  (void) in_table->getShare()->getTableName(table_name);

  target->set_schema_name(schema_name.c_str(), schema_name.length());
  target->set_table_name(table_name.c_str(), table_name.length());

  finalizeStatementMessage(*ddl, in_session);
  finalizeTransactionMessage(*transaction, in_session);
  replicator.pushTransactionMessage(*transaction);
  cleanupTransactionMessage(transaction, in_session);
}
1690
/**
 * Builds a RAW_SQL Statement carrying the supplied query text, pushes
 * the containing Transaction message to the replication services and
 * discards it.
 *
 * Does nothing when replication is inactive.
 *
 * @param in_session Session executing the query
 * @param query      SQL text stored in the statement
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *raw= transaction->add_statement();

  initStatementMessage(*raw, message::Statement::RAW_SQL, in_session);
  raw->set_sql(query);
  finalizeStatementMessage(*raw, in_session);

  finalizeTransactionMessage(*transaction, in_session);
  replicator.pushTransactionMessage(*transaction);
  cleanupTransactionMessage(transaction, in_session);
}
966
1710
} /* namespace drizzled */