1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems, Inc.
5
* Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include <drizzled/current_session.h>
52
#include <drizzled/my_hash.h>
53
#include <drizzled/error.h>
54
#include <drizzled/gettext.h>
55
#include <drizzled/probes.h>
56
#include <drizzled/sql_parse.h>
57
#include <drizzled/session.h>
58
#include <drizzled/sql_base.h>
59
#include <drizzled/replication_services.h>
60
#include <drizzled/transaction_services.h>
61
#include <drizzled/transaction_context.h>
62
#include <drizzled/message/transaction.pb.h>
63
#include <drizzled/message/statement_transform.h>
64
#include <drizzled/resource_context.h>
65
#include <drizzled/lock.h>
66
#include <drizzled/item/int.h>
67
#include <drizzled/item/empty_string.h>
68
#include <drizzled/field/epoch.h>
69
#include <drizzled/plugin/client.h>
70
#include <drizzled/plugin/monitored_in_transaction.h>
71
#include <drizzled/plugin/transactional_storage_engine.h>
72
#include <drizzled/plugin/xa_resource_manager.h>
73
#include <drizzled/plugin/xa_storage_engine.h>
74
#include <drizzled/internal/my_sys.h>
79
#include <google/protobuf/repeated_field.h>
82
using namespace google;
88
* @defgroup Transactions
92
* Transaction handling in the server
96
* In each client connection, Drizzle maintains two transaction
97
* contexts representing the state of the:
99
* 1) Statement Transaction
100
* 2) Normal Transaction
102
* These two transaction contexts represent the transactional
103
* state of a Session's SQL and XA transactions for a single
104
* SQL statement or a series of SQL statements.
106
* When the Session's connection is in AUTOCOMMIT mode, there
107
* is no practical difference between the statement and the
108
* normal transaction, as each SQL statement is committed or
109
* rolled back depending on the success or failure of the
110
* individual SQL statement.
112
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
113
* the Session has explicitly begun a normal SQL transaction using
114
* a BEGIN WORK/START TRANSACTION statement, then the normal
115
* transaction context tracks the aggregate transaction state of
116
* the SQL transaction's individual statements, and the SQL
117
* transaction's commit or rollback is done atomically for all of
118
* the SQL transaction's statement's data changes.
120
* Technically, a statement transaction can be viewed as a savepoint
121
* which is maintained automatically in order to make effects of one
124
* The normal transaction is started by the user and is typically
125
* ended (COMMIT or ROLLBACK) upon an explicit user request as well.
126
* The exception to this is that DDL statements implicitly COMMIT
127
* any previously active normal transaction before they begin executing.
129
* In Drizzle, unlike MySQL, plugins other than a storage engine
130
* may participate in a transaction. All plugin::TransactionalStorageEngine
131
* plugins will automatically be monitored by Drizzle's transaction
132
* manager (implemented in this source file), as will all plugins which
133
* implement plugin::XaResourceManager and register with the transaction
136
* If Drizzle's transaction manager sees that more than one resource
137
* manager (transactional storage engine or XA resource manager) has modified
138
* data state during a statement or normal transaction, the transaction
139
* manager will automatically use a two-phase commit protocol for all
140
* resources which support XA's distributed transaction protocol. Unlike
141
* MySQL, storage engines need not manually register with the transaction
142
* manager during a statement's execution. Previously, in MySQL, all
143
* handlertons would have to call trans_register_ha() at some point after
144
* modifying data state in order to have MySQL include that handler in
145
* an XA transaction. Drizzle does all of this grunt work behind the
146
* scenes for the storage engine implementers.
148
* When a connection is closed, the current normal transaction, if
149
* any is currently active, is rolled back.
151
* Transaction life cycle
152
* ----------------------
154
* When a new connection is established, session->transaction
155
* members are initialized to an empty state. If a statement uses any tables,
156
* all affected engines are registered in the statement engine list automatically
157
* in plugin::StorageEngine::startStatement() and
158
* plugin::TransactionalStorageEngine::startTransaction().
160
* You can view the lifetime of a normal transaction in the following
163
* drizzled::statement::Statement::execute()
164
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
165
* drizzled::TransactionServices::registerResourceForTransaction()
166
* drizzled::TransactionServices::registerResourceForStatement()
167
* drizzled::plugin::StorageEngine::startStatement()
168
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
169
* drizzled::plugin::StorageEngine::endStatement()
170
* drizzled::TransactionServices::autocommitOrRollback()
171
* drizzled::plugin::TransactionalStorageEngine::commit() <-- or ::rollback()
172
* drizzled::plugin::XaResourceManager::xaCommit() <-- or xaRollback()
174
* Roles and responsibilities
175
* --------------------------
177
* Beginning of SQL Statement (and Statement Transaction)
178
* ------------------------------------------------------
180
* At the start of each SQL statement, for each storage engine
181
* <strong>that is involved in the SQL statement</strong>, the kernel
182
* calls the engine's plugin::StorageEngine::startStatement() method. If the
183
* engine needs to track some data for the statement, it should use
184
* this method invocation to initialize this data. This is the
185
* beginning of what is called the "statement transaction".
187
* <strong>For transaction storage engines (those storage engines
188
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
189
* kernel automatically determines if the start of the SQL statement
190
* transaction should <em>also</em> begin the normal SQL transaction.
191
* This occurs when the connection is NOT in autocommit mode. If
192
* the kernel detects this, then the kernel automatically starts the
193
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
194
* method and then calls plugin::StorageEngine::startStatement()
197
* Beginning of an SQL "Normal" Transaction
198
* ----------------------------------------
200
* As noted above, a "normal SQL transaction" may be started when
201
* an SQL statement is started in a connection and the connection is
202
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
204
* In addition, when a user executes a START TRANSACTION or
205
* BEGIN WORK statement in a connection, the kernel explicitly
206
* calls each transactional storage engine's startTransaction() method.
208
* Ending of an SQL Statement (and Statement Transaction)
209
* ------------------------------------------------------
211
* At the end of each SQL statement, for each of the aforementioned
212
* involved storage engines, the kernel calls the engine's
213
* plugin::StorageEngine::endStatement() method. If the engine
214
* has initialized or modified some internal data about the
215
* statement transaction, it should use this method to reset or destroy
216
* this data appropriately.
218
* Ending of an SQL "Normal" Transaction
219
* -------------------------------------
221
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
222
* depending on the success or failure of the statement transaction(s)
225
* The end of a "normal transaction" occurs when any of the following
228
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
229
* then the normal transaction which encloses the statement
231
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
232
* 3) Just before a DDL operation occurs, the kernel will implicitly
233
* commit the active normal transaction
235
* Transactions and Non-transactional Storage Engines
236
* --------------------------------------------------
238
* For non-transactional engines, this call can be safely ignored, and
239
* the kernel tracks whether a non-transactional engine has changed
240
* any data state, and warns the user appropriately if a transaction
241
* (statement or normal) is rolled back after such non-transactional
242
* data changes have been made.
244
* XA Two-phase Commit Protocol
245
* ----------------------------
247
* During statement execution, whenever any of the data-modifying
248
* PSEA API methods is used, e.g. Cursor::write_row() or
249
* Cursor::update_row(), the read-write flag is raised in the
250
* statement transaction for the involved engine.
251
* Currently all PSEA calls are "traced", and the data can not be
252
* changed in a way other than issuing a PSEA call. Important:
253
* unless this invariant is preserved the server will not know that
254
* a transaction in a given engine is read-write and will not
255
* involve the two-phase commit protocol!
257
* At the end of a statement, TransactionServices::autocommitOrRollback()
258
* is invoked. This call in turn
259
* invokes plugin::XaResourceManager::xaPrepare() for every involved XA
262
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
263
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
266
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
267
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
268
* At statement commit, the statement-related read-write engine
269
* flag is propagated to the corresponding flag in the normal
270
* transaction. When the commit is complete, the list of registered
271
* engines is cleared.
273
* Rollback is handled in a similar fashion.
275
* Additional notes on DDL and the normal transaction.
276
* ---------------------------------------------------
278
* CREATE TABLE .. SELECT can start a *new* normal transaction
279
* because of the fact that SELECTs on a transactional storage
280
* engine participate in the normal SQL transaction (due to
281
* isolation level issues and consistent read views).
283
* Behaviour of the server in this case is currently badly
286
* DDL statements use a form of "semantic" logging
287
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
288
* the newly created table is deleted.
290
* In addition, some DDL statements issue interim transaction
291
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
292
* from the original table to the internal temporary table. Other
293
* statements, e.g. CREATE TABLE ... SELECT do not always commit
296
* And finally there is a group of DDL statements such as
297
* RENAME/DROP TABLE that doesn't start a new transaction
298
* and doesn't commit.
300
* A consistent behaviour is perhaps to always commit the normal
301
* transaction after all DDLs, just like the statement transaction
302
* is always committed at the end of all statements.
304
// Constructor. Looks up the storage engine registered under the name
// "InnoDB" and caches it in xa_storage_engine, C-style-cast to
// plugin::XaStorageEngine* (an unchecked downcast). The later
// `xa_storage_engine= NULL;` is the fallback assignment.
// NOTE(review): the condition selecting between the two assignments is not
// visible in this listing (presumably a NULL test on `engine`) — confirm.
// NOTE(review): a NULL xa_storage_engine is later dereferenced with no
// visible check by allocateNewTransactionId() and getCurrentTransactionId().
TransactionServices::TransactionServices()
306
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
309
xa_storage_engine= (plugin::XaStorageEngine*)engine;
313
xa_storage_engine= NULL;
317
// Register a non-XA transactional resource with the current statement
// transaction (session.transaction.stmt).
//
// If the session is not in autocommit mode, or is inside an explicit
// BEGIN, the resource is first enlisted in the normal transaction through
// registerResourceForTransaction(). The statement-level ResourceContext
// (index 0 of session.getResourceContext()) is then filled in and
// registered, unless it reports isStarted() already.
//
// @param session    session whose statement transaction is updated
// @param monitored  tracked resource; asserted to participate in SQL
//                   transactions and NOT in XA transactions
// @param engine     transactional storage engine stored on the context
//
// NOTE(review): registerResourceForTransaction() calls back into this
// function when the index-0 context is not started; presumably
// registerResource() marks a context as started, which is what ends that
// mutual recursion — confirm against ResourceContext/TransactionContext.
void TransactionServices::registerResourceForStatement(Session::reference session,
318
plugin::MonitoredInTransaction *monitored,
319
plugin::TransactionalStorageEngine *engine)
321
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
324
* Now we automatically register this resource manager for the
325
* normal transaction. This is fine because a statement
326
* transaction registration should always enlist the resource
327
* in the normal transaction which contains the statement
330
registerResourceForTransaction(session, monitored, engine);
333
// Statement-level bookkeeping uses resource-context index 0.
TransactionContext *trans= &session.transaction.stmt;
334
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
336
if (resource_context->isStarted())
337
return; /* already registered, return */
339
assert(monitored->participatesInSqlTransaction());
340
assert(not monitored->participatesInXaTransaction());
342
resource_context->setMonitored(monitored);
343
resource_context->setTransactionalStorageEngine(engine);
344
trans->registerResource(resource_context);
346
// A participant without an XA resource manager: flag the statement
// transaction as not eligible for two-phase commit.
trans->no_2pc|= true;
349
// Register an XA-capable transactional resource with the current statement
// transaction (session.transaction.stmt).
//
// Same flow as the three-argument overload: in non-autocommit mode or
// inside BEGIN the resource is first enlisted in the normal transaction,
// then the index-0 ResourceContext is filled in (monitored resource,
// storage engine and XA resource manager) and registered, unless it is
// already started.
//
// @param session           session whose statement transaction is updated
// @param monitored         tracked resource; asserted to participate in both
//                          XA and SQL transactions
// @param engine            transactional storage engine stored on the context
// @param resource_manager  XA resource manager stored on the context
void TransactionServices::registerResourceForStatement(Session::reference session,
350
plugin::MonitoredInTransaction *monitored,
351
plugin::TransactionalStorageEngine *engine,
352
plugin::XaResourceManager *resource_manager)
354
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
357
* Now we automatically register this resource manager for the
358
* normal transaction. This is fine because a statement
359
* transaction registration should always enlist the resource
360
* in the normal transaction which contains the statement
363
registerResourceForTransaction(session, monitored, engine, resource_manager);
366
TransactionContext *trans= &session.transaction.stmt;
367
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
369
if (resource_context->isStarted())
370
return; /* already registered, return */
372
assert(monitored->participatesInXaTransaction());
373
assert(monitored->participatesInSqlTransaction());
375
resource_context->setMonitored(monitored);
376
resource_context->setTransactionalStorageEngine(engine);
377
resource_context->setXaResourceManager(resource_manager);
378
trans->registerResource(resource_context);
380
// OR-ing in false is a no-op: an XA participant never clears a no_2pc
// flag that an earlier non-XA participant already set.
trans->no_2pc|= false;
383
// Register a non-XA transactional resource with the normal transaction
// (session.transaction.all, resource-context index 1).
//
// Returns early if the index-1 context is already started. Otherwise it:
//  * marks the session SERVER_STATUS_IN_TRANS;
//  * registers and fills in the context;
//  * flags the transaction no_2pc;
//  * seeds the XID from the current query id when the XID is still null;
//  * registers the resource for the statement as well, when the index-0
//    context is not yet started.
//
// @param session    session whose normal transaction is updated
// @param monitored  tracked resource; asserted SQL-only (not XA)
// @param engine     transactional storage engine stored on the context
//
// NOTE(review): unlike the XA overload, no engine->startTransaction() call
// is visible in this listing — confirm whether that is intended.
void TransactionServices::registerResourceForTransaction(Session::reference session,
384
plugin::MonitoredInTransaction *monitored,
385
plugin::TransactionalStorageEngine *engine)
387
TransactionContext *trans= &session.transaction.all;
388
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
390
if (resource_context->isStarted())
391
return; /* already registered, return */
393
session.server_status|= SERVER_STATUS_IN_TRANS;
395
trans->registerResource(resource_context);
397
assert(monitored->participatesInSqlTransaction());
398
assert(not monitored->participatesInXaTransaction());
400
resource_context->setMonitored(monitored);
401
resource_context->setTransactionalStorageEngine(engine);
402
trans->no_2pc|= true;
404
// Lazily assign an XID for this normal transaction.
if (session.transaction.xid_state.xid.is_null())
405
session.transaction.xid_state.xid.set(session.getQueryId());
407
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
if (! session.getResourceContext(monitored, 0)->isStarted())
409
registerResourceForStatement(session, monitored, engine);
412
// Register an XA-capable transactional resource with the normal
// transaction (session.transaction.all, resource-context index 1).
//
// Same flow as the three-argument overload, with two differences: it also
// stores the XA resource manager, and it calls
// engine->startTransaction(&session, START_TRANS_NO_OPTIONS) before
// enlisting the resource for the current statement.
//
// @param session           session whose normal transaction is updated
// @param monitored         tracked resource; asserted to participate in SQL
//                          transactions
// @param engine            transactional storage engine stored on the context
// @param resource_manager  XA resource manager stored on the context
//
// NOTE(review): this sets no_2pc to true even though an XA resource manager
// is supplied, whereas the XA statement-level overload ORs in false —
// confirm which is intended.
void TransactionServices::registerResourceForTransaction(Session::reference session,
413
plugin::MonitoredInTransaction *monitored,
414
plugin::TransactionalStorageEngine *engine,
415
plugin::XaResourceManager *resource_manager)
417
TransactionContext *trans= &session.transaction.all;
418
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
420
if (resource_context->isStarted())
421
return; /* already registered, return */
423
session.server_status|= SERVER_STATUS_IN_TRANS;
425
trans->registerResource(resource_context);
427
assert(monitored->participatesInSqlTransaction());
429
resource_context->setMonitored(monitored);
430
resource_context->setXaResourceManager(resource_manager);
431
resource_context->setTransactionalStorageEngine(engine);
432
trans->no_2pc|= true;
434
// Lazily assign an XID for this normal transaction.
if (session.transaction.xid_state.xid.is_null())
435
session.transaction.xid_state.xid.set(session.getQueryId());
437
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
439
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
if (! session.getResourceContext(monitored, 0)->isStarted())
441
registerResourceForStatement(session, monitored, engine, resource_manager);
444
// Assign a fresh XA transaction id to the session returned by
// current_session.
//
// The body of the `if (! replication_services.isActive())` test is not
// visible in this listing; presumably it returns early when replication is
// inactive — confirm. Otherwise a new id is requested from
// xa_storage_engine and stored with Session::setXaId().
// NOTE(review): xa_storage_engine can be NULL (see the constructor), and no
// NULL check is visible before it is dereferenced.
void TransactionServices::allocateNewTransactionId()
446
ReplicationServices &replication_services= ReplicationServices::singleton();
447
if (! replication_services.isActive())
452
Session *my_session= current_session;
453
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
454
my_session->setXaId(xa_id);
457
// Return the XA transaction id of `session`, allocating one on demand.
// An id of 0 means "not assigned yet"; in that case a new id is requested
// from xa_storage_engine and stored on the session first.
// NOTE(review): xa_storage_engine can be NULL (see the constructor); it is
// dereferenced here without a check.
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
{
  if (session.getXaId() != 0)
    return session.getXaId();

  session.setXaId(xa_storage_engine->getNewTransactionId(&session));
  return session.getXaId();
}
467
// Commit the normal transaction (normal_transaction == true) or the current
// statement transaction (false).
//
// Visible flow, run only when resources are registered:
//  1. For a "real" transaction, wait_if_global_read_lock() is checked; when
//     it returns true the transaction is rolled back.
//     A real transaction is either the normal transaction, or a statement
//     when the normal transaction has no resources.
//  2. When replication messages are being built, each resource that
//     modified data and participates in XA is asked to xaPrepare().
//     A non-zero result raises ER_ERROR_DURING_COMMIT, and the loop stops
//     once `error` is set.
//  3. With no error and a real transaction, the Transaction message is
//     pushed with commitTransactionMessage(); a rollback follows on a path
//     whose condition is not visible here (presumably on failure).
//  4. commitPhaseOne() performs the commit; its failure maps to 1, or to 2
//     when `cookie` is non-zero.
//  5. startWaitingGlobalReadLock() is called (its guard is not visible).
//
// NOTE(review): several lines are missing from this listing, including the
// `err` declaration, loop increments, `continue;`, error assignments and
// the final return. Presumably the function returns `error` — confirm.
// NOTE(review): `cookie` is initialised to 0 and never assigned in the
// visible lines, so `(cookie ? 2 : 1)` always evaluates to 1 here.
int TransactionServices::commitTransaction(Session::reference session,
468
bool normal_transaction)
470
int error= 0, cookie= 0;
472
'all' means that this is either an explicit commit issued by
473
user, or an implicit commit issued by a DDL.
475
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
476
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
478
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
481
We must not commit the normal transaction if a statement
482
transaction is pending. Otherwise statement transaction
483
flags will not get propagated to its normal transaction's
486
assert(session.transaction.stmt.getResourceContexts().empty() ||
487
trans == &session.transaction.stmt);
489
if (resource_contexts.empty() == false)
491
if (is_real_trans && session.wait_if_global_read_lock(false, false))
493
rollbackTransaction(session, normal_transaction);
498
* If replication is on, we do a PREPARE on the resource managers, push the
499
* Transaction message across the replication stream, and then COMMIT if the
500
* replication stream returned successfully.
502
if (shouldConstructMessages())
504
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
505
it != resource_contexts.end() && ! error;
508
ResourceContext *resource_context= *it;
511
Do not call two-phase commit if this particular
512
transaction is read-only. This allows for simpler
513
implementation in engines that are always read-only.
515
if (! resource_context->hasModifiedData())
518
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
520
if (resource->participatesInXaTransaction())
522
if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
524
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
529
session.status_var.ha_prepare_count++;
533
if (error == 0 && is_real_trans)
536
* Push the constructed Transaction messages across to
537
* replicators and appliers.
539
error= commitTransactionMessage(session);
543
rollbackTransaction(session, normal_transaction);
548
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
551
session.startWaitingGlobalReadLock();
558
This function does not handle the global read lock; the caller must.
560
// Commit phase of the transaction selected by normal_transaction, without
// any global-read-lock handling.
//
// Each registered resource is committed as follows:
//  * XA participants are committed with xaCommit(&session, all).
//  * Other SQL participants are committed with the engine's
//    commit(&session, all).
//  * A non-zero result raises ER_ERROR_DURING_COMMIT.
//  * A successful commit of the normal transaction increments
//    ha_commit_count.
//  * Each context is then reset().
// Afterwards the XID is nulled. For the normal transaction, tx_isolation is
// restored from session_tx_isolation and session.transaction.cleanup() is
// called.
//
// `all` starts as normal_transaction.
// NOTE(review): the statement guarded by the autocommit test is not visible
// (presumably it forces `all` to true) — confirm. The guard around the XID
// reset (presumably is_real_trans), the error bookkeeping and the return
// statement are also not visible in this listing.
int TransactionServices::commitPhaseOne(Session::reference session,
561
bool normal_transaction)
564
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
565
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
567
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
bool all= normal_transaction;
570
/* If we're in autocommit then we have a real transaction to commit
571
(except if it's BEGIN)
573
if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
576
if (resource_contexts.empty() == false)
578
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
579
it != resource_contexts.end();
583
ResourceContext *resource_context= *it;
585
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
587
// XA participants take precedence over the plain SQL-transaction path.
if (resource->participatesInXaTransaction())
589
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
591
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
594
else if (normal_transaction)
596
session.status_var.ha_commit_count++;
599
else if (resource->participatesInSqlTransaction())
601
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
603
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
606
else if (normal_transaction)
608
session.status_var.ha_commit_count++;
611
resource_context->reset(); /* keep it conveniently zero-filled */
615
session.transaction.xid_state.xid.null();
617
if (normal_transaction)
619
session.variables.tx_isolation= session.session_tx_isolation;
620
session.transaction.cleanup();
627
// Roll back the normal transaction (normal_transaction == true) or the
// current statement transaction (false).
//
// `all` is true for the normal transaction, and also when the session is in
// autocommit mode outside BEGIN. Each registered resource is rolled back as
// follows:
//  * XA participants with xaRollback(&session, all).
//  * Other SQL participants with the engine's rollback(&session, all).
//  * A non-zero result raises ER_ERROR_DURING_ROLLBACK.
//  * A successful rollback of the normal transaction increments
//    ha_rollback_count.
//  * Each context is reset().
//
// Then:
//  * The rollback is signalled to replication before the XID is nulled.
//    Either rollbackTransactionMessage() or rollbackStatementMessage() is
//    used; the selecting condition is not visible (presumably `all`).
//  * For the normal transaction, tx_isolation is restored, the transaction
//    state is cleaned up and transaction_rollback_request is cleared.
//  * ER_WARNING_NOT_COMPLETE_ROLLBACK is pushed when non-transactional data
//    was modified and the connection is not being killed. Part of that
//    condition is not visible here.
//
// NOTE(review): the `error`/`err` declarations, the guards around the
// replication and cleanup steps, and the return statement are not visible.
int TransactionServices::rollbackTransaction(Session::reference session,
628
bool normal_transaction)
631
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
632
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
634
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
638
We must not rollback the normal transaction if a statement
639
transaction is pending.
641
assert(session.transaction.stmt.getResourceContexts().empty() ||
642
trans == &session.transaction.stmt);
644
if (resource_contexts.empty() == false)
646
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
647
it != resource_contexts.end();
651
ResourceContext *resource_context= *it;
653
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
655
if (resource->participatesInXaTransaction())
657
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
659
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
else if (normal_transaction)
664
session.status_var.ha_rollback_count++;
667
else if (resource->participatesInSqlTransaction())
669
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
671
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
else if (normal_transaction)
676
session.status_var.ha_rollback_count++;
679
resource_context->reset(); /* keep it conveniently zero-filled */
683
* We need to signal the ROLLBACK to ReplicationServices here
684
* BEFORE we set the transaction ID to NULL. This is because
685
* if a bulk segment was sent to replicators, we need to send
686
* a rollback statement with the corresponding transaction ID
690
rollbackTransactionMessage(session);
692
rollbackStatementMessage(session);
695
session.transaction.xid_state.xid.null();
696
if (normal_transaction)
698
session.variables.tx_isolation=session.session_tx_isolation;
699
session.transaction.cleanup();
702
if (normal_transaction)
703
session.transaction_rollback_request= false;
706
* If a non-transactional table was updated, warn the user
709
session.transaction.all.hasModifiedNonTransData() &&
710
session.getKilled() != Session::KILL_CONNECTION)
712
push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
720
// End-of-statement hook. It runs these steps in order:
//  1. Finalizes the session's Statement message when one exists and
//     `error` is false.
//  2. Calls endStatement() on every engine registered with the statement
//     transaction.
//  3. Commits or rolls back the statement transaction.
//  4. When transaction_rollback_request is set, rolls back the normal
//     transaction and clears SERVER_STATUS_IN_TRANS.
//  5. Restores tx_isolation from session_tx_isolation.
//
// NOTE(review): the `error` parameter is declared on a signature line that
// is not visible in this listing. The branch selecting commit vs. rollback
// of the statement is also not visible; presumably the rollback runs when
// `error` is set or commitTransaction() fails — confirm. The return
// statement is not visible either.
int TransactionServices::autocommitOrRollback(Session::reference session,
723
/* One GPB Statement message per SQL statement */
724
message::Statement *statement= session.getStatementMessage();
725
if ((statement != NULL) && (! error))
726
finalizeStatementMessage(*statement, session);
728
if (session.transaction.stmt.getResourceContexts().empty() == false)
730
TransactionContext *trans = &session.transaction.stmt;
731
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
it != resource_contexts.end();
736
ResourceContext *resource_context= *it;
738
resource_context->getTransactionalStorageEngine()->endStatement(&session);
743
if (commitTransaction(session, false))
748
(void) rollbackTransaction(session, false);
749
if (session.transaction_rollback_request)
751
(void) rollbackTransaction(session, true);
752
session.server_status&= ~SERVER_STATUS_IN_TRANS;
756
session.variables.tx_isolation= session.session_tx_isolation;
762
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
764
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
766
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
767
* resources aren't the same... */
768
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
772
int TransactionServices::rollbackToSavepoint(Session::reference session,
776
TransactionContext *trans= &session.transaction.all;
777
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
780
trans->no_2pc= false;
782
rolling back to savepoint in all storage engines that were part of the
783
transaction when the savepoint was set
785
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
786
it != sv_resource_contexts.end();
790
ResourceContext *resource_context= *it;
792
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
794
if (resource->participatesInSqlTransaction())
796
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
798
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
803
session.status_var.ha_savepoint_rollback_count++;
806
trans->no_2pc|= not resource->participatesInXaTransaction();
809
rolling back the transaction in all storage engines that were not part of
810
the transaction when the savepoint was set
813
TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
814
TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
815
TransactionContext::ResourceContexts set_difference_contexts;
818
* Bug #542299: segfault during set_difference() below. copy<>() requires pre-allocation
819
* of all elements, including the target, which is why we pre-allocate the set_difference_contexts
822
set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
824
sort(sorted_tran_resource_contexts.begin(),
825
sorted_tran_resource_contexts.end(),
826
ResourceContextCompare());
827
sort(sorted_sv_resource_contexts.begin(),
828
sorted_sv_resource_contexts.end(),
829
ResourceContextCompare());
830
set_difference(sorted_tran_resource_contexts.begin(),
831
sorted_tran_resource_contexts.end(),
832
sorted_sv_resource_contexts.begin(),
833
sorted_sv_resource_contexts.end(),
834
set_difference_contexts.begin(),
835
ResourceContextCompare());
837
* set_difference_contexts now contains all resource contexts
838
* which are in the transaction context but were NOT in the
839
* savepoint's resource contexts.
842
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
843
it != set_difference_contexts.end();
846
ResourceContext *resource_context= *it;
849
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
851
if (resource->participatesInSqlTransaction())
853
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
855
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
860
session.status_var.ha_rollback_count++;
863
resource_context->reset(); /* keep it conveniently zero-filled */
866
trans->setResourceContexts(sv_resource_contexts);
868
if (shouldConstructMessages())
870
cleanupTransactionMessage(getActiveTransactionMessage(session), session);
871
message::Transaction *savepoint_transaction= sv.getTransactionMessage();
872
if (savepoint_transaction != NULL)
874
/* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
875
Upon commit the savepoint_transaction_copy will be cleaned up by a call to
876
cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
877
up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
879
message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
880
uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
if (num_statements == 0)
883
session.setStatementMessage(NULL);
887
session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
889
session.setTransactionMessage(savepoint_transaction_copy);
898
according to the sql standard (ISO/IEC 9075-2:2003)
899
section "4.33.4 SQL-statements and transaction states",
900
NamedSavepoint is *not* a transaction-initiating SQL statement
902
// Set the savepoint `sv` on the normal transaction. The `sv` parameter is
// declared on a signature line not visible in this listing.
//
//  * Every registered resource that participates in SQL transactions gets
//    the engine's setSavepoint(). A non-zero result raises ER_GET_ERRNO,
//    and ha_savepoint_count is incremented (presumably on success only).
//  * The transaction's current resource list is copied into the savepoint.
//  * While replication messages are being built, a heap copy of the
//    session's current Transaction message is stored in the savepoint.
//    rollbackToSavepoint() later restores from this copy.
//
// NOTE(review): the `err`/`error` declarations, iterator increment and
// return statement are not visible in this listing.
int TransactionServices::setSavepoint(Session::reference session,
906
TransactionContext *trans= &session.transaction.all;
907
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
909
if (resource_contexts.empty() == false)
911
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
912
it != resource_contexts.end();
915
ResourceContext *resource_context= *it;
918
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
920
if (resource->participatesInSqlTransaction())
922
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
924
my_error(ER_GET_ERRNO, MYF(0), err);
929
session.status_var.ha_savepoint_count++;
935
Remember the list of registered storage engines.
937
sv.setResourceContexts(resource_contexts);
939
if (shouldConstructMessages())
941
message::Transaction *transaction= session.getTransactionMessage();
943
if (transaction != NULL)
945
message::Transaction *transaction_savepoint=
946
new message::Transaction(*transaction);
947
sv.setTransactionMessage(transaction_savepoint);
954
// Release the savepoint `sv`. The `sv` parameter is declared on a signature
// line not visible in this listing.
// Every resource recorded in the savepoint that participates in SQL
// transactions gets the engine's releaseSavepoint(). A non-zero result
// raises ER_GET_ERRNO.
// NOTE(review): the `err`/`error` declarations, iterator increment and
// return statement are not visible in this listing.
int TransactionServices::releaseSavepoint(Session::reference session,
959
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
961
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
962
it != resource_contexts.end();
966
ResourceContext *resource_context= *it;
968
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
970
if (resource->participatesInSqlTransaction())
972
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
974
my_error(ER_GET_ERRNO, MYF(0), err);
983
bool TransactionServices::shouldConstructMessages()
985
ReplicationServices &replication_services= ReplicationServices::singleton();
986
return replication_services.isActive();
989
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
bool should_inc_trx_id)
992
message::Transaction *transaction= session.getTransactionMessage();
994
if (unlikely(transaction == NULL))
997
* Allocate and initialize a new transaction message
998
* for this Session object. Session is responsible for
999
* deleting transaction message when done with it.
1001
transaction= new (nothrow) message::Transaction();
1002
initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
session.setTransactionMessage(transaction);
1010
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
Session::reference session,
1012
bool should_inc_trx_id)
1014
message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
trx->set_server_id(session.getServerId());
1017
if (should_inc_trx_id)
1019
trx->set_transaction_id(getCurrentTransactionId(session));
1024
/* trx and seg id will get set properly elsewhere */
1025
trx->set_transaction_id(0);
1028
trx->set_start_timestamp(session.getCurrentTimestamp());
1030
/* segment info may get set elsewhere as needed */
1031
transaction.set_segment_id(1);
1032
transaction.set_end_segment(true);
1035
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
Session::const_reference session)
1038
message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
trx->set_end_timestamp(session.getCurrentTimestamp());
1042
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
Session::reference session)
1046
session.setStatementMessage(NULL);
1047
session.setTransactionMessage(NULL);
1051
int TransactionServices::commitTransactionMessage(Session::reference session)
1053
ReplicationServices &replication_services= ReplicationServices::singleton();
1054
if (! replication_services.isActive())
1058
* If no Transaction message was ever created, then no data modification
1059
* occurred inside the transaction, so nothing to do.
1061
if (session.getTransactionMessage() == NULL)
1064
/* If there is an active statement message, finalize it. */
1065
message::Statement *statement= session.getStatementMessage();
1067
if (statement != NULL)
1069
finalizeStatementMessage(*statement, session);
1072
message::Transaction* transaction= getActiveTransactionMessage(session);
1075
* It is possible that we could have a Transaction without any Statements
1076
* if we had created a Statement but had to roll it back due to it failing
1077
* mid-execution, and no subsequent Statements were added to the Transaction
1078
* message. In this case, we simply clean up the message and not push it.
1080
if (transaction->statement_size() == 0)
1082
cleanupTransactionMessage(transaction, session);
1086
finalizeTransactionMessage(*transaction, session);
1088
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1090
cleanupTransactionMessage(transaction, session);
1092
return static_cast<int>(result);
1095
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
message::Statement::Type type,
1097
Session::const_reference session)
1099
statement.set_type(type);
1100
statement.set_start_timestamp(session.getCurrentTimestamp());
1102
if (session.variables.replicate_query)
1103
statement.set_sql(session.getQueryString()->c_str());
1106
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
Session::reference session)
1109
statement.set_end_timestamp(session.getCurrentTimestamp());
1110
session.setStatementMessage(NULL);
1113
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1115
ReplicationServices &replication_services= ReplicationServices::singleton();
1116
if (! replication_services.isActive())
1119
message::Transaction *transaction= getActiveTransactionMessage(session);
1122
* OK, so there are two situations that we need to deal with here:
1124
* 1) We receive an instruction to ROLLBACK the current transaction
1125
* and the currently-stored Transaction message is *self-contained*,
1126
* meaning that no Statement messages in the Transaction message
1127
* contain a message having its segment_id member greater than 1. If
1128
* no non-segment ID 1 members are found, we can simply clear the
1129
* current Transaction message and remove it from memory.
1131
* 2) If the Transaction message does indeed have a non-end segment, that
1132
* means that a bulk update/delete/insert Transaction message segment
1133
* has previously been sent over the wire to replicators. In this case,
1134
* we need to package a Transaction with a Statement message of type
1135
* ROLLBACK to indicate to replicators that previously-transmitted
1136
* messages must be un-applied.
1138
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1140
/* Remember the transaction ID so we can re-use it */
1141
uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
uint32_t seg_id= transaction->segment_id();
1145
* Clear the transaction, create a Rollback statement message,
1146
* attach it to the transaction, and push it to replicators.
1148
transaction->Clear();
1149
initTransactionMessage(*transaction, session, false);
1151
/* Set the transaction ID to match the previous messages */
1152
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
transaction->set_segment_id(seg_id);
1154
transaction->set_end_segment(true);
1156
message::Statement *statement= transaction->add_statement();
1158
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
finalizeStatementMessage(*statement, session);
1161
finalizeTransactionMessage(*transaction, session);
1163
(void) replication_services.pushTransactionMessage(session, *transaction);
1166
cleanupTransactionMessage(transaction, session);
1169
void TransactionServices::rollbackStatementMessage(Session::reference session)
1171
ReplicationServices &replication_services= ReplicationServices::singleton();
1172
if (! replication_services.isActive())
1175
message::Statement *current_statement= session.getStatementMessage();
1177
/* If we never added a Statement message, nothing to undo. */
1178
if (current_statement == NULL)
1182
* If the Statement has been segmented, then we've already pushed a portion
1183
* of this Statement's row changes through the replication stream and we
1184
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1185
* delete the current Statement message.
1187
bool is_segmented= false;
1189
switch (current_statement->type())
1191
case message::Statement::INSERT:
1192
if (current_statement->insert_data().segment_id() > 1)
1196
case message::Statement::UPDATE:
1197
if (current_statement->update_data().segment_id() > 1)
1201
case message::Statement::DELETE:
1202
if (current_statement->delete_data().segment_id() > 1)
1211
* Remove the Statement message we've been working with (same as
1212
* current_statement).
1214
message::Transaction *transaction= getActiveTransactionMessage(session);
1215
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
statements_in_txn= transaction->mutable_statement();
1217
statements_in_txn->RemoveLast();
1218
session.setStatementMessage(NULL);
1221
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1222
* an indicator to cancel the previous Statement message which should have
1223
* had its end_segment attribute set to false.
1227
current_statement= transaction->add_statement();
1228
initStatementMessage(*current_statement,
1229
message::Statement::ROLLBACK_STATEMENT,
1231
finalizeStatementMessage(*current_statement, session);
1235
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
message::Transaction *transaction)
1238
uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
uint32_t seg_id= transaction->segment_id();
1241
transaction->set_end_segment(false);
1242
commitTransactionMessage(session);
1243
transaction= getActiveTransactionMessage(session, false);
1245
/* Set the transaction ID to match the previous messages */
1246
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
transaction->set_segment_id(seg_id + 1);
1248
transaction->set_end_segment(true);
1253
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1255
uint32_t *next_segment_id)
1257
message::Statement *statement= session.getStatementMessage();
1258
message::Transaction *transaction= NULL;
1261
* If statement is NULL, this is a new statement.
1262
* If statement is NOT NULL, this a continuation of the same statement.
1263
* This is because autocommitOrRollback() finalizes the statement so that
1264
* we guarantee only one Statement message per statement (i.e., we no longer
1265
* share a single GPB message for multiple statements).
1267
if (statement == NULL)
1269
transaction= getActiveTransactionMessage(session);
1271
if (static_cast<size_t>(transaction->ByteSize()) >=
1272
transaction_message_threshold)
1274
transaction= segmentTransactionMessage(session, transaction);
1277
statement= transaction->add_statement();
1278
setInsertHeader(*statement, session, table);
1279
session.setStatementMessage(statement);
1283
transaction= getActiveTransactionMessage(session);
1286
* If we've passed our threshold for the statement size (possible for
1287
* a bulk insert), we'll finalize the Statement and Transaction (doing
1288
* the Transaction will keep it from getting huge).
1290
if (static_cast<size_t>(transaction->ByteSize()) >=
1291
transaction_message_threshold)
1293
/* Remember the transaction ID so we can re-use it */
1294
uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
uint32_t seg_id= transaction->segment_id();
1297
message::InsertData *current_data= statement->mutable_insert_data();
1299
/* Caller should use this value when adding a new record */
1300
*next_segment_id= current_data->segment_id() + 1;
1302
current_data->set_end_segment(false);
1303
transaction->set_end_segment(false);
1306
* Send the trx message to replicators after finalizing the
1307
* statement and transaction. This will also set the Transaction
1308
* and Statement objects in Session to NULL.
1310
commitTransactionMessage(session);
1313
* Statement and Transaction should now be NULL, so new ones will get
1314
* created. We reuse the transaction id since we are segmenting
1317
transaction= getActiveTransactionMessage(session, false);
1318
assert(transaction != NULL);
1320
statement= transaction->add_statement();
1321
setInsertHeader(*statement, session, table);
1322
session.setStatementMessage(statement);
1324
/* Set the transaction ID to match the previous messages */
1325
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
transaction->set_segment_id(seg_id + 1);
1327
transaction->set_end_segment(true);
1332
* Continuation of the same statement. Carry forward the existing
1335
const message::InsertData ¤t_data= statement->insert_data();
1336
*next_segment_id= current_data.segment_id();
1343
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
Session::const_reference session,
1347
initStatementMessage(statement, message::Statement::INSERT, session);
1350
* Now we construct the specialized InsertHeader message inside
1351
* the generalized message::Statement container...
1353
/* Set up the insert header */
1354
message::InsertHeader *header= statement.mutable_insert_header();
1355
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1358
(void) table.getShare()->getSchemaName(schema_name);
1360
(void) table.getShare()->getTableName(table_name);
1362
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1365
Field *current_field;
1366
Field **table_fields= table.getFields();
1368
message::FieldMetadata *field_metadata;
1370
/* We will read all the table's fields... */
1373
while ((current_field= *table_fields++) != NULL)
1375
field_metadata= header->add_field_metadata();
1376
field_metadata->set_name(current_field->field_name);
1377
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1381
bool TransactionServices::insertRecord(Session::reference session,
1384
ReplicationServices &replication_services= ReplicationServices::singleton();
1385
if (! replication_services.isActive())
1388
if (not table.getShare()->replicate())
1392
* We do this check here because we don't want to even create a
1393
* statement if there isn't a primary key on the table...
1397
* Multi-column primary keys are handled how exactly?
1399
if (not table.getShare()->hasPrimaryKey())
1401
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1405
uint32_t next_segment_id= 1;
1406
message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1408
message::InsertData *data= statement.mutable_insert_data();
1409
data->set_segment_id(next_segment_id);
1410
data->set_end_segment(true);
1411
message::InsertRecord *record= data->add_record();
1413
Field *current_field;
1414
Field **table_fields= table.getFields();
1416
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
string_value->set_charset(system_charset_info);
1419
/* We will read all the table's fields... */
1422
while ((current_field= *table_fields++) != NULL)
1424
if (current_field->is_null())
1426
record->add_is_null(true);
1427
record->add_insert_value("", 0);
1431
string_value= current_field->val_str_internal(string_value);
1432
record->add_is_null(false);
1433
record->add_insert_value(string_value->c_ptr(), string_value->length());
1434
string_value->free();
1440
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1442
const unsigned char *old_record,
1443
const unsigned char *new_record,
1444
uint32_t *next_segment_id)
1446
message::Statement *statement= session.getStatementMessage();
1447
message::Transaction *transaction= NULL;
1450
* If statement is NULL, this is a new statement.
1451
* If statement is NOT NULL, this a continuation of the same statement.
1452
* This is because autocommitOrRollback() finalizes the statement so that
1453
* we guarantee only one Statement message per statement (i.e., we no longer
1454
* share a single GPB message for multiple statements).
1456
if (statement == NULL)
1458
transaction= getActiveTransactionMessage(session);
1460
if (static_cast<size_t>(transaction->ByteSize()) >=
1461
transaction_message_threshold)
1463
transaction= segmentTransactionMessage(session, transaction);
1466
statement= transaction->add_statement();
1467
setUpdateHeader(*statement, session, table, old_record, new_record);
1468
session.setStatementMessage(statement);
1472
transaction= getActiveTransactionMessage(session);
1475
* If we've passed our threshold for the statement size (possible for
1476
* a bulk insert), we'll finalize the Statement and Transaction (doing
1477
* the Transaction will keep it from getting huge).
1479
if (static_cast<size_t>(transaction->ByteSize()) >=
1480
transaction_message_threshold)
1482
/* Remember the transaction ID so we can re-use it */
1483
uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
uint32_t seg_id= transaction->segment_id();
1486
message::UpdateData *current_data= statement->mutable_update_data();
1488
/* Caller should use this value when adding a new record */
1489
*next_segment_id= current_data->segment_id() + 1;
1491
current_data->set_end_segment(false);
1492
transaction->set_end_segment(false);
1495
* Send the trx message to replicators after finalizing the
1496
* statement and transaction. This will also set the Transaction
1497
* and Statement objects in Session to NULL.
1499
commitTransactionMessage(session);
1502
* Statement and Transaction should now be NULL, so new ones will get
1503
* created. We reuse the transaction id since we are segmenting
1506
transaction= getActiveTransactionMessage(session, false);
1507
assert(transaction != NULL);
1509
statement= transaction->add_statement();
1510
setUpdateHeader(*statement, session, table, old_record, new_record);
1511
session.setStatementMessage(statement);
1513
/* Set the transaction ID to match the previous messages */
1514
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
transaction->set_segment_id(seg_id + 1);
1516
transaction->set_end_segment(true);
1521
* Continuation of the same statement. Carry forward the existing
1524
const message::UpdateData ¤t_data= statement->update_data();
1525
*next_segment_id= current_data.segment_id();
1532
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
Session::const_reference session,
1535
const unsigned char *old_record,
1536
const unsigned char *new_record)
1538
initStatementMessage(statement, message::Statement::UPDATE, session);
1541
* Now we construct the specialized UpdateHeader message inside
1542
* the generalized message::Statement container...
1544
/* Set up the update header */
1545
message::UpdateHeader *header= statement.mutable_update_header();
1546
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1549
(void) table.getShare()->getSchemaName(schema_name);
1551
(void) table.getShare()->getTableName(table_name);
1553
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1556
Field *current_field;
1557
Field **table_fields= table.getFields();
1559
message::FieldMetadata *field_metadata;
1561
/* We will read all the table's fields... */
1564
while ((current_field= *table_fields++) != NULL)
1567
* We add the "key field metadata" -- i.e. the fields which is
1568
* the primary key for the table.
1570
if (table.getShare()->fieldInPrimaryKey(current_field))
1572
field_metadata= header->add_key_field_metadata();
1573
field_metadata->set_name(current_field->field_name);
1574
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1577
if (isFieldUpdated(current_field, table, old_record, new_record))
1579
/* Field is changed from old to new */
1580
field_metadata= header->add_set_field_metadata();
1581
field_metadata->set_name(current_field->field_name);
1582
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1587
void TransactionServices::updateRecord(Session::reference session,
1589
const unsigned char *old_record,
1590
const unsigned char *new_record)
1592
ReplicationServices &replication_services= ReplicationServices::singleton();
1593
if (! replication_services.isActive())
1596
if (not table.getShare()->replicate())
1599
uint32_t next_segment_id= 1;
1600
message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1602
message::UpdateData *data= statement.mutable_update_data();
1603
data->set_segment_id(next_segment_id);
1604
data->set_end_segment(true);
1605
message::UpdateRecord *record= data->add_record();
1607
Field *current_field;
1608
Field **table_fields= table.getFields();
1609
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
string_value->set_charset(system_charset_info);
1612
while ((current_field= *table_fields++) != NULL)
1615
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1616
* but then realized that an UPDATE statement could potentially have different values for
1617
* the SET field. For instance, imagine this SQL scenario:
1619
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1620
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1621
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1623
* We will generate two UpdateRecord messages with different set_value byte arrays.
1625
if (isFieldUpdated(current_field, table, old_record, new_record))
1627
/* Store the original "read bit" for this field */
1628
bool is_read_set= current_field->isReadSet();
1630
/* We need to mark that we will "read" this field... */
1631
table.setReadSet(current_field->position());
1633
/* Read the string value of this field's contents */
1634
string_value= current_field->val_str_internal(string_value);
1637
* Reset the read bit after reading field to its original state. This
1638
* prevents the field from being included in the WHERE clause
1640
current_field->setReadSet(is_read_set);
1642
if (current_field->is_null())
1644
record->add_is_null(true);
1645
record->add_after_value("", 0);
1649
record->add_is_null(false);
1650
record->add_after_value(string_value->c_ptr(), string_value->length());
1652
string_value->free();
1656
* Add the WHERE clause values now...for now, this means the
1657
* primary key field value. Replication only supports tables
1658
* with a primary key.
1660
if (table.getShare()->fieldInPrimaryKey(current_field))
1663
* To say the below is ugly is an understatement. But it works.
1665
* @todo Move this crap into a real Record API.
1667
string_value= current_field->val_str_internal(string_value,
1669
current_field->offset(const_cast<unsigned char *>(new_record)));
1670
record->add_key_value(string_value->c_ptr(), string_value->length());
1671
string_value->free();
1677
bool TransactionServices::isFieldUpdated(Field *current_field,
1679
const unsigned char *old_record,
1680
const unsigned char *new_record)
1683
* The below really should be moved into the Field API and Record API. But for now
1684
* we do this crazy pointer fiddling to figure out if the current field
1685
* has been updated in the supplied record raw byte pointers.
1687
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1690
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1692
bool old_value_is_null= current_field->is_null_in_record(old_record);
1693
bool new_value_is_null= current_field->is_null_in_record(new_record);
1695
bool isUpdated= false;
1696
if (old_value_is_null != new_value_is_null)
1698
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1702
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1710
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1718
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1720
uint32_t *next_segment_id)
1722
message::Statement *statement= session.getStatementMessage();
1723
message::Transaction *transaction= NULL;
1726
* If statement is NULL, this is a new statement.
1727
* If statement is NOT NULL, this a continuation of the same statement.
1728
* This is because autocommitOrRollback() finalizes the statement so that
1729
* we guarantee only one Statement message per statement (i.e., we no longer
1730
* share a single GPB message for multiple statements).
1732
if (statement == NULL)
1734
transaction= getActiveTransactionMessage(session);
1736
if (static_cast<size_t>(transaction->ByteSize()) >=
1737
transaction_message_threshold)
1739
transaction= segmentTransactionMessage(session, transaction);
1742
statement= transaction->add_statement();
1743
setDeleteHeader(*statement, session, table);
1744
session.setStatementMessage(statement);
1748
transaction= getActiveTransactionMessage(session);
1751
* If we've passed our threshold for the statement size (possible for
1752
* a bulk insert), we'll finalize the Statement and Transaction (doing
1753
* the Transaction will keep it from getting huge).
1755
if (static_cast<size_t>(transaction->ByteSize()) >=
1756
transaction_message_threshold)
1758
/* Remember the transaction ID so we can re-use it */
1759
uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
uint32_t seg_id= transaction->segment_id();
1762
message::DeleteData *current_data= statement->mutable_delete_data();
1764
/* Caller should use this value when adding a new record */
1765
*next_segment_id= current_data->segment_id() + 1;
1767
current_data->set_end_segment(false);
1768
transaction->set_end_segment(false);
1771
* Send the trx message to replicators after finalizing the
1772
* statement and transaction. This will also set the Transaction
1773
* and Statement objects in Session to NULL.
1775
commitTransactionMessage(session);
1778
* Statement and Transaction should now be NULL, so new ones will get
1779
* created. We reuse the transaction id since we are segmenting
1782
transaction= getActiveTransactionMessage(session, false);
1783
assert(transaction != NULL);
1785
statement= transaction->add_statement();
1786
setDeleteHeader(*statement, session, table);
1787
session.setStatementMessage(statement);
1789
/* Set the transaction ID to match the previous messages */
1790
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
transaction->set_segment_id(seg_id + 1);
1792
transaction->set_end_segment(true);
1797
* Continuation of the same statement. Carry forward the existing
1800
const message::DeleteData ¤t_data= statement->delete_data();
1801
*next_segment_id= current_data.segment_id();
1808
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
Session::const_reference session,
1812
initStatementMessage(statement, message::Statement::DELETE, session);
1815
* Now we construct the specialized DeleteHeader message inside
1816
* the generalized message::Statement container...
1818
message::DeleteHeader *header= statement.mutable_delete_header();
1819
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1822
(void) table.getShare()->getSchemaName(schema_name);
1824
(void) table.getShare()->getTableName(table_name);
1826
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1829
Field *current_field;
1830
Field **table_fields= table.getFields();
1832
message::FieldMetadata *field_metadata;
1834
while ((current_field= *table_fields++) != NULL)
1837
* Add the WHERE clause values now...for now, this means the
1838
* primary key field value. Replication only supports tables
1839
* with a primary key.
1841
if (table.getShare()->fieldInPrimaryKey(current_field))
1843
field_metadata= header->add_key_field_metadata();
1844
field_metadata->set_name(current_field->field_name);
1845
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1850
void TransactionServices::deleteRecord(Session::reference session,
1852
bool use_update_record)
1854
ReplicationServices &replication_services= ReplicationServices::singleton();
1855
if (! replication_services.isActive())
1858
if (not table.getShare()->replicate())
1861
uint32_t next_segment_id= 1;
1862
message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1864
message::DeleteData *data= statement.mutable_delete_data();
1865
data->set_segment_id(next_segment_id);
1866
data->set_end_segment(true);
1867
message::DeleteRecord *record= data->add_record();
1869
Field *current_field;
1870
Field **table_fields= table.getFields();
1871
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
string_value->set_charset(system_charset_info);
1874
while ((current_field= *table_fields++) != NULL)
1877
* Add the WHERE clause values now...for now, this means the
1878
* primary key field value. Replication only supports tables
1879
* with a primary key.
1881
if (table.getShare()->fieldInPrimaryKey(current_field))
1883
if (use_update_record)
1886
* Temporarily point to the update record to get its value.
1887
* This is pretty much a hack in order to get the PK value from
1888
* the update record rather than the insert record. Field::val_str()
1889
* should not change anything in Field::ptr, so this should be safe.
1890
* We are careful not to change anything in old_ptr.
1892
const unsigned char *old_ptr= current_field->ptr;
1893
current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1894
string_value= current_field->val_str_internal(string_value);
1895
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1899
string_value= current_field->val_str_internal(string_value);
1901
* @TODO Store optional old record value in the before data member
1904
record->add_key_value(string_value->c_ptr(), string_value->length());
1905
string_value->free();
1910
void TransactionServices::createTable(Session::reference session,
1911
const message::Table &table)
1913
ReplicationServices &replication_services= ReplicationServices::singleton();
1914
if (! replication_services.isActive())
1917
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1920
message::Transaction *transaction= getActiveTransactionMessage(session);
1921
message::Statement *statement= transaction->add_statement();
1923
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1926
* Construct the specialized CreateTableStatement message and attach
1927
* it to the generic Statement message
1929
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1930
message::Table *new_table_message= create_table_statement->mutable_table();
1931
*new_table_message= table;
1933
finalizeStatementMessage(*statement, session);
1935
finalizeTransactionMessage(*transaction, session);
1937
(void) replication_services.pushTransactionMessage(session, *transaction);
1939
cleanupTransactionMessage(transaction, session);
1943
void TransactionServices::createSchema(Session::reference session,
1944
const message::Schema &schema)
1946
ReplicationServices &replication_services= ReplicationServices::singleton();
1947
if (! replication_services.isActive())
1950
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1953
message::Transaction *transaction= getActiveTransactionMessage(session);
1954
message::Statement *statement= transaction->add_statement();
1956
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1959
* Construct the specialized CreateSchemaStatement message and attach
1960
* it to the generic Statement message
1962
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1963
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
*new_schema_message= schema;
1966
finalizeStatementMessage(*statement, session);
1968
finalizeTransactionMessage(*transaction, session);
1970
(void) replication_services.pushTransactionMessage(session, *transaction);
1972
cleanupTransactionMessage(transaction, session);
1976
void TransactionServices::dropSchema(Session::reference session,
1977
identifier::Schema::const_reference identifier,
1978
message::schema::const_reference schema)
1980
ReplicationServices &replication_services= ReplicationServices::singleton();
1981
if (! replication_services.isActive())
1984
if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1987
message::Transaction *transaction= getActiveTransactionMessage(session);
1988
message::Statement *statement= transaction->add_statement();
1990
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1993
* Construct the specialized DropSchemaStatement message and attach
1994
* it to the generic Statement message
1996
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1998
drop_schema_statement->set_schema_name(identifier.getSchemaName());
2000
finalizeStatementMessage(*statement, session);
2002
finalizeTransactionMessage(*transaction, session);
2004
(void) replication_services.pushTransactionMessage(session, *transaction);
2006
cleanupTransactionMessage(transaction, session);
2009
void TransactionServices::alterSchema(Session::reference session,
2010
const message::Schema &old_schema,
2011
const message::Schema &new_schema)
2013
ReplicationServices &replication_services= ReplicationServices::singleton();
2014
if (! replication_services.isActive())
2017
if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2020
message::Transaction *transaction= getActiveTransactionMessage(session);
2021
message::Statement *statement= transaction->add_statement();
2023
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2026
* Construct the specialized AlterSchemaStatement message and attach
2027
* it to the generic Statement message
2029
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2031
message::Schema *before= alter_schema_statement->mutable_before();
2032
message::Schema *after= alter_schema_statement->mutable_after();
2034
*before= old_schema;
2037
finalizeStatementMessage(*statement, session);
2039
finalizeTransactionMessage(*transaction, session);
2041
(void) replication_services.pushTransactionMessage(session, *transaction);
2043
cleanupTransactionMessage(transaction, session);
2046
void TransactionServices::dropTable(Session::reference session,
2047
identifier::Table::const_reference identifier,
2048
message::table::const_reference table,
2051
ReplicationServices &replication_services= ReplicationServices::singleton();
2052
if (! replication_services.isActive())
2055
if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2058
message::Transaction *transaction= getActiveTransactionMessage(session);
2059
message::Statement *statement= transaction->add_statement();
2061
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2064
* Construct the specialized DropTableStatement message and attach
2065
* it to the generic Statement message
2067
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2069
drop_table_statement->set_if_exists_clause(if_exists);
2071
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2073
table_metadata->set_schema_name(identifier.getSchemaName());
2074
table_metadata->set_table_name(identifier.getTableName());
2076
finalizeStatementMessage(*statement, session);
2078
finalizeTransactionMessage(*transaction, session);
2080
(void) replication_services.pushTransactionMessage(session, *transaction);
2082
cleanupTransactionMessage(transaction, session);
2085
void TransactionServices::truncateTable(Session::reference session,
2088
ReplicationServices &replication_services= ReplicationServices::singleton();
2089
if (! replication_services.isActive())
2092
if (not table.getShare()->replicate())
2095
message::Transaction *transaction= getActiveTransactionMessage(session);
2096
message::Statement *statement= transaction->add_statement();
2098
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2101
* Construct the specialized TruncateTableStatement message and attach
2102
* it to the generic Statement message
2104
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2105
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2108
(void) table.getShare()->getSchemaName(schema_name);
2111
(void) table.getShare()->getTableName(table_name);
2113
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2116
finalizeStatementMessage(*statement, session);
2118
finalizeTransactionMessage(*transaction, session);
2120
(void) replication_services.pushTransactionMessage(session, *transaction);
2122
cleanupTransactionMessage(transaction, session);
2125
void TransactionServices::rawStatement(Session::reference session,
2126
const string &query)
2128
ReplicationServices &replication_services= ReplicationServices::singleton();
2129
if (! replication_services.isActive())
2132
message::Transaction *transaction= getActiveTransactionMessage(session);
2133
message::Statement *statement= transaction->add_statement();
2135
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2136
statement->set_sql(query);
2137
finalizeStatementMessage(*statement, session);
2139
finalizeTransactionMessage(*transaction, session);
2141
(void) replication_services.pushTransactionMessage(session, *transaction);
2143
cleanupTransactionMessage(transaction, session);
2146
int TransactionServices::sendEvent(Session::reference session,
2147
const message::Event &event)
2149
ReplicationServices &replication_services= ReplicationServices::singleton();
2150
if (! replication_services.isActive())
2153
message::Transaction *transaction= new (nothrow) message::Transaction();
2155
// set server id, start timestamp
2156
initTransactionMessage(*transaction, session, true);
2158
// set end timestamp
2159
finalizeTransactionMessage(*transaction, session);
2161
message::Event *trx_event= transaction->mutable_event();
2163
trx_event->CopyFrom(event);
2165
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2169
return static_cast<int>(result);
2172
bool TransactionServices::sendStartupEvent(Session::reference session)
2174
message::Event event;
2175
event.set_type(message::Event::STARTUP);
2176
if (sendEvent(session, event) != 0)
2181
bool TransactionServices::sendShutdownEvent(Session::reference session)
2183
message::Event event;
2184
event.set_type(message::Event::SHUTDOWN);
2185
if (sendEvent(session, event) != 0)
2190
} /* namespace drizzled */