~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Brian Aker
  • Date: 2009-12-06 01:55:53 UTC
  • mfrom: (1238.1.5 push)
  • Revision ID: brian@gaz-20091206015553-cva833q4gvwj11ob
Bundle for staging.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
24
23
 
25
24
/**
26
25
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
36
 
 
37
 
#include <config.h>
38
 
#include <drizzled/replication_services.h>
39
 
#include <drizzled/plugin/transaction_replicator.h>
40
 
#include <drizzled/plugin/transaction_applier.h>
41
 
#include <drizzled/message/transaction.pb.h>
42
 
#include <drizzled/gettext.h>
43
 
#include <drizzled/session.h>
44
 
#include <drizzled/error.h>
45
 
 
46
 
#include <string>
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 *
 
43
 * @todo
 
44
 *
 
45
 * We really should store the raw bytes in the messages, not the
 
46
 * String value of the Field.  But, to do that, the
 
47
 * statement_transform library needs first to be updated
 
48
 * to include the transformation code to convert raw
 
49
 * Drizzle-internal Field byte representation into something
 
50
 * plugins can understand.
 
51
 */
 
52
 
 
53
#include "drizzled/server_includes.h"
 
54
#include "drizzled/replication_services.h"
 
55
#include "drizzled/plugin/transaction_replicator.h"
 
56
#include "drizzled/plugin/transaction_applier.h"
 
57
#include "drizzled/message/transaction.pb.h"
 
58
#include "drizzled/message/table.pb.h"
 
59
#include "drizzled/gettext.h"
 
60
#include "drizzled/session.h"
 
61
 
47
62
#include <vector>
48
 
#include <algorithm>
49
63
 
50
64
using namespace std;
51
65
 
52
66
namespace drizzled
53
67
{
54
68
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
 
69
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type);
 
70
 
 
71
ReplicationServices::ReplicationServices()
57
72
{
 
73
  is_active= false;
58
74
}
59
75
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
 
76
void ReplicationServices::evaluateActivePlugins()
61
77
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
 
78
  /* 
 
79
   * We loop through replicators and appliers, evaluating
 
80
   * whether or not there is at least one active replicator
 
81
   * and one active applier.  If not, we set is_active
 
82
   * to false.
 
83
   */
 
84
  bool tmp_is_active= false;
 
85
 
 
86
  if (replicators.empty() || appliers.empty())
 
87
  {
 
88
    is_active= false;
 
89
    return;
 
90
  }
 
91
 
 
92
  /* 
 
93
   * Determine if any remaining replicators and if those
 
94
   * replicators are active...if not, set is_active
 
95
   * to false
 
96
   */
 
97
  for (Replicators::iterator repl_iter= replicators.begin();
 
98
       repl_iter != replicators.end();
 
99
       ++repl_iter)
 
100
  {
 
101
    if ((*repl_iter)->isEnabled())
71
102
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
 
103
      tmp_is_active= true;
 
104
      break;
74
105
    }
75
106
  }
76
 
}
 
107
  if (! tmp_is_active)
 
108
  {
 
109
    /* No active replicators. Set is_active to false and exit. */
 
110
    is_active= false;
 
111
    return;
 
112
  }
77
113
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
80
114
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempts to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
 
115
   * OK, we know there's at least one active replicator.
 
116
   *
 
117
   * Now determine if any remaining replicators and if those
 
118
   * replicators are active...if not, set is_active
 
119
   * to false
87
120
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(error::ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
121
  for (Appliers::iterator appl_iter= appliers.begin();
100
122
       appl_iter != appliers.end();
101
123
       ++appl_iter)
102
124
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(error::ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
 
125
    if ((*appl_iter)->isEnabled())
 
126
    {
 
127
      is_active= true;
 
128
      return;
137
129
    }
138
130
  }
139
 
  is_active= true;
140
 
  return true;
 
131
  /* If we get here, there are no active appliers */
 
132
  is_active= false;
141
133
}
142
134
 
