26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/replication.proto
44
#include "drizzled/server_includes.h"
38
45
#include "drizzled/replication_services.h"
39
#include "drizzled/plugin/transaction_replicator.h"
40
#include "drizzled/plugin/transaction_applier.h"
41
#include "drizzled/message/transaction.pb.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
48
#include "drizzled/message/replication.pb.h"
42
49
#include "drizzled/message/table.pb.h"
43
#include "drizzled/message/statement_transform.h"
44
50
#include "drizzled/gettext.h"
45
51
#include "drizzled/session.h"
46
#include "drizzled/error.h"
52
#include "drizzled/plugin/registry.h"
56
using namespace drizzled;
58
ReplicationServices replication_services;
60
void add_replicator(plugin::Replicator *replicator)
62
replication_services.attachReplicator(replicator);
65
void remove_replicator(plugin::Replicator *replicator)
67
replication_services.detachReplicator(replicator);
70
void add_applier(plugin::Applier *applier)
72
replication_services.attachApplier(applier);
75
void remove_applier(plugin::Applier *applier)
77
replication_services.detachApplier(applier);
145
173
return is_active;
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
150
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
176
void ReplicationServices::setCommandTransactionContext(message::Command *in_command
177
, Session *in_session) const
179
message::TransactionContext *trx= in_command->mutable_transaction_context();
180
trx->set_server_id(in_session->getServerId());
181
trx->set_transaction_id(in_session->getTransactionId());
184
void ReplicationServices::startTransaction(Session *in_session)
189
message::Command command;
190
command.set_type(message::Command::START_TRANSACTION);
191
command.set_timestamp(in_session->getCurrentTimestamp());
193
setCommandTransactionContext(&command, in_session);
198
void ReplicationServices::commitTransaction(Session *in_session)
203
message::Command command;
204
command.set_type(message::Command::COMMIT);
205
command.set_timestamp(in_session->getCurrentTimestamp());
207
setCommandTransactionContext(&command, in_session);
212
void ReplicationServices::rollbackTransaction(Session *in_session)
217
message::Command command;
218
command.set_type(message::Command::ROLLBACK);
219
command.set_timestamp(in_session->getCurrentTimestamp());
221
setCommandTransactionContext(&command, in_session);
226
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
231
message::Command command;
232
command.set_type(message::Command::INSERT);
233
command.set_timestamp(in_session->getCurrentTimestamp());
235
setCommandTransactionContext(&command, in_session);
237
const char *schema_name= in_table->getShare()->db.str;
238
const char *table_name= in_table->getShare()->table_name.str;
240
command.set_schema(schema_name);
241
command.set_table(table_name);
244
* Now we construct the specialized InsertRecord command inside
245
* the message::Command container...
247
message::InsertRecord *change_record= command.mutable_insert_record();
249
Field *current_field;
250
Field **table_fields= in_table->field;
251
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
252
string_value->set_charset(system_charset_info);
254
message::Table::Field *current_proto_field;
256
/* We will read all the table's fields... */
257
in_table->setReadSet();
259
while ((current_field= *table_fields++) != NULL)
261
current_proto_field= change_record->add_insert_field();
262
current_proto_field->set_name(std::string(current_field->field_name));
263
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
264
string_value= current_field->val_str(string_value);
265
change_record->add_insert_value(std::string(string_value->c_ptr()));
266
string_value->free();
272
void ReplicationServices::updateRecord(Session *in_session,
274
const unsigned char *old_record,
275
const unsigned char *new_record)
280
message::Command command;
281
command.set_type(message::Command::UPDATE);
282
command.set_timestamp(in_session->getCurrentTimestamp());
284
setCommandTransactionContext(&command, in_session);
286
const char *schema_name= in_table->getShare()->db.str;
287
const char *table_name= in_table->getShare()->table_name.str;
289
command.set_schema(schema_name);
290
command.set_table(table_name);
293
* Now we construct the specialized UpdateRecord command inside
294
* the message::Command container...
296
message::UpdateRecord *change_record= command.mutable_update_record();
298
Field *current_field;
299
Field **table_fields= in_table->field;
300
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
301
string_value->set_charset(system_charset_info);
303
message::Table::Field *current_proto_field;
305
while ((current_field= *table_fields++) != NULL)
308
* The below really should be moved into the Field API and Record API. But for now
309
* we do this crazy pointer fiddling to figure out if the current field
310
* has been updated in the supplied record raw byte pointers.
312
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
313
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
315
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
317
if (memcmp(old_ptr, new_ptr, field_length) != 0)
319
/* Field is changed from old to new */
320
current_proto_field= change_record->add_update_field();
321
current_proto_field->set_name(std::string(current_field->field_name));
322
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
324
/* Store the original "read bit" for this field */
325
bool is_read_set= current_field->isReadSet();
327
/* We need to mark that we will "read" this field... */
328
in_table->setReadSet(current_field->field_index);
330
/* Read the string value of this field's contents */
331
string_value= current_field->val_str(string_value);
334
* Reset the read bit after reading field to its original state. This
335
* prevents the field from being included in the WHERE clause
337
current_field->setReadSet(is_read_set);
339
change_record->add_after_value(std::string(string_value->c_ptr()));
340
string_value->free();
344
* Add the WHERE clause values now...the fields which return true
345
* for isReadSet() are in the WHERE clause. For tables with no
346
* primary or unique key, all fields will be returned.
348
if (current_field->isReadSet())
350
current_proto_field= change_record->add_where_field();
351
current_proto_field->set_name(std::string(current_field->field_name));
352
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
353
string_value= current_field->val_str(string_value);
354
change_record->add_where_value(std::string(string_value->c_ptr()));
355
string_value->free();
362
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
367
message::Command command;
368
command.set_type(message::Command::DELETE);
369
command.set_timestamp(in_session->getCurrentTimestamp());
371
setCommandTransactionContext(&command, in_session);
373
const char *schema_name= in_table->getShare()->db.str;
374
const char *table_name= in_table->getShare()->table_name.str;
376
command.set_schema(schema_name);
377
command.set_table(table_name);
380
* Now we construct the specialized DeleteRecord command inside
381
* the message::Command container...
383
message::DeleteRecord *change_record= command.mutable_delete_record();
385
Field *current_field;
386
Field **table_fields= in_table->field;
387
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
388
string_value->set_charset(system_charset_info);
390
message::Table::Field *current_proto_field;
392
while ((current_field= *table_fields++) != NULL)
395
* Add the WHERE clause values now...the fields which return true
396
* for isReadSet() are in the WHERE clause. For tables with no
397
* primary or unique key, all fields will be returned.
399
if (current_field->isReadSet())
401
current_proto_field= change_record->add_where_field();
402
current_proto_field->set_name(std::string(current_field->field_name));
403
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
404
string_value= current_field->val_str(string_value);
405
change_record->add_where_value(std::string(string_value->c_ptr()));
406
string_value->free();
413
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
418
message::Command command;
419
command.set_type(message::Command::RAW_SQL);
420
command.set_timestamp(in_session->getCurrentTimestamp());
422
setCommandTransactionContext(&command, in_session);
424
std::string query(in_query, in_query_len);
425
command.set_sql(query);
430
void ReplicationServices::push(message::Command *to_push)
432
std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
433
std::vector<plugin::Applier *>::iterator appl_start_iter, appl_iter;
152
434
appl_start_iter= appliers.begin();
154
plugin::TransactionReplicator *cur_repl;
155
plugin::TransactionApplier *cur_appl;
436
plugin::Replicator *cur_repl;
437
plugin::Applier *cur_appl;
157
439
while (repl_iter != replicators.end())
159
441
cur_repl= *repl_iter;
160
if (! cur_repl->isEnabled())
442
if (! cur_repl->isActive())