~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Jay Pipes
  • Date: 2010-03-16 21:30:44 UTC
  • mto: This revision was merged to the branch mainline in revision 1351.
  • Revision ID: jpipes@serialcoder-20100316213044-e4f0xc0aga34l1es
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices.  Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
5
6
 *
6
7
 *  Authors:
7
8
 *
8
 
 *    Jay Pipes <joinfu@sun.com>
 
9
 *    Jay Pipes <jaypipes@gmail.com>
9
10
 *
10
11
 *  This program is free software; you can redistribute it and/or modify
11
12
 *  it under the terms of the GNU General Public License as published by
23
24
 
24
25
/**
25
26
 * @file Server-side utility which is responsible for managing the 
26
 
 * communication between the kernel, replicator plugins, and applier plugins.
27
 
 *
28
 
 * ReplicationServices is a bridge between modules and the kernel, and its
29
 
 * primary function is to take internal events (for instance the start of a 
30
 
 * transaction, the changing of a record, or the rollback of a transaction) 
31
 
 * and construct GPB Messages that are passed to the registered replicator and
32
 
 * applier plugins.
33
 
 *
34
 
 * The reason for this functionality is to encapsulate all communication
35
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
36
 
 * Instead of the plugin having to understand the (often fluidly changing)
37
 
 * mechanics of the kernel, all the plugin needs to understand is the message
38
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
 
 * these messages.
40
 
 *
41
 
 * @see /drizzled/message/transaction.proto
42
 
 *
43
 
 * @todo
44
 
 *
45
 
 * We really should store the raw bytes in the messages, not the
46
 
 * String value of the Field.  But, to do that, the
47
 
 * statement_transform library needs first to be updated
48
 
 * to include the transformation code to convert raw
49
 
 * Drizzle-internal Field byte representation into something
50
 
 * plugins can understand.
51
 
 */
 
27
 * communication between the kernel and the replication plugins:
 
28
 *
 
29
 * - TransactionReplicator
 
30
 * - TransactionApplier
 
31
 * - Publisher
 
32
 * - Subscriber
 
33
 *
 
34
 * ReplicationServices is a bridge between replication modules and the kernel,
 
35
 * and its primary function is to  */
52
36
 
53
37
#include "config.h"
54
38
#include "drizzled/replication_services.h"
161
145
  return is_active;
162
146
}
163
147
 
164
 
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
165
 
{
166
 
  message::Transaction *transaction= in_session->getTransactionMessage();
167
 
 
168
 
  if (unlikely(transaction == NULL))
169
 
  {
170
 
    /* 
171
 
     * Allocate and initialize a new transaction message 
172
 
     * for this Session object.  Session is responsible for
173
 
     * deleting transaction message when done with it.
174
 
     */
175
 
    transaction= new (nothrow) message::Transaction();
176
 
    initTransaction(*transaction, in_session);
177
 
    in_session->setTransactionMessage(transaction);
178
 
    return transaction;
179
 
  }
180
 
  else
181
 
    return transaction;
182
 
}
183
 
 
184
 
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
185
 
                                          Session *in_session) const
186
 
{
187
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
188
 
  trx->set_server_id(in_session->getServerId());
189
 
  trx->set_transaction_id(in_session->getQueryId());
190
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
191
 
}
192
 
 
193
 
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
194
 
                                              Session *in_session) const
195
 
{
196
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
197
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
198
 
}
199
 
 
200
 
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
201
 
                                             Session *in_session) const
202
 
{
203
 
  delete in_transaction;
204
 
  in_session->setStatementMessage(NULL);
205
 
  in_session->setTransactionMessage(NULL);
206
 
}
207
 
 
208
 
void ReplicationServices::commitTransaction(Session *in_session)
209
 
