~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication_services.cc

Merging Jay

Show diffs side-by-side

added added

removed removed

Lines of Context:
94
94
   * replicators are active...if not, set is_active
95
95
   * to false
96
96
   */
97
 
  vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
98
 
  while (repl_iter != replicators.end())
 
97
  for (Replicators::iterator repl_iter= replicators.begin();
 
98
       repl_iter != replicators.end();
 
99
       ++repl_iter)
99
100
  {
100
101
    if ((*repl_iter)->isEnabled())
101
102
    {
102
103
      tmp_is_active= true;
103
104
      break;
104
105
    }
105
 
    ++repl_iter;
106
106
  }
107
107
  if (! tmp_is_active)
108
108
  {
118
118
   * replicators are active...if not, set is_active
119
119
   * to false
120
120
   */
121
 
  vector<plugin::TransactionApplier *>::iterator appl_iter= appliers.begin();
122
 
  while (appl_iter != appliers.end())
 
121
  for (Appliers::iterator appl_iter= appliers.begin();
 
122
       appl_iter != appliers.end();
 
123
       ++appl_iter)
123
124
  {
124
125
    if ((*appl_iter)->isEnabled())
125
126
    {
126
127
      is_active= true;
127
128
      return;
128
129
    }
129
 
    ++appl_iter;
130
130
  }
131
131
  /* If we get here, there are no active appliers */
132
132
  is_active= false;
205
205
  in_session->setTransactionMessage(NULL);
206
206
}
207
207
 
208
 
void ReplicationServices::startNormalTransaction(Session *in_session)
 
208
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
209
209
{
210
 
  if (! is_active)
211
 
    return;
 
210
  size_t num_statements= transaction.statement_size();
 
211
  if (num_statements == 0)
 
212
    return false;
212
213
 
213
 
  /* Safeguard...other transactions should have already been closed */
214
 
  message::Transaction *transaction= in_session->getTransactionMessage();
215
 
  assert(transaction == NULL);
216
 
  
217
 
  /* 
218
 
   * A "normal" transaction for the replication services component
219
 
   * is simply a Transaction message, nothing more...so we create a
220
 
   * new message and attach it to the Session object.
221
 
   *
222
 
   * Allocate and initialize a new transaction message 
223
 
   * for this Session object.  This memory is deleted when the
224
 
   * transaction is pushed out to replicators.  Session is NOT
225
 
   * responsible for deleting this memory.
 
214
  /*
 
215
   * Only INSERT, UPDATE, and DELETE statements can possibly
 
216
   * have bulk segments.  So, we loop through the statements
 
217
   * checking for segment_id > 1 in those specific submessages.
226
218
   */
227
 
  transaction= new (nothrow) message::Transaction();
228
 
  initTransaction(*transaction, in_session);
229
 
  in_session->setTransactionMessage(transaction);
 
219
  size_t x;
 
220
  for (x= 0; x < num_statements; ++x)
 
221
  {
 
222
    const message::Statement &statement= transaction.statement(x);
 
223
    message::Statement::Type type= statement.type();
 
224
 
 
225
    switch (type)
 
226
    {
 
227
      case message::Statement::INSERT:
 
228
        if (statement.insert_data().segment_id() > 1)
 
229
          return true;
 
230
        break;
 
231
      case message::Statement::UPDATE:
 
232
        if (statement.update_data().segment_id() > 1)
 
233
          return true;
 
234
        break;
 
235
      case message::Statement::DELETE:
 
236
        if (statement.delete_data().segment_id() > 1)
 
237
          return true;
 
238
        break;
 
239
      default:
 
240
        break;
 
241
    }
 
242
  }
 
243
  return false;
230
244
}
231
 
 
232
 
void ReplicationServices::commitNormalTransaction(Session *in_session)
 
