861
session->status_var.ha_rollback_count++;
811
status_var_increment(session->status_var.ha_rollback_count);
864
814
resource_context->reset(); /* keep it conveniently zero-filled */
867
817
trans->setResourceContexts(sv_resource_contexts);
869
if (shouldConstructMessages())
871
cleanupTransactionMessage(getActiveTransactionMessage(session), session);
872
message::Transaction *savepoint_transaction= sv.getTransactionMessage();
873
if (savepoint_transaction != NULL)
875
/* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
876
Upon commit the savepoint_transaction_copy will be cleaned up by a call to
877
cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
878
up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
880
message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
881
uint32_t num_statements = savepoint_transaction_copy->statement_size();
882
if (num_statements == 0)
884
session->setStatementMessage(NULL);
888
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
890
session->setTransactionMessage(savepoint_transaction_copy);
1141
1031
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1143
uint32_t *next_segment_id)
1145
1034
message::Statement *statement= in_session->getStatementMessage();
1146
message::Transaction *transaction= NULL;
1149
* Check the type for the current Statement message, if it is anything
1150
* other then INSERT we need to call finalize, this will ensure a
1151
* new InsertStatement is created. If it is of type INSERT check
1152
* what table the INSERT belongs to, if it is a different table
1153
* call finalize, so a new InsertStatement can be created.
1036
* We check to see if the current Statement message is of type INSERT.
1037
* If it is not, we finalize the current Statement and ensure a new
1038
* InsertStatement is created.
1155
if (statement != NULL && statement->type() != message::Statement::INSERT)
1040
if (statement != NULL &&
1041
statement->type() != message::Statement::INSERT)
1157
1043
finalizeStatementMessage(*statement, in_session);
1158
1044
statement= in_session->getStatementMessage();
1160
else if (statement != NULL)
1162
transaction= getActiveTransactionMessage(in_session);
1165
* If we've passed our threshold for the statement size (possible for
1166
* a bulk insert), we'll finalize the Statement and Transaction (doing
1167
* the Transaction will keep it from getting huge).
1169
if (static_cast<size_t>(transaction->ByteSize()) >=
1170
in_session->variables.transaction_message_threshold)
1172
/* Remember the transaction ID so we can re-use it */
1173
uint64_t trx_id= transaction->transaction_context().transaction_id();
1175
message::InsertData *current_data= statement->mutable_insert_data();
1177
/* Caller should use this value when adding a new record */
1178
*next_segment_id= current_data->segment_id() + 1;
1180
current_data->set_end_segment(false);
1183
* Send the trx message to replicators after finalizing the
1184
* statement and transaction. This will also set the Transaction
1185
* and Statement objects in Session to NULL.
1187
commitTransactionMessage(in_session);
1190
* Statement and Transaction should now be NULL, so new ones will get
1191
* created. We reuse the transaction id since we are segmenting
1194
statement= in_session->getStatementMessage();
1195
transaction= getActiveTransactionMessage(in_session, false);
1196
assert(transaction != NULL);
1198
/* Set the transaction ID to match the previous messages */
1199
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1203
const message::InsertHeader &insert_header= statement->insert_header();
1204
string old_table_name= insert_header.table_metadata().table_name();
1206
string current_table_name;
1207
(void) in_table->getShare()->getTableName(current_table_name);
1209
if (current_table_name.compare(old_table_name))
1211
finalizeStatementMessage(*statement, in_session);
1212
statement= in_session->getStatementMessage();
1216
/* carry forward the existing segment id */
1217
const message::InsertData ¤t_data= statement->insert_data();
1218
*next_segment_id= current_data.segment_id();
1223
1047
if (statement == NULL)
1226
* Transaction will be non-NULL only if we had to segment it due to
1227
* transaction size above.
1229
if (transaction == NULL)
1230
transaction= getActiveTransactionMessage(in_session);
1049
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1233
1051
* Transaction message initialized and set, but no statement created
1234
1052
* yet. We construct one and initialize it, here, then return the
1337
1145
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1338
1146
Table *in_table,
1339
1147
const unsigned char *old_record,
1340
const unsigned char *new_record,
1341
uint32_t *next_segment_id)
1148
const unsigned char *new_record)
1343
1150
message::Statement *statement= in_session->getStatementMessage();
1344
message::Transaction *transaction= NULL;
1347
* Check the type for the current Statement message, if it is anything
1348
* other then UPDATE we need to call finalize, this will ensure a
1349
* new UpdateStatement is created. If it is of type UPDATE check
1350
* what table the UPDATE belongs to, if it is a different table
1351
* call finalize, so a new UpdateStatement can be created.
1152
* We check to see if the current Statement message is of type UPDATE.
1153
* If it is not, we finalize the current Statement and ensure a new
1154
* UpdateStatement is created.
1353
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1156
if (statement != NULL &&
1157
statement->type() != message::Statement::UPDATE)
1355
1159
finalizeStatementMessage(*statement, in_session);
1356
1160
statement= in_session->getStatementMessage();
1358
else if (statement != NULL)
1360
transaction= getActiveTransactionMessage(in_session);
1363
* If we've passed our threshold for the statement size (possible for
1364
* a bulk insert), we'll finalize the Statement and Transaction (doing
1365
* the Transaction will keep it from getting huge).
1367
if (static_cast<size_t>(transaction->ByteSize()) >=
1368
in_session->variables.transaction_message_threshold)
1370
/* Remember the transaction ID so we can re-use it */
1371
uint64_t trx_id= transaction->transaction_context().transaction_id();
1373
message::UpdateData *current_data= statement->mutable_update_data();
1375
/* Caller should use this value when adding a new record */
1376
*next_segment_id= current_data->segment_id() + 1;
1378
current_data->set_end_segment(false);
1381
* Send the trx message to replicators after finalizing the
1382
* statement and transaction. This will also set the Transaction
1383
* and Statement objects in Session to NULL.
1385
commitTransactionMessage(in_session);
1388
* Statement and Transaction should now be NULL, so new ones will get
1389
* created. We reuse the transaction id since we are segmenting
1392
statement= in_session->getStatementMessage();
1393
transaction= getActiveTransactionMessage(in_session, false);
1394
assert(transaction != NULL);
1396
/* Set the transaction ID to match the previous messages */
1397
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1401
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1403
/* carry forward the existing segment id */
1404
const message::UpdateData ¤t_data= statement->update_data();
1405
*next_segment_id= current_data.segment_id();
1409
finalizeStatementMessage(*statement, in_session);
1410
statement= in_session->getStatementMessage();
1415
1163
if (statement == NULL)
1418
* Transaction will be non-NULL only if we had to segment it due to
1419
* transaction size above.
1421
if (transaction == NULL)
1422
transaction= getActiveTransactionMessage(in_session);
1165
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1425
1167
* Transaction message initialized and set, but no statement created
1426
1168
* yet. We construct one and initialize it, here, then return the
1434
1176
return *statement;
1437
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1439
const unsigned char *old_record,
1440
const unsigned char *new_record)
1442
const message::UpdateHeader &update_header= statement.update_header();
1443
string old_table_name= update_header.table_metadata().table_name();
1445
string current_table_name;
1446
(void) in_table->getShare()->getTableName(current_table_name);
1447
if (current_table_name.compare(old_table_name))
1453
/* Compare the set fields in the existing UpdateHeader and see if they
1454
* match the updated fields in the new record, if they do not we must
1455
* create a new UpdateHeader
1457
size_t num_set_fields= update_header.set_field_metadata_size();
1459
Field *current_field;
1460
Field **table_fields= in_table->getFields();
1461
in_table->setReadSet();
1463
size_t num_calculated_updated_fields= 0;
1465
while ((current_field= *table_fields++) != NULL)
1467
if (num_calculated_updated_fields > num_set_fields)
1472
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1474
/* check that this field exists in the UpdateHeader record */
1477
for (size_t x= 0; x < num_set_fields; ++x)
1479
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1480
string name= field_metadata.name();
1481
if (name.compare(current_field->field_name) == 0)
1484
++num_calculated_updated_fields;
1495
if ((num_calculated_updated_fields == num_set_fields) && found)
1506
1179
void TransactionServices::setUpdateHeader(message::Statement &statement,
1507
1180
Session *in_session,
1508
1181
Table *in_table,
1548
1221
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1551
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1225
* The below really should be moved into the Field API and Record API. But for now
1226
* we do this crazy pointer fiddling to figure out if the current field
1227
* has been updated in the supplied record raw byte pointers.
1229
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1230
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1232
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1234
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1553
1236
/* Field is changed from old to new */
1554
1237
field_metadata= header->add_set_field_metadata();
1591
1273
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1593
1275
* We will generate two UpdateRecord messages with different set_value byte arrays.
1277
* The below really should be moved into the Field API and Record API. But for now
1278
* we do this crazy pointer fiddling to figure out if the current field
1279
* has been updated in the supplied record raw byte pointers.
1595
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1281
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1282
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1284
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1286
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1597
1288
/* Store the original "read bit" for this field */
1598
1289
bool is_read_set= current_field->isReadSet();
1647
bool TransactionServices::isFieldUpdated(Field *current_field,
1649
const unsigned char *old_record,
1650
const unsigned char *new_record)
1653
* The below really should be moved into the Field API and Record API. But for now
1654
* we do this crazy pointer fiddling to figure out if the current field
1655
* has been updated in the supplied record raw byte pointers.
1657
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1658
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1660
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1662
bool old_value_is_null= current_field->is_null_in_record(old_record);
1663
bool new_value_is_null= current_field->is_null_in_record(new_record);
1665
bool isUpdated= false;
1666
if (old_value_is_null != new_value_is_null)
1668
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1672
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1680
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1688
1329
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1690
uint32_t *next_segment_id)
1692
1332
message::Statement *statement= in_session->getStatementMessage();
1693
message::Transaction *transaction= NULL;
1696
* Check the type for the current Statement message, if it is anything
1697
* other then DELETE we need to call finalize, this will ensure a
1698
* new DeleteStatement is created. If it is of type DELETE check
1699
* what table the DELETE belongs to, if it is a different table
1700
* call finalize, so a new DeleteStatement can be created.
1334
* We check to see if the current Statement message is of type DELETE.
1335
* If it is not, we finalize the current Statement and ensure a new
1336
* DeleteStatement is created.
1702
if (statement != NULL && statement->type() != message::Statement::DELETE)
1338
if (statement != NULL &&
1339
statement->type() != message::Statement::DELETE)
1704
1341
finalizeStatementMessage(*statement, in_session);
1705
1342
statement= in_session->getStatementMessage();
1707
else if (statement != NULL)
1709
transaction= getActiveTransactionMessage(in_session);
1712
* If we've passed our threshold for the statement size (possible for
1713
* a bulk insert), we'll finalize the Statement and Transaction (doing
1714
* the Transaction will keep it from getting huge).
1716
if (static_cast<size_t>(transaction->ByteSize()) >=
1717
in_session->variables.transaction_message_threshold)
1719
/* Remember the transaction ID so we can re-use it */
1720
uint64_t trx_id= transaction->transaction_context().transaction_id();
1722
message::DeleteData *current_data= statement->mutable_delete_data();
1724
/* Caller should use this value when adding a new record */
1725
*next_segment_id= current_data->segment_id() + 1;
1727
current_data->set_end_segment(false);
1730
* Send the trx message to replicators after finalizing the
1731
* statement and transaction. This will also set the Transaction
1732
* and Statement objects in Session to NULL.
1734
commitTransactionMessage(in_session);
1737
* Statement and Transaction should now be NULL, so new ones will get
1738
* created. We reuse the transaction id since we are segmenting
1741
statement= in_session->getStatementMessage();
1742
transaction= getActiveTransactionMessage(in_session, false);
1743
assert(transaction != NULL);
1745
/* Set the transaction ID to match the previous messages */
1746
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1750
const message::DeleteHeader &delete_header= statement->delete_header();
1751
string old_table_name= delete_header.table_metadata().table_name();
1753
string current_table_name;
1754
(void) in_table->getShare()->getTableName(current_table_name);
1755
if (current_table_name.compare(old_table_name))
1757
finalizeStatementMessage(*statement, in_session);
1758
statement= in_session->getStatementMessage();
1762
/* carry forward the existing segment id */
1763
const message::DeleteData ¤t_data= statement->delete_data();
1764
*next_segment_id= current_data.segment_id();
1769
1345
if (statement == NULL)
1772
* Transaction will be non-NULL only if we had to segment it due to
1773
* transaction size above.
1775
if (transaction == NULL)
1776
transaction= getActiveTransactionMessage(in_session);
1347
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1779
1349
* Transaction message initialized and set, but no statement created
1780
1350
* yet. We construct one and initialize it, here, then return the
1833
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1403
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1835
1405
ReplicationServices &replication_services= ReplicationServices::singleton();
1836
1406
if (! replication_services.isActive())
1839
uint32_t next_segment_id= 1;
1840
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1409
message::Statement &statement= getDeleteStatement(in_session, in_table);
1842
1411
message::DeleteData *data= statement.mutable_delete_data();
1843
data->set_segment_id(next_segment_id);
1412
data->set_segment_id(1);
1844
1413
data->set_end_segment(true);
1845
1414
message::DeleteRecord *record= data->add_record();
1859
1428
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1861
if (use_update_record)
1864
* Temporarily point to the update record to get its value.
1865
* This is pretty much a hack in order to get the PK value from
1866
* the update record rather than the insert record. Field::val_str()
1867
* should not change anything in Field::ptr, so this should be safe.
1868
* We are careful not to change anything in old_ptr.
1870
const unsigned char *old_ptr= current_field->ptr;
1871
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1872
string_value= current_field->val_str(string_value);
1873
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1877
string_value= current_field->val_str(string_value);
1879
* @TODO Store optional old record value in the before data member
1430
string_value= current_field->val_str(string_value);
1882
1431
record->add_key_value(string_value->c_ptr(), string_value->length());
1433
* @TODO Store optional old record value in the before data member
1883
1435
string_value->free();
1890
* Template for removing Statement records of different types.
1892
* The code for removing records from different Statement message types
1893
* is identical except for the class types that are embedded within the
1896
* There are 3 scenarios we need to look for:
1897
* - We've been asked to remove more records than exist in the Statement
1898
* - We've been asked to remove less records than exist in the Statement
1899
* - We've been asked to remove ALL records that exist in the Statement
1901
* If we are removing ALL records, then effectively we would be left with
1902
* an empty Statement message, so we should just remove it and clean up
1903
* message pointers in the Session object.
1905
template <class DataType, class RecordType>
1906
static bool removeStatementRecordsWithType(Session *session,
1910
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1912
/* If there aren't enough records to remove 'count' of them, error. */
1913
if (num_avail_recs < count)
1917
* If we are removing all of the data records, we'll just remove this
1918
* entire Statement message.
1920
if (num_avail_recs == count)
1922
message::Transaction *transaction= session->getTransactionMessage();
1923
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1924
statements->RemoveLast();
1927
* Now need to set the Session Statement pointer to either the previous
1928
* Statement, or NULL if there isn't one.
1930
if (statements->size() == 0)
1932
session->setStatementMessage(NULL);
1937
* There isn't a great way to get a pointer to the previous Statement
1938
* message using the RepeatedPtrField object, so we'll just get to it
1939
* using the Transaction message.
1941
int last_stmt_idx= transaction->statement_size() - 1;
1942
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1945
/* We only need to remove 'count' records */
1946
else if (num_avail_recs > count)
1948
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1950
records->RemoveLast();
1957
bool TransactionServices::removeStatementRecords(Session *session,
1960
ReplicationServices &replication_services= ReplicationServices::singleton();
1961
if (! replication_services.isActive())
1964
/* Get the most current Statement */
1965
message::Statement *statement= session->getStatementMessage();
1967
/* Make sure we have work to do */
1968
if (statement == NULL)
1973
switch (statement->type())
1975
case message::Statement::INSERT:
1977
message::InsertData *data= statement->mutable_insert_data();
1978
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1982
case message::Statement::UPDATE:
1984
message::UpdateData *data= statement->mutable_update_data();
1985
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1989
case message::Statement::DELETE: /* not sure if this one is possible... */
1991
message::DeleteData *data= statement->mutable_delete_data();
1992
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2005
1440
void TransactionServices::createTable(Session *in_session,
2006
1441
const message::Table &table)
2181
1616
cleanupTransactionMessage(transaction, in_session);
2184
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2186
ReplicationServices &replication_services= ReplicationServices::singleton();
2187
if (! replication_services.isActive())
2190
message::Transaction *transaction= new (nothrow) message::Transaction();
2192
// set server id, start timestamp
2193
initTransactionMessage(*transaction, session, true);
2195
// set end timestamp
2196
finalizeTransactionMessage(*transaction, session);
2198
message::Event *trx_event= transaction->mutable_event();
2200
trx_event->CopyFrom(event);
2202
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2206
return static_cast<int>(result);
2209
bool TransactionServices::sendStartupEvent(Session *session)
2211
message::Event event;
2212
event.set_type(message::Event::STARTUP);
2213
if (sendEvent(session, event) != 0)
2218
bool TransactionServices::sendShutdownEvent(Session *session)
2220
message::Event event;
2221
event.set_type(message::Event::SHUTDOWN);
2222
if (sendEvent(session, event) != 0)
2227
1619
} /* namespace drizzled */