143
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
144
136
{
145
137
  replicators.push_back(in_replicator);
 
138
  evaluateActivePlugins();
146
139
}
147
140
 
148
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
149
142
{
150
143
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
 
144
  evaluateActivePlugins();
 
145
}
 
146
 
 
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
 
148
{
 
149
  appliers.push_back(in_applier);
 
150
  evaluateActivePlugins();
 
151
}
 
152
 
 
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
 
154
{
 
155
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
156
  evaluateActivePlugins();
160
157
}
161
158
 
162
159
bool ReplicationServices::isActive() const
164
161
  return is_active;
165
162
}
166
163
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
169
 
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
175
 
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
 
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
 
165
{
 
166
  message::Transaction *transaction= in_session->getTransactionMessage();
 
167
 
 
168
  if (unlikely(transaction == NULL))
 
169
  {
 
170
    /* 
 
171
     * Allocate and initialize a new transaction message 
 
172
     * for this Session object.  Session is responsible for
 
173
     * deleting transaction message when done with it.
 
174
     */
 
175
    transaction= new (nothrow) message::Transaction();
 
176
    initTransaction(*transaction, in_session);
 
177
    in_session->setTransactionMessage(transaction);
 
178
    return transaction;
 
179
  }
 
180
  else
 
181
    return transaction;
 
182
}
 
183
 
 
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
 
185
                                          Session *in_session) const
 
186
{
 
187
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
188
  trx->set_server_id(in_session->getServerId());
 
189
  trx->set_transaction_id(in_session->getTransactionId());
 
190
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
191
}
 
192
 
 
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
 
194
                                              Session *in_session) const
 
195
{
 
196
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
197
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
198
}
 
199
 
 
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
 
201
                                             Session *in_session) const
 
202
{
 
203
  delete in_transaction;
 
204
  in_session->setStatementMessage(NULL);
 
205
  in_session->setTransactionMessage(NULL);
 
206
}
 
207
 
 
208
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
 
209
{
 
210
  size_t num_statements= transaction.statement_size();
 
211
  if (num_statements == 0)
 
212
    return false;
 
213
 
 
214
  /*
 
215
   * Only INSERT, UPDATE, and DELETE statements can possibly
 
216
   * have bulk segments.  So, we loop through the statements
 
217
   * checking for segment_id > 1 in those specific submessages.
 
218
   */
 
219
  size_t x;
 
220
  for (x= 0; x < num_statements; ++x)
 
221
  {
 
222
    const message::Statement &statement= transaction.statement(x);
 
223
    message::Statement::Type type= statement.type();
 
224
 
 
225
    switch (type)
 
226
    {
 
227
      case message::Statement::INSERT:
 
228
        if (statement.insert_data().segment_id() > 1)
 
229
          return true;
 
230
        break;
 
231
      case message::Statement::UPDATE:
 
232
        if (statement.update_data().segment_id() > 1)
 
233
          return true;
 
234
        break;
 
235
      case message::Statement::DELETE:
 
236
        if (statement.delete_data().segment_id() > 1)
 
237
          return true;
 
238
        break;
 
239
      default:
 
240
        break;
 
241
    }
 
242
  }
 
243
  return false;
 
244
}
 
245
void ReplicationServices::commitTransaction(Session *in_session)
 
246
{
 
247
  if (! is_active)
 
248
    return;
 
249
 
 
250
  /* If there is an active statement message, finalize it */
 
251
  message::Statement *statement= in_session->getStatementMessage();
 
252
 
 
253
  if (statement != NULL)
 
254
  {
 
255
    finalizeStatement(*statement, in_session);
 
256
  }
 
257
  else
 
258
    return; /* No data modification occurred inside the transaction */
 
259
  
 
260
  message::Transaction* transaction= getActiveTransaction(in_session);
 
261
 
 
262
  finalizeTransaction(*transaction, in_session);
 
263
  
 
264
  push(*transaction);
 
265
 
 
266
  cleanupTransaction(transaction, in_session);
 
267
}
 
