26
25
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* TransactionServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/transaction.proto
38
#include "drizzled/replication_services.h"
39
#include "drizzled/plugin/transaction_replicator.h"
40
#include "drizzled/plugin/transaction_applier.h"
44
#include "drizzled/server_includes.h"
45
#include "drizzled/transaction_services.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
41
48
#include "drizzled/message/transaction.pb.h"
42
49
#include "drizzled/message/table.pb.h"
43
#include "drizzled/message/statement_transform.h"
44
50
#include "drizzled/gettext.h"
45
51
#include "drizzled/session.h"
46
#include "drizzled/error.h"
52
#include "drizzled/plugin_registry.h"
56
drizzled::TransactionServices transaction_services;
58
void add_replicator(drizzled::plugin::Replicator *repl)
60
transaction_services.attachReplicator(repl);
63
void remove_replicator(drizzled::plugin::Replicator *repl)
65
transaction_services.detachReplicator(repl);
55
ReplicationServices::ReplicationServices()
60
void ReplicationServices::evaluateActivePlugins()
63
* We loop through replicators and appliers, evaluating
64
* whether or not there is at least one active replicator
65
* and one active applier. If not, we set is_active
68
bool tmp_is_active= false;
70
if (replicators.empty() || appliers.empty())
77
* Determine if there are any remaining replicators and if those
78
* replicators are active...if not, set is_active
81
for (Replicators::iterator repl_iter= replicators.begin();
82
repl_iter != replicators.end();
85
if ((*repl_iter)->isEnabled())
93
/* No active replicators. Set is_active to false and exit. */
99
* OK, we know there's at least one active replicator.
101
* Now determine if there are any remaining appliers and if those
102
* appliers are active...if not, set is_active
105
for (Appliers::iterator appl_iter= appliers.begin();
106
appl_iter != appliers.end();
109
if ((*appl_iter)->isEnabled())
115
/* If we get here, there are no active appliers */
119
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
121
74
replicators.push_back(in_replicator);
122
evaluateActivePlugins();
125
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
127
79
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
128
evaluateActivePlugins();
131
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
133
84
appliers.push_back(in_applier);
134
evaluateActivePlugins();
137
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
139
89
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
140
evaluateActivePlugins();
143
bool ReplicationServices::isActive() const
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
150
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151
vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
93
, Session *in_session) const
95
using namespace drizzled::message;
97
TransactionContext *trx= in_command->mutable_transaction_context();
98
trx->set_server_id(in_session->getServerId());
99
trx->set_transaction_id(in_session->getTransactionId());
102
void TransactionServices::startTransaction(Session *in_session)
104
using namespace drizzled::message;
106
if (replicators.size() == 0 || appliers.size() == 0)
110
command.set_type(Command::START_TRANSACTION);
111
command.set_timestamp(in_session->getCurrentTimestamp());
113
setCommandTransactionContext(&command, in_session);
118
void TransactionServices::commitTransaction(Session *in_session)
120
using namespace drizzled::message;
122
if (replicators.size() == 0 || appliers.size() == 0)
126
command.set_type(Command::COMMIT);
127
command.set_timestamp(in_session->getCurrentTimestamp());
129
setCommandTransactionContext(&command, in_session);
134
void TransactionServices::rollbackTransaction(Session *in_session)
136
using namespace drizzled::message;
138
if (replicators.size() == 0 || appliers.size() == 0)
142
command.set_type(Command::ROLLBACK);
143
command.set_timestamp(in_session->getCurrentTimestamp());
145
setCommandTransactionContext(&command, in_session);
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
152
using namespace drizzled::message;
154
if (replicators.size() == 0 || appliers.size() == 0)
158
command.set_type(Command::INSERT);
159
command.set_timestamp(in_session->getCurrentTimestamp());
161
setCommandTransactionContext(&command, in_session);
163
const char *schema_name= in_table->getShare()->db.str;
164
const char *table_name= in_table->getShare()->table_name.str;
166
command.set_schema(schema_name);
167
command.set_table(table_name);
170
* Now we construct the specialized InsertRecord command inside
171
* the Command container...
173
InsertRecord *change_record= command.mutable_insert_record();
175
Field *current_field;
176
Field **table_fields= in_table->field;
177
String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
178
string_value->set_charset(system_charset_info);
180
Table::Field *cur_field;
182
while ((current_field= *table_fields++) != NULL)
184
cur_field= change_record->add_insert_field();
185
cur_field->set_name(std::string(current_field->field_name));
186
cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
187
string_value= current_field->val_str(string_value);
188
change_record->add_insert_value(std::string(string_value->c_ptr()));
189
string_value->free(); /* I wish there was a clear() method... */
193
delete string_value; /* Is this needed with memroot allocation? */
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
200
using namespace drizzled::message;
202
if (replicators.size() == 0 || appliers.size() == 0)
206
command.set_type(Command::UPDATE);
207
command.set_timestamp(in_session->getCurrentTimestamp());
209
setCommandTransactionContext(&command, in_session);
211
const char *schema_name= in_table->getShare()->db.str;
212
const char *table_name= in_table->getShare()->table_name.str;
214
command.set_schema(schema_name);
215
command.set_table(table_name);
218
* Now we construct the specialized UpdateRecord command inside
219
* the Command container...
221
//UpdateRecord *change_record= command.mutable_update_record();
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
228
using namespace drizzled::message;
230
if (replicators.size() == 0 || appliers.size() == 0)
234
command.set_type(Command::DELETE);
235
command.set_timestamp(in_session->getCurrentTimestamp());
237
setCommandTransactionContext(&command, in_session);
239
const char *schema_name= in_table->getShare()->db.str;
240
const char *table_name= in_table->getShare()->table_name.str;
242
command.set_schema(schema_name);
243
command.set_table(table_name);
246
* Now we construct the specialized DeleteRecord command inside
247
* the Command container...
249
//DeleteRecord *change_record= command.mutable_delete_record();
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
256
using namespace drizzled::message;
258
if (replicators.size() == 0 || appliers.size() == 0)
262
command.set_type(Command::RAW_SQL);
263
command.set_timestamp(in_session->getCurrentTimestamp());
265
setCommandTransactionContext(&command, in_session);
267
std::string query(in_query, in_query_len);
268
command.set_sql(query);
274
void TransactionServices::push(drizzled::message::Command *to_push)
276
std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
277
std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
152
278
appl_start_iter= appliers.begin();
154
plugin::TransactionReplicator *cur_repl;
155
plugin::TransactionApplier *cur_appl;
280
drizzled::plugin::Replicator *cur_repl;
281
drizzled::plugin::Applier *cur_appl;
157
283
while (repl_iter != replicators.end())
159
285
cur_repl= *repl_iter;
160
if (! cur_repl->isEnabled())
286
if (! cur_repl->isActive())