1047
1053
if (! replication_services.isActive())
1050
/* If there is an active statement message, finalize it */
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (in_session->getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1051
1064
message::Statement *statement= in_session->getStatementMessage();
1053
1066
if (statement != NULL)
1055
1068
finalizeStatementMessage(*statement, in_session);
1058
return 0; /* No data modification occurred inside the transaction */
1060
1071
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and do not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, in_session);
1062
1085
finalizeTransactionMessage(*transaction, in_session);
1064
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1138
1161
cleanupTransactionMessage(transaction, in_session);
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1167
if (! replication_services.isActive())
1170
message::Statement *current_statement= in_session->getStatementMessage();
1172
/* If we never added a Statement message, nothing to undo. */
1173
if (current_statement == NULL)
1177
* If the Statement has been segmented, then we've already pushed a portion
1178
* of this Statement's row changes through the replication stream and we
1179
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
* delete the current Statement message.
1182
bool is_segmented= false;
1184
switch (current_statement->type())
1186
case message::Statement::INSERT:
1187
if (current_statement->insert_data().segment_id() > 1)
1191
case message::Statement::UPDATE:
1192
if (current_statement->update_data().segment_id() > 1)
1196
case message::Statement::DELETE:
1197
if (current_statement->delete_data().segment_id() > 1)
1206
* Remove the Statement message we've been working with (same as
1207
* current_statement).
1209
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
statements_in_txn= transaction->mutable_statement();
1212
statements_in_txn->RemoveLast();
1213
in_session->setStatementMessage(NULL);
1216
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
* an indicator to cancel the previous Statement message which should have
1218
* had its end_segment attribute set to false.
1222
current_statement= transaction->add_statement();
1223
initStatementMessage(*current_statement,
1224
message::Statement::ROLLBACK_STATEMENT,
1226
finalizeStatementMessage(*current_statement, in_session);
1141
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1142
1231
Table *in_table,
1143
1232
uint32_t *next_segment_id)
1415
1490
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1417
/* append this UPDATE query string */
1418
if (in_session->variables.replicate_query)
1420
string s(statement->sql());
1424
s.append(in_session->getQueryString()->c_str());
1425
statement->set_sql(s);
1428
statement->set_sql(in_session->getQueryString()->c_str());
1431
1492
/* carry forward the existing segment id */
1432
1493
const message::UpdateData &current_data= statement->update_data();
1433
1494
*next_segment_id= current_data.segment_id();
1932
* Template for removing Statement records of different types.
1934
* The code for removing records from different Statement message types
1935
* is identical except for the class types that are embedded within the
1938
* There are 3 scenarios we need to look for:
1939
* - We've been asked to remove more records than exist in the Statement
1940
* - We've been asked to remove fewer records than exist in the Statement
1941
* - We've been asked to remove ALL records that exist in the Statement
1943
* If we are removing ALL records, then effectively we would be left with
1944
* an empty Statement message, so we should just remove it and clean up
1945
* message pointers in the Session object.
1947
template <class DataType, class RecordType>
1948
static bool removeStatementRecordsWithType(Session *session,
1952
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1954
/* If there aren't enough records to remove 'count' of them, error. */
1955
if (num_avail_recs < count)
1959
* If we are removing all of the data records, we'll just remove this
1960
* entire Statement message.
1962
if (num_avail_recs == count)
1964
message::Transaction *transaction= session->getTransactionMessage();
1965
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1966
statements->RemoveLast();
1969
* Now need to set the Session Statement pointer to either the previous
1970
* Statement, or NULL if there isn't one.
1972
if (statements->size() == 0)
1974
session->setStatementMessage(NULL);
1979
* There isn't a great way to get a pointer to the previous Statement
1980
* message using the RepeatedPtrField object, so we'll just get to it
1981
* using the Transaction message.
1983
int last_stmt_idx= transaction->statement_size() - 1;
1984
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1987
/* We only need to remove 'count' records */
1988
else if (num_avail_recs > count)
1990
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1992
records->RemoveLast();
1999
bool TransactionServices::removeStatementRecords(Session *session,
2002
ReplicationServices &replication_services= ReplicationServices::singleton();
2003
if (! replication_services.isActive())
2006
/* Get the most current Statement */
2007
message::Statement *statement= session->getStatementMessage();
2009
/* Make sure we have work to do */
2010
if (statement == NULL)
2015
switch (statement->type())
2017
case message::Statement::INSERT:
2019
message::InsertData *data= statement->mutable_insert_data();
2020
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
2024
case message::Statement::UPDATE:
2026
message::UpdateData *data= statement->mutable_update_data();
2027
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
2031
case message::Statement::DELETE: /* not sure if this one is possible... */
2033
message::DeleteData *data= statement->mutable_delete_data();
2034
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2047
1977
void TransactionServices::createTable(Session *in_session,
2048
1978
const message::Table &table)