{
210
 
  if (! is_active)
211
 
    return;
212
 
 
213
 
  /* If there is an active statement message, finalize it */
214
 
  message::Statement *statement= in_session->getStatementMessage();
215
 
 
216
 
  if (statement != NULL)
217
 
  {
218
 
    finalizeStatement(*statement, in_session);
219
 
  }
220
 
  else
221
 
    return; /* No data modification occurred inside the transaction */
222
 
  
223
 
  message::Transaction* transaction= getActiveTransaction(in_session);
224
 
 
225
 
  finalizeTransaction(*transaction, in_session);
226
 
  
227
 
  push(*transaction);
228
 
 
229
 
  cleanupTransaction(transaction, in_session);
230
 
}
231
 
 
232
 
void ReplicationServices::initStatement(message::Statement &statement,
233
 
                                        message::Statement::Type in_type,
234
 
                                        Session *in_session) const
235
 
{
236
 
  statement.set_type(in_type);
237
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
238
 
  /** @TODO Set sql string optionally */
239
 
}
240
 
 
241
 
void ReplicationServices::finalizeStatement(message::Statement &statement,
242
 
                                            Session *in_session) const
243
 
{
244
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
245
 
  in_session->setStatementMessage(NULL);
246
 
}
247
 
 
248
 
void ReplicationServices::rollbackTransaction(Session *in_session)
249
 
{
250
 
  if (! is_active)
251
 
    return;
252
 
  
253
 
  message::Transaction *transaction= getActiveTransaction(in_session);
254
 
 
255
 
  /*
256
 
   * OK, so there are two situations that we need to deal with here:
257
 
   *
258
 
   * 1) We receive an instruction to ROLLBACK the current transaction
259
 
   *    and the currently-stored Transaction message is *self-contained*, 
260
 
   *    meaning that no Statement messages in the Transaction message
261
 
   *    contain a message having its segment_id member greater than 1.  If
262
 
   *    no non-segment ID 1 members are found, we can simply clear the
263
 
   *    current Transaction message and remove it from memory.
264
 
   *
265
 
   * 2) If the Transaction message does indeed have a non-end segment, that
266
 
   *    means that a bulk update/delete/insert Transaction message segment
267
 
   *    has previously been sent over the wire to replicators.  In this case, 
268
 
   *    we need to package a Transaction with a Statement message of type
269
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
270
 
   *    messages must be un-applied.
271
 
   */
272
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
273
 
  {
274
 
    /*
275
 
     * Clear the transaction, create a Rollback statement message, 
276
 
     * attach it to the transaction, and push it to replicators.
277
 
     */
278
 
    transaction->Clear();
279
 
    initTransaction(*transaction, in_session);
280
 
 
281
 
    message::Statement *statement= transaction->add_statement();
282
 
 
283
 
    initStatement(*statement, message::Statement::ROLLBACK, in_session);
284
 
    finalizeStatement(*statement, in_session);
285
 
 
286
 
    finalizeTransaction(*transaction, in_session);
287
 
    
288
 
    push(*transaction);
289
 
  }
290
 
  cleanupTransaction(transaction, in_session);
291
 
}
292
 
 
293
 
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
294
 
                                                                 Table *in_table) const
295
 
{
296
 
  message::Statement *statement= in_session->getStatementMessage();
297
 
  /*
298
 
   * We check to see if the current Statement message is of type INSERT.
299
 
   * If it is not, we finalize the current Statement and ensure a new
300
 
   * InsertStatement is created.
301
 
   */
302
 
  if (statement != NULL &&
303
 
      statement->type() != message::Statement::INSERT)
304
 
  {
305
 
    finalizeStatement(*statement, in_session);
306
 
    statement= in_session->getStatementMessage();
307
 
  }
308
 
 
309
 
  if (statement == NULL)
310
 
  {
311
 
    message::Transaction *transaction= getActiveTransaction(in_session);
312
 
    /* 
313
 
     * Transaction message initialized and set, but no statement created
314
 
     * yet.  We construct one and initialize it, here, then return the
315
 
     * message after attaching the new Statement message pointer to the 
316
 
     * Session for easy retrieval later...
317
 
     */
318
 
    statement= transaction->add_statement();
319
 
    setInsertHeader(*statement, in_session, in_table);
320
 
    in_session->setStatementMessage(statement);
321
 
  }
322
 
  return *statement;
323
 
}
324
 
 
325
 
