17
21
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
21
#include <drizzled/replicator.h>
22
#include <drizzled/gettext.h>
23
#include <drizzled/session.h>
25
* @file Server-side utility which is responsible for managing the
26
* communication between the kernel, replicator plugins, and applier plugins.
28
* TransactionServices is a bridge between modules and the kernel, and its
29
* primary function is to take internal events (for instance the start of a
30
* transaction, the changing of a record, or the rollback of a transaction)
31
* and construct GPB Messages that are passed to the registered replicator and
34
* The reason for this functionality is to encapsulate all communication
35
* between the kernel and the replicator/applier plugins into GPB Messages.
36
* Instead of the plugin having to understand the (often fluidly changing)
37
* mechanics of the kernel, all the plugin needs to understand is the message
38
* format, and GPB messages provide a nice, clear, and versioned format for
41
* @see /drizzled/message/transaction.proto
44
#include "drizzled/server_includes.h"
45
#include "drizzled/transaction_services.h"
46
#include "drizzled/plugin/replicator.h"
47
#include "drizzled/plugin/applier.h"
48
#include "drizzled/message/transaction.pb.h"
49
#include "drizzled/message/table.pb.h"
50
#include "drizzled/gettext.h"
51
#include "drizzled/session.h"
55
drizzled::TransactionServices transaction_services;
60
* We're going to start simple at first, meaning that the
61
* below are the global vectors of replicators and appliers. The
62
* end goal is to have the TransactionServices have a register method
63
* which allows modules to register Replicator or Applier *factories*,
64
* which will allow TransactionServices to attach and detach a replicator/applier
65
* to a Session, instead of the current global vector.
25
67
int replicator_initializer(st_plugin_int *plugin)
69
drizzled::plugin::Replicator *repl= NULL;
29
71
if (plugin->plugin->init)
31
if (plugin->plugin->init((void *)&p))
73
if (plugin->plugin->init(&repl))
33
75
/* TRANSLATORS: The leading word "replicator" is the name
34
76
of the plugin api, and so should not be translated. */
42
plugin->data= (void *)p;
87
transaction_services.attachReplicator(repl);
47
93
int replicator_finalizer(st_plugin_int *plugin)
49
Replicator *p= static_cast<Replicator *>(plugin->data);
95
drizzled::plugin::Replicator *repl= static_cast<drizzled::plugin::Replicator *>(plugin->data);
99
transaction_services.detachReplicator(repl);
51
101
if (plugin->plugin->deinit)
53
if (plugin->plugin->deinit((void *)p))
55
/* TRANSLATORS: The leading word "replicator" is the name
56
of the plugin api, and so should not be translated. */
57
errmsg_printf(ERRMSG_LVL_ERROR,
58
_("replicator plugin '%s' deinit() failed"),
102
(void) plugin->plugin->deinit(repl);
110
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
112
replicators.push_back(in_replicator);
115
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
117
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
120
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
122
appliers.push_back(in_applier);
125
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
127
appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
130
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
131
, Session *in_session) const
133
using namespace drizzled::message;
135
TransactionContext *trx= in_command->mutable_transaction_context();
136
trx->set_server_id(in_session->getServerId());
137
trx->set_transaction_id(in_session->getTransactionId());
140
void TransactionServices::startTransaction(Session *in_session)
142
using namespace drizzled::message;
145
command.set_type(Command::START_TRANSACTION);
146
command.set_timestamp(in_session->getCurrentTimestamp());
148
setCommandTransactionContext(&command, in_session);
153
void TransactionServices::commitTransaction(Session *in_session)
155
using namespace drizzled::message;
158
command.set_type(Command::COMMIT);
159
command.set_timestamp(in_session->getCurrentTimestamp());
161
setCommandTransactionContext(&command, in_session);
166
void TransactionServices::rollbackTransaction(Session *in_session)
168
using namespace drizzled::message;
171
command.set_type(Command::ROLLBACK);
172
command.set_timestamp(in_session->getCurrentTimestamp());
174
setCommandTransactionContext(&command, in_session);
179
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
181
using namespace drizzled::message;
184
command.set_type(Command::INSERT);
185
command.set_timestamp(in_session->getCurrentTimestamp());
187
setCommandTransactionContext(&command, in_session);
189
const char *schema_name= in_table->getShare()->db.str;
190
const char *table_name= in_table->getShare()->table_name.str;
192
command.set_schema(schema_name);
193
command.set_table(table_name);
196
* Now we construct the specialized InsertRecord command inside
197
* the Command container...
199
InsertRecord *change_record= command.mutable_insert_record();
201
Field *current_field;
202
Field **table_fields= in_table->field;
203
String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
204
string_value->set_charset(system_charset_info);
206
Table::Field *cur_field;
208
while ((current_field= *table_fields++) != NULL)
210
cur_field= change_record->add_insert_field();
211
cur_field->set_name(std::string(current_field->field_name));
212
cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
213
string_value= current_field->val_str(string_value);
214
change_record->add_insert_value(std::string(string_value->c_ptr()));
215
string_value->free(); /* I wish there was a clear() method... */
221
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
223
using namespace drizzled::message;
226
command.set_type(Command::UPDATE);
227
command.set_timestamp(in_session->getCurrentTimestamp());
229
setCommandTransactionContext(&command, in_session);
231
const char *schema_name= in_table->getShare()->db.str;
232
const char *table_name= in_table->getShare()->table_name.str;
234
command.set_schema(schema_name);
235
command.set_table(table_name);
238
* Now we construct the specialized UpdateRecord command inside
239
* the Command container...
241
//UpdateRecord *change_record= command.mutable_update_record();
246
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
248
using namespace drizzled::message;
251
command.set_type(Command::DELETE);
252
command.set_timestamp(in_session->getCurrentTimestamp());
254
setCommandTransactionContext(&command, in_session);
256
const char *schema_name= in_table->getShare()->db.str;
257
const char *table_name= in_table->getShare()->table_name.str;
259
command.set_schema(schema_name);
260
command.set_table(table_name);
263
* Now we construct the specialized DeleteRecord command inside
264
* the Command container...
266
//DeleteRecord *change_record= command.mutable_delete_record();
271
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
273
using namespace drizzled::message;
276
command.set_type(Command::RAW_SQL);
277
command.set_timestamp(in_session->getCurrentTimestamp());
279
setCommandTransactionContext(&command, in_session);
281
std::string query(in_query, in_query_len);
282
command.set_sql(query);
288
void TransactionServices::push(drizzled::message::Command *to_push)
290
std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
291
std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
292
appl_start_iter= appliers.begin();
294
drizzled::plugin::Replicator *cur_repl;
295
drizzled::plugin::Applier *cur_appl;
297
while (repl_iter != replicators.end())
299
cur_repl= *repl_iter;
300
if (! cur_repl->isActive())
306
appl_iter= appl_start_iter;
307
while (appl_iter != appliers.end())
309
cur_appl= *appl_iter;
311
if (! cur_appl->isActive())
317
cur_repl->replicate(cur_appl, to_push);
66
325
/* This gets called by plugin_foreach once for each loaded replicator plugin */
67
static bool replicator_session_iterate(Session *session, st_plugin_int *plugin, void *)
326
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
69
Replicator *repl= plugin_data(plugin, Replicator *);
328
drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
330
if (! repl || ! repl->isActive())
72
333
/* call this loaded replicator plugin's session_init method */
334
if (! repl->initSession(session))
75
error= repl->session_init(session);
78
/* TRANSLATORS: The leading word "replicator" is the name
79
of the plugin api, and so should not be translated. */
80
errmsg_printf(ERRMSG_LVL_ERROR,
81
_("replicator plugin '%s' session_init() failed"),
82
(char *)plugin_name(plugin));
336
/* TRANSLATORS: The leading word "replicator" is the name
337
of the plugin api, and so should not be translated. */
338
errmsg_printf(ERRMSG_LVL_ERROR,
339
_("replicator plugin '%s' session_init() failed"),
340
(char *)plugin_name(plugin));
137
390
/* This gets called by plugin_foreach once for each loaded replicator plugin */
138
static bool replicator_do_row_iterate (Session *session, st_plugin_int *plugin, void *p)
391
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
140
Replicator *repl= plugin_data(plugin, Replicator *);
393
drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
395
if (! repl || ! repl->isActive())
141
398
replicator_row_parms_st *params= static_cast<replicator_row_parms_st *>(p);
143
switch (params->type) {
401
* We create a container ChangeRecord and then specialize the change
402
* record depending on the actual event which occurred...
404
drizzled::message::ChangeRecord change_record;
406
* When inserting a row, we want to pass the replicator only the
407
* specific information it needs, which is the name of the schema,
408
* the name of the table, the list of field names in the field list of
409
* the INSERT expression, and the values of this written row as strings.
411
* @TODO Eventually, it would be better to simply pass a Table
412
* proto message instead of both a schema name and a table
415
* @TODO Better to not have to pass string values...
417
* @TODO Pass pointers here instead of copies of values/field names?
418
* For large inserts, might be in trouble of running out of
419
* stack space? Not sure...
421
* @TODO Remove the dependency on the custom String class.
423
const char *schema_name= params->table->getShare()->db.str;
424
const char *table_name= params->table->getShare()->table_name.str;
426
change_record.set_schema(schema_name);
427
change_record.set_table(table_name);
428
switch (params->type)
147
if (repl->row_insert(session, params->table))
433
drizzled::message::InsertRecord *irecord= change_record.mutable_insert_record();
435
std::vector<std::string> values;
436
std::vector<std::string> field_names;
438
Field *current_field;
439
Field **table_fields= params->table->field;
440
String *string_value= new (session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
441
string_value->set_charset(system_charset_info);
443
while ((current_field= *table_fields++) != NULL)
149
/* TRANSLATORS: The leading word "replicator" is the name
150
of the plugin api, and so should not be translated. */
151
errmsg_printf(ERRMSG_LVL_ERROR,
152
_("replicator plugin '%s' row_insert() failed"),
153
(char *)plugin_name(plugin));
446
field_names.push_back(std::string(current_field->field_name));
447
string_value= current_field->val_str(string_value);
448
values.push_back(std::string(string_value->c_ptr()));
449
string_value->free(); /* I wish there was a clear() method... */
452
repl->replicate(appl, &change_record);
162
if (repl->row_update(session, params->table,
163
params->before, params->after))
459
if (! repl->updateRow(session, params->table, params->before, params->after))
165
461
/* TRANSLATORS: The leading word "replicator" is the name
166
462
of the plugin api, and so should not be translated. */
254
549
} replicator_row_end_st;
256
551
/* We call this to end a transaction (on each registered plugin) */
257
static bool replicator_end_transaction_iterate (Session *session, st_plugin_int *plugin, void *p)
552
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
259
Replicator *repl= plugin_data(plugin, Replicator *);
554
drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
260
555
replicator_row_end_st *params= static_cast<replicator_row_end_st *>(p);
557
if (! repl || ! repl->isActive())
262
560
/* call this loaded replicator plugin's endTransaction() method */
561
if (! repl->endTransaction(session, params->autocommit, params->commit))
265
if (repl->end_transaction(session, params->autocommit, params->commit))
267
/* TRANSLATORS: The leading word "replicator" is the name
268
of the plugin api, and so should not be translated. */
269
errmsg_printf(ERRMSG_LVL_ERROR,
270
_("replicator plugin '%s' end_transaction() failed"),
271
(char *)plugin_name(plugin));
563
/* TRANSLATORS: The leading word "replicator" is the name
564
of the plugin api, and so should not be translated. */
565
errmsg_printf(ERRMSG_LVL_ERROR,
566
_("replicator plugin '%s' end_transaction() failed"),
567
(char *)plugin_name(plugin));
311
605
size_t query_length;
312
606
} replicator_statement_st;
314
/* We call this to end a statement (on each registered plugin) */
315
static bool replicator_statement_iterate(Session *session, st_plugin_int *plugin, void *p)
608
/* We call this to begin a statement (on each registered plugin) */
609
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
317
Replicator *repl= plugin_data(plugin, Replicator *);
611
drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
318
612
replicator_statement_st *params= (replicator_statement_st *)p;
614
if (! repl || ! repl->isActive())
320
617
/* call this loaded replicator plugin's beginStatement() method */
618
if (! repl->beginStatement(session, params->query, params->query_length))
323
if (repl->statement(session, params->query, params->query_length))
325
/* TRANSLATORS: The leading word "replicator" is the name
326
of the plugin api, and so should not be translated. */
327
errmsg_printf(ERRMSG_LVL_ERROR,
328
_("replicator plugin '%s' statement() failed"),
329
(char *)plugin_name(plugin));
620
/* TRANSLATORS: The leading word "replicator" is the name
621
of the plugin api, and so should not be translated. */
622
errmsg_printf(ERRMSG_LVL_ERROR,
623
_("replicator plugin '%s' statement() failed"),
624
(char *)plugin_name(plugin));