22
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
26
* @file Server-side utility which is responsible for managing the
27
* communication between the kernel and the replication plugins:
29
* - TransactionReplicator
30
* - TransactionApplier
34
* ReplicationServices is a bridge between replication modules and the kernel,
35
* and its primary function is to */
38
#include <drizzled/replication_services.h>
39
#include <drizzled/plugin/transaction_replicator.h>
40
#include <drizzled/plugin/transaction_applier.h>
41
#include <drizzled/message/transaction.pb.h>
20
#include <drizzled/server_includes.h>
21
#include <drizzled/replicator.h>
42
22
#include <drizzled/gettext.h>
43
23
#include <drizzled/session.h>
44
#include <drizzled/error.h>
55
ReplicationServices::ReplicationServices() :
60
void ReplicationServices::normalizeReplicatorName(string &name)
62
transform(name.begin(),
66
if (name.find("replicator") == string::npos)
67
name.append("replicator", 10);
69
size_t found_underscore= name.find('_');
70
while (found_underscore != string::npos)
72
name.erase(found_underscore, 1);
73
found_underscore= name.find('_');
78
bool ReplicationServices::evaluateRegisteredPlugins()
25
int replicator_initializer(st_plugin_int *plugin)
30
if (p == NULL) return 1;
31
memset(p, 0, sizeof(replicator_t));
33
plugin->data= (void *)p;
35
if (plugin->plugin->init)
37
if (plugin->plugin->init((void *)p))
39
/* TRANSLATORS: The leading word "replicator" is the name
40
of the plugin api, and so should not be translated. */
41
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' init() failed"),
46
plugin->state= PLUGIN_IS_READY;
55
int replicator_finalizer(st_plugin_int *plugin)
57
replicator_t *p= (replicator_t *) plugin->data;
59
if (plugin->plugin->deinit)
61
if (plugin->plugin->deinit((void *)p))
63
/* TRANSLATORS: The leading word "replicator" is the name
64
of the plugin api, and so should not be translated. */
65
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' deinit() failed"),
75
/* This gets called by plugin_foreach once for each loaded replicator plugin */
76
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
78
replicator_t *repl= plugin_data(plugin, replicator_t *);
81
/* call this loaded replicator plugin's replicator_func1 function pointer */
82
if (repl && repl->session_init)
84
error= repl->session_init(session);
87
/* TRANSLATORS: The leading word "replicator" is the name
88
of the plugin api, and so should not be translated. */
89
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' session_init() failed"),
90
(char *)plugin_name(plugin));
99
This call is called once at the begining of each transaction.
101
extern handlerton *binlog_hton;
102
bool replicator_session_init(Session *session)
106
if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
107
trans_register_ha(session, true, binlog_hton);
108
trans_register_ha(session, false, binlog_hton);
110
if (session->getReplicationData())
81
* We loop through appliers that have registered with us
82
* and attempts to pair the applier with its requested
83
* replicator. If an applier has requested a replicator
84
* that has either not been built or has not registered
85
* with the replication services, we print an error and
91
if (replicators.empty() && not appliers.empty())
93
errmsg_printf(error::ERROR,
94
N_("You registered a TransactionApplier plugin but no "
95
"TransactionReplicator plugins were registered.\n"));
99
for (Appliers::iterator appl_iter= appliers.begin();
100
appl_iter != appliers.end();
103
plugin::TransactionApplier *applier= (*appl_iter).second;
104
string requested_replicator_name= (*appl_iter).first;
105
normalizeReplicatorName(requested_replicator_name);
108
Replicators::iterator repl_iter;
109
for (repl_iter= replicators.begin();
110
repl_iter != replicators.end();
113
string replicator_name= (*repl_iter)->getName();
114
normalizeReplicatorName(replicator_name);
116
if (requested_replicator_name.compare(replicator_name) == 0)
124
errmsg_printf(error::ERROR,
125
N_("You registered a TransactionApplier plugin but no "
126
"TransactionReplicator plugins were registered that match the "
127
"requested replicator name of '%s'.\n"
128
"We have deactivated the TransactionApplier '%s'.\n"),
129
requested_replicator_name.c_str(),
130
applier->getName().c_str());
131
applier->deactivate();
136
replication_streams.push_back(make_pair(*repl_iter, applier));
143
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
145
replicators.push_back(in_replicator);
148
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
150
replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
153
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
155
appliers.push_back(make_pair(requested_replicator_name, in_applier));
158
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
162
bool ReplicationServices::isActive() const
167
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
message::Transaction &to_push)
170
plugin::ReplicationReturnCode result= plugin::SUCCESS;
172
for (ReplicationStreams::iterator iter= replication_streams.begin();
173
iter != replication_streams.end();
176
plugin::TransactionReplicator *cur_repl= (*iter).first;
177
plugin::TransactionApplier *cur_appl= (*iter).second;
179
result= cur_repl->replicate(cur_appl, in_session, to_push);
181
if (result == plugin::SUCCESS)
184
* We update the timestamp for the last applied Transaction so that
185
* publisher plugins can ask the replication services when the
186
* last known applied Transaction was using the getLastAppliedTimestamp()
189
last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
197
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
199
return replication_streams;
202
} /* namespace drizzled */
114
call replicator_session_iterate
115
once for each loaded replicator plugin
117
foreach_rv= plugin_foreach(session, replicator_session_iterate,
118
DRIZZLE_REPLICATOR_PLUGIN, NULL);
123
/* The plugin_foreach() iterator requires that we
124
convert all the parameters of a plugin api entry point
125
into just one single void ptr, plus the session.
126
So we will take all the additional paramters of replicator_do2,
127
and marshall them into a struct of this type, and
128
then just pass in a pointer to it.
130
enum repl_row_exec_t{
136
typedef struct replicator_row_parms_st
138
repl_row_exec_t type;
140
const unsigned char *before;
141
const unsigned char *after;
142
} replicator_row_parms_st;
145
/* This gets called by plugin_foreach once for each loaded replicator plugin */
146
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
148
replicator_t *repl= plugin_data(plugin, replicator_t *);
149
replicator_row_parms_st *params= (replicator_row_parms_st *) p;
151
switch (params->type) {
153
if (repl && repl->row_insert)
155
if (repl->row_insert(session, params->table))
157
/* TRANSLATORS: The leading word "replicator" is the name
158
of the plugin api, and so should not be translated. */
159
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_insert() failed"),
160
(char *)plugin_name(plugin));
167
if (repl && repl->row_update)
169
if (repl->row_update(session, params->table, params->before, params->after))
171
/* TRANSLATORS: The leading word "replicator" is the name
172
of the plugin api, and so should not be translated. */
173
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_update() failed"),
174
(char *)plugin_name(plugin));
181
if (repl && repl->row_delete)
183
if (repl->row_delete(session, params->table))
185
/* TRANSLATORS: The leading word "replicator" is the name
186
of the plugin api, and so should not be translated. */
187
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_delete() failed"),
188
(char *)plugin_name(plugin));
198
/* This is the replicator_do2 entry point.
199
This gets called by the rest of the Drizzle server code */
200
static bool replicator_do_row (Session *session, replicator_row_parms_st *params)
204
foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
205
DRIZZLE_REPLICATOR_PLUGIN, params);
209
bool replicator_write_row(Session *session, Table *table)
211
replicator_row_parms_st param;
213
param.type= repl_insert;
218
return replicator_do_row(session, ¶m);
221
bool replicator_update_row(Session *session, Table *table,
222
const unsigned char *before,
223
const unsigned char *after)
225
replicator_row_parms_st param;
227
param.type= repl_update;
230
param.before= before;
232
return replicator_do_row(session, ¶m);
235
bool replicator_delete_row(Session *session, Table *table)
237
replicator_row_parms_st param;
239
param.type= repl_delete;
244
return replicator_do_row(session, ¶m);
250
Ok, not so much dragons, but this is where we handle either commits or rollbacks of
253
typedef struct replicator_row_end_st
257
} replicator_row_end_st;
259
/* We call this to end a statement (on each registered plugin) */
260
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
262
replicator_t *repl= plugin_data(plugin, replicator_t *);
263
replicator_row_end_st *params= (replicator_row_end_st *)p;
265
/* call this loaded replicator plugin's replicator_func1 function pointer */
266
if (repl && repl->end_transaction)
268
if (repl->end_transaction(session, params->autocommit, params->commit))
270
/* TRANSLATORS: The leading word "replicator" is the name
271
of the plugin api, and so should not be translated. */
272
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' end_transaction() failed"),
273
(char *)plugin_name(plugin));
281
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
284
replicator_row_end_st params;
286
params.autocommit= autocommit;
287
params.commit= commit;
289
/* We need to free any data we did an init of for the session */
290
foreach_rv= plugin_foreach(session, replicator_end_transaction_iterate,
291
DRIZZLE_REPLICATOR_PLUGIN, (void *) ¶ms);
297
If you can do real 2PC this is your hook poing to know that the event is coming.
299
Always true for the moment.
302
bool replicator_prepare(Session *)
310
typedef struct replicator_statement_st
314
} replicator_statement_st;
316
/* We call this to end a statement (on each registered plugin) */
317
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
319
replicator_t *repl= plugin_data(plugin, replicator_t *);
320
replicator_statement_st *params= (replicator_statement_st *)p;
322
/* call this loaded replicator plugin's replicator_func1 function pointer */
323
if (repl && repl->statement)
325
if (repl->statement(session, params->query, params->query_length))
327
/* TRANSLATORS: The leading word "replicator" is the name
328
of the plugin api, and so should not be translated. */
329
errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' statement() failed"),
330
(char *)plugin_name(plugin));
338
bool replicator_statement(Session *session, const char *query, size_t query_length)
341
replicator_statement_st params;
344
params.query_length= query_length;
346
/* We need to free any data we did an init of for the session */
347
foreach_rv= plugin_foreach(session, replicator_statement_iterate,
348
DRIZZLE_REPLICATOR_PLUGIN, (void *) ¶ms);