~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

  • Committer: Monty Taylor
  • Date: 2009-12-01 17:50:17 UTC
  • mto: (1235.1.1 push)
  • mto: This revision was merged to the branch mainline in revision 1236.
  • Revision ID: mordred@inaugust.com-20091201175017-o9yed6ssdiolghv4
Renamed instances of HEAP engine to MEMORY. Removed the alias.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2009-2010 Jay Pipes <jaypipes@gmail.com>
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com>
 
8
 *    Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
24
23
 
25
24
/**
26
25
 * @file Server-side utility which is responsible for managing the 
27
 
 * communication between the kernel and the replication plugins:
28
 
 *
29
 
 * - TransactionReplicator
30
 
 * - TransactionApplier
31
 
 * - Publisher
32
 
 * - Subscriber
33
 
 *
34
 
 * ReplicationServices is a bridge between replication modules and the kernel,
35
 
 * and its primary function is to  */
 
26
 * communication between the kernel, replicator plugins, and applier plugins.
 
27
 *
 
28
 * ReplicationServices is a bridge between modules and the kernel, and its
 
29
 * primary function is to take internal events (for instance the start of a 
 
30
 * transaction, the changing of a record, or the rollback of a transaction) 
 
31
 * and construct GPB Messages that are passed to the registered replicator and
 
32
 * applier plugins.
 
33
 *
 
34
 * The reason for this functionality is to encapsulate all communication
 
35
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
36
 * Instead of the plugin having to understand the (often fluidly changing)
 
37
 * mechanics of the kernel, all the plugin needs to understand is the message
 
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
39
 * these messages.
 
40
 *
 
41
 * @see /drizzled/message/transaction.proto
 
42
 *
 
43
 * @todo
 
44
 *
 
45
 * We really should store the raw bytes in the messages, not the
 
46
 * String value of the Field.  But, to do that, the
 
47
 * statement_transform library needs first to be updated
 
48
 * to include the transformation code to convert raw
 
49
 * Drizzle-internal Field byte representation into something
 
50
 * plugins can understand.
 
51
 */
36
52
 
37
 
#include "config.h"
 
53
#include "drizzled/server_includes.h"
38
54
#include "drizzled/replication_services.h"
39
55
#include "drizzled/plugin/transaction_replicator.h"
40
56
#include "drizzled/plugin/transaction_applier.h"
41
57
#include "drizzled/message/transaction.pb.h"
 
58
#include "drizzled/message/table.pb.h"
42
59
#include "drizzled/gettext.h"
43
60
#include "drizzled/session.h"
44
 
#include "drizzled/error.h"
45
61
 
46
 
#include <string>
47
62
#include <vector>
48
 
#include <algorithm>
49
63
 
50
64
using namespace std;
51
65
 