void ReplicationServices::setInsertHeader(message::Statement &statement,
326
 
                                          Session *in_session,
327
 
                                          Table *in_table) const
328
 
{
329
 
  initStatement(statement, message::Statement::INSERT, in_session);
330
 
 
331
 
  /* 
332
 
   * Now we construct the specialized InsertHeader message inside
333
 
   * the generalized message::Statement container...
334
 
   */
335
 
  /* Set up the insert header */
336
 
  message::InsertHeader *header= statement.mutable_insert_header();
337
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
338
 
 
339
 
  const char *schema_name= in_table->getShare()->db.str;
340
 
  const char *table_name= in_table->getShare()->table_name.str;
341
 
 
342
 
  table_metadata->set_schema_name(schema_name);
343
 
  table_metadata->set_table_name(table_name);
344
 
 
345
 
  Field *current_field;
346
 
  Field **table_fields= in_table->field;
347
 
 
348
 
  message::FieldMetadata *field_metadata;
349
 
 
350
 
  /* We will read all the table's fields... */
351
 
  in_table->setReadSet();
352
 
 
353
 
  while ((current_field= *table_fields++) != NULL) 
354
 
  {
355
 
    field_metadata= header->add_field_metadata();
356
 
    field_metadata->set_name(current_field->field_name);
357
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
358
 
  }
359
 
}
360
 
 
361
 
bool ReplicationServices::insertRecord(Session *in_session, Table *in_table)
362
 
{
363
 
  if (! is_active)
364
 
    return false;
365
 
  /**
366
 
   * We do this check here because we don't want to even create a 
367
 
   * statement if there isn't a primary key on the table...
368
 
   *
369
 
   * @todo
370
 
   *
371
 
   * Multi-column primary keys are handled how exactly?
372
 
   */
373
 
  if (in_table->s->primary_key == MAX_KEY)
374
 
  {
375
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
376
 
    return true;
377
 
  }
378
 
 
379
 
  message::Statement &statement= getInsertStatement(in_session, in_table);
380
 
 
381
 
  message::InsertData *data= statement.mutable_insert_data();
382
 
  data->set_segment_id(1);
383
 
  data->set_end_segment(true);
384
 
  message::InsertRecord *record= data->add_record();
385
 
 
386
 
  Field *current_field;
387
 
  Field **table_fields= in_table->field;
388
 
 
389
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
390
 
  string_value->set_charset(system_charset_info);
391
 
 
392
 
  /* We will read all the table's fields... */
393
 
  in_table->setReadSet();
394
 
 
395
 
  while ((current_field= *table_fields++) != NULL) 
396
 
  {
397
 
    string_value= current_field->val_str(string_value);
398
 
    record->add_insert_value(string_value->c_ptr(), string_value->length());
399
 
    string_value->free();
400
 
  }
401
 
  return false;
402
 
}
403
 
 
404
 
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
405
 
                                                            Table *in_table,
406
 
                                                            const unsigned char *old_record, 
407
 
                                                            const unsigned char *new_record) const
408
 
