~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

Added code necessary for building plugins dynamically.
Merged in changes from lifeless to allow autoreconf to work.
Touching plugin.ini files now triggers a rebuid - so config/autorun.sh is no
longer required to be run after touching those.
Removed the duplicate plugin names - also removed the issue that getting them
different would silently fail weirdly later.

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
 * format, and GPB messages provide a nice, clear, and versioned format for 
39
39
 * these messages.
40
40
 *
41
 
 * @see /drizzled/message/replication.proto
 
41
 * @see /drizzled/message/transaction.proto
 
42
 *
 
43
 * @TODO
 
44
 *
 
45
 * We really should store the raw bytes in the messages, not the
 
46
 * String value of the Field.  But, to do that, the
 
47
 * statement_transform library needs first to be updated
 
48
 * to include the transformation code to convert raw
 
49
 * Drizzle-internal Field byte representation into something
 
50
 * plugins can understand.
42
51
 */
43
52
 
44
53
#include "drizzled/server_includes.h"
45
54
#include "drizzled/replication_services.h"
46
 
#include "drizzled/plugin/command_replicator.h"
47
 
#include "drizzled/plugin/command_applier.h"
48
 
#include "drizzled/message/replication.pb.h"
 
55
#include "drizzled/plugin/transaction_replicator.h"
 
56
#include "drizzled/plugin/transaction_applier.h"
 
57
#include "drizzled/message/transaction.pb.h"
49
58
#include "drizzled/message/table.pb.h"
50
59
#include "drizzled/gettext.h"
51
60
#include "drizzled/session.h"
57
66
namespace drizzled
58
67
{
59
68
 
 
69
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type);
 
70
 
60
71
ReplicationServices::ReplicationServices()
61
72
{
62
73
  is_active= false;
83
94
   * replicators are active...if not, set is_active
84
95
   * to false
85
96
   */
86
 
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
 
97
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
87
98
  while (repl_iter != replicators.end())
88
99
  {
89
100
    if ((*repl_iter)->isEnabled())
107
118
   * replicators are active...if not, set is_active
108
119
   * to false
109
120
   */
110
 
  vector<plugin::CommandApplier *>::iterator appl_iter= appliers.begin();
 
121
  vector<plugin::TransactionApplier *>::iterator appl_iter= appliers.begin();
111
122
  while (appl_iter != appliers.end())
112
123
  {
113
124
    if ((*appl_iter)->isEnabled())
121
132
  is_active= false;
122
133
}
123
134
 
124
 
void ReplicationServices::attachReplicator(plugin::CommandReplicator *in_replicator)
 
135
void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
125
136
{
126
137
  replicators.push_back(in_replicator);
127
138
  evaluateActivePlugins();
128
139
}
129
140
 
130
 
void ReplicationServices::detachReplicator(plugin::CommandReplicator *in_replicator)
 
141
void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
131
142
{
132
143
  replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
133
144
  evaluateActivePlugins();
134
145
}
135
146
 
136
 
void ReplicationServices::attachApplier(plugin::CommandApplier *in_applier)
 
147
void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier)
137
148
{
138
149
  appliers.push_back(in_applier);
139
150
  evaluateActivePlugins();
140
151
}
141
152
 
142
 
void ReplicationServices::detachApplier(plugin::CommandApplier *in_applier)
 
153
void ReplicationServices::detachApplier(plugin::TransactionApplier *in_applier)
143
154
{
144
155
  appliers.erase(std::find(appliers.begin(), appliers.end(), in_applier));
145
156
  evaluateActivePlugins();
150
161
  return is_active;
151
162
}
152
163
 
153
 
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
154
 
                                                       Session *in_session) const
155
 
