~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Brian Aker
  • Date: 2009-10-07 16:55:53 UTC
  • mfrom: (1161.2.1 bug444827)
  • Revision ID: brian@gaz-20091007165553-9tnp7liw1k9g6gvc
Merge Padraig

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
24
23
 
25
24
/**
26
25
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/replication.proto
 
42
 */
36
43
 
37
 
#include "config.h"
 
44
#include "drizzled/server_includes.h"
38
45
#include "drizzled/replication_services.h"
39
 
#include "drizzled/plugin/transaction_replicator.h"
40
 
#include "drizzled/plugin/transaction_applier.h"
41
 
#include "drizzled/message/transaction.pb.h"
 
46
#include "drizzled/plugin/command_replicator.h"
 
47
#include "drizzled/plugin/command_applier.h"
 
48
#include "drizzled/message/replication.pb.h"
 
49
#include "drizzled/message/table.pb.h"
42
50
#include "drizzled/gettext.h"
43
51
#include "drizzled/session.h"
44
 
#include "drizzled/error.h"
45
52
 
46
 
#include <string>
47
53
#include <vector>
48
 
#include <algorithm>
49
54
 
50
55
using namespace std;
51
 
 
52
 
namespace drizzled
53
 
{
54
 
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
57
 
{
58
 
}
59
 
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
61
 
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
71
 
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
74
 
    }
75
 
  }
76
 
}
77
 
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
80
 
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempts to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
87
 
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(error::ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
 
  for (Appliers::iterator appl_iter= appliers.begin();
100
 
       appl_iter != appliers.end();
101
 
       ++appl_iter)
102
 
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(error::ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
137
 
    }
138
 
  }
139
 
  is_active= true;
140
 
  return true;
141
 
}
142
 
 
143
 
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
 
56
using namespace drizzled;
 
57
 
 
58
ReplicationServices::ReplicationServices()
 
59
{
 
60
  is_active= false;
 
61
}
 
62
 
 
63
void ReplicationServices::evaluateActivePlugins()
 
64
{
 
65
  /* 
 
66
   * We loop through replicators and appliers, evaluating
 
67
   * whether or not there is at least one active replicator
 
68
   * and one active applier.  If not, we set is_active
 
69
   * to false.
 
70
   */
 
71
  bool tmp_is_active= false;
 
72
 
 
73
  if (replicators.empty() || appliers.empty())
 
74
  {
 
75
    is_active= false;
 
76
    return;
 
77
  }
 
78
 
 
79
  /* 
 
80
   * Determine if any remaining replicators and if those
 
81
   * replicators are active...if not, set is_active
 
82
   * to false
 
83
   */
 
84
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
 
85
  while (repl_iter != replicators.end())
 
86
  {
 
87
    if ((*repl_iter)->isActive())
 
88
    {
 
89
      tmp_is_active= true;
 
90
      break;
 
91
    }
 
92
    ++repl_iter;
 
93
  }
 
94
  if (! tmp_is_active)
 
95
  {
 
96
    /* No active replicators. Set is_active to false and exit. */
 
97
    is_active= false;
 
98
    return;
 
99
  }
 
100
 
 
101
  /* 
 
102
   * OK, we know there's at least one active replicator.
 
103
   *
 
104
   * Now determine if any remaining replicators and if those
 
105
   * replicators are active...if not, set is_active
 
106
   * to false
 
107
   */
 
108
  vector<plugin::CommandApplier *>::iterator appl_iter= appliers.begin();
 
109
  while (appl_iter != appliers.end())
 
110
  {
 
111
    if ((*appl_iter)->isActive())
 
112
    {
 
113
      is_active= true;
 
114
      return;
 
115
    }
 
116
    ++appl_iter;
 
117
  }
 
118
  /* If we get here, there are no active appliers */
 
119
  is_active= false;
 
120
}
 
