~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Jay Pipes
  • Date: 2009-04-10 17:06:58 UTC
  • mto: (971.1.47 mordred)
  • mto: This revision was merged to the branch mainline in revision 990.
  • Revision ID: jpipes@serialcoder-20090410170658-d3azdnas1fn8v68l
Removal of log.cc (binlog), added Applier plugin and fixed up Replicator
plugin.  New transaction_services.cc class implementation of the API for
converting between internal formats and GPB Command Messages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Mark Atwood
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *    Jay Pipes <joinfu@sun.com>
5
9
 *
6
10
 *  This program is free software; you can redistribute it and/or modify
7
11
 *  it under the terms of the GNU General Public License as published by
17
21
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
22
 */
19
23
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <drizzled/replicator.h>
22
 
#include <drizzled/gettext.h>
23
 
#include <drizzled/session.h>
24
 
 
 
24
/**
 
25
 * @file Server-side utility which is responsible for managing the 
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * TransactionServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 */
 
43
 
 
44
#include "drizzled/server_includes.h"
 
45
#include "drizzled/transaction_services.h"
 
46
#include "drizzled/plugin/replicator.h"
 
47
#include "drizzled/plugin/applier.h"
 
48
#include "drizzled/message/transaction.pb.h"
 
49
#include "drizzled/message/table.pb.h"
 
50
#include "drizzled/gettext.h"
 
51
#include "drizzled/session.h"
 
52
 
 
53
#include <vector>
 
54
 
 
55
drizzled::TransactionServices transaction_services;
 
56
 
 
57
/**
 
58
 * @TODO
 
59
 *
 
60
 * We're going to start simple at first, meaning that the 
 
61
 * below are the global vectors of replicators and appliers. The
 
62
 * end goal is to have the TransactionServices have a register method
 
63
 * which allows modules to register Replicator or Applier *factories*, 
 
64
 * which will allow TransactionServices to attach and detach a replicator/applier
 
65
 * to a Session, instead of the current global vector.
 
66
 */
25
67
int replicator_initializer(st_plugin_int *plugin)
26
68
{
27
 
  Replicator *p= NULL;
 
69
  drizzled::plugin::Replicator *repl= NULL;
28
70
 
29
71
  if (plugin->plugin->init)
30
72
  {
31
 
    if (plugin->plugin->init((void *)&p))
 
73
    if (plugin->plugin->init(&repl))
32
74
    {
33
75
      /* TRANSLATORS: The leading word "replicator" is the name
34
76
        of the plugin api, and so should not be translated. */
39
81
    }
40
82
  }
41
83
 
42
 
  plugin->data= (void *)p;
 
84
  if (repl == NULL)
 
85
    return 1;
 
86
 
 
87
  transaction_services.attachReplicator(repl);
 
88
  plugin->data= repl;
43
89
 
44
90
  return 0;
45
91
}
46
92
 
47
93
int replicator_finalizer(st_plugin_int *plugin)
48
94
{
49
 
  Replicator *p= static_cast<Replicator *>(plugin->data);
 
95
  drizzled::plugin::Replicator *repl= static_cast<drizzled::plugin::Replicator *>(plugin->data);
 
96
  
 
97
  assert(repl);
 
98
 
 
99
  transaction_services.detachReplicator(repl);
50
100
 
51
101
  if (plugin->plugin->deinit)
52
 
    {
53
 
      if (plugin->plugin->deinit((void *)p))
54
 
        {
55
 
          /* TRANSLATORS: The leading word "replicator" is the name
56
 
             of the plugin api, and so should not be translated. */
57
 
          errmsg_printf(ERRMSG_LVL_ERROR,
58
 
                        _("replicator plugin '%s' deinit() failed"),
59
 
                        plugin->name.str);
60
 
        }
61
 
    }
 
102
    (void) plugin->plugin->deinit(repl);
62
103
 
63
104
  return 0;
64
105
}
65
106
 
 
107
namespace drizzled
 
108
{
 
109
 
 
110
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
 
111
{
 
112
  replicators.push_back(in_replicator);
 
113
}
 
114
 
 
115
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
 
116
{
 
117
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
 
118
}
 
119
 
 
120
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
 
121
{
 
122
  appliers.push_back(in_applier);
 
123
}
 
124
 
 
125
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
 
126
{
 
127
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
128
}
 
129
 
 
130
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
 
131
                                                     , Session *in_session) const
 