{
156
 
  message::TransactionContext *trx= in_command.mutable_transaction_context();
 
164
message::Transaction *ReplicationServices::getActiveTransaction(Session *in_session) const
 
165
{
 
166
  message::Transaction *transaction= in_session->getTransactionMessage();
 
167
 
 
168
  if (unlikely(transaction == NULL))
 
169
  {
 
170
    /* 
 
171
     * Allocate and initialize a new transaction message 
 
172
     * for this Session object.  Session is responsible for
 
173
     * deleting transaction message when done with it.
 
174
     */
 
175
    transaction= new (nothrow) message::Transaction();
 
176
    initTransaction(*transaction, in_session);
 
177
    in_session->setTransactionMessage(transaction);
 
178
    return transaction;
 
179
  }
 
180
  else
 
181
    return transaction;
 
182
}
 
183
 
 
184
void ReplicationServices::initTransaction(message::Transaction &in_transaction,
 
185
                                          Session *in_session) const
 
186
{
 
187
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
157
188
  trx->set_server_id(in_session->getServerId());
158
189
  trx->set_transaction_id(in_session->getTransactionId());
159
 
 
160
 
  in_command.set_session_id((uint32_t) in_session->getSessionId());
161
 
}
162
 
 
163
 
void ReplicationServices::startTransaction(Session *in_session)
164
 
{
165
 
  if (! is_active)
166
 
    return;
167
 
  
168
 
  message::Command command;
169
 
  command.set_type(message::Command::START_TRANSACTION);
170
 
  command.set_timestamp(in_session->getCurrentTimestamp());
171
 
 
172
 
  setCommandTransactionContext(command, in_session);
173
 
  
174
 
  push(command);
175
 
}
176
 
 
177
 
void ReplicationServices::commitTransaction(Session *in_session)
178
 
{
179
 
  if (! is_active)
180
 
    return;
181
 
  
182
 
  message::Command command;
183
 
  command.set_type(message::Command::COMMIT);
184
 
  command.set_timestamp(in_session->getCurrentTimestamp());
185
 
 
186
 
  setCommandTransactionContext(command, in_session);
187
 
  
188
 
  push(command);
 
190
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
191
}
 
192
 
 
193
void ReplicationServices::finalizeTransaction(message::Transaction &in_transaction,
 
194
                                              Session *in_session) const
 
195
{
 
196
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
197
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
198
}
 
199
 
 
200
void ReplicationServices::cleanupTransaction(message::Transaction *in_transaction,
 
201
                                             Session *in_session) const
 
202
{
 
203
  delete in_transaction;
 
204
  in_session->setStatementMessage(NULL);
 
205
  in_session->setTransactionMessage(NULL);
 
206
}
 
207
 
 
208
void ReplicationServices::startNormalTransaction(Session *in_session)
 
209
{
 
210
  if (! is_active)
 
211
    return;
 
212
 
 
213
  /* Safeguard...other transactions should have already been closed */
 
214
  message::Transaction *transaction= in_session->getTransactionMessage();
 
215
  assert(transaction == NULL);
 
216
  
 
217
  /* 
 
218
   * A "normal" transaction for the replication services component
 
219
   * is simply a Transaction message, nothing more...so we create a
 
220
   * new message and attach it to the Session object.
 
221
   *
 
222
   * Allocate and initialize a new transaction message 
 
223
   * for this Session object.  This memory is deleted when the
 
224
   * transaction is pushed out to replicators.  Session is NOT
 
225
   * responsible for deleting this memory.
 
226
   */
 
227
  transaction= new (nothrow) message::Transaction();
 
228
  initTransaction(*transaction, in_session);
 
229
  in_session->setTransactionMessage(transaction);
 
230
}
 
231
 
 
232
void ReplicationServices::commitNormalTransaction(Session *in_session)
 
233
{
 
234
  if (! is_active)
 
235
    return;
 
236
 
 
237
  /* If there is an active statement message, finalize it */
 
238
  message::Statement *statement= in_session->getStatementMessage();
 
239
 
 
240
  if (statement != NULL)
 
241
  {
 
242
    finalizeStatement(*statement, in_session);
 
243
  }
 
244
  else
 
245
    return; /* No data modification occurred inside the transaction */
 
246
  
 
247
  message::Transaction* transaction= getActiveTransaction(in_session);
 
248
 
 
249
  finalizeTransaction(*transaction, in_session);
 
250
  
 
251
  push(*transaction);
 
252
 
 
253
  cleanupTransaction(transaction, in_session);
 
254
}
 
255
 
 
256
void ReplicationServices::initStatement(message::Statement &statement,
 
257
                                        message::Statement::Type in_type,
 
258
                                        Session *in_session) const
 
259
{
 
260
  statement.set_type(in_type);
 
261
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
262
  /** @TODO Set sql string optionally */
 
263
}
 
264
 
 
265
void ReplicationServices::finalizeStatement(message::Statement &statement,
 
266
                                            Session *in_session) const
 
267
{
 
268
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
269
  in_session->setStatementMessage(NULL);
189
270
}
190
271
 
191
272
void ReplicationServices::rollbackTransaction(Session *in_session)
193
274
  if (! is_active)
194
275
    return;
195
276
  
196
 
  message::Command command;
197
 
  command.set_type(message::Command::ROLLBACK);
198
 
  command.set_timestamp(in_session->getCurrentTimestamp());
199
 
 
200
 
  setCommandTransactionContext(command, in_session);
 
277
  message::Transaction *transaction= getActiveTransaction(in_session);
 
278
 
 
279
  /* 
 
280
   * We clear the current transaction, reset its transaction ID properly, 
 
281
   * then set its type to ROLLBACK and push it out to the replicators.
 
282
   */
 
283
  transaction->Clear();
 
284
  initTransaction(*transaction, in_session);
 
285
 
 
286
  message::Statement *statement= transaction->add_statement();
 
287
 
 
288
  initStatement(*statement, message::Statement::ROLLBACK, in_session);
 
289
  finalizeStatement(*statement, in_session);
 
290
 
 
291
  finalizeTransaction(*transaction, in_session);
201
292
  
202
 
  push(command);
 
293
  push(*transaction);
 
294
 
 
295
  cleanupTransaction(transaction, in_session);
 
296
}
 
