26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/replication.proto
44
#include "drizzled/server_includes.h"
38
45
#include "drizzled/replication_services.h"
39
#include "drizzled/plugin/transaction_replicator.h"
40
#include "drizzled/plugin/transaction_applier.h"
41
#include "drizzled/message/transaction.pb.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
48
#include "drizzled/message/replication.pb.h"
49
#include "drizzled/message/table.pb.h"
42
50
#include "drizzled/gettext.h"
43
51
#include "drizzled/session.h"
44
#include "drizzled/error.h"
52
#include "drizzled/plugin_registry.h"
56
using namespace drizzled;
58
ReplicationServices replication_services;
60
void add_replicator(plugin::Replicator *replicator)
62
replication_services.attachReplicator(replicator);
65
void remove_replicator(plugin::Replicator *replicator)
67
replication_services.detachReplicator(replicator);
70
void add_applier(plugin::Applier *applier)
72
replication_services.attachApplier(applier);
75
void remove_applier(plugin::Applier *applier)
77
replication_services.detachApplier(applier);
55
ReplicationServices::ReplicationServices() :
60
void ReplicationServices::normalizeReplicatorName(string &name)
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
78
bool ReplicationServices::evaluateRegisteredPlugins()
81
* We loop through appliers that have registered with us
82
* and attempts to pair the applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(error::ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
99
for (Appliers::iterator appl_iter= appliers.begin();
100
appl_iter != appliers.end();
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(error::ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
83
ReplicationServices::ReplicationServices()
88
void ReplicationServices::evaluateActivePlugins()
91
* We loop through replicators and appliers, evaluating
92
* whether or not there is at least one active replicator
93
* and one active applier. If not, we set is_active
96
bool tmp_is_active= false;
98
if (replicators.size() == 0 || appliers.size() == 0)
105
* Determine if any remaining replicators and if those
106
* replicators are active...if not, set is_active
109
std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
110
while (repl_iter != replicators.end())
112
if ((*repl_iter)->isActive())
121
/* No active replicators. Set is_active to false and exit. */
127
* OK, we know there's at least one active replicator.
129
* Now determine if any remaining replicators and if those
130
* replicators are active...if not, set is_active
133
std::vector<plugin::Applier *>::iterator appl_iter= appliers.begin();
134
while (appl_iter != appliers.end())
136
if ((*appl_iter)->isActive())
143
/* If we get here, there are no active appliers */
147
void ReplicationServices::attachReplicator(plugin::Replicator *in_replicator)
145
149
replicators.push_back(in_replicator);
150
evaluateActivePlugins();
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
153
void ReplicationServices::detachReplicator(plugin::Replicator *in_replicator)
150
155
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
156
evaluateActivePlugins();
159
void ReplicationServices::attachApplier(plugin::Applier *in_applier)
161
appliers.push_back(in_applier);
162
evaluateActivePlugins();
165
void ReplicationServices::detachApplier(plugin::Applier *in_applier)
167
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
168
evaluateActivePlugins();
162
171
bool ReplicationServices::isActive() const
164
173
return is_active;
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
message::Transaction &to_push)
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
184
* We update the timestamp for the last applied Transaction so that
176
void ReplicationServices::setCommandTransactionContext(message::Command *in_command
177
, Session *in_session) const
179
message::TransactionContext *trx= in_command->mutable_transaction_context();
180
trx->set_server_id(in_session->getServerId());
181
trx->set_transaction_id(in_session->getTransactionId());
184
void ReplicationServices::startTransaction(Session *in_session)
189
message::Command command;
190
command.set_type(message::Command::START_TRANSACTION);
191
command.set_timestamp(in_session->getCurrentTimestamp());
193
setCommandTransactionContext(&command, in_session);
198
void ReplicationServices::commitTransaction(Session *in_session)
203
message::Command command;
204
command.set_type(message::Command::COMMIT);
205
command.set_timestamp(in_session->getCurrentTimestamp());
207
setCommandTransactionContext(&command, in_session);
212
void ReplicationServices::rollbackTransaction(Session *in_session)
217
message::Command command;
218
command.set_type(message::Command::ROLLBACK);
219
command.set_timestamp(in_session->getCurrentTimestamp());
221
setCommandTransactionContext(&command, in_session);
226
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
231
message::Command command;
232
command.set_type(message::Command::INSERT);
233
command.set_timestamp(in_session->getCurrentTimestamp());
235
setCommandTransactionContext(&command, in_session);
237
const char *schema_name= in_table->getShare()->db.str;
238
const char *table_name= in_table->getShare()->table_name.str;
240
command.set_schema(schema_name);
241
command.set_table(table_name);
244
* Now we construct the specialized InsertRecord command inside
245
* the message::Command container...
247
message::InsertRecord *change_record= command.mutable_insert_record();
249
Field *current_field;
250
Field **table_fields= in_table->field;
251
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
252
string_value->set_charset(system_charset_info);
254
message::Table::Field *current_proto_field;
256
/* We will read all the table's fields... */
257
in_table->setReadSet();
259
while ((current_field= *table_fields++) != NULL)
261
current_proto_field= change_record->add_insert_field();
262
current_proto_field->set_name(std::string(current_field->field_name));
263
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
264
string_value= current_field->val_str(string_value);
265
change_record->add_insert_value(std::string(string_value->c_ptr()));
266
string_value->free();
272
void ReplicationServices::updateRecord(Session *in_session,
274
const unsigned char *old_record,
275
const unsigned char *new_record)
280
message::Command command;
281
command.set_type(message::Command::UPDATE);
282
command.set_timestamp(in_session->getCurrentTimestamp());
284
setCommandTransactionContext(&command, in_session);
286
const char *schema_name= in_table->getShare()->db.str;
287
const char *table_name= in_table->getShare()->table_name.str;
289
command.set_schema(schema_name);
290
command.set_table(table_name);
293
* Now we construct the specialized UpdateRecord command inside
294
* the message::Command container...
296
message::UpdateRecord *change_record= command.mutable_update_record();
298
Field *current_field;
299
Field **table_fields= in_table->field;
300
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
301
string_value->set_charset(system_charset_info);
303
message::Table::Field *current_proto_field;
305
while ((current_field= *table_fields++) != NULL)
308
* The below really should be moved into the Field API and Record API. But for now
309
* we do this crazy pointer fiddling to figure out if the current field
310
* has been updated in the supplied record raw byte pointers.
312
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
313
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
315
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
317
if (memcmp(old_ptr, new_ptr, field_length) != 0)
319
/* Field is changed from old to new */
320
current_proto_field= change_record->add_update_field();
321
current_proto_field->set_name(std::string(current_field->field_name));
322
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
324
/* Store the original "read bit" for this field */
325
bool is_read_set= current_field->isReadSet();
327
/* We need to mark that we will "read" this field... */
328
in_table->setReadSet(current_field->field_index);
330
/* Read the string value of this field's contents */
331
string_value= current_field->val_str(string_value);
334
* Reset the read bit after reading field to its original state. This
335
* prevents the field from being included in the WHERE clause
337
current_field->setReadSet(is_read_set);
339
change_record->add_after_value(std::string(string_value->c_ptr()));
340
string_value->free();
344
* Add the WHERE clause values now...the fields which return true
345
* for isReadSet() are in the WHERE clause. For tables with no
346
* primary or unique key, all fields will be returned.
348
if (current_field->isReadSet())
350
current_proto_field= change_record->add_where_field();
351
current_proto_field->set_name(std::string(current_field->field_name));
352
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
353
string_value= current_field->val_str(string_value);
354
change_record->add_where_value(std::string(string_value->c_ptr()));
355
string_value->free();
362
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
367
message::Command command;
368
command.set_type(message::Command::DELETE);
369
command.set_timestamp(in_session->getCurrentTimestamp());
371
setCommandTransactionContext(&command, in_session);
373
const char *schema_name= in_table->getShare()->db.str;
374
const char *table_name= in_table->getShare()->table_name.str;
376
command.set_schema(schema_name);
377
command.set_table(table_name);
380
* Now we construct the specialized DeleteRecord command inside
381
* the message::Command container...
383
message::DeleteRecord *change_record= command.mutable_delete_record();
385
Field *current_field;
386
Field **table_fields= in_table->field;
387
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
388
string_value->set_charset(system_charset_info);
390
message::Table::Field *current_proto_field;
392
while ((current_field= *table_fields++) != NULL)
395
* Add the WHERE clause values now...the fields which return true
396
* for isReadSet() are in the WHERE clause. For tables with no
397
* primary or unique key, all fields will be returned.
399
if (current_field->isReadSet())
401
current_proto_field= change_record->add_where_field();
402
current_proto_field->set_name(std::string(current_field->field_name));
403
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
404
string_value= current_field->val_str(string_value);
405
change_record->add_where_value(std::string(string_value->c_ptr()));
406
string_value->free();
413
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
418
message::Command command;
419
command.set_type(message::Command::RAW_SQL);
420
command.set_timestamp(in_session->getCurrentTimestamp());
422
setCommandTransactionContext(&command, in_session);
424
std::string query(in_query, in_query_len);
425
command.set_sql(query);
430
void ReplicationServices::push(message::Command *to_push)
432
std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
433
std::vector<plugin::Applier *>::iterator appl_start_iter, appl_iter;
434
appl_start_iter= appliers.begin();
436
plugin::Replicator *cur_repl;
437
plugin::Applier *cur_appl;
439
while (repl_iter != replicators.end())
441
cur_repl= *repl_iter;
442
if (! cur_repl->isActive())
448
appl_iter= appl_start_iter;
449
while (appl_iter != appliers.end())
451
cur_appl= *appl_iter;
453
if (! cur_appl->isActive())
459
cur_repl->replicate(cur_appl, to_push);
462
* We update the timestamp for the last applied Command so that
185
463
* publisher plugins can ask the replication services when the
186
* last known applied Transaction was using the getLastAppliedTimestamp()
464
* last known applied Command was using the getLastAppliedTimestamp()
189
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
467
last_applied_timestamp.fetch_and_store(to_push->timestamp());
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
199
return replication_streams;
202
} /* namespace drizzled */
474
} /* end namespace drizzled */