430
Check if we can skip the two-phase commit.
432
A helper function to evaluate if two-phase commit is mandatory.
433
As a side effect, propagates the read-only/read-write flags
434
of the statement transaction to its enclosing normal transaction.
436
@retval true we must run a two-phase commit. Returned
437
if we have at least two engines with read-write changes.
438
@retval false Don't need two-phase commit. Even if we have two
439
transactional engines, we can run two independent
440
commits if changes in one of the engines are read-only.
444
ha_check_and_coalesce_trx_read_only(Session *session,
445
TransactionContext::ResourceContexts &resource_contexts,
446
bool normal_transaction)
448
/* The number of storage engines that have actual changes. */
449
unsigned num_resources_modified_data= 0;
450
ResourceContext *resource_context;
452
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
it != resource_contexts.end();
456
resource_context= *it;
457
if (resource_context->hasModifiedData())
458
++num_resources_modified_data;
460
if (! normal_transaction)
462
ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
assert(resource_context != resource_context_normal);
465
Merge read-only/read-write information about statement
466
transaction to its enclosing normal transaction. Do this
467
only if in a real transaction -- that is, if we know
468
that resource_context_normal is registered in session->transaction.all.
469
Since otherwise we only clutter the normal transaction flags.
471
if (resource_context_normal->isStarted()) /* false if autocommit. */
472
resource_context_normal->coalesceWith(resource_context);
474
else if (num_resources_modified_data > 1)
477
It is a normal transaction, so we don't need to merge read/write
478
information up, and the need for two-phase commit has been
479
already established. Break the loop prematurely.
484
return num_resources_modified_data > 1;
523
464
if (resource_contexts.empty() == false)
527
466
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
529
ha_rollback_trans(session, normal_transaction);
468
rollbackTransaction(session, normal_transaction);
533
must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
535
if (! trans->no_2pc && must_2pc)
473
* If replication is on, we do a PREPARE on the resource managers, push the
474
* Transaction message across the replication stream, and then COMMIT if the
475
* replication stream returned successfully.
477
if (shouldConstructMessages())
537
479
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
480
it != resource_contexts.end() && ! error;
751
691
the user has used LOCK TABLES then that mechanism does not know to do the
754
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
694
int TransactionServices::autocommitOrRollback(Session *session, int error)
756
697
if (session->transaction.stmt.getResourceContexts().empty() == false)
699
TransactionContext *trans = &session->transaction.stmt;
700
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
701
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
702
it != resource_contexts.end();
705
ResourceContext *resource_context= *it;
707
resource_context->getTransactionalStorageEngine()->endStatement(session);
760
if (ha_commit_trans(session, false))
712
if (commitTransaction(session, false))
765
(void) ha_rollback_trans(session, false);
717
(void) rollbackTransaction(session, false);
766
718
if (session->transaction_rollback_request)
767
(void) ha_rollback_trans(session, true);
719
(void) rollbackTransaction(session, true);
770
722
session->variables.tx_isolation= session->session_tx_isolation;
776
return the list of XID's to a client, the same way SHOW commands do.
779
I didn't find in XA specs that an RM cannot return the same XID twice,
780
so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
It can be easily fixed later, if necessary.
783
bool TransactionServices::mysql_xa_recover(Session *session)
785
List<Item> field_list;
789
field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
794
if (session->client->sendFields(&field_list))
797
pthread_mutex_lock(&LOCK_xid_cache);
798
while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
800
if (xs->xa_state==XA_PREPARED)
802
session->client->store((int64_t)xs->xid.formatID);
803
session->client->store((int64_t)xs->xid.gtrid_length);
804
session->client->store((int64_t)xs->xid.bqual_length);
805
session->client->store(xs->xid.data,
806
xs->xid.gtrid_length+xs->xid.bqual_length);
807
if (session->client->flush())
809
pthread_mutex_unlock(&LOCK_xid_cache);
815
pthread_mutex_unlock(&LOCK_xid_cache);
820
727
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
822
729
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
910
status_var_increment(session->status_var.ha_rollback_count);
824
session->status_var.ha_rollback_count++;
913
827
resource_context->reset(); /* keep it conveniently zero-filled */
916
830
trans->setResourceContexts(sv_resource_contexts);
832
if (shouldConstructMessages())
834
cleanupTransactionMessage(getActiveTransactionMessage(session), session);
835
message::Transaction *savepoint_transaction= sv.getTransactionMessage();
836
if (savepoint_transaction != NULL)
838
/* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
839
Upon commit the savepoint_transaction_copy will be cleaned up by a call to
840
cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
841
up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
843
message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
844
uint32_t num_statements = savepoint_transaction_copy->statement_size();
845
if (num_statements == 0)
847
session->setStatementMessage(NULL);
851
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
853
session->setTransactionMessage(savepoint_transaction_copy);
1115
1087
finalizeTransactionMessage(*transaction, in_session);
1117
replication_services.pushTransactionMessage(*transaction);
1089
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1119
1091
cleanupTransactionMessage(transaction, in_session);
1122
1094
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1096
uint32_t *next_segment_id)
1125
1098
message::Statement *statement= in_session->getStatementMessage();
1127
* We check to see if the current Statement message is of type INSERT.
1128
* If it is not, we finalize the current Statement and ensure a new
1129
* InsertStatement is created.
1099
message::Transaction *transaction= NULL;
1102
* Check the type for the current Statement message, if it is anything
1103
* other than INSERT we need to call finalize, this will ensure a
1104
* new InsertStatement is created. If it is of type INSERT check
1105
* what table the INSERT belongs to, if it is a different table
1106
* call finalize, so a new InsertStatement can be created.
1131
if (statement != NULL &&
1132
statement->type() != message::Statement::INSERT)
1108
if (statement != NULL && statement->type() != message::Statement::INSERT)
1134
1110
finalizeStatementMessage(*statement, in_session);
1135
1111
statement= in_session->getStatementMessage();
1113
else if (statement != NULL)
1115
transaction= getActiveTransactionMessage(in_session);
1118
* If we've passed our threshold for the statement size (possible for
1119
* a bulk insert), we'll finalize the Statement and Transaction (doing
1120
* the Transaction will keep it from getting huge).
1122
if (static_cast<size_t>(transaction->ByteSize()) >=
1123
in_session->variables.transaction_message_threshold)
1125
/* Remember the transaction ID so we can re-use it */
1126
uint64_t trx_id= transaction->transaction_context().transaction_id();
1128
message::InsertData *current_data= statement->mutable_insert_data();
1130
/* Caller should use this value when adding a new record */
1131
*next_segment_id= current_data->segment_id() + 1;
1133
current_data->set_end_segment(false);
1136
* Send the trx message to replicators after finalizing the
1137
* statement and transaction. This will also set the Transaction
1138
* and Statement objects in Session to NULL.
1140
commitTransactionMessage(in_session);
1143
* Statement and Transaction should now be NULL, so new ones will get
1144
* created. We reuse the transaction id since we are segmenting
1147
statement= in_session->getStatementMessage();
1148
transaction= getActiveTransactionMessage(in_session, false);
1149
assert(transaction != NULL);
1151
/* Set the transaction ID to match the previous messages */
1152
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1156
const message::InsertHeader &insert_header= statement->insert_header();
1157
string old_table_name= insert_header.table_metadata().table_name();
1159
string current_table_name;
1160
(void) in_table->getShare()->getTableName(current_table_name);
1162
if (current_table_name.compare(old_table_name))
1164
finalizeStatementMessage(*statement, in_session);
1165
statement= in_session->getStatementMessage();
1169
/* carry forward the existing segment id */
1170
const message::InsertData &current_data= statement->insert_data();
1171
*next_segment_id= current_data.segment_id();
1138
1176
if (statement == NULL)
1140
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1179
* Transaction will be non-NULL only if we had to segment it due to
1180
* transaction size above.
1182
if (transaction == NULL)
1183
transaction= getActiveTransactionMessage(in_session);
1142
1186
* Transaction message initialized and set, but no statement created
1143
1187
* yet. We construct one and initialize it, here, then return the
1203
1247
* Multi-column primary keys are handled how exactly?
1205
if (in_table->s->primary_key == MAX_KEY)
1249
if (not in_table->getShare()->hasPrimaryKey())
1207
1251
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1211
message::Statement &statement= getInsertStatement(in_session, in_table);
1255
uint32_t next_segment_id= 1;
1256
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1213
1258
message::InsertData *data= statement.mutable_insert_data();
1214
data->set_segment_id(1);
1259
data->set_segment_id(next_segment_id);
1215
1260
data->set_end_segment(true);
1216
1261
message::InsertRecord *record= data->add_record();
1218
1263
Field *current_field;
1219
Field **table_fields= in_table->field;
1264
Field **table_fields= in_table->getFields();
1221
1266
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1222
1267
string_value->set_charset(system_charset_info);
1236
1290
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1237
1291
Table *in_table,
1238
1292
const unsigned char *old_record,
1239
const unsigned char *new_record)
1293
const unsigned char *new_record,
1294
uint32_t *next_segment_id)
1241
1296
message::Statement *statement= in_session->getStatementMessage();
1297
message::Transaction *transaction= NULL;
1243
* We check to see if the current Statement message is of type UPDATE.
1244
* If it is not, we finalize the current Statement and ensure a new
1245
* UpdateStatement is created.
1300
* Check the type for the current Statement message, if it is anything
1301
* other than UPDATE we need to call finalize, this will ensure a
1302
* new UpdateStatement is created. If it is of type UPDATE check
1303
* what table the UPDATE belongs to, if it is a different table
1304
* call finalize, so a new UpdateStatement can be created.
1247
if (statement != NULL &&
1248
statement->type() != message::Statement::UPDATE)
1306
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1250
1308
finalizeStatementMessage(*statement, in_session);
1251
1309
statement= in_session->getStatementMessage();
1311
else if (statement != NULL)
1313
transaction= getActiveTransactionMessage(in_session);
1316
* If we've passed our threshold for the statement size (possible for
1317
* a bulk update), we'll finalize the Statement and Transaction (doing
1318
* the Transaction will keep it from getting huge).
1320
if (static_cast<size_t>(transaction->ByteSize()) >=
1321
in_session->variables.transaction_message_threshold)
1323
/* Remember the transaction ID so we can re-use it */
1324
uint64_t trx_id= transaction->transaction_context().transaction_id();
1326
message::UpdateData *current_data= statement->mutable_update_data();
1328
/* Caller should use this value when adding a new record */
1329
*next_segment_id= current_data->segment_id() + 1;
1331
current_data->set_end_segment(false);
1334
* Send the trx message to replicators after finalizing the
1335
* statement and transaction. This will also set the Transaction
1336
* and Statement objects in Session to NULL.
1338
commitTransactionMessage(in_session);
1341
* Statement and Transaction should now be NULL, so new ones will get
1342
* created. We reuse the transaction id since we are segmenting
1345
statement= in_session->getStatementMessage();
1346
transaction= getActiveTransactionMessage(in_session, false);
1347
assert(transaction != NULL);
1349
/* Set the transaction ID to match the previous messages */
1350
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1354
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1356
/* carry forward the existing segment id */
1357
const message::UpdateData &current_data= statement->update_data();
1358
*next_segment_id= current_data.segment_id();
1362
finalizeStatementMessage(*statement, in_session);
1363
statement= in_session->getStatementMessage();
1254
1368
if (statement == NULL)
1256
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1371
* Transaction will be non-NULL only if we had to segment it due to
1372
* transaction size above.
1374
if (transaction == NULL)
1375
transaction= getActiveTransactionMessage(in_session);
1258
1378
* Transaction message initialized and set, but no statement created
1259
1379
* yet. We construct one and initialize it, here, then return the
1267
1387
return *statement;
1390
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1392
const unsigned char *old_record,
1393
const unsigned char *new_record)
1395
const message::UpdateHeader &update_header= statement.update_header();
1396
string old_table_name= update_header.table_metadata().table_name();
1398
string current_table_name;
1399
(void) in_table->getShare()->getTableName(current_table_name);
1400
if (current_table_name.compare(old_table_name))
1406
/* Compare the set fields in the existing UpdateHeader and see if they
1407
* match the updated fields in the new record, if they do not we must
1408
* create a new UpdateHeader
1410
size_t num_set_fields= update_header.set_field_metadata_size();
1412
Field *current_field;
1413
Field **table_fields= in_table->getFields();
1414
in_table->setReadSet();
1416
size_t num_calculated_updated_fields= 0;
1418
while ((current_field= *table_fields++) != NULL)
1420
if (num_calculated_updated_fields > num_set_fields)
1425
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1427
/* check that this field exists in the UpdateHeader record */
1430
for (size_t x= 0; x < num_set_fields; ++x)
1432
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1433
string name= field_metadata.name();
1434
if (name.compare(current_field->field_name) == 0)
1437
++num_calculated_updated_fields;
1448
if ((num_calculated_updated_fields == num_set_fields) && found)
1270
1459
void TransactionServices::setUpdateHeader(message::Statement &statement,
1271
1460
Session *in_session,
1272
1461
Table *in_table,
1305
1494
* We add the "key field metadata" -- i.e. the fields which form
1306
1495
* the primary key for the table.
1308
if (in_table->s->fieldInPrimaryKey(current_field))
1497
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1310
1499
field_metadata= header->add_key_field_metadata();
1311
1500
field_metadata->set_name(current_field->field_name);
1312
1501
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1316
* The below really should be moved into the Field API and Record API. But for now
1317
* we do this crazy pointer fiddling to figure out if the current field
1318
* has been updated in the supplied record raw byte pointers.
1320
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1321
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1323
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1325
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1504
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1327
1506
/* Field is changed from old to new */
1328
1507
field_metadata= header->add_set_field_metadata();
1340
1519
if (! replication_services.isActive())
1343
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1522
uint32_t next_segment_id= 1;
1523
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1345
1525
message::UpdateData *data= statement.mutable_update_data();
1346
data->set_segment_id(1);
1526
data->set_segment_id(next_segment_id);
1347
1527
data->set_end_segment(true);
1348
1528
message::UpdateRecord *record= data->add_record();
1350
1530
Field *current_field;
1351
Field **table_fields= in_table->field;
1531
Field **table_fields= in_table->getFields();
1352
1532
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1353
1533
string_value->set_charset(system_charset_info);
1364
1544
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1366
1546
* We will generate two UpdateRecord messages with different set_value byte arrays.
1368
* The below really should be moved into the Field API and Record API. But for now
1369
* we do this crazy pointer fiddling to figure out if the current field
1370
* has been updated in the supplied record raw byte pointers.
1372
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1373
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1375
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1377
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1548
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1379
1550
/* Store the original "read bit" for this field */
1380
1551
bool is_read_set= current_field->isReadSet();
1600
bool TransactionServices::isFieldUpdated(Field *current_field,
1602
const unsigned char *old_record,
1603
const unsigned char *new_record)
1606
* The below really should be moved into the Field API and Record API. But for now
1607
* we do this crazy pointer fiddling to figure out if the current field
1608
* has been updated in the supplied record raw byte pointers.
1610
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1611
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1613
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1615
bool old_value_is_null= current_field->is_null_in_record(old_record);
1616
bool new_value_is_null= current_field->is_null_in_record(new_record);
1618
bool isUpdated= false;
1619
if (old_value_is_null != new_value_is_null)
1621
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1625
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1633
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1420
1641
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1643
uint32_t *next_segment_id)
1423
1645
message::Statement *statement= in_session->getStatementMessage();
1646
message::Transaction *transaction= NULL;
1425
* We check to see if the current Statement message is of type DELETE.
1426
* If it is not, we finalize the current Statement and ensure a new
1427
* DeleteStatement is created.
1649
* Check the type for the current Statement message, if it is anything
1650
* other than DELETE we need to call finalize, this will ensure a
1651
* new DeleteStatement is created. If it is of type DELETE check
1652
* what table the DELETE belongs to, if it is a different table
1653
* call finalize, so a new DeleteStatement can be created.
1429
if (statement != NULL &&
1430
statement->type() != message::Statement::DELETE)
1655
if (statement != NULL && statement->type() != message::Statement::DELETE)
1432
1657
finalizeStatementMessage(*statement, in_session);
1433
1658
statement= in_session->getStatementMessage();
1660
else if (statement != NULL)
1662
transaction= getActiveTransactionMessage(in_session);
1665
* If we've passed our threshold for the statement size (possible for
1666
* a bulk delete), we'll finalize the Statement and Transaction (doing
1667
* the Transaction will keep it from getting huge).
1669
if (static_cast<size_t>(transaction->ByteSize()) >=
1670
in_session->variables.transaction_message_threshold)
1672
/* Remember the transaction ID so we can re-use it */
1673
uint64_t trx_id= transaction->transaction_context().transaction_id();
1675
message::DeleteData *current_data= statement->mutable_delete_data();
1677
/* Caller should use this value when adding a new record */
1678
*next_segment_id= current_data->segment_id() + 1;
1680
current_data->set_end_segment(false);
1683
* Send the trx message to replicators after finalizing the
1684
* statement and transaction. This will also set the Transaction
1685
* and Statement objects in Session to NULL.
1687
commitTransactionMessage(in_session);
1690
* Statement and Transaction should now be NULL, so new ones will get
1691
* created. We reuse the transaction id since we are segmenting
1694
statement= in_session->getStatementMessage();
1695
transaction= getActiveTransactionMessage(in_session, false);
1696
assert(transaction != NULL);
1698
/* Set the transaction ID to match the previous messages */
1699
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1703
const message::DeleteHeader &delete_header= statement->delete_header();
1704
string old_table_name= delete_header.table_metadata().table_name();
1706
string current_table_name;
1707
(void) in_table->getShare()->getTableName(current_table_name);
1708
if (current_table_name.compare(old_table_name))
1710
finalizeStatementMessage(*statement, in_session);
1711
statement= in_session->getStatementMessage();
1715
/* carry forward the existing segment id */
1716
const message::DeleteData &current_data= statement->delete_data();
1717
*next_segment_id= current_data.segment_id();
1436
1722
if (statement == NULL)
1438
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1725
* Transaction will be non-NULL only if we had to segment it due to
1726
* transaction size above.
1728
if (transaction == NULL)
1729
transaction= getActiveTransactionMessage(in_session);
1440
1732
* Transaction message initialized and set, but no statement created
1441
1733
* yet. We construct one and initialize it, here, then return the
1494
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1786
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1496
1788
ReplicationServices &replication_services= ReplicationServices::singleton();
1497
1789
if (! replication_services.isActive())
1500
message::Statement &statement= getDeleteStatement(in_session, in_table);
1792
uint32_t next_segment_id= 1;
1793
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1502
1795
message::DeleteData *data= statement.mutable_delete_data();
1503
data->set_segment_id(1);
1796
data->set_segment_id(next_segment_id);
1504
1797
data->set_end_segment(true);
1505
1798
message::DeleteRecord *record= data->add_record();
1507
1800
Field *current_field;
1508
Field **table_fields= in_table->field;
1801
Field **table_fields= in_table->getFields();
1509
1802
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1510
1803
string_value->set_charset(system_charset_info);
1516
1809
* primary key field value. Replication only supports tables
1517
1810
* with a primary key.
1519
if (in_table->s->fieldInPrimaryKey(current_field))
1812
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1521
string_value= current_field->val_str(string_value);
1814
if (use_update_record)
1817
* Temporarily point to the update record to get its value.
1818
* This is pretty much a hack in order to get the PK value from
1819
* the update record rather than the insert record. Field::val_str()
1820
* should not change anything in Field::ptr, so this should be safe.
1821
* We are careful not to change anything in old_ptr.
1823
const unsigned char *old_ptr= current_field->ptr;
1824
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1825
string_value= current_field->val_str(string_value);
1826
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1830
string_value= current_field->val_str(string_value);
1832
* @TODO Store optional old record value in the before data member
1522
1835
record->add_key_value(string_value->c_ptr(), string_value->length());
1524
* @TODO Store optional old record value in the before data member
1836
string_value->free();
1843
* Template for removing Statement records of different types.
1845
* The code for removing records from different Statement message types
1846
* is identical except for the class types that are embedded within the
1849
* There are 3 scenarios we need to look for:
1850
* - We've been asked to remove more records than exist in the Statement
1851
* - We've been asked to remove fewer records than exist in the Statement
1852
* - We've been asked to remove ALL records that exist in the Statement
1854
* If we are removing ALL records, then effectively we would be left with
1855
* an empty Statement message, so we should just remove it and clean up
1856
* message pointers in the Session object.
1858
template <class DataType, class RecordType>
1859
static bool removeStatementRecordsWithType(Session *session,
1863
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1865
/* If there aren't enough records to remove 'count' of them, error. */
1866
if (num_avail_recs < count)
1870
* If we are removing all of the data records, we'll just remove this
1871
* entire Statement message.
1873
if (num_avail_recs == count)
1875
message::Transaction *transaction= session->getTransactionMessage();
1876
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1877
statements->RemoveLast();
1880
* Now need to set the Session Statement pointer to either the previous
1881
* Statement, or NULL if there isn't one.
1883
if (statements->size() == 0)
1885
session->setStatementMessage(NULL);
1890
* There isn't a great way to get a pointer to the previous Statement
1891
* message using the RepeatedPtrField object, so we'll just get to it
1892
* using the Transaction message.
1526
string_value->free();
1894
int last_stmt_idx= transaction->statement_size() - 1;
1895
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1898
/* We only need to remove 'count' records */
1899
else if (num_avail_recs > count)
1901
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1903
records->RemoveLast();
1910
bool TransactionServices::removeStatementRecords(Session *session,
1913
ReplicationServices &replication_services= ReplicationServices::singleton();
1914
if (! replication_services.isActive())
1917
/* Get the most current Statement */
1918
message::Statement *statement= session->getStatementMessage();
1920
/* Make sure we have work to do */
1921
if (statement == NULL)
1926
switch (statement->type())
1928
case message::Statement::INSERT:
1930
message::InsertData *data= statement->mutable_insert_data();
1931
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1935
case message::Statement::UPDATE:
1937
message::UpdateData *data= statement->mutable_update_data();
1938
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1942
case message::Statement::DELETE: /* not sure if this one is possible... */
1944
message::DeleteData *data= statement->mutable_delete_data();
1945
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
1531
1958
void TransactionServices::createTable(Session *in_session,
1532
1959
const message::Table &table)
1703
2130
finalizeTransactionMessage(*transaction, in_session);
1705
replication_services.pushTransactionMessage(*transaction);
2132
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1707
2134
cleanupTransactionMessage(transaction, in_session);
2137
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2139
ReplicationServices &replication_services= ReplicationServices::singleton();
2140
if (! replication_services.isActive())
2143
message::Transaction *transaction= new (nothrow) message::Transaction();
2145
// set server id, start timestamp
2146
initTransactionMessage(*transaction, session, true);
2148
// set end timestamp
2149
finalizeTransactionMessage(*transaction, session);
2151
message::Event *trx_event= transaction->mutable_event();
2153
trx_event->CopyFrom(event);
2155
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2159
return static_cast<int>(result);
2162
bool TransactionServices::sendStartupEvent(Session *session)
2164
message::Event event;
2165
event.set_type(message::Event::STARTUP);
2166
if (sendEvent(session, event) != 0)
2171
bool TransactionServices::sendShutdownEvent(Session *session)
2173
message::Event event;
2174
event.set_type(message::Event::SHUTDOWN);
2175
if (sendEvent(session, event) != 0)
1710
2180
} /* namespace drizzled */