~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replicator.cc

  • Committer: Monty Taylor
  • Date: 2009-03-18 18:45:23 UTC
  • mto: (950.1.1 mordred)
  • mto: This revision was merged to the branch mainline in revision 943.
  • Revision ID: mordred@inaugust.com-20090318184523-mfbjyj5wkipv4n3b
Moved big tests to big suite. Added make target "make test-big" to allow for easy running of the big tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  Authors:
8
 
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008 Mark Atwood
10
5
 *
11
6
 *  This program is free software; you can redistribute it and/or modify
12
7
 *  it under the terms of the GNU General Public License as published by
22
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
18
 */
24
19
 
25
 
/**
26
 
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
36
 
 
37
 
#include <config.h>
38
 
#include <drizzled/replication_services.h>
39
 
#include <drizzled/plugin/transaction_replicator.h>
40
 
#include <drizzled/plugin/transaction_applier.h>
41
 
#include <drizzled/message/transaction.pb.h>
 
20
#include <drizzled/server_includes.h>
 
21
#include <drizzled/replicator.h>
42
22
#include <drizzled/gettext.h>
43
23
#include <drizzled/session.h>
44
 
#include <drizzled/error.h>
45
 
 
46
 
#include <string>
47
 
#include <vector>
48
 
#include <algorithm>
49
 
 
50
 
using namespace std;
51
 
 
52
 
namespace drizzled
53
 
{
54
 
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
57
 
{
58
 
}
59
 
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
61
 
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
71
 
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
74
 
    }
75
 
  }
76
 
}
77
 
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
 
24
 
 
25
int replicator_initializer(st_plugin_int *plugin)
 
26
{
 
27
  replicator_t *p;
 
28
 
 
29
  p= new replicator_t;
 
30
  if (p == NULL) return 1;
 
31
  memset(p, 0, sizeof(replicator_t));
 
32
 
 
33
  plugin->data= (void *)p;
 
34
 
 
35
  if (plugin->plugin->init)
 
36
  {
 
37
    if (plugin->plugin->init((void *)p))
 
38
    {
 
39
      /* TRANSLATORS: The leading word "replicator" is the name
 
40
        of the plugin api, and so should not be translated. */
 
41
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' init() failed"),
 
42
                      plugin->name.str);
 
43
      goto err;
 
44
    }
 
45
  }
 
46
  plugin->state= PLUGIN_IS_READY;
 
47
 
 
48
  return 0;
 
49
 
 
50
 err:
 
51
  delete p;
 
52
  return 1;
 
53
}
 
54
 
 
55
int replicator_finalizer(st_plugin_int *plugin)
 
56
{
 
57
  replicator_t *p= (replicator_t *) plugin->data;
 
58
 
 
59
  if (plugin->plugin->deinit)
 
60
    {
 
61
      if (plugin->plugin->deinit((void *)p))
 
62
        {
 
63
          /* TRANSLATORS: The leading word "replicator" is the name
 
64
             of the plugin api, and so should not be translated. */
 
65
          errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' deinit() failed"),
 
66
                          plugin->name.str);
 
67
        }
 
68
    }
 
69
 
 
70
  if (p) delete p;
 
71
 
 
72
  return 0;
 
73
}
 
74
 
 
75
/* This gets called by plugin_foreach once for each loaded replicator plugin */
 
76
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
 
77
{
 
78
  replicator_t *repl= plugin_data(plugin, replicator_t *);
 
79
  bool error;
 
80
 
 
81
  /* call this loaded replicator plugin's replicator_func1 function pointer */
 
82
  if (repl && repl->session_init)
 
83
  {
 
84
    error= repl->session_init(session);
 
85
    if (error)
 
86
    {
 
87
      /* TRANSLATORS: The leading word "replicator" is the name
 
88
        of the plugin api, and so should not be translated. */
 
89
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' session_init() failed"),
 
90
                      (char *)plugin_name(plugin));
 
91
      return true;
 
92
    }
 
93
  }
 
94
 
 
95
  return false;
 
96
}
 
97
 
 
98
/*
 
99
  This call is called once at the begining of each transaction.
 
100
*/
 
101
extern handlerton *binlog_hton;
 
102
bool replicator_session_init(Session *session)
 
103
{
 
104
  bool foreach_rv;
 
105
 
 
106
  if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
107
    trans_register_ha(session, true, binlog_hton);
 
108
  trans_register_ha(session, false, binlog_hton);
 
109
 
 
110
  if (session->getReplicationData())
 
111
    return false;
 
112
 
80
113
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempts to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
87
 
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(error::ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
 
  for (Appliers::iterator appl_iter= appliers.begin();
100
 
       appl_iter != appliers.end();
101
 
       ++appl_iter)
102
 
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(error::ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
137
 
    }
138
 
  }
139
 
  is_active= true;
140
 
  return true;
141
 
}
142
 
 
143
 
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
144
 