297
 
 
298
message::Statement &ReplicationServices::getInsertStatement(Session *in_session,
 
299
                                                            Table *in_table) const
 
300
{
 
301
  message::Statement *statement= in_session->getStatementMessage();
 
302
 
 
303
  if (statement == NULL)
 
304
  {
 
305
    message::Transaction *transaction= getActiveTransaction(in_session);
 
306
    /* 
 
307
     * Transaction message initialized and set, but no statement created
 
308
     * yet.  We construct one and initialize it, here, then return the
 
309
     * message after attaching the new Statement message pointer to the 
 
310
     * Session for easy retrieval later...
 
311
     */
 
312
    statement= transaction->add_statement();
 
313
    setInsertHeader(*statement, in_session, in_table);
 
314
    in_session->setStatementMessage(statement);
 
315
  }
 
316
  return *statement;
 
317
}
 
318
 
 
319
void ReplicationServices::setInsertHeader(message::Statement &statement,
 
320
                                          Session *in_session,
 
321
                                          Table *in_table) const
 
322
{
 
323
  initStatement(statement, message::Statement::INSERT, in_session);
 
324
 
 
325
  /* 
 
326
   * Now we construct the specialized InsertHeader message inside
 
327
   * the generalized message::Statement container...
 
328
   */
 
329
  /* Set up the insert header */
 
330
  message::InsertHeader *header= statement.mutable_insert_header();
 
331
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
332
 
 
333
  const char *schema_name= in_table->getShare()->db.str;
 
334
  const char *table_name= in_table->getShare()->table_name.str;
 
335
 
 
336
  table_metadata->set_schema_name(schema_name);
 
337
  table_metadata->set_table_name(table_name);
 
338
 
 
339
  Field *current_field;
 
340
  Field **table_fields= in_table->field;
 
341
 
 
342
  message::FieldMetadata *field_metadata;
 
343
 
 
344
  /* We will read all the table's fields... */
 
345
  in_table->setReadSet();
 
346
 
 
347
  while ((current_field= *table_fields++) != NULL) 
 
348
  {
 
349
    field_metadata= header->add_field_metadata();
 
350
    field_metadata->set_name(current_field->field_name);
 
351
    field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
352
  }
203
353
}
204
354
 
205
355
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
207
357
  if (! is_active)
208
358
    return;
209
359
 
210
 
  message::Command command;
211
 
  command.set_type(message::Command::INSERT);
212
 
  command.set_timestamp(in_session->getCurrentTimestamp());
213
 
 
214
 
  setCommandTransactionContext(command, in_session);
215
 
 
216
 
  const char *schema_name= in_table->getShare()->db.str;
