~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2009-04-13 16:22:40 UTC
  • mfrom: (971.1.78 mordred)
  • Revision ID: brian@gaz-20090413162240-ugi3gvhofmcuglzl
Merge Monty

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Mark Atwood
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *    Jay Pipes <joinfu@sun.com>
5
9
 *
6
10
 *  This program is free software; you can redistribute it and/or modify
7
11
 *  it under the terms of the GNU General Public License as published by
17
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
22
 */
19
23
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <drizzled/replicator.h>
22
 
#include <drizzled/gettext.h>
23
 
#include <drizzled/session.h>
24
 
 
25
 
int replicator_initializer(st_plugin_int *plugin)
26
 
{
27
 
  Replicator *p= NULL;
28
 
 
29
 
  if (plugin->plugin->init)
30
 
  {
31
 
    if (plugin->plugin->init((void *)&p))
32
 
    {
33
 
      /* TRANSLATORS: The leading word "replicator" is the name
34
 
        of the plugin api, and so should not be translated. */
35
 
      errmsg_printf(ERRMSG_LVL_ERROR,
36
 
                    _("replicator plugin '%s' init() failed"),
37
 
                    plugin->name.str);
38
 
      return 1;
39
 
    }
40
 
  }
41
 
 
42
 
  plugin->data= (void *)p;
43
 
 
44
 
  return 0;
45
 
}
46
 
 
47
 
int replicator_finalizer(st_plugin_int *plugin)
48
 
{
49
 
  Replicator *p= static_cast<Replicator *>(plugin->data);
50
 
 
51
 
  if (plugin->plugin->deinit)
52
 
    {
53
 
      if (plugin->plugin->deinit((void *)p))
54
 
        {
55
 
          /* TRANSLATORS: The leading word "replicator" is the name
56
 
             of the plugin api, and so should not be translated. */
57
 
          errmsg_printf(ERRMSG_LVL_ERROR,
58
 
                        _("replicator plugin '%s' deinit() failed"),
59
 
                        plugin->name.str);
60
 
        }
61
 
    }
62
 
 
63
 
  return 0;
64
 
}
65
 
 
66
 
/* This gets called by plugin_foreach once for each loaded replicator plugin */
67
 
static bool replicator_session_iterate(Session *session, st_plugin_int *plugin, void *)
68
 
{
69
 
  Replicator *repl= plugin_data(plugin, Replicator *);
70
 
  bool error;
71
 
 
72
 
  /* call this loaded replicator plugin's session_init method */
73
 
  if (repl)
74
 
  {
75
 
    error= repl->session_init(session);
76
 
    if (error)
77
 
    {
78
 
      /* TRANSLATORS: The leading word "replicator" is the name
79
 
        of the plugin api, and so should not be translated. */
80
 
      errmsg_printf(ERRMSG_LVL_ERROR,
81
 
                    _("replicator plugin '%s' session_init() failed"),
82
 
                    (char *)plugin_name(plugin));
83
 
      return true;
84
 
    }
85
 
  }
86
 
 
87
 
  return false;
88
 
}
89
 
 
90
 
/*
91
 
  This call is called once at the begining of each transaction.
92
 
*/
93
 
extern StorageEngine *binlog_engine;
94
 
bool replicator_session_init(Session *session)
95
 
