161
145
return is_active;
/*
 * Looks up the Transaction message stored on the Session. When the Session
 * has none (NULL), a new message is allocated, initialized through
 * initTransaction() and stored on the Session via setTransactionMessage().
 *
 * NOTE(review): no return statement is visible in this excerpt. Callers in
 * this file dereference the result, so the message is presumably returned
 * on both paths -- confirm against the complete source.
 *
 * @param in_session Session whose transaction message is fetched or created
 */
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
166
message::Transaction *transaction= in_session->getTransactionMessage();
168
if (unlikely(transaction == NULL))
171
* Allocate and initialize a new transaction message
172
* for this Session object. Session is responsible for
173
* deleting transaction message when done with it.
175
/*
 * NOTE(review): new (nothrow) yields NULL on allocation failure, and the
 * pointer is dereferenced by the very next call without a check.
 */
transaction= new (nothrow) message::Transaction();
176
initTransaction(*transaction, in_session);
177
in_session->setTransactionMessage(transaction);
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
Session *in_session) const
187
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
trx->set_server_id(in_session->getServerId());
189
trx->set_transaction_id(in_session->getQueryId());
190
trx->set_start_timestamp(in_session->getCurrentTimestamp());
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
Session *in_session) const
196
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
trx->set_end_timestamp(in_session->getCurrentTimestamp());
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
Session *in_session) const
203
delete in_transaction;
204
in_session->setStatementMessage(NULL);
205
in_session->setTransactionMessage(NULL);
/*
 * Commit handling for a Session: finalizes the Session's current Statement
 * message when one exists, then finalizes the active Transaction message
 * and releases it through cleanupTransaction().
 *
 * NOTE(review): the braces/branch keywords around the early return below
 * are not visible in this excerpt. Per its own comment, the return covers
 * the case where no data modification occurred -- confirm which branch it
 * belongs to against the complete source.
 *
 * @param in_session Session whose transaction is being committed
 */
208
void ReplicationServices::commitTransaction(Session *in_session)
213
/* If there is an active statement message, finalize it */
214
message::Statement *statement= in_session->getStatementMessage();
216
if (statement != NULL)
218
finalizeStatement(*statement, in_session);
221
return; /* No data modification occurred inside the transaction */
223
message::Transaction* transaction= getActiveTransaction(in_session);
225
finalizeTransaction(*transaction, in_session);
229
/* Deletes the message and clears the Session's message pointers */
cleanupTransaction(transaction, in_session);
232
void ReplicationServices::initStatement(message::Statement &statement,
233
message::Statement::Type in_type,
234
Session *in_session) const
236
statement.set_type(in_type);
237
statement.set_start_timestamp(in_session->getCurrentTimestamp());
238
/** @TODO Set sql string optionally */
241
void ReplicationServices::finalizeStatement(message::Statement &statement,
242
Session *in_session) const
244
statement.set_end_timestamp(in_session->getCurrentTimestamp());
245
in_session->setStatementMessage(NULL);
/*
 * Rollback handling for a Session. Fetches the active Transaction message
 * and, when message::transactionContainsBulkSegment() reports true for it,
 * clears the message, re-initializes it and attaches a single ROLLBACK
 * Statement that is initialized and finalized immediately. The last visible
 * call releases the message through cleanupTransaction().
 *
 * NOTE(review): the comments below mention pushing the ROLLBACK message to
 * replicators, but no such call is visible in this excerpt; the brace
 * structure is not visible either -- confirm against the complete source.
 *
 * @param in_session Session whose transaction is being rolled back
 */
