26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/replication.proto
44
#include "drizzled/server_includes.h"
38
45
#include "drizzled/replication_services.h"
39
#include "drizzled/plugin/transaction_replicator.h"
40
#include "drizzled/plugin/transaction_applier.h"
41
#include "drizzled/message/transaction.pb.h"
46
#include "drizzled/plugin/command_replicator.h"
47
#include "drizzled/plugin/command_applier.h"
48
#include "drizzled/message/replication.pb.h"
49
#include "drizzled/message/table.pb.h"
42
50
#include "drizzled/gettext.h"
43
51
#include "drizzled/session.h"
44
#include "drizzled/error.h"
50
55
using namespace std;
55
ReplicationServices::ReplicationServices() :
60
void ReplicationServices::normalizeReplicatorName(string &name)
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
78
bool ReplicationServices::evaluateRegisteredPlugins()
81
* We loop through appliers that have registered with us
82
* and attempts to pair the applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(error::ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
99
for (Appliers::iterator appl_iter= appliers.begin();
100
appl_iter != appliers.end();
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(error::ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
56
using namespace drizzled;
58
ReplicationServices::ReplicationServices()
63
void ReplicationServices::evaluateActivePlugins()
66
* We loop through replicators and appliers, evaluating
67
* whether or not there is at least one active replicator
68
* and one active applier. If not, we set is_active
71
bool tmp_is_active= false;
73
if (replicators.empty() || appliers.empty())
80
* Determine if any remaining replicators and if those
81
* replicators are active...if not, set is_active
84
vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
85
while (repl_iter != replicators.end())
87
if ((*repl_iter)->isActive())
96
/* No active replicators. Set is_active to false and exit. */
102
* OK, we know there's at least one active replicator.
104
* Now determine if any remaining replicators and if those
105
* replicators are active...if not, set is_active
108
vector<plugin::CommandApplier *>::iterator appl_iter= appliers.begin();
109
while (appl_iter != appliers.end())
111
if ((*appl_iter)->isActive())
118
/* If we get here, there are no active appliers */
122
void ReplicationServices::attachReplicator(plugin::CommandReplicator *in_replicator)
145
124
replicators.push_back(in_replicator);
125
evaluateActivePlugins();
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
128
void ReplicationServices::detachReplicator(plugin::CommandReplicator *in_replicator)
150
130
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
131
evaluateActivePlugins();
134
void ReplicationServices::attachApplier(plugin::CommandApplier *in_applier)
136
appliers.push_back(in_applier);
137
evaluateActivePlugins();
140
void ReplicationServices::detachApplier(plugin::CommandApplier *in_applier)
142
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
143
evaluateActivePlugins();
162
146
bool ReplicationServices::isActive() const
164
148
return is_active;
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
message::Transaction &to_push)
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
184
* We update the timestamp for the last applied Transaction so that
151
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
152
Session *in_session) const
154
message::TransactionContext *trx= in_command.mutable_transaction_context();
155
trx->set_server_id(in_session->getServerId());
156
trx->set_transaction_id(in_session->getTransactionId());
158
in_command.set_session_id((uint32_t) in_session->getSessionId());
161
void ReplicationServices::startTransaction(Session *in_session)
166
message::Command command;
167
command.set_type(message::Command::START_TRANSACTION);
168
command.set_timestamp(in_session->getCurrentTimestamp());
170
setCommandTransactionContext(command, in_session);
175
void ReplicationServices::commitTransaction(Session *in_session)
180
message::Command command;
181
command.set_type(message::Command::COMMIT);
182
command.set_timestamp(in_session->getCurrentTimestamp());
184
setCommandTransactionContext(command, in_session);
189
void ReplicationServices::rollbackTransaction(Session *in_session)
194
message::Command command;
195
command.set_type(message::Command::ROLLBACK);
196
command.set_timestamp(in_session->getCurrentTimestamp());
198
setCommandTransactionContext(command, in_session);
203
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
208
message::Command command;
209
command.set_type(message::Command::INSERT);
210
command.set_timestamp(in_session->getCurrentTimestamp());
212
setCommandTransactionContext(command, in_session);
214
const char *schema_name= in_table->getShare()->db.str;
215
const char *table_name= in_table->getShare()->table_name.str;
217
command.set_schema(schema_name);
218
command.set_table(table_name);
221
* Now we construct the specialized InsertRecord command inside
222
* the message::Command container...
224
message::InsertRecord *change_record= command.mutable_insert_record();
226
Field *current_field;
227
Field **table_fields= in_table->field;
228
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
229
string_value->set_charset(system_charset_info);
231
message::Table::Field *current_proto_field;
233
/* We will read all the table's fields... */
234
in_table->setReadSet();
236
while ((current_field= *table_fields++) != NULL)
238
current_proto_field= change_record->add_insert_field();
239
current_proto_field->set_name(current_field->field_name);
240
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
241
string_value= current_field->val_str(string_value);
242
change_record->add_insert_value(string_value->c_ptr());
243
string_value->free();
249
void ReplicationServices::updateRecord(Session *in_session,
251
const unsigned char *old_record,
252
const unsigned char *new_record)
257
message::Command command;
258
command.set_type(message::Command::UPDATE);
259
command.set_timestamp(in_session->getCurrentTimestamp());
261
setCommandTransactionContext(command, in_session);
263
const char *schema_name= in_table->getShare()->db.str;
264
const char *table_name= in_table->getShare()->table_name.str;
266
command.set_schema(schema_name);
267
command.set_table(table_name);
270
* Now we construct the specialized UpdateRecord command inside
271
* the message::Command container...
273
message::UpdateRecord *change_record= command.mutable_update_record();
275
Field *current_field;
276
Field **table_fields= in_table->field;
277
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
278
string_value->set_charset(system_charset_info);
280
message::Table::Field *current_proto_field;
282
while ((current_field= *table_fields++) != NULL)
285
* The below really should be moved into the Field API and Record API. But for now
286
* we do this crazy pointer fiddling to figure out if the current field
287
* has been updated in the supplied record raw byte pointers.
289
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
290
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
292
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
294
if (memcmp(old_ptr, new_ptr, field_length) != 0)
296
/* Field is changed from old to new */
297
current_proto_field= change_record->add_update_field();
298
current_proto_field->set_name(current_field->field_name);
299
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
301
/* Store the original "read bit" for this field */
302
bool is_read_set= current_field->isReadSet();
304
/* We need to mark that we will "read" this field... */
305
in_table->setReadSet(current_field->field_index);
307
/* Read the string value of this field's contents */
308
string_value= current_field->val_str(string_value);
311
* Reset the read bit after reading field to its original state. This
312
* prevents the field from being included in the WHERE clause
314
current_field->setReadSet(is_read_set);
316
change_record->add_after_value(string_value->c_ptr());
317
string_value->free();
321
* Add the WHERE clause values now...the fields which return true
322
* for isReadSet() are in the WHERE clause. For tables with no
323
* primary or unique key, all fields will be returned.
325
if (current_field->isReadSet())
327
current_proto_field= change_record->add_where_field();
328
current_proto_field->set_name(current_field->field_name);
329
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
330
string_value= current_field->val_str(string_value);
331
change_record->add_where_value(string_value->c_ptr());
332
string_value->free();
339
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
344
message::Command command;
345
command.set_type(message::Command::DELETE);
346
command.set_timestamp(in_session->getCurrentTimestamp());
348
setCommandTransactionContext(command, in_session);
350
const char *schema_name= in_table->getShare()->db.str;
351
const char *table_name= in_table->getShare()->table_name.str;
353
command.set_schema(schema_name);
354
command.set_table(table_name);
357
* Now we construct the specialized DeleteRecord command inside
358
* the message::Command container...
360
message::DeleteRecord *change_record= command.mutable_delete_record();
362
Field *current_field;
363
Field **table_fields= in_table->field;
364
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
365
string_value->set_charset(system_charset_info);
367
message::Table::Field *current_proto_field;
369
while ((current_field= *table_fields++) != NULL)
372
* Add the WHERE clause values now...the fields which return true
373
* for isReadSet() are in the WHERE clause. For tables with no
374
* primary or unique key, all fields will be returned.
376
if (current_field->isReadSet())
378
current_proto_field= change_record->add_where_field();
379
current_proto_field->set_name(current_field->field_name);
380
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
381
string_value= current_field->val_str(string_value);
382
change_record->add_where_value(string_value->c_ptr());
383
string_value->free();
390
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
395
message::Command command;
396
command.set_type(message::Command::RAW_SQL);
397
command.set_timestamp(in_session->getCurrentTimestamp());
399
setCommandTransactionContext(command, in_session);
401
string query(in_query, in_query_len);
402
command.set_sql(query);
407
void ReplicationServices::push(drizzled::message::Command &to_push)
409
vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
410
vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
411
appl_start_iter= appliers.begin();
413
plugin::CommandReplicator *cur_repl;
414
plugin::CommandApplier *cur_appl;
416
while (repl_iter != replicators.end())
418
cur_repl= *repl_iter;
419
if (! cur_repl->isActive())
425
appl_iter= appl_start_iter;
426
while (appl_iter != appliers.end())
428
cur_appl= *appl_iter;
430
if (! cur_appl->isActive())
436
cur_repl->replicate(cur_appl, to_push);
439
* We update the timestamp for the last applied Command so that
185
440
* publisher plugins can ask the replication services when the
186
* last known applied Transaction was using the getLastAppliedTimestamp()
441
* last known applied Command was using the getLastAppliedTimestamp()
189
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
444
last_applied_timestamp.fetch_and_store(to_push.timestamp());
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
199
return replication_streams;
202
} /* namespace drizzled */