{
145
 
  replicators.push_back(in_replicator);
146
 
}
147
 
 
148
 
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
149
 
{
150
 
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
160
 
}
161
 
 
162
 
bool ReplicationServices::isActive() const
163
 
{
164
 
  return is_active;
165
 
}
166
 
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
169
 
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
175
 
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
183
 
      /* 
184
 
       * We update the timestamp for the last applied Transaction so that
185
 
       * publisher plugins can ask the replication services when the
186
 
       * last known applied Transaction was using the getLastAppliedTimestamp()
187
 
       * method.
188
 
       */
189
 
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
190
 
    }
191
 
    else
192
 
      return result;
193
 
  }
194
 
  return result;
195
 
}
196
 
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
198
 
{
199
 
  return replication_streams;
200
 
}
201
 
 
202
 
} /* namespace drizzled */
 
114
    call replicator_session_iterate
 
115
    once for each loaded replicator plugin
 
116
  */
 
117
  foreach_rv= plugin_foreach(session, replicator_session_iterate,
 
118
                             DRIZZLE_REPLICATOR_PLUGIN, NULL);
 
119
 
 
120
  return foreach_rv;
 
121
}
 
122
 
 
123
/* The plugin_foreach() iterator requires that we
 
124
   convert all the parameters of a plugin api entry point
 
125
   into just one single void ptr, plus the session.
 
126
   So we will take all the additional paramters of replicator_do2,
 
127
   and marshall them into a struct of this type, and
 
128
   then just pass in a pointer to it.
 
129
*/
 
130
enum repl_row_exec_t{
 
131
  repl_insert,
 
132
  repl_update,
 
133
  repl_delete
 
134
};
 
135
 
 
136
typedef struct replicator_row_parms_st
 
137
{
 
138
  repl_row_exec_t type;
 
139
  Table *table;
 
140
  const unsigned char *before;
 
141
  const unsigned char *after;
 
142
} replicator_row_parms_st;
 
143
 
 
144
 
 
145
/* This gets called by plugin_foreach once for each loaded replicator plugin */
 
146
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
 
147
{
 
148
  replicator_t *repl= plugin_data(plugin, replicator_t *);
 
149
  replicator_row_parms_st *params= (replicator_row_parms_st *) p;
 
150
 
 
151
  switch (params->type) {
 
152
  case repl_insert:
 
153
    if (repl && repl->row_insert)
 
154
    {
 
155
      if (repl->row_insert(session, params->table))
 
156
      {
 
157
        /* TRANSLATORS: The leading word "replicator" is the name
 
158
          of the plugin api, and so should not be translated. */
 
159
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_insert() failed"),
 
160
                        (char *)plugin_name(plugin));
 
161
 
 
162
        return true;
 
163
      }
 
164
    }
 
165
    break;
 
166
  case repl_update:
 
167
    if (repl && repl->row_update)
 
168
    {
 
169
      if (repl->row_update(session, params->table, params->before, params->after))
 
170
      {
 
171
        /* TRANSLATORS: The leading word "replicator" is the name
 
172
          of the plugin api, and so should not be translated. */
 
173
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_update() failed"),
 
174
                        (char *)plugin_name(plugin));
 
175
 
 
176
        return true;
 
177
      }
 
178
    }
 
179
    break;
 
180
  case repl_delete:
 
181
    if (repl && repl->row_delete)
 
182
    {
 
183
      if (repl->row_delete(session, params->table))
 
184
      {
 
185
        /* TRANSLATORS: The leading word "replicator" is the name
 
186
          of the plugin api, and so should not be translated. */
 
187
        errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' row_delete() failed"),
 
188
                        (char *)plugin_name(plugin));
 
189
 
 
190
        return true;
 
191
      }
 
192
    }
 
193
    break;
 
194
  }
 
195
  return false;
 
196
}
 
197
 
 
198
/* This is the replicator_do2 entry point.
 
199
   This gets called by the rest of the Drizzle server code */
 
200
static bool replicator_do_row (Session *session, replicator_row_parms_st *params)
 
201
{
 
202
  bool foreach_rv;
 
203
 
 
204
  foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
 
205
                             DRIZZLE_REPLICATOR_PLUGIN, params);
 
206
  return foreach_rv;
 
207
}
 
208
 
 
209
bool replicator_write_row(Session *session, Table *table)
 
