~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replicator.cc

Moved the last of the libdrizzleclient calls into Protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008 Mark Atwood
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
 
20
#include <drizzled/server_includes.h>
 
21
#include <drizzled/replicator.h>
 
22
#include <drizzled/gettext.h>
 
23
#include <drizzled/session.h>
 
24
 
 
25
int replicator_initializer(st_plugin_int *plugin)
 
26
{
 
27
  Replicator *p= NULL;
 
28
 
 
29
  if (plugin->plugin->init)
 
30
  {
 
31
    if (plugin->plugin->init((void *)&p))
 
32
    {
 
33
      /* TRANSLATORS: The leading word "replicator" is the name
 
34
        of the plugin api, and so should not be translated. */
 
35
      errmsg_printf(ERRMSG_LVL_ERROR,
 
36
                    _("replicator plugin '%s' init() failed"),
 
37
                    plugin->name.str);
 
38
      return 1;
 
39
    }
 
40
  }
 
41
 
 
42
  plugin->data= (void *)p;
 
43
 
 
44
  return 0;
 
45
}
 
46
 
 
47
int replicator_finalizer(st_plugin_int *plugin)
 
48
{
 
49
  Replicator *p= static_cast<Replicator *>(plugin->data);
 
50
 
 
51
  if (plugin->plugin->deinit)
 
52
    {
 
53
      if (plugin->plugin->deinit((void *)p))
 
54
        {
 
55
          /* TRANSLATORS: The leading word "replicator" is the name
 
56
             of the plugin api, and so should not be translated. */
 
57
          errmsg_printf(ERRMSG_LVL_ERROR,
 
58
                        _("replicator plugin '%s' deinit() failed"),
 
59
                        plugin->name.str);
 
60
        }
 
61
    }
 
62
 
 
63
  return 0;
 
64
}
 
65
 
 
66
/* This gets called by plugin_foreach once for each loaded replicator plugin */
 
67
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
 
68
{
 
69
  Replicator *repl= plugin_data(plugin, Replicator *);
 
70
  bool error;
 
71
 
 
72
  /* call this loaded replicator plugin's session_init method */
 
73
  if (repl)
 
74
  {
 
75
    error= repl->session_init(session);
 
76
    if (error)
 
77
    {
 
78
      /* TRANSLATORS: The leading word "replicator" is the name
 
79
        of the plugin api, and so should not be translated. */
 
80
      errmsg_printf(ERRMSG_LVL_ERROR,
 
81
                    _("replicator plugin '%s' session_init() failed"),
 
82
                    (char *)plugin_name(plugin));
 
83
      return true;
 
84
    }
 
85
  }
 
86
 
 
87
  return false;
 
88
}
 
89
 
 
90
/*
 
91
  This call is called once at the begining of each transaction.
 
92
*/
 
93
extern StorageEngine *binlog_engine;
 
94
bool replicator_session_init(Session *session)
 
95
{
 
96
  bool foreach_rv;
 
97
 
 
98
  if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
99
    trans_register_ha(session, true, binlog_engine);
 
100
  trans_register_ha(session, false, binlog_engine);
 
101
 
 
102
  if (session->getReplicationData())
 
103
    return false;
 
104
 
 
105
  /* 
 
106
    call replicator_session_iterate
 
107
    once for each loaded replicator plugin
 
108
  */
 
109
  foreach_rv= plugin_foreach(session, replicator_session_iterate,
 
110
                             DRIZZLE_REPLICATOR_PLUGIN, NULL);
 
111
 
 
112
  return foreach_rv;
 
113
}
 
114
 
 
115
/* The plugin_foreach() iterator requires that we
 
116
   convert all the parameters of a plugin api entry point
 
117
   into just one single void ptr, plus the session.
 
118
   So we will take all the additional paramters of replicator_do2,
 
119
   and marshall them into a struct of this type, and
 
120
   then just pass in a pointer to it.
 
121
*/
 