52
66
namespace drizzled
53
67
{
54
68
 
55
 
ReplicationServices::ReplicationServices() :
56
 
  is_active(false)
57
 
{
58
 
}
59
 
 
60
 
void ReplicationServices::normalizeReplicatorName(string &name)
61
 
{
62
 
  transform(name.begin(),
63
 
            name.end(),
64
 
            name.begin(),
65
 
            ::tolower);
66
 
  if (name.find("replicator") == string::npos)
67
 
    name.append("replicator", 10);
68
 
  {
69
 
    size_t found_underscore= name.find('_');
70
 
    while (found_underscore != string::npos)
71
 
    {
72
 
      name.erase(found_underscore, 1);
73
 
      found_underscore= name.find('_');
74
 
    }
75
 
  }
76
 
}
77
 
 
78
 
bool ReplicationServices::evaluateRegisteredPlugins()
79
 
{
80
 
  /* 
81
 
   * We loop through appliers that have registered with us
82
 
   * and attempts to pair the applier with its requested
83
 
   * replicator.  If an applier has requested a replicator
84
 
   * that has either not been built or has not registered
85
 
   * with the replication services, we print an error and
86
 
   * return false
87
 
   */
88
 
  if (appliers.empty())
89
 
    return true;
90
 
 
91
 
  if (replicators.empty() && not appliers.empty())
92
 
  {
93
 
    errmsg_printf(ERRMSG_LVL_ERROR,
94
 
                  N_("You registered a TransactionApplier plugin but no "
95
 
                     "TransactionReplicator plugins were registered.\n"));
96
 
    return false;
97
 
  }
98
 
 
99
 
  for (Appliers::iterator appl_iter= appliers.begin();
100
 
       appl_iter != appliers.end();
101
 
       ++appl_iter)
102
 
  {
103
 
    plugin::TransactionApplier *applier= (*appl_iter).second;
104
 
    string requested_replicator_name= (*appl_iter).first;
105
 
    normalizeReplicatorName(requested_replicator_name);
106
 
 
107
 
    bool found= false;
108
 
    Replicators::iterator repl_iter;
109
 
    for (repl_iter= replicators.begin();
110
 
         repl_iter != replicators.end();
111
 
         ++repl_iter)
112
 
    {
113
 
      string replicator_name= (*repl_iter)->getName();
114
 
      normalizeReplicatorName(replicator_name);
115
 
 
116
 
      if (requested_replicator_name.compare(replicator_name) == 0)
117
 
      {
118
 
        found= true;
119
 
        break;
120
 
      }
121
 
    }
122
 
    if (not found)
123
 
    {
124
 
      errmsg_printf(ERRMSG_LVL_ERROR,
125
 
                    N_("You registered a TransactionApplier plugin but no "
126
 
                       "TransactionReplicator plugins were registered that match the "
127
 
                       "requested replicator name of '%s'.\n"
128
 
                       "We have deactivated the TransactionApplier '%s'.\n"),
129
 
                       requested_replicator_name.c_str(),
130
 
                       applier->getName().c_str());
131
 
      applier->deactivate();
132
 
      return false;
133
 
    }
134
 
    else
135
 
    {
136
 
      replication_streams.push_back(make_pair(*repl_iter, applier));
137
 
    }
138
 
  }
139
 
  is_active= true;
140
 
  return true;
 
69
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type);
 
70
 
 
71
ReplicationServices::ReplicationServices()
 
72
{
 
73
  is_active= false;
 
74
}
 
75
 
 
76
void ReplicationServices::evaluateActivePlugins()
 
77
{
 
78
  /* 
 
79
   * We loop through replicators and appliers, evaluating
 
80
   * whether or not there is at least one active replicator
 
81
   * and one active applier.  If not, we set is_active
 
82
   * to false.
 
83
   */
 
84
  bool tmp_is_active= false;
 
85
 
 
86
  if (replicators.empty() || appliers.empty())
 
87
  {
 
88
    is_active= false;
 
89
    return;
 
90
  }
 
91
 
 
92
  /* 
 
93
   * Determine if any remaining replicators and if those
 
94
   * replicators are active...if not, set is_active
 
95
   * to false
 
96
   */
 
97
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
98
  while (repl_iter != replicators.end())
 
99
  {
 
100
    if ((*repl_iter)->isEnabled())
 
101
    {
 
102
      tmp_is_active= true;
 
103
      break;
 
104
    }
 
105
    ++repl_iter;
 
106
  }
 
107
  if (! tmp_is_active)
 
108
  {
 
109
    /* No active replicators. Set is_active to false and exit. */
 
110
    is_active= false;
 
111
    return;
 
112
  }
 
113
 
 
114
  /* 
 
115
   * OK, we know there's at least one active replicator.
 
116
   *
 
117
   * Now determine if any remaining replicators and if those
 
118
   * replicators are active...if not, set is_active
 
119
   * to false
 
120
   */
 
121
  vector<plugin::TransactionApplier *>::iterator appl_iter= appliers.begin();
 
122
  while (appl_iter != appliers.end())
 
123
  {
 
124
    if ((*appl_iter)->isEnabled())
 
125
    {
 
126
      is_active= true;
 
127
      return;
 
128
    }
 
129
    ++appl_iter;
 
130
  }
 
131
  /* If we get here, there are no active appliers */
 
132
  is_active= false;
141
133
}
142
134
 
143
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
144
136
{
145
137
  replicators.push_back(in_replicator);
 
138
  evaluateActivePlugins();
146
139
}
147
140
 
148
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
149
142
{
150
143
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
151
 
}
152
 
 
153
 
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
154
 
{
155
 
  appliers.push_back(make_pair(requested_replicator_name, in_applier));
156
 
}
157
 
 
158
 
void ReplicationServices::detachApplier(plugin::TransactionApplier *)
159
 
{
 
144
  evaluateActivePlugins();
 
145
}
 
146
 
 
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
 
148
{
 
149
  appliers.push_back(in_applier);
 
150
  evaluateActivePlugins();
 
151
}
 
152
 
 
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
 
154
{
 
155
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
 
156
  evaluateActivePlugins();
160
157
}
161
158
 
162
159
bool ReplicationServices::isActive() const
164
161
  return is_active;
165
162
}
166
163
 