121
 
 
122
void ReplicationServices::attachReplicator(plugin::CommandReplicator *in_replicator)
144
123
{
145
124
  replicators.push_back(in_replicator);
 
125
  evaluateActivePlugins();
146
126
}
147
127
 
148
 
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
 
128
void ReplicationServices::detachReplicator(plugin::CommandReplicator *in_replicator)
149
129
{
150
130
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
 
131
  evaluateActivePlugins();
 
132
}
 
133
 
 
134
void ReplicationServices::attachApplier(plugin::CommandApplier *in_applier)
 
135
{
 
136
  appliers.push_back(in_applier);
 
137
  evaluateActivePlugins();
 
138
}
 
139
 
 
140
void ReplicationServices::detachApplier(plugin::CommandApplier *in_applier)
 
141
{
 
142
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
143
  evaluateActivePlugins();
160
144
}
161
145
 
162
146
bool ReplicationServices::isActive() const
164
148
  return is_active;
165
149
}
166
150
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
169
 
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
175
 
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
183
 
      /* 
184
 
       * We update the timestamp for the last applied Transaction so that
 
151
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
 
152
                                                       Session *in_session) const
 
153
{
 
154
  message::TransactionContext *trx= in_command.mutable_transaction_context();
 
155
  trx->set_server_id(in_session->getServerId());
 
156
  trx->set_transaction_id(in_session->getTransactionId());
 
157
 
 
158
  in_command.set_session_id((uint32_t) in_session->getSessionId());
 
159
}
 
160
 
 
161
void ReplicationServices::startTransaction(Session *in_session)
 
162
{
 
163
  if (! is_active)
 
164
    return;
 
165
  
 
166
  message::Command command;
 
167
  command.set_type(message::Command::START_TRANSACTION);
 
168
  command.set_timestamp(in_session->getCurrentTimestamp());
 
169
 
 
170
  setCommandTransactionContext(command, in_session);
 
171
  
 
172
  push(command);
 
173
}
 
174
 
 
175
void ReplicationServices::commitTransaction(Session *in_session)
 
176
{
 
177
  if (! is_active)
 
178
    return;
 
179
  
 
180
  message::Command command;
 
181
  command.set_type(message::Command::COMMIT);
 
182
  command.set_timestamp(in_session->getCurrentTimestamp());
 
183
 
 
184
  setCommandTransactionContext(command, in_session);
 
185
  
 
186
  push(command);
 
187
}
 
188
 
 
189
void ReplicationServices::rollbackTransaction(Session *in_session)
 
190
{
 
191
  if (! is_active)
 
192
    return;
 
193
  
 
194
  message::Command command;
 
195
  command.set_type(message::Command::ROLLBACK);
 
196
  command.set_timestamp(in_session->getCurrentTimestamp());
 
197
 
 
198
  setCommandTransactionContext(command, in_session);
 
199
  
 
200
  push(command);
 
201
}
 
202
 
 
203
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
 
204
{
 
205
  if (! is_active)
 
206
    return;
 
207
 
 
208
  message::Command command;
 
209
  command.set_type(message::Command::INSERT);
 
210
  command.set_timestamp(in_session->getCurrentTimestamp());
 
211
 
 
212
  setCommandTransactionContext(command, in_session);
 
213
 
 
214
  const char *schema_name= in_table->getShare()->db.str;
 
215
  const char *table_name= in_table->getShare()->table_name.str;
 
216
 
 
217
  command.set_schema(schema_name);
 
218
  command.set_table(table_name);
 
219
 
 
220
  /* 
 
221
   * Now we construct the specialized InsertRecord command inside
 
222
   * the message::Command container...
 
223
   */
 
224
  message::InsertRecord *change_record= command.mutable_insert_record();
 
225
 
 
226
  Field *current_field;
 
227
  Field **table_fields= in_table->field;
 
228
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
229
  string_value->set_charset(system_charset_info);
 
230
 
 
231
  message::Table::Field *current_proto_field;
 
232
 
 
233
  /* We will read all the table's fields... */
 
234
  in_table->setReadSet();
 
