161
145
return is_active;
164
/**
 * Obtains the Transaction message attached to a Session, creating one
 * when the Session has none.
 *
 * When in_session->getTransactionMessage() yields NULL, a new
 * message::Transaction is allocated, initialized via initTransaction()
 * and stored on the Session with setTransactionMessage().
 *
 * @param in_session Session whose transaction message is looked up or created
 * @return presumably the (possibly newly created) transaction message; the
 *         return statement is not visible in this excerpt
 */
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
166
message::Transaction *transaction= in_session->getTransactionMessage();
168
if (unlikely(transaction == NULL))
171
* Allocate and initialize a new transaction message
172
* for this Session object. Session is responsible for
173
* deleting transaction message when done with it.
175
/* NOTE(review): new (nothrow) can yield NULL, and the result is dereferenced by the very next statement -- confirm allocation failure is handled. */
transaction= new (nothrow) message::Transaction();
176
initTransaction(*transaction, in_session);
177
in_session->setTransactionMessage(transaction);
184
/**
 * Populates the TransactionContext of a Transaction message from a Session.
 *
 * Sets the server id, the transaction id and the start timestamp.
 *
 * @param in_transaction Transaction message whose context is filled in
 * @param in_session     Session supplying the server id, query id and timestamp
 */
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
Session *in_session) const
187
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
trx->set_server_id(in_session->getServerId());
189
/* The Session's query id is what gets recorded as the transaction id. */
trx->set_transaction_id(in_session->getQueryId());
190
trx->set_start_timestamp(in_session->getCurrentTimestamp());
193
/**
 * Stamps a Transaction message's context with its end timestamp.
 *
 * @param in_transaction Transaction message being finalized
 * @param in_session     Session supplying the current timestamp
 */
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
Session *in_session) const
196
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
trx->set_end_timestamp(in_session->getCurrentTimestamp());
200
/**
 * Deletes a Transaction message and detaches it from the Session.
 *
 * After the delete, both the Session's statement message pointer and its
 * transaction message pointer are reset to NULL.
 *
 * @param in_transaction Transaction message to delete
 * @param in_session     Session whose message pointers are cleared
 */
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
Session *in_session) const
203
delete in_transaction;
204
/* Clear the Session's pointers so it no longer refers to the deleted message. */
in_session->setStatementMessage(NULL);
205
in_session->setTransactionMessage(NULL);
208
/**
 * Inspects the statements of a Transaction message for INSERT, UPDATE or
 * DELETE data whose segment_id is greater than 1.
 *
 * NOTE(review): the declaration of the loop index, the switch head and the
 * values returned by each branch are not visible in this excerpt.
 *
 * @param transaction Transaction message to inspect
 * @return presumably true when such a segment is found -- TODO confirm
 */
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
210
size_t num_statements= transaction.statement_size();
211
/* A transaction without statements has nothing to inspect. */
if (num_statements == 0)
215
* Only INSERT, UPDATE, and DELETE statements can possibly
216
* have bulk segments. So, we loop through the statements
217
* checking for segment_id > 1 in those specific submessages.
220
for (x= 0; x < num_statements; ++x)
222
const message::Statement &statement= transaction.statement(x);
223
message::Statement::Type type= statement.type();
227
case message::Statement::INSERT:
228
if (statement.insert_data().segment_id() > 1)
231
case message::Statement::UPDATE:
232
if (statement.update_data().segment_id() > 1)
235
case message::Statement::DELETE:
236
if (statement.delete_data().segment_id() > 1)
245
/**
 * Finalizes the Session's pending statement and transaction messages on
 * commit, then releases the transaction message.
 *
 * Visible steps: finalize the active Statement message (when there is one),
 * fetch the Transaction message, stamp it via finalizeTransaction() and
 * delete it via cleanupTransaction(). Any hand-off of the message to
 * replicators between those steps is not visible in this excerpt.
 *
 * @param in_session Session whose transaction is being committed
 */