122
enum repl_row_exec_t{
 
123
  repl_insert,
 
124
  repl_update,
 
125
  repl_delete
 
126
};
 
127
 
 
128
typedef struct replicator_row_parms_st
 
129
{
 
130
  repl_row_exec_t type;
 
131
  Table *table;
 
132
  const unsigned char *before;
 
133
  const unsigned char *after;
 
134
} replicator_row_parms_st;
 
135
 
 
136
 
 
137
/* This gets called by plugin_foreach once for each loaded replicator plugin */
 
138
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
 
139
{
 
140
  Replicator *repl= plugin_data(plugin, Replicator *);
 
141
  replicator_row_parms_st *params= static_cast<replicator_row_parms_st *>(p);
 
142
 
 
143
  switch (params->type) {
 
144
  case repl_insert:
 
145
    if (repl)
 
146
    {
 
147
      if (repl->row_insert(session, params->table))
 
148
      {
 
149
        /* TRANSLATORS: The leading word "replicator" is the name
 
150
          of the plugin api, and so should not be translated. */
 
151
        errmsg_printf(ERRMSG_LVL_ERROR,
 
152
                      _("replicator plugin '%s' row_insert() failed"),
 
153
                     (char *)plugin_name(plugin));
 
154
 
 
155
        return true;
 
156
      }
 
157
    }
 
158
    break;
 
159
  case repl_update:
 
160
    if (repl)
 
161
    {
 
162
      if (repl->row_update(session, params->table,
 
163
                           params->before, params->after))
 
164
      {
 
165
        /* TRANSLATORS: The leading word "replicator" is the name
 
166
          of the plugin api, and so should not be translated. */
 
167
        errmsg_printf(ERRMSG_LVL_ERROR,
 
168
                      _("replicator plugin '%s' row_update() failed"),
 
169
                      (char *)plugin_name(plugin));
 
170
 
 
171
        return true;
 
172
      }
 
173
    }
 
174
    break;
 
175
  case repl_delete:
 
176
    if (repl)
 
177
    {
 
178
      if (repl->row_delete(session, params->table))
 
179
      {
 
180
        /* TRANSLATORS: The leading word "replicator" is the name
 
181
          of the plugin api, and so should not be translated. */
 
182
        errmsg_printf(ERRMSG_LVL_ERROR,
 
183
                      _("replicator plugin '%s' row_delete() failed"),
 
184
                      (char *)plugin_name(plugin));
 
185
 
 
186
        return true;
 
187
      }
 
188
    }
 
189
    break;
 
190
  }
 
191
  return false;
 
192
}
 
193
 
 
194
/* This is the replicator_do_row entry point.
 
195
   This gets called by the rest of the Drizzle server code */
 
196
static bool replicator_do_row (Session *session,
 
197
                               replicator_row_parms_st *params)
 
198
{
 
199
  bool foreach_rv;
 
200
 
 
201
  foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
 
202
                             DRIZZLE_REPLICATOR_PLUGIN, params);
 
203
  return foreach_rv;
 
204
}
 
205
 
 
206
bool replicator_write_row(Session *session, Table *table)
 
207
{
 
208
  replicator_row_parms_st param;
 
209
 
 
210
  param.type= repl_insert;
 
211
  param.table= table;
 
212
  param.after= NULL;
 
213
  param.before= NULL;
 
214
 
 
215
  return replicator_do_row(session, &param);
 
216
}
 
217
 
 
218
bool replicator_update_row(Session *session, Table *table,
 
219
                           const unsigned char *before,
 
220
                           const unsigned char *after)
 
221
{
 
222
  replicator_row_parms_st param;
 
223
 
 
224
  param.type= repl_update;
 
225
  param.table= table;
 
226
  param.after= after;
 
227
  param.before= before;
 
228
 
 
229
  return replicator_do_row(session, &param);
 
230
}
 
231
 
 
232
bool replicator_delete_row(Session *session, Table *table)
 