217
 
  const char *table_name= in_table->getShare()->table_name.str;
218
 
 
219
 
  command.set_schema(schema_name);
220
 
  command.set_table(table_name);
221
 
 
222
 
  /* 
223
 
   * Now we construct the specialized InsertRecord command inside
224
 
   * the message::Command container...
225
 
   */
226
 
  message::InsertRecord *change_record= command.mutable_insert_record();
 
360
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
361
 
 
362
  message::InsertData *data= statement.mutable_insert_data();
 
363
  data->set_segment_id(1);
 
364
  data->set_end_segment(true);
 
365
  message::InsertRecord *record= data->add_record();
227
366
 
228
367
  Field *current_field;
229
368
  Field **table_fields= in_table->field;
 
369
 
230
370
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
231
371
  string_value->set_charset(system_charset_info);
232
372
 
233
 
  message::Table::Field *current_proto_field;
234
 
 
235
373
  /* We will read all the table's fields... */
236
374
  in_table->setReadSet();
237
375
 
238
376
  while ((current_field= *table_fields++) != NULL) 
239
377
  {
240
 
    current_proto_field= change_record->add_insert_field();
241
 
    current_proto_field->set_name(current_field->field_name);
242
 
    current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
243
378
    string_value= current_field->val_str(string_value);
244
 
    change_record->add_insert_value(string_value->c_ptr());
 
379
    record->add_insert_value(string_value->c_ptr());
245
380
    string_value->free();
246
381
  }
247
 
  
248
 
  push(command);
249
 
}
250
 
 
 
382
}
 
383
 
 
384
message::Statement &ReplicationServices::getUpdateStatement(Session *in_session,
 
385
                                                            Table *in_table,
 
386
                                                            const unsigned char *old_record, 
 
387
                                                            const unsigned char *new_record) const
 
388
{
 
389
  message::Statement *statement= in_session->getStatementMessage();
 
390
 
 
391
  if (statement == NULL)
 
392
  {
 
393
    message::Transaction *transaction= getActiveTransaction(in_session);
 
394
    /* 
 
395
     * Transaction message initialized and set, but no statement created
 
396
     * yet.  We construct one and initialize it, here, then return the
 
397
     * message after attaching the new Statement message pointer to the 
 
398
     * Session for easy retrieval later...
 
399
     */
 
400
    statement= transaction->add_statement();
 
401
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
402
    in_session->setStatementMessage(statement);
 
403
  }
 
404
  return *statement;
 
405
}
 
406
 
 
407
void ReplicationServices::setUpdateHeader(message::Statement &statement,
 
408
                                          Session *in_session,
 
409
                                          Table *in_table,
 
410
                                          const unsigned char *old_record, 
 
411
                                          const unsigned char *new_record) const
 