167
 
plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
168
 
                                                                          message::Transaction &to_push)
169
 
{
170
 
  plugin::ReplicationReturnCode result= plugin::SUCCESS;
171
 
 
172
 
  for (ReplicationStreams::iterator iter= replication_streams.begin();
173
 
       iter != replication_streams.end();
174
 
       ++iter)
175
 
  {
176
 
    plugin::TransactionReplicator *cur_repl= (*iter).first;
177
 
    plugin::TransactionApplier *cur_appl= (*iter).second;
178
 
 
179
 
    result= cur_repl->replicate(cur_appl, in_session, to_push);
180
 
 
181
 
    if (result == plugin::SUCCESS)
182
 
    {
 
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
 
165
{
 
166
  message::Transaction *transaction= in_session->getTransactionMessage();
 
167
 
 
168
  if (unlikely(transaction == NULL))
 
169
  {
 
170
    /* 
 
171
     * Allocate and initialize a new transaction message 
 
172
     * for this Session object.  Session is responsible for
 
173
     * deleting transaction message when done with it.
 
174
     */
 
175
    transaction= new (nothrow) message::Transaction();
 
176
    initTransaction(*transaction, in_session);
 
177
    in_session->setTransactionMessage(transaction);
 
178
    return transaction;
 
179
  }
 
180
  else
 
181
    return transaction;
 
182
}
 
183
 
 
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
 
185
                                          Session *in_session) const
 
186
{
 
187
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
188
  trx->set_server_id(in_session->getServerId());
 
189
  trx->set_transaction_id(in_session->getTransactionId());
 
190
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
191
}
 
192
 
 
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
 
194
                                              Session *in_session) const
 
195
{
 
196
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
197
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
198
}
 
199
 
 
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
 
201
                                             Session *in_session) const
 
202
{
 
203
  delete in_transaction;
 
204
  in_session->setStatementMessage(NULL);
 
205
  in_session->setTransactionMessage(NULL);
 
206
}
 
207
 
 
208
void ReplicationServices::startNormalTransaction(Session *in_session)
 
209
{
 
210
  if (! is_active)
 
211
    return;
 
212
 
 
213
  /* Safeguard...other transactions should have already been closed */
 
214
  message::Transaction *transaction= in_session->getTransactionMessage();
 
215
  assert(transaction == NULL);
 
216
  
 
217
  /* 
 
218
   * A "normal" transaction for the replication services component
 
219
   * is simply a Transaction message, nothing more...so we create a
 
220
   * new message and attach it to the Session object.
 
221
   *
 
222
   * Allocate and initialize a new transaction message 
 
223
   * for this Session object.  This memory is deleted when the
 
224
   * transaction is pushed out to replicators.  Session is NOT
 
225
   * responsible for deleting this memory.
 
226
   */
 
227
  transaction= new (nothrow) message::Transaction();
 
228
  initTransaction(*transaction, in_session);
 
229
  in_session->setTransactionMessage(transaction);
 
230
}
 
231
 
 
232
void ReplicationServices::commitNormalTransaction(Session *in_session)
 
233
{
 
234
  if (! is_active)
 
235
    return;
 
236
 
 
237
  /* If there is an active statement message, finalize it */
 
238
  message::Statement *statement= in_session->getStatementMessage();
 
239
 
 
240
  if (statement != NULL)
 
241
  {
 
242
    finalizeStatement(*statement, in_session);
 
243
  }
 
244
  else
 
245
    return; /* No data modification occurred inside the transaction */
 
246
  
 
247
  message::Transaction* transaction= getActiveTransaction(in_session);
 
248
 
 
249
  finalizeTransaction(*transaction, in_session);
 
250
  
 
251
  push(*transaction);
 
252
 
 
253
  cleanupTransaction(transaction, in_session);
 
254
}
 
255
 
 
256
void ReplicationServices::initStatement(message::Statement &statement,
 
257
                                        message::Statement::Type in_type,
 
258
                                        Session *in_session) const
 