void ReplicationServices::commitTransaction(Session *in_session)
250
/* If there is an active statement message, finalize it */
251
message::Statement *statement= in_session->getStatementMessage();
253
if (statement != NULL)
255
finalizeStatement(*statement, in_session);
258
/* NOTE(review): the condition guarding this early return is not visible here; presumably it is the branch taken when no statement message exists. */
return; /* No data modification occurred inside the transaction */
260
message::Transaction* transaction= getActiveTransaction(in_session);
262
finalizeTransaction(*transaction, in_session);
266
cleanupTransaction(transaction, in_session);
269
/**
 * Sets the type and start timestamp of a Statement message.
 *
 * @param statement  Statement message to initialize
 * @param in_type    Statement type to record
 * @param in_session Session supplying the current timestamp
 */
void ReplicationServices::initStatement(message::Statement &statement,
270
message::Statement::Type in_type,
271
Session *in_session) const
273
statement.set_type(in_type);
274
statement.set_start_timestamp(in_session->getCurrentTimestamp());
275
/** @TODO Set sql string optionally */
278
/**
 * Stamps a Statement message with its end timestamp and clears the
 * Session's current-statement pointer.
 *
 * @param statement  Statement message being finalized
 * @param in_session Session supplying the timestamp; its statement message
 *                   pointer is set to NULL
 */
void ReplicationServices::finalizeStatement(message::Statement &statement,
279
Session *in_session) const
281
statement.set_end_timestamp(in_session->getCurrentTimestamp());
282
/* After this call getStatementMessage() yields NULL, so callers start a new statement. */
in_session->setStatementMessage(NULL);
285
/**
 * Handles a ROLLBACK for the Session's Transaction message.
 *
 * When transactionContainsBulkSegment() reports a bulk segment, the message
 * is cleared, re-initialized and given a single ROLLBACK statement, which is
 * then finalized together with the transaction. The message is released via
 * cleanupTransaction(). Any hand-off of the ROLLBACK message to replicators
 * is not visible in this excerpt.
 *
 * @param in_session Session whose transaction is being rolled back
 */
void ReplicationServices::rollbackTransaction(Session *in_session)
290
/* NOTE(review): getActiveTransaction() allocates a message when the Session has none, so a rollback with no prior changes creates one only for it to be deleted below -- confirm this is intended. */
message::Transaction *transaction= getActiveTransaction(in_session);
293
* OK, so there are two situations that we need to deal with here:
295
* 1) We receive an instruction to ROLLBACK the current transaction
296
* and the currently-stored Transaction message is *self-contained*,
297
* meaning that no Statement messages in the Transaction message
298
* contain a message having its segment_id member greater than 1. If
299
* no non-segment ID 1 members are found, we can simply clear the
300
* current Transaction message and remove it from memory.
302
* 2) If the Transaction message does indeed have a non-end segment, that
303
* means that a bulk update/delete/insert Transaction message segment
304
* has previously been sent over the wire to replicators. In this case,
305
* we need to package a Transaction with a Statement message of type
306
* ROLLBACK to indicate to replicators that previously-transmitted
307
* messages must be un-applied.
309
if (unlikely(transactionContainsBulkSegment(*transaction)))
312
* Clear the transaction, create a Rollback statement message,
313
* attach it to the transaction, and push it to replicators.
315
transaction->Clear();
316
/* Clear() wiped the message, so the transaction context is filled in again. */
initTransaction(*transaction, in_session);
318
message::Statement *statement= transaction->add_statement();
320
initStatement(*statement, message::Statement::ROLLBACK, in_session);
321
finalizeStatement(*statement, in_session);
323
finalizeTransaction(*transaction, in_session);
327
cleanupTransaction(transaction, in_session);
330
/**
 * Obtains the Session's current INSERT Statement message, starting a new
 * one when needed.
 *
 * A current statement of another type is finalized first. When no statement
 * is current, one is added to the active Transaction message, given an
 * insert header via setInsertHeader() and stored on the Session.
 *
 * @param in_session Session holding the statement and transaction messages
 * @param in_table   Table passed to setInsertHeader() for a new statement
 * @return declared as message::Statement&; the return statement is not
 *         visible in this excerpt
 */
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
331
Table *in_table) const
333
message::Statement *statement= in_session->getStatementMessage();
335
* We check to see if the current Statement message is of type INSERT.
336
* If it is not, we finalize the current Statement and ensure a new
337
* InsertStatement is created.
339
if (statement != NULL &&
340
statement->type() != message::Statement::INSERT)
342
finalizeStatement(*statement, in_session);
343
/* finalizeStatement() reset the Session's pointer, so this re-read yields NULL. */
statement= in_session->getStatementMessage();
346
if (statement == NULL)
348
message::Transaction *transaction= getActiveTransaction(in_session);
350
* Transaction message initialized and set, but no statement created
351
* yet. We construct one and initialize it, here, then return the
352
* message after attaching the new Statement message pointer to the
353
* Session for easy retrieval later...
355
statement= transaction->add_statement();
356
setInsertHeader(*statement, in_session, in_table);
357
in_session->setStatementMessage(statement);
362
/**
 * Initializes a Statement message as an INSERT and fills in its InsertHeader:
 * the schema and table names plus name/type metadata for every table field.
 *
 * NOTE(review): the body uses in_session, but the parameter line declaring
 * it is not visible in this excerpt.
 *
 * @param statement Statement message to initialize
 * @param in_table  Table supplying the schema name, table name and fields
 */