245
void ReplicationServices::commitTransaction(Session *in_session)
233
246
{
234
247
  if (! is_active)
235
248
    return;
276
289
  
277
290
  message::Transaction *transaction= getActiveTransaction(in_session);
278
291
 
279
 
  /* 
280
 
   * We clear the current transaction, reset its transaction ID properly, 
281
 
   * then set its type to ROLLBACK and push it out to the replicators.
 
292
  /*
 
293
   * OK, so there are two situations that we need to deal with here:
 
294
   *
 
295
   * 1) We receive an instruction to ROLLBACK the current transaction
 
296
   *    and the currently-stored Transaction message is *self-contained*, 
 
297
   *    meaning that no Statement messages in the Transaction message
 
298
   *    contain a message having its segment_id member greater than 1.  If
 
299
   *    no non-segment ID 1 members are found, we can simply clear the
 
300
   *    current Transaction message and remove it from memory.
 
301
   *
 
302
   * 2) If the Transaction message does indeed have a non-end segment, that
 
303
   *    means that a bulk update/delete/insert Transaction message segment
 
304
   *    has previously been sent over the wire to replicators.  In this case, 
 
305
   *    we need to package a Transaction with a Statement message of type
 
306
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
307
   *    messages must be un-applied.
282
308
   */
283
 
  transaction->Clear();
284
 
  initTransaction(*transaction, in_session);
285
 
 
286
 
  message::Statement *statement= transaction->add_statement();
287
 
 
288
 
  initStatement(*statement, message::Statement::ROLLBACK, in_session);
289
 
  finalizeStatement(*statement, in_session);
290
 
 
291
 
  finalizeTransaction(*transaction, in_session);
292
 
  
293
 
  push(*transaction);
294
 
 
 
309
  if (unlikely(transactionContainsBulkSegment(*transaction)))
 
310
  {
 
311
    /*
 
312
     * Clear the transaction, create a Rollback statement message, 
 
313
     * attach it to the transaction, and push it to replicators.
 
314
     */
 
315
    transaction->Clear();
 
316
    initTransaction(*transaction, in_session);
 
317
 
 
318
    message::Statement *statement= transaction->add_statement();
 
319
 
 
320
    initStatement(*statement, message::Statement::ROLLBACK, in_session);
 
321
    finalizeStatement(*statement, in_session);
 
322
 
 
323
    finalizeTransaction(*transaction, in_session);
 
324
    
 
325
    push(*transaction);
 
326
  }
295
327
  cleanupTransaction(transaction, in_session);
296
328
}
297
329
 
532
564
    }
533
565
 
534
566
    /* 
535
 
     * Add the WHERE clause values now...the fields which return true
536
 
     * for isReadSet() are in the WHERE clause.  For tables with no
537
 
     * primary or unique key, all fields will be returned.
 
567
     * Add the WHERE clause values now...for now, this means the
 
568
     * primary key field value.  Replication only supports tables
 
569
     * with a primary key.
538
570
     */
539
 
    if (current_field->isReadSet())
 
571
    if (in_table->s->primary_key == current_field->field_index)
540
572
    {
541
 
      string_value= current_field->val_str(string_value);
542
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
543
573
      /**
544
 
       * @TODO Store optional old record value in the before data member
 
574
       * To say the below is ugly is an understatement. But it works.
 
575
       * 
 
576
       * @todo Move this crap into a real Record API.
545
577
       */
 
578
      string_value= current_field->val_str(string_value,
 
579
                                           old_record + 
 
580
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
581
      record->add_key_value(string_value->c_ptr(), string_value->length());
546
582
      string_value->free();
547
583
    }
548
584
 
646
682
  }
647
683
}
648
684
 
 
685
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
 
686
{
 
687
  if (! is_active)
 
688
    return;
 
689
  
 
690
  message::Transaction *transaction= getActiveTransaction(in_session);
 
691
  message::Statement *statement= transaction->add_statement();
 
692
 
 
693
  initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
694
 
 
695
  /* 
 
696
   * Construct the specialized TruncateTableStatement message and attach
 
697
   * it to the generic Statement message
 
698
   */
 
699
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
700
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
701
 
 
702
  const char *schema_name= in_table->getShare()->db.str;
 
703
  const char *table_name= in_table->getShare()->table_name.str;
 
704
 
 
705
  table_metadata->set_schema_name(schema_name);
 
706
  table_metadata->set_table_name(table_name);
 
707
 
 
708
  finalizeStatement(*statement, in_session);
 
709
 
 
710
  finalizeTransaction(*transaction, in_session);
 
711
  
 
712
  push(*transaction);
 
713
 
 
714
  cleanupTransaction(transaction, in_session);
 
715
}
 
716
 
649
717
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)
650
718
{
651
719
  if (! is_active)