~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

Merge in security refactor.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
24
23
 
25
24
/**
26
25
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 *
 
43
 * @todo
 
44
 *
 
45
 * We really should store the raw bytes in the messages, not the
 
46
 * String value of the Field.  But, to do that, the
 
47
 * statement_transform library needs first to be updated
 
48
 * to include the transformation code to convert raw
 
49
 * Drizzle-internal Field byte representation into something
 
50
 * plugins can understand.
 
51
 */
36
52
 
37
53
#include "config.h"
38
54
#include "drizzled/replication_services.h"
39
55
#include "drizzled/plugin/transaction_replicator.h"
40
56
#include "drizzled/plugin/transaction_applier.h"
41
57
#include "drizzled/message/transaction.pb.h"
 
58
#include "drizzled/message/table.pb.h"
 
59
#include "drizzled/message/statement_transform.h"
42
60
#include "drizzled/gettext.h"
43
61
#include "drizzled/session.h"
44
62
#include "drizzled/error.h"
45
63
 
46
 
#include <string>
47
64
#include <vector>
48
 
#include <algorithm>
49
65
 
50
66
using namespace std;
51
67
 
52
68
namespace drizzled
53
69
{
54
70
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
 
71
ReplicationServices::ReplicationServices()
57
72
{
 
73
  is_active= false;
58
74
}
59
75
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
 
76
void ReplicationServices::evaluateActivePlugins()
61
77
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
 
78
  /* 
 
79
   * We loop through replicators and appliers, evaluating
 
80
   * whether or not there is at least one active replicator
 
81
   * and one active applier.  If not, we set is_active
 
82
   * to false.
 
83
   */
 
84
  bool tmp_is_active= false;
 
85
 
 
86
  if (replicators.empty() || appliers.empty())
 
87
  {
 
88
    is_active= false;
 
89
    return;
 
90
  }
 
91
 
 
92
  /* 
 
93
   * Determine if any remaining replicators and if those
 
94
   * replicators are active...if not, set is_active
 
95
   * to false
 
96
   */
 
97
  for (Replicators::iterator repl_iter= replicators.begin();
 
98
       repl_iter != replicators.end();
 
99
       ++repl_iter)
 
100
  {
 
101
    if ((*repl_iter)->isEnabled())
71
102
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
 
103
      tmp_is_active= true;
 
104
      break;
74
105
    }
75
106
  }
76
 
}
 
107
  if (! tmp_is_active)
 
108
  {
 
109
    /* No active replicators. Set is_active to false and exit. */
 
110
    is_active= false;
 
111
    return;
 
112
  }
77
113
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
80
114
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempts to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
 
115
   * OK, we know there's at least one active replicator.
 
116
   *
 
117
   * Now determine if any remaining replicators and if those
 
118
   * replicators are active...if not, set is_active
 
119
   * to false
87
120
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(error::ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
121
  for (Appliers::iterator appl_iter= appliers.begin();
100
122
       appl_iter != appliers.end();
101
123
       ++appl_iter)
102
124
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(error::ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
 
125
    if ((*appl_iter)->isEnabled())
 
126
    {
 
127
      is_active= true;
 
128
      return;
137
129
    }
138
130
  }
139
 
  is_active= true;
140
 
  return true;
 
131
  /* If we get here, there are no active appliers */
 
132
  is_active= false;
141
133
}
142
134
 
143
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
144
136
{
145
137
  replicators.push_back(in_replicator);
 
138
  evaluateActivePlugins();
146
139
}
147
140
 
148
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
149
142
{
150
143
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
 
144
  evaluateActivePlugins();
 
145
}
 
146
 
 
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
 
148
{
 
149
  appliers.push_back(in_applier);
 
150
  evaluateActivePlugins();
 
151
}
 
152
 
 
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
 
154
{
 
155
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
156
  evaluateActivePlugins();
160
157
}
161
158
 