void ReplicationServices::setInsertHeader(message::Statement &statement,
364
Table *in_table) const
366
initStatement(statement, message::Statement::INSERT, in_session);
369
* Now we construct the specialized InsertHeader message inside
370
* the generalized message::Statement container...
372
/* Set up the insert header */
373
message::InsertHeader *header= statement.mutable_insert_header();
374
message::TableMetadata *table_metadata= header->mutable_table_metadata();
376
const char *schema_name= in_table->getShare()->getSchemaName();
377
const char *table_name= in_table->getShare()->table_name.str;
379
table_metadata->set_schema_name(schema_name);
380
table_metadata->set_table_name(table_name);
382
Field *current_field;
383
Field **table_fields= in_table->field;
385
message::FieldMetadata *field_metadata;
387
/* We will read all the table's fields... */
388
in_table->setReadSet();
390
/* in_table->field is walked until its NULL entry. */
while ((current_field= *table_fields++) != NULL)
392
field_metadata= header->add_field_metadata();
393
field_metadata->set_name(current_field->field_name);
394
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
398
/**
 * Records an inserted row in the Session's INSERT Statement message.
 *
 * Reports ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE when the table has no
 * primary key. Otherwise the insert data is marked as segment 1 / end
 * segment and one InsertRecord is added holding the string value of every
 * table field.
 *
 * @param in_session Session holding the statement message and mem_root
 * @param in_table   Table whose current field values are recorded
 * @return bool; the returned values are not visible in this excerpt
 */
bool ReplicationServices::insertRecord(Session *in_session, Table *in_table)
403
* We do this check here because we don't want to even create a
404
* statement if there isn't a primary key on the table...
408
* Multi-column primary keys are handled how exactly?
410
if (in_table->s->primary_key == MAX_KEY)
412
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
416
message::Statement &statement= getInsertStatement(in_session, in_table);
418
message::InsertData *data= statement.mutable_insert_data();
419
/* Every record is written as a single, final segment. */
data->set_segment_id(1);
420
data->set_end_segment(true);
421
message::InsertRecord *record= data->add_record();
423
Field *current_field;
424
Field **table_fields= in_table->field;
426
/* Scratch String constructed with placement new on the Session's mem_root. */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
427
string_value->set_charset(system_charset_info);
429
/* We will read all the table's fields... */
430
in_table->setReadSet();
432
while ((current_field= *table_fields++) != NULL)
434
/* val_str()'s result replaces string_value, so later iterations use whichever String it returned. */
string_value= current_field->val_str(string_value);
435
record->add_insert_value(string_value->c_ptr(), string_value->length());
436
string_value->free();
441
/**
 * Obtains the Session's current UPDATE Statement message, starting a new
 * one when needed.
 *
 * A current statement of another type is finalized first. When no statement
 * is current, one is added to the active Transaction message, given an
 * update header via setUpdateHeader() and stored on the Session.
 *
 * NOTE(review): the body uses in_table, but the parameter line declaring it
 * is not visible in this excerpt.
 *
 * @param in_session Session holding the statement and transaction messages
 * @param old_record Raw record bytes before the update (forwarded to setUpdateHeader())
 * @param new_record Raw record bytes after the update (forwarded to setUpdateHeader())
 * @return declared as message::Statement&; the return statement is not
 *         visible in this excerpt
 */
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
443
const unsigned char *old_record,
444
const unsigned char *new_record) const
446
message::Statement *statement= in_session->getStatementMessage();
448
* We check to see if the current Statement message is of type UPDATE.
449
* If it is not, we finalize the current Statement and ensure a new
450
* UpdateStatement is created.
452
if (statement != NULL &&
453
statement->type() != message::Statement::UPDATE)
455
finalizeStatement(*statement, in_session);
456
/* finalizeStatement() reset the Session's pointer, so this re-read yields NULL. */
statement= in_session->getStatementMessage();
459
if (statement == NULL)
461
message::Transaction *transaction= getActiveTransaction(in_session);
463
* Transaction message initialized and set, but no statement created
464
* yet. We construct one and initialize it, here, then return the
465
* message after attaching the new Statement message pointer to the
466
* Session for easy retrieval later...
468
statement= transaction->add_statement();
469
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
470
in_session->setStatementMessage(statement);
475
/**
 * Initializes a Statement message as an UPDATE and fills in its UpdateHeader:
 * schema and table names, key field metadata for primary-key fields, and
 * set field metadata for fields whose bytes differ between the two records.
 *
 * NOTE(review): the body uses in_session and in_table, but the parameter
 * lines declaring them are not visible in this excerpt. How the primary-key
 * check and the changed-bytes check nest is not visible either.
 *
 * @param statement  Statement message to initialize
 * @param old_record Raw record bytes before the update
 * @param new_record Raw record bytes after the update
 */
