17
21
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
21
#include <drizzled/replicator.h>
22
#include <drizzled/gettext.h>
23
#include <drizzled/session.h>
25
int replicator_initializer(st_plugin_int *plugin)
29
if (plugin->plugin->init)
31
if (plugin->plugin->init((void *)&p))
33
/* TRANSLATORS: The leading word "replicator" is the name
34
of the plugin api, and so should not be translated. */
35
errmsg_printf(ERRMSG_LVL_ERROR,
36
_("replicator plugin '%s' init() failed"),
42
plugin->data= (void *)p;
47
int replicator_finalizer(st_plugin_int *plugin)
49
Replicator *p= static_cast<Replicator *>(plugin->data);
51
if (plugin->plugin->deinit)
53
if (plugin->plugin->deinit((void *)p))
55
/* TRANSLATORS: The leading word "replicator" is the name
56
of the plugin api, and so should not be translated. */
57
errmsg_printf(ERRMSG_LVL_ERROR,
58
_("replicator plugin '%s' deinit() failed"),
66
/* This gets called by plugin_foreach once for each loaded replicator plugin */
67
static bool replicator_session_iterate(Session *session, st_plugin_int *plugin, void *)
69
Replicator *repl= plugin_data(plugin, Replicator *);
72
/* call this loaded replicator plugin's session_init method */
75
error= repl->session_init(session);
78
/* TRANSLATORS: The leading word "replicator" is the name
79
of the plugin api, and so should not be translated. */
80
errmsg_printf(ERRMSG_LVL_ERROR,
81
_("replicator plugin '%s' session_init() failed"),
82
(char *)plugin_name(plugin));
91
This call is called once at the begining of each transaction.
93
extern StorageEngine *binlog_engine;
94
bool replicator_session_init(Session *session)
98
if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
99
trans_register_ha(session, true, binlog_engine);
100
trans_register_ha(session, false, binlog_engine);
102
if (session->getReplicationData())
106
call replicator_session_iterate
107
once for each loaded replicator plugin
109
foreach_rv= plugin_foreach(session, replicator_session_iterate,
110
DRIZZLE_REPLICATOR_PLUGIN, NULL);
115
/* The plugin_foreach() iterator requires that we
116
convert all the parameters of a plugin api entry point
117
into just one single void ptr, plus the session.
118
So we will take all the additional paramters of replicator_do2,
119
and marshall them into a struct of this type, and
120
then just pass in a pointer to it.
122
enum repl_row_exec_t{
128
typedef struct replicator_row_parms_st
130
repl_row_exec_t type;
132
const unsigned char *before;
133
const unsigned char *after;
134
} replicator_row_parms_st;
137
/* This gets called by plugin_foreach once for each loaded replicator plugin */
138
static bool replicator_do_row_iterate (Session *session, st_plugin_int *plugin, void *p)
140
Replicator *repl= plugin_data(plugin, Replicator *);
141
replicator_row_parms_st *params= static_cast<replicator_row_parms_st *>(p);
143
switch (params->type) {
147
if (repl->row_insert(session, params->table))
149
/* TRANSLATORS: The leading word "replicator" is the name
150
of the plugin api, and so should not be translated. */
151
errmsg_printf(ERRMSG_LVL_ERROR,
152
_("replicator plugin '%s' row_insert() failed"),
153
(char *)plugin_name(plugin));
162
if (repl->row_update(session, params->table,
163
params->before, params->after))
165
/* TRANSLATORS: The leading word "replicator" is the name
166
of the plugin api, and so should not be translated. */
167
errmsg_printf(ERRMSG_LVL_ERROR,
168
_("replicator plugin '%s' row_update() failed"),
169
(char *)plugin_name(plugin));
178
if (repl->row_delete(session, params->table))
180
/* TRANSLATORS: The leading word "replicator" is the name
181
of the plugin api, and so should not be translated. */
182
errmsg_printf(ERRMSG_LVL_ERROR,
183
_("replicator plugin '%s' row_delete() failed"),
184
(char *)plugin_name(plugin));
194
/* This is the replicator_do_row entry point.
195
This gets called by the rest of the Drizzle server code */
196
static bool replicator_do_row (Session *session,
197
replicator_row_parms_st *params)
201
foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
202
DRIZZLE_REPLICATOR_PLUGIN, params);
206
bool replicator_write_row(Session *session, Table *table)
208
replicator_row_parms_st param;
210
param.type= repl_insert;
215
return replicator_do_row(session, ¶m);
218
bool replicator_update_row(Session *session, Table *table,
219
const unsigned char *before,
220
const unsigned char *after)
222
replicator_row_parms_st param;
224
param.type= repl_update;
227
param.before= before;
229
return replicator_do_row(session, ¶m);
232
bool replicator_delete_row(Session *session, Table *table)
234
replicator_row_parms_st param;
236
param.type= repl_delete;
241
return replicator_do_row(session, ¶m);
247
Ok, not so much dragons, but this is where we handle either commits or rollbacks of
250
typedef struct replicator_row_end_st
254
} replicator_row_end_st;
256
/* We call this to end a statement (on each registered plugin) */
257
static bool replicator_end_transaction_iterate (Session *session, st_plugin_int *plugin, void *p)
259
Replicator *repl= plugin_data(plugin, Replicator *);
260
replicator_row_end_st *params= static_cast<replicator_row_end_st *>(p);
262
/* call this loaded replicator plugin's replicator_func1 function pointer */
265
if (repl->end_transaction(session, params->autocommit, params->commit))
267
/* TRANSLATORS: The leading word "replicator" is the name
268
of the plugin api, and so should not be translated. */
269
errmsg_printf(ERRMSG_LVL_ERROR,
270
_("replicator plugin '%s' end_transaction() failed"),
271
(char *)plugin_name(plugin));
279
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
282
replicator_row_end_st params;
284
params.autocommit= autocommit;
285
params.commit= commit;
287
/* We need to free any data we did an init of for the session */
288
foreach_rv= plugin_foreach(session, replicator_end_transaction_iterate,
289
DRIZZLE_REPLICATOR_PLUGIN, (void *) ¶ms);
295
If you can do real 2PC this is your hook poing to know that the event is coming.
297
Always true for the moment.
300
bool replicator_prepare(Session *)
308
typedef struct replicator_statement_st
312
} replicator_statement_st;
314
/* We call this to end a statement (on each registered plugin) */
315
static bool replicator_statement_iterate(Session *session, st_plugin_int *plugin, void *p)
317
Replicator *repl= plugin_data(plugin, Replicator *);
318
replicator_statement_st *params= (replicator_statement_st *)p;
320
/* call this loaded replicator plugin's replicator_func1 function pointer */
323
if (repl->statement(session, params->query, params->query_length))
325
/* TRANSLATORS: The leading word "replicator" is the name
326
of the plugin api, and so should not be translated. */
327
errmsg_printf(ERRMSG_LVL_ERROR,
328
_("replicator plugin '%s' statement() failed"),
329
(char *)plugin_name(plugin));
337
bool replicator_statement(Session *session, const char *query, size_t query_length)
340
replicator_statement_st params;
343
params.query_length= query_length;
345
/* We need to free any data we did an init of for the session */
346
foreach_rv= plugin_foreach(session, replicator_statement_iterate,
347
DRIZZLE_REPLICATOR_PLUGIN, (void *) ¶ms);
25
* @file Server-side utility which is responsible for managing the
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* TransactionServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/transaction.proto
44
#include "drizzled/server_includes.h"
45
#include "drizzled/transaction_services.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
48
#include "drizzled/message/transaction.pb.h"
49
#include "drizzled/message/table.pb.h"
50
#include "drizzled/gettext.h"
51
#include "drizzled/session.h"
52
#include "drizzled/plugin_registry.h"
56
drizzled::TransactionServices transaction_services;
58
void add_replicator(drizzled::plugin::Replicator *repl)
60
transaction_services.attachReplicator(repl);
63
void remove_replicator(drizzled::plugin::Replicator *repl)
65
transaction_services.detachReplicator(repl);
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
74
replicators.push_back(in_replicator);
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
79
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
84
appliers.push_back(in_applier);
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
89
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
93
, Session *in_session) const
95
using namespace drizzled::message;
97
TransactionContext *trx= in_command->mutable_transaction_context();
98
trx->set_server_id(in_session->getServerId());
99
trx->set_transaction_id(in_session->getTransactionId());
102
void TransactionServices::startTransaction(Session *in_session)
104
using namespace drizzled::message;
106
if (replicators.size() == 0 || appliers.size() == 0)
110
command.set_type(Command::START_TRANSACTION);
111
command.set_timestamp(in_session->getCurrentTimestamp());
113
setCommandTransactionContext(&command, in_session);
118
void TransactionServices::commitTransaction(Session *in_session)
120
using namespace drizzled::message;
122
if (replicators.size() == 0 || appliers.size() == 0)
126
command.set_type(Command::COMMIT);
127
command.set_timestamp(in_session->getCurrentTimestamp());
129
setCommandTransactionContext(&command, in_session);
134
void TransactionServices::rollbackTransaction(Session *in_session)
136
using namespace drizzled::message;
138
if (replicators.size() == 0 || appliers.size() == 0)
142
command.set_type(Command::ROLLBACK);
143
command.set_timestamp(in_session->getCurrentTimestamp());
145
setCommandTransactionContext(&command, in_session);
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
152
using namespace drizzled::message;
154
if (replicators.size() == 0 || appliers.size() == 0)
158
command.set_type(Command::INSERT);
159
command.set_timestamp(in_session->getCurrentTimestamp());
161
setCommandTransactionContext(&command, in_session);
163
const char *schema_name= in_table->getShare()->db.str;
164
const char *table_name= in_table->getShare()->table_name.str;
166
command.set_schema(schema_name);
167
command.set_table(table_name);
170
* Now we construct the specialized InsertRecord command inside
171
* the Command container...
173
InsertRecord *change_record= command.mutable_insert_record();
175
Field *current_field;
176
Field **table_fields= in_table->field;
177
String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
178
string_value->set_charset(system_charset_info);
180
Table::Field *cur_field;
182
while ((current_field= *table_fields++) != NULL)
184
cur_field= change_record->add_insert_field();
185
cur_field->set_name(std::string(current_field->field_name));
186
cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
187
string_value= current_field->val_str(string_value);
188
change_record->add_insert_value(std::string(string_value->c_ptr()));
189
string_value->free(); /* I wish there was a clear() method... */
193
delete string_value; /* Is this needed with memroot allocation? */
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
200
using namespace drizzled::message;
202
if (replicators.size() == 0 || appliers.size() == 0)
206
command.set_type(Command::UPDATE);
207
command.set_timestamp(in_session->getCurrentTimestamp());
209
setCommandTransactionContext(&command, in_session);
211
const char *schema_name= in_table->getShare()->db.str;
212
const char *table_name= in_table->getShare()->table_name.str;
214
command.set_schema(schema_name);
215
command.set_table(table_name);
218
* Now we construct the specialized UpdateRecord command inside
219
* the Command container...
221
//UpdateRecord *change_record= command.mutable_update_record();
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
228
using namespace drizzled::message;
230
if (replicators.size() == 0 || appliers.size() == 0)
234
command.set_type(Command::DELETE);
235
command.set_timestamp(in_session->getCurrentTimestamp());
237
setCommandTransactionContext(&command, in_session);
239
const char *schema_name= in_table->getShare()->db.str;
240
const char *table_name= in_table->getShare()->table_name.str;
242
command.set_schema(schema_name);
243
command.set_table(table_name);
246
* Now we construct the specialized DeleteRecord command inside
247
* the Command container...
249
//DeleteRecord *change_record= command.mutable_delete_record();
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
256
using namespace drizzled::message;
258
if (replicators.size() == 0 || appliers.size() == 0)
262
command.set_type(Command::RAW_SQL);
263
command.set_timestamp(in_session->getCurrentTimestamp());
265
setCommandTransactionContext(&command, in_session);
267
std::string query(in_query, in_query_len);
268
command.set_sql(query);
274
void TransactionServices::push(drizzled::message::Command *to_push)
276
std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
277
std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
278
appl_start_iter= appliers.begin();
280
drizzled::plugin::Replicator *cur_repl;
281
drizzled::plugin::Applier *cur_appl;
283
while (repl_iter != replicators.end())
285
cur_repl= *repl_iter;
286
if (! cur_repl->isActive())
292
appl_iter= appl_start_iter;
293
while (appl_iter != appliers.end())
295
cur_appl= *appl_iter;
297
if (! cur_appl->isActive())
303
cur_repl->replicate(cur_appl, to_push);
311
} /* end namespace drizzled */