26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* ReplicationServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/replication.proto
44
#include "drizzled/server_includes.h"
38
45
#include "drizzled/replication_services.h"
39
#include "drizzled/plugin/transaction_replicator.h"
40
#include "drizzled/plugin/transaction_applier.h"
41
#include "drizzled/message/transaction.pb.h"
46
#include "drizzled/plugin/command_replicator.h"
47
#include "drizzled/plugin/command_applier.h"
48
#include "drizzled/message/replication.pb.h"
42
49
#include "drizzled/message/table.pb.h"
43
#include "drizzled/message/statement_transform.h"
44
50
#include "drizzled/gettext.h"
45
51
#include "drizzled/session.h"
46
#include "drizzled/error.h"
50
55
using namespace std;
56
using namespace drizzled;
55
58
ReplicationServices::ReplicationServices()
145
148
return is_active;
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
150
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
151
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
152
Session *in_session) const
154
message::TransactionContext *trx= in_command.mutable_transaction_context();
155
trx->set_server_id(in_session->getServerId());
156
trx->set_transaction_id(in_session->getTransactionId());
158
in_command.set_session_id((uint32_t) in_session->getSessionId());
161
void ReplicationServices::startTransaction(Session *in_session)
166
message::Command command;
167
command.set_type(message::Command::START_TRANSACTION);
168
command.set_timestamp(in_session->getCurrentTimestamp());
170
setCommandTransactionContext(command, in_session);
175
void ReplicationServices::commitTransaction(Session *in_session)
180
message::Command command;
181
command.set_type(message::Command::COMMIT);
182
command.set_timestamp(in_session->getCurrentTimestamp());
184
setCommandTransactionContext(command, in_session);
189
void ReplicationServices::rollbackTransaction(Session *in_session)
194
message::Command command;
195
command.set_type(message::Command::ROLLBACK);
196
command.set_timestamp(in_session->getCurrentTimestamp());
198
setCommandTransactionContext(command, in_session);
203
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
208
message::Command command;
209
command.set_type(message::Command::INSERT);
210
command.set_timestamp(in_session->getCurrentTimestamp());
212
setCommandTransactionContext(command, in_session);
214
const char *schema_name= in_table->getShare()->db.str;
215
const char *table_name= in_table->getShare()->table_name.str;
217
command.set_schema(schema_name);
218
command.set_table(table_name);
221
* Now we construct the specialized InsertRecord command inside
222
* the message::Command container...
224
message::InsertRecord *change_record= command.mutable_insert_record();
226
Field *current_field;
227
Field **table_fields= in_table->field;
228
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
229
string_value->set_charset(system_charset_info);
231
message::Table::Field *current_proto_field;
233
/* We will read all the table's fields... */
234
in_table->setReadSet();
236
while ((current_field= *table_fields++) != NULL)
238
current_proto_field= change_record->add_insert_field();
239
current_proto_field->set_name(current_field->field_name);
240
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
241
string_value= current_field->val_str(string_value);
242
change_record->add_insert_value(string_value->c_ptr());
243
string_value->free();
249
void ReplicationServices::updateRecord(Session *in_session,
251
const unsigned char *old_record,
252
const unsigned char *new_record)
257
message::Command command;
258
command.set_type(message::Command::UPDATE);
259
command.set_timestamp(in_session->getCurrentTimestamp());
261
setCommandTransactionContext(command, in_session);
263
const char *schema_name= in_table->getShare()->db.str;
264
const char *table_name= in_table->getShare()->table_name.str;
266
command.set_schema(schema_name);
267
command.set_table(table_name);
270
* Now we construct the specialized UpdateRecord command inside
271
* the message::Command container...
273
message::UpdateRecord *change_record= command.mutable_update_record();
275
Field *current_field;
276
Field **table_fields= in_table->field;
277
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
278
string_value->set_charset(system_charset_info);
280
message::Table::Field *current_proto_field;
282
while ((current_field= *table_fields++) != NULL)
285
* The below really should be moved into the Field API and Record API. But for now
286
* we do this crazy pointer fiddling to figure out if the current field
287
* has been updated in the supplied record raw byte pointers.
289
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
290
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
292
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
294
if (memcmp(old_ptr, new_ptr, field_length) != 0)
296
/* Field is changed from old to new */
297
current_proto_field= change_record->add_update_field();
298
current_proto_field->set_name(current_field->field_name);
299
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
301
/* Store the original "read bit" for this field */
302
bool is_read_set= current_field->isReadSet();
304
/* We need to mark that we will "read" this field... */
305
in_table->setReadSet(current_field->field_index);
307
/* Read the string value of this field's contents */
308
string_value= current_field->val_str(string_value);
311
* Reset the read bit after reading field to its original state. This
312
* prevents the field from being included in the WHERE clause
314
current_field->setReadSet(is_read_set);
316
change_record->add_after_value(string_value->c_ptr());
317
string_value->free();
321
* Add the WHERE clause values now...the fields which return true
322
* for isReadSet() are in the WHERE clause. For tables with no
323
* primary or unique key, all fields will be returned.
325
if (current_field->isReadSet())
327
current_proto_field= change_record->add_where_field();
328
current_proto_field->set_name(current_field->field_name);
329
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
330
string_value= current_field->val_str(string_value);
331
change_record->add_where_value(string_value->c_ptr());
332
string_value->free();
339
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
344
message::Command command;
345
command.set_type(message::Command::DELETE);
346
command.set_timestamp(in_session->getCurrentTimestamp());
348
setCommandTransactionContext(command, in_session);
350
const char *schema_name= in_table->getShare()->db.str;
351
const char *table_name= in_table->getShare()->table_name.str;
353
command.set_schema(schema_name);
354
command.set_table(table_name);
357
* Now we construct the specialized DeleteRecord command inside
358
* the message::Command container...
360
message::DeleteRecord *change_record= command.mutable_delete_record();
362
Field *current_field;
363
Field **table_fields= in_table->field;
364
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
365
string_value->set_charset(system_charset_info);
367
message::Table::Field *current_proto_field;
369
while ((current_field= *table_fields++) != NULL)
372
* Add the WHERE clause values now...the fields which return true
373
* for isReadSet() are in the WHERE clause. For tables with no
374
* primary or unique key, all fields will be returned.
376
if (current_field->isReadSet())
378
current_proto_field= change_record->add_where_field();
379
current_proto_field->set_name(current_field->field_name);
380
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
381
string_value= current_field->val_str(string_value);
382
change_record->add_where_value(string_value->c_ptr());
383
string_value->free();
390
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
395
message::Command command;
396
command.set_type(message::Command::RAW_SQL);
397
command.set_timestamp(in_session->getCurrentTimestamp());
399
setCommandTransactionContext(command, in_session);
401
string query(in_query, in_query_len);
402
command.set_sql(query);
407
void ReplicationServices::push(drizzled::message::Command &to_push)
409
vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
410
vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
152
411
appl_start_iter= appliers.begin();
154
plugin::TransactionReplicator *cur_repl;
155
plugin::TransactionApplier *cur_appl;
413
plugin::CommandReplicator *cur_repl;
414
plugin::CommandApplier *cur_appl;
157
416
while (repl_iter != replicators.end())
159
418
cur_repl= *repl_iter;
160
if (! cur_repl->isEnabled())
419
if (! cur_repl->isActive())