316
void TransactionServices::registerResourceForStatement(Session *session,
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
plugin::MonitoredInTransaction *monitored,
318
318
plugin::TransactionalStorageEngine *engine)
320
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
320
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
323
323
* Now we automatically register this resource manager for the
329
329
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session->transaction.stmt;
333
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
332
TransactionContext *trans= &session.transaction.stmt;
333
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
335
335
if (resource_context->isStarted())
336
336
return; /* already registered, return */
345
345
trans->no_2pc|= true;
348
void TransactionServices::registerResourceForStatement(Session *session,
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
plugin::MonitoredInTransaction *monitored,
350
350
plugin::TransactionalStorageEngine *engine,
351
351
plugin::XaResourceManager *resource_manager)
353
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
353
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
356
356
* Now we automatically register this resource manager for the
362
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session->transaction.stmt;
366
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
365
TransactionContext *trans= &session.transaction.stmt;
366
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
368
368
if (resource_context->isStarted())
369
369
return; /* already registered, return */
379
379
trans->no_2pc|= false;
382
void TransactionServices::registerResourceForTransaction(Session *session,
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
plugin::MonitoredInTransaction *monitored,
384
384
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session->transaction.all;
387
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
386
TransactionContext *trans= &session.transaction.all;
387
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
389
389
if (resource_context->isStarted())
390
390
return; /* already registered, return */
392
session->server_status|= SERVER_STATUS_IN_TRANS;
392
session.server_status|= SERVER_STATUS_IN_TRANS;
394
394
trans->registerResource(resource_context);
400
400
resource_context->setTransactionalStorageEngine(engine);
401
401
trans->no_2pc|= true;
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
403
if (session.transaction.xid_state.xid.is_null())
404
session.transaction.xid_state.xid.set(session.getQueryId());
406
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session->getResourceContext(monitored, 0)->isStarted())
407
if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session *session,
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
plugin::MonitoredInTransaction *monitored,
413
413
plugin::TransactionalStorageEngine *engine,
414
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
416
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
419
419
if (resource_context->isStarted())
420
420
return; /* already registered, return */
422
session->server_status|= SERVER_STATUS_IN_TRANS;
422
session.server_status|= SERVER_STATUS_IN_TRANS;
424
424
trans->registerResource(resource_context);
430
430
resource_context->setTransactionalStorageEngine(engine);
431
431
trans->no_2pc|= true;
433
if (session->transaction.xid_state.xid.is_null())
434
session->transaction.xid_state.xid.set(session->getQueryId());
433
if (session.transaction.xid_state.xid.is_null())
434
session.transaction.xid_state.xid.set(session.getQueryId());
436
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
436
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session->getResourceContext(monitored, 0)->isStarted())
439
if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
registerResourceForStatement(session, monitored, engine, resource_manager);
453
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
if (session->getXaId() == 0)
458
if (session.getXaId() == 0)
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
463
return session->getXaId();
463
return session.getXaId();
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
466
int TransactionServices::commitTransaction(Session::reference session,
467
bool normal_transaction)
482
469
int error= 0, cookie= 0;
484
471
'all' means that this is either an explicit commit issued by
485
472
user, or an implicit commit issued by a DDL.
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
474
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
477
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
493
480
We must not commit the normal transaction if a statement
495
482
flags will not get propagated to its normal transaction's
498
assert(session->transaction.stmt.getResourceContexts().empty() ||
499
trans == &session->transaction.stmt);
485
assert(session.transaction.stmt.getResourceContexts().empty() ||
486
trans == &session.transaction.stmt);
501
488
if (resource_contexts.empty() == false)
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
490
if (is_real_trans && session.wait_if_global_read_lock(false, false))
505
492
rollbackTransaction(session, normal_transaction);
532
519
if (resource->participatesInXaTransaction())
534
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
521
if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
536
523
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
541
session->status_var.ha_prepare_count++;
528
session.status_var.ha_prepare_count++;
570
557
This function does not care about global read lock. A caller should.
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
559
int TransactionServices::commitPhaseOne(Session::reference session,
560
bool normal_transaction)
575
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
563
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
578
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
566
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
580
568
if (resource_contexts.empty() == false)
591
579
if (resource->participatesInXaTransaction())
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
581
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, normal_transaction)))
595
583
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
598
586
else if (normal_transaction)
600
session->status_var.ha_commit_count++;
588
session.status_var.ha_commit_count++;
603
591
else if (resource->participatesInSqlTransaction())
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
593
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, normal_transaction)))
607
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
610
598
else if (normal_transaction)
612
session->status_var.ha_commit_count++;
600
session.status_var.ha_commit_count++;
615
603
resource_context->reset(); /* keep it conveniently zero-filled */
618
606
if (is_real_trans)
619
session->transaction.xid_state.xid.null();
607
session.transaction.xid_state.xid.null();
621
609
if (normal_transaction)
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
611
session.variables.tx_isolation= session.session_tx_isolation;
612
session.transaction.cleanup();
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
619
int TransactionServices::rollbackTransaction(Session::reference session,
620
bool normal_transaction)
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
623
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
635
624
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
626
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
640
629
We must not rollback the normal transaction if a statement
641
630
transaction is pending.
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
632
assert(session.transaction.stmt.getResourceContexts().empty() ||
633
trans == &session.transaction.stmt);
646
635
if (resource_contexts.empty() == false)
657
646
if (resource->participatesInXaTransaction())
659
if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
648
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, normal_transaction)))
661
650
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
664
653
else if (normal_transaction)
666
session->status_var.ha_rollback_count++;
655
session.status_var.ha_rollback_count++;
669
658
else if (resource->participatesInSqlTransaction())
671
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
660
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, normal_transaction)))
673
662
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
676
665
else if (normal_transaction)
678
session->status_var.ha_rollback_count++;
667
session.status_var.ha_rollback_count++;
681
670
resource_context->reset(); /* keep it conveniently zero-filled */
694
683
rollbackStatementMessage(session);
696
685
if (is_real_trans)
697
session->transaction.xid_state.xid.null();
686
session.transaction.xid_state.xid.null();
698
687
if (normal_transaction)
700
session->variables.tx_isolation=session->session_tx_isolation;
701
session->transaction.cleanup();
689
session.variables.tx_isolation=session.session_tx_isolation;
690
session.transaction.cleanup();
704
693
if (normal_transaction)
705
session->transaction_rollback_request= false;
694
session.transaction_rollback_request= false;
708
697
* If a non-transactional table was updated, warn the user
710
699
if (is_real_trans &&
711
session->transaction.all.hasModifiedNonTransData() &&
712
session->getKilled() != Session::KILL_CONNECTION)
700
session.transaction.all.hasModifiedNonTransData() &&
701
session.getKilled() != Session::KILL_CONNECTION)
714
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
703
push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
704
ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
705
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
723
This is used to commit or rollback a single statement depending on
727
Note that if the autocommit is on, then the following call inside
728
InnoDB will commit or rollback the whole transaction (= the statement). The
729
autocommit mechanism built into InnoDB is based on counting locks, but if
730
the user has used LOCK TABLES then that mechanism does not know to do the
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
711
int TransactionServices::autocommitOrRollback(Session::reference session,
735
714
/* One GPB Statement message per SQL statement */
736
message::Statement *statement= session->getStatementMessage();
715
message::Statement *statement= session.getStatementMessage();
737
716
if ((statement != NULL) && (! error))
738
717
finalizeStatementMessage(*statement, session);
740
if (session->transaction.stmt.getResourceContexts().empty() == false)
719
if (session.transaction.stmt.getResourceContexts().empty() == false)
742
TransactionContext *trans = &session->transaction.stmt;
721
TransactionContext *trans = &session.transaction.stmt;
743
722
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
723
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
724
it != resource_contexts.end();
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
759
int TransactionServices::rollbackToSavepoint(Session::reference session,
783
TransactionContext *trans= &session->transaction.all;
763
TransactionContext *trans= &session.transaction.all;
784
764
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
785
765
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
801
781
if (resource->participatesInSqlTransaction())
803
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
783
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
805
785
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
810
session->status_var.ha_savepoint_rollback_count++;
790
session.status_var.ha_savepoint_rollback_count++;
813
793
trans->no_2pc|= not resource->participatesInXaTransaction();
858
838
if (resource->participatesInSqlTransaction())
860
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
840
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
862
842
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
867
session->status_var.ha_rollback_count++;
847
session.status_var.ha_rollback_count++;
870
850
resource_context->reset(); /* keep it conveniently zero-filled */
887
867
uint32_t num_statements = savepoint_transaction_copy->statement_size();
888
868
if (num_statements == 0)
890
session->setStatementMessage(NULL);
870
session.setStatementMessage(NULL);
894
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
874
session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
896
session->setTransactionMessage(savepoint_transaction_copy);
876
session.setTransactionMessage(savepoint_transaction_copy);
906
886
section "4.33.4 SQL-statements and transaction states",
907
887
NamedSavepoint is *not* transaction-initiating SQL-statement
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
889
int TransactionServices::setSavepoint(Session::reference session,
912
TransactionContext *trans= &session->transaction.all;
893
TransactionContext *trans= &session.transaction.all;
913
894
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
915
896
if (resource_contexts.empty() == false)
926
907
if (resource->participatesInSqlTransaction())
928
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
909
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
930
911
my_error(ER_GET_ERRNO, MYF(0), err);
935
session->status_var.ha_savepoint_count++;
916
session.status_var.ha_savepoint_count++;
991
973
return replication_services.isActive();
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
976
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
977
bool should_inc_trx_id)
996
message::Transaction *transaction= in_session->getTransactionMessage();
979
message::Transaction *transaction= session.getTransactionMessage();
998
981
if (unlikely(transaction == NULL))
1003
986
* deleting transaction message when done with it.
1005
988
transaction= new (nothrow) message::Transaction();
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
989
initTransactionMessage(*transaction, session, should_inc_trx_id);
990
session.setTransactionMessage(transaction);
1008
991
return transaction;
1011
994
return transaction;
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
997
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
998
Session::reference session,
1016
999
bool should_inc_trx_id)
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1001
message::TransactionContext *trx= transaction.mutable_transaction_context();
1002
trx->set_server_id(session.getServerId());
1021
1004
if (should_inc_trx_id)
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1006
trx->set_transaction_id(getCurrentTransactionId(session));
1029
1012
trx->set_transaction_id(0);
1032
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1015
trx->set_start_timestamp(session.getCurrentTimestamp());
1034
1017
/* segment info may get set elsewhere as needed */
1035
in_transaction.set_segment_id(1);
1036
in_transaction.set_end_segment(true);
1039
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1040
Session *in_session)
1042
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1043
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1046
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1047
Session *in_session)
1049
delete in_transaction;
1050
in_session->setStatementMessage(NULL);
1051
in_session->setTransactionMessage(NULL);
1052
in_session->setXaId(0);
1055
int TransactionServices::commitTransactionMessage(Session *in_session)
1018
transaction.set_segment_id(1);
1019
transaction.set_end_segment(true);
1022
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1023
Session::const_reference session)
1025
message::TransactionContext *trx= transaction.mutable_transaction_context();
1026
trx->set_end_timestamp(session.getCurrentTimestamp());
1029
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1030
Session::reference session)
1033
session.setStatementMessage(NULL);
1034
session.setTransactionMessage(NULL);
1038
int TransactionServices::commitTransactionMessage(Session::reference session)
1057
1040
ReplicationServices &replication_services= ReplicationServices::singleton();
1058
1041
if (! replication_services.isActive())
1062
1045
* If no Transaction message was ever created, then no data modification
1063
1046
* occurred inside the transaction, so nothing to do.
1065
if (in_session->getTransactionMessage() == NULL)
1048
if (session.getTransactionMessage() == NULL)
1068
1051
/* If there is an active statement message, finalize it. */
1069
message::Statement *statement= in_session->getStatementMessage();
1052
message::Statement *statement= session.getStatementMessage();
1071
1054
if (statement != NULL)
1073
finalizeStatementMessage(*statement, in_session);
1056
finalizeStatementMessage(*statement, session);
1076
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1059
message::Transaction* transaction= getActiveTransactionMessage(session);
1079
1062
* It is possible that we could have a Transaction without any Statements
1084
1067
if (transaction->statement_size() == 0)
1086
cleanupTransactionMessage(transaction, in_session);
1069
cleanupTransactionMessage(transaction, session);
1090
finalizeTransactionMessage(*transaction, in_session);
1073
finalizeTransactionMessage(*transaction, session);
1092
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1075
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1094
cleanupTransactionMessage(transaction, in_session);
1077
cleanupTransactionMessage(transaction, session);
1096
1079
return static_cast<int>(result);
1099
1082
void TransactionServices::initStatementMessage(message::Statement &statement,
1100
message::Statement::Type in_type,
1101
Session *in_session)
1083
message::Statement::Type type,
1084
Session::const_reference session)
1103
statement.set_type(in_type);
1104
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1086
statement.set_type(type);
1087
statement.set_start_timestamp(session.getCurrentTimestamp());
1106
if (in_session->variables.replicate_query)
1107
statement.set_sql(in_session->getQueryString()->c_str());
1089
if (session.variables.replicate_query)
1090
statement.set_sql(session.getQueryString()->c_str());
1110
1093
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1111
Session *in_session)
1094
Session::reference session)
1113
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1114
in_session->setStatementMessage(NULL);
1096
statement.set_end_timestamp(session.getCurrentTimestamp());
1097
session.setStatementMessage(NULL);
1117
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1100
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1119
1102
ReplicationServices &replication_services= ReplicationServices::singleton();
1120
1103
if (! replication_services.isActive())
1123
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1106
message::Transaction *transaction= getActiveTransactionMessage(session);
1126
1109
* OK, so there are two situations that we need to deal with here:
1150
1133
* attach it to the transaction, and push it to replicators.
1152
1135
transaction->Clear();
1153
initTransactionMessage(*transaction, in_session, false);
1136
initTransactionMessage(*transaction, session, false);
1155
1138
/* Set the transaction ID to match the previous messages */
1156
1139
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1160
1143
message::Statement *statement= transaction->add_statement();
1162
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1163
finalizeStatementMessage(*statement, in_session);
1145
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1146
finalizeStatementMessage(*statement, session);
1165
finalizeTransactionMessage(*transaction, in_session);
1148
finalizeTransactionMessage(*transaction, session);
1167
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1150
(void) replication_services.pushTransactionMessage(session, *transaction);
1169
cleanupTransactionMessage(transaction, in_session);
1153
cleanupTransactionMessage(transaction, session);
1172
void TransactionServices::rollbackStatementMessage(Session *in_session)
1156
void TransactionServices::rollbackStatementMessage(Session::reference session)
1174
1158
ReplicationServices &replication_services= ReplicationServices::singleton();
1175
1159
if (! replication_services.isActive())
1178
message::Statement *current_statement= in_session->getStatementMessage();
1162
message::Statement *current_statement= session.getStatementMessage();
1180
1164
/* If we never added a Statement message, nothing to undo. */
1181
1165
if (current_statement == NULL)
1214
1198
* Remove the Statement message we've been working with (same as
1215
1199
* current_statement).
1217
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1201
message::Transaction *transaction= getActiveTransactionMessage(session);
1218
1202
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1219
1203
statements_in_txn= transaction->mutable_statement();
1220
1204
statements_in_txn->RemoveLast();
1221
in_session->setStatementMessage(NULL);
1205
session.setStatementMessage(NULL);
1224
1208
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1230
1214
current_statement= transaction->add_statement();
1231
1215
initStatementMessage(*current_statement,
1232
1216
message::Statement::ROLLBACK_STATEMENT,
1234
finalizeStatementMessage(*current_statement, in_session);
1218
finalizeStatementMessage(*current_statement, session);
1238
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
1222
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1239
1223
message::Transaction *transaction)
1241
1225
uint64_t trx_id= transaction->transaction_context().transaction_id();
1242
1226
uint32_t seg_id= transaction->segment_id();
1244
1228
transaction->set_end_segment(false);
1245
commitTransactionMessage(in_session);
1246
transaction= getActiveTransactionMessage(in_session, false);
1229
commitTransactionMessage(session);
1230
transaction= getActiveTransactionMessage(session, false);
1248
1232
/* Set the transaction ID to match the previous messages */
1249
1233
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1253
1237
return transaction;
1256
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1240
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1258
1242
uint32_t *next_segment_id)
1260
message::Statement *statement= in_session->getStatementMessage();
1244
message::Statement *statement= session.getStatementMessage();
1261
1245
message::Transaction *transaction= NULL;
1270
1254
if (statement == NULL)
1272
transaction= getActiveTransactionMessage(in_session);
1256
transaction= getActiveTransactionMessage(session);
1274
1258
if (static_cast<size_t>(transaction->ByteSize()) >=
1275
in_session->variables.transaction_message_threshold)
1259
session.variables.transaction_message_threshold)
1277
transaction= segmentTransactionMessage(in_session, transaction);
1261
transaction= segmentTransactionMessage(session, transaction);
1280
1264
statement= transaction->add_statement();
1281
setInsertHeader(*statement, in_session, in_table);
1282
in_session->setStatementMessage(statement);
1265
setInsertHeader(*statement, session, table);
1266
session.setStatementMessage(statement);
1286
transaction= getActiveTransactionMessage(in_session);
1270
transaction= getActiveTransactionMessage(session);
1289
1273
* If we've passed our threshold for the statement size (possible for
1310
1294
* statement and transaction. This will also set the Transaction
1311
1295
* and Statement objects in Session to NULL.
1313
commitTransactionMessage(in_session);
1297
commitTransactionMessage(session);
1316
1300
* Statement and Transaction should now be NULL, so new ones will get
1317
1301
* created. We reuse the transaction id since we are segmenting
1318
1302
* one transaction.
1320
transaction= getActiveTransactionMessage(in_session, false);
1304
transaction= getActiveTransactionMessage(session, false);
1321
1305
assert(transaction != NULL);
1323
1307
statement= transaction->add_statement();
1324
setInsertHeader(*statement, in_session, in_table);
1325
in_session->setStatementMessage(statement);
1308
setInsertHeader(*statement, session, table);
1309
session.setStatementMessage(statement);
1327
1311
/* Set the transaction ID to match the previous messages */
1328
1312
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1346
1330
void TransactionServices::setInsertHeader(message::Statement &statement,
1347
Session *in_session,
1331
Session::const_reference session,
1350
initStatementMessage(statement, message::Statement::INSERT, in_session);
1334
initStatementMessage(statement, message::Statement::INSERT, session);
1353
1337
* Now we construct the specialized InsertHeader message inside
1358
1342
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1360
1344
string schema_name;
1361
(void) in_table->getShare()->getSchemaName(schema_name);
1345
(void) table.getShare()->getSchemaName(schema_name);
1362
1346
string table_name;
1363
(void) in_table->getShare()->getTableName(table_name);
1347
(void) table.getShare()->getTableName(table_name);
1365
1349
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1366
1350
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1368
1352
Field *current_field;
1369
Field **table_fields= in_table->getFields();
1353
Field **table_fields= table.getFields();
1371
1355
message::FieldMetadata *field_metadata;
1373
1357
/* We will read all the table's fields... */
1374
in_table->setReadSet();
1376
1360
while ((current_field= *table_fields++) != NULL)
1384
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1368
bool TransactionServices::insertRecord(Session::reference session,
1386
1371
ReplicationServices &replication_services= ReplicationServices::singleton();
1387
1372
if (! replication_services.isActive())
1390
1376
* We do this check here because we don't want to even create a
1391
1377
* statement if there isn't a primary key on the table...
1395
1381
* Multi-column primary keys are handled how exactly?
1397
if (not in_table->getShare()->hasPrimaryKey())
1383
if (not table.getShare()->hasPrimaryKey())
1399
1385
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1403
1389
uint32_t next_segment_id= 1;
1404
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1390
message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1406
1392
message::InsertData *data= statement.mutable_insert_data();
1407
1393
data->set_segment_id(next_segment_id);
1409
1395
message::InsertRecord *record= data->add_record();
1411
1397
Field *current_field;
1412
Field **table_fields= in_table->getFields();
1398
Field **table_fields= table.getFields();
1414
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1400
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1415
1401
string_value->set_charset(system_charset_info);
1417
1403
/* We will read all the table's fields... */
1418
in_table->setReadSet();
1420
1406
while ((current_field= *table_fields++) != NULL)
1438
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1424
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1440
1426
const unsigned char *old_record,
1441
1427
const unsigned char *new_record,
1442
1428
uint32_t *next_segment_id)
1444
message::Statement *statement= in_session->getStatementMessage();
1430
message::Statement *statement= session.getStatementMessage();
1445
1431
message::Transaction *transaction= NULL;
1454
1440
if (statement == NULL)
1456
transaction= getActiveTransactionMessage(in_session);
1442
transaction= getActiveTransactionMessage(session);
1458
1444
if (static_cast<size_t>(transaction->ByteSize()) >=
1459
in_session->variables.transaction_message_threshold)
1445
session.variables.transaction_message_threshold)
1461
transaction= segmentTransactionMessage(in_session, transaction);
1447
transaction= segmentTransactionMessage(session, transaction);
1464
1450
statement= transaction->add_statement();
1465
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1466
in_session->setStatementMessage(statement);
1451
setUpdateHeader(*statement, session, table, old_record, new_record);
1452
session.setStatementMessage(statement);
1470
transaction= getActiveTransactionMessage(in_session);
1456
transaction= getActiveTransactionMessage(session);
1473
1459
* If we've passed our threshold for the statement size (possible for
1494
1480
* statement and transaction. This will also set the Transaction
1495
1481
* and Statement objects in Session to NULL.
1497
commitTransactionMessage(in_session);
1483
commitTransactionMessage(session);
1500
1486
* Statement and Transaction should now be NULL, so new ones will get
1501
1487
* created. We reuse the transaction id since we are segmenting
1502
1488
* one transaction.
1504
transaction= getActiveTransactionMessage(in_session, false);
1490
transaction= getActiveTransactionMessage(session, false);
1505
1491
assert(transaction != NULL);
1507
1493
statement= transaction->add_statement();
1508
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1509
in_session->setStatementMessage(statement);
1494
setUpdateHeader(*statement, session, table, old_record, new_record);
1495
session.setStatementMessage(statement);
1511
1497
/* Set the transaction ID to match the previous messages */
1512
1498
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1530
1516
void TransactionServices::setUpdateHeader(message::Statement &statement,
1531
Session *in_session,
1517
Session::const_reference session,
1533
1519
const unsigned char *old_record,
1534
1520
const unsigned char *new_record)
1536
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1522
initStatementMessage(statement, message::Statement::UPDATE, session);
1539
1525
* Now we construct the specialized UpdateHeader message inside
1544
1530
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1546
1532
string schema_name;
1547
(void) in_table->getShare()->getSchemaName(schema_name);
1533
(void) table.getShare()->getSchemaName(schema_name);
1548
1534
string table_name;
1549
(void) in_table->getShare()->getTableName(table_name);
1535
(void) table.getShare()->getTableName(table_name);
1551
1537
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1552
1538
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1554
1540
Field *current_field;
1555
Field **table_fields= in_table->getFields();
1541
Field **table_fields= table.getFields();
1557
1543
message::FieldMetadata *field_metadata;
1559
1545
/* We will read all the table's fields... */
1560
in_table->setReadSet();
1562
1548
while ((current_field= *table_fields++) != NULL)
1565
1551
* We add the "key field metadata" -- i.e. the fields which is
1566
1552
* the primary key for the table.
1568
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1554
if (table.getShare()->fieldInPrimaryKey(current_field))
1570
1556
field_metadata= header->add_key_field_metadata();
1571
1557
field_metadata->set_name(current_field->field_name);
1572
1558
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1575
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1561
if (isFieldUpdated(current_field, table, old_record, new_record))
1577
1563
/* Field is changed from old to new */
1578
1564
field_metadata= header->add_set_field_metadata();
1593
1579
uint32_t next_segment_id= 1;
1594
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1580
message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1596
1582
message::UpdateData *data= statement.mutable_update_data();
1597
1583
data->set_segment_id(next_segment_id);
1599
1585
message::UpdateRecord *record= data->add_record();
1601
1587
Field *current_field;
1602
Field **table_fields= in_table->getFields();
1603
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1588
Field **table_fields= table.getFields();
1589
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1604
1590
string_value->set_charset(system_charset_info);
1606
1592
while ((current_field= *table_fields++) != NULL)
1617
1603
* We will generate two UpdateRecord messages with different set_value byte arrays.
1619
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1605
if (isFieldUpdated(current_field, table, old_record, new_record))
1621
1607
/* Store the original "read bit" for this field */
1622
1608
bool is_read_set= current_field->isReadSet();
1624
1610
/* We need to mark that we will "read" this field... */
1625
in_table->setReadSet(current_field->position());
1611
table.setReadSet(current_field->position());
1627
1613
/* Read the string value of this field's contents */
1628
1614
string_value= current_field->val_str_internal(string_value);
1678
1664
* we do this crazy pointer fiddling to figure out if the current field
1679
1665
* has been updated in the supplied record raw byte pointers.
1681
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1682
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1667
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1668
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1684
1670
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1709
1695
return isUpdated;
1712
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1698
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1714
1700
uint32_t *next_segment_id)
1716
message::Statement *statement= in_session->getStatementMessage();
1702
message::Statement *statement= session.getStatementMessage();
1717
1703
message::Transaction *transaction= NULL;
1726
1712
if (statement == NULL)
1728
transaction= getActiveTransactionMessage(in_session);
1714
transaction= getActiveTransactionMessage(session);
1730
1716
if (static_cast<size_t>(transaction->ByteSize()) >=
1731
in_session->variables.transaction_message_threshold)
1717
session.variables.transaction_message_threshold)
1733
transaction= segmentTransactionMessage(in_session, transaction);
1719
transaction= segmentTransactionMessage(session, transaction);
1736
1722
statement= transaction->add_statement();
1737
setDeleteHeader(*statement, in_session, in_table);
1738
in_session->setStatementMessage(statement);
1723
setDeleteHeader(*statement, session, table);
1724
session.setStatementMessage(statement);
1742
transaction= getActiveTransactionMessage(in_session);
1728
transaction= getActiveTransactionMessage(session);
1745
1731
* If we've passed our threshold for the statement size (possible for
1766
1752
* statement and transaction. This will also set the Transaction
1767
1753
* and Statement objects in Session to NULL.
1769
commitTransactionMessage(in_session);
1755
commitTransactionMessage(session);
1772
1758
* Statement and Transaction should now be NULL, so new ones will get
1773
1759
* created. We reuse the transaction id since we are segmenting
1774
1760
* one transaction.
1776
transaction= getActiveTransactionMessage(in_session, false);
1762
transaction= getActiveTransactionMessage(session, false);
1777
1763
assert(transaction != NULL);
1779
1765
statement= transaction->add_statement();
1780
setDeleteHeader(*statement, in_session, in_table);
1781
in_session->setStatementMessage(statement);
1766
setDeleteHeader(*statement, session, table);
1767
session.setStatementMessage(statement);
1783
1769
/* Set the transaction ID to match the previous messages */
1784
1770
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1802
1788
void TransactionServices::setDeleteHeader(message::Statement &statement,
1803
Session *in_session,
1789
Session::const_reference session,
1806
initStatementMessage(statement, message::Statement::DELETE, in_session);
1792
initStatementMessage(statement, message::Statement::DELETE, session);
1809
1795
* Now we construct the specialized DeleteHeader message inside
1813
1799
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1815
1801
string schema_name;
1816
(void) in_table->getShare()->getSchemaName(schema_name);
1802
(void) table.getShare()->getSchemaName(schema_name);
1817
1803
string table_name;
1818
(void) in_table->getShare()->getTableName(table_name);
1804
(void) table.getShare()->getTableName(table_name);
1820
1806
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1821
1807
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1823
1809
Field *current_field;
1824
Field **table_fields= in_table->getFields();
1810
Field **table_fields= table.getFields();
1826
1812
message::FieldMetadata *field_metadata;
1844
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1830
void TransactionServices::deleteRecord(Session::reference session,
1832
bool use_update_record)
1846
1834
ReplicationServices &replication_services= ReplicationServices::singleton();
1847
1835
if (! replication_services.isActive())
1850
1838
uint32_t next_segment_id= 1;
1851
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1839
message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1853
1841
message::DeleteData *data= statement.mutable_delete_data();
1854
1842
data->set_segment_id(next_segment_id);
1856
1844
message::DeleteRecord *record= data->add_record();
1858
1846
Field *current_field;
1859
Field **table_fields= in_table->getFields();
1860
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1847
Field **table_fields= table.getFields();
1848
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
1849
string_value->set_charset(system_charset_info);
1863
1851
while ((current_field= *table_fields++) != NULL)
1899
void TransactionServices::createTable(Session *in_session,
1887
void TransactionServices::createTable(Session::reference session,
1900
1888
const message::Table &table)
1902
1890
ReplicationServices &replication_services= ReplicationServices::singleton();
1903
1891
if (! replication_services.isActive())
1906
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1894
message::Transaction *transaction= getActiveTransactionMessage(session);
1907
1895
message::Statement *statement= transaction->add_statement();
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1897
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1912
1900
* Construct the specialized CreateTableStatement message and attach
1916
1904
message::Table *new_table_message= create_table_statement->mutable_table();
1917
1905
*new_table_message= table;
1919
finalizeStatementMessage(*statement, in_session);
1907
finalizeStatementMessage(*statement, session);
1921
finalizeTransactionMessage(*transaction, in_session);
1909
finalizeTransactionMessage(*transaction, session);
1923
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1911
(void) replication_services.pushTransactionMessage(session, *transaction);
1925
cleanupTransactionMessage(transaction, in_session);
1913
cleanupTransactionMessage(transaction, session);
1929
void TransactionServices::createSchema(Session *in_session,
1917
void TransactionServices::createSchema(Session::reference session,
1930
1918
const message::Schema &schema)
1932
1920
ReplicationServices &replication_services= ReplicationServices::singleton();
1933
1921
if (! replication_services.isActive())
1936
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1924
message::Transaction *transaction= getActiveTransactionMessage(session);
1937
1925
message::Statement *statement= transaction->add_statement();
1939
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1927
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1942
1930
* Construct the specialized CreateSchemaStatement message and attach
1946
1934
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
1935
*new_schema_message= schema;
1949
finalizeStatementMessage(*statement, in_session);
1937
finalizeStatementMessage(*statement, session);
1951
finalizeTransactionMessage(*transaction, in_session);
1939
finalizeTransactionMessage(*transaction, session);
1953
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1941
(void) replication_services.pushTransactionMessage(session, *transaction);
1955
cleanupTransactionMessage(transaction, in_session);
1943
cleanupTransactionMessage(transaction, session);
1959
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
1947
void TransactionServices::dropSchema(Session::reference session,
1948
identifier::Schema::const_reference identifier)
1961
1950
ReplicationServices &replication_services= ReplicationServices::singleton();
1962
1951
if (! replication_services.isActive())
1965
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
message::Transaction *transaction= getActiveTransactionMessage(session);
1966
1955
message::Statement *statement= transaction->add_statement();
1968
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1957
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1971
1960
* Construct the specialized DropSchemaStatement message and attach
1976
1965
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1978
finalizeStatementMessage(*statement, in_session);
1967
finalizeStatementMessage(*statement, session);
1980
finalizeTransactionMessage(*transaction, in_session);
1969
finalizeTransactionMessage(*transaction, session);
1982
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1971
(void) replication_services.pushTransactionMessage(session, *transaction);
1984
cleanupTransactionMessage(transaction, in_session);
1973
cleanupTransactionMessage(transaction, session);
1987
void TransactionServices::alterSchema(Session *in_session,
1976
void TransactionServices::alterSchema(Session::reference session,
1988
1977
const message::schema::shared_ptr &old_schema,
1989
1978
const message::Schema &new_schema)
1992
1981
if (! replication_services.isActive())
1995
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1984
message::Transaction *transaction= getActiveTransactionMessage(session);
1996
1985
message::Statement *statement= transaction->add_statement();
1998
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
1987
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2001
1990
* Construct the specialized AlterSchemaStatement message and attach
2009
1998
*before= *old_schema;
2010
1999
*after= new_schema;
2012
finalizeStatementMessage(*statement, in_session);
2001
finalizeStatementMessage(*statement, session);
2014
finalizeTransactionMessage(*transaction, in_session);
2003
finalizeTransactionMessage(*transaction, session);
2016
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2005
(void) replication_services.pushTransactionMessage(session, *transaction);
2018
cleanupTransactionMessage(transaction, in_session);
2007
cleanupTransactionMessage(transaction, session);
2021
void TransactionServices::dropTable(Session *in_session,
2010
void TransactionServices::dropTable(Session::reference session,
2022
2011
const identifier::Table &table,
2023
2012
bool if_exists)
2026
2015
if (! replication_services.isActive())
2029
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2018
message::Transaction *transaction= getActiveTransactionMessage(session);
2030
2019
message::Statement *statement= transaction->add_statement();
2032
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2021
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2035
2024
* Construct the specialized DropTableStatement message and attach
2044
2033
table_metadata->set_schema_name(table.getSchemaName());
2045
2034
table_metadata->set_table_name(table.getTableName());
2047
finalizeStatementMessage(*statement, in_session);
2036
finalizeStatementMessage(*statement, session);
2049
finalizeTransactionMessage(*transaction, in_session);
2038
finalizeTransactionMessage(*transaction, session);
2051
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2040
(void) replication_services.pushTransactionMessage(session, *transaction);
2053
cleanupTransactionMessage(transaction, in_session);
2042
cleanupTransactionMessage(transaction, session);
2056
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2045
void TransactionServices::truncateTable(Session::reference session,
2058
2048
ReplicationServices &replication_services= ReplicationServices::singleton();
2059
2049
if (! replication_services.isActive())
2062
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2052
message::Transaction *transaction= getActiveTransactionMessage(session);
2063
2053
message::Statement *statement= transaction->add_statement();
2065
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2055
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2068
2058
* Construct the specialized TruncateTableStatement message and attach
2072
2062
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2074
2064
string schema_name;
2075
(void) in_table->getShare()->getSchemaName(schema_name);
2065
(void) table.getShare()->getSchemaName(schema_name);
2076
2066
string table_name;
2077
(void) in_table->getShare()->getTableName(table_name);
2067
(void) table.getShare()->getTableName(table_name);
2079
2069
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2080
2070
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2082
finalizeStatementMessage(*statement, in_session);
2072
finalizeStatementMessage(*statement, session);
2084
finalizeTransactionMessage(*transaction, in_session);
2074
finalizeTransactionMessage(*transaction, session);
2086
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2076
(void) replication_services.pushTransactionMessage(session, *transaction);
2088
cleanupTransactionMessage(transaction, in_session);
2078
cleanupTransactionMessage(transaction, session);
2091
void TransactionServices::rawStatement(Session *in_session, const string &query)
2081
void TransactionServices::rawStatement(Session::reference session,
2082
const string &query)
2093
2084
ReplicationServices &replication_services= ReplicationServices::singleton();
2094
2085
if (! replication_services.isActive())
2097
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2088
message::Transaction *transaction= getActiveTransactionMessage(session);
2098
2089
message::Statement *statement= transaction->add_statement();
2100
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2091
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2101
2092
statement->set_sql(query);
2102
finalizeStatementMessage(*statement, in_session);
2093
finalizeStatementMessage(*statement, session);
2104
finalizeTransactionMessage(*transaction, in_session);
2095
finalizeTransactionMessage(*transaction, session);
2106
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2097
(void) replication_services.pushTransactionMessage(session, *transaction);
2108
cleanupTransactionMessage(transaction, in_session);
2099
cleanupTransactionMessage(transaction, session);
2111
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2102
int TransactionServices::sendEvent(Session::reference session,
2103
const message::Event &event)
2113
2105
ReplicationServices &replication_services= ReplicationServices::singleton();
2114
2106
if (! replication_services.isActive())
2127
2119
trx_event->CopyFrom(event);
2129
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2121
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2131
2123
delete transaction;
2133
2125
return static_cast<int>(result);
2136
bool TransactionServices::sendStartupEvent(Session *session)
2128
bool TransactionServices::sendStartupEvent(Session::reference session)
2138
2130
message::Event event;
2139
2131
event.set_type(message::Event::STARTUP);