94
94
* replicators are active...if not, set is_active
97
vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
98
while (repl_iter != replicators.end())
97
for (Replicators::iterator repl_iter= replicators.begin();
98
repl_iter != replicators.end();
100
101
if ((*repl_iter)->isEnabled())
102
103
tmp_is_active= true;
107
107
if (! tmp_is_active)
118
118
* replicators are active...if not, set is_active
121
vector<plugin::TransactionApplier *>::iterator appl_iter= appliers.begin();
122
while (appl_iter != appliers.end())
121
for (Appliers::iterator appl_iter= appliers.begin();
122
appl_iter != appliers.end();
124
125
if ((*appl_iter)->isEnabled())
131
131
/* If we get here, there are no active appliers */
132
132
is_active= false;
205
205
in_session->setTransactionMessage(NULL);
208
void ReplicationServices::startNormalTransaction(Session *in_session)
208
bool ReplicationServices::transactionContainsBulkSegment(const message::Transaction &transaction) const
210
size_t num_statements= transaction.statement_size();
211
if (num_statements == 0)
213
/* Safeguard...other transactions should have already been closed */
214
message::Transaction *transaction= in_session->getTransactionMessage();
215
assert(transaction == NULL);
218
* A "normal" transaction for the replication services component
219
* is simply a Transaction message, nothing more...so we create a
220
* new message and attach it to the Session object.
222
* Allocate and initialize a new transaction message
223
* for this Session object. This memory is deleted when the
224
* transaction is pushed out to replicators. Session is NOT
225
* responsible for deleting this memory.
215
* Only INSERT, UPDATE, and DELETE statements can possibly
216
* have bulk segments. So, we loop through the statements
217
* checking for segment_id > 1 in those specific submessages.
227
transaction= new (nothrow) message::Transaction();
228
initTransaction(*transaction, in_session);
229
in_session->setTransactionMessage(transaction);
220
for (x= 0; x < num_statements; ++x)
222
const message::Statement &statement= transaction.statement(x);
223
message::Statement::Type type= statement.type();
227
case message::Statement::INSERT:
228
if (statement.insert_data().segment_id() > 1)
231
case message::Statement::UPDATE:
232
if (statement.update_data().segment_id() > 1)
235
case message::Statement::DELETE:
236
if (statement.delete_data().segment_id() > 1)
232
void ReplicationServices::commitNormalTransaction(Session *in_session)
245
void ReplicationServices::commitTransaction(Session *in_session)
277
290
message::Transaction *transaction= getActiveTransaction(in_session);
280
* We clear the current transaction, reset its transaction ID properly,
281
* then set its type to ROLLBACK and push it out to the replicators.
293
* OK, so there are two situations that we need to deal with here:
295
* 1) We receive an instruction to ROLLBACK the current transaction
296
* and the currently-stored Transaction message is *self-contained*,
297
* meaning that no Statement messages in the Transaction message
298
* contain a message having its segment_id member greater than 1. If
299
* no non-segment ID 1 members are found, we can simply clear the
300
* current Transaction message and remove it from memory.
302
* 2) If the Transaction message does indeed have a non-end segment, that
303
* means that a bulk update/delete/insert Transaction message segment
304
* has previously been sent over the wire to replicators. In this case,
305
* we need to package a Transaction with a Statement message of type
306
* ROLLBACK to indicate to replicators that previously-transmitted
307
* messages must be un-applied.
283
transaction->Clear();
284
initTransaction(*transaction, in_session);
286
message::Statement *statement= transaction->add_statement();
288
initStatement(*statement, message::Statement::ROLLBACK, in_session);
289
finalizeStatement(*statement, in_session);
291
finalizeTransaction(*transaction, in_session);
309
if (unlikely(transactionContainsBulkSegment(*transaction)))
312
* Clear the transaction, create a Rollback statement message,
313
* attach it to the transaction, and push it to replicators.
315
transaction->Clear();
316
initTransaction(*transaction, in_session);
318
message::Statement *statement= transaction->add_statement();
320
initStatement(*statement, message::Statement::ROLLBACK, in_session);
321
finalizeStatement(*statement, in_session);
323
finalizeTransaction(*transaction, in_session);
295
327
cleanupTransaction(transaction, in_session);
535
* Add the WHERE clause values now...the fields which return true
536
* for isReadSet() are in the WHERE clause. For tables with no
537
* primary or unique key, all fields will be returned.
567
* Add the WHERE clause values now...for now, this means the
568
* primary key field value. Replication only supports tables
569
* with a primary key.
539
if (current_field->isReadSet())
571
if (in_table->s->primary_key == current_field->field_index)
541
string_value= current_field->val_str(string_value);
542
record->add_key_value(string_value->c_ptr(), string_value->length());
544
* @TODO Store optional old record value in the before data member
574
* To say the below is ugly is an understatement. But it works.
576
* @todo Move this crap into a real Record API.
578
string_value= current_field->val_str(string_value,
580
current_field->offset(const_cast<unsigned char *>(new_record)));
581
record->add_key_value(string_value->c_ptr(), string_value->length());
546
582
string_value->free();
685
void ReplicationServices::truncateTable(Session *in_session, Table *in_table)
690
message::Transaction *transaction= getActiveTransaction(in_session);
691
message::Statement *statement= transaction->add_statement();
693
initStatement(*statement, message::Statement::TRUNCATE_TABLE, in_session);
696
* Construct the specialized TruncateTableStatement message and attach
697
* it to the generic Statement message
699
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
700
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
702
const char *schema_name= in_table->getShare()->db.str;
703
const char *table_name= in_table->getShare()->table_name.str;
705
table_metadata->set_schema_name(schema_name);
706
table_metadata->set_table_name(table_name);
708
finalizeStatement(*statement, in_session);
710
finalizeTransaction(*transaction, in_session);
714
cleanupTransaction(transaction, in_session);
649
717
void ReplicationServices::rawStatement(Session *in_session, const char *in_query, size_t in_query_len)