248
void ReplicationServices::rollbackTransaction(Session *in_session)
253
message::Transaction *transaction= getActiveTransaction(in_session);
256
* OK, so there are two situations that we need to deal with here:
258
* 1) We receive an instruction to ROLLBACK the current transaction
259
* and the currently-stored Transaction message is *self-contained*,
260
* meaning that no Statement messages in the Transaction message
261
* contain a message having its segment_id member greater than 1. If
262
* no non-segment ID 1 members are found, we can simply clear the
263
* current Transaction message and remove it from memory.
265
* 2) If the Transaction message does indeed have a non-end segment, that
266
* means that a bulk update/delete/insert Transaction message segment
267
* has previously been sent over the wire to replicators. In this case,
268
* we need to package a Transaction with a Statement message of type
269
* ROLLBACK to indicate to replicators that previously-transmitted
270
* messages must be un-applied.
272
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
275
* Clear the transaction, create a Rollback statement message,
276
* attach it to the transaction, and push it to replicators.
278
transaction->Clear();
279
/* Clear() wiped the message, so the transaction context is filled in again */
initTransaction(*transaction, in_session);
281
message::Statement *statement= transaction->add_statement();
283
initStatement(*statement, message::Statement::ROLLBACK, in_session);
284
finalizeStatement(*statement, in_session);
286
finalizeTransaction(*transaction, in_session);
290
cleanupTransaction(transaction, in_session);
/*
 * Obtains the INSERT Statement message for the Session. A current Statement
 * of another type is finalized first. When the Session then has no current
 * Statement, a new one is added to the active Transaction message, given an
 * insert header via setInsertHeader() and stored on the Session.
 *
 * NOTE(review): the return statement is not visible in this excerpt; the
 * declared return type is a Statement reference, presumably *statement.
 *
 * @param in_session Session owning the Statement/Transaction messages
 * @param in_table   Table the INSERT applies to
 */
293
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
294
Table *in_table) const
296
message::Statement *statement= in_session->getStatementMessage();
298
* We check to see if the current Statement message is of type INSERT.
299
* If it is not, we finalize the current Statement and ensure a new
300
* InsertStatement is created.
302
if (statement != NULL &&
303
statement->type() != message::Statement::INSERT)
305
finalizeStatement(*statement, in_session);
306
/*
 * finalizeStatement() resets the Session's statement pointer to NULL, so
 * this re-read yields NULL and the creation path below is taken.
 */
statement= in_session->getStatementMessage();
309
if (statement == NULL)
311
message::Transaction *transaction= getActiveTransaction(in_session);
313
* Transaction message initialized and set, but no statement created
314
* yet. We construct one and initialize it, here, then return the
315
* message after attaching the new Statement message pointer to the
316
* Session for easy retrieval later...
318
statement= transaction->add_statement();
319
setInsertHeader(*statement, in_session, in_table);
320
in_session->setStatementMessage(statement);
/*
 * Initializes a Statement message as an INSERT and fills its InsertHeader:
 * the schema and table name from the table's share, plus one FieldMetadata
 * entry (name and protobuf field type) for every field of the table.
 *
 * NOTE(review): the body uses in_session, but no such parameter is visible
 * in this excerpt's signature. The call site in getInsertStatement() passes
 * (statement, in_session, in_table), so a parameter line appears to be
 * missing here -- confirm against the complete source.
 *
 * @param statement Statement message receiving the insert header
 * @param in_table  Table whose metadata is recorded
 */
325
void ReplicationServices::setInsertHeader(message::Statement &statement,
327
Table *in_table) const
329
initStatement(statement, message::Statement::INSERT, in_session);
332
* Now we construct the specialized InsertHeader message inside
333
* the generalized message::Statement container...
335
/* Set up the insert header */
336
message::InsertHeader *header= statement.mutable_insert_header();
337
message::TableMetadata *table_metadata= header->mutable_table_metadata();
339
const char *schema_name= in_table->getShare()->db.str;
340
const char *table_name= in_table->getShare()->table_name.str;
342
table_metadata->set_schema_name(schema_name);
343
table_metadata->set_table_name(table_name);
345
Field *current_field;
346
Field **table_fields= in_table->field;
348
message::FieldMetadata *field_metadata;
350
/* We will read all the table's fields... */
351
in_table->setReadSet();
353
/* in_table->field is walked as a NULL-terminated array of Field pointers */
while ((current_field= *table_fields++) != NULL)
355
field_metadata= header->add_field_metadata();
356
field_metadata->set_name(current_field->field_name);
357
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
/*
 * Records one inserted row in the Session's INSERT Statement message.
 * Raises ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE through my_error() when the
 * table has no primary key (primary_key == MAX_KEY). Otherwise one
 * InsertRecord is appended to the statement's InsertData, holding the
 * string value of every field of the table.
 *
 * NOTE(review): the function is declared to return bool, but no return
 * statement is visible in this excerpt, so the meaning of the result and
 * whether the error path returns early cannot be confirmed from here.
 *
 * @param in_session Session owning the messages and the mem_root
 * @param in_table   Table whose current row is being inserted
 */