412
{
 
413
  initStatement(statement, message::Statement::UPDATE, in_session);
 
414
 
 
415
  /* 
 
416
   * Now we construct the specialized UpdateHeader message inside
 
417
   * the generalized message::Statement container...
 
418
   */
 
419
  /* Set up the update header */
 
420
  message::UpdateHeader *header= statement.mutable_update_header();
 
421
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
422
 
 
423
  const char *schema_name= in_table->getShare()->db.str;
 
424
  const char *table_name= in_table->getShare()->table_name.str;
 
425
 
 
426
  table_metadata->set_schema_name(schema_name);
 
427
  table_metadata->set_table_name(table_name);
 
428
 
 
429
  Field *current_field;
 
430
  Field **table_fields= in_table->field;
 
431
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
 
432
  string_value->set_charset(system_charset_info);
 
433
 
 
434
  message::FieldMetadata *field_metadata;
 
435
 
 
436
  /* We will read all the table's fields... */
 
437
  in_table->setReadSet();
 
438
 
 
439
  while ((current_field= *table_fields++) != NULL) 
 
440
  {
 
441
    /*
 
442
     * We add the "key field metadata" -- i.e. the fields which is
 
443
     * the primary key for the table.
 
444
     */
 
445
    if (in_table->s->primary_key == current_field->field_index)
 
446
    {
 
447
      field_metadata= header->add_key_field_metadata();
 
448
      field_metadata->set_name(current_field->field_name);
 
449
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
450
    }
 
451
 
 
452
    /*
 
453
     * The below really should be moved into the Field API and Record API.  But for now
 
454
     * we do this crazy pointer fiddling to figure out if the current field
 
455
     * has been updated in the supplied record raw byte pointers.
 
456
     */
 
457
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
458
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
 
459
 
 
460
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
461
 
 
462
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
463
    {
 
464
      /* Field is changed from old to new */
 
465
      field_metadata= header->add_set_field_metadata();
 
466
      field_metadata->set_name(current_field->field_name);
 
467
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
468
    }
 
469
  }
 
470
}
251
471
void ReplicationServices::updateRecord(Session *in_session,
252
472
                                       Table *in_table, 
253
473
                                       const unsigned char *old_record, 
255
475
{
256
476
  if (! is_active)
257
477
    return;
258
 
  
259
 
  message::Command command;
260
 
  command.set_type(message::Command::UPDATE);
261
 
  command.set_timestamp(in_session->getCurrentTimestamp());
262
 
 
263
 
  setCommandTransactionContext(command, in_session);
264
 
 
265
 
  const char *schema_name= in_table->getShare()->db.str;
266
 
  const char *table_name= in_table->getShare()->table_name.str;
267
 
 
268
 
  command.set_schema(schema_name);
269
 
  command.set_table(table_name);
270
 
 
271
 
  /* 
272
 
   * Now we construct the specialized UpdateRecord command inside
273
 
   * the message::Command container...
274
 
   */
275
 
  message::UpdateRecord *change_record= command.mutable_update_record();
 
478
 
 
479
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
480
 
 
481
  message::UpdateData *data= statement.mutable_update_data();
 
482
  data->set_segment_id(1);
 
483
  data->set_end_segment(true);
 
484
  message::UpdateRecord *record= data->add_record();
276
485
 
277
486
  Field *current_field;
278
487
  Field **table_fields= in_table->field;
279
488
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
280
489
  string_value->set_charset(system_charset_info);
281
490
 
282
 
  message::Table::Field *current_proto_field;
283
 
 
284
491
  while ((current_field= *table_fields++) != NULL) 
285
492
  {
286
493
    /*
 
494
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
495
     * but then realized that an UPDATE statement could potentially have different values for
 
496
     * the SET field.  For instance, imagine this SQL scenario:
 
497
     *
 
498
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
499
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
500
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
501
     *
 
502
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
503
     *
287
504
     * The below really should be moved into the Field API and Record API.  But for now
288
505
     * we do this crazy pointer fiddling to figure out if the current field
289
506
     * has been updated in the supplied record raw byte pointers.
295
512
 
296
513
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
297
514
    {
298
 
      /* Field is changed from old to new */
299
 
      current_proto_field= change_record->add_update_field();
300
 
      current_proto_field->set_name(current_field->field_name);
301
 
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
302
 
 
303
515
      /* Store the original "read bit" for this field */
304
516
      bool is_read_set= current_field->isReadSet();
305
517
 
315
527
       */
316
528
      current_field->setReadSet(is_read_set);
317
529
 
318
 
      change_record->add_after_value(string_value->c_ptr());
 
530
      record->add_after_value(string_value->c_ptr());
319
531
      string_value->free();
320
532
    }
321
533
 
326
538
     */
327
539
    if (current_field->isReadSet())
328
540
    {
329
 
      current_proto_field= change_record->add_where_field();
330
 
      current_proto_field->set_name(current_field->field_name);
331
 
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
332
541
      string_value= current_field->val_str(string_value);
333
 
      change_record->add_where_value(string_value->c_ptr());
 
542
      record->add_key_value(string_value->c_ptr());
 
543
      /**
 
544
       * @TODO Store optional old record value in the before data member
 
545
       */
334
546
      string_value->free();
335
547
    }
336
 
  }
337
 
 
338
 
  push(command);
 
548
 
 
549
  }
 
550
}
 
551
 
 
552
message::Statement &ReplicationServices::getDeleteStatement(Session *in_session,
 
553
                                                            Table *in_table) const
 