132
{
 
133
  using namespace drizzled::message;
 
134
 
 
135
  TransactionContext *trx= in_command->mutable_transaction_context();
 
136
  trx->set_server_id(in_session->getServerId());
 
137
  trx->set_transaction_id(in_session->getTransactionId());
 
138
}
 
139
 
 
140
void TransactionServices::startTransaction(Session *in_session)
 
141
{
 
142
  using namespace drizzled::message;
 
143
  
 
144
  Command command;
 
145
  command.set_type(Command::START_TRANSACTION);
 
146
  command.set_timestamp(in_session->getCurrentTimestamp());
 
147
 
 
148
  setCommandTransactionContext(&command, in_session);
 
149
  
 
150
  push(&command);
 
151
}
 
152
 
 
153
void TransactionServices::commitTransaction(Session *in_session)
 
154
{
 
155
  using namespace drizzled::message;
 
156
  
 
157
  Command command;
 
158
  command.set_type(Command::COMMIT);
 
159
  command.set_timestamp(in_session->getCurrentTimestamp());
 
160
 
 
161
  setCommandTransactionContext(&command, in_session);
 
162
  
 
163
  push(&command);
 
164
}
 
165
 
 
166
void TransactionServices::rollbackTransaction(Session *in_session)
 
167
{
 
168
  using namespace drizzled::message;
 
169
  
 
170
  Command command;
 
171
  command.set_type(Command::ROLLBACK);
 
172
  command.set_timestamp(in_session->getCurrentTimestamp());
 
173
 
 
174
  setCommandTransactionContext(&command, in_session);
 
175
  
 
176
  push(&command);
 
177
}
 
178
 
 
179
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
180
{
 
181
  using namespace drizzled::message;
 
182
  
 
183
  Command command;
 
184
  command.set_type(Command::INSERT);
 
185
  command.set_timestamp(in_session->getCurrentTimestamp());
 
186
 
 
187
  setCommandTransactionContext(&command, in_session);
 
188
 
 
189
  const char *schema_name= in_table->getShare()->db.str;
 
190
  const char *table_name= in_table->getShare()->table_name.str;
 
191
 
 
192
  command.set_schema(schema_name);
 
193
  command.set_table(table_name);
 
194
 
 
195
  /* 
 
196
   * Now we construct the specialized InsertRecord command inside
 
197
   * the Command container...
 
198
   */
 
199
  InsertRecord *change_record= command.mutable_insert_record();
 
200
 
 
201
  Field *current_field;
 
202
  Field **table_fields= in_table->field;
 
203
  String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
 
204
  string_value->set_charset(system_charset_info);
 
205
 
 
206
  Table::Field *cur_field;
 
207
 
 
208
  while ((current_field= *table_fields++) != NULL) 
 
209
  {
 
210
    cur_field= change_record->add_insert_field();
 
211
    cur_field->set_name(std::string(current_field->field_name));
 
212
    cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
 
213
    string_value= current_field->val_str(string_value);
 
214
    change_record->add_insert_value(std::string(string_value->c_ptr()));
 
215
    string_value->free(); /* I wish there was a clear() method... */
 
216
  }
 
217
  
 
218
  push(&command);
 
219
}
 
220
 
 
221
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
 
222
{
 
223
  using namespace drizzled::message;
 
224
  
 
225
  Command command;
 
226
  command.set_type(Command::UPDATE);
 
227
  command.set_timestamp(in_session->getCurrentTimestamp());
 
228
 
 
229
  setCommandTransactionContext(&command, in_session);
 
230
 
 
231
  const char *schema_name= in_table->getShare()->db.str;
 
232
  const char *table_name= in_table->getShare()->table_name.str;
 
233
 
 
234
  command.set_schema(schema_name);
 
235
  command.set_table(table_name);
 
236
 
 
237
  /* 
 
238
   * Now we construct the specialized UpdateRecord command inside
 
239
   * the Command container...
 
240
   */
 
241
  //UpdateRecord *change_record= command.mutable_update_record();
 
242
 
 
243
  push(&command);
 
244
}
 
