~drizzle-trunk/drizzle/development

636.1.1 by Mark Atwood
add replicator plugin type
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008 Mark Atwood
5
 *
6
 *  This program is free software; you can redistribute it and/or modify
7
 *  it under the terms of the GNU General Public License as published by
8
 *  the Free Software Foundation; version 2 of the License.
9
 *
10
 *  This program is distributed in the hope that it will be useful,
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 *  GNU General Public License for more details.
14
 *
15
 *  You should have received a copy of the GNU General Public License
16
 *  along with this program; if not, write to the Free Software
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 */
19
20
#include <drizzled/server_includes.h>
21
#include <drizzled/replicator.h>
22
#include <drizzled/gettext.h>
661 by Brian Aker
First major pass through new replication.
23
#include <drizzled/session.h>
636.1.1 by Mark Atwood
add replicator plugin type
24
25
/**
  Set up the replicator_t belonging to a replicator plugin that is being
  loaded.

  A zero-filled replicator_t is allocated, stored in plugin->data and passed
  to the plugin's own init() hook when the plugin provides one (presumably
  so the plugin can fill in its callbacks -- confirm against the plugin API).

  @param plugin  descriptor of the plugin being initialized

  @return 0 on success, in which case plugin->state has been set to
          PLUGIN_IS_READY; 1 if the allocation or the plugin's init()
          hook failed, in which case plugin->data does not point at a
          live replicator_t.
*/
int replicator_initializer(st_plugin_int *plugin)
{
  replicator_t *p;

  p= new replicator_t;
  /* Plain new normally throws instead of returning NULL; kept as a cheap guard. */
  if (p == NULL) return 1;
  memset(p, 0, sizeof(replicator_t));

  plugin->data= (void *)p;

  if (plugin->plugin->init && plugin->plugin->init((void *)p))
  {
    /* TRANSLATORS: The leading word "replicator" is the name
      of the plugin api, and so should not be translated. */
    errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' init() failed"),
                    plugin->name.str);

    /*
      Detach the struct before freeing it so nothing can later follow
      plugin->data into released memory (e.g. a finalizer that would
      otherwise delete it a second time).
    */
    plugin->data= NULL;
    delete p;
    return 1;
  }

  plugin->state= PLUGIN_IS_READY;

  return 0;
}
54
55
/**
  Tear down the replicator_t belonging to a replicator plugin.

  The plugin's deinit() hook, when present, is called with the struct
  stored in plugin->data; a failing hook is only logged. The struct is
  then released and plugin->data is cleared.

  @param plugin  descriptor of the plugin being finalized

  @return always 0
*/
int replicator_finalizer(st_plugin_int *plugin)
{
  replicator_t *p= (replicator_t *) plugin->data;

  if (plugin->plugin->deinit && plugin->plugin->deinit((void *)p))
  {
    /* TRANSLATORS: The leading word "replicator" is the name
       of the plugin api, and so should not be translated. */
    errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' deinit() failed"),
                    plugin->name.str);
  }

  /*
    delete on a null pointer is a no-op, so no guard is needed. Clear
    plugin->data afterwards so it never refers to freed memory.
  */
  delete p;
  plugin->data= NULL;

  return 0;
}
74
75
/* This gets called by plugin_foreach once for each loaded replicator plugin */
667 by Brian Aker
Partial fix (more of replication flushed out)
76
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
636.1.1 by Mark Atwood
add replicator plugin type
77
{
661 by Brian Aker
First major pass through new replication.
78
  replicator_t *repl= plugin_data(plugin, replicator_t *);
667 by Brian Aker
Partial fix (more of replication flushed out)
79
  bool error;
636.1.1 by Mark Atwood
add replicator plugin type
80
81
  /* call this loaded replicator plugin's replicator_func1 function pointer */
661 by Brian Aker
First major pass through new replication.
82
  if (repl && repl->session_init)
667 by Brian Aker
Partial fix (more of replication flushed out)
83
  {
84
    error= repl->session_init(session);
85
    if (error)
636.1.1 by Mark Atwood
add replicator plugin type
86
    {
667 by Brian Aker
Partial fix (more of replication flushed out)
87
      /* TRANSLATORS: The leading word "replicator" is the name
88
        of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
89
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' session_init() failed"),
667 by Brian Aker
Partial fix (more of replication flushed out)
90
                      (char *)plugin_name(plugin));
91
      return true;
636.1.1 by Mark Atwood
add replicator plugin type
92
    }
667 by Brian Aker
Partial fix (more of replication flushed out)
93
  }
661 by Brian Aker
First major pass through new replication.
94
636.1.1 by Mark Atwood
add replicator plugin type
95
  return false;
96
}
97
661 by Brian Aker
First major pass through new replication.
98
/*
99
  This call is called once at the begining of each transaction.
100
*/
667 by Brian Aker
Partial fix (more of replication flushed out)
101
extern handlerton *binlog_hton;
661 by Brian Aker
First major pass through new replication.
102
bool replicator_session_init(Session *session)
636.1.1 by Mark Atwood
add replicator plugin type
103
{
104
  bool foreach_rv;
660.1.3 by Eric Herman
removed trailing whitespace with simple script:
105
667 by Brian Aker
Partial fix (more of replication flushed out)
106
  if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
107
    trans_register_ha(session, true, binlog_hton);
108
  trans_register_ha(session, false, binlog_hton);
109
110
  if (session->getReplicationData())
111
    return false;
112
661 by Brian Aker
First major pass through new replication.
113
  /* 
114
    call replicator_session_iterate
665.1.1 by Eric Herman
more EOL whitespace fixups
115
    once for each loaded replicator plugin
661 by Brian Aker
First major pass through new replication.
116
  */
117
  foreach_rv= plugin_foreach(session, replicator_session_iterate,
118
                             DRIZZLE_REPLICATOR_PLUGIN, NULL);
667 by Brian Aker
Partial fix (more of replication flushed out)
119
636.1.1 by Mark Atwood
add replicator plugin type
120
  return foreach_rv;
121
}
122
123
/* The plugin_foreach() iterator requires that we
   convert all the parameters of a plugin api entry point
   into just one single void ptr, plus the session.
   So we take all the additional parameters of the row entry points
   (replicator_write_row, replicator_update_row, replicator_delete_row),
   marshal them into a replicator_row_parms_st, and
   then just pass in a pointer to it.
*/
661 by Brian Aker
First major pass through new replication.
130
/* Kind of row change being dispatched to the replicator plugins. */
enum repl_row_exec_t
{
  repl_insert, /* routed to the plugin's row_insert hook */
  repl_update, /* routed to the plugin's row_update hook */
  repl_delete  /* routed to the plugin's row_delete hook */
};
135
136
typedef struct replicator_row_parms_st
636.1.1 by Mark Atwood
add replicator plugin type
137
{
661 by Brian Aker
First major pass through new replication.
138
  repl_row_exec_t type;
139
  Table *table;
140
  const unsigned char *before;
141
  const unsigned char *after;
142
} replicator_row_parms_st;
143
636.1.1 by Mark Atwood
add replicator plugin type
144
145
/* This gets called by plugin_foreach once for each loaded replicator plugin */
661 by Brian Aker
First major pass through new replication.
146
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
636.1.1 by Mark Atwood
add replicator plugin type
147
{
661 by Brian Aker
First major pass through new replication.
148
  replicator_t *repl= plugin_data(plugin, replicator_t *);
149
  replicator_row_parms_st *params= (replicator_row_parms_st *) p;
150
151
  switch (params->type) {
152
  case repl_insert:
153
    if (repl && repl->row_insert)
154
    {
155
      if (repl->row_insert(session, params->table))
156
      {
157
        /* TRANSLATORS: The leading word "replicator" is the name
158
          of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
159
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_insert() failed"),
661 by Brian Aker
First major pass through new replication.
160
                        (char *)plugin_name(plugin));
161
162
        return true;
163
      }
164
    }
165
    break;
166
  case repl_update:
167
    if (repl && repl->row_update)
168
    {
169
      if (repl->row_update(session, params->table, params->before, params->after))
170
      {
171
        /* TRANSLATORS: The leading word "replicator" is the name
172
          of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
173
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_update() failed"),
661 by Brian Aker
First major pass through new replication.
174
                        (char *)plugin_name(plugin));
175
176
        return true;
177
      }
178
    }
179
    break;
180
  case repl_delete:
181
    if (repl && repl->row_delete)
182
    {
183
      if (repl->row_delete(session, params->table))
184
      {
185
        /* TRANSLATORS: The leading word "replicator" is the name
186
          of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
187
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_delete() failed"),
661 by Brian Aker
First major pass through new replication.
188
                        (char *)plugin_name(plugin));
189
190
        return true;
191
      }
192
    }
193
    break;
194
  }
636.1.1 by Mark Atwood
add replicator plugin type
195
  return false;
196
}
197
198
/* This is the replicator_do2 entry point.
199
   This gets called by the rest of the Drizzle server code */
667 by Brian Aker
Partial fix (more of replication flushed out)
200
static bool replicator_do_row (Session *session, replicator_row_parms_st *params)
636.1.1 by Mark Atwood
add replicator plugin type
201
{
202
  bool foreach_rv;
203
661 by Brian Aker
First major pass through new replication.
204
  foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
667 by Brian Aker
Partial fix (more of replication flushed out)
205
                             DRIZZLE_REPLICATOR_PLUGIN, params);
636.1.1 by Mark Atwood
add replicator plugin type
206
  return foreach_rv;
207
}
661 by Brian Aker
First major pass through new replication.
208
209
/*
  Report a row insert on `table` to the replicator plugins.

  Packs a repl_insert request (no before/after buffers) and returns the
  result of replicator_do_row.
*/
bool replicator_write_row(Session *session, Table *table)
{
  replicator_row_parms_st row_change;

  row_change.type= repl_insert;
  row_change.table= table;
  row_change.before= NULL;
  row_change.after= NULL;

  return replicator_do_row(session, &row_change);
}
220
665.1.1 by Eric Herman
more EOL whitespace fixups
221
bool replicator_update_row(Session *session, Table *table,
222
                           const unsigned char *before,
661 by Brian Aker
First major pass through new replication.
223
                           const unsigned char *after)
224
{
225
  replicator_row_parms_st param;
226
227
  param.type= repl_update;
228
  param.table= table;
229
  param.after= after;
230
  param.before= before;
231
232
  return replicator_do_row(session, &param);
233
}
234
235
/*
  Report a row delete on `table` to the replicator plugins.

  Packs a repl_delete request (no before/after buffers) and returns the
  result of replicator_do_row.
*/
bool replicator_delete_row(Session *session, Table *table)
{
  replicator_row_parms_st row_change;

  row_change.type= repl_delete;
  row_change.table= table;
  row_change.before= NULL;
  row_change.after= NULL;

  return replicator_do_row(session, &row_change);
}
246
665.1.1 by Eric Herman
more EOL whitespace fixups
247
/*
  Here be Dragons!

  Ok, not so much dragons, but this is where we handle either commits or
  rollbacks of statements.

  The struct below bundles the two flags of replicator_end_transaction so
  they can travel through plugin_foreach's single void pointer.
*/
typedef struct replicator_row_end_st
{
  bool autocommit; /* forwarded unchanged to the plugin's end_transaction hook */
  bool commit;     /* forwarded unchanged to the plugin's end_transaction hook */
} replicator_row_end_st;
258
259
/* We call this to end a statement (on each registered plugin) */
667 by Brian Aker
Partial fix (more of replication flushed out)
260
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
663 by Brian Aker
End points on new replication (aka end transaction)
261
{
262
  replicator_t *repl= plugin_data(plugin, replicator_t *);
263
  replicator_row_end_st *params= (replicator_row_end_st *)p;
264
265
  /* call this loaded replicator plugin's replicator_func1 function pointer */
266
  if (repl && repl->end_transaction)
267
  {
268
    if (repl->end_transaction(session, params->autocommit, params->commit))
269
    {
270
      /* TRANSLATORS: The leading word "replicator" is the name
271
        of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
272
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' end_transaction() failed"),
663 by Brian Aker
End points on new replication (aka end transaction)
273
                      (char *)plugin_name(plugin));
274
      return true;
275
    }
276
  }
277
278
  return false;
279
}
280
281
/*
  Tell every loaded replicator plugin that a transaction has ended.

  The two flags are packed into a replicator_row_end_st and handed to
  replicator_end_transaction_iterate through plugin_foreach, whose result
  is returned. (`commit` presumably distinguishes commit from rollback --
  confirm with the callers.)
*/
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
{
  replicator_row_end_st flags;

  flags.autocommit= autocommit;
  flags.commit= commit;

  return plugin_foreach(session, replicator_end_transaction_iterate,
                        DRIZZLE_REPLICATOR_PLUGIN, &flags);
}
664 by Brian Aker
Completing up replication API.
295
296
/*
297
  If you can do real 2PC this is your hook poing to know that the event is coming.
298
299
  Always true for the moment.
300
301
*/
302
bool replicator_prepare(Session *)
303
{
304
  return false;
305
}
666 by Brian Aker
Updating new replication positions in code.
306
307
/*
  Replicate statement.

  Arguments of replicator_statement, bundled so they can be passed
  through plugin_foreach's single void pointer.
*/
typedef struct replicator_statement_st
{
  const char *query;   /* statement text as given by the caller */
  size_t query_length; /* length the caller supplied for query */
} replicator_statement_st;
315
316
/* We call this to end a statement (on each registered plugin) */
317
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
318
{
319
  replicator_t *repl= plugin_data(plugin, replicator_t *);
320
  replicator_statement_st *params= (replicator_statement_st *)p;
321
322
  /* call this loaded replicator plugin's replicator_func1 function pointer */
323
  if (repl && repl->statement)
324
  {
325
    if (repl->statement(session, params->query, params->query_length))
326
    {
327
      /* TRANSLATORS: The leading word "replicator" is the name
328
        of the plugin api, and so should not be translated. */
755.2.1 by Mark Atwood
replace sql_print_error etc with errmsg_print
329
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' statement() failed"),
667 by Brian Aker
Partial fix (more of replication flushed out)
330
                      (char *)plugin_name(plugin));
331
      return true;
332
    }
333
  }
334
666 by Brian Aker
Updating new replication positions in code.
335
  return false;
336
}
667 by Brian Aker
Partial fix (more of replication flushed out)
337
338
/*
  Hand a statement to every loaded replicator plugin.

  The query pointer and length are packed into a replicator_statement_st
  and passed to replicator_statement_iterate through plugin_foreach, whose
  result is returned.
*/
bool replicator_statement(Session *session, const char *query, size_t query_length)
{
  replicator_statement_st stmt;

  stmt.query= query;
  stmt.query_length= query_length;

  return plugin_foreach(session, replicator_statement_iterate,
                        DRIZZLE_REPLICATOR_PLUGIN, &stmt);
}