554
{
 
555
  message::Statement *statement= in_session->getStatementMessage();
 
556
 
 
557
  if (statement == NULL)
 
558
  {
 
559
    message::Transaction *transaction= getActiveTransaction(in_session);
 
560
    /* 
 
561
     * Transaction message initialized and set, but no statement created
 
562
     * yet.  We construct one and initialize it, here, then return the
 
563
     * message after attaching the new Statement message pointer to the 
 
564
     * Session for easy retrieval later...
 
565
     */
 
566
    statement= transaction->add_statement();
 
567
    setDeleteHeader(*statement, in_session, in_table);
 
568
    in_session->setStatementMessage(statement);
 
569
  }
 
570
  return *statement;
 
571
}
 
572
 
 
573
void ReplicationServices::setDeleteHeader(message::Statement &statement,
 
574
                                          Session *in_session,
 
575
                                          Table *in_table) const
 
576
{
 
577
  initStatement(statement, message::Statement::DELETE, in_session);
 
578
 
 
579
  /* 
 
580
   * Now we construct the specialized DeleteHeader message inside
 
581
   * the generalized message::Statement container...
 
582
   */
 
583
  message::DeleteHeader *header= statement.mutable_delete_header();
 
584
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
585
 
 
586
  const char *schema_name= in_table->getShare()->db.str;
 
587
  const char *table_name= in_table->getShare()->table_name.str;
 
588
 
 
589
  table_metadata->set_schema_name(schema_name);
 
590
  table_metadata->set_table_name(table_name);
 
591
 
 
592
  Field *current_field;
 
593
  Field **table_fields= in_table->field;
 
594
 
 
595
  message::FieldMetadata *field_metadata;
 
596
 
 
597
  while ((current_field= *table_fields++) != NULL) 
 
598
  {
 
599
    /* 
 
600
     * Add the WHERE clause values now...for now, this means the
 
601
     * primary key field value.  Replication only supports tables
 
602
     * with a primary key.
 
603
     */
 
604
    if (in_table->s->primary_key == current_field->field_index)
 
605
    {
 
606
      field_metadata= header->add_key_field_metadata();
 
607
      field_metadata->set_name(current_field->field_name);
 
608
      field_metadata->set_type(internalFieldTypeToFieldProtoType(current_field->type()));
 
609
    }
 
610
  }
339
611
}
340
612
 
341
613
void ReplicationServices::deleteRecord(Session *in_session, Table *in_table)
342
614
{
343
615
  if (! is_active)
344
616
    return;
345
 
  
346
 
  message::Command command;
347
 
  command.set_type(message::Command::DELETE);
348
 
  command.set_timestamp(in_session->getCurrentTimestamp());
349
 
 
350
 
  setCommandTransactionContext(command, in_session);
351
 
 
352
 
  const char *schema_name= in_table->getShare()->db.str;
353
 
  const char *table_name= in_table->getShare()->table_name.str;
354
 
 
355
 
  command.set_schema(schema_name);
356
 
  command.set_table(table_name);
357
 
 
358
 
  /* 
359
 
   * Now we construct the specialized DeleteRecord command inside
360
 
   * the message::Command container...
361
 
   */
362
 
  message::DeleteRecord *change_record= command.mutable_delete_record();
363
 
 
 
617
 
 
618
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
619
 
 
620
  message::DeleteData *data= statement.mutable_delete_data();
 
621
  data->set_segment_id(1);
 
622
  data->set_end_segment(true);
 
623
  message::DeleteRecord *record= data->add_record();
 
624
 
364
625
  Field *current_field;
365
626
  Field **table_fields= in_table->field;
366
627
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
367
628
  string_value->set_charset(system_charset_info);
368
629
 
369
 
  message::Table::Field *current_proto_field;
370
 
 
371
 
  while ((current_field= *table_fields++) != NULL)
 
630
  while ((current_field= *table_fields++) != NULL) 
372
631
  {
373
 
    /*
374
 
     * Add the WHERE clause values now...the fields which return true
375
 
     * for isReadSet() are in the WHERE clause.  For tables with no
376
 
     * primary or unique key, all fields will be returned.
 
632
    /* 
 
633
     * Add the WHERE clause values now...for now, this means the
 
634
     * primary key field value.  Replication only supports tables
 
635
     * with a primary key.
377
636
     */
378
 
    if (current_field->isReadSet())
 
637
    if (in_table->s->primary_key == current_field->field_index)
379
638
    {
380
 
      current_proto_field= change_record->add_where_field();
381
 
      current_proto_field->set_name(current_field->field_name);
382
 
      current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
383
639
      string_value= current_field->val_str(string_value);
384
 
      change_record->add_where_value(string_value->c_ptr());
 
640
      record->add_key_value(string_value->c_ptr());
 
641
      /**
 
642
       * @TODO Store optional old record value in the before data member
 
643
       */
385
644
      string_value->free();
386
645
    }
387
646
  }
388
 
 
389
 
  push(command);
390
647
}
391
648
 