245
 
 
246
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
247
{
 
248
  using namespace drizzled::message;
 
249
  
 
250
  Command command;
 
251
  command.set_type(Command::DELETE);
 
252
  command.set_timestamp(in_session->getCurrentTimestamp());
 
253
 
 
254
  setCommandTransactionContext(&command, in_session);
 
255
 
 
256
  const char *schema_name= in_table->getShare()->db.str;
 
257
  const char *table_name= in_table->getShare()->table_name.str;
 
258
 
 
259
  command.set_schema(schema_name);
 
260
  command.set_table(table_name);
 
261
 
 
262
  /* 
 
263
   * Now we construct the specialized DeleteRecord command inside
 
264
   * the Command container...
 
265
   */
 
266
  //DeleteRecord *change_record= command.mutable_delete_record();
 
267
  
 
268
  push(&command);
 
269
}
 
270
 
 
271
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
272
{
 
273
  using namespace drizzled::message;
 
274
  
 
275
  Command command;
 
276
  command.set_type(Command::RAW_SQL);
 
277
  command.set_timestamp(in_session->getCurrentTimestamp());
 
278
 
 
279
  setCommandTransactionContext(&command, in_session);
 
280
 
 
281
  std::string query(in_query, in_query_len);
 
282
  command.set_sql(query);
 
283
 
 
284
  push(&command);
 
285
  
 
286
}
 
287
 
 
288
void TransactionServices::push(drizzled::message::Command *to_push)
 
289
{
 
290
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
 
291
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
 
292
  appl_start_iter= appliers.begin();
 
293
 
 
294
  drizzled::plugin::Replicator *cur_repl;
 
295
  drizzled::plugin::Applier *cur_appl;
 
296
 
 
297
  while (repl_iter != replicators.end())
 
298
  {
 
299
    cur_repl= *repl_iter;
 
300
    if (! cur_repl->isActive())
 
301
    {
 
302
      ++repl_iter;
 
303
      continue;
 
304
    }
 
305
    
 
306
    appl_iter= appl_start_iter;
 
307
    while (appl_iter != appliers.end())
 
308
    {
 
309
      cur_appl= *appl_iter;
 
310
 
 
311
      if (! cur_appl->isActive())
 
312
      {
 
313
        ++appl_iter;
 
314
        continue;
 
315
      }
 
316
 
 
317
      cur_repl->replicate(cur_appl, to_push);
 
318
      ++appl_iter;
 
319
    }
 
320
    ++repl_iter;
 
321
  }
 
322
}
 
323
 
 
324
#ifdef oldcode
66
325
/* This gets called by plugin_foreach once for each loaded replicator plugin */
67
 
static bool replicator_session_iterate(Session *session, st_plugin_int *plugin, void *)
 
326
static bool replicator_session_iterate(Session *session, plugin_ref plugin, void *)
68
327
{
69
 
  Replicator *repl= plugin_data(plugin, Replicator *);
70
 
  bool error;
 
328
  drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
 
329
 
 
330
  if (! repl || ! repl->isActive())
 
331
    return false;
71
332
 
72
333
  /* call this loaded replicator plugin's session_init method */
73
 
  if (repl)
 
334
  if (! repl->initSession(session))
74
335
  {
75
 
    error= repl->session_init(session);
76
 
    if (error)
77
 
    {
78
 
      /* TRANSLATORS: The leading word "replicator" is the name
79
 
        of the plugin api, and so should not be translated. */
80
 
      errmsg_printf(ERRMSG_LVL_ERROR,
81
 
                    _("replicator plugin '%s' session_init() failed"),
82
 
                    (char *)plugin_name(plugin));
83
 
      return true;
84
 
    }
 
336
    /* TRANSLATORS: The leading word "replicator" is the name
 
337
      of the plugin api, and so should not be translated. */
 
338
    errmsg_printf(ERRMSG_LVL_ERROR,
 
339
                  _("replicator plugin '%s' session_init() failed"),
 
340
                  (char *)plugin_name(plugin));
 
341
    return true;
85
342
  }
86
 
 
87
343
  return false;
88
344
}
89
345
 
99
355
    trans_register_ha(session, true, binlog_engine);
100
356
  trans_register_ha(session, false, binlog_engine);
101
357
 
102
 
  if (session->getReplicationData())
103
 
    return false;
104
 
 
105
358
  /* 
106
359
    call replicator_session_iterate
107
360
    once for each loaded replicator plugin
111
364
 
112
365
  return foreach_rv;
113
366
}
114
 
 
115
367
/* The plugin_foreach() iterator requires that we
116
368
   convert all the parameters of a plugin api entry point
117
369
   into just one single void ptr, plus the session.
119
371
   and marshall them into a struct of this type, and
120
372
   then just pass in a pointer to it.
121
373
*/
122
 