268
 
 
269
void ReplicationServices::initStatement(message::Statement &statement,
 
270
                                        message::Statement::Type in_type,
 
271
                                        Session *in_session) const
 
272
{
 
273
  statement.set_type(in_type);
 
274
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
275
  /** @TODO Set sql string optionally */
 
276
}
 
277
 
 
278
void ReplicationServices::finalizeStatement(message::Statement &statement,
 
279
                                            Session *in_session) const
 
280
{
 
281
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
282
  in_session->setStatementMessage(NULL);
 
283
}
 
284
 
 
285
void ReplicationServices::rollbackTransaction(Session *in_session)
 
286
{
 
287
  if (! is_active)
 
288
    return;
 
289
  
 
290
  message::Transaction *transaction= getActiveTransaction(in_session);
 
291
 
 
292
  /*
 
293
   * OK, so there are two situations that we need to deal with here:
 
294
   *
 
295
   * 1) We receive an instruction to ROLLBACK the current transaction
 
296
   *    and the currently-stored Transaction message is *self-contained*, 
 
297
   *    meaning that no Statement messages in the Transaction message
 
298
   *    contain a message having its segment_id member greater than 1.  If
 
299
   *    no non-segment ID 1 members are found, we can simply clear the
 
300
   *    current Transaction message and remove it from memory.
 
301
   *
 
302
   * 2) If the Transaction message does indeed have a non-end segment, that
 
303
   *    means that a bulk update/delete/insert Transaction message segment
 
304
   *    has previously been sent over the wire to replicators.  In this case, 
 
305
   *    we need to package a Transaction with a Statement message of type
 
306
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
307
   *    messages must be un-applied.
 
308
   */
 
309
  if (unlikely(transactionContainsBulkSegment(*transaction)))
 
310
  {
 
311
    /*
 
312
     * Clear the transaction, create a Rollback statement message, 
 
313
     * attach it to the transaction, and push it to replicators.
 
314
     */
 
315
    transaction->Clear();
 
316
    initTransaction(*transaction, in_session);
 
317
 
 
318
    message::Statement *statement= transaction->add_statement();
 
319
 
 
320
    initStatement(*statement, message::Statement::ROLLBACK, in_session);
 
321
    finalizeStatement(*statement, in_session);
 
322
 
 
323
    finalizeTransaction(*transaction, in_session);
 
324
    
 
325
    push(*transaction);
 
326
  }
 
327
  cleanupTransaction(transaction, in_session);
 
328
}
 
329
 
 
330
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
 
331
                                                            Table *in_table) const
 
332
{
 
333
  message::Statement *statement= in_session->getStatementMessage();
 
334
 
 
335
  if (statement == NULL)
 
336
  {
 
337
    message::Transaction *transaction= getActiveTransaction(in_session);
 
338
    /* 
 
339
     * Transaction message initialized and set, but no statement created
 
340
     * yet.  We construct one and initialize it, here, then return the
 
341
     * message after attaching the new Statement message pointer to the 
 
342
     * Session for easy retrieval later...
 
343
     */
 
344
    statement= transaction->add_statement();
 
345
    setInsertHeader(*statement, in_session, in_table);
 
346
    in_session->setStatementMessage(statement);
 
347
  }
 
348
  return *statement;
 
349
}
 
350
 
 
351
void ReplicationServices::setInsertHeader(message::Statement &statement,
 
352
                                          Session *in_session,
 
353
                                          Table *in_table) const
 