void ReplicationServices::setUpdateHeader(message::Statement &statement,
478
const unsigned char *old_record,
479
const unsigned char *new_record) const
481
initStatement(statement, message::Statement::UPDATE, in_session);
484
* Now we construct the specialized UpdateHeader message inside
485
* the generalized message::Statement container...
487
/* Set up the update header */
488
message::UpdateHeader *header= statement.mutable_update_header();
489
message::TableMetadata *table_metadata= header->mutable_table_metadata();
491
const char *schema_name= in_table->getShare()->getSchemaName();
492
const char *table_name= in_table->getShare()->table_name.str;
494
table_metadata->set_schema_name(schema_name);
495
table_metadata->set_table_name(table_name);
497
Field *current_field;
498
Field **table_fields= in_table->field;
500
message::FieldMetadata *field_metadata;
502
/* We will read all the table's fields... */
503
in_table->setReadSet();
505
while ((current_field= *table_fields++) != NULL)
508
* We add the "key field metadata" -- i.e. the fields which is
509
* the primary key for the table.
511
if (in_table->s->fieldInPrimaryKey(current_field))
513
field_metadata= header->add_key_field_metadata();
514
field_metadata->set_name(current_field->field_name);
515
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
519
* The below really should be moved into the Field API and Record API. But for now
520
* we do this crazy pointer fiddling to figure out if the current field
521
* has been updated in the supplied record raw byte pointers.
523
/* The field's byte offset within record[0] is applied to both the old and the new record buffers. */
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
524
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
526
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
528
/* A byte-wise difference over pack_length() bytes is what marks the field as changed. */
if (memcmp(old_ptr, new_ptr, field_length) != 0)
530
/* Field is changed from old to new */
531
field_metadata= header->add_set_field_metadata();
532
field_metadata->set_name(current_field->field_name);
533
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
537
/**
 * Records an updated row in the Session's UPDATE Statement message.
 *
 * The update data is marked as segment 1 / end segment and one UpdateRecord
 * is added. For each table field whose bytes differ between old_record and
 * new_record the field's string value is added as an "after" value; for
 * primary-key fields a key value is added.
 *
 * NOTE(review): the body uses in_table, but the parameter line declaring it
 * is not visible in this excerpt.
 *
 * @param in_session Session holding the statement message and mem_root
 * @param old_record Raw record bytes before the update
 * @param new_record Raw record bytes after the update
 */