361
bool ReplicationServices::insertRecord(Session *in_session, Table *in_table)
366
* We do this check here because we don't want to even create a
367
* statement if there isn't a primary key on the table...
371
* Multi-column primary keys are handled how exactly?
373
if (in_table->s->primary_key == MAX_KEY)
375
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
379
message::Statement &statement= getInsertStatement(in_session, in_table);
381
message::InsertData *data= statement.mutable_insert_data();
/* Every record is written as segment 1 and flagged as the end segment */
382
data->set_segment_id(1);
383
data->set_end_segment(true);
384
message::InsertRecord *record= data->add_record();
386
Field *current_field;
387
Field **table_fields= in_table->field;
389
/* Scratch String constructed with placement new on the Session's mem_root */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
390
string_value->set_charset(system_charset_info);
392
/* We will read all the table's fields... */
393
in_table->setReadSet();
395
while ((current_field= *table_fields++) != NULL)
397
string_value= current_field->val_str(string_value);
398
record->add_insert_value(string_value->c_ptr(), string_value->length());
399
string_value->free();
/*
 * Obtains the UPDATE Statement message for the Session. A current Statement
 * of another type is finalized first. When the Session then has no current
 * Statement, a new one is added to the active Transaction message, given an
 * update header via setUpdateHeader() and stored on the Session.
 *
 * NOTE(review): the body uses in_table, but no such parameter is visible in
 * this excerpt's signature, and the return statement is not visible either
 * (declared return type is a Statement reference, presumably *statement).
 *
 * @param in_session Session owning the Statement/Transaction messages
 * @param old_record Raw bytes of the row before the update
 * @param new_record Raw bytes of the row after the update
 */
404
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
406
const unsigned char *old_record,
407
const unsigned char *new_record) const
409
message::Statement *statement= in_session->getStatementMessage();
411
* We check to see if the current Statement message is of type UPDATE.
412
* If it is not, we finalize the current Statement and ensure a new
413
* UpdateStatement is created.
415
if (statement != NULL &&
416
statement->type() != message::Statement::UPDATE)
418
finalizeStatement(*statement, in_session);
419
/*
 * finalizeStatement() resets the Session's statement pointer to NULL, so
 * this re-read yields NULL and the creation path below is taken.
 */
statement= in_session->getStatementMessage();
422
if (statement == NULL)
424
message::Transaction *transaction= getActiveTransaction(in_session);
426
* Transaction message initialized and set, but no statement created
427
* yet. We construct one and initialize it, here, then return the
428
* message after attaching the new Statement message pointer to the
429
* Session for easy retrieval later...
431
statement= transaction->add_statement();
432
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
433
in_session->setStatementMessage(statement);
/*
 * Initializes a Statement message as an UPDATE and fills its UpdateHeader:
 * the schema and table name from the table's share, key-field metadata for
 * every field in the primary key, and set-field metadata for every field
 * whose raw bytes differ between old_record and new_record.
 *
 * NOTE(review): the body uses in_session and in_table, but neither is
 * visible in this excerpt's signature. The call site in getUpdateStatement()
 * passes (statement, in_session, in_table, old_record, new_record), so
 * parameter lines appear to be missing -- confirm against the full source.
 *
 * @param statement  Statement message receiving the update header
 * @param old_record Raw bytes of the row before the update
 * @param new_record Raw bytes of the row after the update
 */