259
{
 
260
  statement.set_type(in_type);
 
261
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
262
  /** @TODO Set sql string optionally */
 
263
}
 
264
 
 
265
void ReplicationServices::finalizeStatement(message::Statement &statement,
 
266
                                            Session *in_session) const
 
267
{
 
268
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
269
  in_session->setStatementMessage(NULL);
 
270
}
 
271
 
 
272
void ReplicationServices::rollbackTransaction(Session *in_session)
 
273
{
 
274
  if (! is_active)
 
275
    return;
 
276
  
 
277
  message::Transaction *transaction= getActiveTransaction(in_session);
 
278
 
 
279
  /* 
 
280
   * We clear the current transaction, reset its transaction ID properly, 
 
281
   * then set its type to ROLLBACK and push it out to the replicators.
 
282
   */
 
283
  transaction->Clear();
 
284
  initTransaction(*transaction, in_session);
 
285
 
 
286
  message::Statement *statement= transaction->add_statement();
 
287
 
 
288
  initStatement(*statement, message::Statement::ROLLBACK, in_session);
 
289
  finalizeStatement(*statement, in_session);
 
290
 
 
291
  finalizeTransaction(*transaction, in_session);
 
292
  
 
293
  push(*transaction);
 
294
 
 
295
  cleanupTransaction(transaction, in_session);
 
296
}
 
297
 
 
298
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
 
299
                                                            Table *in_table) const
 
300
{
 
301
  message::Statement *statement= in_session->getStatementMessage();
 
302
 
 
303
  if (statement == NULL)
 
304
  {
 
305
    message::Transaction *transaction= getActiveTransaction(in_session);
 
306
    /* 
 
307
     * Transaction message initialized and set, but no statement created
 
308
     * yet.  We construct one and initialize it, here, then return the
 
309
     * message after attaching the new Statement message pointer to the 
 
310
     * Session for easy retrieval later...
 
311
     */
 
312
    statement= transaction->add_statement();
 
313
    setInsertHeader(*statement, in_session, in_table);
 
314
    in_session->setStatementMessage(statement);
 
315
  }
 
316
  return *statement;
 
317
}
 
318
 
 
319
void ReplicationServices::setInsertHeader(message::Statement &statement,
 
320
                                          Session *in_session,
 
321
                                          Table *in_table) const
 
322
{
 
323
  initStatement(statement, message::Statement::INSERT, in_session);
 
324
 
 
325
  /* 
 
326
   * Now we construct the specialized InsertHeader message inside
 
327
   * the generalized message::Statement container...
 
328
   */
 
329
  /* Set up the insert header */
 
330
  message::InsertHeader *header= statement.mutable_insert_header();
 
331
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
332
 
 
333
  const char *schema_name= in_table->getShare()->db.str;
 
334
  const char *table_name= in_table->getShare()->table_name.str;
 
335
 
 
336
  table_metadata->set_schema_name(schema_name);
 
337
  table_metadata->set_table_name(table_name);
 
338
 
 
339
  Field *current_field;
 
340
  Field **table_fields= in_table->field;
 
341
 
 
342
  message::FieldMetadata *field_metadata;
 
343
 
 
344
  /* We will read all the table's fields... */
 
345
  in_table->setReadSet();
 
346
 
 
347
  while ((current_field= *table_fields++) != NULL) 
 
348
  {
 
349
    field_metadata= header->add_field_metadata();
 
350
    field_metadata->set_name(current_field->field_name);
 
351
    field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
352
  }
 
353
}
 
354
 
 
355
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
 
356
{
 
357
  if (! is_active)
 
358
    return;
 
359
 
 
360
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
361
 
 
362
  message::InsertData *data= statement.mutable_insert_data();
 
363
  data->set_segment_id(1);
 
364
  data->set_end_segment(true);
 
365
  message::InsertRecord *record= data->add_record();
 
366
 
 
367
  Field *current_field;
 
368
  Field **table_fields= in_table->field;
 
369
 
 
370
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
371
  string_value->set_charset(system_charset_info);
 
372
 
 
373
  /* We will read all the table's fields... */
 
374
  in_table->setReadSet();
 
375
 
 
376
  while ((current_field= *table_fields++) != NULL) 
 
377
  {
 
378
    string_value= current_field->val_str(string_value);
 
379
    record->add_insert_value(string_value->c_ptr(), string_value->length());
 
380
    string_value->free();
 
381
  }
 
382
}
 
