56
drizzled::TransactionServices transaction_services;
58
void add_replicator(drizzled::plugin::Replicator *repl)
60
transaction_services.attachReplicator(repl);
63
void remove_replicator(drizzled::plugin::Replicator *repl)
65
transaction_services.detachReplicator(repl);
56
using namespace drizzled;
58
ReplicationServices replication_services;
60
void add_replicator(plugin::Replicator *replicator)
62
replication_services.attachReplicator(replicator);
65
void remove_replicator(plugin::Replicator *replicator)
67
replication_services.detachReplicator(replicator);
70
void add_applier(plugin::Applier *applier)
72
replication_services.attachApplier(applier);
75
void remove_applier(plugin::Applier *applier)
77
replication_services.detachApplier(applier);
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
83
ReplicationServices::ReplicationServices()
88
void ReplicationServices::evaluateActivePlugins()
91
* We loop through replicators and appliers, evaluating
92
* whether or not there is at least one active replicator
93
* and one active applier. If not, we set is_active
96
bool tmp_is_active= false;
98
if (replicators.size() == 0 || appliers.size() == 0)
105
* Determine if there are any remaining replicators and if those
106
* replicators are active...if not, set is_active
109
std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
110
while (repl_iter != replicators.end())
112
if ((*repl_iter)->isActive())
121
/* No active replicators. Set is_active to false and exit. */
127
* OK, we know there's at least one active replicator.
129
* Now determine if there are any remaining appliers and if those
130
* appliers are active...if not, set is_active
133
std::vector<plugin::Applier *>::iterator appl_iter= appliers.begin();
134
while (appl_iter != appliers.end())
136
if ((*appl_iter)->isActive())
143
/* If we get here, there are no active appliers */
147
void ReplicationServices::attachReplicator(plugin::Replicator *in_replicator)
74
149
replicators.push_back(in_replicator);
150
evaluateActivePlugins();
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
153
void ReplicationServices::detachReplicator(plugin::Replicator *in_replicator)
79
155
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
156
evaluateActivePlugins();
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
159
void ReplicationServices::attachApplier(plugin::Applier *in_applier)
84
161
appliers.push_back(in_applier);
162
evaluateActivePlugins();
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
165
void ReplicationServices::detachApplier(plugin::Applier *in_applier)
89
167
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
168
evaluateActivePlugins();
171
bool ReplicationServices::isActive() const
176
void ReplicationServices::setCommandTransactionContext(message::Command *in_command
93
177
, Session *in_session) const
95
using namespace drizzled::message;
97
TransactionContext *trx= in_command->mutable_transaction_context();
179
message::TransactionContext *trx= in_command->mutable_transaction_context();
98
180
trx->set_server_id(in_session->getServerId());
99
181
trx->set_transaction_id(in_session->getTransactionId());
102
void TransactionServices::startTransaction(Session *in_session)
104
using namespace drizzled::message;
106
if (replicators.size() == 0 || appliers.size() == 0)
110
command.set_type(Command::START_TRANSACTION);
111
command.set_timestamp(in_session->getCurrentTimestamp());
113
setCommandTransactionContext(&command, in_session);
118
void TransactionServices::commitTransaction(Session *in_session)
120
using namespace drizzled::message;
122
if (replicators.size() == 0 || appliers.size() == 0)
126
command.set_type(Command::COMMIT);
127
command.set_timestamp(in_session->getCurrentTimestamp());
129
setCommandTransactionContext(&command, in_session);
134
void TransactionServices::rollbackTransaction(Session *in_session)
136
using namespace drizzled::message;
138
if (replicators.size() == 0 || appliers.size() == 0)
142
command.set_type(Command::ROLLBACK);
143
command.set_timestamp(in_session->getCurrentTimestamp());
145
setCommandTransactionContext(&command, in_session);
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
152
using namespace drizzled::message;
154
if (replicators.size() == 0 || appliers.size() == 0)
158
command.set_type(Command::INSERT);
184
void ReplicationServices::startTransaction(Session *in_session)
189
message::Command command;
190
command.set_type(message::Command::START_TRANSACTION);
191
command.set_timestamp(in_session->getCurrentTimestamp());
193
setCommandTransactionContext(&command, in_session);
198
void ReplicationServices::commitTransaction(Session *in_session)
203
message::Command command;
204
command.set_type(message::Command::COMMIT);
205
command.set_timestamp(in_session->getCurrentTimestamp());
207
setCommandTransactionContext(&command, in_session);
212
void ReplicationServices::rollbackTransaction(Session *in_session)
217
message::Command command;
218
command.set_type(message::Command::ROLLBACK);
219
command.set_timestamp(in_session->getCurrentTimestamp());
221
setCommandTransactionContext(&command, in_session);
226
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
231
message::Command command;
232
command.set_type(message::Command::INSERT);
159
233
command.set_timestamp(in_session->getCurrentTimestamp());
161
235
setCommandTransactionContext(&command, in_session);
170
244
* Now we construct the specialized InsertRecord command inside
171
* the Command container...
245
* the message::Command container...
173
InsertRecord *change_record= command.mutable_insert_record();
247
message::InsertRecord *change_record= command.mutable_insert_record();
175
249
Field *current_field;
176
250
Field **table_fields= in_table->field;
177
String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
251
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
178
252
string_value->set_charset(system_charset_info);
180
Table::Field *cur_field;
254
message::Table::Field *current_proto_field;
256
/* We will read all the table's fields... */
257
in_table->setReadSet();
182
259
while ((current_field= *table_fields++) != NULL)
184
cur_field= change_record->add_insert_field();
185
cur_field->set_name(std::string(current_field->field_name));
186
cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
261
current_proto_field= change_record->add_insert_field();
262
current_proto_field->set_name(std::string(current_field->field_name));
263
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
187
264
string_value= current_field->val_str(string_value);
188
265
change_record->add_insert_value(std::string(string_value->c_ptr()));
189
string_value->free(); /* I wish there was a clear() method... */
266
string_value->free();
193
delete string_value; /* Is this needed with memroot allocation? */
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
272
void ReplicationServices::updateRecord(Session *in_session,
274
const unsigned char *old_record,
275
const unsigned char *new_record)
200
using namespace drizzled::message;
202
if (replicators.size() == 0 || appliers.size() == 0)
206
command.set_type(Command::UPDATE);
280
message::Command command;
281
command.set_type(message::Command::UPDATE);
207
282
command.set_timestamp(in_session->getCurrentTimestamp());
209
284
setCommandTransactionContext(&command, in_session);
218
293
* Now we construct the specialized UpdateRecord command inside
219
* the Command container...
294
* the message::Command container...
221
//UpdateRecord *change_record= command.mutable_update_record();
296
message::UpdateRecord *change_record= command.mutable_update_record();
298
Field *current_field;
299
Field **table_fields= in_table->field;
300
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
301
string_value->set_charset(system_charset_info);
303
message::Table::Field *current_proto_field;
305
while ((current_field= *table_fields++) != NULL)
308
* The below really should be moved into the Field API and Record API. But for now
309
* we do this crazy pointer fiddling to figure out if the current field
310
* has been updated in the supplied record raw byte pointers.
312
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
313
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
315
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
317
if (memcmp(old_ptr, new_ptr, field_length) != 0)
319
/* Field is changed from old to new */
320
current_proto_field= change_record->add_update_field();
321
current_proto_field->set_name(std::string(current_field->field_name));
322
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
324
/* Store the original "read bit" for this field */
325
bool is_read_set= current_field->isReadSet();
327
/* We need to mark that we will "read" this field... */
328
in_table->setReadSet(current_field->field_index);
330
/* Read the string value of this field's contents */
331
string_value= current_field->val_str(string_value);
334
* Reset the read bit after reading field to its original state. This
335
* prevents the field from being included in the WHERE clause
337
current_field->setReadSet(is_read_set);
339
change_record->add_after_value(std::string(string_value->c_ptr()));
340
string_value->free();
344
* Add the WHERE clause values now...the fields which return true
345
* for isReadSet() are in the WHERE clause. For tables with no
346
* primary or unique key, all fields will be returned.
348
if (current_field->isReadSet())
350
current_proto_field= change_record->add_where_field();
351
current_proto_field->set_name(std::string(current_field->field_name));
352
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
353
string_value= current_field->val_str(string_value);
354
change_record->add_where_value(std::string(string_value->c_ptr()));
355
string_value->free();
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
362
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
228
using namespace drizzled::message;
230
if (replicators.size() == 0 || appliers.size() == 0)
234
command.set_type(Command::DELETE);
367
message::Command command;
368
command.set_type(message::Command::DELETE);
235
369
command.set_timestamp(in_session->getCurrentTimestamp());
237
371
setCommandTransactionContext(&command, in_session);
246
380
* Now we construct the specialized DeleteRecord command inside
247
* the Command container...
381
* the message::Command container...
249
//DeleteRecord *change_record= command.mutable_delete_record();
383
message::DeleteRecord *change_record= command.mutable_delete_record();
385
Field *current_field;
386
Field **table_fields= in_table->field;
387
String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
388
string_value->set_charset(system_charset_info);
390
message::Table::Field *current_proto_field;
392
while ((current_field= *table_fields++) != NULL)
395
* Add the WHERE clause values now...the fields which return true
396
* for isReadSet() are in the WHERE clause. For tables with no
397
* primary or unique key, all fields will be returned.
399
if (current_field->isReadSet())
401
current_proto_field= change_record->add_where_field();
402
current_proto_field->set_name(std::string(current_field->field_name));
403
current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
404
string_value= current_field->val_str(string_value);
405
change_record->add_where_value(std::string(string_value->c_ptr()));
406
string_value->free();
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
413
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
256
using namespace drizzled::message;
258
if (replicators.size() == 0 || appliers.size() == 0)
262
command.set_type(Command::RAW_SQL);
418
message::Command command;
419
command.set_type(message::Command::RAW_SQL);
263
420
command.set_timestamp(in_session->getCurrentTimestamp());
265
422
setCommandTransactionContext(&command, in_session);