438
void ReplicationServices::setUpdateHeader(message::Statement &statement,
441
const unsigned char *old_record,
442
const unsigned char *new_record) const
444
initStatement(statement, message::Statement::UPDATE, in_session);
447
* Now we construct the specialized UpdateHeader message inside
448
* the generalized message::Statement container...
450
/* Set up the update header */
451
message::UpdateHeader *header= statement.mutable_update_header();
452
message::TableMetadata *table_metadata= header->mutable_table_metadata();
454
const char *schema_name= in_table->getShare()->db.str;
455
const char *table_name= in_table->getShare()->table_name.str;
457
table_metadata->set_schema_name(schema_name);
458
table_metadata->set_table_name(table_name);
460
Field *current_field;
461
Field **table_fields= in_table->field;
463
message::FieldMetadata *field_metadata;
465
/* We will read all the table's fields... */
466
in_table->setReadSet();
468
while ((current_field= *table_fields++) != NULL)
471
* We add the "key field metadata" -- i.e. the fields which is
472
* the primary key for the table.
474
if (in_table->s->fieldInPrimaryKey(current_field))
476
field_metadata= header->add_key_field_metadata();
477
field_metadata->set_name(current_field->field_name);
478
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
482
* The below really should be moved into the Field API and Record API. But for now
483
* we do this crazy pointer fiddling to figure out if the current field
484
* has been updated in the supplied record raw byte pointers.
486
/*
 * The field's byte offset inside record[0] is applied to both record
 * buffers to locate the same field in the old and the new row image.
 */
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
487
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
489
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
491
if (memcmp(old_ptr, new_ptr, field_length) != 0)
493
/* Field is changed from old to new */
494
field_metadata= header->add_set_field_metadata();
495
field_metadata->set_name(current_field->field_name);
496
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
/*
 * Records one updated row in the Session's UPDATE Statement message. One
 * UpdateRecord is appended to the statement's UpdateData. For every table
 * field whose raw bytes differ between old_record and new_record, the
 * field's string value is added as an "after" value; for every field in the
 * primary key, a key value is added.
 *
 * NOTE(review): the body uses in_table, but no such parameter is visible in
 * this excerpt's signature; the val_str() call for the key value also looks
 * truncated (an argument line appears to be missing) -- confirm against the
 * complete source.
 * NOTE(review): the SQL example in the comment below declares the column as
 * "count" in CREATE TABLE but uses "counter" in the INSERT and UPDATE.
 *
 * @param in_session Session owning the messages and the mem_root
 * @param old_record Raw bytes of the row before the update
 * @param new_record Raw bytes of the row after the update
 */
500
void ReplicationServices::updateRecord(Session *in_session,
502
const unsigned char *old_record,
503
const unsigned char *new_record)
508
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
510
message::UpdateData *data= statement.mutable_update_data();
/* Every record is written as segment 1 and flagged as the end segment */
511
data->set_segment_id(1);
512
data->set_end_segment(true);
513
message::UpdateRecord *record= data->add_record();
515
Field *current_field;
516
Field **table_fields= in_table->field;
517
/* Scratch String constructed with placement new on the Session's mem_root */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
518
string_value->set_charset(system_charset_info);
520
while ((current_field= *table_fields++) != NULL)
523
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
524
* but then realized that an UPDATE statement could potentially have different values for
525
* the SET field. For instance, imagine this SQL scenario:
527
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
528
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
529
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
531
* We will generate two UpdateRecord messages with different set_value byte arrays.
533
* The below really should be moved into the Field API and Record API. But for now
534
* we do this crazy pointer fiddling to figure out if the current field
535
* has been updated in the supplied record raw byte pointers.
537
/*
 * The field's byte offset inside record[0] is applied to both record
 * buffers to locate the same field in the old and the new row image.
 */
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
538
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
540
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
542
if (memcmp(old_ptr, new_ptr, field_length) != 0)
544
/* Store the original "read bit" for this field */
545
bool is_read_set= current_field->isReadSet();
547
/* We need to mark that we will "read" this field... */
548
in_table->setReadSet(current_field->field_index);
550
/* Read the string value of this field's contents */
551
string_value= current_field->val_str(string_value);
554
* Reset the read bit after reading field to its original state. This
555
* prevents the field from being included in the WHERE clause
557
current_field->setReadSet(is_read_set);
559
record->add_after_value(string_value->c_ptr(), string_value->length());
560
string_value->free();
564
* Add the WHERE clause values now...for now, this means the
565
* primary key field value. Replication only supports tables
566
* with a primary key.
568
if (in_table->s->fieldInPrimaryKey(current_field))
571
* To say the below is ugly is an understatement. But it works.
573
* @todo Move this crap into a real Record API.
575
/* The key value is read using the field's offset computed against new_record */
string_value= current_field->val_str(string_value,
577
current_field->offset(const_cast<unsigned char *>(new_record)));
578
record->add_key_value(string_value->c_ptr(), string_value->length());
579
string_value->free();
/*
 * Obtains the DELETE Statement message for the Session. A current Statement
 * of another type is finalized first. When the Session then has no current
 * Statement, a new one is added to the active Transaction message, given a
 * delete header via setDeleteHeader() and stored on the Session.
 *
 * NOTE(review): the return statement is not visible in this excerpt; the
 * declared return type is a Statement reference, presumably *statement.
 *
 * @param in_session Session owning the Statement/Transaction messages
 * @param in_table   Table the DELETE applies to
 */