{
409
 
  message::Statement *statement= in_session->getStatementMessage();
410
 
  /*
411
 
   * We check to see if the current Statement message is of type UPDATE.
412
 
   * If it is not, we finalize the current Statement and ensure a new
413
 
   * UpdateStatement is created.
414
 
   */
415
 
  if (statement != NULL &&
416
 
      statement->type() != message::Statement::UPDATE)
417
 
  {
418
 
    finalizeStatement(*statement, in_session);
419
 
    statement= in_session->getStatementMessage();
420
 
  }
421
 
 
422
 
  if (statement == NULL)
423
 
  {
424
 
    message::Transaction *transaction= getActiveTransaction(in_session);
425
 
    /* 
426
 
     * Transaction message initialized and set, but no statement created
427
 
     * yet.  We construct one and initialize it, here, then return the
428
 
     * message after attaching the new Statement message pointer to the 
429
 
     * Session for easy retrieval later...
430
 
     */
431
 
    statement= transaction->add_statement();
432
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
433
 
    in_session->setStatementMessage(statement);
434
 
  }
435
 
  return *statement;
436
 
}
437
 
 
438
 
void ReplicationServices::setUpdateHeader(message::Statement &statement,
439
 
                                          Session *in_session,
440
 
                                          Table *in_table,
441
 
                                          const unsigned char *old_record, 
442
 
                                          const unsigned char *new_record) const
443
 
{
444
 
  initStatement(statement, message::Statement::UPDATE, in_session);
445
 
 
446
 
  /* 
447
 
   * Now we construct the specialized UpdateHeader message inside
448
 
   * the generalized message::Statement container...
449
 
   */
450
 
  /* Set up the update header */
451
 
  message::UpdateHeader *header= statement.mutable_update_header();
452
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
453
 
 
454
 
  const char *schema_name= in_table->getShare()->db.str;
455
 
  const char *table_name= in_table->getShare()->table_name.str;
456
 
 
457
 
  table_metadata->set_schema_name(schema_name);
458
 
  table_metadata->set_table_name(table_name);
459
 
 
460
 
  Field *current_field;
461
 
  Field **table_fields= in_table->field;
462
 
 
463
 
  message::FieldMetadata *field_metadata;
464
 
 
465
 
  /* We will read all the table's fields... */
466
 
  in_table->setReadSet();
467
 
 
468
 
  while ((current_field= *table_fields++) != NULL) 
469
 
  {
470
 
    /*
471
 
     * We add the "key field metadata" -- i.e. the fields which is
472
 
     * the primary key for the table.
473
 
     */
474
 
    if (in_table->s->fieldInPrimaryKey(current_field))
475
 
    {
476
 
      field_metadata= header->add_key_field_metadata();
477
 
      field_metadata->set_name(current_field->field_name);
478
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
479
 
    }
480
 
 
481
 
    /*
482
 
     * The below really should be moved into the Field API and Record API.  But for now
483
 
     * we do this crazy pointer fiddling to figure out if the current field
484
 
     * has been updated in the supplied record raw byte pointers.
485
 
     */
486
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
487
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
488
 
 
489
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
490
 
 
491
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
492
 
    {
493
 
      /* Field is changed from old to new */
494
 
      field_metadata= header->add_set_field_metadata();
495
 
      field_metadata->set_name(current_field->field_name);
496
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
497
 
    }
498
 
  }
499
 
}
500
 
void ReplicationServices::updateRecord(Session *in_session,
501
 
                                       Table *in_table, 
502
 
                                       const unsigned char *old_record, 
503
 
                                       const unsigned char *new_record)
504
 