162
159
bool ReplicationServices::isActive() const
164
161
  return is_active;
165
162
}
166
163
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
169
 
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
175
 
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
 
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
 
165
{
 
166
  message::Transaction *transaction= in_session->getTransactionMessage();
 
167
 
 
168
  if (unlikely(transaction == NULL))
 
169
  {
 
170
    /* 
 
171
     * Allocate and initialize a new transaction message 
 
172
     * for this Session object.  Session is responsible for
 
173
     * deleting transaction message when done with it.
 
174
     */
 
175
    transaction= new (nothrow) message::Transaction();
 
176
    initTransaction(*transaction, in_session);
 
177
    in_session->setTransactionMessage(transaction);
 
178
    return transaction;
 
179
  }
 
180
  else
 
181
    return transaction;
 
182
}
 
183
 
 
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
 
185
                                          Session *in_session) const
 
186
{
 
187
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
188
  trx->set_server_id(in_session->getServerId());
 
189
  trx->set_transaction_id(in_session->getQueryId());
 
190
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
191
}
 
192
 
 
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
 
194
                                              Session *in_session) const
 
195
{
 
196
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
197
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
198
}
 
199
 
 
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
 
201
                                             Session *in_session) const
 
202
{
 
203
  delete in_transaction;
 
204
  in_session->setStatementMessage(NULL);
 
205
  in_session->setTransactionMessage(NULL);
 
206
}
 
207
 
 
208
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
 
209
{
 
210
  size_t num_statements= transaction.statement_size();
 
211
  if (num_statements == 0)
 
212
    return false;
 
213
 
 
214
  /*
 
215
   * Only INSERT, UPDATE, and DELETE statements can possibly
 
216
   * have bulk segments.  So, we loop through the statements
 
217
   * checking for segment_id > 1 in those specific submessages.
 
218
   */
 
219
  size_t x;
 
220
  for (x= 0; x < num_statements; ++x)
 
221
  {
 
222
    const message::Statement &statement= transaction.statement(x);
 
223
    message::Statement::Type type= statement.type();
 
224
 
 
225
    switch (type)
 
226
    {
 
227
      case message::Statement::INSERT:
 
228
        if (statement.insert_data().segment_id() > 1)
 
229
          return true;
 
230
        break;
 
231
      case message::Statement::UPDATE:
 
232
        if (statement.update_data().segment_id() > 1)
 
233
          return true;
 
234
        break;
 
235
      case message::Statement::DELETE:
 
236
        if (statement.delete_data().segment_id() > 1)
 
237
          return true;
 
238
        break;
 
239
      default:
 
240
        break;
 
241
    }
 
242
  }
 
243
  return false;
 
244
}
 
245
void ReplicationServices::commitTransaction(Session *in_session)
 
246
{
 
247
  if (! is_active)
 
248
    return;
 
249
 
 
250
  /* If there is an active statement message, finalize it */
 
251
  message::Statement *statement= in_session->getStatementMessage();
 
252
 
 
253
  if (statement != NULL)
 
254
  {
 
255
    finalizeStatement(*statement, in_session);
 
256
  }
 
257
  else
 
258
    return; /* No data modification occurred inside the transaction */
 
259
  
 
260
  message::Transaction* transaction= getActiveTransaction(in_session);
 
261
 
 
262
  finalizeTransaction(*transaction, in_session);
 
263
  
 
264
  push(*transaction);
 
265
 
 
266
  cleanupTransaction(transaction, in_session);
 
267
}
 
268
 
 
269
void ReplicationServices::initStatement(message::Statement &statement,
 
270
                                        message::Statement::Type in_type,
 
271
                                        Session *in_session) const
 
272
{
 
273
  statement.set_type(in_type);
 
274
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
275
  /** @TODO Set sql string optionally */
 
276
}
 
277
 
 
278
void ReplicationServices::finalizeStatement(message::Statement &statement,
 
279
                                            Session *in_session) const
 
