~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Brian Aker
  • Date: 2009-07-29 18:35:48 UTC
  • mfrom: (1101.1.12 merge)
  • Revision ID: brian@gaz-20090729183548-yp36iwoaemfc76z0
Merging Monty (which includes new replication)

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 * @file Server-side utility which is responsible for managing the 
26
26
 * communication between the kernel, replicator plugins, and applier plugins.
27
27
 *
28
 
 * TransactionServices is a bridge between modules and the kernel, and its
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
29
29
 * primary function is to take internal events (for instance the start of a 
30
30
 * transaction, the changing of a record, or the rollback of a transaction) 
31
31
 * and construct GPB Messages that are passed to the registered replicator and
38
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
39
 * these messages.
40
40
 *
41
 
 * @see /drizzled/message/transaction.proto
 
41
 * @see /drizzled/message/replication.proto
42
42
 */
43
43
 
44
44
#include "drizzled/server_includes.h"
45
 
#include "drizzled/transaction_services.h"
 
45
#include "drizzled/replication_services.h"
46
46
#include "drizzled/plugin/replicator.h"
47
47
#include "drizzled/plugin/applier.h"
48
 
#include "drizzled/message/transaction.pb.h"
 
48
#include "drizzled/message/replication.pb.h"
49
49
#include "drizzled/message/table.pb.h"
50
50
#include "drizzled/gettext.h"
51
51
#include "drizzled/session.h"
53
53
 
54
54
#include <vector>
55
55
 
56
 
drizzled::TransactionServices transaction_services;
57
 
 
58
 
void add_replicator(drizzled::plugin::Replicator *repl)
59
 
{
60
 
  transaction_services.attachReplicator(repl);
61
 
}
62
 
 
63
 
void remove_replicator(drizzled::plugin::Replicator *repl)
64
 
{
65
 
  transaction_services.detachReplicator(repl);
66
 
}
67
 
 
 
56
using namespace drizzled;
 
57
 
 
58
ReplicationServices replication_services;
 
59
 
 
60
void add_replicator(plugin::Replicator *replicator)
 
61
{
 
62
  replication_services.attachReplicator(replicator);
 
63
}
 
64
 
 
65
void remove_replicator(plugin::Replicator *replicator)
 
66
{
 
67
  replication_services.detachReplicator(replicator);
 
68
}
 
69
 
 
70
void add_applier(plugin::Applier *applier)
 
71
{
 
72
  replication_services.attachApplier(applier);
 
73
}
 
74
 
 
75
void remove_applier(plugin::Applier *applier)
 
76
{
 
77
  replication_services.detachApplier(applier);
 
78
}
68
79
 