383
 
 
384
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
 
385
                                                            Table *in_table,
 
386
                                                            const unsigned char *old_record, 
 
387
                                                            const unsigned char *new_record) const
 
388
{
 
389
  message::Statement *statement= in_session->getStatementMessage();
 
390
 
 
391
  if (statement == NULL)
 
392
  {
 
393
    message::Transaction *transaction= getActiveTransaction(in_session);
 
394
    /* 
 
395
     * Transaction message initialized and set, but no statement created
 
396
     * yet.  We construct one and initialize it, here, then return the
 
397
     * message after attaching the new Statement message pointer to the 
 
398
     * Session for easy retrieval later...
 
399
     */
 
400
    statement= transaction->add_statement();
 
401
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
402
    in_session->setStatementMessage(statement);
 
403
  }
 
404
  return *statement;
 
405
}
 
406
 
 
407
void ReplicationServices::setUpdateHeader(message::Statement &statement,
 
408
                                          Session *in_session,
 
409
                                          Table *in_table,
 
410
                                          const unsigned char *old_record, 
 
411
                                          const unsigned char *new_record) const
 
412
{
 
413
  initStatement(statement, message::Statement::UPDATE, in_session);
 
414
 
 
415
  /* 
 
416
   * Now we construct the specialized UpdateHeader message inside
 
417
   * the generalized message::Statement container...
 
418
   */
 
419
  /* Set up the update header */
 
420
  message::UpdateHeader *header= statement.mutable_update_header();
 
421
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
422
 
 
423
  const char *schema_name= in_table->getShare()->db.str;
 
424
  const char *table_name= in_table->getShare()->table_name.str;
 
425
 
 
426
  table_metadata->set_schema_name(schema_name);
 
427
  table_metadata->set_table_name(table_name);
 
428
 
 
429
  Field *current_field;
 
430
  Field **table_fields= in_table->field;
 
431
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
432
  string_value->set_charset(system_charset_info);
 
433
 
 
434
  message::FieldMetadata *field_metadata;
 
435
 
 
436
  /* We will read all the table's fields... */
 
437
  in_table->setReadSet();
 
438
 
 
439
  while ((current_field= *table_fields++) != NULL) 
 
440
  {
 
441
    /*
 
442
     * We add the "key field metadata" -- i.e. the fields which is
 
443
     * the primary key for the table.
 
444
     */
 
445
    if (in_table->s->primary_key == current_field->field_index)
 
446
    {
 
447
      field_metadata= header->add_key_field_metadata();
 
448
      field_metadata->set_name(current_field->field_name);
 
449
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
450
    }
 
451
 
 
452
    /*
 
453
     * The below really should be moved into the Field API and Record API.  But for now
 
454
     * we do this crazy pointer fiddling to figure out if the current field
 
455
     * has been updated in the supplied record raw byte pointers.
 
456
     */
 
457
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
458
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
459
 
 
460
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
461
 
 
462
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
463
    {
 
464
      /* Field is changed from old to new */
 
465
      field_metadata= header->add_set_field_metadata();
 
466
      field_metadata->set_name(current_field->field_name);
 
467
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
468
    }
 
469
  }
 
470
}
 
471
void ReplicationServices::updateRecord(Session *in_session,
 
472
                                       Table *in_table, 
 
473
                                       const unsigned char *old_record, 
 
474
                                       const unsigned char *new_record)
 