354
{
 
355
  initStatement(statement, message::Statement::INSERT, in_session);
 
356
 
 
357
  /* 
 
358
   * Now we construct the specialized InsertHeader message inside
 
359
   * the generalized message::Statement container...
 
360
   */
 
361
  /* Set up the insert header */
 
362
  message::InsertHeader *header= statement.mutable_insert_header();
 
363
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
364
 
 
365
  const char *schema_name= in_table->getShare()->db.str;
 
366
  const char *table_name= in_table->getShare()->table_name.str;
 
367
 
 
368
  table_metadata->set_schema_name(schema_name);
 
369
  table_metadata->set_table_name(table_name);
 
370
 
 
371
  Field *current_field;
 
372
  Field **table_fields= in_table->field;
 
373
 
 
374
  message::FieldMetadata *field_metadata;
 
375
 
 
376
  /* We will read all the table's fields... */
 
377
  in_table->setReadSet();
 
378
 
 
379
  while ((current_field= *table_fields++) != NULL) 
 
380
  {
 
381
    field_metadata= header->add_field_metadata();
 
382
    field_metadata->set_name(current_field->field_name);
 
383
    field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
384
  }
 
385
}
 
386
 
 
387
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
 
388
{
 
389
  if (! is_active)
 
390
    return;
 
391
 
 
392
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
393
 
 
394
  message::InsertData *data= statement.mutable_insert_data();
 
395
  data->set_segment_id(1);
 
396
  data->set_end_segment(true);
 
397
  message::InsertRecord *record= data->add_record();
 
398
 
 
399
  Field *current_field;
 
400
  Field **table_fields= in_table->field;
 
401
 
 
402
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
403
  string_value->set_charset(system_charset_info);
 
404
 
 
405
  /* We will read all the table's fields... */
 
406
  in_table->setReadSet();
 
407
 
 
408
  while ((current_field= *table_fields++) != NULL) 
 
409
  {
 
410
    string_value= current_field->val_str(string_value);
 
411
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
412
    string_value->free();
 
413
  }
 
414
}
 
415
 
 
416
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
 
417
                                                            Table *in_table,
 
418
                                                            const unsigned char *old_record, 
 
419
                                                            const unsigned char *new_record) const
 
420
{
 
421
  message::Statement *statement= in_session->getStatementMessage();
 
422
 
 
423
  if (statement == NULL)
 
424
  {
 
425
    message::Transaction *transaction= getActiveTransaction(in_session);
 
426
    /* 
 
427
     * Transaction message initialized and set, but no statement created
 
428
     * yet.  We construct one and initialize it, here, then return the
 
429
     * message after attaching the new Statement message pointer to the 
 
430
     * Session for easy retrieval later...
 
431
     */
 
432
    statement= transaction->add_statement();
 
433
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
434
    in_session->setStatementMessage(statement);
 
435
  }
 
436
  return *statement;
 
437
}
 
438
 
 
439
void ReplicationServices::setUpdateHeader(message::Statement &statement,
 
440
                                          Session *in_session,
 
441
                                          Table *in_table,
 
442
                                          const unsigned char *old_record, 
 
443
                                          const unsigned char *new_record) const
 
444
{
 
445
  initStatement(statement, message::Statement::UPDATE, in_session);
 
446
 
 
447
  /* 
 
448
   * Now we construct the specialized UpdateHeader message inside
 
449
   * the generalized message::Statement container...
 
450
   */
 
451
  /* Set up the update header */
 
452
  message::UpdateHeader *header= statement.mutable_update_header();
 
453
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
454
 
 
455
  const char *schema_name= in_table->getShare()->db.str;
 
456
  const char *table_name= in_table->getShare()->table_name.str;
 
457
 
 
458
  table_metadata->set_schema_name(schema_name);
 
459
  table_metadata->set_table_name(table_name);
 
460
 
 
461
  Field *current_field;
 
462
  Field **table_fields= in_table->field;
 
463
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
464
  string_value->set_charset(system_charset_info);
 
465
 
 
466
  message::FieldMetadata *field_metadata;
 
467
 
 
468
  /* We will read all the table's fields... */
 
469
  in_table->setReadSet();
 
470
 
 
471
  while ((current_field= *table_fields++) != NULL) 
 
472
  {
 
473
    /*
 
474
     * We add the "key field metadata" -- i.e. the fields which is
 
475
     * the primary key for the table.
 
476
     */
 
477
    if (in_table->s->primary_key == current_field->field_index)
 
478
    {
 
479
      field_metadata= header->add_key_field_metadata();
 
480
      field_metadata->set_name(current_field->field_name);
 
481
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
482
    }
 
483
 
 
484
    /*
 
485
     * The below really should be moved into the Field API and Record API.  But for now
 
486
     * we do this crazy pointer fiddling to figure out if the current field
 
487
     * has been updated in the supplied record raw byte pointers.
 
488
     */
 
489
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
490
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
491
 
 
492
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
493
 
 
494
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
495
    {
 
496
      /* Field is changed from old to new */
 
497
      field_metadata= header->add_set_field_metadata();
 
498
      field_metadata->set_name(current_field->field_name);
 
499
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
500
    }
 
501
  }
 
502
}
 