{
505
 
  if (! is_active)
506
 
    return;
507
 
 
508
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
509
 
 
510
 
  message::UpdateData *data= statement.mutable_update_data();
511
 
  data->set_segment_id(1);
512
 
  data->set_end_segment(true);
513
 
  message::UpdateRecord *record= data->add_record();
514
 
 
515
 
  Field *current_field;
516
 
  Field **table_fields= in_table->field;
517
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
518
 
  string_value->set_charset(system_charset_info);
519
 
 
520
 
  while ((current_field= *table_fields++) != NULL) 
521
 
  {
522
 
    /*
523
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
524
 
     * but then realized that an UPDATE statement could potentially have different values for
525
 
     * the SET field.  For instance, imagine this SQL scenario:
526
 
     *
527
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
528
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
529
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
530
 
     *
531
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
532
 
     *
533
 
     * The below really should be moved into the Field API and Record API.  But for now
534
 
     * we do this crazy pointer fiddling to figure out if the current field
535
 
     * has been updated in the supplied record raw byte pointers.
536
 
     */
537
 
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
538
 
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
539
 
 
540
 
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
541
 
 
542
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
543
 
    {
544
 
      /* Store the original "read bit" for this field */
545
 
      bool is_read_set= current_field->isReadSet();
546
 
 
547
 
      /* We need to mark that we will "read" this field... */
548
 
      in_table->setReadSet(current_field->field_index);
549
 
 
550
 
      /* Read the string value of this field's contents */
551
 
      string_value= current_field->val_str(string_value);
552
 
 
553
 
      /* 
554
 
       * Reset the read bit after reading field to its original state.  This 
555
 
       * prevents the field from being included in the WHERE clause
556
 
       */
557
 
      current_field->setReadSet(is_read_set);
558
 
 
559
 
      record->add_after_value(string_value->c_ptr(), string_value->length());
560
 
      string_value->free();
561
 
    }
562
 
 
563
 
    /* 
564
 
     * Add the WHERE clause values now...for now, this means the
565
 
     * primary key field value.  Replication only supports tables
566
 
     * with a primary key.
567
 
     */
568
 
    if (in_table->s->fieldInPrimaryKey(current_field))
569
 
    {
570
 
      /**
571
 
       * To say the below is ugly is an understatement. But it works.
572
 
       * 
573
 
       * @todo Move this crap into a real Record API.
574
 
       */
575
 
      string_value= current_field->val_str(string_value,
576
 
                                           old_record + 
577
 
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
578
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
579
 
      string_value->free();
580
 
    }
581
 
 
582
 
  }
583
 
}
584
 
 
585
 
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
586
 
                                                            Table *in_table) const
587
 
{
588
 
  message::Statement *statement= in_session->getStatementMessage();
589
 
  /*
590
 
   * We check to see if the current Statement message is of type DELETE.
591
 
   * If it is not, we finalize the current Statement and ensure a new
592
 
   * DeleteStatement is created.
593
 
   */
594
 
  if (statement != NULL &&
595
 
      statement->type() != message::Statement::DELETE)
596
 
  {
597
 
    finalizeStatement(*statement, in_session);
598
 
    statement= in_session->getStatementMessage();
599
 
  }
600
 
 
601
 
  if (statement == NULL)
602
 
  {
603
 
    message::Transaction *transaction= getActiveTransaction(in_session);
604
 
    /* 
605
 
     * Transaction message initialized and set, but no statement created
606
 
     * yet.  We construct one and initialize it, here, then return the
607
 
     * message after attaching the new Statement message pointer to the 
608
 
     * Session for easy retrieval later...
609
 
     */
610
 
    statement= transaction->add_statement();
611
 
    setDeleteHeader(*statement, in_session, in_table);
612
 
    in_session->setStatementMessage(statement);
613
 
  }
614
 
  return *statement;
615
 
}
616
 
 
617
 
void ReplicationServices::setDeleteHeader(message::Statement &statement,
618
 
                                          Session *in_session,
619
 
                                          Table *in_table) const
620
 
{
621
 
  initStatement(statement, message::Statement::DELETE, in_session);
622
 
 
623
 
  /* 
624
 
   * Now we construct the specialized DeleteHeader message inside
625
 
   * the generalized message::Statement container...
626
 
   */
627
 
  message::DeleteHeader *header= statement.mutable_delete_header();
628
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
629
 
 
630
 
  const char *schema_name= in_table->getShare()->db.str;
631
 
  const char *table_name= in_table->getShare()->table_name.str;
632
 
 
633
 
  table_metadata->set_schema_name(schema_name);
634
 
  table_metadata->set_table_name(table_name);
635
 
 
636
 
  Field *current_field;
637
 
  Field **table_fields= in_table->field;
638
 
 
639
 
  message::FieldMetadata *field_metadata;
640
 
 
641
 
  while ((current_field= *table_fields++) != NULL) 
642
 
  {
643
 
    /* 
644
 
     * Add the WHERE clause values now...for now, this means the
645
 
     * primary key field value.  Replication only supports tables
646
 
     * with a primary key.
647
 
     */
648
 
    if (in_table->s->fieldInPrimaryKey(current_field))
649
 
    {
650
 
      field_metadata= header->add_key_field_metadata();
651
 
      field_metadata->set_name(current_field->field_name);
652
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
653
 
    }
654
 
  }
655
 
}
656
 
 
657
 
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
658
 