void ReplicationServices::updateRecord(Session *in_session,
539
const unsigned char *old_record,
540
const unsigned char *new_record)
545
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
547
message::UpdateData *data= statement.mutable_update_data();
548
/* Every record is written as a single, final segment. */
data->set_segment_id(1);
549
data->set_end_segment(true);
550
message::UpdateRecord *record= data->add_record();
552
Field *current_field;
553
Field **table_fields= in_table->field;
554
/* Scratch String constructed with placement new on the Session's mem_root. */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
555
string_value->set_charset(system_charset_info);
557
while ((current_field= *table_fields++) != NULL)
560
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
561
* but then realized that an UPDATE statement could potentially have different values for
562
* the SET field. For instance, imagine this SQL scenario:
564
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
565
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
566
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
568
* We will generate two UpdateRecord messages with different set_value byte arrays.
570
* The below really should be moved into the Field API and Record API. But for now
571
* we do this crazy pointer fiddling to figure out if the current field
572
* has been updated in the supplied record raw byte pointers.
574
/* NOTE(review): the SQL example in the comment above declares column "count" but then uses "counter"; presumably "counter" was intended throughout. */
/* The field's byte offset within record[0] is applied to both the old and the new record buffers. */
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
575
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
577
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
579
if (memcmp(old_ptr, new_ptr, field_length) != 0)
581
/* Store the original "read bit" for this field */
582
bool is_read_set= current_field->isReadSet();
584
/* We need to mark that we will "read" this field... */
585
in_table->setReadSet(current_field->field_index);
587
/* Read the string value of this field's contents */
588
string_value= current_field->val_str(string_value);
591
* Reset the read bit after reading field to its original state. This
592
* prevents the field from being included in the WHERE clause
594
current_field->setReadSet(is_read_set);
596
record->add_after_value(string_value->c_ptr(), string_value->length());
597
string_value->free();
601
* Add the WHERE clause values now...for now, this means the
602
* primary key field value. Replication only supports tables
603
* with a primary key.
605
if (in_table->s->fieldInPrimaryKey(current_field))
608
* To say the below is ugly is an understatement. But it works.
610
* @todo Move this crap into a real Record API.
612
/* NOTE(review): part of this call's argument list is not visible in this excerpt; the visible argument is an offset computed against new_record. */
string_value= current_field->val_str(string_value,
614
current_field->offset(const_cast<unsigned char *>(new_record)));
615
record->add_key_value(string_value->c_ptr(), string_value->length());
616
string_value->free();
622
/**
 * Obtains the Session's current DELETE Statement message, starting a new
 * one when needed.
 *
 * A current statement of another type is finalized first. When no statement
 * is current, one is added to the active Transaction message, given a
 * delete header via setDeleteHeader() and stored on the Session.
 *
 * @param in_session Session holding the statement and transaction messages
 * @param in_table   Table passed to setDeleteHeader() for a new statement
 * @return declared as message::Statement&; the return statement is not
 *         visible in this excerpt
 */
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
623
Table *in_table) const
625
message::Statement *statement= in_session->getStatementMessage();
627
* We check to see if the current Statement message is of type DELETE.
628
* If it is not, we finalize the current Statement and ensure a new
629
* DeleteStatement is created.
631
if (statement != NULL &&
632
statement->type() != message::Statement::DELETE)
634
finalizeStatement(*statement, in_session);
635
/* finalizeStatement() reset the Session's pointer, so this re-read yields NULL. */
statement= in_session->getStatementMessage();
638
if (statement == NULL)
640
message::Transaction *transaction= getActiveTransaction(in_session);
642
* Transaction message initialized and set, but no statement created
643
* yet. We construct one and initialize it, here, then return the
644
* message after attaching the new Statement message pointer to the
645
* Session for easy retrieval later...
647
statement= transaction->add_statement();
648
setDeleteHeader(*statement, in_session, in_table);
649
in_session->setStatementMessage(statement);
654
/**
 * Initializes a Statement message as a DELETE and fills in its DeleteHeader:
 * schema and table names plus key field metadata for primary-key fields.
 *
 * NOTE(review): the body uses in_session, but the parameter line declaring
 * it is not visible in this excerpt.
 *
 * @param statement Statement message to initialize
 * @param in_table  Table supplying the schema name, table name and fields
 */