585
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
586
Table *in_table) const
588
message::Statement *statement= in_session->getStatementMessage();
590
* We check to see if the current Statement message is of type DELETE.
591
* If it is not, we finalize the current Statement and ensure a new
592
* DeleteStatement is created.
594
if (statement != NULL &&
595
statement->type() != message::Statement::DELETE)
597
finalizeStatement(*statement, in_session);
598
/*
 * finalizeStatement() resets the Session's statement pointer to NULL, so
 * this re-read yields NULL and the creation path below is taken.
 */
statement= in_session->getStatementMessage();
601
if (statement == NULL)
603
message::Transaction *transaction= getActiveTransaction(in_session);
605
* Transaction message initialized and set, but no statement created
606
* yet. We construct one and initialize it, here, then return the
607
* message after attaching the new Statement message pointer to the
608
* Session for easy retrieval later...
610
statement= transaction->add_statement();
611
setDeleteHeader(*statement, in_session, in_table);
612
in_session->setStatementMessage(statement);
/*
 * Initializes a Statement message as a DELETE and fills its DeleteHeader:
 * the schema and table name from the table's share, plus key-field metadata
 * (name and protobuf field type) for every field in the primary key.
 *
 * NOTE(review): the body uses in_session, but no such parameter is visible
 * in this excerpt's signature. The call site in getDeleteStatement() passes
 * (statement, in_session, in_table), so a parameter line appears to be
 * missing here -- confirm against the complete source.
 *
 * @param statement Statement message receiving the delete header
 * @param in_table  Table whose metadata is recorded
 */
617
void ReplicationServices::setDeleteHeader(message::Statement &statement,
619
Table *in_table) const
621
initStatement(statement, message::Statement::DELETE, in_session);
624
* Now we construct the specialized DeleteHeader message inside
625
* the generalized message::Statement container...
627
message::DeleteHeader *header= statement.mutable_delete_header();
628
message::TableMetadata *table_metadata= header->mutable_table_metadata();
630
const char *schema_name= in_table->getShare()->db.str;
631
const char *table_name= in_table->getShare()->table_name.str;
633
table_metadata->set_schema_name(schema_name);
634
table_metadata->set_table_name(table_name);
636
Field *current_field;
637
Field **table_fields= in_table->field;
639
message::FieldMetadata *field_metadata;
641
/* in_table->field is walked as a NULL-terminated array of Field pointers */
while ((current_field= *table_fields++) != NULL)
644
* Add the WHERE clause values now...for now, this means the
645
* primary key field value. Replication only supports tables
646
* with a primary key.
648
if (in_table->s->fieldInPrimaryKey(current_field))
650
field_metadata= header->add_key_field_metadata();
651
field_metadata->set_name(current_field->field_name);
652
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
/*
 * Records one deleted row in the Session's DELETE Statement message. One
 * DeleteRecord is appended to the statement's DeleteData, and the string
 * value of every primary-key field is added to it as a key value.
 *
 * NOTE(review): the brace structure is not visible in this excerpt, so it
 * cannot be confirmed from here whether string_value->free() runs for every
 * field or only for primary-key fields.
 *
 * @param in_session Session owning the messages and the mem_root
 * @param in_table   Table whose current row is being deleted
 */