{
659
 
  if (! is_active)
660
 
    return;
661
 
 
662
 
  message::Statement &statement= getDeleteStatement(in_session, in_table);
663
 
 
664
 
  message::DeleteData *data= statement.mutable_delete_data();
665
 
  data->set_segment_id(1);
666
 
  data->set_end_segment(true);
667
 
  message::DeleteRecord *record= data->add_record();
668
 
 
669
 
  Field *current_field;
670
 
  Field **table_fields= in_table->field;
671
 
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
672
 
  string_value->set_charset(system_charset_info);
673
 
 
674
 
  while ((current_field= *table_fields++) != NULL) 
675
 
  {
676
 
    /* 
677
 
     * Add the WHERE clause values now...for now, this means the
678
 
     * primary key field value.  Replication only supports tables
679
 
     * with a primary key.
680
 
     */
681
 
    if (in_table->s->fieldInPrimaryKey(current_field))
682
 
    {
683
 
      string_value= current_field->val_str(string_value);
684
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
685
 
      /**
686
 
       * @TODO Store optional old record value in the before data member
687
 
       */
688
 
      string_value->free();
689
 
    }
690
 
  }
691
 
}
692
 
 
693
 
void ReplicationServices::createTable(Session *in_session,
694
 
                                      const message::Table &table)
695
 
{
696
 
  if (! is_active)
697
 
    return;
698
 
  
699
 
  message::Transaction *transaction= getActiveTransaction(in_session);
700
 
  message::Statement *statement= transaction->add_statement();
701
 
 
702
 
  initStatement(*statement, message::Statement::CREATE_TABLE, in_session);
703
 
 
704
 
  /* 
705
 
   * Construct the specialized CreateTableStatement message and attach
706
 
   * it to the generic Statement message
707
 
   */
708
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
709
 
  message::Table *new_table_message= create_table_statement->mutable_table();
710
 
  *new_table_message= table;
711
 
 
712
 
  finalizeStatement(*statement, in_session);
713
 
 
714
 
  finalizeTransaction(*transaction, in_session);
715
 
  
716
 
  push(*transaction);
717
 
 
718
 
  cleanupTransaction(transaction, in_session);
719
 
 
720
 
}
721
 
 
722
 
void ReplicationServices::createSchema(Session *in_session,
723
 
                                       const message::Schema &schema)
724
 
{
725
 
  if (! is_active)
726
 
    return;
727
 
  
728
 
  message::Transaction *transaction= getActiveTransaction(in_session);
729
 
  message::Statement *statement= transaction->add_statement();
730
 
 
731
 
  initStatement(*statement, message::Statement::CREATE_SCHEMA, in_session);
732
 
 
733
 
  /* 
734
 
   * Construct the specialized CreateSchemaStatement message and attach
735
 
   * it to the generic Statement message
736
 
   */
737
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
738
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
739
 
  *new_schema_message= schema;
740
 
 
741
 
  finalizeStatement(*statement, in_session);
742
 
 
743
 
  finalizeTransaction(*transaction, in_session);
744
 
  
745
 
  push(*transaction);
746
 
 
747
 
  cleanupTransaction(transaction, in_session);
748
 
 
749
 
}
750
 
 
751
 
void ReplicationServices::dropSchema(Session *in_session, const string &schema_name)
752
 