280
{
 
281
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
282
  in_session->setStatementMessage(NULL);
 
283
}
 
284
 
 
285
void ReplicationServices::rollbackTransaction(Session *in_session)
 
286
{
 
287
  if (! is_active)
 
288
    return;
 
289
  
 
290
  message::Transaction *transaction= getActiveTransaction(in_session);
 
291
 
 
292
  /*
 
293
   * OK, so there are two situations that we need to deal with here:
 
294
   *
 
295
   * 1) We receive an instruction to ROLLBACK the current transaction
 
296
   *    and the currently-stored Transaction message is *self-contained*, 
 
297
   *    meaning that no Statement messages in the Transaction message
 
298
   *    contain a message having its segment_id member greater than 1.  If
 
299
   *    no non-segment ID 1 members are found, we can simply clear the
 
300
   *    current Transaction message and remove it from memory.
 
301
   *
 
302
   * 2) If the Transaction message does indeed have a non-end segment, that
 
303
   *    means that a bulk update/delete/insert Transaction message segment
 
304
   *    has previously been sent over the wire to replicators.  In this case, 
 
305
   *    we need to package a Transaction with a Statement message of type
 
306
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
307
   *    messages must be un-applied.
 
308
   */
 
309
  if (unlikely(transactionContainsBulkSegment(*transaction)))
 
310
  {
 
311
    /*
 
312
     * Clear the transaction, create a Rollback statement message, 
 
313
     * attach it to the transaction, and push it to replicators.
 
314
     */
 
315
    transaction->Clear();
 
316
    initTransaction(*transaction, in_session);
 
317
 
 
318
    message::Statement *statement= transaction->add_statement();
 
319
 
 
320
    initStatement(*statement, message::Statement::ROLLBACK, in_session);
 
321
    finalizeStatement(*statement, in_session);
 
322
 
 
323
    finalizeTransaction(*transaction, in_session);
 
324
    
 
325
    push(*transaction);
 
326
  }
 
327
  cleanupTransaction(transaction, in_session);
 
328
}
 
329
 
 
330
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
 
331
                                                                 Table *in_table) const
 
332
{
 
333
  message::Statement *statement= in_session->getStatementMessage();
 
334
  /*
 
335
   * We check to see if the current Statement message is of type INSERT.
 
336
   * If it is not, we finalize the current Statement and ensure a new
 
337
   * InsertStatement is created.
 
338
   */
 
339
  if (statement != NULL &&
 
340
      statement->type() != message::Statement::INSERT)
 
341
  {
 
342
    finalizeStatement(*statement, in_session);
 
343
    statement= in_session->getStatementMessage();
 
344
  }
 
345
 
 
346
  if (statement == NULL)
 
347
  {
 
348
    message::Transaction *transaction= getActiveTransaction(in_session);
 
349
    /* 
 
350
     * Transaction message initialized and set, but no statement created
 
351
     * yet.  We construct one and initialize it, here, then return the
 
352
     * message after attaching the new Statement message pointer to the 
 
353
     * Session for easy retrieval later...
 
354
     */
 
355
    statement= transaction->add_statement();
 
356
    setInsertHeader(*statement, in_session, in_table);
 
357
    in_session->setStatementMessage(statement);
 
358
  }
 
359
  return *statement;
 
360
}
 
361
 
 
362
void ReplicationServices::setInsertHeader(message::Statement &statement,
 
363
                                          Session *in_session,
 
364
                                          Table *in_table) const
 