392
649
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
394
651
  if (! is_active)
395
652
    return;
396
653
  
397
 
  message::Command command;
398
 
  command.set_type(message::Command::RAW_SQL);
399
 
  command.set_timestamp(in_session->getCurrentTimestamp());
400
 
 
401
 
  setCommandTransactionContext(command, in_session);
402
 
 
 
654
  message::Transaction *transaction= getActiveTransaction(in_session);
 
655
  message::Statement *statement= transaction->add_statement();
 
656
 
 
657
  initStatement(*statement, message::Statement::RAW_SQL, in_session);
403
658
  string query(in_query, in_query_len);
404
 
  command.set_sql(query);
405
 
 
406
 
  push(command);
 
659
  statement->set_sql(query);
 
660
  finalizeStatement(*statement, in_session);
 
661
 
 
662
  finalizeTransaction(*transaction, in_session);
 
663
  
 
664
  push(*transaction);
 
665
 
 
666
  cleanupTransaction(transaction, in_session);
407
667
}
408
668
 
409
 
void ReplicationServices::push(drizzled::message::Command &to_push)
 
669
void ReplicationServices::push(drizzled::message::Transaction &to_push)
410
670
{
411
 
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
412
 
  vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
 
671
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
 
672
  vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
413
673
  appl_start_iter= appliers.begin();
414
674
 
415
 
  plugin::CommandReplicator *cur_repl;
416
 
  plugin::CommandApplier *cur_appl;
 
675
  plugin::TransactionReplicator *cur_repl;
 
676
  plugin::TransactionApplier *cur_appl;
417
677
 
418
678
  while (repl_iter != replicators.end())
419
679
  {
438
698
      cur_repl->replicate(cur_appl, to_push);
439
699
      
440
700
      /* 
441
 
       * We update the timestamp for the last applied Command so that
 
701
       * We update the timestamp for the last applied Transaction so that
442
702
       * publisher plugins can ask the replication services when the
443
 
       * last known applied Command was using the getLastAppliedTimestamp()
 
703
       * last known applied Transaction was using the getLastAppliedTimestamp()
444
704
       * method.
445
705
       */
446
 
      last_applied_timestamp.fetch_and_store(to_push.timestamp());
 
706
      last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
447
707
      ++appl_iter;
448
708
    }
449
709
    ++repl_iter;
450
710
  }
451
711
}
452
712
 
 
713
static message::Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types in_type)
 
714
{
 
715
  switch (in_type)
 
716
  {
 
717
    case DRIZZLE_TYPE_LONGLONG:
 
718
      return message::Table::Field::BIGINT;
 
719
    case DRIZZLE_TYPE_LONG:
 
720
      return message::Table::Field::INTEGER;
 
721
    case DRIZZLE_TYPE_NEWDECIMAL:
 
722
      return message::Table::Field::DECIMAL;
 
723
    case DRIZZLE_TYPE_DOUBLE:
 
724
      return message::Table::Field::DOUBLE;
 
725
    case DRIZZLE_TYPE_DATE:
 
726
      return message::Table::Field::DATE;
 
727
    case DRIZZLE_TYPE_DATETIME:
 
728
      return message::Table::Field::DATETIME;
 
729
    case DRIZZLE_TYPE_TIMESTAMP:
 
730
      return message::Table::Field::TIMESTAMP;
 
731
    case DRIZZLE_TYPE_VARCHAR:
 
732
      return message::Table::Field::VARCHAR;
 
733
    default:
 
734
      return message::Table::Field::VARCHAR;
 
735
  }
 
736
}
 
737
 
453
738
} /* namespace drizzled */