69
80
namespace drizzled
70
81
{
71
82
 
72
 
void TransactionServices::attachReplicator(drizzled::plugin::Replicator *in_replicator)
 
83
ReplicationServices::ReplicationServices()
 
84
{
 
85
  is_active= false;
 
86
}
 
87
 
 
88
void ReplicationServices::evaluateActivePlugins()
 
89
{
 
90
  /* 
 
91
   * We loop through replicators and appliers, evaluating
 
92
   * whether or not there is at least one active replicator
 
93
   * and one active applier.  If not, we set is_active
 
94
   * to false.
 
95
   */
 
96
  bool tmp_is_active= false;
 
97
 
 
98
  if (replicators.size() == 0 || appliers.size() == 0)
 
99
  {
 
100
    is_active= false;
 
101
    return;
 
102
  }
 
103
 
 
104
  /* 
 
105
   * Determine if any remaining replicators and if those
 
106
   * replicators are active...if not, set is_active
 
107
   * to false
 
108
   */
 
109
  std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
 
110
  while (repl_iter != replicators.end())
 
111
  {
 
112
    if ((*repl_iter)->isActive())
 
113
    {
 
114
      tmp_is_active= true;
 
115
      break;
 
116
    }
 
117
    ++repl_iter;
 
118
  }
 
119
  if (! tmp_is_active)
 
120
  {
 
121
    /* No active replicators. Set is_active to false and exit. */
 
122
    is_active= false;
 
123
    return;
 
124
  }
 
125
 
 
126
  /* 
 
127
   * OK, we know there's at least one active replicator.
 
128
   *
 
129
   * Now determine if any remaining replicators and if those
 
130
   * replicators are active...if not, set is_active
 
131
   * to false
 
132
   */
 
133
  std::vector<plugin::Applier *>::iterator appl_iter= appliers.begin();
 
134
  while (appl_iter != appliers.end())
 
135
  {
 
136
    if ((*appl_iter)->isActive())
 
137
    {
 
138
      is_active= true;
 
139
      return;
 
140
    }
 
141
    ++appl_iter;
 
142
  }
 
143
  /* If we get here, there are no active appliers */
 
144
  is_active= false;
 
145
}
 
146
 
 
147
void ReplicationServices::attachReplicator(plugin::Replicator *in_replicator)
73
148
{
74
149
  replicators.push_back(in_replicator);
 
150
  evaluateActivePlugins();
75
151
}
76
152
 
77
 
void TransactionServices::detachReplicator(drizzled::plugin::Replicator *in_replicator)
 
153
void ReplicationServices::detachReplicator(plugin::Replicator *in_replicator)
78
154
{
79
155
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
 
156
  evaluateActivePlugins();
80
157
}
81
158
 
82
 
void TransactionServices::attachApplier(drizzled::plugin::Applier *in_applier)
 
159
void ReplicationServices::attachApplier(plugin::Applier *in_applier)
83
160
{
84
161
  appliers.push_back(in_applier);
 
162
  evaluateActivePlugins();
85
163
}
86
164
 
87
 
void TransactionServices::detachApplier(drizzled::plugin::Applier *in_applier)
 
165
void ReplicationServices::detachApplier(plugin::Applier *in_applier)
88
166
{
89
167
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
90
 
}
91
 
 
92
 
void TransactionServices::setCommandTransactionContext(drizzled::message::Command *in_command
 
168
  evaluateActivePlugins();
 
169
}
 
170
 
 
171
bool ReplicationServices::isActive() const
 
172
{
 
173
  return is_active;
 
174
}
 
175
 
 
176
void ReplicationServices::setCommandTransactionContext(message::Command *in_command
93
177
                                                     , Session *in_session) const
94
178
{
95
 
  using namespace drizzled::message;
96
 
 
97
 
  TransactionContext *trx= in_command->mutable_transaction_context();
 
179
  message::TransactionContext *trx= in_command->mutable_transaction_context();
98
180
  trx->set_server_id(in_session->getServerId());
99
181
  trx->set_transaction_id(in_session->getTransactionId());
100
182
}
101
183
 
102
 
void TransactionServices::startTransaction(Session *in_session)
103
 
{
104
 
  using namespace drizzled::message;
105
 
  
106
 
  if (replicators.size() == 0 || appliers.size() == 0)
107
 
    return;
108
 
  
109
 
  Command command;
110
 
  command.set_type(Command::START_TRANSACTION);
111
 
  command.set_timestamp(in_session->getCurrentTimestamp());
112
 
 
113
 
  setCommandTransactionContext(&command, in_session);
114
 
  
115
 
  push(&command);
116
 
}
117
 
 
118
 
void TransactionServices::commitTransaction(Session *in_session)
119
 
{
120
 
  using namespace drizzled::message;
121
 
  
122
 
  if (replicators.size() == 0 || appliers.size() == 0)
123
 
    return;
124
 
  
125
 
  Command command;
126
 
  command.set_type(Command::COMMIT);
127
 
  command.set_timestamp(in_session->getCurrentTimestamp());
128
 
 
129
 
  setCommandTransactionContext(&command, in_session);
130
 
  
131
 
  push(&command);
132
 
}
133
 
 
134
 
void TransactionServices::rollbackTransaction(Session *in_session)
135
 
{
136
 
  using namespace drizzled::message;
137
 
  
138
 
  if (replicators.size() == 0 || appliers.size() == 0)
139
 
    return;
140
 
  
141
 
  Command command;
142
 
  command.set_type(Command::ROLLBACK);
143
 
  command.set_timestamp(in_session->getCurrentTimestamp());
144
 
 
145
 
  setCommandTransactionContext(&command, in_session);
146
 
  
147
 
  push(&command);
148
 
}
149
 
 
150
 
void TransactionServices::insertRecord(Session *in_session, Table *in_table)
151
 
{
152
 
  using namespace drizzled::message;
153
 
  
154
 
  if (replicators.size() == 0 || appliers.size() == 0)
155
 
    return;
156
 
 
157
 
  Command command;
158
 
  command.set_type(Command::INSERT);
 
184
void ReplicationServices::startTransaction(Session *in_session)
 
185
{
 
186
  if (! is_active)
 
187
    return;
 
188
  
 
189
  message::Command command;
 
190
  command.set_type(message::Command::START_TRANSACTION);
 
191
  command.set_timestamp(in_session->getCurrentTimestamp());
 
192
 
 
193
  setCommandTransactionContext(&command, in_session);
 
194
  
 
195
  push(&command);
 
196
}
 
197
 
 
198
void ReplicationServices::commitTransaction(Session *in_session)
 
199
{
 
200
  if (! is_active)
 
201
    return;
 
202
  
 
203
  message::Command command;
 
204
  command.set_type(message::Command::COMMIT);
 
205
  command.set_timestamp(in_session->getCurrentTimestamp());
 
206
 
 
207
  setCommandTransactionContext(&command, in_session);
 
208
  
 
209
  push(&command);
 
210
}
 
211
 
 
212
void ReplicationServices::rollbackTransaction(Session *in_session)
 
213
{
 
214
  if (! is_active)
 
215
    return;
 
216
  
 
217
  message::Command command;
 
218
  command.set_type(message::Command::ROLLBACK);
 
219
  command.set_timestamp(in_session->getCurrentTimestamp());
 
220
 
 
221
  setCommandTransactionContext(&command, in_session);
 
222
  
 
223
  push(&command);
 
224
}
 
225
 
 
226
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
 
227
{
 
228
  if (! is_active)
 
229
    return;
 
230
 
 
231
  message::Command command;
 
232
  command.set_type(message::Command::INSERT);
159
233
  command.set_timestamp(in_session->getCurrentTimestamp());
160
234
 
161
235
  setCommandTransactionContext(&command, in_session);
168
242
 
169
243
  /* 
170
244
   * Now we construct the specialized InsertRecord command inside
171
 
   * the Command container...
 
245
   * the message::Command container...
172
246
   */
173
 
  InsertRecord *change_record= command.mutable_insert_record();
 
247
  message::InsertRecord *change_record= command.mutable_insert_record();
174
248
 
175
249
  Field *current_field;
176
250
  Field **table_fields= in_table->field;
177
 
  String *string_value= new (in_session->mem_root) String(100); /* 100 initially. field->val_str() is responsible for re-adjusting */
 
251
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
178
252
  string_value->set_charset(system_charset_info);
179
253
 
180
 
  Table::Field *cur_field;
 
254
  message::Table::Field *current_proto_field;
 
255
 
 
256
  /* We will read all the table's fields... */
 
257
  in_table->setReadSet();
181
258
 
182
259
  while ((current_field= *table_fields++) != NULL) 
183
260
  {
184
 
    cur_field= change_record->add_insert_field();
185
 
    cur_field->set_name(std::string(current_field->field_name));
186
 
    cur_field->set_type(Table::Field::VARCHAR); /* @TODO real types! */
 
261
    current_proto_field= change_record->add_insert_field();
 
262
    current_proto_field->set_name(std::string(current_field->field_name));
 
263
    current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
187
264
    string_value= current_field->val_str(string_value);
188
265
    change_record->add_insert_value(std::string(string_value->c_ptr()));
189
 
    string_value->free(); /* I wish there was a clear() method... */
 
266
    string_value->free();
190
267
  }
191
 
 
192
 
  if (string_value)
193
 
    delete string_value; /* Is this needed with memroot allocation? */
194
268
  
195
269
  push(&command);
196
270
}
197
271
 
198
 
void TransactionServices::updateRecord(Session *in_session, Table *in_table, const unsigned char *, const unsigned char *)
 
272
void ReplicationServices::updateRecord(Session *in_session,
 
273
                                       Table *in_table, 
 
274
                                       const unsigned char *old_record, 
 
275
                                       const unsigned char *new_record)
199
276
{
200
 
  using namespace drizzled::message;
201
 
  
202
 
  if (replicators.size() == 0 || appliers.size() == 0)
 
277
  if (! is_active)
203
278
    return;
204
279
  
205
 
  Command command;
206
 
  command.set_type(Command::UPDATE);
 
280
  message::Command command;
 
281
  command.set_type(message::Command::UPDATE);
207
282
  command.set_timestamp(in_session->getCurrentTimestamp());
208
283
 
209
284
  setCommandTransactionContext(&command, in_session);
216
291
 
217
292
  /* 
218
293
   * Now we construct the specialized UpdateRecord command inside
219
 
   * the Command container...
 
294
   * the message::Command container...
220
295
   */
221
 
  //UpdateRecord *change_record= command.mutable_update_record();
 
296
  message::UpdateRecord *change_record= command.mutable_update_record();
 
297
 
 
298
  Field *current_field;
 
299
  Field **table_fields= in_table->field;
 
300
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
301
  string_value->set_charset(system_charset_info);
 
302
 
 
303
  message::Table::Field *current_proto_field;
 
304
 
 
305
  while ((current_field= *table_fields++) != NULL) 
 
306
  {
 
307
    /*
 
308
     * The below really should be moved into the Field API and Record API.  But for now
 
309
     * we do this crazy pointer fiddling to figure out if the current field
 
310
     * has been updated in the supplied record raw byte pointers.
 
311
     */
 
312
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
313
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
314
 
 
315
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
316
 
 
317
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
318
    {
 
319
      /* Field is changed from old to new */
 
320
      current_proto_field= change_record->add_update_field();
 
321
      current_proto_field->set_name(std::string(current_field->field_name));
 
322
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
323
 
 
324
      /* Store the original "read bit" for this field */
 
325
      bool is_read_set= current_field->isReadSet();
 
326
 
 
327
      /* We need to mark that we will "read" this field... */
 
328
      in_table->setReadSet(current_field->field_index);
 
329
 
 
330
      /* Read the string value of this field's contents */
 
331
      string_value= current_field->val_str(string_value);
 
332
 
 
333
      /* 
 
334
       * Reset the read bit after reading field to its original state.  This 
 
335
       * prevents the field from being included in the WHERE clause
 
336
       */
 
337
      current_field->setReadSet(is_read_set);
 
338
 
 
339
      change_record->add_after_value(std::string(string_value->c_ptr()));
 
340
      string_value->free();
 
341
    }
 
342
 
 
343
    /* 
 
344
     * Add the WHERE clause values now...the fields which return true
 
345
     * for isReadSet() are in the WHERE clause.  For tables with no
 
346
     * primary or unique key, all fields will be returned.
 
347
     */
 
348
    if (current_field->isReadSet())
 
349
    {
 
350
      current_proto_field= change_record->add_where_field();
 
351
      current_proto_field->set_name(std::string(current_field->field_name));
 
352
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
353
      string_value= current_field->val_str(string_value);
 
354
      change_record->add_where_value(std::string(string_value->c_ptr()));
 
355
      string_value->free();
 
356
    }
 
357
  }
222
358
 
223
359
  push(&command);
224
360
}
225
361
 
226
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
362
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
227
363
{
228
 
  using namespace drizzled::message;
229
 
  
230
 
  if (replicators.size() == 0 || appliers.size() == 0)
 
364
  if (! is_active)
231
365
    return;
232
366
  
233
 
  Command command;
234
 
  command.set_type(Command::DELETE);
 
367
  message::Command command;
 
368
  command.set_type(message::Command::DELETE);
235
369
  command.set_timestamp(in_session->getCurrentTimestamp());
236
370
 
237
371
  setCommandTransactionContext(&command, in_session);
244
378
 
245
379
  /* 
246
380
   * Now we construct the specialized DeleteRecord command inside
247
 
   * the Command container...
 
381
   * the message::Command container...
248
382
   */
249
 
  //DeleteRecord *change_record= command.mutable_delete_record();
250
 
  
 
383
  message::DeleteRecord *change_record= command.mutable_delete_record();
 
384
 
 
385
  Field *current_field;
 
386
  Field **table_fields= in_table->field;
 
387
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
388
  string_value->set_charset(system_charset_info);
 
389
 
 
390
  message::Table::Field *current_proto_field;
 
391
 
 
392
  while ((current_field= *table_fields++) != NULL)
 
393
  {
 
394
    /*
 
395
     * Add the WHERE clause values now...the fields which return true
 
396
     * for isReadSet() are in the WHERE clause.  For tables with no
 
397
     * primary or unique key, all fields will be returned.
 
398
     */
 
399
    if (current_field->isReadSet())
 
400
    {
 
401
      current_proto_field= change_record->add_where_field();
 
402
      current_proto_field->set_name(std::string(current_field->field_name));
 
403
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
 
404
      string_value= current_field->val_str(string_value);
 
405
      change_record->add_where_value(std::string(string_value->c_ptr()));
 
406
      string_value->free();
 
407
    }
 
408
  }
 
409
 
251
410
  push(&command);
252
411
}
253
412
 
254
 
void TransactionServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
413
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
255
414
{
256
 
  using namespace drizzled::message;
257
 
  
258
 
  if (replicators.size() == 0 || appliers.size() == 0)
 
415
  if (! is_active)
259
416
    return;
260
417
  
261
 
  Command command;
262
 
  command.set_type(Command::RAW_SQL);
 
418
  message::Command command;
 
419
  command.set_type(message::Command::RAW_SQL);
263
420
  command.set_timestamp(in_session->getCurrentTimestamp());
264
421
 
265
422
  setCommandTransactionContext(&command, in_session);
268
425
  command.set_sql(query);
269
426
 
270
427
  push(&command);
271
 
  
272
428
}
273
429
 
274
 
void TransactionServices::push(drizzled::message::Command *to_push)
 
430
void ReplicationServices::push(message::Command *to_push)
275
431
{
276
 
  std::vector<drizzled::plugin::Replicator *>::iterator repl_iter= replicators.begin();
277
 
  std::vector<drizzled::plugin::Applier *>::iterator appl_start_iter, appl_iter;
 
432
  std::vector<plugin::Replicator *>::iterator repl_iter= replicators.begin();
 
433
  std::vector<plugin::Applier *>::iterator appl_start_iter, appl_iter;
278
434
  appl_start_iter= appliers.begin();
279
435
 
280
 
  drizzled::plugin::Replicator *cur_repl;
281
 
  drizzled::plugin::Applier *cur_appl;
 
436
  plugin::Replicator *cur_repl;
 
437
  plugin::Applier *cur_appl;
282
438
 
283
439
  while (repl_iter != replicators.end())
284
440
  {
301
457
      }
302
458
 
303
459
      cur_repl->replicate(cur_appl, to_push);
 
460
      
 
461
      /* 
 
462
       * We update the timestamp for the last applied Command so that
 
463
       * publisher plugins can ask the replication services when the
 
464
       * last known applied Command was using the getLastAppliedTimestamp()
 
465
       * method.
 
466
       */
 
467
      last_applied_timestamp.fetch_and_store(to_push->timestamp());
304
468
      ++appl_iter;
305
469
    }
306
470
    ++repl_iter;
307
471
  }
308
472
}
309
473
 
310
 
 
311
474
} /* end namespace drizzled */