365
{
 
366
  initStatement(statement, message::Statement::INSERT, in_session);
 
367
 
 
368
  /* 
 
369
   * Now we construct the specialized InsertHeader message inside
 
370
   * the generalized message::Statement container...
 
371
   */
 
372
  /* Set up the insert header */
 
373
  message::InsertHeader *header= statement.mutable_insert_header();
 
374
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
375
 
 
376
  const char *schema_name= in_table->getShare()->db.str;
 
377
  const char *table_name= in_table->getShare()->table_name.str;
 
378
 
 
379
  table_metadata->set_schema_name(schema_name);
 
380
  table_metadata->set_table_name(table_name);
 
381
 
 
382
  Field *current_field;
 
383
  Field **table_fields= in_table->field;
 
384
 
 
385
  message::FieldMetadata *field_metadata;
 
386
 
 
387
  /* We will read all the table's fields... */
 
388
  in_table->setReadSet();
 
389
 
 
390
  while ((current_field= *table_fields++) != NULL) 
 
391
  {
 
392
    field_metadata= header->add_field_metadata();
 
393
    field_metadata->set_name(current_field->field_name);
 
394
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
395
  }
 
396
}
 
397
 
 
398
bool ReplicationServices::insertRecord(Session *in_session, Table *in_table)
 
399
{
 
400
  if (! is_active)
 
401
    return false;
 
402
  /**
 
403
   * We do this check here because we don't want to even create a 
 
404
   * statement if there isn't a primary key on the table...
 
405
   *
 
406
   * @todo
 
407
   *
 
408
   * Multi-column primary keys are handled how exactly?
 
409
   */
 
410
  if (in_table->s->primary_key == MAX_KEY)
 
411
  {
 
412
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
 
413
    return true;
 
414
  }
 
415
 
 
416
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
417
 
 
418
  message::InsertData *data= statement.mutable_insert_data();
 
419
  data->set_segment_id(1);
 
420
  data->set_end_segment(true);
 
421
  message::InsertRecord *record= data->add_record();
 
422
 
 
423
  Field *current_field;
 
424
  Field **table_fields= in_table->field;
 
425
 
 
426
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
427
  string_value->set_charset(system_charset_info);
 
428
 
 
429
  /* We will read all the table's fields... */
 
430
  in_table->setReadSet();
 
431
 
 
432
  while ((current_field= *table_fields++) != NULL) 
 
433
  {
 
434
    string_value= current_field->val_str(string_value);
 
435
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
436
    string_value->free();
 
437
  }
 
438
  return false;
 
439
}
 
440
 
 
441
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
 
442
                                                            Table *in_table,
 
443
                                                            const unsigned char *old_record, 
 
444
                                                            const unsigned char *new_record) const
 
445
{
 
446
  message::Statement *statement= in_session->getStatementMessage();
 
447
  /*
 
448
   * We check to see if the current Statement message is of type UPDATE.
 
449
   * If it is not, we finalize the current Statement and ensure a new
 
450
   * UpdateStatement is created.
 
451
   */
 
452
  if (statement != NULL &&
 
453
      statement->type() != message::Statement::UPDATE)
 
454
  {
 
455
    finalizeStatement(*statement, in_session);
 
456
    statement= in_session->getStatementMessage();
 
457
  }
 
458
 
 
459
  if (statement == NULL)
 
460
  {
 
461
    message::Transaction *transaction= getActiveTransaction(in_session);
 
462
    /* 
 
463
     * Transaction message initialized and set, but no statement created
 
464
     * yet.  We construct one and initialize it, here, then return the
 
465
     * message after attaching the new Statement message pointer to the 
 
466
     * Session for easy retrieval later...
 
467
     */
 
468
    statement= transaction->add_statement();
 
469
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
470
    in_session->setStatementMessage(statement);
 
471
  }
 
472
  return *statement;
 
473
}
 
474
 
 
475
void ReplicationServices::setUpdateHeader(message::Statement &statement,
 
476
                                          Session *in_session,
 
477
                                          Table *in_table,
 
478
                                          const unsigned char *old_record, 
 
479
                                          const unsigned char *new_record) const
 