657
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
662
message::Statement &statement= getDeleteStatement(in_session, in_table);
664
message::DeleteData *data= statement.mutable_delete_data();
/* Every record is written as segment 1 and flagged as the end segment */
665
data->set_segment_id(1);
666
data->set_end_segment(true);
667
message::DeleteRecord *record= data->add_record();
669
Field *current_field;
670
Field **table_fields= in_table->field;
671
/* Scratch String constructed with placement new on the Session's mem_root */
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
672
string_value->set_charset(system_charset_info);
674
while ((current_field= *table_fields++) != NULL)
677
* Add the WHERE clause values now...for now, this means the
678
* primary key field value. Replication only supports tables
679
* with a primary key.
681
if (in_table->s->fieldInPrimaryKey(current_field))
683
string_value= current_field->val_str(string_value);
684
record->add_key_value(string_value->c_ptr(), string_value->length());
686
* @TODO Store optional old record value in the before data member
688
string_value->free();
/*
 * Builds a CREATE_TABLE Statement on the Session's active Transaction
 * message: the supplied table definition is copied into the statement's
 * CreateTableStatement, after which the statement and the transaction are
 * finalized and the transaction is released through cleanupTransaction().
 *
 * NOTE(review): some numbered lines are absent from this excerpt (between
 * the signature and the first statement, and before cleanupTransaction), so
 * further steps may exist in the complete source.
 *
 * @param in_session Session owning the Transaction message
 * @param table      Table definition message to copy into the statement
 */
693
void ReplicationServices::createTable(Session *in_session,
694
const message::Table &table)
699
message::Transaction *transaction= getActiveTransaction(in_session);
700
message::Statement *statement= transaction->add_statement();
702
initStatement(*statement, message::Statement::CREATE_TABLE, in_session);
705
* Construct the specialized CreateTableStatement message and attach
706
* it to the generic Statement message
708
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
709
message::Table *new_table_message= create_table_statement->mutable_table();
710
/* Message assignment copies the caller's table definition */
*new_table_message= table;
712
finalizeStatement(*statement, in_session);
714
finalizeTransaction(*transaction, in_session);
718
cleanupTransaction(transaction, in_session);
/*
 * Builds a CREATE_SCHEMA Statement on the Session's active Transaction
 * message: the supplied schema definition is copied into the statement's
 * CreateSchemaStatement, after which the statement and the transaction are
 * finalized and the transaction is released through cleanupTransaction().
 *
 * NOTE(review): some numbered lines are absent from this excerpt (between
 * the signature and the first statement, and before cleanupTransaction), so
 * further steps may exist in the complete source.
 *
 * @param in_session Session owning the Transaction message
 * @param schema     Schema definition message to copy into the statement
 */
722
void ReplicationServices::createSchema(Session *in_session,
723
const message::Schema &schema)
728
message::Transaction *transaction= getActiveTransaction(in_session);
729
message::Statement *statement= transaction->add_statement();
731
initStatement(*statement, message::Statement::CREATE_SCHEMA, in_session);
734
* Construct the specialized CreateSchemaStatement message and attach
735
* it to the generic Statement message
737
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
738
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
739
/* Message assignment copies the caller's schema definition */
*new_schema_message= schema;
741
finalizeStatement(*statement, in_session);
743
finalizeTransaction(*transaction, in_session);
747
cleanupTransaction(transaction, in_session);
/*
 * Builds a DROP_SCHEMA Statement on the Session's active Transaction
 * message: the schema name is stored in the statement's
 * DropSchemaStatement, after which the statement and the transaction are
 * finalized and the transaction is released through cleanupTransaction().
 *
 * NOTE(review): some numbered lines are absent from this excerpt (between
 * the signature and the first statement, and before cleanupTransaction), so
 * further steps may exist in the complete source.
 *
 * @param in_session  Session owning the Transaction message
 * @param schema_name Name of the schema being dropped
 */