235
 
 
236
  while ((current_field= *table_fields++) != NULL) 
 
237
  {
 
238
    current_proto_field= change_record->add_insert_field();
 
239
    current_proto_field->set_name(current_field->field_name);
 
240
    current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
241
    string_value= current_field->val_str(string_value);
 
242
    change_record->add_insert_value(string_value->c_ptr());
 
243
    string_value->free();
 
244
  }
 
245
  
 
246
  push(command);
 
247
}
 
248
 
 
249
void ReplicationServices::updateRecord(Session *in_session,
 
250
                                       Table *in_table, 
 
251
                                       const unsigned char *old_record, 
 
252
                                       const unsigned char *new_record)
 
253
{
 
254
  if (! is_active)
 
255
    return;
 
256
  
 
257
  message::Command command;
 
258
  command.set_type(message::Command::UPDATE);
 
259
  command.set_timestamp(in_session->getCurrentTimestamp());
 
260
 
 
261
  setCommandTransactionContext(command, in_session);
 
262
 
 
263
  const char *schema_name= in_table->getShare()->db.str;
 
264
  const char *table_name= in_table->getShare()->table_name.str;
 
265
 
 
266
  command.set_schema(schema_name);
 
267
  command.set_table(table_name);
 
268
 
 
269
  /* 
 
270
   * Now we construct the specialized UpdateRecord command inside
 
271
   * the message::Command container...
 
272
   */
 
273
  message::UpdateRecord *change_record= command.mutable_update_record();
 
274
 
 
275
  Field *current_field;
 
276
  Field **table_fields= in_table->field;
 
277
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
278
  string_value->set_charset(system_charset_info);
 
279
 
 
280
  message::Table::Field *current_proto_field;
 
281
 
 
282
  while ((current_field= *table_fields++) != NULL) 
 
283
  {
 
284
    /*
 
285
     * The below really should be moved into the Field API and Record API.  But for now
 
286
     * we do this crazy pointer fiddling to figure out if the current field
 
287
     * has been updated in the supplied record raw byte pointers.
 
288
     */
 
289
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
290
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
291
 
 
292
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
293
 
 
294
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
295
    {
 
296
      /* Field is changed from old to new */
 
297
      current_proto_field= change_record->add_update_field();
 
298
      current_proto_field->set_name(current_field->field_name);
 
299
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
300
 
 
301
      /* Store the original "read bit" for this field */
 
302
      bool is_read_set= current_field->isReadSet();
 
303
 
 
304
      /* We need to mark that we will "read" this field... */
 
305
      in_table->setReadSet(current_field->field_index);
 
306
 
 
307
      /* Read the string value of this field's contents */
 
308
      string_value= current_field->val_str(string_value);
 
309
 
 
310
      /* 
 
311
       * Reset the read bit after reading field to its original state.  This 
 
312
       * prevents the field from being included in the WHERE clause
 
313
       */
 
314
      current_field->setReadSet(is_read_set);
 
315
 
 
316
      change_record->add_after_value(string_value->c_ptr());
 
317
      string_value->free();
 
318
    }
 
319
 
 
320
    /* 
 
321
     * Add the WHERE clause values now...the fields which return true
 
322
     * for isReadSet() are in the WHERE clause.  For tables with no
 
323
     * primary or unique key, all fields will be returned.
 
324
     */
 
325
    if (current_field->isReadSet())
 
326
    {
 
327
      current_proto_field= change_record->add_where_field();
 
328
      current_proto_field->set_name(current_field->field_name);
 
329
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
330
      string_value= current_field->val_str(string_value);
 
331
      change_record->add_where_value(string_value->c_ptr());
 
332
      string_value->free();
 
333
    }
 
334
  }
 
335
 
 
336
  push(command);
 
337
}
 
338
 
 
339
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
 