enum repl_row_exec_t{
 
374
enum repl_row_exec_t
 
375
{
123
376
  repl_insert,
124
377
  repl_update,
125
378
  repl_delete
135
388
 
136
389
 
137
390
/* This gets called by plugin_foreach once for each loaded replicator plugin */
138
 
static bool replicator_do_row_iterate (Session *session, st_plugin_int *plugin, void *p)
 
391
static bool replicator_do_row_iterate (Session *session, plugin_ref plugin, void *p)
139
392
{
140
 
  Replicator *repl= plugin_data(plugin, Replicator *);
 
393
  drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
 
394
 
 
395
  if (! repl || ! repl->isActive())
 
396
    return false;
 
397
 
141
398
  replicator_row_parms_st *params= static_cast<replicator_row_parms_st *>(p);
142
399
 
143
 
  switch (params->type) {
144
 
  case repl_insert:
145
 
    if (repl)
 
400
  /* 
 
401
   * We create a container ChangeRecord and then specialize the change
 
402
   * record depending on the actual event which occurred...
 
403
   */
 
404
  drizzled::message::ChangeRecord change_record;
 
405
  /** 
 
406
   * When inserting a row, we want to pass the replicator only the
 
407
   * specific information it needs, which is the name of the schema, 
 
408
   * the name of the table, the list of field names in the field list of
 
409
   * the INSERT expression, and the values of this written row as strings.
 
410
   *
 
411
   * @TODO  Eventually, it would be better to simply pass a Table 
 
412
   *        proto message instead of both a schema name and a table
 
413
   *        name.
 
414
   *
 
415
   * @TODO  Better to not have to pass string values...
 
416
   *
 
417
   * @TODO  Pass pointers here instead of copies of values/field names?
 
418
   *        For large inserts, might be in trouble of running out of 
 
419
   *        stack space? Not sure...
 
420
   *
 
421
   * @TODO  Ugh, get rid of the friggin custom String shit.
 
422
   */
 
423
  const char *schema_name= params->table->getShare()->db.str;
 
424
  const char *table_name= params->table->getShare()->table_name.str;
 
425
 
 
426
  change_record.set_schema(schema_name);
 
427
  change_record.set_table(table_name);
 
428
  switch (params->type) 
 
429
  {
 
430
    case repl_insert:
146
431
    {
147
 
      if (repl->row_insert(session, params->table))
 
432
 
 
433
      drizzled::message::InsertRecord *irecord= change_record.mutable_insert_record();
 
434
 
 
435
      std::vector<std::string> values;
 
436
      std::vector<std::string> field_names;
 
437
 
 
438
      Field *current_field;
 
439
      Field **table_fields= params->table->field;
 
440
      String *string_value= new (session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
 
441
      string_value->set_charset(system_charset_info);
 
442
 
 
443
      while ((current_field= *table_fields++) != NULL) 
148
444
      {
149
 
        /* TRANSLATORS: The leading word "replicator" is the name
150
 
          of the plugin api, and so should not be translated. */
151
 
        errmsg_printf(ERRMSG_LVL_ERROR,
152
 
                      _("replicator plugin '%s' row_insert() failed"),
153
 
                     (char *)plugin_name(plugin));
154
445
 
155
 
        return true;
 
446
        field_names.push_back(std::string(current_field->field_name));
 
447
        string_value= current_field->val_str(string_value);
 
448
        values.push_back(std::string(string_value->c_ptr()));
 
449
        string_value->free(); /* I wish there was a clear() method... */
156
450
      }
 
451
 
 
452
      repl->replicate(appl, &change_record);
 
453
      if (string_value)
 
454
        delete string_value;
 
455
      break;
157
456
    }
158
 
    break;
159
 
  case repl_update:
160
 
    if (repl)
 
457
    case repl_update:
161
458
    {
162
 
      if (repl->row_update(session, params->table,
163
 
                           params->before, params->after))
 
459
      if (! repl->updateRow(session, params->table, params->before, params->after))
164
460
      {
165
461
        /* TRANSLATORS: The leading word "replicator" is the name
166
462
          of the plugin api, and so should not be translated. */
170
466
 
171
467
        return true;
172
468
      }
 
469
      break;
173
470
    }
174
 
    break;
175
 
  case repl_delete:
176
 
    if (repl)
 
471
    case repl_delete:
177
472
    {
178
 
      if (repl->row_delete(session, params->table))
 
473
      if (! repl->deleteRow(session, params->table))
179
474
      {
180
475
        /* TRANSLATORS: The leading word "replicator" is the name
181
476
          of the plugin api, and so should not be translated. */
185
480
 
186
481
        return true;
187
482
      }
 
483
      break;
188
484
    }
189
 
    break;
190
485
  }
191
486
  return false;
192
487
}
254
549
} replicator_row_end_st;
255
550
 
256
551
/* We call this to end a statement (on each registered plugin) */
257
 
static bool replicator_end_transaction_iterate (Session *session, st_plugin_int *plugin, void *p)
 
552
static bool replicator_end_transaction_iterate (Session *session, plugin_ref plugin, void *p)
258
553
{
259
 
  Replicator *repl= plugin_data(plugin, Replicator *);
 
554
  drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
260
555
  replicator_row_end_st *params= static_cast<replicator_row_end_st *>(p);
261
556
 
 
557
  if (! repl || ! repl->isActive())
 
558
    return false;
 
559
 
262
560
  /* call this loaded replicator plugin's replicator_func1 function pointer */
263
 
  if (repl)
 
561
  if (! repl->endTransaction(session, params->autocommit, params->commit))
264
562
  {
265
 
    if (repl->end_transaction(session, params->autocommit, params->commit))
266
 
    {
267
 
      /* TRANSLATORS: The leading word "replicator" is the name
268
 
        of the plugin api, and so should not be translated. */
269
 
      errmsg_printf(ERRMSG_LVL_ERROR,
270
 
                    _("replicator plugin '%s' end_transaction() failed"),
271
 
                    (char *)plugin_name(plugin));
272
 
      return true;
273
 
    }
 
563
    /* TRANSLATORS: The leading word "replicator" is the name
 
564
      of the plugin api, and so should not be translated. */
 
565
    errmsg_printf(ERRMSG_LVL_ERROR,
 
566
                  _("replicator plugin '%s' end_transaction() failed"),
 
567
                  (char *)plugin_name(plugin));
 
568
    return true;
274
569
  }
275
 
 
276
570
  return false;
277
571
}
278
572
 
311
605
  size_t query_length;
312
606
} replicator_statement_st;
313
607
 
314
 
/* We call this to end a statement (on each registered plugin) */
315
 
static bool replicator_statement_iterate(Session *session, st_plugin_int *plugin, void *p)
 
608
/* We call this to begin a statement (on each registered plugin) */
 
609
static bool replicator_statement_iterate(Session *session, plugin_ref plugin, void *p)
316
610
{
317
 
  Replicator *repl= plugin_data(plugin, Replicator *);
 
611
  drizzled::plugin::Replicator *repl= plugin_data(plugin, drizzled::plugin::Replicator *);
318
612
  replicator_statement_st *params= (replicator_statement_st *)p;
319
613
 
 
614
  if (! repl || ! repl->isActive())
 
615
    return false;
 
616
 
320
617
  /* call this loaded replicator plugin's replicator_func1 function pointer */
321
 
  if (repl)
 
618
  if (! repl->beginStatement(session, params->query, params->query_length))
322
619
  {
323
 
    if (repl->statement(session, params->query, params->query_length))
324
 
    {
325
 
      /* TRANSLATORS: The leading word "replicator" is the name
326
 
        of the plugin api, and so should not be translated. */
327
 
      errmsg_printf(ERRMSG_LVL_ERROR,
328
 
                    _("replicator plugin '%s' statement() failed"),
329
 
                    (char *)plugin_name(plugin));
330
 
      return true;
331
 
    }
 
620
    /* TRANSLATORS: The leading word "replicator" is the name
 
621
      of the plugin api, and so should not be translated. */
 
622
    errmsg_printf(ERRMSG_LVL_ERROR,
 
623
                  _("replicator plugin '%s' statement() failed"),
 
624
                  (char *)plugin_name(plugin));
 
625
    return true;
332
626
  }
333
 
 
334
627
  return false;
335
628
}
336
629
 
348
641
 
349
642
  return foreach_rv;
350
643
}
 
644
#endif /* oldcode */
 
645
 
 
646
} /* end namespace drizzled */