void ReplicationServices::setDeleteHeader(message::Statement &statement,
656
Table *in_table) const
658
initStatement(statement, message::Statement::DELETE, in_session);
661
* Now we construct the specialized DeleteHeader message inside
662
* the generalized message::Statement container...
664
message::DeleteHeader *header= statement.mutable_delete_header();
665
message::TableMetadata *table_metadata= header->mutable_table_metadata();
667
const char *schema_name= in_table->getShare()->getSchemaName();
668
const char *table_name= in_table->getShare()->table_name.str;
670
table_metadata->set_schema_name(schema_name);
671
table_metadata->set_table_name(table_name);
673
Field *current_field;
674
Field **table_fields= in_table->field;
676
message::FieldMetadata *field_metadata;
678
/* in_table->field is walked until its NULL entry. */
while ((current_field= *table_fields++) != NULL)
681
* Add the WHERE clause values now...for now, this means the
682
* primary key field value. Replication only supports tables
683
* with a primary key.
685
if (in_table->s->fieldInPrimaryKey(current_field))
687
field_metadata= header->add_key_field_metadata();
688
field_metadata->set_name(current_field->field_name);
689
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
694
/**
 * Records a deleted row in the Session's DELETE Statement message.
 *
 * The delete data is marked as segment 1 / end segment and one DeleteRecord
 * is added holding the string value of each primary-key field.
 *
 * @param in_session Session holding the statement message and mem_root
 * @param in_table   Table whose primary-key field values are recorded
 */
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
699
message::Statement &statement= getDeleteStatement(in_session, in_table);
701
message::DeleteData *data= statement.mutable_delete_data();
702
/* Every record is written as a single, final segment. */
data->set_segment_id(1);
703
data->set_end_segment(true);
704
message::DeleteRecord *record= data->add_record();
706
Field *current_field;
707
Field **table_fields= in_table->field;
708
/* Scratch String constructed with placement new on the Session's mem_root. */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
709
string_value->set_charset(system_charset_info);
711
while ((current_field= *table_fields++) != NULL)
714
* Add the WHERE clause values now...for now, this means the
715
* primary key field value. Replication only supports tables
716
* with a primary key.
718
if (in_table->s->fieldInPrimaryKey(current_field))
720
string_value= current_field->val_str(string_value);
721
record->add_key_value(string_value->c_ptr(), string_value->length());
723
* @TODO Store optional old record value in the before data member
725
/* NOTE(review): whether this free() sits inside the primary-key branch or only inside the loop is not visible in this excerpt. */
string_value->free();
730
/**
 * Builds a CREATE_TABLE Statement in the Session's Transaction message.
 *
 * The supplied table definition is copied into the statement's
 * CreateTableStatement; statement and transaction are then finalized and the
 * transaction message is released via cleanupTransaction(). Any hand-off to
 * replicators before the cleanup is not visible in this excerpt.
 *
 * @param in_session Session owning the transaction message
 * @param table      Table definition message to copy into the statement
 */
void ReplicationServices::createTable(Session *in_session,
731
const message::Table &table)
736
message::Transaction *transaction= getActiveTransaction(in_session);
737
message::Statement *statement= transaction->add_statement();
739
initStatement(*statement, message::Statement::CREATE_TABLE, in_session);
742
* Construct the specialized CreateTableStatement message and attach
743
* it to the generic Statement message
745
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
746
message::Table *new_table_message= create_table_statement->mutable_table();
747
/* Copy-assigns the caller's table message into the statement. */
*new_table_message= table;
749
finalizeStatement(*statement, in_session);
751
finalizeTransaction(*transaction, in_session);
755
cleanupTransaction(transaction, in_session);
759
/**
 * Builds a CREATE_SCHEMA Statement in the Session's Transaction message.
 *
 * The supplied schema definition is copied into the statement's
 * CreateSchemaStatement; statement and transaction are then finalized and
 * the transaction message is released via cleanupTransaction(). Any hand-off
 * to replicators before the cleanup is not visible in this excerpt.
 *
 * @param in_session Session owning the transaction message
 * @param schema     Schema definition message to copy into the statement
 */
void ReplicationServices::createSchema(Session *in_session,
760
const message::Schema &schema)
765
message::Transaction *transaction= getActiveTransaction(in_session);
766
message::Statement *statement= transaction->add_statement();
768
initStatement(*statement, message::Statement::CREATE_SCHEMA, in_session);
771
* Construct the specialized CreateSchemaStatement message and attach
772
* it to the generic Statement message
774
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
775
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
776
/* Copy-assigns the caller's schema message into the statement. */
*new_schema_message= schema;
778
finalizeStatement(*statement, in_session);
780
finalizeTransaction(*transaction, in_session);
784
cleanupTransaction(transaction, in_session);
788
/**
 * Builds a DROP_SCHEMA Statement in the Session's Transaction message.
 *
 * The schema name is stored in the statement's DropSchemaStatement;
 * statement and transaction are then finalized and the transaction message
 * is released via cleanupTransaction(). Any hand-off to replicators before
 * the cleanup is not visible in this excerpt.
 *
 * @param in_session  Session owning the transaction message
 * @param schema_name Name of the schema being dropped
 */