503
void ReplicationServices::updateRecord(Session *in_session,
 
504
                                       Table *in_table, 
 
505
                                       const unsigned char *old_record, 
 
506
                                       const unsigned char *new_record)
 
507
{
 
508
  if (! is_active)
 
509
    return;
 
510
 
 
511
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
512
 
 
513
  message::UpdateData *data= statement.mutable_update_data();
 
514
  data->set_segment_id(1);
 
515
  data->set_end_segment(true);
 
516
  message::UpdateRecord *record= data->add_record();
 
517
 
 
518
  Field *current_field;
 
519
  Field **table_fields= in_table->field;
 
520
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
521
  string_value->set_charset(system_charset_info);
 
522
 
 
523
  while ((current_field= *table_fields++) != NULL) 
 
524
  {
 
525
    /*
 
526
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
527
     * but then realized that an UPDATE statement could potentially have different values for
 
528
     * the SET field.  For instance, imagine this SQL scenario:
 
529
     *
 
530
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
531
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
532
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
533
     *
 
534
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
535
     *
 
536
     * The below really should be moved into the Field API and Record API.  But for now
 
537
     * we do this crazy pointer fiddling to figure out if the current field
 
538
     * has been updated in the supplied record raw byte pointers.
 
539
     */
 
540
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
541
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
542
 
 
543
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
544
 
 
545
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
546
    {
 
547
      /* Store the original "read bit" for this field */
 
548
      bool is_read_set= current_field->isReadSet();
 
549
 
 
550
      /* We need to mark that we will "read" this field... */
 
551
      in_table->setReadSet(current_field->field_index);
 
552
 
 
553
      /* Read the string value of this field's contents */
 
554
      string_value= current_field->val_str(string_value);
 
555
 
 
556
      /* 
 
557
       * Reset the read bit after reading field to its original state.  This 
 
558
       * prevents the field from being included in the WHERE clause
 
559
       */
 
560
      current_field->setReadSet(is_read_set);
 
561
 
 
562
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
563
      string_value->free();
 
564
    }
 
565
 
 
566
    /* 
 
567
     * Add the WHERE clause values now...for now, this means the
 
568
     * primary key field value.  Replication only supports tables
 
569
     * with a primary key.
 
570
     */
 
571
    if (in_table->s->primary_key == current_field->field_index)
 
572
    {
 
573
      /**
 
574
       * To say the below is ugly is an understatement. But it works.
 
575
       * 
 
576
       * @todo Move this crap into a real Record API.
 
577
       */
 
578
      string_value= current_field->val_str(string_value,
 
579
                                           old_record + 
 
580
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
581
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
582
      string_value->free();
 
583
    }
 
584
 
 
585
  }
 
586
}
 
587
 
 
588
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
 
589
                                                            Table *in_table) const
 