233
{
 
234
  replicator_row_parms_st param;
 
235
 
 
236
  param.type= repl_delete;
 
237
  param.table= table;
 
238
  param.after= NULL;
 
239
  param.before= NULL;
 
240
 
 
241
  return replicator_do_row(session, &param);
 
242
}
 
243
 
 
244
/*
 
245
  Here be Dragons!
 
246
 
 
247
  Ok, not so much dragons, but this is where we handle either commits or rollbacks of
 
248
  statements.
 
249
*/
 
250
typedef struct replicator_row_end_st
 
251
{
 
252
  bool autocommit;
 
253
  bool commit;
 
254
} replicator_row_end_st;
 
255
 
 
256
/* We call this to end a statement (on each registered plugin) */
 
257
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
 
258
{
 
259
  Replicator *repl= plugin_data(plugin, Replicator *);
 
260
  replicator_row_end_st *params= static_cast<replicator_row_end_st *>(p);
 
261
 
 
262
  /* call this loaded replicator plugin's replicator_func1 function pointer */
 
263
  if (repl)
 
264
  {
 
265
    if (repl->end_transaction(session, params->autocommit, params->commit))
 
266
    {
 
267
      /* TRANSLATORS: The leading word "replicator" is the name
 
268
        of the plugin api, and so should not be translated. */
 
269
      errmsg_printf(ERRMSG_LVL_ERROR,
 
270
                    _("replicator plugin '%s' end_transaction() failed"),
 
271
                    (char *)plugin_name(plugin));
 
272
      return true;
 
273
    }
 
274
  }
 
275
 
 
276
  return false;
 
277
}
 
278
 
 
279
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
 
280
{
 
281
  bool foreach_rv;
 
282
  replicator_row_end_st params;
 
283
 
 
284
  params.autocommit= autocommit;
 
285
  params.commit= commit;
 
286
 
 
287
  /* We need to free any data we did an init of for the session */
 
288
  foreach_rv= plugin_foreach(session, replicator_end_transaction_iterate,
 
289
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
 
290
 
 
291
  return foreach_rv;
 
292
}
 
293
 
 
294
/*
 
295
  If you can do real 2PC this is your hook poing to know that the event is coming.
 
296
 
 
297
  Always true for the moment.
 
298
 
 
299
*/
 
300
bool replicator_prepare(Session *)
 
301
{
 
302
  return false;
 
303
}
 
304
 
 
305
/*
 
306
  Replicate statement.
 
307
*/
 
308
typedef struct replicator_statement_st
 
309
{
 
310
  const char *query;
 
311
  size_t query_length;
 
312
} replicator_statement_st;
 
313
 
 
314
/* We call this to end a statement (on each registered plugin) */
 
315
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
 
316
{
 
317
  Replicator *repl= plugin_data(plugin, Replicator *);
 
318
  replicator_statement_st *params= (replicator_statement_st *)p;
 
319
 
 
320
  /* call this loaded replicator plugin's replicator_func1 function pointer */
 
321
  if (repl)
 
322
  {
 
323
    if (repl->statement(session, params->query, params->query_length))
 
324
    {
 
325
      /* TRANSLATORS: The leading word "replicator" is the name
 
326
        of the plugin api, and so should not be translated. */
 
327
      errmsg_printf(ERRMSG_LVL_ERROR,
 
328
                    _("replicator plugin '%s' statement() failed"),
 
329
                    (char *)plugin_name(plugin));
 
330
      return true;
 
331
    }
 
332
  }
 
333
 
 
334
  return false;
 
335
}
 
336
 
 
337
bool replicator_statement(Session *session, const char *query, size_t query_length)
 
338
{
 
339
  bool foreach_rv;
 
340
  replicator_statement_st params;
 
341
  
 
342
  params.query= query;
 
343
  params.query_length= query_length;
 
344
 
 
345
  /* We need to free any data we did an init of for the session */
 
346
  foreach_rv= plugin_foreach(session, replicator_statement_iterate,
 
347
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
 
348
 
 
349
  return foreach_rv;
 
350
}