47
47
* plugins can understand.
51
#include <drizzled/current_session.h>
52
#include <drizzled/my_hash.h>
53
#include <drizzled/error.h>
54
#include <drizzled/gettext.h>
55
#include <drizzled/probes.h>
56
#include <drizzled/sql_parse.h>
57
#include <drizzled/session.h>
58
#include <drizzled/sql_base.h>
59
#include <drizzled/replication_services.h>
60
#include <drizzled/transaction_services.h>
61
#include <drizzled/transaction_context.h>
62
#include <drizzled/message/transaction.pb.h>
63
#include <drizzled/message/statement_transform.h>
64
#include <drizzled/resource_context.h>
65
#include <drizzled/lock.h>
66
#include <drizzled/item/int.h>
67
#include <drizzled/item/empty_string.h>
68
#include <drizzled/field/epoch.h>
69
#include <drizzled/plugin/client.h>
70
#include <drizzled/plugin/monitored_in_transaction.h>
71
#include <drizzled/plugin/transactional_storage_engine.h>
72
#include <drizzled/plugin/xa_resource_manager.h>
73
#include <drizzled/plugin/xa_storage_engine.h>
74
#include <drizzled/internal/my_sys.h>
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/epoch.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
77
76
#include <algorithm>
317
void TransactionServices::registerResourceForStatement(Session::reference session,
316
void TransactionServices::registerResourceForStatement(Session *session,
318
317
plugin::MonitoredInTransaction *monitored,
319
318
plugin::TransactionalStorageEngine *engine)
321
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
320
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
324
323
* Now we automatically register this resource manager for the
330
329
registerResourceForTransaction(session, monitored, engine);
333
TransactionContext *trans= &session.transaction.stmt;
334
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
332
TransactionContext *trans= &session->transaction.stmt;
333
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
336
335
if (resource_context->isStarted())
337
336
return; /* already registered, return */
346
345
trans->no_2pc|= true;
349
void TransactionServices::registerResourceForStatement(Session::reference session,
348
void TransactionServices::registerResourceForStatement(Session *session,
350
349
plugin::MonitoredInTransaction *monitored,
351
350
plugin::TransactionalStorageEngine *engine,
352
351
plugin::XaResourceManager *resource_manager)
354
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
353
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
357
356
* Now we automatically register this resource manager for the
363
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
366
TransactionContext *trans= &session.transaction.stmt;
367
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
365
TransactionContext *trans= &session->transaction.stmt;
366
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
369
368
if (resource_context->isStarted())
370
369
return; /* already registered, return */
380
379
trans->no_2pc|= false;
383
void TransactionServices::registerResourceForTransaction(Session::reference session,
382
void TransactionServices::registerResourceForTransaction(Session *session,
384
383
plugin::MonitoredInTransaction *monitored,
385
384
plugin::TransactionalStorageEngine *engine)
387
TransactionContext *trans= &session.transaction.all;
388
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
386
TransactionContext *trans= &session->transaction.all;
387
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
390
389
if (resource_context->isStarted())
391
390
return; /* already registered, return */
393
session.server_status|= SERVER_STATUS_IN_TRANS;
392
session->server_status|= SERVER_STATUS_IN_TRANS;
395
394
trans->registerResource(resource_context);
401
400
resource_context->setTransactionalStorageEngine(engine);
402
401
trans->no_2pc|= true;
404
if (session.transaction.xid_state.xid.is_null())
405
session.transaction.xid_state.xid.set(session.getQueryId());
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
407
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
if (! session.getResourceContext(monitored, 0)->isStarted())
407
if (! session->getResourceContext(monitored, 0)->isStarted())
409
408
registerResourceForStatement(session, monitored, engine);
412
void TransactionServices::registerResourceForTransaction(Session::reference session,
411
void TransactionServices::registerResourceForTransaction(Session *session,
413
412
plugin::MonitoredInTransaction *monitored,
414
413
plugin::TransactionalStorageEngine *engine,
415
414
plugin::XaResourceManager *resource_manager)
417
TransactionContext *trans= &session.transaction.all;
418
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
420
419
if (resource_context->isStarted())
421
420
return; /* already registered, return */
423
session.server_status|= SERVER_STATUS_IN_TRANS;
422
session->server_status|= SERVER_STATUS_IN_TRANS;
425
424
trans->registerResource(resource_context);
431
430
resource_context->setTransactionalStorageEngine(engine);
432
431
trans->no_2pc|= true;
434
if (session.transaction.xid_state.xid.is_null())
435
session.transaction.xid_state.xid.set(session.getQueryId());
433
if (session->transaction.xid_state.xid.is_null())
434
session->transaction.xid_state.xid.set(session->getQueryId());
437
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
436
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
439
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
if (! session.getResourceContext(monitored, 0)->isStarted())
439
if (! session->getResourceContext(monitored, 0)->isStarted())
441
440
registerResourceForStatement(session, monitored, engine, resource_manager);
454
453
my_session->setXaId(xa_id);
457
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
459
if (session.getXaId() == 0)
458
if (session->getXaId() == 0)
461
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
464
return session.getXaId();
463
return session->getXaId();
467
int TransactionServices::commitTransaction(Session::reference session,
468
bool normal_transaction)
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
470
482
int error= 0, cookie= 0;
472
484
'all' means that this is either an explicit commit issued by
473
485
user, or an implicit commit issued by a DDL.
475
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
476
488
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
478
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
481
493
We must not commit the normal transaction if a statement
483
495
flags will not get propagated to its normal transaction's
486
assert(session.transaction.stmt.getResourceContexts().empty() ||
487
trans == &session.transaction.stmt);
498
assert(session->transaction.stmt.getResourceContexts().empty() ||
499
trans == &session->transaction.stmt);
489
501
if (resource_contexts.empty() == false)
491
if (is_real_trans && session.wait_if_global_read_lock(false, false))
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
493
505
rollbackTransaction(session, normal_transaction);
520
532
if (resource->participatesInXaTransaction())
522
if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
534
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
524
536
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
529
session.status_var.ha_prepare_count++;
541
session->status_var.ha_prepare_count++;
558
570
This function does not care about global read lock. A caller should.
560
int TransactionServices::commitPhaseOne(Session::reference session,
561
bool normal_transaction)
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
564
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
575
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
565
576
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
567
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
bool all= normal_transaction;
570
/* If we're in autocommit then we have a real transaction to commit
571
(except if it's BEGIN)
573
if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
578
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
576
580
if (resource_contexts.empty() == false)
587
591
if (resource->participatesInXaTransaction())
589
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
591
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
594
598
else if (normal_transaction)
596
session.status_var.ha_commit_count++;
600
session->status_var.ha_commit_count++;
599
603
else if (resource->participatesInSqlTransaction())
601
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
603
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
606
610
else if (normal_transaction)
608
session.status_var.ha_commit_count++;
612
session->status_var.ha_commit_count++;
611
615
resource_context->reset(); /* keep it conveniently zero-filled */
614
618
if (is_real_trans)
615
session.transaction.xid_state.xid.null();
619
session->transaction.xid_state.xid.null();
617
621
if (normal_transaction)
619
session.variables.tx_isolation= session.session_tx_isolation;
620
session.transaction.cleanup();
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
627
int TransactionServices::rollbackTransaction(Session::reference session,
628
bool normal_transaction)
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
631
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
632
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
634
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
638
640
We must not rollback the normal transaction if a statement
639
641
transaction is pending.
641
assert(session.transaction.stmt.getResourceContexts().empty() ||
642
trans == &session.transaction.stmt);
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
644
646
if (resource_contexts.empty() == false)
655
657
if (resource->participatesInXaTransaction())
657
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
659
if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
659
661
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
664
else if (normal_transaction)
664
session.status_var.ha_rollback_count++;
666
session->status_var.ha_rollback_count++;
667
669
else if (resource->participatesInSqlTransaction())
669
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
671
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
671
673
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
676
else if (normal_transaction)
676
session.status_var.ha_rollback_count++;
678
session->status_var.ha_rollback_count++;
679
681
resource_context->reset(); /* keep it conveniently zero-filled */
686
688
* a rollback statement with the corresponding transaction ID
691
if (normal_transaction)
690
692
rollbackTransactionMessage(session);
692
694
rollbackStatementMessage(session);
694
696
if (is_real_trans)
695
session.transaction.xid_state.xid.null();
697
session->transaction.xid_state.xid.null();
696
698
if (normal_transaction)
698
session.variables.tx_isolation=session.session_tx_isolation;
699
session.transaction.cleanup();
700
session->variables.tx_isolation=session->session_tx_isolation;
701
session->transaction.cleanup();
702
704
if (normal_transaction)
703
session.transaction_rollback_request= false;
705
session->transaction_rollback_request= false;
706
708
* If a non-transactional table was updated, warn the user
708
710
if (is_real_trans &&
709
session.transaction.all.hasModifiedNonTransData() &&
710
session.getKilled() != Session::KILL_CONNECTION)
711
session->transaction.all.hasModifiedNonTransData() &&
712
session->getKilled() != Session::KILL_CONNECTION)
712
push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
714
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
715
ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
716
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
720
int TransactionServices::autocommitOrRollback(Session::reference session,
723
This is used to commit or rollback a single statement depending on
727
Note that if the autocommit is on, then the following call inside
728
InnoDB will commit or rollback the whole transaction (= the statement). The
729
autocommit mechanism built into InnoDB is based on counting locks, but if
730
the user has used LOCK TABLES then that mechanism does not know to do the
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
723
735
/* One GPB Statement message per SQL statement */
724
message::Statement *statement= session.getStatementMessage();
736
message::Statement *statement= session->getStatementMessage();
725
737
if ((statement != NULL) && (! error))
726
738
finalizeStatementMessage(*statement, session);
728
if (session.transaction.stmt.getResourceContexts().empty() == false)
740
if (session->transaction.stmt.getResourceContexts().empty() == false)
730
TransactionContext *trans = &session.transaction.stmt;
742
TransactionContext *trans = &session->transaction.stmt;
731
743
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
744
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
745
it != resource_contexts.end();
748
760
(void) rollbackTransaction(session, false);
749
if (session.transaction_rollback_request)
761
if (session->transaction_rollback_request)
751
762
(void) rollbackTransaction(session, true);
752
session.server_status&= ~SERVER_STATUS_IN_TRANS;
756
session.variables.tx_isolation= session.session_tx_isolation;
765
session->variables.tx_isolation= session->session_tx_isolation;
772
int TransactionServices::rollbackToSavepoint(Session::reference session,
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
776
TransactionContext *trans= &session.transaction.all;
783
TransactionContext *trans= &session->transaction.all;
777
784
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
785
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
794
801
if (resource->participatesInSqlTransaction())
796
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
803
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
798
805
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
803
session.status_var.ha_savepoint_rollback_count++;
810
session->status_var.ha_savepoint_rollback_count++;
806
813
trans->no_2pc|= not resource->participatesInXaTransaction();
851
858
if (resource->participatesInSqlTransaction())
853
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
860
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
855
862
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
860
session.status_var.ha_rollback_count++;
867
session->status_var.ha_rollback_count++;
863
870
resource_context->reset(); /* keep it conveniently zero-filled */
880
887
uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
888
if (num_statements == 0)
883
session.setStatementMessage(NULL);
890
session->setStatementMessage(NULL);
887
session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
894
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
889
session.setTransactionMessage(savepoint_transaction_copy);
896
session->setTransactionMessage(savepoint_transaction_copy);
899
906
section "4.33.4 SQL-statements and transaction states",
900
907
SAVEPOINT is *not* a transaction-initiating SQL-statement
902
int TransactionServices::setSavepoint(Session::reference session,
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
906
TransactionContext *trans= &session.transaction.all;
912
TransactionContext *trans= &session->transaction.all;
907
913
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
909
915
if (resource_contexts.empty() == false)
920
926
if (resource->participatesInSqlTransaction())
922
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
928
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
924
930
my_error(ER_GET_ERRNO, MYF(0), err);
929
session.status_var.ha_savepoint_count++;
935
session->status_var.ha_savepoint_count++;
986
991
return replication_services.isActive();
989
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
bool should_inc_trx_id)
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
992
message::Transaction *transaction= session.getTransactionMessage();
996
message::Transaction *transaction= in_session->getTransactionMessage();
994
998
if (unlikely(transaction == NULL))
999
1003
* deleting transaction message when done with it.
1001
1005
transaction= new (nothrow) message::Transaction();
1002
initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
session.setTransactionMessage(transaction);
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
1004
1008
return transaction;
1007
1011
return transaction;
1010
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
Session::reference session,
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
1012
1016
bool should_inc_trx_id)
1014
message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
trx->set_server_id(session.getServerId());
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1017
1021
if (should_inc_trx_id)
1019
trx->set_transaction_id(getCurrentTransactionId(session));
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1025
1029
trx->set_transaction_id(0);
1028
trx->set_start_timestamp(session.getCurrentTimestamp());
1032
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1030
1034
/* segment info may get set elsewhere as needed */
1031
transaction.set_segment_id(1);
1032
transaction.set_end_segment(true);
1035
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
Session::const_reference session)
1038
message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
trx->set_end_timestamp(session.getCurrentTimestamp());
1042
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
Session::reference session)
1046
session.setStatementMessage(NULL);
1047
session.setTransactionMessage(NULL);
1051
int TransactionServices::commitTransactionMessage(Session::reference session)
1035
in_transaction.set_segment_id(1);
1036
in_transaction.set_end_segment(true);
1039
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1040
Session *in_session)
1042
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1043
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1046
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1047
Session *in_session)
1049
delete in_transaction;
1050
in_session->setStatementMessage(NULL);
1051
in_session->setTransactionMessage(NULL);
1052
in_session->setXaId(0);
1055
int TransactionServices::commitTransactionMessage(Session *in_session)
1053
1057
ReplicationServices &replication_services= ReplicationServices::singleton();
1054
1058
if (! replication_services.isActive())
1058
1062
* If no Transaction message was ever created, then no data modification
1059
1063
* occurred inside the transaction, so nothing to do.
1061
if (session.getTransactionMessage() == NULL)
1065
if (in_session->getTransactionMessage() == NULL)
1064
1068
/* If there is an active statement message, finalize it. */
1065
message::Statement *statement= session.getStatementMessage();
1069
message::Statement *statement= in_session->getStatementMessage();
1067
1071
if (statement != NULL)
1069
finalizeStatementMessage(*statement, session);
1073
finalizeStatementMessage(*statement, in_session);
1072
message::Transaction* transaction= getActiveTransactionMessage(session);
1076
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1075
1079
* It is possible that we could have a Transaction without any Statements
1080
1084
if (transaction->statement_size() == 0)
1082
cleanupTransactionMessage(transaction, session);
1086
cleanupTransactionMessage(transaction, in_session);
1086
finalizeTransactionMessage(*transaction, session);
1090
finalizeTransactionMessage(*transaction, in_session);
1088
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1092
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1090
cleanupTransactionMessage(transaction, session);
1094
cleanupTransactionMessage(transaction, in_session);
1092
1096
return static_cast<int>(result);
1095
1099
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
message::Statement::Type type,
1097
Session::const_reference session)
1100
message::Statement::Type in_type,
1101
Session *in_session)
1099
statement.set_type(type);
1100
statement.set_start_timestamp(session.getCurrentTimestamp());
1103
statement.set_type(in_type);
1104
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1102
if (session.variables.replicate_query)
1103
statement.set_sql(session.getQueryString()->c_str());
1106
if (in_session->variables.replicate_query)
1107
statement.set_sql(in_session->getQueryString()->c_str());
1106
1110
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
Session::reference session)
1111
Session *in_session)
1109
statement.set_end_timestamp(session.getCurrentTimestamp());
1110
session.setStatementMessage(NULL);
1113
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1114
in_session->setStatementMessage(NULL);
1113
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1117
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1115
1119
ReplicationServices &replication_services= ReplicationServices::singleton();
1116
1120
if (! replication_services.isActive())
1119
message::Transaction *transaction= getActiveTransactionMessage(session);
1123
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1122
1126
* OK, so there are two situations that we need to deal with here:
1146
1150
* attach it to the transaction, and push it to replicators.
1148
1152
transaction->Clear();
1149
initTransactionMessage(*transaction, session, false);
1153
initTransactionMessage(*transaction, in_session, false);
1151
1155
/* Set the transaction ID to match the previous messages */
1152
1156
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1156
1160
message::Statement *statement= transaction->add_statement();
1158
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
finalizeStatementMessage(*statement, session);
1162
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1163
finalizeStatementMessage(*statement, in_session);
1161
finalizeTransactionMessage(*transaction, session);
1165
finalizeTransactionMessage(*transaction, in_session);
1163
(void) replication_services.pushTransactionMessage(session, *transaction);
1167
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1166
cleanupTransactionMessage(transaction, session);
1169
cleanupTransactionMessage(transaction, in_session);
1169
void TransactionServices::rollbackStatementMessage(Session::reference session)
1172
void TransactionServices::rollbackStatementMessage(Session *in_session)
1171
1174
ReplicationServices &replication_services= ReplicationServices::singleton();
1172
1175
if (! replication_services.isActive())
1175
message::Statement *current_statement= session.getStatementMessage();
1178
message::Statement *current_statement= in_session->getStatementMessage();
1177
1180
/* If we never added a Statement message, nothing to undo. */
1178
1181
if (current_statement == NULL)
1211
1214
* Remove the Statement message we've been working with (same as
1212
1215
* current_statement).
1214
message::Transaction *transaction= getActiveTransactionMessage(session);
1217
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1215
1218
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
1219
statements_in_txn= transaction->mutable_statement();
1217
1220
statements_in_txn->RemoveLast();
1218
session.setStatementMessage(NULL);
1221
in_session->setStatementMessage(NULL);
1221
1224
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1227
1230
current_statement= transaction->add_statement();
1228
1231
initStatementMessage(*current_statement,
1229
1232
message::Statement::ROLLBACK_STATEMENT,
1231
finalizeStatementMessage(*current_statement, session);
1234
finalizeStatementMessage(*current_statement, in_session);
1235
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1238
message::Transaction *TransactionServices::segmentTransactionMessage(Session *in_session,
1236
1239
message::Transaction *transaction)
1238
1241
uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
1242
uint32_t seg_id= transaction->segment_id();
1241
1244
transaction->set_end_segment(false);
1242
commitTransactionMessage(session);
1243
transaction= getActiveTransactionMessage(session, false);
1245
commitTransactionMessage(in_session);
1246
transaction= getActiveTransactionMessage(in_session, false);
1245
1248
/* Set the transaction ID to match the previous messages */
1246
1249
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1250
1253
return transaction;
1253
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1256
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1255
1258
uint32_t *next_segment_id)
1257
message::Statement *statement= session.getStatementMessage();
1260
message::Statement *statement= in_session->getStatementMessage();
1258
1261
message::Transaction *transaction= NULL;
1267
1270
if (statement == NULL)
1269
transaction= getActiveTransactionMessage(session);
1272
transaction= getActiveTransactionMessage(in_session);
1271
1274
if (static_cast<size_t>(transaction->ByteSize()) >=
1272
transaction_message_threshold)
1275
in_session->variables.transaction_message_threshold)
1274
transaction= segmentTransactionMessage(session, transaction);
1277
transaction= segmentTransactionMessage(in_session, transaction);
1277
1280
statement= transaction->add_statement();
1278
setInsertHeader(*statement, session, table);
1279
session.setStatementMessage(statement);
1281
setInsertHeader(*statement, in_session, in_table);
1282
in_session->setStatementMessage(statement);
1283
transaction= getActiveTransactionMessage(session);
1286
transaction= getActiveTransactionMessage(in_session);
1286
1289
* If we've passed our threshold for the statement size (possible for
1307
1310
* statement and transaction. This will also set the Transaction
1308
1311
* and Statement objects in Session to NULL.
1310
commitTransactionMessage(session);
1313
commitTransactionMessage(in_session);
1313
1316
* Statement and Transaction should now be NULL, so new ones will get
1314
1317
* created. We reuse the transaction id since we are segmenting
1315
1318
* one transaction.
1317
transaction= getActiveTransactionMessage(session, false);
1320
transaction= getActiveTransactionMessage(in_session, false);
1318
1321
assert(transaction != NULL);
1320
1323
statement= transaction->add_statement();
1321
setInsertHeader(*statement, session, table);
1322
session.setStatementMessage(statement);
1324
setInsertHeader(*statement, in_session, in_table);
1325
in_session->setStatementMessage(statement);
1324
1327
/* Set the transaction ID to match the previous messages */
1325
1328
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1343
1346
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
Session::const_reference session,
1347
Session *in_session,
1347
initStatementMessage(statement, message::Statement::INSERT, session);
1350
initStatementMessage(statement, message::Statement::INSERT, in_session);
1350
1353
* Now we construct the specialized InsertHeader message inside
1355
1358
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1357
1360
string schema_name;
1358
(void) table.getShare()->getSchemaName(schema_name);
1361
(void) in_table->getShare()->getSchemaName(schema_name);
1359
1362
string table_name;
1360
(void) table.getShare()->getTableName(table_name);
1363
(void) in_table->getShare()->getTableName(table_name);
1362
1365
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
1366
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1365
1368
Field *current_field;
1366
Field **table_fields= table.getFields();
1369
Field **table_fields= in_table->getFields();
1368
1371
message::FieldMetadata *field_metadata;
1370
1373
/* We will read all the table's fields... */
1374
in_table->setReadSet();
1373
1376
while ((current_field= *table_fields++) != NULL)
1381
bool TransactionServices::insertRecord(Session::reference session,
1384
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1384
1386
ReplicationServices &replication_services= ReplicationServices::singleton();
1385
1387
if (! replication_services.isActive())
1388
if (not table.getShare()->replicate())
1392
1390
* We do this check here because we don't want to even create a
1393
1391
* statement if there isn't a primary key on the table...
1397
1395
* Multi-column primary keys are handled how exactly?
1399
if (not table.getShare()->hasPrimaryKey())
1397
if (not in_table->getShare()->hasPrimaryKey())
1401
1399
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1405
1403
uint32_t next_segment_id= 1;
1406
message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1404
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1408
1406
message::InsertData *data= statement.mutable_insert_data();
1409
1407
data->set_segment_id(next_segment_id);
1411
1409
message::InsertRecord *record= data->add_record();
1413
1411
Field *current_field;
1414
Field **table_fields= table.getFields();
1412
Field **table_fields= in_table->getFields();
1416
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1414
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
1415
string_value->set_charset(system_charset_info);
1419
1417
/* We will read all the table's fields... */
1418
in_table->setReadSet();
1422
1420
while ((current_field= *table_fields++) != NULL)
1440
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1438
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1442
1440
const unsigned char *old_record,
1443
1441
const unsigned char *new_record,
1444
1442
uint32_t *next_segment_id)
1446
message::Statement *statement= session.getStatementMessage();
1444
message::Statement *statement= in_session->getStatementMessage();
1447
1445
message::Transaction *transaction= NULL;
1456
1454
if (statement == NULL)
1458
transaction= getActiveTransactionMessage(session);
1456
transaction= getActiveTransactionMessage(in_session);
1460
1458
if (static_cast<size_t>(transaction->ByteSize()) >=
1461
transaction_message_threshold)
1459
in_session->variables.transaction_message_threshold)
1463
transaction= segmentTransactionMessage(session, transaction);
1461
transaction= segmentTransactionMessage(in_session, transaction);
1466
1464
statement= transaction->add_statement();
1467
setUpdateHeader(*statement, session, table, old_record, new_record);
1468
session.setStatementMessage(statement);
1465
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1466
in_session->setStatementMessage(statement);
1472
transaction= getActiveTransactionMessage(session);
1470
transaction= getActiveTransactionMessage(in_session);
1475
1473
* If we've passed our threshold for the statement size (possible for
1496
1494
* statement and transaction. This will also set the Transaction
1497
1495
* and Statement objects in Session to NULL.
1499
commitTransactionMessage(session);
1497
commitTransactionMessage(in_session);
1502
1500
* Statement and Transaction should now be NULL, so new ones will get
1503
1501
* created. We reuse the transaction id since we are segmenting
1504
1502
* one transaction.
1506
transaction= getActiveTransactionMessage(session, false);
1504
transaction= getActiveTransactionMessage(in_session, false);
1507
1505
assert(transaction != NULL);
1509
1507
statement= transaction->add_statement();
1510
setUpdateHeader(*statement, session, table, old_record, new_record);
1511
session.setStatementMessage(statement);
1508
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1509
in_session->setStatementMessage(statement);
1513
1511
/* Set the transaction ID to match the previous messages */
1514
1512
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1532
1530
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
Session::const_reference session,
1531
Session *in_session,
1535
1533
const unsigned char *old_record,
1536
1534
const unsigned char *new_record)
1538
initStatementMessage(statement, message::Statement::UPDATE, session);
1536
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1541
1539
* Now we construct the specialized UpdateHeader message inside
1546
1544
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1548
1546
string schema_name;
1549
(void) table.getShare()->getSchemaName(schema_name);
1547
(void) in_table->getShare()->getSchemaName(schema_name);
1550
1548
string table_name;
1551
(void) table.getShare()->getTableName(table_name);
1549
(void) in_table->getShare()->getTableName(table_name);
1553
1551
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
1552
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1556
1554
Field *current_field;
1557
Field **table_fields= table.getFields();
1555
Field **table_fields= in_table->getFields();
1559
1557
message::FieldMetadata *field_metadata;
1561
1559
/* We will read all the table's fields... */
1560
in_table->setReadSet();
1564
1562
while ((current_field= *table_fields++) != NULL)
1567
1565
* We add the "key field metadata" -- i.e. the fields which is
1568
1566
* the primary key for the table.
1570
if (table.getShare()->fieldInPrimaryKey(current_field))
1568
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1572
1570
field_metadata= header->add_key_field_metadata();
1573
1571
field_metadata->set_name(current_field->field_name);
1574
1572
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1577
if (isFieldUpdated(current_field, table, old_record, new_record))
1575
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1579
1577
/* Field is changed from old to new */
1580
1578
field_metadata= header->add_set_field_metadata();
1593
1590
if (! replication_services.isActive())
1596
if (not table.getShare()->replicate())
1599
1593
uint32_t next_segment_id= 1;
1600
message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1594
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1602
1596
message::UpdateData *data= statement.mutable_update_data();
1603
1597
data->set_segment_id(next_segment_id);
1605
1599
message::UpdateRecord *record= data->add_record();
1607
1601
Field *current_field;
1608
Field **table_fields= table.getFields();
1609
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
Field **table_fields= in_table->getFields();
1603
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
1604
string_value->set_charset(system_charset_info);
1612
1606
while ((current_field= *table_fields++) != NULL)
1623
1617
* We will generate two UpdateRecord messages with different set_value byte arrays.
1625
if (isFieldUpdated(current_field, table, old_record, new_record))
1619
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1627
1621
/* Store the original "read bit" for this field */
1628
1622
bool is_read_set= current_field->isReadSet();
1630
1624
/* We need to mark that we will "read" this field... */
1631
table.setReadSet(current_field->position());
1625
in_table->setReadSet(current_field->position());
1633
1627
/* Read the string value of this field's contents */
1634
1628
string_value= current_field->val_str_internal(string_value);
1684
1678
* we do this crazy pointer fiddling to figure out if the current field
1685
1679
* has been updated in the supplied record raw byte pointers.
1687
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1681
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1682
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1690
1684
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1715
1709
return isUpdated;
1718
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1712
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1720
1714
uint32_t *next_segment_id)
1722
message::Statement *statement= session.getStatementMessage();
1716
message::Statement *statement= in_session->getStatementMessage();
1723
1717
message::Transaction *transaction= NULL;
1732
1726
if (statement == NULL)
1734
transaction= getActiveTransactionMessage(session);
1728
transaction= getActiveTransactionMessage(in_session);
1736
1730
if (static_cast<size_t>(transaction->ByteSize()) >=
1737
transaction_message_threshold)
1731
in_session->variables.transaction_message_threshold)
1739
transaction= segmentTransactionMessage(session, transaction);
1733
transaction= segmentTransactionMessage(in_session, transaction);
1742
1736
statement= transaction->add_statement();
1743
setDeleteHeader(*statement, session, table);
1744
session.setStatementMessage(statement);
1737
setDeleteHeader(*statement, in_session, in_table);
1738
in_session->setStatementMessage(statement);
1748
transaction= getActiveTransactionMessage(session);
1742
transaction= getActiveTransactionMessage(in_session);
1751
1745
* If we've passed our threshold for the statement size (possible for
1772
1766
* statement and transaction. This will also set the Transaction
1773
1767
* and Statement objects in Session to NULL.
1775
commitTransactionMessage(session);
1769
commitTransactionMessage(in_session);
1778
1772
* Statement and Transaction should now be NULL, so new ones will get
1779
1773
* created. We reuse the transaction id since we are segmenting
1780
1774
* one transaction.
1782
transaction= getActiveTransactionMessage(session, false);
1776
transaction= getActiveTransactionMessage(in_session, false);
1783
1777
assert(transaction != NULL);
1785
1779
statement= transaction->add_statement();
1786
setDeleteHeader(*statement, session, table);
1787
session.setStatementMessage(statement);
1780
setDeleteHeader(*statement, in_session, in_table);
1781
in_session->setStatementMessage(statement);
1789
1783
/* Set the transaction ID to match the previous messages */
1790
1784
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1808
1802
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
Session::const_reference session,
1803
Session *in_session,
1812
initStatementMessage(statement, message::Statement::DELETE, session);
1806
initStatementMessage(statement, message::Statement::DELETE, in_session);
1815
1809
* Now we construct the specialized DeleteHeader message inside
1819
1813
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1821
1815
string schema_name;
1822
(void) table.getShare()->getSchemaName(schema_name);
1816
(void) in_table->getShare()->getSchemaName(schema_name);
1823
1817
string table_name;
1824
(void) table.getShare()->getTableName(table_name);
1818
(void) in_table->getShare()->getTableName(table_name);
1826
1820
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
1821
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1829
1823
Field *current_field;
1830
Field **table_fields= table.getFields();
1824
Field **table_fields= in_table->getFields();
1832
1826
message::FieldMetadata *field_metadata;
1850
void TransactionServices::deleteRecord(Session::reference session,
1852
bool use_update_record)
1844
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1854
1846
ReplicationServices &replication_services= ReplicationServices::singleton();
1855
1847
if (! replication_services.isActive())
1858
if (not table.getShare()->replicate())
1861
1850
uint32_t next_segment_id= 1;
1862
message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1851
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1864
1853
message::DeleteData *data= statement.mutable_delete_data();
1865
1854
data->set_segment_id(next_segment_id);
1867
1856
message::DeleteRecord *record= data->add_record();
1869
1858
Field *current_field;
1870
Field **table_fields= table.getFields();
1871
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1859
Field **table_fields= in_table->getFields();
1860
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
1861
string_value->set_charset(system_charset_info);
1874
1863
while ((current_field= *table_fields++) != NULL)
1910
void TransactionServices::createTable(Session::reference session,
1899
void TransactionServices::createTable(Session *in_session,
1911
1900
const message::Table &table)
1913
1902
ReplicationServices &replication_services= ReplicationServices::singleton();
1914
1903
if (! replication_services.isActive())
1917
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1920
message::Transaction *transaction= getActiveTransactionMessage(session);
1906
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1921
1907
message::Statement *statement= transaction->add_statement();
1923
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1926
1912
* Construct the specialized CreateTableStatement message and attach
1930
1916
message::Table *new_table_message= create_table_statement->mutable_table();
1931
1917
*new_table_message= table;
1933
finalizeStatementMessage(*statement, session);
1919
finalizeStatementMessage(*statement, in_session);
1935
finalizeTransactionMessage(*transaction, session);
1921
finalizeTransactionMessage(*transaction, in_session);
1937
(void) replication_services.pushTransactionMessage(session, *transaction);
1923
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1939
cleanupTransactionMessage(transaction, session);
1925
cleanupTransactionMessage(transaction, in_session);
1943
void TransactionServices::createSchema(Session::reference session,
1929
void TransactionServices::createSchema(Session *in_session,
1944
1930
const message::Schema &schema)
1946
1932
ReplicationServices &replication_services= ReplicationServices::singleton();
1947
1933
if (! replication_services.isActive())
1950
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1953
message::Transaction *transaction= getActiveTransactionMessage(session);
1936
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1954
1937
message::Statement *statement= transaction->add_statement();
1956
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1939
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1959
1942
* Construct the specialized CreateSchemaStatement message and attach
1963
1946
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
1947
*new_schema_message= schema;
1966
finalizeStatementMessage(*statement, session);
1949
finalizeStatementMessage(*statement, in_session);
1968
finalizeTransactionMessage(*transaction, session);
1951
finalizeTransactionMessage(*transaction, in_session);
1970
(void) replication_services.pushTransactionMessage(session, *transaction);
1953
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1972
cleanupTransactionMessage(transaction, session);
1955
cleanupTransactionMessage(transaction, in_session);
1976
void TransactionServices::dropSchema(Session::reference session,
1977
identifier::Schema::const_reference identifier,
1978
message::schema::const_reference schema)
1959
void TransactionServices::dropSchema(Session *in_session, identifier::Schema::const_reference identifier)
1980
1961
ReplicationServices &replication_services= ReplicationServices::singleton();
1981
1962
if (! replication_services.isActive())
1984
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1987
message::Transaction *transaction= getActiveTransactionMessage(session);
1965
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1988
1966
message::Statement *statement= transaction->add_statement();
1990
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1968
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1993
1971
* Construct the specialized DropSchemaStatement message and attach
1998
1976
drop_schema_statement->set_schema_name(identifier.getSchemaName());
2000
finalizeStatementMessage(*statement, session);
1978
finalizeStatementMessage(*statement, in_session);
2002
finalizeTransactionMessage(*transaction, session);
1980
finalizeTransactionMessage(*transaction, in_session);
2004
(void) replication_services.pushTransactionMessage(session, *transaction);
1982
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2006
cleanupTransactionMessage(transaction, session);
1984
cleanupTransactionMessage(transaction, in_session);
2009
void TransactionServices::alterSchema(Session::reference session,
2010
const message::Schema &old_schema,
1987
void TransactionServices::alterSchema(Session *in_session,
1988
const message::schema::shared_ptr &old_schema,
2011
1989
const message::Schema &new_schema)
2013
1991
ReplicationServices &replication_services= ReplicationServices::singleton();
2014
1992
if (! replication_services.isActive())
2017
if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2020
message::Transaction *transaction= getActiveTransactionMessage(session);
1995
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2021
1996
message::Statement *statement= transaction->add_statement();
2023
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
1998
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, in_session);
2026
2001
* Construct the specialized AlterSchemaStatement message and attach
2031
2006
message::Schema *before= alter_schema_statement->mutable_before();
2032
2007
message::Schema *after= alter_schema_statement->mutable_after();
2034
*before= old_schema;
2009
*before= *old_schema;
2035
2010
*after= new_schema;
2037
finalizeStatementMessage(*statement, session);
2012
finalizeStatementMessage(*statement, in_session);
2039
finalizeTransactionMessage(*transaction, session);
2014
finalizeTransactionMessage(*transaction, in_session);
2041
(void) replication_services.pushTransactionMessage(session, *transaction);
2016
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2043
cleanupTransactionMessage(transaction, session);
2018
cleanupTransactionMessage(transaction, in_session);
2046
void TransactionServices::dropTable(Session::reference session,
2047
identifier::Table::const_reference identifier,
2048
message::table::const_reference table,
2021
void TransactionServices::dropTable(Session *in_session,
2022
const identifier::Table &table,
2049
2023
bool if_exists)
2051
2025
ReplicationServices &replication_services= ReplicationServices::singleton();
2052
2026
if (! replication_services.isActive())
2055
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2058
message::Transaction *transaction= getActiveTransactionMessage(session);
2029
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2059
2030
message::Statement *statement= transaction->add_statement();
2061
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2032
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2064
2035
* Construct the specialized DropTableStatement message and attach
2071
2042
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2073
table_metadata->set_schema_name(identifier.getSchemaName());
2074
table_metadata->set_table_name(identifier.getTableName());
2076
finalizeStatementMessage(*statement, session);
2078
finalizeTransactionMessage(*transaction, session);
2044
table_metadata->set_schema_name(table.getSchemaName());
2045
table_metadata->set_table_name(table.getTableName());
2047
finalizeStatementMessage(*statement, in_session);
2049
finalizeTransactionMessage(*transaction, in_session);
2080
(void) replication_services.pushTransactionMessage(session, *transaction);
2051
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2082
cleanupTransactionMessage(transaction, session);
2053
cleanupTransactionMessage(transaction, in_session);
2085
void TransactionServices::truncateTable(Session::reference session,
2056
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2088
2058
ReplicationServices &replication_services= ReplicationServices::singleton();
2089
2059
if (! replication_services.isActive())
2092
if (not table.getShare()->replicate())
2095
message::Transaction *transaction= getActiveTransactionMessage(session);
2062
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2096
2063
message::Statement *statement= transaction->add_statement();
2098
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2065
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2101
2068
* Construct the specialized TruncateTableStatement message and attach
2105
2072
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2107
2074
string schema_name;
2108
(void) table.getShare()->getSchemaName(schema_name);
2075
(void) in_table->getShare()->getSchemaName(schema_name);
2110
2076
string table_name;
2111
(void) table.getShare()->getTableName(table_name);
2077
(void) in_table->getShare()->getTableName(table_name);
2113
2079
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
2080
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2116
finalizeStatementMessage(*statement, session);
2082
finalizeStatementMessage(*statement, in_session);
2118
finalizeTransactionMessage(*transaction, session);
2084
finalizeTransactionMessage(*transaction, in_session);
2120
(void) replication_services.pushTransactionMessage(session, *transaction);
2086
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2122
cleanupTransactionMessage(transaction, session);
2088
cleanupTransactionMessage(transaction, in_session);
2125
void TransactionServices::rawStatement(Session::reference session,
2126
const string &query)
2091
void TransactionServices::rawStatement(Session *in_session, const string &query)
2128
2093
ReplicationServices &replication_services= ReplicationServices::singleton();
2129
2094
if (! replication_services.isActive())
2132
message::Transaction *transaction= getActiveTransactionMessage(session);
2097
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2133
2098
message::Statement *statement= transaction->add_statement();
2135
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2100
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2136
2101
statement->set_sql(query);
2137
finalizeStatementMessage(*statement, session);
2102
finalizeStatementMessage(*statement, in_session);
2139
finalizeTransactionMessage(*transaction, session);
2104
finalizeTransactionMessage(*transaction, in_session);
2141
(void) replication_services.pushTransactionMessage(session, *transaction);
2106
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2143
cleanupTransactionMessage(transaction, session);
2108
cleanupTransactionMessage(transaction, in_session);
2146
int TransactionServices::sendEvent(Session::reference session,
2147
const message::Event &event)
2111
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2149
2113
ReplicationServices &replication_services= ReplicationServices::singleton();
2150
2114
if (! replication_services.isActive())
2163
2127
trx_event->CopyFrom(event);
2165
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2129
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2167
2131
delete transaction;
2169
2133
return static_cast<int>(result);
2172
bool TransactionServices::sendStartupEvent(Session::reference session)
2136
bool TransactionServices::sendStartupEvent(Session *session)
2174
2138
message::Event event;
2175
2139
event.set_type(message::Event::STARTUP);