{
753
 
  if (! is_active)
754
 
    return;
755
 
  
756
 
  message::Transaction *transaction= getActiveTransaction(in_session);
757
 
  message::Statement *statement= transaction->add_statement();
758
 
 
759
 
  initStatement(*statement, message::Statement::DROP_SCHEMA, in_session);
760
 
 
761
 
  /* 
762
 
   * Construct the specialized DropSchemaStatement message and attach
763
 
   * it to the generic Statement message
764
 
   */
765
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
766
 
 
767
 
  drop_schema_statement->set_schema_name(schema_name);
768
 
 
769
 
  finalizeStatement(*statement, in_session);
770
 
 
771
 
  finalizeTransaction(*transaction, in_session);
772
 
  
773
 
  push(*transaction);
774
 
 
775
 
  cleanupTransaction(transaction, in_session);
776
 
}
777
 
 
778
 
void ReplicationServices::dropTable(Session *in_session,
779
 
                                    const string &schema_name,
780
 
                                    const string &table_name,
781
 
                                    bool if_exists)
782
 
{
783
 
  if (! is_active)
784
 
    return;
785
 
  
786
 
  message::Transaction *transaction= getActiveTransaction(in_session);
787
 
  message::Statement *statement= transaction->add_statement();
788
 
 
789
 
  initStatement(*statement, message::Statement::DROP_TABLE, in_session);
790
 
 
791
 
  /* 
792
 
   * Construct the specialized DropTableStatement message and attach
793
 
   * it to the generic Statement message
794
 
   */
795
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
796
 
 
797
 
  drop_table_statement->set_if_exists_clause(if_exists);
798
 
 
799
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
800
 
 
801
 
  table_metadata->set_schema_name(schema_name);
802
 
  table_metadata->set_table_name(table_name);
803
 
 
804
 
  finalizeStatement(*statement, in_session);
805
 
 
806
 
  finalizeTransaction(*transaction, in_session);
807
 
  
808
 
  push(*transaction);
809
 
 
810
 
  cleanupTransaction(transaction, in_session);
811
 
}
812
 
 
813
 
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
814
 
{
815
 
  if (! is_active)
816
 
    return;
817
 
  
818
 
  message::Transaction *transaction= getActiveTransaction(in_session);
819
 
  message::Statement *statement= transaction->add_statement();
820
 
 
821
 
  initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
822
 
 
823
 
  /* 
824
 
   * Construct the specialized TruncateTableStatement message and attach
825
 
   * it to the generic Statement message
826
 
   */
827
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
828
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
829
 
 
830
 
  const char *schema_name= in_table->getShare()->db.str;
831
 
  const char *table_name= in_table->getShare()->table_name.str;
832
 
 
833
 
  table_metadata->set_schema_name(schema_name);
834
 
  table_metadata->set_table_name(table_name);
835
 
 
836
 
  finalizeStatement(*statement, in_session);
837
 
 
838
 
  finalizeTransaction(*transaction, in_session);
839
 
  
840
 
  push(*transaction);
841
 
 
842
 
  cleanupTransaction(transaction, in_session);
843
 
}
844
 
 
845
 
void ReplicationServices::rawStatement(Session *in_session, const string &query)
846
 
{
847
 
  if (! is_active)
848
 
    return;
849
 
  
850
 
  message::Transaction *transaction= getActiveTransaction(in_session);
851
 
  message::Statement *statement= transaction->add_statement();
852
 
 
853
 
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
854
 
  statement->set_sql(query);
855
 
  finalizeStatement(*statement, in_session);
856
 
 
857
 
  finalizeTransaction(*transaction, in_session);
858
 
  
859
 
  push(*transaction);
860
 
 
861
 
  cleanupTransaction(transaction, in_session);
862
 
}
863
 
 
864
 
void ReplicationServices::push(message::Transaction &to_push)
 
148
void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
865
149
{
866
150
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
867
151
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;