480
{
 
481
  initStatement(statement, message::Statement::UPDATE, in_session);
 
482
 
 
483
  /* 
 
484
   * Now we construct the specialized UpdateHeader message inside
 
485
   * the generalized message::Statement container...
 
486
   */
 
487
  /* Set up the update header */
 
488
  message::UpdateHeader *header= statement.mutable_update_header();
 
489
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
490
 
 
491
  const char *schema_name= in_table->getShare()->db.str;
 
492
  const char *table_name= in_table->getShare()->table_name.str;
 
493
 
 
494
  table_metadata->set_schema_name(schema_name);
 
495
  table_metadata->set_table_name(table_name);
 
496
 
 
497
  Field *current_field;
 
498
  Field **table_fields= in_table->field;
 
499
 
 
500
  message::FieldMetadata *field_metadata;
 
501
 
 
502
  /* We will read all the table's fields... */
 
503
  in_table->setReadSet();
 
504
 
 
505
  while ((current_field= *table_fields++) != NULL) 
 
506
  {
 
507
    /*
 
508
     * We add the "key field metadata" -- i.e. the fields which is
 
509
     * the primary key for the table.
 
510
     */
 
511
    if (in_table->s->primary_key == current_field->field_index)
 
512
    {
 
513
      field_metadata= header->add_key_field_metadata();
 
514
      field_metadata->set_name(current_field->field_name);
 
515
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
516
    }
 
517
 
 
518
    /*
 
519
     * The below really should be moved into the Field API and Record API.  But for now
 
520
     * we do this crazy pointer fiddling to figure out if the current field
 
521
     * has been updated in the supplied record raw byte pointers.
 
522
     */
 
523
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
524
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
525
 
 
526
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
527
 
 
528
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
529
    {
 
530
      /* Field is changed from old to new */
 
531
      field_metadata= header->add_set_field_metadata();
 
532
      field_metadata->set_name(current_field->field_name);
 
533
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
534
    }
 
535
  }
 
536
}
 
537
void ReplicationServices::updateRecord(Session *in_session,
 
538
                                       Table *in_table, 
 
539
                                       const unsigned char *old_record, 
 
540
                                       const unsigned char *new_record)
 
541
{
 
542
  if (! is_active)
 
543
    return;
 
544
 
 
545
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
546
 
 
547
  message::UpdateData *data= statement.mutable_update_data();
 
548
  data->set_segment_id(1);
 
549
  data->set_end_segment(true);
 
550
  message::UpdateRecord *record= data->add_record();
 
551
 
 
552
  Field *current_field;
 
553
  Field **table_fields= in_table->field;
 
554
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
555
  string_value->set_charset(system_charset_info);
 
556
 
 
557
  while ((current_field= *table_fields++) != NULL) 
 
558
  {
 
559
    /*
 
560
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
561
     * but then realized that an UPDATE statement could potentially have different values for
 
562
     * the SET field.  For instance, imagine this SQL scenario:
 
563
     *
 
564
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
565
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
566
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
567
     *
 
568
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
569
     *
 
570
     * The below really should be moved into the Field API and Record API.  But for now
 
571
     * we do this crazy pointer fiddling to figure out if the current field
 
572
     * has been updated in the supplied record raw byte pointers.
 
573
     */
 
574
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
575
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
576
 
 
577
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
578
 
 
579
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
580
    {
 
581
      /* Store the original "read bit" for this field */
 
582
      bool is_read_set= current_field->isReadSet();
 
583
 
 
584
      /* We need to mark that we will "read" this field... */
 
585
      in_table->setReadSet(current_field->field_index);
 
586
 
 
587
      /* Read the string value of this field's contents */
 
588
      string_value= current_field->val_str(string_value);
 
589
 
 
590
      /* 
 
591
       * Reset the read bit after reading field to its original state.  This 
 
592
       * prevents the field from being included in the WHERE clause
 
593
       */
 
594
      current_field->setReadSet(is_read_set);
 
595
 
 
596
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
597
      string_value->free();
 
598
    }
 
599
 
 
600
    /* 
 
601
     * Add the WHERE clause values now...for now, this means the
 
602
     * primary key field value.  Replication only supports tables
 
603
     * with a primary key.
 
604
     */
 
605
    if (in_table->s->primary_key == current_field->field_index)
 
606
    {
 
607
      /**
 
608
       * To say the below is ugly is an understatement. But it works.
 
609
       * 
 
610
       * @todo Move this crap into a real Record API.
 
611
       */
 
612
      string_value= current_field->val_str(string_value,
 
613
                                           old_record + 
 
614
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
615
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
616
      string_value->free();
 
617
    }
 
618
 
 
619
  }
 
620
}
 
621
 
 
622
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
 
623
                                                            Table *in_table) const
 
