38
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/replication.proto
41
* @see /drizzled/message/transaction.proto
45
* We really should store the raw bytes in the messages, not the
46
* String value of the Field. But, to do that, the
47
* statement_transform library needs first to be updated
48
* to include the transformation code to convert raw
49
* Drizzle-internal Field byte representation into something
50
* plugins can understand.
44
53
#include "drizzled/server_includes.h"
45
54
#include "drizzled/replication_services.h"
46
#include "drizzled/plugin/command_replicator.h"
47
#include "drizzled/plugin/command_applier.h"
48
#include "drizzled/message/replication.pb.h"
55
#include "drizzled/plugin/transaction_replicator.h"
56
#include "drizzled/plugin/transaction_applier.h"
57
#include "drizzled/message/transaction.pb.h"
49
58
#include "drizzled/message/table.pb.h"
50
59
#include "drizzled/gettext.h"
51
60
#include "drizzled/session.h"
121
132
is_active= false;
124
void ReplicationServices::attachReplicator(plugin::CommandReplicator *in_replicator)
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
126
137
replicators.push_back(in_replicator);
127
138
evaluateActivePlugins();
130
void ReplicationServices::detachReplicator(plugin::CommandReplicator *in_replicator)
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
132
143
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
133
144
evaluateActivePlugins();
136
void ReplicationServices::attachApplier(plugin::CommandApplier *in_applier)
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
138
149
appliers.push_back(in_applier);
139
150
evaluateActivePlugins();
142
void ReplicationServices::detachApplier(plugin::CommandApplier *in_applier)
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
144
155
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
145
156
evaluateActivePlugins();
150
161
return is_active;
153
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
154
Session *in_session) const
156
message::TransactionContext *trx= in_command.mutable_transaction_context();
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
166
message::Transaction *transaction= in_session->getTransactionMessage();
168
if (unlikely(transaction == NULL))
171
* Allocate and initialize a new transaction message
172
* for this Session object. Session is responsible for
173
* deleting transaction message when done with it.
175
transaction= new (nothrow) message::Transaction();
176
initTransaction(*transaction, in_session);
177
in_session->setTransactionMessage(transaction);
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
Session *in_session) const
187
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
157
188
trx->set_server_id(in_session->getServerId());
158
189
trx->set_transaction_id(in_session->getTransactionId());
160
in_command.set_session_id((uint32_t) in_session->getSessionId());
163
void ReplicationServices::startTransaction(Session *in_session)
168
message::Command command;
169
command.set_type(message::Command::START_TRANSACTION);
170
command.set_timestamp(in_session->getCurrentTimestamp());
172
setCommandTransactionContext(command, in_session);
177
void ReplicationServices::commitTransaction(Session *in_session)
182
message::Command command;
183
command.set_type(message::Command::COMMIT);
184
command.set_timestamp(in_session->getCurrentTimestamp());
186
setCommandTransactionContext(command, in_session);
190
trx->set_start_timestamp(in_session->getCurrentTimestamp());
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
Session *in_session) const
196
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
trx->set_end_timestamp(in_session->getCurrentTimestamp());
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
Session *in_session) const
203
delete in_transaction;
204
in_session->setStatementMessage(NULL);
205
in_session->setTransactionMessage(NULL);
208
void ReplicationServices::startNormalTransaction(Session *in_session)
213
/* Safeguard...other transactions should have already been closed */
214
message::Transaction *transaction= in_session->getTransactionMessage();
215
assert(transaction == NULL);
218
* A "normal" transaction for the replication services component
219
* is simply a Transaction message, nothing more...so we create a
220
* new message and attach it to the Session object.
222
* Allocate and initialize a new transaction message
223
* for this Session object. This memory is deleted when the
224
* transaction is pushed out to replicators. Session is NOT
225
* responsible for deleting this memory.
227
transaction= new (nothrow) message::Transaction();
228
initTransaction(*transaction, in_session);
229
in_session->setTransactionMessage(transaction);
232
void ReplicationServices::commitNormalTransaction(Session *in_session)
237
/* If there is an active statement message, finalize it */
238
message::Statement *statement= in_session->getStatementMessage();
240
if (statement != NULL)
242
finalizeStatement(*statement, in_session);
245
return; /* No data modification occurred inside the transaction */
247
message::Transaction* transaction= getActiveTransaction(in_session);
249
finalizeTransaction(*transaction, in_session);
253
cleanupTransaction(transaction, in_session);
256
void ReplicationServices::initStatement(message::Statement &statement,
257
message::Statement::Type in_type,
258
Session *in_session) const
260
statement.set_type(in_type);
261
statement.set_start_timestamp(in_session->getCurrentTimestamp());
262
/** @TODO Set sql string optionally */
265
void ReplicationServices::finalizeStatement(message::Statement &statement,
266
Session *in_session) const
268
statement.set_end_timestamp(in_session->getCurrentTimestamp());
269
in_session->setStatementMessage(NULL);
191
272
void ReplicationServices::rollbackTransaction(Session *in_session)
196
message::Command command;
197
command.set_type(message::Command::ROLLBACK);
198
command.set_timestamp(in_session->getCurrentTimestamp());
200
setCommandTransactionContext(command, in_session);
277
message::Transaction *transaction= getActiveTransaction(in_session);
280
* We clear the current transaction, reset its transaction ID properly,
281
* then set its type to ROLLBACK and push it out to the replicators.
283
transaction->Clear();
284
initTransaction(*transaction, in_session);
286
message::Statement *statement= transaction->add_statement();
288
initStatement(*statement, message::Statement::ROLLBACK, in_session);
289
finalizeStatement(*statement, in_session);
291
finalizeTransaction(*transaction, in_session);
295
cleanupTransaction(transaction, in_session);
298
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
299
Table *in_table) const
301
message::Statement *statement= in_session->getStatementMessage();
303
if (statement == NULL)
305
message::Transaction *transaction= getActiveTransaction(in_session);
307
* Transaction message initialized and set, but no statement created
308
* yet. We construct one and initialize it, here, then return the
309
* message after attaching the new Statement message pointer to the
310
* Session for easy retrieval later...
312
statement= transaction->add_statement();
313
setInsertHeader(*statement, in_session, in_table);
314
in_session->setStatementMessage(statement);
319
void ReplicationServices::setInsertHeader(message::Statement &statement,
321
Table *in_table) const
323
initStatement(statement, message::Statement::INSERT, in_session);
326
* Now we construct the specialized InsertHeader message inside
327
* the generalized message::Statement container...
329
/* Set up the insert header */
330
message::InsertHeader *header= statement.mutable_insert_header();
331
message::TableMetadata *table_metadata= header->mutable_table_metadata();
333
const char *schema_name= in_table->getShare()->db.str;
334
const char *table_name= in_table->getShare()->table_name.str;
336
table_metadata->set_schema_name(schema_name);
337
table_metadata->set_table_name(table_name);
339
Field *current_field;
340
Field **table_fields= in_table->field;
342
message::FieldMetadata *field_metadata;
344
/* We will read all the table's fields... */
345
in_table->setReadSet();
347
while ((current_field= *table_fields++) != NULL)
349
field_metadata= header->add_field_metadata();
350
field_metadata->set_name(current_field->field_name);
351
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
205
355
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
210
message::Command command;
211
command.set_type(message::Command::INSERT);
212
command.set_timestamp(in_session->getCurrentTimestamp());
214
setCommandTransactionContext(command, in_session);
216
const char *schema_name= in_table->getShare()->db.str;
217
const char *table_name= in_table->getShare()->table_name.str;
219
command.set_schema(schema_name);
220
command.set_table(table_name);
223
* Now we construct the specialized InsertRecord command inside
224
* the message::Command container...
226
message::InsertRecord *change_record= command.mutable_insert_record();
360
message::Statement &statement= getInsertStatement(in_session, in_table);
362
message::InsertData *data= statement.mutable_insert_data();
363
data->set_segment_id(1);
364
data->set_end_segment(true);
365
message::InsertRecord *record= data->add_record();
228
367
Field *current_field;
229
368
Field **table_fields= in_table->field;
230
370
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
231
371
string_value->set_charset(system_charset_info);
233
message::Table::Field *current_proto_field;
235
373
/* We will read all the table's fields... */
236
374
in_table->setReadSet();
238
376
while ((current_field= *table_fields++) != NULL)
240
current_proto_field= change_record->add_insert_field();
241
current_proto_field->set_name(current_field->field_name);
242
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
243
378
string_value= current_field->val_str(string_value);
244
change_record->add_insert_value(string_value->c_ptr());
379
record->add_insert_value(string_value->c_ptr());
245
380
string_value->free();
384
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
386
const unsigned char *old_record,
387
const unsigned char *new_record) const
389
message::Statement *statement= in_session->getStatementMessage();
391
if (statement == NULL)
393
message::Transaction *transaction= getActiveTransaction(in_session);
395
* Transaction message initialized and set, but no statement created
396
* yet. We construct one and initialize it, here, then return the
397
* message after attaching the new Statement message pointer to the
398
* Session for easy retrieval later...
400
statement= transaction->add_statement();
401
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
402
in_session->setStatementMessage(statement);
407
void ReplicationServices::setUpdateHeader(message::Statement &statement,
410
const unsigned char *old_record,
411
const unsigned char *new_record) const
413
initStatement(statement, message::Statement::UPDATE, in_session);
416
* Now we construct the specialized UpdateHeader message inside
417
* the generalized message::Statement container...
419
/* Set up the update header */
420
message::UpdateHeader *header= statement.mutable_update_header();
421
message::TableMetadata *table_metadata= header->mutable_table_metadata();
423
const char *schema_name= in_table->getShare()->db.str;
424
const char *table_name= in_table->getShare()->table_name.str;
426
table_metadata->set_schema_name(schema_name);
427
table_metadata->set_table_name(table_name);
429
Field *current_field;
430
Field **table_fields= in_table->field;
431
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
432
string_value->set_charset(system_charset_info);
434
message::FieldMetadata *field_metadata;
436
/* We will read all the table's fields... */
437
in_table->setReadSet();
439
while ((current_field= *table_fields++) != NULL)
442
* We add the "key field metadata" -- i.e. the field which is
443
* the primary key for the table.
445
if (in_table->s->primary_key == current_field->field_index)
447
field_metadata= header->add_key_field_metadata();
448
field_metadata->set_name(current_field->field_name);
449
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
453
* The below really should be moved into the Field API and Record API. But for now
454
* we do this crazy pointer fiddling to figure out if the current field
455
* has been updated in the supplied record raw byte pointers.
457
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
458
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
460
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
462
if (memcmp(old_ptr, new_ptr, field_length) != 0)
464
/* Field is changed from old to new */
465
field_metadata= header->add_set_field_metadata();
466
field_metadata->set_name(current_field->field_name);
467
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
251
471
void ReplicationServices::updateRecord(Session *in_session,
253
473
const unsigned char *old_record,
259
message::Command command;
260
command.set_type(message::Command::UPDATE);
261
command.set_timestamp(in_session->getCurrentTimestamp());
263
setCommandTransactionContext(command, in_session);
265
const char *schema_name= in_table->getShare()->db.str;
266
const char *table_name= in_table->getShare()->table_name.str;
268
command.set_schema(schema_name);
269
command.set_table(table_name);
272
* Now we construct the specialized UpdateRecord command inside
273
* the message::Command container...
275
message::UpdateRecord *change_record= command.mutable_update_record();
479
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
481
message::UpdateData *data= statement.mutable_update_data();
482
data->set_segment_id(1);
483
data->set_end_segment(true);
484
message::UpdateRecord *record= data->add_record();
277
486
Field *current_field;
278
487
Field **table_fields= in_table->field;
279
488
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
280
489
string_value->set_charset(system_charset_info);
282
message::Table::Field *current_proto_field;
284
491
while ((current_field= *table_fields++) != NULL)
494
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
495
* but then realized that an UPDATE statement could potentially have different values for
496
* the SET field. For instance, imagine this SQL scenario:
498
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, counter INT NOT NULL);
499
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
500
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
502
* We will generate two UpdateRecord messages with different set_value byte arrays.
287
504
* The below really should be moved into the Field API and Record API. But for now
288
505
* we do this crazy pointer fiddling to figure out if the current field
289
506
* has been updated in the supplied record raw byte pointers.
327
539
if (current_field->isReadSet())
329
current_proto_field= change_record->add_where_field();
330
current_proto_field->set_name(current_field->field_name);
331
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
332
541
string_value= current_field->val_str(string_value);
333
change_record->add_where_value(string_value->c_ptr());
542
record->add_key_value(string_value->c_ptr());
544
* @TODO Store optional old record value in the before data member
334
546
string_value->free();
552
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
553
Table *in_table) const
555
message::Statement *statement= in_session->getStatementMessage();
557
if (statement == NULL)
559
message::Transaction *transaction= getActiveTransaction(in_session);
561
* Transaction message initialized and set, but no statement created
562
* yet. We construct one and initialize it, here, then return the
563
* message after attaching the new Statement message pointer to the
564
* Session for easy retrieval later...
566
statement= transaction->add_statement();
567
setDeleteHeader(*statement, in_session, in_table);
568
in_session->setStatementMessage(statement);
573
void ReplicationServices::setDeleteHeader(message::Statement &statement,
575
Table *in_table) const
577
initStatement(statement, message::Statement::DELETE, in_session);
580
* Now we construct the specialized DeleteHeader message inside
581
* the generalized message::Statement container...
583
message::DeleteHeader *header= statement.mutable_delete_header();
584
message::TableMetadata *table_metadata= header->mutable_table_metadata();
586
const char *schema_name= in_table->getShare()->db.str;
587
const char *table_name= in_table->getShare()->table_name.str;
589
table_metadata->set_schema_name(schema_name);
590
table_metadata->set_table_name(table_name);
592
Field *current_field;
593
Field **table_fields= in_table->field;
595
message::FieldMetadata *field_metadata;
597
while ((current_field= *table_fields++) != NULL)
600
* Add the WHERE clause values now...for now, this means the
601
* primary key field value. Replication only supports tables
602
* with a primary key.
604
if (in_table->s->primary_key == current_field->field_index)
606
field_metadata= header->add_key_field_metadata();
607
field_metadata->set_name(current_field->field_name);
608
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
341
613
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
346
message::Command command;
347
command.set_type(message::Command::DELETE);
348
command.set_timestamp(in_session->getCurrentTimestamp());
350
setCommandTransactionContext(command, in_session);
352
const char *schema_name= in_table->getShare()->db.str;
353
const char *table_name= in_table->getShare()->table_name.str;
355
command.set_schema(schema_name);
356
command.set_table(table_name);
359
* Now we construct the specialized DeleteRecord command inside
360
* the message::Command container...
362
message::DeleteRecord *change_record= command.mutable_delete_record();
618
message::Statement &statement= getDeleteStatement(in_session, in_table);
620
message::DeleteData *data= statement.mutable_delete_data();
621
data->set_segment_id(1);
622
data->set_end_segment(true);
623
message::DeleteRecord *record= data->add_record();
364
625
Field *current_field;
365
626
Field **table_fields= in_table->field;
366
627
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
367
628
string_value->set_charset(system_charset_info);
369
message::Table::Field *current_proto_field;
371
while ((current_field= *table_fields++) != NULL)
630
while ((current_field= *table_fields++) != NULL)
374
* Add the WHERE clause values now...the fields which return true
375
* for isReadSet() are in the WHERE clause. For tables with no
376
* primary or unique key, all fields will be returned.
633
* Add the WHERE clause values now...for now, this means the
634
* primary key field value. Replication only supports tables
635
* with a primary key.
378
if (current_field->isReadSet())
637
if (in_table->s->primary_key == current_field->field_index)
380
current_proto_field= change_record->add_where_field();
381
current_proto_field->set_name(current_field->field_name);
382
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
383
639
string_value= current_field->val_str(string_value);
384
change_record->add_where_value(string_value->c_ptr());
640
record->add_key_value(string_value->c_ptr());
642
* @TODO Store optional old record value in the before data member
385
644
string_value->free();
392
649
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
397
message::Command command;
398
command.set_type(message::Command::RAW_SQL);
399
command.set_timestamp(in_session->getCurrentTimestamp());
401
setCommandTransactionContext(command, in_session);
654
message::Transaction *transaction= getActiveTransaction(in_session);
655
message::Statement *statement= transaction->add_statement();
657
initStatement(*statement, message::Statement::RAW_SQL, in_session);
403
658
string query(in_query, in_query_len);
404
command.set_sql(query);
659
statement->set_sql(query);
660
finalizeStatement(*statement, in_session);
662
finalizeTransaction(*transaction, in_session);
666
cleanupTransaction(transaction, in_session);
409
void ReplicationServices::push(drizzled::message::Command &to_push)
669
void ReplicationServices::push(drizzled::message::Transaction &to_push)
411
vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
412
vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
671
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
672
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
413
673
appl_start_iter= appliers.begin();
415
plugin::CommandReplicator *cur_repl;
416
plugin::CommandApplier *cur_appl;
675
plugin::TransactionReplicator *cur_repl;
676
plugin::TransactionApplier *cur_appl;
418
678
while (repl_iter != replicators.end())
438
698
cur_repl->replicate(cur_appl, to_push);
441
* We update the timestamp for the last applied Command so that
701
* We update the timestamp for the last applied Transaction so that
442
702
* publisher plugins can ask the replication services when the
443
* last known applied Command was using the getLastAppliedTimestamp()
703
* last known applied Transaction was using the getLastAppliedTimestamp()
446
last_applied_timestamp.fetch_and_store(to_push.timestamp());
706
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
713
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type)
717
case DRIZZLE_TYPE_LONGLONG:
718
return message::Table::Field::BIGINT;
719
case DRIZZLE_TYPE_LONG:
720
return message::Table::Field::INTEGER;
721
case DRIZZLE_TYPE_NEWDECIMAL:
722
return message::Table::Field::DECIMAL;
723
case DRIZZLE_TYPE_DOUBLE:
724
return message::Table::Field::DOUBLE;
725
case DRIZZLE_TYPE_DATE:
726
return message::Table::Field::DATE;
727
case DRIZZLE_TYPE_DATETIME:
728
return message::Table::Field::DATETIME;
729
case DRIZZLE_TYPE_TIMESTAMP:
730
return message::Table::Field::TIMESTAMP;
731
case DRIZZLE_TYPE_VARCHAR:
732
return message::Table::Field::VARCHAR;
734
return message::Table::Field::VARCHAR;
453
738
} /* namespace drizzled */