475
{
 
476
  if (! is_active)
 
477
    return;
 
478
 
 
479
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
480
 
 
481
  message::UpdateData *data= statement.mutable_update_data();
 
482
  data->set_segment_id(1);
 
483
  data->set_end_segment(true);
 
484
  message::UpdateRecord *record= data->add_record();
 
485
 
 
486
  Field *current_field;
 
487
  Field **table_fields= in_table->field;
 
488
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
489
  string_value->set_charset(system_charset_info);
 
490
 
 
491
  while ((current_field= *table_fields++) != NULL) 
 
492
  {
 
493
    /*
 
494
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
495
     * but then realized that an UPDATE statement could potentially have different values for
 
496
     * the SET field.  For instance, imagine this SQL scenario:
 
497
     *
 
498
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
499
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
500
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
501
     *
 
502
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
503
     *
 
504
     * The below really should be moved into the Field API and Record API.  But for now
 
505
     * we do this crazy pointer fiddling to figure out if the current field
 
506
     * has been updated in the supplied record raw byte pointers.
 
507
     */
 
508
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
509
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
510
 
 
511
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
512
 
 
513
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
514
    {
 
515
      /* Store the original "read bit" for this field */
 
516
      bool is_read_set= current_field->isReadSet();
 
517
 
 
518
      /* We need to mark that we will "read" this field... */
 
519
      in_table->setReadSet(current_field->field_index);
 
520
 
 
521
      /* Read the string value of this field's contents */
 
522
      string_value= current_field->val_str(string_value);
 
523
 
 
524
      /* 
 
525
       * Reset the read bit after reading field to its original state.  This 
 
526
       * prevents the field from being included in the WHERE clause
 
527
       */
 
528
      current_field->setReadSet(is_read_set);
 
529
 
 
530
      record->add_after_value(string_value->c_ptr(), string_value->length());
 
531
      string_value->free();
 
532
    }
 
533
 
 
534
    /* 
 
535
     * Add the WHERE clause values now...the fields which return true
 
536
     * for isReadSet() are in the WHERE clause.  For tables with no
 
537
     * primary or unique key, all fields will be returned.
 
538
     */
 
539
    if (current_field->isReadSet())
 
540
    {
 
541
      string_value= current_field->val_str(string_value);
 
542
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
543
      /**
 
544
       * @TODO Store optional old record value in the before data member
 
545
       */
 
546
      string_value->free();
 
547
    }
 
548
 
 
549
  }
 
550
}
 
551
 
 
552
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
 
553
                                                            Table *in_table) const
 
554
{
 
555
  message::Statement *statement= in_session->getStatementMessage();
 
556
 
 
557
  if (statement == NULL)
 
558
  {
 
559
    message::Transaction *transaction= getActiveTransaction(in_session);
 
560
    /* 
 
561
     * Transaction message initialized and set, but no statement created
 
562
     * yet.  We construct one and initialize it, here, then return the
 
563
     * message after attaching the new Statement message pointer to the 
 
564
     * Session for easy retrieval later...
 
565
     */
 
566
    statement= transaction->add_statement();
 
567
    setDeleteHeader(*statement, in_session, in_table);
 
568
    in_session->setStatementMessage(statement);
 
569
  }
 
570
  return *statement;
 
571
}
 
572
 
 
573
void ReplicationServices::setDeleteHeader(message::Statement &statement,
 
574
                                          Session *in_session,
 
575
                                          Table *in_table) const
 
576
{
 
577
  initStatement(statement, message::Statement::DELETE, in_session);
 
578
 
 
579
  /* 
 
580
   * Now we construct the specialized DeleteHeader message inside
 
581
   * the generalized message::Statement container...
 
582
   */
 
583
  message::DeleteHeader *header= statement.mutable_delete_header();
 
584
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
585
 
 
586
  const char *schema_name= in_table->getShare()->db.str;
 
587
  const char *table_name= in_table->getShare()->table_name.str;
 
588
 
 
589
  table_metadata->set_schema_name(schema_name);
 
590
  table_metadata->set_table_name(table_name);
 
591
 
 
592
  Field *current_field;
 
593
  Field **table_fields= in_table->field;
 
594
 
 
595
  message::FieldMetadata *field_metadata;
 
596
 
 
597
  while ((current_field= *table_fields++) != NULL) 
 
598
  {
 
599
    /* 
 
600
     * Add the WHERE clause values now...for now, this means the
 
601
     * primary key field value.  Replication only supports tables
 
602
     * with a primary key.
 
603
     */
 
604
    if (in_table->s->primary_key == current_field->field_index)
 
605
    {
 
606
      field_metadata= header->add_key_field_metadata();
 
607
      field_metadata->set_name(current_field->field_name);
 
608
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
609
    }
 
610
  }
 
611
}
 
612
 
 
613
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
 