624
{
 
625
  message::Statement *statement= in_session->getStatementMessage();
 
626
  /*
 
627
   * We check to see if the current Statement message is of type DELETE.
 
628
   * If it is not, we finalize the current Statement and ensure a new
 
629
   * DeleteStatement is created.
 
630
   */
 
631
  if (statement != NULL &&
 
632
      statement->type() != message::Statement::DELETE)
 
633
  {
 
634
    finalizeStatement(*statement, in_session);
 
635
    statement= in_session->getStatementMessage();
 
636
  }
 
637
 
 
638
  if (statement == NULL)
 
639
  {
 
640
    message::Transaction *transaction= getActiveTransaction(in_session);
 
641
    /* 
 
642
     * Transaction message initialized and set, but no statement created
 
643
     * yet.  We construct one and initialize it, here, then return the
 
644
     * message after attaching the new Statement message pointer to the 
 
645
     * Session for easy retrieval later...
 
646
     */
 
647
    statement= transaction->add_statement();
 
648
    setDeleteHeader(*statement, in_session, in_table);
 
649
    in_session->setStatementMessage(statement);
 
650
  }
 
651
  return *statement;
 
652
}
 
653
 
 
654
void ReplicationServices::setDeleteHeader(message::Statement &statement,
 
655
                                          Session *in_session,
 
656
                                          Table *in_table) const
 
657
{
 
658
  initStatement(statement, message::Statement::DELETE, in_session);
 
659
 
 
660
  /* 
 
661
   * Now we construct the specialized DeleteHeader message inside
 
662
   * the generalized message::Statement container...
 
663
   */
 
664
  message::DeleteHeader *header= statement.mutable_delete_header();
 
665
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
666
 
 
667
  const char *schema_name= in_table->getShare()->db.str;
 
668
  const char *table_name= in_table->getShare()->table_name.str;
 
669
 
 
670
  table_metadata->set_schema_name(schema_name);
 
671
  table_metadata->set_table_name(table_name);
 
672
 
 
673
  Field *current_field;
 
674
  Field **table_fields= in_table->field;
 
675
 
 
676
  message::FieldMetadata *field_metadata;
 
677
 
 
678
  while ((current_field= *table_fields++) != NULL) 
 
679
  {
 
680
    /* 
 
681
     * Add the WHERE clause values now...for now, this means the
 
682
     * primary key field value.  Replication only supports tables
 
683
     * with a primary key.
 
684
     */
 
685
    if (in_table->s->primary_key == current_field->field_index)
 
686
    {
 
687
      field_metadata= header->add_key_field_metadata();
 
688
      field_metadata->set_name(current_field->field_name);
 
689
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
690
    }
 
691
  }
 
692
}
 
693
 
 
694
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
 