340
{
 
341
  if (! is_active)
 
342
    return;
 
343
  
 
344
  message::Command command;
 
345
  command.set_type(message::Command::DELETE);
 
346
  command.set_timestamp(in_session->getCurrentTimestamp());
 
347
 
 
348
  setCommandTransactionContext(command, in_session);
 
349
 
 
350
  const char *schema_name= in_table->getShare()->db.str;
 
351
  const char *table_name= in_table->getShare()->table_name.str;
 
352
 
 
353
  command.set_schema(schema_name);
 
354
  command.set_table(table_name);
 
355
 
 
356
  /* 
 
357
   * Now we construct the specialized DeleteRecord command inside
 
358
   * the message::Command container...
 
359
   */
 
360
  message::DeleteRecord *change_record= command.mutable_delete_record();
 
361
 
 
362
  Field *current_field;
 
363
  Field **table_fields= in_table->field;
 
364
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
365
  string_value->set_charset(system_charset_info);
 
366
 
 
367
  message::Table::Field *current_proto_field;
 
368
 
 
369
  while ((current_field= *table_fields++) != NULL)
 
370
  {
 
371
    /*
 
372
     * Add the WHERE clause values now...the fields which return true
 
373
     * for isReadSet() are in the WHERE clause.  For tables with no
 
374
     * primary or unique key, all fields will be returned.
 
375
     */
 
376
    if (current_field->isReadSet())
 
377
    {
 
378
      current_proto_field= change_record->add_where_field();
 
379
      current_proto_field->set_name(current_field->field_name);
 
380
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
381
      string_value= current_field->val_str(string_value);
 
382
      change_record->add_where_value(string_value->c_ptr());
 
383
      string_value->free();
 
384
    }
 
385
  }
 
386
 
 
387
  push(command);
 
388
}
 
389
 
 
390
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
391
{
 
392
  if (! is_active)
 
393
    return;
 
394
  
 
395
  message::Command command;
 
396
  command.set_type(message::Command::RAW_SQL);
 
397
  command.set_timestamp(in_session->getCurrentTimestamp());
 
398
 
 
399
  setCommandTransactionContext(command, in_session);
 
400
 
 
401
  string query(in_query, in_query_len);
 
402
  command.set_sql(query);
 
403
 
 
404
  push(command);
 
405
}
 
406
 
 
407
void ReplicationServices::push(drizzled::message::Command &to_push)
 
408
{
 
409
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
 
410
  vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
 
411
  appl_start_iter= appliers.begin();
 
412
 
 
413
  plugin::CommandReplicator *cur_repl;
 
414
  plugin::CommandApplier *cur_appl;
 
415
 
 
416
  while (repl_iter != replicators.end())
 
417
  {
 
418
    cur_repl= *repl_iter;
 
419
    if (! cur_repl->isActive())
 
420
    {
 
421
      ++repl_iter;
 
422
      continue;
 
423
    }
 
424
    
 
425
    appl_iter= appl_start_iter;
 
426
    while (appl_iter != appliers.end())
 
427
    {
 
428
      cur_appl= *appl_iter;
 
429
 
 
430
      if (! cur_appl->isActive())
 
431
      {
 
432
        ++appl_iter;
 
433
        continue;
 
434
      }
 
435
 
 
436
      cur_repl->replicate(cur_appl, to_push);
 
437
      
 
438
      /* 
 
439
       * We update the timestamp for the last applied Command so that
185
440
       * publisher plugins can ask the replication services when the
186
 
       * last known applied Transaction was using the getLastAppliedTimestamp()
 
441
       * last known applied Command was using the getLastAppliedTimestamp()
187
442
       * method.
188
443
       */
189
 
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
 
444
      last_applied_timestamp.fetch_and_store(to_push.timestamp());
 
445
      ++appl_iter;
190
446
    }
191
 
    else
192
 
      return result;
 
447
    ++repl_iter;
193
448
  }
194
 
  return result;
195
 
}
196
 
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
198
 
{
199
 
  return replication_streams;
200
 
}
201
 
 
202
 
} /* namespace drizzled */
 
449
}