590
{
 
591
  message::Statement *statement= in_session->getStatementMessage();
 
592
 
 
593
  if (statement == NULL)
 
594
  {
 
595
    message::Transaction *transaction= getActiveTransaction(in_session);
 
596
    /* 
 
597
     * Transaction message initialized and set, but no statement created
 
598
     * yet.  We construct one and initialize it, here, then return the
 
599
     * message after attaching the new Statement message pointer to the 
 
600
     * Session for easy retrieval later...
 
601
     */
 
602
    statement= transaction->add_statement();
 
603
    setDeleteHeader(*statement, in_session, in_table);
 
604
    in_session->setStatementMessage(statement);
 
605
  }
 
606
  return *statement;
 
607
}
 
608
 
 
609
void ReplicationServices::setDeleteHeader(message::Statement &statement,
 
610
                                          Session *in_session,
 
611
                                          Table *in_table) const
 
612
{
 
613
  initStatement(statement, message::Statement::DELETE, in_session);
 
614
 
 
615
  /* 
 
616
   * Now we construct the specialized DeleteHeader message inside
 
617
   * the generalized message::Statement container...
 
618
   */
 
619
  message::DeleteHeader *header= statement.mutable_delete_header();
 
620
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
621
 
 
622
  const char *schema_name= in_table->getShare()->db.str;
 
623
  const char *table_name= in_table->getShare()->table_name.str;
 
624
 
 
625
  table_metadata->set_schema_name(schema_name);
 
626
  table_metadata->set_table_name(table_name);
 
627
 
 
628
  Field *current_field;
 
629
  Field **table_fields= in_table->field;
 
630
 
 
631
  message::FieldMetadata *field_metadata;
 
632
 
 
633
  while ((current_field= *table_fields++) != NULL) 
 
634
  {
 
635
    /* 
 
636
     * Add the WHERE clause values now...for now, this means the
 
637
     * primary key field value.  Replication only supports tables
 
638
     * with a primary key.
 
639
     */
 
640
    if (in_table->s->primary_key == current_field->field_index)
 
641
    {
 
642
      field_metadata= header->add_key_field_metadata();
 
643
      field_metadata->set_name(current_field->field_name);
 
644
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
645
    }
 
646
  }
 
647
}
 
648
 
 
649
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
 
650
{
 
651
  if (! is_active)
 
652
    return;
 
653
 
 
654
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
655
 
 
656
  message::DeleteData *data= statement.mutable_delete_data();
 
657
  data->set_segment_id(1);
 
658
  data->set_end_segment(true);
 
659
  message::DeleteRecord *record= data->add_record();
 
660
 
 
661
  Field *current_field;
 
662
  Field **table_fields= in_table->field;
 
663
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
664
  string_value->set_charset(system_charset_info);
 
665
 
 
666
  while ((current_field= *table_fields++) != NULL) 
 
667
  {
 
668
    /* 
 
669
     * Add the WHERE clause values now...for now, this means the
 
670
     * primary key field value.  Replication only supports tables
 
671
     * with a primary key.
 
672
     */
 
673
    if (in_table->s->primary_key == current_field->field_index)
 
674
    {
 
675
      string_value= current_field->val_str(string_value);
 
676
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
677
      /**
 
678
       * @TODO Store optional old record value in the before data member
 
679
       */
 
680
      string_value->free();
 
681
    }
 
682
  }
 
683
}
 
684
 
 
685
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
 
686
{
 
687
  if (! is_active)
 
688
    return;
 
689
  
 
690
  message::Transaction *transaction= getActiveTransaction(in_session);
 
691
  message::Statement *statement= transaction->add_statement();
 
692
 
 
693
  initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
694
 
 
695
  /* 
 
696
   * Construct the specialized TruncateTableStatement message and attach
 
697
   * it to the generic Statement message
 
698
   */
 
699
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
700
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
701
 
 
702
  const char *schema_name= in_table->getShare()->db.str;
 
703
  const char *table_name= in_table->getShare()->table_name.str;
 
704
 
 
705
  table_metadata->set_schema_name(schema_name);
 
706
  table_metadata->set_table_name(table_name);
 
707
 
 
708
  finalizeStatement(*statement, in_session);
 
709
 
 
710
  finalizeTransaction(*transaction, in_session);
 
711
  
 
712
  push(*transaction);
 
713
 
 
714
  cleanupTransaction(transaction, in_session);
 
715
}
 
