316
void TransactionServices::registerResourceForStatement(Session *session,
316
void TransactionServices::registerResourceForStatement(Session::reference session,
317
317
plugin::MonitoredInTransaction *monitored,
318
318
plugin::TransactionalStorageEngine *engine)
320
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
320
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
323
323
* Now we automatically register this resource manager for the
329
329
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session->transaction.stmt;
333
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
332
TransactionContext *trans= &session.transaction.stmt;
333
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
335
335
if (resource_context->isStarted())
336
336
return; /* already registered, return */
345
345
trans->no_2pc|= true;
348
void TransactionServices::registerResourceForStatement(Session *session,
348
void TransactionServices::registerResourceForStatement(Session::reference session,
349
349
plugin::MonitoredInTransaction *monitored,
350
350
plugin::TransactionalStorageEngine *engine,
351
351
plugin::XaResourceManager *resource_manager)
353
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
353
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
356
356
* Now we automatically register this resource manager for the
362
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session->transaction.stmt;
366
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
365
TransactionContext *trans= &session.transaction.stmt;
366
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
368
368
if (resource_context->isStarted())
369
369
return; /* already registered, return */
379
379
trans->no_2pc|= false;
382
void TransactionServices::registerResourceForTransaction(Session *session,
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
383
plugin::MonitoredInTransaction *monitored,
384
384
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session->transaction.all;
387
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
386
TransactionContext *trans= &session.transaction.all;
387
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
389
389
if (resource_context->isStarted())
390
390
return; /* already registered, return */
392
session->server_status|= SERVER_STATUS_IN_TRANS;
392
session.server_status|= SERVER_STATUS_IN_TRANS;
394
394
trans->registerResource(resource_context);
400
400
resource_context->setTransactionalStorageEngine(engine);
401
401
trans->no_2pc|= true;
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
403
if (session.transaction.xid_state.xid.is_null())
404
session.transaction.xid_state.xid.set(session.getQueryId());
406
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session->getResourceContext(monitored, 0)->isStarted())
407
if (! session.getResourceContext(monitored, 0)->isStarted())
408
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session *session,
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
412
plugin::MonitoredInTransaction *monitored,
413
413
plugin::TransactionalStorageEngine *engine,
414
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
416
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
419
419
if (resource_context->isStarted())
420
420
return; /* already registered, return */
422
session->server_status|= SERVER_STATUS_IN_TRANS;
422
session.server_status|= SERVER_STATUS_IN_TRANS;
424
424
trans->registerResource(resource_context);
430
430
resource_context->setTransactionalStorageEngine(engine);
431
431
trans->no_2pc|= true;
433
if (session->transaction.xid_state.xid.is_null())
434
session->transaction.xid_state.xid.set(session->getQueryId());
433
if (session.transaction.xid_state.xid.is_null())
434
session.transaction.xid_state.xid.set(session.getQueryId());
436
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
436
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session->getResourceContext(monitored, 0)->isStarted())
439
if (! session.getResourceContext(monitored, 0)->isStarted())
440
440
registerResourceForStatement(session, monitored, engine, resource_manager);
453
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
if (session->getXaId() == 0)
458
if (session.getXaId() == 0)
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
463
return session->getXaId();
463
return session.getXaId();
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
466
int TransactionServices::commitTransaction(Session::reference session,
467
bool normal_transaction)
482
469
int error= 0, cookie= 0;
484
471
'all' means that this is either an explicit commit issued by
485
472
user, or an implicit commit issued by a DDL.
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
474
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
488
475
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
477
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
493
480
We must not commit the normal transaction if a statement
495
482
flags will not get propagated to its normal transaction's
498
assert(session->transaction.stmt.getResourceContexts().empty() ||
499
trans == &session->transaction.stmt);
485
assert(session.transaction.stmt.getResourceContexts().empty() ||
486
trans == &session.transaction.stmt);
501
488
if (resource_contexts.empty() == false)
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
490
if (is_real_trans && session.wait_if_global_read_lock(false, false))
505
492
rollbackTransaction(session, normal_transaction);
532
519
if (resource->participatesInXaTransaction())
534
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
521
if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
536
523
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
541
session->status_var.ha_prepare_count++;
528
session.status_var.ha_prepare_count++;
570
557
This function does not care about global read lock. A caller should.
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
559
int TransactionServices::commitPhaseOne(Session::reference session,
560
bool normal_transaction)
575
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
563
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
576
564
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
578
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
566
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
579
567
bool all= normal_transaction;
581
569
/* If we're in autocommit then we have a real transaction to commit
582
570
(except if it's BEGIN)
584
if (! session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
572
if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
587
575
if (resource_contexts.empty() == false)
598
586
if (resource->participatesInXaTransaction())
600
if ((err= resource_context->getXaResourceManager()->xaCommit(session, all)))
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
602
590
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
605
593
else if (normal_transaction)
607
session->status_var.ha_commit_count++;
595
session.status_var.ha_commit_count++;
610
598
else if (resource->participatesInSqlTransaction())
612
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, all)))
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
614
602
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
617
605
else if (normal_transaction)
619
session->status_var.ha_commit_count++;
607
session.status_var.ha_commit_count++;
622
610
resource_context->reset(); /* keep it conveniently zero-filled */
625
613
if (is_real_trans)
626
session->transaction.xid_state.xid.null();
614
session.transaction.xid_state.xid.null();
628
616
if (normal_transaction)
630
session->variables.tx_isolation= session->session_tx_isolation;
631
session->transaction.cleanup();
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
638
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
626
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
641
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
642
631
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
644
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
645
bool all = normal_transaction || !session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
648
637
We must not rollback the normal transaction if a statement
649
638
transaction is pending.
651
assert(session->transaction.stmt.getResourceContexts().empty() ||
652
trans == &session->transaction.stmt);
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
654
643
if (resource_contexts.empty() == false)
665
654
if (resource->participatesInXaTransaction())
667
if ((err= resource_context->getXaResourceManager()->xaRollback(session, all)))
656
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
669
658
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
672
661
else if (normal_transaction)
674
session->status_var.ha_rollback_count++;
663
session.status_var.ha_rollback_count++;
677
666
else if (resource->participatesInSqlTransaction())
679
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, all)))
668
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
681
670
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
684
673
else if (normal_transaction)
686
session->status_var.ha_rollback_count++;
675
session.status_var.ha_rollback_count++;
689
678
resource_context->reset(); /* keep it conveniently zero-filled */
702
691
rollbackStatementMessage(session);
704
693
if (is_real_trans)
705
session->transaction.xid_state.xid.null();
694
session.transaction.xid_state.xid.null();
706
695
if (normal_transaction)
708
session->variables.tx_isolation=session->session_tx_isolation;
709
session->transaction.cleanup();
697
session.variables.tx_isolation=session.session_tx_isolation;
698
session.transaction.cleanup();
712
701
if (normal_transaction)
713
session->transaction_rollback_request= false;
702
session.transaction_rollback_request= false;
716
705
* If a non-transactional table was updated, warn the user
718
707
if (is_real_trans &&
719
session->transaction.all.hasModifiedNonTransData() &&
720
session->getKilled() != Session::KILL_CONNECTION)
708
session.transaction.all.hasModifiedNonTransData() &&
709
session.getKilled() != Session::KILL_CONNECTION)
722
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
711
push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
723
712
ER_WARNING_NOT_COMPLETE_ROLLBACK,
724
713
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
731
This is used to commit or rollback a single statement depending on
735
Note that if the autocommit is on, then the following call inside
736
InnoDB will commit or rollback the whole transaction (= the statement). The
737
autocommit mechanism built into InnoDB is based on counting locks, but if
738
the user has used LOCK TABLES then that mechanism does not know to do the
741
int TransactionServices::autocommitOrRollback(Session *session, int error)
719
int TransactionServices::autocommitOrRollback(Session::reference session,
743
722
/* One GPB Statement message per SQL statement */
744
message::Statement *statement= session->getStatementMessage();
723
message::Statement *statement= session.getStatementMessage();
745
724
if ((statement != NULL) && (! error))
746
725
finalizeStatementMessage(*statement, session);
748
if (session->transaction.stmt.getResourceContexts().empty() == false)
727
if (session.transaction.stmt.getResourceContexts().empty() == false)
750
TransactionContext *trans = &session->transaction.stmt;
729
TransactionContext *trans = &session.transaction.stmt;
751
730
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
752
731
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
753
732
it != resource_contexts.end();
788
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
767
int TransactionServices::rollbackToSavepoint(Session::reference session,
791
TransactionContext *trans= &session->transaction.all;
771
TransactionContext *trans= &session.transaction.all;
792
772
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
793
773
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
809
789
if (resource->participatesInSqlTransaction())
811
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
791
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
813
793
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
818
session->status_var.ha_savepoint_rollback_count++;
798
session.status_var.ha_savepoint_rollback_count++;
821
801
trans->no_2pc|= not resource->participatesInXaTransaction();
866
846
if (resource->participatesInSqlTransaction())
868
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
848
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
870
850
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
875
session->status_var.ha_rollback_count++;
855
session.status_var.ha_rollback_count++;
878
858
resource_context->reset(); /* keep it conveniently zero-filled */
895
875
uint32_t num_statements = savepoint_transaction_copy->statement_size();
896
876
if (num_statements == 0)
898
session->setStatementMessage(NULL);
878
session.setStatementMessage(NULL);
902
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
882
session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
904
session->setTransactionMessage(savepoint_transaction_copy);
884
session.setTransactionMessage(savepoint_transaction_copy);
914
894
section "4.33.4 SQL-statements and transaction states",
915
895
NamedSavepoint is *not* transaction-initiating SQL-statement
917
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
897
int TransactionServices::setSavepoint(Session::reference session,
920
TransactionContext *trans= &session->transaction.all;
901
TransactionContext *trans= &session.transaction.all;
921
902
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
923
904
if (resource_contexts.empty() == false)
934
915
if (resource->participatesInSqlTransaction())
936
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
917
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
938
919
my_error(ER_GET_ERRNO, MYF(0), err);
943
session->status_var.ha_savepoint_count++;
924
session.status_var.ha_savepoint_count++;
999
981
return replication_services.isActive();
1002
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
984
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
985
bool should_inc_trx_id)
1004
message::Transaction *transaction= in_session->getTransactionMessage();
987
message::Transaction *transaction= session.getTransactionMessage();
1006
989
if (unlikely(transaction == NULL))
1011
994
* deleting transaction message when done with it.
1013
996
transaction= new (nothrow) message::Transaction();
1014
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1015
in_session->setTransactionMessage(transaction);
997
initTransactionMessage(*transaction, session, should_inc_trx_id);
998
session.setTransactionMessage(transaction);
1016
999
return transaction;
1019
1002
return transaction;
1022
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1023
Session *in_session,
1005
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1006
Session::reference session,
1024
1007
bool should_inc_trx_id)
1026
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1027
trx->set_server_id(in_session->getServerId());
1009
message::TransactionContext *trx= transaction.mutable_transaction_context();
1010
trx->set_server_id(session.getServerId());
1029
1012
if (should_inc_trx_id)
1031
trx->set_transaction_id(getCurrentTransactionId(in_session));
1032
in_session->setXaId(0);
1014
trx->set_transaction_id(getCurrentTransactionId(session));
1037
1020
trx->set_transaction_id(0);
1040
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1023
trx->set_start_timestamp(session.getCurrentTimestamp());
1042
1025
/* segment info may get set elsewhere as needed */
1043
in_transaction.set_segment_id(1);
1044
in_transaction.set_end_segment(true);
1047
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1048
Session *in_session)
1050
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1051
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1054
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1055
Session *in_session)
1057
delete in_transaction;
1058
in_session->setStatementMessage(NULL);
1059
in_session->setTransactionMessage(NULL);
1060
in_session->setXaId(0);
1063
int TransactionServices::commitTransactionMessage(Session *in_session)
1026
transaction.set_segment_id(1);
1027
transaction.set_end_segment(true);
1030
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1031
Session::const_reference session)
1033
message::TransactionContext *trx= transaction.mutable_transaction_context();
1034
trx->set_end_timestamp(session.getCurrentTimestamp());
1037
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1038
Session::reference session)
1041
session.setStatementMessage(NULL);
1042
session.setTransactionMessage(NULL);
1046
int TransactionServices::commitTransactionMessage(Session::reference session)
1065
1048
ReplicationServices &replication_services= ReplicationServices::singleton();
1066
1049
if (! replication_services.isActive())
1070
1053
* If no Transaction message was ever created, then no data modification
1071
1054
* occurred inside the transaction, so nothing to do.
1073
if (in_session->getTransactionMessage() == NULL)
1056
if (session.getTransactionMessage() == NULL)
1076
1059
/* If there is an active statement message, finalize it. */
1077
message::Statement *statement= in_session->getStatementMessage();
1060
message::Statement *statement= session.getStatementMessage();
1079
1062
if (statement != NULL)
1081
finalizeStatementMessage(*statement, in_session);
1064
finalizeStatementMessage(*statement, session);
1084
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1067
message::Transaction* transaction= getActiveTransactionMessage(session);
1087
1070
* It is possible that we could have a Transaction without any Statements
1092
1075
if (transaction->statement_size() == 0)
1094
cleanupTransactionMessage(transaction, in_session);
1077
cleanupTransactionMessage(transaction, session);
1098
finalizeTransactionMessage(*transaction, in_session);
1081
finalizeTransactionMessage(*transaction, session);
1100
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1083
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1102
cleanupTransactionMessage(transaction, in_session);
1085
cleanupTransactionMessage(transaction, session);
1104
1087
return static_cast<int>(result);
1107
1090
void TransactionServices::initStatementMessage(message::Statement &statement,
1108
message::Statement::Type in_type,
1109
Session *in_session)
1091
message::Statement::Type type,
1092
Session::const_reference session)
1111
statement.set_type(in_type);
1112
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1094
statement.set_type(type);
1095
statement.set_start_timestamp(session.getCurrentTimestamp());
1114
if (in_session->variables.replicate_query)
1115
statement.set_sql(in_session->getQueryString()->c_str());
1097
if (session.variables.replicate_query)
1098
statement.set_sql(session.getQueryString()->c_str());
1118
1101
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1119
Session *in_session)
1102
Session::reference session)
1121
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1122
in_session->setStatementMessage(NULL);
1104
statement.set_end_timestamp(session.getCurrentTimestamp());
1105
session.setStatementMessage(NULL);
1125
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1108
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1127
1110
ReplicationServices &replication_services= ReplicationServices::singleton();
1128
1111
if (! replication_services.isActive())
1131
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1114
message::Transaction *transaction= getActiveTransactionMessage(session);
1134
1117
* OK, so there are two situations that we need to deal with here:
1158
1141
* attach it to the transaction, and push it to replicators.
1160
1143
transaction->Clear();
1161
initTransactionMessage(*transaction, in_session, false);
1144
initTransactionMessage(*transaction, session, false);
1163
1146
/* Set the transaction ID to match the previous messages */
1164
1147
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1168
1151
message::Statement *statement= transaction->add_statement();
1170
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1171
finalizeStatementMessage(*statement, in_session);
1153
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1154
finalizeStatementMessage(*statement, session);
1173
finalizeTransactionMessage(*transaction, in_session);
1156
finalizeTransactionMessage(*transaction, session);
1175
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1158
(void) replication_services.pushTransactionMessage(session, *transaction);
1177
cleanupTransactionMessage(transaction, in_session);
1161
cleanupTransactionMessage(transaction, session);
1180
void TransactionServices::rollbackStatementMessage(Session *in_session)
1164
void TransactionServices::rollbackStatementMessage(Session::reference session)
1182
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1183
1167
if (! replication_services.isActive())
1186
message::Statement *current_statement= in_session->getStatementMessage();
1170
message::Statement *current_statement= session.getStatementMessage();
1188
1172
/* If we never added a Statement message, nothing to undo. */
1189
1173
if (current_statement == NULL)
1222
1206
* Remove the Statement message we've been working with (same as
1223
1207
* current_statement).
1225
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1209
message::Transaction *transaction= getActiveTransactionMessage(session);
1226
1210
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1227
1211
statements_in_txn= transaction->mutable_statement();
1228
1212
statements_in_txn->RemoveLast();
1229
in_session->setStatementMessage(NULL);
1213
session.setStatementMessage(NULL);
1232
1216
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1238
1222
current_statement= transaction->add_statement();
1239
1223
initStatementMessage(*current_statement,
1240
1224
message::Statement::ROLLBACK_STATEMENT,
1242
finalizeStatementMessage(*current_statement, in_session);
1226
finalizeStatementMessage(*current_statement, session);
1246
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
1230
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1247
1231
message::Transaction *transaction)
1249
1233
uint64_t trx_id= transaction->transaction_context().transaction_id();
1250
1234
uint32_t seg_id= transaction->segment_id();
1252
1236
transaction->set_end_segment(false);
1253
commitTransactionMessage(in_session);
1254
transaction= getActiveTransactionMessage(in_session, false);
1237
commitTransactionMessage(session);
1238
transaction= getActiveTransactionMessage(session, false);
1256
1240
/* Set the transaction ID to match the previous messages */
1257
1241
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1261
1245
return transaction;
1264
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1248
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1266
1250
uint32_t *next_segment_id)
1268
message::Statement *statement= in_session->getStatementMessage();
1252
message::Statement *statement= session.getStatementMessage();
1269
1253
message::Transaction *transaction= NULL;
1278
1262
if (statement == NULL)
1280
transaction= getActiveTransactionMessage(in_session);
1264
transaction= getActiveTransactionMessage(session);
1282
1266
if (static_cast<size_t>(transaction->ByteSize()) >=
1283
in_session->variables.transaction_message_threshold)
1267
session.variables.transaction_message_threshold)
1285
transaction= segmentTransactionMessage(in_session, transaction);
1269
transaction= segmentTransactionMessage(session, transaction);
1288
1272
statement= transaction->add_statement();
1289
setInsertHeader(*statement, in_session, in_table);
1290
in_session->setStatementMessage(statement);
1273
setInsertHeader(*statement, session, table);
1274
session.setStatementMessage(statement);
1294
transaction= getActiveTransactionMessage(in_session);
1278
transaction= getActiveTransactionMessage(session);
1297
1281
* If we've passed our threshold for the statement size (possible for
1318
1302
* statement and transaction. This will also set the Transaction
1319
1303
* and Statement objects in Session to NULL.
1321
commitTransactionMessage(in_session);
1305
commitTransactionMessage(session);
1324
1308
* Statement and Transaction should now be NULL, so new ones will get
1325
1309
* created. We reuse the transaction id since we are segmenting
1326
1310
* one transaction.
1328
transaction= getActiveTransactionMessage(in_session, false);
1312
transaction= getActiveTransactionMessage(session, false);
1329
1313
assert(transaction != NULL);
1331
1315
statement= transaction->add_statement();
1332
setInsertHeader(*statement, in_session, in_table);
1333
in_session->setStatementMessage(statement);
1316
setInsertHeader(*statement, session, table);
1317
session.setStatementMessage(statement);
1335
1319
/* Set the transaction ID to match the previous messages */
1336
1320
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1354
1338
void TransactionServices::setInsertHeader(message::Statement &statement,
1355
Session *in_session,
1339
Session::const_reference session,
1358
initStatementMessage(statement, message::Statement::INSERT, in_session);
1342
initStatementMessage(statement, message::Statement::INSERT, session);
1361
1345
* Now we construct the specialized InsertHeader message inside
1366
1350
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1368
1352
string schema_name;
1369
(void) in_table->getShare()->getSchemaName(schema_name);
1353
(void) table.getShare()->getSchemaName(schema_name);
1370
1354
string table_name;
1371
(void) in_table->getShare()->getTableName(table_name);
1355
(void) table.getShare()->getTableName(table_name);
1373
1357
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1374
1358
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1376
1360
Field *current_field;
1377
Field **table_fields= in_table->getFields();
1361
Field **table_fields= table.getFields();
1379
1363
message::FieldMetadata *field_metadata;
1381
1365
/* We will read all the table's fields... */
1382
in_table->setReadSet();
1384
1368
while ((current_field= *table_fields++) != NULL)
1392
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1376
bool TransactionServices::insertRecord(Session::reference session,
1394
1379
ReplicationServices &replication_services= ReplicationServices::singleton();
1395
1380
if (! replication_services.isActive())
1398
1384
* We do this check here because we don't want to even create a
1399
1385
* statement if there isn't a primary key on the table...
1403
1389
* Multi-column primary keys are handled how exactly?
1405
if (not in_table->getShare()->hasPrimaryKey())
1391
if (not table.getShare()->hasPrimaryKey())
1407
1393
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1411
1397
uint32_t next_segment_id= 1;
1412
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1398
message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1414
1400
message::InsertData *data= statement.mutable_insert_data();
1415
1401
data->set_segment_id(next_segment_id);
1417
1403
message::InsertRecord *record= data->add_record();
1419
1405
Field *current_field;
1420
Field **table_fields= in_table->getFields();
1406
Field **table_fields= table.getFields();
1422
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1408
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1423
1409
string_value->set_charset(system_charset_info);
1425
1411
/* We will read all the table's fields... */
1426
in_table->setReadSet();
1428
1414
while ((current_field= *table_fields++) != NULL)
1446
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1432
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1448
1434
const unsigned char *old_record,
1449
1435
const unsigned char *new_record,
1450
1436
uint32_t *next_segment_id)
1452
message::Statement *statement= in_session->getStatementMessage();
1438
message::Statement *statement= session.getStatementMessage();
1453
1439
message::Transaction *transaction= NULL;
1462
1448
if (statement == NULL)
1464
transaction= getActiveTransactionMessage(in_session);
1450
transaction= getActiveTransactionMessage(session);
1466
1452
if (static_cast<size_t>(transaction->ByteSize()) >=
1467
in_session->variables.transaction_message_threshold)
1453
session.variables.transaction_message_threshold)
1469
transaction= segmentTransactionMessage(in_session, transaction);
1455
transaction= segmentTransactionMessage(session, transaction);
1472
1458
statement= transaction->add_statement();
1473
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1474
in_session->setStatementMessage(statement);
1459
setUpdateHeader(*statement, session, table, old_record, new_record);
1460
session.setStatementMessage(statement);
1478
transaction= getActiveTransactionMessage(in_session);
1464
transaction= getActiveTransactionMessage(session);
1481
1467
* If we've passed our threshold for the statement size (possible for
1502
1488
* statement and transaction. This will also set the Transaction
1503
1489
* and Statement objects in Session to NULL.
1505
commitTransactionMessage(in_session);
1491
commitTransactionMessage(session);
1508
1494
* Statement and Transaction should now be NULL, so new ones will get
1509
1495
* created. We reuse the transaction id since we are segmenting
1510
1496
* one transaction.
1512
transaction= getActiveTransactionMessage(in_session, false);
1498
transaction= getActiveTransactionMessage(session, false);
1513
1499
assert(transaction != NULL);
1515
1501
statement= transaction->add_statement();
1516
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1517
in_session->setStatementMessage(statement);
1502
setUpdateHeader(*statement, session, table, old_record, new_record);
1503
session.setStatementMessage(statement);
1519
1505
/* Set the transaction ID to match the previous messages */
1520
1506
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1538
1524
void TransactionServices::setUpdateHeader(message::Statement &statement,
1539
Session *in_session,
1525
Session::const_reference session,
1541
1527
const unsigned char *old_record,
1542
1528
const unsigned char *new_record)
1544
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1530
initStatementMessage(statement, message::Statement::UPDATE, session);
1547
1533
* Now we construct the specialized UpdateHeader message inside
1552
1538
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1554
1540
string schema_name;
1555
(void) in_table->getShare()->getSchemaName(schema_name);
1541
(void) table.getShare()->getSchemaName(schema_name);
1556
1542
string table_name;
1557
(void) in_table->getShare()->getTableName(table_name);
1543
(void) table.getShare()->getTableName(table_name);
1559
1545
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1560
1546
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1562
1548
Field *current_field;
1563
Field **table_fields= in_table->getFields();
1549
Field **table_fields= table.getFields();
1565
1551
message::FieldMetadata *field_metadata;
1567
1553
/* We will read all the table's fields... */
1568
in_table->setReadSet();
1570
1556
while ((current_field= *table_fields++) != NULL)
1573
1559
* We add the "key field metadata" -- i.e. the fields which is
1574
1560
* the primary key for the table.
1576
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1562
if (table.getShare()->fieldInPrimaryKey(current_field))
1578
1564
field_metadata= header->add_key_field_metadata();
1579
1565
field_metadata->set_name(current_field->field_name);
1580
1566
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1583
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1569
if (isFieldUpdated(current_field, table, old_record, new_record))
1585
1571
/* Field is changed from old to new */
1586
1572
field_metadata= header->add_set_field_metadata();
1601
1587
uint32_t next_segment_id= 1;
1602
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1588
message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1604
1590
message::UpdateData *data= statement.mutable_update_data();
1605
1591
data->set_segment_id(next_segment_id);
1607
1593
message::UpdateRecord *record= data->add_record();
1609
1595
Field *current_field;
1610
Field **table_fields= in_table->getFields();
1611
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1596
Field **table_fields= table.getFields();
1597
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1612
1598
string_value->set_charset(system_charset_info);
1614
1600
while ((current_field= *table_fields++) != NULL)
1625
1611
* We will generate two UpdateRecord messages with different set_value byte arrays.
1627
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1613
if (isFieldUpdated(current_field, table, old_record, new_record))
1629
1615
/* Store the original "read bit" for this field */
1630
1616
bool is_read_set= current_field->isReadSet();
1632
1618
/* We need to mark that we will "read" this field... */
1633
in_table->setReadSet(current_field->position());
1619
table.setReadSet(current_field->position());
1635
1621
/* Read the string value of this field's contents */
1636
1622
string_value= current_field->val_str_internal(string_value);
1686
1672
* we do this crazy pointer fiddling to figure out if the current field
1687
1673
* has been updated in the supplied record raw byte pointers.
1689
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1690
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1675
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1676
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1692
1678
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1717
1703
return isUpdated;
1720
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1706
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1722
1708
uint32_t *next_segment_id)
1724
message::Statement *statement= in_session->getStatementMessage();
1710
message::Statement *statement= session.getStatementMessage();
1725
1711
message::Transaction *transaction= NULL;
1734
1720
if (statement == NULL)
1736
transaction= getActiveTransactionMessage(in_session);
1722
transaction= getActiveTransactionMessage(session);
1738
1724
if (static_cast<size_t>(transaction->ByteSize()) >=
1739
in_session->variables.transaction_message_threshold)
1725
session.variables.transaction_message_threshold)
1741
transaction= segmentTransactionMessage(in_session, transaction);
1727
transaction= segmentTransactionMessage(session, transaction);
1744
1730
statement= transaction->add_statement();
1745
setDeleteHeader(*statement, in_session, in_table);
1746
in_session->setStatementMessage(statement);
1731
setDeleteHeader(*statement, session, table);
1732
session.setStatementMessage(statement);
1750
transaction= getActiveTransactionMessage(in_session);
1736
transaction= getActiveTransactionMessage(session);
1753
1739
* If we've passed our threshold for the statement size (possible for
1774
1760
* statement and transaction. This will also set the Transaction
1775
1761
* and Statement objects in Session to NULL.
1777
commitTransactionMessage(in_session);
1763
commitTransactionMessage(session);
1780
1766
* Statement and Transaction should now be NULL, so new ones will get
1781
1767
* created. We reuse the transaction id since we are segmenting
1782
1768
* one transaction.
1784
transaction= getActiveTransactionMessage(in_session, false);
1770
transaction= getActiveTransactionMessage(session, false);
1785
1771
assert(transaction != NULL);
1787
1773
statement= transaction->add_statement();
1788
setDeleteHeader(*statement, in_session, in_table);
1789
in_session->setStatementMessage(statement);
1774
setDeleteHeader(*statement, session, table);
1775
session.setStatementMessage(statement);
1791
1777
/* Set the transaction ID to match the previous messages */
1792
1778
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1810
1796
void TransactionServices::setDeleteHeader(message::Statement &statement,
1811
Session *in_session,
1797
Session::const_reference session,
1814
initStatementMessage(statement, message::Statement::DELETE, in_session);
1800
initStatementMessage(statement, message::Statement::DELETE, session);
1817
1803
* Now we construct the specialized DeleteHeader message inside
1821
1807
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1823
1809
string schema_name;
1824
(void) in_table->getShare()->getSchemaName(schema_name);
1810
(void) table.getShare()->getSchemaName(schema_name);
1825
1811
string table_name;
1826
(void) in_table->getShare()->getTableName(table_name);
1812
(void) table.getShare()->getTableName(table_name);
1828
1814
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1829
1815
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1831
1817
Field *current_field;
1832
Field **table_fields= in_table->getFields();
1818
Field **table_fields= table.getFields();
1834
1820
message::FieldMetadata *field_metadata;
1852
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1838
void TransactionServices::deleteRecord(Session::reference session,
1840
bool use_update_record)
1854
1842
ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1843
if (! replication_services.isActive())
1858
1846
uint32_t next_segment_id= 1;
1859
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1847
message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1861
1849
message::DeleteData *data= statement.mutable_delete_data();
1862
1850
data->set_segment_id(next_segment_id);
1864
1852
message::DeleteRecord *record= data->add_record();
1866
1854
Field *current_field;
1867
Field **table_fields= in_table->getFields();
1868
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1855
Field **table_fields= table.getFields();
1856
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1869
1857
string_value->set_charset(system_charset_info);
1871
1859
while ((current_field= *table_fields++) != NULL)
1907
void TransactionServices::createTable(Session *in_session,
1895
void TransactionServices::createTable(Session::reference session,
1908
1896
const message::Table &table)
1910
1898
ReplicationServices &replication_services= ReplicationServices::singleton();
1911
1899
if (! replication_services.isActive())
1914
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1902
message::Transaction *transaction= getActiveTransactionMessage(session);
1915
1903
message::Statement *statement= transaction->add_statement();
1917
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1905
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1920
1908
* Construct the specialized CreateTableStatement message and attach
1924
1912
message::Table *new_table_message= create_table_statement->mutable_table();
1925
1913
*new_table_message= table;
1927
finalizeStatementMessage(*statement, in_session);
1915
finalizeStatementMessage(*statement, session);
1929
finalizeTransactionMessage(*transaction, in_session);
1917
finalizeTransactionMessage(*transaction, session);
1931
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1919
(void) replication_services.pushTransactionMessage(session, *transaction);
1933
cleanupTransactionMessage(transaction, in_session);
1921
cleanupTransactionMessage(transaction, session);
1937
void TransactionServices::createSchema(Session *in_session,
1925
void TransactionServices::createSchema(Session::reference session,
1938
1926
const message::Schema &schema)
1940
1928
ReplicationServices &replication_services= ReplicationServices::singleton();
1941
1929
if (! replication_services.isActive())
1944
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1932
message::Transaction *transaction= getActiveTransactionMessage(session);
1945
1933
message::Statement *statement= transaction->add_statement();
1947
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1935
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1950
1938
* Construct the specialized CreateSchemaStatement message and attach
1954
1942
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1955
1943
*new_schema_message= schema;
1957
finalizeStatementMessage(*statement, in_session);
1945
finalizeStatementMessage(*statement, session);
1959
finalizeTransactionMessage(*transaction, in_session);
1947
finalizeTransactionMessage(*transaction, session);
1961
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1949
(void) replication_services.pushTransactionMessage(session, *transaction);
1963
cleanupTransactionMessage(transaction, in_session);
1951
cleanupTransactionMessage(transaction, session);
1967
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
1955
void TransactionServices::dropSchema(Session::reference session,
1956
identifier::Schema::const_reference identifier)
1969
1958
ReplicationServices &replication_services= ReplicationServices::singleton();
1970
1959
if (! replication_services.isActive())
1973
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1962
message::Transaction *transaction= getActiveTransactionMessage(session);
1974
1963
message::Statement *statement= transaction->add_statement();
1976
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1965
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1979
1968
* Construct the specialized DropSchemaStatement message and attach
1984
1973
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1986
finalizeStatementMessage(*statement, in_session);
1975
finalizeStatementMessage(*statement, session);
1988
finalizeTransactionMessage(*transaction, in_session);
1977
finalizeTransactionMessage(*transaction, session);
1990
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1979
(void) replication_services.pushTransactionMessage(session, *transaction);
1992
cleanupTransactionMessage(transaction, in_session);
1981
cleanupTransactionMessage(transaction, session);
1995
void TransactionServices::alterSchema(Session *in_session,
1984
void TransactionServices::alterSchema(Session::reference session,
1996
1985
const message::schema::shared_ptr &old_schema,
1997
1986
const message::Schema &new_schema)
2000
1989
if (! replication_services.isActive())
2003
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1992
message::Transaction *transaction= getActiveTransactionMessage(session);
2004
1993
message::Statement *statement= transaction->add_statement();
2006
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
1995
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2009
1998
* Construct the specialized AlterSchemaStatement message and attach
2017
2006
*before= *old_schema;
2018
2007
*after= new_schema;
2020
finalizeStatementMessage(*statement, in_session);
2009
finalizeStatementMessage(*statement, session);
2022
finalizeTransactionMessage(*transaction, in_session);
2011
finalizeTransactionMessage(*transaction, session);
2024
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2013
(void) replication_services.pushTransactionMessage(session, *transaction);
2026
cleanupTransactionMessage(transaction, in_session);
2015
cleanupTransactionMessage(transaction, session);
2029
void TransactionServices::dropTable(Session *in_session,
2018
void TransactionServices::dropTable(Session::reference session,
2030
2019
const identifier::Table &table,
2031
2020
bool if_exists)
2034
2023
if (! replication_services.isActive())
2037
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2026
message::Transaction *transaction= getActiveTransactionMessage(session);
2038
2027
message::Statement *statement= transaction->add_statement();
2040
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2029
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2043
2032
* Construct the specialized DropTableStatement message and attach
2052
2041
table_metadata->set_schema_name(table.getSchemaName());
2053
2042
table_metadata->set_table_name(table.getTableName());
2055
finalizeStatementMessage(*statement, in_session);
2044
finalizeStatementMessage(*statement, session);
2057
finalizeTransactionMessage(*transaction, in_session);
2046
finalizeTransactionMessage(*transaction, session);
2059
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2048
(void) replication_services.pushTransactionMessage(session, *transaction);
2061
cleanupTransactionMessage(transaction, in_session);
2050
cleanupTransactionMessage(transaction, session);
2064
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2053
void TransactionServices::truncateTable(Session::reference session,
2066
2056
ReplicationServices &replication_services= ReplicationServices::singleton();
2067
2057
if (! replication_services.isActive())
2070
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2060
message::Transaction *transaction= getActiveTransactionMessage(session);
2071
2061
message::Statement *statement= transaction->add_statement();
2073
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2063
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2076
2066
* Construct the specialized TruncateTableStatement message and attach
2080
2070
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2082
2072
string schema_name;
2083
(void) in_table->getShare()->getSchemaName(schema_name);
2073
(void) table.getShare()->getSchemaName(schema_name);
2084
2074
string table_name;
2085
(void) in_table->getShare()->getTableName(table_name);
2075
(void) table.getShare()->getTableName(table_name);
2087
2077
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2088
2078
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2090
finalizeStatementMessage(*statement, in_session);
2080
finalizeStatementMessage(*statement, session);
2092
finalizeTransactionMessage(*transaction, in_session);
2082
finalizeTransactionMessage(*transaction, session);
2094
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2084
(void) replication_services.pushTransactionMessage(session, *transaction);
2096
cleanupTransactionMessage(transaction, in_session);
2086
cleanupTransactionMessage(transaction, session);
2099
void TransactionServices::rawStatement(Session *in_session, const string &query)
2089
void TransactionServices::rawStatement(Session::reference session,
2090
const string &query)
2101
2092
ReplicationServices &replication_services= ReplicationServices::singleton();
2102
2093
if (! replication_services.isActive())
2105
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
message::Transaction *transaction= getActiveTransactionMessage(session);
2106
2097
message::Statement *statement= transaction->add_statement();
2108
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2099
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2109
2100
statement->set_sql(query);
2110
finalizeStatementMessage(*statement, in_session);
2101
finalizeStatementMessage(*statement, session);
2112
finalizeTransactionMessage(*transaction, in_session);
2103
finalizeTransactionMessage(*transaction, session);
2114
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2105
(void) replication_services.pushTransactionMessage(session, *transaction);
2116
cleanupTransactionMessage(transaction, in_session);
2107
cleanupTransactionMessage(transaction, session);
2119
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2110
int TransactionServices::sendEvent(Session::reference session,
2111
const message::Event &event)
2121
2113
ReplicationServices &replication_services= ReplicationServices::singleton();
2122
2114
if (! replication_services.isActive())
2135
2127
trx_event->CopyFrom(event);
2137
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2129
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2139
2131
delete transaction;
2141
2133
return static_cast<int>(result);
2144
bool TransactionServices::sendStartupEvent(Session *session)
2136
bool TransactionServices::sendStartupEvent(Session::reference session)
2146
2138
message::Event event;
2147
2139
event.set_type(message::Event::STARTUP);