26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
38
#include <drizzled/replication_services.h>
39
#include <drizzled/plugin/transaction_replicator.h>
40
#include <drizzled/plugin/transaction_applier.h>
41
#include <drizzled/message/transaction.pb.h>
42
#include <drizzled/gettext.h>
43
#include <drizzled/session.h>
44
#include <drizzled/error.h>
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/transaction.proto
45
* We really should store the raw bytes in the messages, not the
46
* String value of the Field. But, to do that, the
47
* statement_transform library needs first to be updated
48
* to include the transformation code to convert raw
49
* Drizzle-internal Field byte representation into something
50
* plugins can understand.
53
#include "drizzled/server_includes.h"
54
#include "drizzled/replication_services.h"
55
#include "drizzled/plugin/transaction_replicator.h"
56
#include "drizzled/plugin/transaction_applier.h"
57
#include "drizzled/message/transaction.pb.h"
58
#include "drizzled/message/table.pb.h"
59
#include "drizzled/gettext.h"
60
#include "drizzled/session.h"
50
64
using namespace std;
55
ReplicationServices::ReplicationServices() :
69
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type);
71
ReplicationServices::ReplicationServices()
60
void ReplicationServices::normalizeReplicatorName(string &name)
76
void ReplicationServices::evaluateActivePlugins()
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
79
* We loop through replicators and appliers, evaluating
80
* whether or not there is at least one active replicator
81
* and one active applier. If not, we set is_active
84
bool tmp_is_active= false;
86
if (replicators.empty() || appliers.empty())
93
* Determine if there are any remaining replicators and if those
94
* replicators are active...if not, set is_active
97
for (Replicators::iterator repl_iter= replicators.begin();
98
repl_iter != replicators.end();
101
if ((*repl_iter)->isEnabled())
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
109
/* No active replicators. Set is_active to false and exit. */
78
bool ReplicationServices::evaluateRegisteredPlugins()
81
* We loop through appliers that have registered with us
82
* and attempt to pair each applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
115
* OK, we know there's at least one active replicator.
117
* Now determine if there are any appliers and if those
118
* appliers are active...if not, set is_active
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(error::ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
99
121
for (Appliers::iterator appl_iter= appliers.begin();
100
122
appl_iter != appliers.end();
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(error::ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
125
if ((*appl_iter)->isEnabled())
131
/* If we get here, there are no active appliers */
143
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
145
137
replicators.push_back(in_replicator);
138
evaluateActivePlugins();
148
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
150
143
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
144
evaluateActivePlugins();
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
149
appliers.push_back(in_applier);
150
evaluateActivePlugins();
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
155
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
156
evaluateActivePlugins();
162
159
bool ReplicationServices::isActive() const
164
161
return is_active;
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
message::Transaction &to_push)
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
166
message::Transaction *transaction= in_session->getTransactionMessage();
168
if (unlikely(transaction == NULL))
171
* Allocate and initialize a new transaction message
172
* for this Session object. Session is responsible for
173
* deleting transaction message when done with it.
175
transaction= new (nothrow) message::Transaction();
176
initTransaction(*transaction, in_session);
177
in_session->setTransactionMessage(transaction);
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
Session *in_session) const
187
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
trx->set_server_id(in_session->getServerId());
189
trx->set_transaction_id(in_session->getTransactionId());
190
trx->set_start_timestamp(in_session->getCurrentTimestamp());
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
Session *in_session) const
196
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
trx->set_end_timestamp(in_session->getCurrentTimestamp());
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
Session *in_session) const
203
delete in_transaction;
204
in_session->setStatementMessage(NULL);
205
in_session->setTransactionMessage(NULL);
208
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
210
size_t num_statements= transaction.statement_size();
211
if (num_statements == 0)
215
* Only INSERT, UPDATE, and DELETE statements can possibly
216
* have bulk segments. So, we loop through the statements
217
* checking for segment_id > 1 in those specific submessages.
220
for (x= 0; x < num_statements; ++x)
222
const message::Statement &statement= transaction.statement(x);
223
message::Statement::Type type= statement.type();
227
case message::Statement::INSERT:
228
if (statement.insert_data().segment_id() > 1)
231
case message::Statement::UPDATE:
232
if (statement.update_data().segment_id() > 1)
235
case message::Statement::DELETE:
236
if (statement.delete_data().segment_id() > 1)
245
void ReplicationServices::commitTransaction(Session *in_session)
250
/* If there is an active statement message, finalize it */
251
message::Statement *statement= in_session->getStatementMessage();
253
if (statement != NULL)
255
finalizeStatement(*statement, in_session);
258
return; /* No data modification occurred inside the transaction */
260
message::Transaction* transaction= getActiveTransaction(in_session);
262
finalizeTransaction(*transaction, in_session);
266
cleanupTransaction(transaction, in_session);
269
void ReplicationServices::initStatement(message::Statement &statement,
270
message::Statement::Type in_type,
271
Session *in_session) const
273
statement.set_type(in_type);
274
statement.set_start_timestamp(in_session->getCurrentTimestamp());
275
/** @TODO Set sql string optionally */
278
void ReplicationServices::finalizeStatement(message::Statement &statement,
279
Session *in_session) const
281
statement.set_end_timestamp(in_session->getCurrentTimestamp());
282
in_session->setStatementMessage(NULL);
285
void ReplicationServices::rollbackTransaction(Session *in_session)
290
message::Transaction *transaction= getActiveTransaction(in_session);
293
* OK, so there are two situations that we need to deal with here:
295
* 1) We receive an instruction to ROLLBACK the current transaction
296
* and the currently-stored Transaction message is *self-contained*,
297
* meaning that no Statement messages in the Transaction message
298
* contain a message having its segment_id member greater than 1. If
299
* no members with a segment_id greater than 1 are found, we can simply clear the
300
* current Transaction message and remove it from memory.
302
* 2) If the Transaction message does indeed have a non-end segment, that
303
* means that a bulk update/delete/insert Transaction message segment
304
* has previously been sent over the wire to replicators. In this case,
305
* we need to package a Transaction with a Statement message of type
306
* ROLLBACK to indicate to replicators that previously-transmitted
307
* messages must be un-applied.
309
if (unlikely(transactionContainsBulkSegment(*transaction)))
312
* Clear the transaction, create a Rollback statement message,
313
* attach it to the transaction, and push it to replicators.
315
transaction->Clear();
316
initTransaction(*transaction, in_session);
318
message::Statement *statement= transaction->add_statement();
320
initStatement(*statement, message::Statement::ROLLBACK, in_session);
321
finalizeStatement(*statement, in_session);
323
finalizeTransaction(*transaction, in_session);
327
cleanupTransaction(transaction, in_session);
330
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
331
Table *in_table) const
333
message::Statement *statement= in_session->getStatementMessage();
335
if (statement == NULL)
337
message::Transaction *transaction= getActiveTransaction(in_session);
339
* Transaction message initialized and set, but no statement created
340
* yet. We construct one and initialize it, here, then return the
341
* message after attaching the new Statement message pointer to the
342
* Session for easy retrieval later...
344
statement= transaction->add_statement();
345
setInsertHeader(*statement, in_session, in_table);
346
in_session->setStatementMessage(statement);
351
void ReplicationServices::setInsertHeader(message::Statement &statement,
353
Table *in_table) const
355
initStatement(statement, message::Statement::INSERT, in_session);
358
* Now we construct the specialized InsertHeader message inside
359
* the generalized message::Statement container...
361
/* Set up the insert header */
362
message::InsertHeader *header= statement.mutable_insert_header();
363
message::TableMetadata *table_metadata= header->mutable_table_metadata();
365
const char *schema_name= in_table->getShare()->db.str;
366
const char *table_name= in_table->getShare()->table_name.str;
368
table_metadata->set_schema_name(schema_name);
369
table_metadata->set_table_name(table_name);
371
Field *current_field;
372
Field **table_fields= in_table->field;
374
message::FieldMetadata *field_metadata;
376
/* We will read all the table's fields... */
377
in_table->setReadSet();
379
while ((current_field= *table_fields++) != NULL)
381
field_metadata= header->add_field_metadata();
382
field_metadata->set_name(current_field->field_name);
383
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
387
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
392
message::Statement &statement= getInsertStatement(in_session, in_table);
394
message::InsertData *data= statement.mutable_insert_data();
395
data->set_segment_id(1);
396
data->set_end_segment(true);
397
message::InsertRecord *record= data->add_record();
399
Field *current_field;
400
Field **table_fields= in_table->field;
402
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
403
string_value->set_charset(system_charset_info);
405
/* We will read all the table's fields... */
406
in_table->setReadSet();
408
while ((current_field= *table_fields++) != NULL)
410
string_value= current_field->val_str(string_value);
411
record->add_insert_value(string_value->c_ptr(), string_value->length());
412
string_value->free();
416
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
418
const unsigned char *old_record,
419
const unsigned char *new_record) const
421
message::Statement *statement= in_session->getStatementMessage();
423
if (statement == NULL)
425
message::Transaction *transaction= getActiveTransaction(in_session);
427
* Transaction message initialized and set, but no statement created
428
* yet. We construct one and initialize it, here, then return the
429
* message after attaching the new Statement message pointer to the
430
* Session for easy retrieval later...
432
statement= transaction->add_statement();
433
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
434
in_session->setStatementMessage(statement);
439
void ReplicationServices::setUpdateHeader(message::Statement &statement,
442
const unsigned char *old_record,
443
const unsigned char *new_record) const
445
initStatement(statement, message::Statement::UPDATE, in_session);
448
* Now we construct the specialized UpdateHeader message inside
449
* the generalized message::Statement container...
451
/* Set up the update header */
452
message::UpdateHeader *header= statement.mutable_update_header();
453
message::TableMetadata *table_metadata= header->mutable_table_metadata();
455
const char *schema_name= in_table->getShare()->db.str;
456
const char *table_name= in_table->getShare()->table_name.str;
458
table_metadata->set_schema_name(schema_name);
459
table_metadata->set_table_name(table_name);
461
Field *current_field;
462
Field **table_fields= in_table->field;
463
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
464
string_value->set_charset(system_charset_info);
466
message::FieldMetadata *field_metadata;
468
/* We will read all the table's fields... */
469
in_table->setReadSet();
471
while ((current_field= *table_fields++) != NULL)
474
* We add the "key field metadata" -- i.e. the field which is
475
* the primary key for the table.
477
if (in_table->s->primary_key == current_field->field_index)
479
field_metadata= header->add_key_field_metadata();
480
field_metadata->set_name(current_field->field_name);
481
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
485
* The below really should be moved into the Field API and Record API. But for now
486
* we do this crazy pointer fiddling to figure out if the current field
487
* has been updated in the supplied record raw byte pointers.
489
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
490
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
492
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
494
if (memcmp(old_ptr, new_ptr, field_length) != 0)
496
/* Field is changed from old to new */
497
field_metadata= header->add_set_field_metadata();
498
field_metadata->set_name(current_field->field_name);
499
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
503
void ReplicationServices::updateRecord(Session *in_session,
505
const unsigned char *old_record,
506
const unsigned char *new_record)
511
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
513
message::UpdateData *data= statement.mutable_update_data();
514
data->set_segment_id(1);
515
data->set_end_segment(true);
516
message::UpdateRecord *record= data->add_record();
518
Field *current_field;
519
Field **table_fields= in_table->field;
520
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
521
string_value->set_charset(system_charset_info);
523
while ((current_field= *table_fields++) != NULL)
526
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
527
* but then realized that an UPDATE statement could potentially have different values for
528
* the SET field. For instance, imagine this SQL scenario:
530
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, counter INT NOT NULL);
531
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
532
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
534
* We will generate two UpdateRecord messages with different set_value byte arrays.
536
* The below really should be moved into the Field API and Record API. But for now
537
* we do this crazy pointer fiddling to figure out if the current field
538
* has been updated in the supplied record raw byte pointers.
540
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
541
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
543
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
545
if (memcmp(old_ptr, new_ptr, field_length) != 0)
547
/* Store the original "read bit" for this field */
548
bool is_read_set= current_field->isReadSet();
550
/* We need to mark that we will "read" this field... */
551
in_table->setReadSet(current_field->field_index);
553
/* Read the string value of this field's contents */
554
string_value= current_field->val_str(string_value);
557
* Reset the read bit after reading field to its original state. This
558
* prevents the field from being included in the WHERE clause
560
current_field->setReadSet(is_read_set);
562
record->add_after_value(string_value->c_ptr(), string_value->length());
563
string_value->free();
567
* Add the WHERE clause values now...for now, this means the
568
* primary key field value. Replication only supports tables
569
* with a primary key.
571
if (in_table->s->primary_key == current_field->field_index)
574
* To say the below is ugly is an understatement. But it works.
576
* @todo Move this crap into a real Record API.
578
string_value= current_field->val_str(string_value,
580
current_field->offset(const_cast<unsigned char *>(new_record)));
581
record->add_key_value(string_value->c_ptr(), string_value->length());
582
string_value->free();
588
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
589
Table *in_table) const
591
message::Statement *statement= in_session->getStatementMessage();
593
if (statement == NULL)
595
message::Transaction *transaction= getActiveTransaction(in_session);
597
* Transaction message initialized and set, but no statement created
598
* yet. We construct one and initialize it, here, then return the
599
* message after attaching the new Statement message pointer to the
600
* Session for easy retrieval later...
602
statement= transaction->add_statement();
603
setDeleteHeader(*statement, in_session, in_table);
604
in_session->setStatementMessage(statement);
609
void ReplicationServices::setDeleteHeader(message::Statement &statement,
611
Table *in_table) const
613
initStatement(statement, message::Statement::DELETE, in_session);
616
* Now we construct the specialized DeleteHeader message inside
617
* the generalized message::Statement container...
619
message::DeleteHeader *header= statement.mutable_delete_header();
620
message::TableMetadata *table_metadata= header->mutable_table_metadata();
622
const char *schema_name= in_table->getShare()->db.str;
623
const char *table_name= in_table->getShare()->table_name.str;
625
table_metadata->set_schema_name(schema_name);
626
table_metadata->set_table_name(table_name);
628
Field *current_field;
629
Field **table_fields= in_table->field;
631
message::FieldMetadata *field_metadata;
633
while ((current_field= *table_fields++) != NULL)
636
* Add the WHERE clause values now...for now, this means the
637
* primary key field value. Replication only supports tables
638
* with a primary key.
640
if (in_table->s->primary_key == current_field->field_index)
642
field_metadata= header->add_key_field_metadata();
643
field_metadata->set_name(current_field->field_name);
644
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
649
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
654
message::Statement &statement= getDeleteStatement(in_session, in_table);
656
message::DeleteData *data= statement.mutable_delete_data();
657
data->set_segment_id(1);
658
data->set_end_segment(true);
659
message::DeleteRecord *record= data->add_record();
661
Field *current_field;
662
Field **table_fields= in_table->field;
663
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
664
string_value->set_charset(system_charset_info);
666
while ((current_field= *table_fields++) != NULL)
669
* Add the WHERE clause values now...for now, this means the
670
* primary key field value. Replication only supports tables
671
* with a primary key.
673
if (in_table->s->primary_key == current_field->field_index)
675
string_value= current_field->val_str(string_value);
676
record->add_key_value(string_value->c_ptr(), string_value->length());
678
* @TODO Store optional old record value in the before data member
680
string_value->free();
685
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
690
message::Transaction *transaction= getActiveTransaction(in_session);
691
message::Statement *statement= transaction->add_statement();
693
initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
696
* Construct the specialized TruncateTableStatement message and attach
697
* it to the generic Statement message
699
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
700
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
702
const char *schema_name= in_table->getShare()->db.str;
703
const char *table_name= in_table->getShare()->table_name.str;
705
table_metadata->set_schema_name(schema_name);
706
table_metadata->set_table_name(table_name);
708
finalizeStatement(*statement, in_session);
710
finalizeTransaction(*transaction, in_session);
714
cleanupTransaction(transaction, in_session);
717
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
722
message::Transaction *transaction= getActiveTransaction(in_session);
723
message::Statement *statement= transaction->add_statement();
725
initStatement(*statement, message::Statement::RAW_SQL, in_session);
726
string query(in_query, in_query_len);
727
statement->set_sql(query);
728
finalizeStatement(*statement, in_session);
730
finalizeTransaction(*transaction, in_session);
734
cleanupTransaction(transaction, in_session);
737
void ReplicationServices::push(drizzled::message::Transaction &to_push)
739
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
740
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
741
appl_start_iter= appliers.begin();
743
plugin::TransactionReplicator *cur_repl;
744
plugin::TransactionApplier *cur_appl;
746
while (repl_iter != replicators.end())
748
cur_repl= *repl_iter;
749
if (! cur_repl->isEnabled())
755
appl_iter= appl_start_iter;
756
while (appl_iter != appliers.end())
758
cur_appl= *appl_iter;
760
if (! cur_appl->isEnabled())
766
cur_repl->replicate(cur_appl, to_push);
184
769
* We update the timestamp for the last applied Transaction so that
185
770
* publisher plugins can ask the replication services when the