716
 
 
717
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
718
{
 
719
  if (! is_active)
 
720
    return;
 
721
  
 
722
  message::Transaction *transaction= getActiveTransaction(in_session);
 
723
  message::Statement *statement= transaction->add_statement();
 
724
 
 
725
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
 
726
  string query(in_query, in_query_len);
 
727
  statement->set_sql(query);
 
728
  finalizeStatement(*statement, in_session);
 
729
 
 
730
  finalizeTransaction(*transaction, in_session);
 
731
  
 
732
  push(*transaction);
 
733
 
 
734
  cleanupTransaction(transaction, in_session);
 
735
}
 
736
 
 
737
void ReplicationServices::push(drizzled::message::Transaction &to_push)
 
738
{
 
739
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
740
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
 
741
  appl_start_iter= appliers.begin();
 
742
 
 
743
  plugin::TransactionReplicator *cur_repl;
 
744
  plugin::TransactionApplier *cur_appl;
 
745
 
 
746
  while (repl_iter != replicators.end())
 
747
  {
 
748
    cur_repl= *repl_iter;
 
749
    if (! cur_repl->isEnabled())
 
750
    {
 
751
      ++repl_iter;
 
752
      continue;
 
753
    }
 
754
    
 
755
    appl_iter= appl_start_iter;
 
756
    while (appl_iter != appliers.end())
 
757
    {
 
758
      cur_appl= *appl_iter;
 
759
 
 
760
      if (! cur_appl->isEnabled())
 
761
      {
 
762
        ++appl_iter;
 
763
        continue;
 
764
      }
 
765
 
 
766
      cur_repl->replicate(cur_appl, to_push);
 
767
      
183
768
      /* 
184
769
       * We update the timestamp for the last applied Transaction so that
185
770
       * publisher plugins can ask the replication services when the
187
772
       * method.
188
773
       */
189
774
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
 
775
      ++appl_iter;
190
776
    }
191
 
    else
192
 
      return result;
 
777
    ++repl_iter;
193
778
  }
194
 
  return result;
195
779
}
196
780
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
 
781
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type)
198
782
{
199
 
  return replication_streams;
 
783
  switch (in_type)
 
784
  {
 
785
    case DRIZZLE_TYPE_LONGLONG:
 
786
      return message::Table::Field::BIGINT;
 
787
    case DRIZZLE_TYPE_LONG:
 
788
      return message::Table::Field::INTEGER;
 
789
    case DRIZZLE_TYPE_DECIMAL:
 
790
      return message::Table::Field::DECIMAL;
 
791
    case DRIZZLE_TYPE_DOUBLE:
 
792
      return message::Table::Field::DOUBLE;
 
793
    case DRIZZLE_TYPE_DATE:
 
794
      return message::Table::Field::DATE;
 
795
    case DRIZZLE_TYPE_DATETIME:
 
796
      return message::Table::Field::DATETIME;
 
797
    case DRIZZLE_TYPE_TIMESTAMP:
 
798
      return message::Table::Field::TIMESTAMP;
 
799
    case DRIZZLE_TYPE_VARCHAR:
 
800
      return message::Table::Field::VARCHAR;
 
801
    case DRIZZLE_TYPE_BLOB:
 
802
      return message::Table::Field::BLOB;
 
803
    case DRIZZLE_TYPE_ENUM:
 
804
      return message::Table::Field::ENUM;
 
805
    default:
 
806
      return message::Table::Field::VARCHAR;
 
807
  }
200
808
}
201
809
 
202
810
} /* namespace drizzled */