210
{
 
211
  replicator_row_parms_st param;
 
212
 
 
213
  param.type= repl_insert;
 
214
  param.table= table;
 
215
  param.after= NULL;
 
216
  param.before= NULL;
 
217
 
 
218
  return replicator_do_row(session, &param);
 
219
}
 
220
 
 
221
bool replicator_update_row(Session *session, Table *table,
 
222
                           const unsigned char *before,
 
223
                           const unsigned char *after)
 
224
{
 
225
  replicator_row_parms_st param;
 
226
 
 
227
  param.type= repl_update;
 
228
  param.table= table;
 
229
  param.after= after;
 
230
  param.before= before;
 
231
 
 
232
  return replicator_do_row(session, &param);
 
233
}
 
234
 
 
235
bool replicator_delete_row(Session *session, Table *table)
 
236
{
 
237
  replicator_row_parms_st param;
 
238
 
 
239
  param.type= repl_delete;
 
240
  param.table= table;
 
241
  param.after= NULL;
 
242
  param.before= NULL;
 
243
 
 
244
  return replicator_do_row(session, &param);
 
245
}
 
246
 
 
247
/*
 
248
  Here be Dragons!
 
249
 
 
250
  Ok, not so much dragons, but this is where we handle either commits or rollbacks of
 
251
  statements.
 
252
*/
 
253
typedef struct replicator_row_end_st
 
254
{
 
255
  bool autocommit;
 
256
  bool commit;
 
257
} replicator_row_end_st;
 
258
 
 
259
/* We call this to end a statement (on each registered plugin) */
 
260
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
 
261
{
 
262
  replicator_t *repl= plugin_data(plugin, replicator_t *);
 
263
  replicator_row_end_st *params= (replicator_row_end_st *)p;
 
264
 
 
265
  /* call this loaded replicator plugin's replicator_func1 function pointer */
 
266
  if (repl && repl->end_transaction)
 
267
  {
 
268
    if (repl->end_transaction(session, params->autocommit, params->commit))
 
269
    {
 
270
      /* TRANSLATORS: The leading word "replicator" is the name
 
271
        of the plugin api, and so should not be translated. */
 
272
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' end_transaction() failed"),
 
273
                      (char *)plugin_name(plugin));
 
274
      return true;
 
275
    }
 
276
  }
 
277
 
 
278
  return false;
 
279
}
 
280
 
 
281
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
 
282
{
 
283
  bool foreach_rv;
 
284
  replicator_row_end_st params;
 
285
 
 
286
  params.autocommit= autocommit;
 
287
  params.commit= commit;
 
288
 
 
289
  /* We need to free any data we did an init of for the session */
 
290
  foreach_rv= plugin_foreach(session, replicator_end_transaction_iterate,
 
291
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
 
292
 
 
293
  return foreach_rv;
 
294
}
 
295
 
 
296
/*
 
297
  If you can do real 2PC this is your hook poing to know that the event is coming.
 
298
 
 
299
  Always true for the moment.
 
300
 
 
301
*/
 
302
bool replicator_prepare(Session *)
 
303
{
 
304
  return false;
 
305
}
 
306
 
 
307
/*
 
308
  Replicate statement.
 
309
*/
 
310
typedef struct replicator_statement_st
 
311
{
 
312
  const char *query;
 
313
  size_t query_length;
 
314
} replicator_statement_st;
 
315
 
 
316
/* We call this to end a statement (on each registered plugin) */
 
317
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
 
318
{
 
319
  replicator_t *repl= plugin_data(plugin, replicator_t *);
 
320
  replicator_statement_st *params= (replicator_statement_st *)p;
 
321
 
 
322
  /* call this loaded replicator plugin's replicator_func1 function pointer */
 
323
  if (repl && repl->statement)
 
324
  {
 
325
    if (repl->statement(session, params->query, params->query_length))
 
326
    {
 
327
      /* TRANSLATORS: The leading word "replicator" is the name
 
328
        of the plugin api, and so should not be translated. */
 
329
      errmsg_printf(ERRMSG_LVL_ERROR, _("replicator plugin '%s' statement() failed"),
 
330
                      (char *)plugin_name(plugin));
 
331
      return true;
 
332
    }
 
333
  }
 
334
 
 
335
  return false;
 
336
}
 
337
 
 
338
bool replicator_statement(Session *session, const char *query, size_t query_length)
 
339
{
 
340
  bool foreach_rv;
 
341
  replicator_statement_st params;
 
342
  
 
343
  params.query= query;
 
344
  params.query_length= query_length;
 
345
 
 
346
  /* We need to free any data we did an init of for the session */
 
347
  foreach_rv= plugin_foreach(session, replicator_statement_iterate,
 
348
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
 
349
 
 
350
  return foreach_rv;
 
351
}