695
{
 
696
  if (! is_active)
 
697
    return;
 
698
 
 
699
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
700
 
 
701
  message::DeleteData *data= statement.mutable_delete_data();
 
702
  data->set_segment_id(1);
 
703
  data->set_end_segment(true);
 
704
  message::DeleteRecord *record= data->add_record();
 
705
 
 
706
  Field *current_field;
 
707
  Field **table_fields= in_table->field;
 
708
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
709
  string_value->set_charset(system_charset_info);
 
710
 
 
711
  while ((current_field= *table_fields++) != NULL) 
 
712
  {
 
713
    /* 
 
714
     * Add the WHERE clause values now...for now, this means the
 
715
     * primary key field value.  Replication only supports tables
 
716
     * with a primary key.
 
717
     */
 
718
    if (in_table->s->primary_key == current_field->field_index)
 
719
    {
 
720
      string_value= current_field->val_str(string_value);
 
721
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
722
      /**
 
723
       * @TODO Store optional old record value in the before data member
 
724
       */
 
725
      string_value->free();
 
726
    }
 
727
  }
 
728
}
 
729
 
 
730
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
 
731
{
 
732
  if (! is_active)
 
733
    return;
 
734
  
 
735
  message::Transaction *transaction= getActiveTransaction(in_session);
 
736
  message::Statement *statement= transaction->add_statement();
 
737
 
 
738
  initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
739
 
 
740
  /* 
 
741
   * Construct the specialized TruncateTableStatement message and attach
 
742
   * it to the generic Statement message
 
743
   */
 
744
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
745
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
746
 
 
747
  const char *schema_name= in_table->getShare()->db.str;
 
748
  const char *table_name= in_table->getShare()->table_name.str;
 
749
 
 
750
  table_metadata->set_schema_name(schema_name);
 
751
  table_metadata->set_table_name(table_name);
 
752
 
 
753
  finalizeStatement(*statement, in_session);
 
754
 
 
755
  finalizeTransaction(*transaction, in_session);
 
756
  
 
757
  push(*transaction);
 
758
 
 
759
  cleanupTransaction(transaction, in_session);
 
760
}
 
761
 
 
762
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
763
{
 
764
  if (! is_active)
 
765
    return;
 
766
  
 
767
  message::Transaction *transaction= getActiveTransaction(in_session);
 
768
  message::Statement *statement= transaction->add_statement();
 
769
 
 
770
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
 
771
  string query(in_query, in_query_len);
 
772
  statement->set_sql(query);
 
773
  finalizeStatement(*statement, in_session);
 
774
 
 
775
  finalizeTransaction(*transaction, in_session);
 
776
  
 
777
  push(*transaction);
 
778
 
 
779
  cleanupTransaction(transaction, in_session);
 
780
}
 
781
 
 
782
void ReplicationServices::push(message::Transaction &to_push)
 
783
{
 
784
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
785
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
 
786
  appl_start_iter= appliers.begin();
 
787
 
 
788
  plugin::TransactionReplicator *cur_repl;
 
789
  plugin::TransactionApplier *cur_appl;
 
790
 
 
791
  while (repl_iter != replicators.end())
 
792
  {
 
793
    cur_repl= *repl_iter;
 
794
    if (! cur_repl->isEnabled())
 
795
    {
 
796
      ++repl_iter;
 
797
      continue;
 
798
    }
 
799
    
 
800
    appl_iter= appl_start_iter;
 
801
    while (appl_iter != appliers.end())
 
802
    {
 
803
      cur_appl= *appl_iter;
 
804
 
 
805
      if (! cur_appl->isEnabled())
 
806
      {
 
807
        ++appl_iter;
 
808
        continue;
 
809
      }
 
810
 
 
811
      cur_repl->replicate(cur_appl, to_push);
 
812
      
183
813
      /* 
184
814
       * We update the timestamp for the last applied Transaction so that
185
815
       * publisher plugins can ask the replication services when the
187
817
       * method.
188
818
       */
189
819
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
 
820
      ++appl_iter;
190
821
    }
191
 
    else
192
 
      return result;
 
822
    ++repl_iter;
193
823
  }
194
 
  return result;
195
 
}
196
 
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
198
 
{
199
 
  return replication_streams;
200
824
}
201
825
 
202
826
} /* namespace drizzled */