{
96
 
  bool foreach_rv;
97
 
 
98
 
  if (session->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
99
 
    trans_register_ha(session, true, binlog_engine);
100
 
  trans_register_ha(session, false, binlog_engine);
101
 
 
102
 
  if (session->getReplicationData())
103
 
    return false;
104
 
 
105
 
  /* 
106
 
    call replicator_session_iterate
107
 
    once for each loaded replicator plugin
108
 
  */
109
 
  foreach_rv= plugin_foreach(session, replicator_session_iterate,
110
 
                             DRIZZLE_REPLICATOR_PLUGIN, NULL);
111
 
 
112
 
  return foreach_rv;
113
 
}
114
 
 
115
 
/* The plugin_foreach() iterator requires that we
116
 
   convert all the parameters of a plugin api entry point
117
 
   into just one single void ptr, plus the session.
118
 
   So we will take all the additional paramters of replicator_do2,
119
 
   and marshall them into a struct of this type, and
120
 
   then just pass in a pointer to it.
121
 
*/
122
 
enum repl_row_exec_t{
123
 
  repl_insert,
124
 
  repl_update,
125
 
  repl_delete
126
 
};
127
 
 
128
 
typedef struct replicator_row_parms_st
129
 
{
130
 
  repl_row_exec_t type;
131
 
  Table *table;
132
 
  const unsigned char *before;
133
 
  const unsigned char *after;
134
 
} replicator_row_parms_st;
135
 
 
136
 
 
137
 
/* This gets called by plugin_foreach once for each loaded replicator plugin */
138
 
static bool replicator_do_row_iterate (Session *session, st_plugin_int *plugin, void *p)
139
 
{
140
 
  Replicator *repl= plugin_data(plugin, Replicator *);
141
 
  replicator_row_parms_st *params= static_cast<replicator_row_parms_st *>(p);
142
 
 
143
 
  switch (params->type) {
144
 
  case repl_insert:
145
 
    if (repl)
146
 
    {
147
 
      if (repl->row_insert(session, params->table))
148
 
      {
149
 
        /* TRANSLATORS: The leading word "replicator" is the name
150
 
          of the plugin api, and so should not be translated. */
151
 
        errmsg_printf(ERRMSG_LVL_ERROR,
152
 
                      _("replicator plugin '%s' row_insert() failed"),
153
 
                     (char *)plugin_name(plugin));
154
 
 
155
 
        return true;
156
 
      }
157
 
    }
158
 
    break;
159
 
  case repl_update:
160
 
    if (repl)
161
 
    {
162
 
      if (repl->row_update(session, params->table,
163
 
                           params->before, params->after))
164
 
      {
165
 
        /* TRANSLATORS: The leading word "replicator" is the name
166
 
          of the plugin api, and so should not be translated. */
167
 
        errmsg_printf(ERRMSG_LVL_ERROR,
168
 
                      _("replicator plugin '%s' row_update() failed"),
169
 
                      (char *)plugin_name(plugin));
170
 
 
171
 
        return true;
172
 
      }
173
 
    }
174
 
    break;
175
 
  case repl_delete:
176
 
    if (repl)
177
 
    {
178
 
      if (repl->row_delete(session, params->table))
179
 
      {
180
 
        /* TRANSLATORS: The leading word "replicator" is the name
181
 
          of the plugin api, and so should not be translated. */
182
 
        errmsg_printf(ERRMSG_LVL_ERROR,
183
 
                      _("replicator plugin '%s' row_delete() failed"),
184
 
                      (char *)plugin_name(plugin));
185
 
 
186
 
        return true;
187
 
      }
188
 
    }
189
 
    break;
190
 
  }
191
 
  return false;
192
 
}
193
 
 
194
 
/* This is the replicator_do_row entry point.
195
 
   This gets called by the rest of the Drizzle server code */
196
 
static bool replicator_do_row (Session *session,
197
 
                               replicator_row_parms_st *params)
198
 
{
199
 
  bool foreach_rv;
200
 
 
201
 
  foreach_rv= plugin_foreach(session, replicator_do_row_iterate,
202
 
                             DRIZZLE_REPLICATOR_PLUGIN, params);
203
 
  return foreach_rv;
204
 
}
205
 
 
206
 
bool replicator_write_row(Session *session, Table *table)
207
 
{
208
 
  replicator_row_parms_st param;
209
 
 
210
 
  param.type= repl_insert;
211
 
  param.table= table;
212
 
  param.after= NULL;
213
 
  param.before= NULL;
214
 
 
215
 
  return replicator_do_row(session, &param);
216
 
}
217
 
 
218
 
bool replicator_update_row(Session *session, Table *table,
219
 
                           const unsigned char *before,
220
 
                           const unsigned char *after)
221
 
{
222
 
  replicator_row_parms_st param;
223
 
 
224
 
  param.type= repl_update;
225
 
  param.table= table;
226
 
  param.after= after;
227
 
  param.before= before;
228
 
 
229
 
  return replicator_do_row(session, &param);
230
 
}
231
 
 
232
 
bool replicator_delete_row(Session *session, Table *table)
233
 
{
234
 
  replicator_row_parms_st param;
235
 
 
236
 
  param.type= repl_delete;
237
 
  param.table= table;
238
 
  param.after= NULL;
239
 
  param.before= NULL;
240
 
 
241
 
  return replicator_do_row(session, &param);
242
 
}
243
 
 
244
 
/*
245
 
  Here be Dragons!
246
 
 
247
 
  Ok, not so much dragons, but this is where we handle either commits or rollbacks of
248
 
  statements.
249
 
*/
250
 
typedef struct replicator_row_end_st
251
 
{
252
 
  bool autocommit;
253
 
  bool commit;
254
 
} replicator_row_end_st;
255
 
 
256
 
/* We call this to end a statement (on each registered plugin) */
257
 
static bool replicator_end_transaction_iterate (Session *session, st_plugin_int *plugin, void *p)
258
 
{
259
 
  Replicator *repl= plugin_data(plugin, Replicator *);
260
 
  replicator_row_end_st *params= static_cast<replicator_row_end_st *>(p);
261
 
 
262
 
  /* call this loaded replicator plugin's replicator_func1 function pointer */
263
 
  if (repl)
264
 
  {
265
 
    if (repl->end_transaction(session, params->autocommit, params->commit))
266
 
    {
267
 
      /* TRANSLATORS: The leading word "replicator" is the name
268
 
        of the plugin api, and so should not be translated. */
269
 
      errmsg_printf(ERRMSG_LVL_ERROR,
270
 
                    _("replicator plugin '%s' end_transaction() failed"),
271
 
                    (char *)plugin_name(plugin));
272
 
      return true;
273
 
    }
274
 
  }
275
 
 
276
 
  return false;
277
 
}
278
 
 
279
 
bool replicator_end_transaction(Session *session, bool autocommit, bool commit)
280
 
{
281
 
  bool foreach_rv;
282
 
  replicator_row_end_st params;
283
 
 
284
 
  params.autocommit= autocommit;
285
 
  params.commit= commit;
286
 
 
287
 
  /* We need to free any data we did an init of for the session */
288
 
  foreach_rv= plugin_foreach(session, replicator_end_transaction_iterate,
289
 
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
290
 
 
291
 
  return foreach_rv;
292
 
}
293
 
 
294
 
/*
295
 
  If you can do real 2PC this is your hook poing to know that the event is coming.
296
 
 
297
 
  Always true for the moment.
298
 
 
299
 
*/
300
 
bool replicator_prepare(Session *)
301
 
{
302
 
  return false;
303
 
}
304
 
 
305
 
/*
306
 
  Replicate statement.
307
 
*/
308
 
typedef struct replicator_statement_st
309
 
{
310
 
  const char *query;
311
 
  size_t query_length;
312
 
} replicator_statement_st;
313
 
 
314
 
/* We call this to end a statement (on each registered plugin) */
315
 
static bool replicator_statement_iterate(Session *session, st_plugin_int *plugin, void *p)
316
 
{
317
 
  Replicator *repl= plugin_data(plugin, Replicator *);
318
 
  replicator_statement_st *params= (replicator_statement_st *)p;
319
 
 
320
 
  /* call this loaded replicator plugin's replicator_func1 function pointer */
321
 
  if (repl)
322
 
  {
323
 
    if (repl->statement(session, params->query, params->query_length))
324
 
    {
325
 
      /* TRANSLATORS: The leading word "replicator" is the name
326
 
        of the plugin api, and so should not be translated. */
327
 
      errmsg_printf(ERRMSG_LVL_ERROR,
328
 
                    _("replicator plugin '%s' statement() failed"),
329
 
                    (char *)plugin_name(plugin));
330
 
      return true;
331
 
    }
332
 
  }
333
 
 
334
 
  return false;
335
 
}
336
 
 
337
 
bool replicator_statement(Session *session, const char *query, size_t query_length)
338
 
{
339
 
  bool foreach_rv;
340
 
  replicator_statement_st params;
341
 
  
342
 
  params.query= query;
343
 
  params.query_length= query_length;
344
 
 
345
 
  /* We need to free any data we did an init of for the session */
346
 
  foreach_rv= plugin_foreach(session, replicator_statement_iterate,
347
 
                             DRIZZLE_REPLICATOR_PLUGIN, (void *) &params);
348
 
 
349
 
  return foreach_rv;
350
 
}
 
24
/**
 
25
 * @file Server-side utility which is responsible for managing the 
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * TransactionServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 */
 
43
 
 
44
#include "drizzled/server_includes.h"
 
45
#include "drizzled/transaction_services.h"
 
46
#include "drizzled/plugin/replicator.h"
 
47
#include "drizzled/plugin/applier.h"
 
48
#include "drizzled/message/transaction.pb.h"
 
49
#include "drizzled/message/table.pb.h"
 
50
#include "drizzled/gettext.h"
 
51
#include "drizzled/session.h"
 
52
#include "drizzled/plugin_registry.h"
 
53
 
 
54
#include <vector>
 
55
 
 
56
drizzled::TransactionServices transaction_services;
 
57
 
 
58
void add_replicator(drizzled::plugin::Replicator *repl)
 
59
{
 
60
  transaction_services.attachReplicator(repl);
 
61
}
 
62
 
 
63
void remove_replicator(drizzled::plugin::Replicator *repl)
 
64
{
 
65
  transaction_services.detachReplicator(repl);
 
66
}
 
67
 
 
68
 
 
69
namespace drizzled
 
70
{
 
71
 
 
72
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
 
73
{
 
74
  replicators.push_back(in_replicator);
 
75
}
 
76
 
 
77
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
 
78
{
 
79
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
 
80
}
 
81
 
 
82
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
 
83
{
 
84
  appliers.push_back(in_applier);
 
85
}
 
86
 
 
87
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
 
88
{
 
89
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
90
}
 
91
 
 
92
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
 
93
                                                     , Session *in_session) const
 
94
{
 
95
  using namespace drizzled::message;
 
96
 
 
97
  TransactionContext *trx= in_command->mutable_transaction_context();
 
98
  trx->set_server_id(in_session->getServerId());
 
99
  trx->set_transaction_id(in_session->getTransactionId());
 
100
}
 
101
 
 
102
void TransactionServices::startTransaction(Session *in_session)
 
103
{
 
104
  using namespace drizzled::message;
 
105
  
 
106
  if (replicators.size() == 0 || appliers.size() == 0)
 
107
    return;
 
108
  
 
109
  Command command;
 
110
  command.set_type(Command::START_TRANSACTION);
 
111
  command.set_timestamp(in_session->getCurrentTimestamp());
 
112
 
 
113
  setCommandTransactionContext(&command, in_session);
 
114
  
 
115
  push(&command);
 
116
}
 
117
 
 
118
void TransactionServices::commitTransaction(Session *in_session)
 
119
{
 
120
  using namespace drizzled::message;
 
121
  
 
122
  if (replicators.size() == 0 || appliers.size() == 0)
 
123
    return;
 
124
  
 
125
  Command command;
 
126
  command.set_type(Command::COMMIT);
 
127
  command.set_timestamp(in_session->getCurrentTimestamp());
 
128
 
 
129
  setCommandTransactionContext(&command, in_session);
 
130
  
 
131
  push(&command);
 
132
}
 
133
 
 
134
void TransactionServices::rollbackTransaction(Session *in_session)
 
135
{
 
136
  using namespace drizzled::message;
 
137
  
 
138
  if (replicators.size() == 0 || appliers.size() == 0)
 
139
    return;
 
140
  
 
141
  Command command;
 
142
  command.set_type(Command::ROLLBACK);
 
143
  command.set_timestamp(in_session->getCurrentTimestamp());
 
144
 
 
145
  setCommandTransactionContext(&command, in_session);
 
146
  
 
147
  push(&command);
 
148
}
 
149
 
 
150
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
151
{
 
152
  using namespace drizzled::message;
 
153
  
 
154
  if (replicators.size() == 0 || appliers.size() == 0)
 
155
    return;
 
156
 
 
157
  Command command;
 
158
  command.set_type(Command::INSERT);
 
159
  command.set_timestamp(in_session->getCurrentTimestamp());
 
160
 
 
161
  setCommandTransactionContext(&command, in_session);
 
162
 
 
163
  const char *schema_name= in_table->getShare()->db.str;
 
164
  const char *table_name= in_table->getShare()->table_name.str;
 
165
 
 
166
  command.set_schema(schema_name);
 
167
  command.set_table(table_name);
 
168
 
 
169
  /* 
 
170
   * Now we construct the specialized InsertRecord command inside
 
171
   * the Command container...
 
172
   */
 
173
  InsertRecord *change_record= command.mutable_insert_record();
 
174
 
 
175
  Field *current_field;
 
176
  Field **table_fields= in_table->field;
 
177
  String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
 
178
  string_value->set_charset(system_charset_info);
 
179
 
 
180
  Table::Field *cur_field;
 
181
 
 
182
  while ((current_field= *table_fields++) != NULL) 
 
183
  {
 
184
    cur_field= change_record->add_insert_field();
 
185
    cur_field->set_name(std::string(current_field->field_name));
 
186
    cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
 
187
    string_value= current_field->val_str(string_value);
 
188
    change_record->add_insert_value(std::string(string_value->c_ptr()));
 
189
    string_value->free(); /* I wish there was a clear() method... */
 
190
  }
 
191
 
 
192
  if (string_value)
 
193
    delete string_value; /* Is this needed with memroot allocation? */
 
194
  
 
195
  push(&command);
 
196
}
 
197
 
 
198
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
 
199
{
 
200
  using namespace drizzled::message;
 
201
  
 
202
  if (replicators.size() == 0 || appliers.size() == 0)
 
203
    return;
 
204
  
 
205
  Command command;
 
206
  command.set_type(Command::UPDATE);
 
207
  command.set_timestamp(in_session->getCurrentTimestamp());
 
208
 
 
209
  setCommandTransactionContext(&command, in_session);
 
210
 
 
211
  const char *schema_name= in_table->getShare()->db.str;
 
212
  const char *table_name= in_table->getShare()->table_name.str;
 
213
 
 
214
  command.set_schema(schema_name);
 
215
  command.set_table(table_name);
 
216
 
 
217
  /* 
 
218
   * Now we construct the specialized UpdateRecord command inside
 
219
   * the Command container...
 
220
   */
 
221
  //UpdateRecord *change_record= command.mutable_update_record();
 
222
 
 
223
  push(&command);
 
224
}
 
225
 
 
226
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
227
{
 
228
  using namespace drizzled::message;
 
229
  
 
230
  if (replicators.size() == 0 || appliers.size() == 0)
 
231
    return;
 
232
  
 
233
  Command command;
 
234
  command.set_type(Command::DELETE);
 
235
  command.set_timestamp(in_session->getCurrentTimestamp());
 
236
 
 
237
  setCommandTransactionContext(&command, in_session);
 
238
 
 
239
  const char *schema_name= in_table->getShare()->db.str;
 
240
  const char *table_name= in_table->getShare()->table_name.str;
 
241
 
 
242
  command.set_schema(schema_name);
 
243
  command.set_table(table_name);
 
244
 
 
245
  /* 
 
246
   * Now we construct the specialized DeleteRecord command inside
 
247
   * the Command container...
 
248
   */
 
249
  //DeleteRecord *change_record= command.mutable_delete_record();
 
250
  
 
251
  push(&command);
 
252
}
 
253
 
 
254
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
255
{
 
256
  using namespace drizzled::message;
 
257
  
 
258
  if (replicators.size() == 0 || appliers.size() == 0)
 
259
    return;
 
260
  
 
261
  Command command;
 
262
  command.set_type(Command::RAW_SQL);
 
263
  command.set_timestamp(in_session->getCurrentTimestamp());
 
264
 
 
265
  setCommandTransactionContext(&command, in_session);
 
266
 
 
267
  std::string query(in_query, in_query_len);
 
268
  command.set_sql(query);
 
269
 
 
270
  push(&command);
 
271
  
 
272
}
 
273
 
 
274
void TransactionServices::push(drizzled::message::Command *to_push)
 
275
{
 
276
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
 
277
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
 
278
  appl_start_iter= appliers.begin();
 
279
 
 
280
  drizzled::plugin::Replicator *cur_repl;
 
281
  drizzled::plugin::Applier *cur_appl;
 
282
 
 
283
  while (repl_iter != replicators.end())
 
284
  {
 
285
    cur_repl= *repl_iter;
 
286
    if (! cur_repl->isActive())
 
287
    {
 
288
      ++repl_iter;
 
289
      continue;
 
290
    }
 
291
    
 
292
    appl_iter= appl_start_iter;
 
293
    while (appl_iter != appliers.end())
 
294
    {
 
295
      cur_appl= *appl_iter;
 
296
 
 
297
      if (! cur_appl->isActive())
 
298
      {
 
299
        ++appl_iter;
 
300
        continue;
 
301
      }
 
302
 
 
303
      cur_repl->replicate(cur_appl, to_push);
 
304
      ++appl_iter;
 
305
    }
 
306
    ++repl_iter;
 
307
  }
 
308
}
 
309
 
 
310
 
 
311
} /* end namespace drizzled */