751
void ReplicationServices::dropSchema(Session *in_session, const string &schema_name)
756
message::Transaction *transaction= getActiveTransaction(in_session);
757
message::Statement *statement= transaction->add_statement();
759
initStatement(*statement, message::Statement::DROP_SCHEMA, in_session);
762
* Construct the specialized DropSchemaStatement message and attach
763
* it to the generic Statement message
765
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
767
drop_schema_statement->set_schema_name(schema_name);
769
finalizeStatement(*statement, in_session);
771
finalizeTransaction(*transaction, in_session);
775
cleanupTransaction(transaction, in_session);
/*
 * Builds a DROP_TABLE Statement on the Session's active Transaction
 * message: the if-exists flag and the schema/table names are stored in the
 * statement's DropTableStatement, after which the statement and the
 * transaction are finalized and the transaction is released through
 * cleanupTransaction().
 *
 * NOTE(review): the body uses if_exists, but the end of the parameter list
 * is not visible in this excerpt, so that parameter's declaration cannot be
 * confirmed from here. Other numbered lines are absent as well, so further
 * steps may exist in the complete source.
 *
 * @param in_session  Session owning the Transaction message
 * @param schema_name Schema containing the table being dropped
 * @param table_name  Name of the table being dropped
 */
778
void ReplicationServices::dropTable(Session *in_session,
779
const string &schema_name,
780
const string &table_name,
786
message::Transaction *transaction= getActiveTransaction(in_session);
787
message::Statement *statement= transaction->add_statement();
789
initStatement(*statement, message::Statement::DROP_TABLE, in_session);
792
* Construct the specialized DropTableStatement message and attach
793
* it to the generic Statement message
795
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
797
drop_table_statement->set_if_exists_clause(if_exists);
799
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
801
table_metadata->set_schema_name(schema_name);
802
table_metadata->set_table_name(table_name);
804
finalizeStatement(*statement, in_session);
806
finalizeTransaction(*transaction, in_session);
810
cleanupTransaction(transaction, in_session);
/*
 * Builds a TRUNCATE_TABLE Statement on the Session's active Transaction
 * message: the schema and table name from the table's share are stored in
 * the statement's TruncateTableStatement, after which the statement and the
 * transaction are finalized and the transaction is released through
 * cleanupTransaction().
 *
 * NOTE(review): some numbered lines are absent from this excerpt (between
 * the signature and the first statement, and before cleanupTransaction), so
 * further steps may exist in the complete source.
 *
 * @param in_session Session owning the Transaction message
 * @param in_table   Table being truncated
 */
813
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
818
message::Transaction *transaction= getActiveTransaction(in_session);
819
message::Statement *statement= transaction->add_statement();
821
initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
824
* Construct the specialized TruncateTableStatement message and attach
825
* it to the generic Statement message
827
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
828
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
830
const char *schema_name= in_table->getShare()->db.str;
831
const char *table_name= in_table->getShare()->table_name.str;
833
table_metadata->set_schema_name(schema_name);
834
table_metadata->set_table_name(table_name);
836
finalizeStatement(*statement, in_session);
838
finalizeTransaction(*transaction, in_session);
842
cleanupTransaction(transaction, in_session);
/*
 * Builds a RAW_SQL Statement on the Session's active Transaction message:
 * the query text is stored as the statement's sql field, after which the
 * statement and the transaction are finalized and the transaction is
 * released through cleanupTransaction().
 *
 * NOTE(review): some numbered lines are absent from this excerpt (between
 * the signature and the first statement, and before cleanupTransaction), so
 * further steps may exist in the complete source.
 *
 * @param in_session Session owning the Transaction message
 * @param query      SQL text to record verbatim
 */
845
void ReplicationServices::rawStatement(Session *in_session, const string &query)
850
message::Transaction *transaction= getActiveTransaction(in_session);
851
message::Statement *statement= transaction->add_statement();
853
initStatement(*statement, message::Statement::RAW_SQL, in_session);
854
statement->set_sql(query);
855
finalizeStatement(*statement, in_session);
857
finalizeTransaction(*transaction, in_session);
861
cleanupTransaction(transaction, in_session);
864
void ReplicationServices::push(message::Transaction &to_push)
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
866
150
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
867
151
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;