26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/transaction.proto
45
* We really should store the raw bytes in the messages, not the
46
* String value of the Field. But, to do that, the
47
* statement_transform library needs first to be updated
48
* to include the transformation code to convert raw
49
* Drizzle-internal Field byte representation into something
50
* plugins can understand.
53
#include "drizzled/server_includes.h"
38
54
#include "drizzled/replication_services.h"
39
55
#include "drizzled/plugin/transaction_replicator.h"
40
56
#include "drizzled/plugin/transaction_applier.h"
41
57
#include "drizzled/message/transaction.pb.h"
58
#include "drizzled/message/table.pb.h"
42
59
#include "drizzled/gettext.h"
43
60
#include "drizzled/session.h"
44
#include "drizzled/error.h"
50
64
using namespace std;
55
ReplicationServices::ReplicationServices() :
60
void ReplicationServices::normalizeReplicatorName(string &name)
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
78
bool ReplicationServices::evaluateRegisteredPlugins()
81
* We loop through appliers that have registered with us
82
* and attempt to pair the applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(ERRMSG_LVL_ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
99
for (Appliers::iterator appl_iter= appliers.begin();
100
appl_iter != appliers.end();
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(ERRMSG_LVL_ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
69
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type);
71
ReplicationServices::ReplicationServices()
76
void ReplicationServices::evaluateActivePlugins()
79
* We loop through replicators and appliers, evaluating
80
* whether or not there is at least one active replicator
81
* and one active applier. If not, we set is_active
84
bool tmp_is_active= false;
86
if (replicators.empty() || appliers.empty())
93
* Determine if there are any remaining replicators and if those
94
* replicators are active...if not, set is_active
97
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
98
while (repl_iter != replicators.end())
100
if ((*repl_iter)->isEnabled())
109
/* No active replicators. Set is_active to false and exit. */
115
* OK, we know there's at least one active replicator.
117
* Now determine if there are any remaining appliers and if those
118
* appliers are active...if not, set is_active
121
vector<plugin::TransactionApplier *>::iterator appl_iter= appliers.begin();
122
while (appl_iter != appliers.end())
124
if ((*appl_iter)->isEnabled())
131
/* If we get here, there are no active appliers */
143
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
145
137
replicators.push_back(in_replicator);
138
evaluateActivePlugins();
148
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
150
143
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
144
evaluateActivePlugins();
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
149
appliers.push_back(in_applier);
150
evaluateActivePlugins();
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
155
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
156
evaluateActivePlugins();
162
159
bool ReplicationServices::isActive() const
164
161
return is_active;
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
message::Transaction &to_push)
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
166
message::Transaction *transaction= in_session->getTransactionMessage();
168
if (unlikely(transaction == NULL))
171
* Allocate and initialize a new transaction message
172
* for this Session object. Session is responsible for
173
* deleting transaction message when done with it.
175
transaction= new (nothrow) message::Transaction();
176
initTransaction(*transaction, in_session);
177
in_session->setTransactionMessage(transaction);
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
Session *in_session) const
187
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
trx->set_server_id(in_session->getServerId());
189
trx->set_transaction_id(in_session->getTransactionId());
190
trx->set_start_timestamp(in_session->getCurrentTimestamp());
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
Session *in_session) const
196
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
trx->set_end_timestamp(in_session->getCurrentTimestamp());
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
Session *in_session) const
203
delete in_transaction;
204
in_session->setStatementMessage(NULL);
205
in_session->setTransactionMessage(NULL);
208
void ReplicationServices::startNormalTransaction(Session *in_session)
213
/* Safeguard...other transactions should have already been closed */
214
message::Transaction *transaction= in_session->getTransactionMessage();
215
assert(transaction == NULL);
218
* A "normal" transaction for the replication services component
219
* is simply a Transaction message, nothing more...so we create a
220
* new message and attach it to the Session object.
222
* Allocate and initialize a new transaction message
223
* for this Session object. This memory is deleted when the
224
* transaction is pushed out to replicators. Session is NOT
225
* responsible for deleting this memory.
227
transaction= new (nothrow) message::Transaction();
228
initTransaction(*transaction, in_session);
229
in_session->setTransactionMessage(transaction);
232
void ReplicationServices::commitNormalTransaction(Session *in_session)
237
/* If there is an active statement message, finalize it */
238
message::Statement *statement= in_session->getStatementMessage();
240
if (statement != NULL)
242
finalizeStatement(*statement, in_session);
245
return; /* No data modification occurred inside the transaction */
247
message::Transaction* transaction= getActiveTransaction(in_session);
249
finalizeTransaction(*transaction, in_session);
253
cleanupTransaction(transaction, in_session);
256
void ReplicationServices::initStatement(message::Statement &statement,
257
message::Statement::Type in_type,
258
Session *in_session) const
260
statement.set_type(in_type);
261
statement.set_start_timestamp(in_session->getCurrentTimestamp());
262
/** @TODO Set sql string optionally */
265
void ReplicationServices::finalizeStatement(message::Statement &statement,
266
Session *in_session) const
268
statement.set_end_timestamp(in_session->getCurrentTimestamp());
269
in_session->setStatementMessage(NULL);
272
void ReplicationServices::rollbackTransaction(Session *in_session)
277
message::Transaction *transaction= getActiveTransaction(in_session);
280
* We clear the current transaction, reset its transaction ID properly,
281
* then set its type to ROLLBACK and push it out to the replicators.
283
transaction->Clear();
284
initTransaction(*transaction, in_session);
286
message::Statement *statement= transaction->add_statement();
288
initStatement(*statement, message::Statement::ROLLBACK, in_session);
289
finalizeStatement(*statement, in_session);
291
finalizeTransaction(*transaction, in_session);
295
cleanupTransaction(transaction, in_session);
298
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
299
Table *in_table) const
301
message::Statement *statement= in_session->getStatementMessage();
303
if (statement == NULL)
305
message::Transaction *transaction= getActiveTransaction(in_session);
307
* Transaction message initialized and set, but no statement created
308
* yet. We construct one and initialize it, here, then return the
309
* message after attaching the new Statement message pointer to the
310
* Session for easy retrieval later...
312
statement= transaction->add_statement();
313
setInsertHeader(*statement, in_session, in_table);
314
in_session->setStatementMessage(statement);
319
void ReplicationServices::setInsertHeader(message::Statement &statement,
321
Table *in_table) const
323
initStatement(statement, message::Statement::INSERT, in_session);
326
* Now we construct the specialized InsertHeader message inside
327
* the generalized message::Statement container...
329
/* Set up the insert header */
330
message::InsertHeader *header= statement.mutable_insert_header();
331
message::TableMetadata *table_metadata= header->mutable_table_metadata();
333
const char *schema_name= in_table->getShare()->db.str;
334
const char *table_name= in_table->getShare()->table_name.str;
336
table_metadata->set_schema_name(schema_name);
337
table_metadata->set_table_name(table_name);
339
Field *current_field;
340
Field **table_fields= in_table->field;
342
message::FieldMetadata *field_metadata;
344
/* We will read all the table's fields... */
345
in_table->setReadSet();
347
while ((current_field= *table_fields++) != NULL)
349
field_metadata= header->add_field_metadata();
350
field_metadata->set_name(current_field->field_name);
351
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
355
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
360
message::Statement &statement= getInsertStatement(in_session, in_table);
362
message::InsertData *data= statement.mutable_insert_data();
363
data->set_segment_id(1);
364
data->set_end_segment(true);
365
message::InsertRecord *record= data->add_record();
367
Field *current_field;
368
Field **table_fields= in_table->field;
370
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
371
string_value->set_charset(system_charset_info);
373
/* We will read all the table's fields... */
374
in_table->setReadSet();
376
while ((current_field= *table_fields++) != NULL)
378
string_value= current_field->val_str(string_value);
379
record->add_insert_value(string_value->c_ptr(), string_value->length());
380
string_value->free();
384
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
386
const unsigned char *old_record,
387
const unsigned char *new_record) const
389
message::Statement *statement= in_session->getStatementMessage();
391
if (statement == NULL)
393
message::Transaction *transaction= getActiveTransaction(in_session);
395
* Transaction message initialized and set, but no statement created
396
* yet. We construct one and initialize it, here, then return the
397
* message after attaching the new Statement message pointer to the
398
* Session for easy retrieval later...
400
statement= transaction->add_statement();
401
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
402
in_session->setStatementMessage(statement);
407
void ReplicationServices::setUpdateHeader(message::Statement &statement,
410
const unsigned char *old_record,
411
const unsigned char *new_record) const
413
initStatement(statement, message::Statement::UPDATE, in_session);
416
* Now we construct the specialized UpdateHeader message inside
417
* the generalized message::Statement container...
419
/* Set up the update header */
420
message::UpdateHeader *header= statement.mutable_update_header();
421
message::TableMetadata *table_metadata= header->mutable_table_metadata();
423
const char *schema_name= in_table->getShare()->db.str;
424
const char *table_name= in_table->getShare()->table_name.str;
426
table_metadata->set_schema_name(schema_name);
427
table_metadata->set_table_name(table_name);
429
Field *current_field;
430
Field **table_fields= in_table->field;
431
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
432
string_value->set_charset(system_charset_info);
434
message::FieldMetadata *field_metadata;
436
/* We will read all the table's fields... */
437
in_table->setReadSet();
439
while ((current_field= *table_fields++) != NULL)
442
* We add the "key field metadata" -- i.e. the field which is
443
* the primary key for the table.
445
if (in_table->s->primary_key == current_field->field_index)
447
field_metadata= header->add_key_field_metadata();
448
field_metadata->set_name(current_field->field_name);
449
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
453
* The below really should be moved into the Field API and Record API. But for now
454
* we do this crazy pointer fiddling to figure out if the current field
455
* has been updated in the supplied record raw byte pointers.
457
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
458
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
460
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
462
if (memcmp(old_ptr, new_ptr, field_length) != 0)
464
/* Field is changed from old to new */
465
field_metadata= header->add_set_field_metadata();
466
field_metadata->set_name(current_field->field_name);
467
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
471
void ReplicationServices::updateRecord(Session *in_session,
473
const unsigned char *old_record,
474
const unsigned char *new_record)
479
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
481
message::UpdateData *data= statement.mutable_update_data();
482
data->set_segment_id(1);
483
data->set_end_segment(true);
484
message::UpdateRecord *record= data->add_record();
486
Field *current_field;
487
Field **table_fields= in_table->field;
488
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
489
string_value->set_charset(system_charset_info);
491
while ((current_field= *table_fields++) != NULL)
494
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
495
* but then realized that an UPDATE statement could potentially have different values for
496
* the SET field. For instance, imagine this SQL scenario:
498
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, counter INT NOT NULL);
499
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
500
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
502
* We will generate two UpdateRecord messages with different set_value byte arrays.
504
* The below really should be moved into the Field API and Record API. But for now
505
* we do this crazy pointer fiddling to figure out if the current field
506
* has been updated in the supplied record raw byte pointers.
508
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
509
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
511
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
513
if (memcmp(old_ptr, new_ptr, field_length) != 0)
515
/* Store the original "read bit" for this field */
516
bool is_read_set= current_field->isReadSet();
518
/* We need to mark that we will "read" this field... */
519
in_table->setReadSet(current_field->field_index);
521
/* Read the string value of this field's contents */
522
string_value= current_field->val_str(string_value);
525
* Reset the read bit after reading field to its original state. This
526
* prevents the field from being included in the WHERE clause
528
current_field->setReadSet(is_read_set);
530
record->add_after_value(string_value->c_ptr(), string_value->length());
531
string_value->free();
535
* Add the WHERE clause values now...the fields which return true
536
* for isReadSet() are in the WHERE clause. For tables with no
537
* primary or unique key, all fields will be returned.
539
if (current_field->isReadSet())
541
string_value= current_field->val_str(string_value);
542
record->add_key_value(string_value->c_ptr(), string_value->length());
544
* @TODO Store optional old record value in the before data member
546
string_value->free();
552
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
553
Table *in_table) const
555
message::Statement *statement= in_session->getStatementMessage();
557
if (statement == NULL)
559
message::Transaction *transaction= getActiveTransaction(in_session);
561
* Transaction message initialized and set, but no statement created
562
* yet. We construct one and initialize it, here, then return the
563
* message after attaching the new Statement message pointer to the
564
* Session for easy retrieval later...
566
statement= transaction->add_statement();
567
setDeleteHeader(*statement, in_session, in_table);
568
in_session->setStatementMessage(statement);
573
void ReplicationServices::setDeleteHeader(message::Statement &statement,
575
Table *in_table) const
577
initStatement(statement, message::Statement::DELETE, in_session);
580
* Now we construct the specialized DeleteHeader message inside
581
* the generalized message::Statement container...
583
message::DeleteHeader *header= statement.mutable_delete_header();
584
message::TableMetadata *table_metadata= header->mutable_table_metadata();
586
const char *schema_name= in_table->getShare()->db.str;
587
const char *table_name= in_table->getShare()->table_name.str;
589
table_metadata->set_schema_name(schema_name);
590
table_metadata->set_table_name(table_name);
592
Field *current_field;
593
Field **table_fields= in_table->field;
595
message::FieldMetadata *field_metadata;
597
while ((current_field= *table_fields++) != NULL)
600
* Add the WHERE clause values now...for now, this means the
601
* primary key field value. Replication only supports tables
602
* with a primary key.
604
if (in_table->s->primary_key == current_field->field_index)
606
field_metadata= header->add_key_field_metadata();
607
field_metadata->set_name(current_field->field_name);
608
field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
613
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
618
message::Statement &statement= getDeleteStatement(in_session, in_table);
620
message::DeleteData *data= statement.mutable_delete_data();
621
data->set_segment_id(1);
622
data->set_end_segment(true);
623
message::DeleteRecord *record= data->add_record();
625
Field *current_field;
626
Field **table_fields= in_table->field;
627
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
628
string_value->set_charset(system_charset_info);
630
while ((current_field= *table_fields++) != NULL)
633
* Add the WHERE clause values now...for now, this means the
634
* primary key field value. Replication only supports tables
635
* with a primary key.
637
if (in_table->s->primary_key == current_field->field_index)
639
string_value= current_field->val_str(string_value);
640
record->add_key_value(string_value->c_ptr(), string_value->length());
642
* @TODO Store optional old record value in the before data member
644
string_value->free();
649
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
654
message::Transaction *transaction= getActiveTransaction(in_session);
655
message::Statement *statement= transaction->add_statement();
657
initStatement(*statement, message::Statement::RAW_SQL, in_session);
658
string query(in_query, in_query_len);
659
statement->set_sql(query);
660
finalizeStatement(*statement, in_session);
662
finalizeTransaction(*transaction, in_session);
666
cleanupTransaction(transaction, in_session);
669
void ReplicationServices::push(drizzled::message::Transaction &to_push)
671
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
672
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
673
appl_start_iter= appliers.begin();
675
plugin::TransactionReplicator *cur_repl;
676
plugin::TransactionApplier *cur_appl;
678
while (repl_iter != replicators.end())
680
cur_repl= *repl_iter;
681
if (! cur_repl->isEnabled())
687
appl_iter= appl_start_iter;
688
while (appl_iter != appliers.end())
690
cur_appl= *appl_iter;
692
if (! cur_appl->isEnabled())
698
cur_repl->replicate(cur_appl, to_push);
184
701
* We update the timestamp for the last applied Transaction so that
185
702
* publisher plugins can ask the replication services when the