void ReplicationServices::dropSchema(Session *in_session, const string &schema_name)
793
message::Transaction *transaction= getActiveTransaction(in_session);
794
message::Statement *statement= transaction->add_statement();
796
initStatement(*statement, message::Statement::DROP_SCHEMA, in_session);
799
* Construct the specialized DropSchemaStatement message and attach
800
* it to the generic Statement message
802
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
804
drop_schema_statement->set_schema_name(schema_name);
806
finalizeStatement(*statement, in_session);
808
finalizeTransaction(*transaction, in_session);
812
cleanupTransaction(transaction, in_session);
815
/**
 * Builds a DROP_TABLE Statement in the Session's Transaction message.
 *
 * The IF EXISTS flag and the schema/table names are stored in the
 * statement's DropTableStatement; statement and transaction are then
 * finalized and the transaction message is released via
 * cleanupTransaction(). Any hand-off to replicators before the cleanup is
 * not visible in this excerpt.
 *
 * NOTE(review): the body uses if_exists, but the parameter line declaring it
 * is not visible in this excerpt.
 *
 * @param in_session  Session owning the transaction message
 * @param schema_name Schema containing the table being dropped
 * @param table_name  Name of the table being dropped
 */
void ReplicationServices::dropTable(Session *in_session,
816
const string &schema_name,
817
const string &table_name,
823
message::Transaction *transaction= getActiveTransaction(in_session);
824
message::Statement *statement= transaction->add_statement();
826
initStatement(*statement, message::Statement::DROP_TABLE, in_session);
829
* Construct the specialized DropTableStatement message and attach
830
* it to the generic Statement message
832
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
834
drop_table_statement->set_if_exists_clause(if_exists);
836
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
838
table_metadata->set_schema_name(schema_name);
839
table_metadata->set_table_name(table_name);
841
finalizeStatement(*statement, in_session);
843
finalizeTransaction(*transaction, in_session);
847
cleanupTransaction(transaction, in_session);
850
/**
 * Builds a TRUNCATE_TABLE Statement in the Session's Transaction message.
 *
 * The table's schema and table names are stored in the statement's
 * TruncateTableStatement; statement and transaction are then finalized and
 * the transaction message is released via cleanupTransaction(). Any hand-off
 * to replicators before the cleanup is not visible in this excerpt.
 *
 * @param in_session Session owning the transaction message
 * @param in_table   Table being truncated; supplies the schema and table names
 */
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
855
message::Transaction *transaction= getActiveTransaction(in_session);
856
message::Statement *statement= transaction->add_statement();
858
initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
861
* Construct the specialized TruncateTableStatement message and attach
862
* it to the generic Statement message
864
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
865
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
867
const char *schema_name= in_table->getShare()->getSchemaName();
868
const char *table_name= in_table->getShare()->table_name.str;
870
table_metadata->set_schema_name(schema_name);
871
table_metadata->set_table_name(table_name);
873
finalizeStatement(*statement, in_session);
875
finalizeTransaction(*transaction, in_session);
879
cleanupTransaction(transaction, in_session);
882
/**
 * Builds a RAW_SQL Statement carrying the given query text in the Session's
 * Transaction message.
 *
 * Statement and transaction are finalized and the transaction message is
 * released via cleanupTransaction(). Any hand-off to replicators before the
 * cleanup is not visible in this excerpt.
 *
 * @param in_session Session owning the transaction message
 * @param query      SQL text stored verbatim in the statement's sql field
 */
void ReplicationServices::rawStatement(Session *in_session, const string &query)
887
message::Transaction *transaction= getActiveTransaction(in_session);
888
message::Statement *statement= transaction->add_statement();
890
initStatement(*statement, message::Statement::RAW_SQL, in_session);
891
statement->set_sql(query);
892
finalizeStatement(*statement, in_session);
894
finalizeTransaction(*transaction, in_session);
898
cleanupTransaction(transaction, in_session);
901
void ReplicationServices::push(message::Transaction &to_push)
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
903
150
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
904
151
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;