614
{
 
615
  if (! is_active)
 
616
    return;
 
617
 
 
618
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
619
 
 
620
  message::DeleteData *data= statement.mutable_delete_data();
 
621
  data->set_segment_id(1);
 
622
  data->set_end_segment(true);
 
623
  message::DeleteRecord *record= data->add_record();
 
624
 
 
625
  Field *current_field;
 
626
  Field **table_fields= in_table->field;
 
627
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
628
  string_value->set_charset(system_charset_info);
 
629
 
 
630
  while ((current_field= *table_fields++) != NULL) 
 
631
  {
 
632
    /* 
 
633
     * Add the WHERE clause values now...for now, this means the
 
634
     * primary key field value.  Replication only supports tables
 
635
     * with a primary key.
 
636
     */
 
637
    if (in_table->s->primary_key == current_field->field_index)
 
638
    {
 
639
      string_value= current_field->val_str(string_value);
 
640
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
641
      /**
 
642
       * @TODO Store optional old record value in the before data member
 
643
       */
 
644
      string_value->free();
 
645
    }
 
646
  }
 
647
}
 
648
 
 
649
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
 
650
{
 
651
  if (! is_active)
 
652
    return;
 
653
  
 
654
  message::Transaction *transaction= getActiveTransaction(in_session);
 
655
  message::Statement *statement= transaction->add_statement();
 
656
 
 
657
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
 
658
  string query(in_query, in_query_len);
 
659
  statement->set_sql(query);
 
660
  finalizeStatement(*statement, in_session);
 
661
 
 
662
  finalizeTransaction(*transaction, in_session);
 
663
  
 
664
  push(*transaction);
 
665
 
 
666
  cleanupTransaction(transaction, in_session);
 
667
}
 
668
 
 
669
void ReplicationServices::push(drizzled::message::Transaction &to_push)
 
670
{
 
671
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
672
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
 
673
  appl_start_iter= appliers.begin();
 
674
 
 
675
  plugin::TransactionReplicator *cur_repl;
 
676
  plugin::TransactionApplier *cur_appl;
 
677
 
 
678
  while (repl_iter != replicators.end())
 
679
  {
 
680
    cur_repl= *repl_iter;
 
681
    if (! cur_repl->isEnabled())
 
682
    {
 
683
      ++repl_iter;
 
684
      continue;
 
685
    }
 
686
    
 
687
    appl_iter= appl_start_iter;
 
688
    while (appl_iter != appliers.end())
 
689
    {
 
690
      cur_appl= *appl_iter;
 
691
 
 
692
      if (! cur_appl->isEnabled())
 
693
      {
 
694
        ++appl_iter;
 
695
        continue;
 
696
      }
 
697
 
 
698
      cur_repl->replicate(cur_appl, to_push);
 
699
      
183
700
      /* 
184
701
       * We update the timestamp for the last applied Transaction so that
185
702
       * publisher plugins can ask the replication services when the
187
704
       * method.
188
705
       */
189
706
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
 
707
      ++appl_iter;
190
708
    }
191
 
    else
192
 
      return result;
 
709
    ++repl_iter;
193
710
  }
194
 
  return result;
195
711
}
196
712
 
197
 
ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
 
713
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type)
198
714
{
199
 
  return replication_streams;
 
715
  switch (in_type)
 
716
  {
 
717
    case DRIZZLE_TYPE_LONGLONG:
 
718
      return message::Table::Field::BIGINT;
 
719
    case DRIZZLE_TYPE_LONG:
 
720
      return message::Table::Field::INTEGER;
 
721
    case DRIZZLE_TYPE_DECIMAL:
 
722
      return message::Table::Field::DECIMAL;
 
723
    case DRIZZLE_TYPE_DOUBLE:
 
724
      return message::Table::Field::DOUBLE;
 
725
    case DRIZZLE_TYPE_DATE:
 
726
      return message::Table::Field::DATE;
 
727
    case DRIZZLE_TYPE_DATETIME:
 
728
      return message::Table::Field::DATETIME;
 
729
    case DRIZZLE_TYPE_TIMESTAMP:
 
730
      return message::Table::Field::TIMESTAMP;
 
731
    case DRIZZLE_TYPE_VARCHAR:
 
732
      return message::Table::Field::VARCHAR;
 
733
    case DRIZZLE_TYPE_BLOB:
 
734
      return message::Table::Field::BLOB;
 
735
    case DRIZZLE_TYPE_ENUM:
 
736
      return message::Table::Field::ENUM;
 
737
    default:
 
738
      return message::Table::Field::VARCHAR;
 
739
  }
200
740
}
201
741
 
202
742
} /* namespace drizzled */