1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
78
#include <google/protobuf/repeated_field.h>
81
using namespace google;
87
* @defgroup Transactions
91
* Transaction handling in the server
95
* In each client connection, Drizzle maintains two transaction
96
* contexts representing the state of the:
98
* 1) Statement Transaction
99
* 2) Normal Transaction
101
* These two transaction contexts represent the transactional
102
* state of a Session's SQL and XA transactions for a single
103
* SQL statement or a series of SQL statements.
105
* When the Session's connection is in AUTOCOMMIT mode, there
106
* is no practical difference between the statement and the
107
* normal transaction, as each SQL statement is committed or
108
* rolled back depending on the success or failure of the
109
* individual SQL statement.
111
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
* the Session has explicitly begun a normal SQL transaction using
113
* a BEGIN WORK/START TRANSACTION statement, then the normal
114
* transaction context tracks the aggregate transaction state of
115
* the SQL transaction's individual statements, and the SQL
116
* transaction's commit or rollback is done atomically for all of
117
* the SQL transaction's statement's data changes.
119
* Technically, a statement transaction can be viewed as a savepoint
120
* which is maintained automatically in order to make effects of one
123
* The normal transaction is started by the user and is typically
124
* ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
* The exception to this is that DDL statements implicitly COMMIT
126
* any previously active normal transaction before they begin executing.
128
* In Drizzle, unlike MySQL, plugins other than a storage engine
129
* may participate in a transaction. All plugin::TransactionalStorageEngine
130
* plugins will automatically be monitored by Drizzle's transaction
131
* manager (implemented in this source file), as will all plugins which
132
* implement plugin::XaResourceManager and register with the transaction
135
* If Drizzle's transaction manager sees that more than one resource
136
* manager (transactional storage engine or XA resource manager) has modified
137
* data state during a statement or normal transaction, the transaction
138
* manager will automatically use a two-phase commit protocol for all
139
* resources which support XA's distributed transaction protocol. Unlike
140
* MySQL, storage engines need not manually register with the transaction
141
* manager during a statement's execution. Previously, in MySQL, all
142
* handlertons would have to call trans_register_ha() at some point after
143
* modifying data state in order to have MySQL include that handler in
144
* an XA transaction. Drizzle does all of this grunt work behind the
145
* scenes for the storage engine implementers.
147
* When a connection is closed, the current normal transaction, if
148
* any is currently active, is rolled back.
150
* Transaction life cycle
151
* ----------------------
153
* When a new connection is established, session->transaction
154
* members are initialized to an empty state. If a statement uses any tables,
155
* all affected engines are registered in the statement engine list automatically
156
* in plugin::StorageEngine::startStatement() and
157
* plugin::TransactionalStorageEngine::startTransaction().
159
* You can view the lifetime of a normal transaction in the following
162
* drizzled::statement::Statement::execute()
163
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
* drizzled::TransactionServices::registerResourceForTransaction()
165
* drizzled::TransactionServices::registerResourceForStatement()
166
* drizzled::plugin::StorageEngine::startStatement()
167
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
* drizzled::plugin::StorageEngine::endStatement()
169
* drizzled::TransactionServices::autocommitOrRollback()
170
* drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
* drizzled::XaResourceManager::xaCommit() <-- or rollback()
173
* Roles and responsibilities
174
* --------------------------
176
* Beginning of SQL Statement (and Statement Transaction)
177
* ------------------------------------------------------
179
* At the start of each SQL statement, for each storage engine
180
* <strong>that is involved in the SQL statement</strong>, the kernel
181
* calls the engine's plugin::StorageEngine::startStatement() method. If the
182
* engine needs to track some data for the statement, it should use
183
* this method invocation to initialize this data. This is the
184
* beginning of what is called the "statement transaction".
186
* <strong>For transaction storage engines (those storage engines
187
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
* kernel automatically determines if the start of the SQL statement
189
* transaction should <em>also</em> begin the normal SQL transaction.
190
* This occurs when the connection is NOT in autocommit mode. If
191
* the kernel detects this, then the kernel automatically starts the
192
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
* method and then calls plugin::StorageEngine::startStatement()
196
* Beginning of an SQL "Normal" Transaction
197
* ----------------------------------------
199
* As noted above, a "normal SQL transaction" may be started when
200
* an SQL statement is started in a connection and the connection is
201
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
203
* In addition, when a user executes a START TRANSACTION or
204
* BEGIN WORK statement in a connection, the kernel explicitly
205
* calls each transactional storage engine's startTransaction() method.
207
* Ending of an SQL Statement (and Statement Transaction)
208
* ------------------------------------------------------
210
* At the end of each SQL statement, for each of the aforementioned
211
* involved storage engines, the kernel calls the engine's
212
* plugin::StorageEngine::endStatement() method. If the engine
213
* has initialized or modified some internal data about the
214
* statement transaction, it should use this method to reset or destroy
215
* this data appropriately.
217
* Ending of an SQL "Normal" Transaction
218
* -------------------------------------
220
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
221
* depending on the success or failure of the statement transaction(s)
224
* The end of a "normal transaction" occurs when any of the following
227
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
* then the normal transaction which encloses the statement
230
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
* 3) Just before a DDL operation occurs, the kernel will implicitly
232
* commit the active normal transaction
234
* Transactions and Non-transactional Storage Engines
235
* --------------------------------------------------
237
* For non-transactional engines, this call can be safely ignored, and
238
* the kernel tracks whether a non-transactional engine has changed
239
* any data state, and warns the user appropriately if a transaction
240
* (statement or normal) is rolled back after such non-transactional
241
* data changes have been made.
243
* XA Two-phase Commit Protocol
244
* ----------------------------
246
* During statement execution, whenever any of the data-modifying
247
* PSEA API methods is used, e.g. Cursor::write_row() or
248
* Cursor::update_row(), the read-write flag is raised in the
249
* statement transaction for the involved engine.
250
* Currently all PSEA calls are "traced", and the data cannot be
251
* changed in a way other than issuing a PSEA call. Important:
252
* unless this invariant is preserved the server will not know that
253
* a transaction in a given engine is read-write and will not
254
* involve the two-phase commit protocol!
256
* At the end of a statement, TransactionServices::autocommitOrRollback()
257
* is invoked. This call in turn
258
* invokes plugin::XaResourceManager::xaPrepare() for every involved XA
261
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
265
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
* At statement commit, the statement-related read-write engine
268
* flag is propagated to the corresponding flag in the normal
269
* transaction. When the commit is complete, the list of registered
270
* engines is cleared.
272
* Rollback is handled in a similar fashion.
274
* Additional notes on DDL and the normal transaction.
275
* ---------------------------------------------------
277
* CREATE TABLE .. SELECT can start a *new* normal transaction
278
* because of the fact that SELECTs on a transactional storage
279
* engine participate in the normal SQL transaction (due to
280
* isolation level issues and consistent read views).
282
* Behaviour of the server in this case is currently badly
285
* DDL statements use a form of "semantic" logging
286
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
* the newly created table is deleted.
289
* In addition, some DDL statements issue interim transaction
290
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
* from the original table to the internal temporary table. Other
292
* statements, e.g. CREATE TABLE ... SELECT do not always commit
295
* And finally there is a group of DDL statements such as
296
* RENAME/DROP TABLE that doesn't start a new transaction
297
* and doesn't commit.
299
* A consistent behaviour is perhaps to always commit the normal
300
* transaction after all DDLs, just like the statement transaction
301
* is always committed at the end of all statements.
303
/**
 * Constructs the transaction services component and caches the engine
 * that is later asked for new transaction IDs.
 *
 * The engine registered under the name "InnoDB" is looked up once. When no
 * such engine is found, xa_storage_engine is left NULL.
 */
TransactionServices::TransactionServices()
{
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");

  if (engine)
  {
    /*
     * NOTE(review): this downcast assumes that the engine named "InnoDB"
     * really is a plugin::XaStorageEngine; nothing here verifies it.
     */
    xa_storage_engine= static_cast<plugin::XaStorageEngine *>(engine);
  }
  else
  {
    xa_storage_engine= NULL;
  }
}
316
void TransactionServices::registerResourceForStatement(Session *session,
317
plugin::MonitoredInTransaction *monitored,
318
plugin::TransactionalStorageEngine *engine)
320
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
323
* Now we automatically register this resource manager for the
324
* normal transaction. This is fine because a statement
325
* transaction registration should always enlist the resource
326
* in the normal transaction which contains the statement
329
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session->transaction.stmt;
333
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
335
if (resource_context->isStarted())
336
return; /* already registered, return */
338
assert(monitored->participatesInSqlTransaction());
339
assert(not monitored->participatesInXaTransaction());
341
resource_context->setMonitored(monitored);
342
resource_context->setTransactionalStorageEngine(engine);
343
trans->registerResource(resource_context);
345
trans->no_2pc|= true;
348
void TransactionServices::registerResourceForStatement(Session *session,
349
plugin::MonitoredInTransaction *monitored,
350
plugin::TransactionalStorageEngine *engine,
351
plugin::XaResourceManager *resource_manager)
353
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
356
* Now we automatically register this resource manager for the
357
* normal transaction. This is fine because a statement
358
* transaction registration should always enlist the resource
359
* in the normal transaction which contains the statement
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session->transaction.stmt;
366
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
368
if (resource_context->isStarted())
369
return; /* already registered, return */
371
assert(monitored->participatesInXaTransaction());
372
assert(monitored->participatesInSqlTransaction());
374
resource_context->setMonitored(monitored);
375
resource_context->setTransactionalStorageEngine(engine);
376
resource_context->setXaResourceManager(resource_manager);
377
trans->registerResource(resource_context);
379
trans->no_2pc|= false;
382
void TransactionServices::registerResourceForTransaction(Session *session,
383
plugin::MonitoredInTransaction *monitored,
384
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session->transaction.all;
387
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
389
if (resource_context->isStarted())
390
return; /* already registered, return */
392
session->server_status|= SERVER_STATUS_IN_TRANS;
394
trans->registerResource(resource_context);
396
assert(monitored->participatesInSqlTransaction());
397
assert(not monitored->participatesInXaTransaction());
399
resource_context->setMonitored(monitored);
400
resource_context->setTransactionalStorageEngine(engine);
401
trans->no_2pc|= true;
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session->getResourceContext(monitored, 0)->isStarted())
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
plugin::MonitoredInTransaction *monitored,
413
plugin::TransactionalStorageEngine *engine,
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
if (resource_context->isStarted())
420
return; /* already registered, return */
422
session->server_status|= SERVER_STATUS_IN_TRANS;
424
trans->registerResource(resource_context);
426
assert(monitored->participatesInSqlTransaction());
428
resource_context->setMonitored(monitored);
429
resource_context->setXaResourceManager(resource_manager);
430
resource_context->setTransactionalStorageEngine(engine);
431
trans->no_2pc|= true;
433
if (session->transaction.xid_state.xid.is_null())
434
session->transaction.xid_state.xid.set(session->getQueryId());
436
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session->getResourceContext(monitored, 0)->isStarted())
440
registerResourceForStatement(session, monitored, engine, resource_manager);
443
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
451
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
my_session->setXaId(xa_id);
456
/**
 * Returns the transaction ID stored on the session, allocating one from
 * the cached XA storage engine first if the session's ID is still 0.
 *
 * @param session  Session whose transaction ID is requested
 *
 * @return The session's XA ID.  This stays 0 when no ID was assigned and
 *         no XA storage engine is available to allocate one.
 */
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
{
  /* The constructor leaves xa_storage_engine NULL when its lookup fails. */
  if (session->getXaId() == 0 && xa_storage_engine != NULL)
  {
    session->setXaId(xa_storage_engine->getNewTransactionId(session));
  }

  return session->getXaId();
}
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
482
int error= 0, cookie= 0;
484
'all' means that this is either an explicit commit issued by
485
user, or an implicit commit issued by a DDL.
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
488
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
490
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
493
We must not commit the normal transaction if a statement
494
transaction is pending. Otherwise statement transaction
495
flags will not get propagated to its normal transaction's
498
assert(session->transaction.stmt.getResourceContexts().empty() ||
499
trans == &session->transaction.stmt);
501
if (resource_contexts.empty() == false)
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
505
rollbackTransaction(session, normal_transaction);
510
* If replication is on, we do a PREPARE on the resource managers, push the
511
* Transaction message across the replication stream, and then COMMIT if the
512
* replication stream returned successfully.
514
if (shouldConstructMessages())
516
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
517
it != resource_contexts.end() && ! error;
520
ResourceContext *resource_context= *it;
523
Do not call two-phase commit if this particular
524
transaction is read-only. This allows for simpler
525
implementation in engines that are always read-only.
527
if (! resource_context->hasModifiedData())
530
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
532
if (resource->participatesInXaTransaction())
534
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
536
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
541
session->status_var.ha_prepare_count++;
545
if (error == 0 && is_real_trans)
548
* Push the constructed Transaction messages across to
549
* replicators and appliers.
551
error= commitTransactionMessage(session);
555
rollbackTransaction(session, normal_transaction);
560
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
563
session->startWaitingGlobalReadLock();
570
This function does not care about global read lock. A caller should.
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
575
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
576
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
578
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
580
if (resource_contexts.empty() == false)
582
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
583
it != resource_contexts.end();
587
ResourceContext *resource_context= *it;
589
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
591
if (resource->participatesInXaTransaction())
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
598
else if (normal_transaction)
600
session->status_var.ha_commit_count++;
603
else if (resource->participatesInSqlTransaction())
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
610
else if (normal_transaction)
612
session->status_var.ha_commit_count++;
615
resource_context->reset(); /* keep it conveniently zero-filled */
619
session->transaction.xid_state.xid.null();
621
if (normal_transaction)
623
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
631
/**
 * Rolls back either the statement transaction or the normal transaction
 * of a session in every registered resource.
 *
 * @param session             Session to roll back
 * @param normal_transaction  true for the normal transaction, false for
 *                            the statement transaction
 *
 * @return 0 on success, 1 if any resource reported an error while rolling
 *         back.  Remaining resources are still rolled back after an error.
 *
 * A warning is pushed to the session when non-transactional data was
 * modified and therefore could not be rolled back.
 */
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
{
  int error= 0;
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();

  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();

  /*
    We must not rollback the normal transaction if a statement
    transaction is pending.
  */
  assert(session->transaction.stmt.getResourceContexts().empty() ||
         trans == &session->transaction.stmt);

  if (resource_contexts.empty() == false)
  {
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
         it != resource_contexts.end();
         ++it)
    {
      int err;
      ResourceContext *resource_context= *it;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      /* XA participants roll back through their resource manager. */
      if (resource->participatesInXaTransaction())
      {
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else if (normal_transaction)
        {
          session->status_var.ha_rollback_count++;
        }
      }
      else if (resource->participatesInSqlTransaction())
      {
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else if (normal_transaction)
        {
          session->status_var.ha_rollback_count++;
        }
      }
      resource_context->reset(); /* keep it conveniently zero-filled */
    }

    /*
     * We need to signal the ROLLBACK to ReplicationServices here
     * BEFORE we set the transaction ID to NULL.  This is because
     * if a bulk segment was sent to replicators, we need to send
     * a rollback statement with the corresponding transaction ID
     * to rollback.
     */
    if (normal_transaction)
      rollbackTransactionMessage(session);

    if (is_real_trans)
      session->transaction.xid_state.xid.null();
    if (normal_transaction)
    {
      session->variables.tx_isolation=session->session_tx_isolation;
      session->transaction.cleanup();
    }
  }
  if (normal_transaction)
    session->transaction_rollback_request= false;

  /*
   * If a non-transactional table was updated, warn the user
   */
  if (is_real_trans &&
      session->transaction.all.hasModifiedNonTransData() &&
      session->getKilled() != Session::KILL_CONNECTION)
  {
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
  }
  trans->reset();
  return error;
}
721
This is used to commit or rollback a single statement depending on
725
Note that if the autocommit is on, then the following call inside
726
InnoDB will commit or rollback the whole transaction (= the statement). The
727
autocommit mechanism built into InnoDB is based on counting locks, but if
728
the user has used LOCK TABLES then that mechanism does not know to do the
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
734
if (session->transaction.stmt.getResourceContexts().empty() == false)
736
TransactionContext *trans = &session->transaction.stmt;
737
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
738
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
739
it != resource_contexts.end();
742
ResourceContext *resource_context= *it;
744
resource_context->getTransactionalStorageEngine()->endStatement(session);
749
if (commitTransaction(session, false))
754
(void) rollbackTransaction(session, false);
755
if (session->transaction_rollback_request)
756
(void) rollbackTransaction(session, true);
759
session->variables.tx_isolation= session->session_tx_isolation;
764
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
766
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
768
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
769
* resources aren't the same... */
770
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
774
/**
 * Rolls the session's normal transaction back to a named savepoint.
 *
 * Resources that were part of the transaction when the savepoint was set
 * are rolled back to the savepoint.  Resources that joined afterwards are
 * rolled back completely.  The transaction's resource list is then
 * replaced by the savepoint's list and, when replication messages are
 * being constructed, the session's transaction message is replaced by a
 * copy of the one stored in the savepoint.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint to roll back to
 *
 * @return 0 on success, 1 if any resource reported an error.
 */
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext *trans= &session->transaction.all;
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();

  trans->no_2pc= false;
  /*
    rolling back to savepoint in all storage engines that were part of the
    transaction when the savepoint was set
  */
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
       it != sv_resource_contexts.end();
       ++it)
  {
    int err;
    ResourceContext *resource_context= *it;

    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
      {
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error= 1;
      }
      else
      {
        session->status_var.ha_savepoint_rollback_count++;
      }
    }
    trans->no_2pc|= not resource->participatesInXaTransaction();
  }
  /*
    rolling back the transaction in all storage engines that were not part of
    the transaction when the savepoint was set
  */
  {
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
    TransactionContext::ResourceContexts set_difference_contexts;

    /*
     * Bug #542299: segfault during set_difference() below.  The output
     * range must consist of real elements, so the target is resize()d.
     * reserve() only allocates capacity and leaves size() at zero, which
     * would make the loop further down iterate over nothing.  The
     * difference can never hold more elements than the first input range.
     */
    set_difference_contexts.resize(sorted_tran_resource_contexts.size());

    std::sort(sorted_tran_resource_contexts.begin(),
              sorted_tran_resource_contexts.end(),
              ResourceContextCompare());
    std::sort(sorted_sv_resource_contexts.begin(),
              sorted_sv_resource_contexts.end(),
              ResourceContextCompare());
    TransactionContext::ResourceContexts::iterator difference_end=
      std::set_difference(sorted_tran_resource_contexts.begin(),
                          sorted_tran_resource_contexts.end(),
                          sorted_sv_resource_contexts.begin(),
                          sorted_sv_resource_contexts.end(),
                          set_difference_contexts.begin(),
                          ResourceContextCompare());
    /* Drop the unused tail so only the computed difference remains. */
    set_difference_contexts.erase(difference_end, set_difference_contexts.end());

    /*
     * set_difference_contexts now contains all resource contexts
     * which are in the transaction context but were NOT in the
     * savepoint's resource contexts.
     */
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
         it != set_difference_contexts.end();
         ++it)
    {
      ResourceContext *resource_context= *it;
      int err;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      if (resource->participatesInSqlTransaction())
      {
        /* Full rollback of the normal transaction in this engine. */
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, true)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else
        {
          session->status_var.ha_rollback_count++;
        }
      }
      resource_context->reset(); /* keep it conveniently zero-filled */
    }
  }
  trans->setResourceContexts(sv_resource_contexts);

  if (shouldConstructMessages())
  {
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
    if (savepoint_transaction != NULL)
    {
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
      */
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*savepoint_transaction);
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
      if (num_statements == 0)
      {
        session->setStatementMessage(NULL);
      }
      else
      {
        /* The last statement of the copy becomes the active statement. */
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
      }
      session->setTransactionMessage(savepoint_transaction_copy);
    }
  }

  return error;
}
899
according to the sql standard (ISO/IEC 9075-2:2003)
900
section "4.33.4 SQL-statements and transaction states",
901
NamedSavepoint is *not* transaction-initiating SQL-statement
903
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
906
TransactionContext *trans= &session->transaction.all;
907
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
909
if (resource_contexts.empty() == false)
911
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
912
it != resource_contexts.end();
915
ResourceContext *resource_context= *it;
918
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
920
if (resource->participatesInSqlTransaction())
922
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
924
my_error(ER_GET_ERRNO, MYF(0), err);
929
session->status_var.ha_savepoint_count++;
935
Remember the list of registered storage engines.
937
sv.setResourceContexts(resource_contexts);
939
if (shouldConstructMessages())
941
message::Transaction *transaction= session->getTransactionMessage();
943
if (transaction != NULL)
945
message::Transaction *transaction_savepoint=
946
new message::Transaction(*transaction);
947
sv.setTransactionMessage(transaction_savepoint);
954
/**
 * Releases the named savepoint in every resource recorded with it that
 * participates in the SQL transaction.
 *
 * @param session Session owning the savepoint
 * @param sv      Savepoint to release
 *
 * @return 0 on success, 1 if at least one engine reported an error
 */
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
{
  int result= 0;
  TransactionContext::ResourceContexts &contexts= sv.getResourceContexts();

  typedef TransactionContext::ResourceContexts::iterator ContextIterator;

  for (ContextIterator iter= contexts.begin(); iter != contexts.end(); ++iter)
  {
    ResourceContext *context= *iter;
    plugin::MonitoredInTransaction *monitored= context->getMonitored();

    if (not monitored->participatesInSqlTransaction())
      continue;

    const int engine_err= context->getTransactionalStorageEngine()->releaseSavepoint(session, sv);
    if (engine_err != 0)
    {
      my_error(ER_GET_ERRNO, MYF(0), engine_err);
      result= 1;
    }
  }

  return result;
}
982
bool TransactionServices::shouldConstructMessages()
984
ReplicationServices &replication_services= ReplicationServices::singleton();
985
return replication_services.isActive();
988
/**
 * Returns the Transaction message attached to the Session, creating,
 * initializing and attaching a new one when the Session has none.
 *
 * @param in_session        Session whose Transaction message is wanted
 * @param should_inc_trx_id Forwarded to initTransactionMessage() when a new
 *                          message has to be created
 *
 * @return the active Transaction message, or NULL if a new message was
 *         needed and could not be allocated
 */
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
{
  message::Transaction *transaction= in_session->getTransactionMessage();

  if (unlikely(transaction == NULL))
  {
    /*
     * Allocate and initialize a new transaction message
     * for this Session object.  Session is responsible for
     * deleting transaction message when done with it.
     */
    transaction= new (nothrow) message::Transaction();

    /*
     * The nothrow form of new reports failure by returning NULL, so the
     * pointer must be checked before it is dereferenced below.
     */
    if (unlikely(transaction == NULL))
      return NULL;

    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
    in_session->setTransactionMessage(transaction);
  }

  return transaction;
}
1008
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1009
Session *in_session,
1010
bool should_inc_trx_id)
1012
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1013
trx->set_server_id(in_session->getServerId());
1015
if (should_inc_trx_id)
1017
trx->set_transaction_id(getCurrentTransactionId(in_session));
1018
in_session->setXaId(0);
1022
trx->set_transaction_id(0);
1025
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1028
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1029
Session *in_session)
1031
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1032
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1035
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1036
Session *in_session)
1038
delete in_transaction;
1039
in_session->setStatementMessage(NULL);
1040
in_session->setTransactionMessage(NULL);
1041
in_session->setXaId(0);
1044
/**
 * Finalizes the Session's open Statement and Transaction messages, pushes
 * the Transaction message to the replication services and cleans it up.
 *
 * @param in_session Session being committed
 *
 * @return 0 when replication is inactive or no Statement message is open;
 *         otherwise the replication return code converted to int
 */
int TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return 0;

  message::Statement *open_statement= in_session->getStatementMessage();
  if (open_statement == NULL)
    return 0; /* No data modification occurred inside the transaction */

  /* There is an active statement message, finalize it */
  finalizeStatementMessage(*open_statement, in_session);

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  finalizeTransactionMessage(*transaction, in_session);

  const plugin::ReplicationReturnCode push_result=
    replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);

  return static_cast<int>(push_result);
}
1071
void TransactionServices::initStatementMessage(message::Statement &statement,
1072
message::Statement::Type in_type,
1073
Session *in_session)
1075
statement.set_type(in_type);
1076
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1078
if (in_session->variables.replicate_query)
1079
statement.set_sql(in_session->getQueryString()->c_str());
1082
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1083
Session *in_session)
1085
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1086
in_session->setStatementMessage(NULL);
1089
/**
 * Discards the Session's Transaction message on ROLLBACK.
 *
 * Two cases are handled:
 *
 *  1) The stored Transaction message is self-contained (no bulk segment
 *     was detected by message::transactionContainsBulkSegment()): the
 *     message is simply cleaned up.
 *
 *  2) The message contains a bulk segment, meaning earlier segments have
 *     already been sent to replicators: the message is cleared and re-used
 *     to carry a single ROLLBACK Statement with the same transaction id,
 *     which is pushed so that replicators can un-apply what they received.
 *
 * @param in_session Session being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);

  const bool segments_already_sent= message::transactionContainsBulkSegment(*transaction);

  if (unlikely(segments_already_sent))
  {
    /* Keep the id so the ROLLBACK matches the segments sent earlier. */
    const uint64_t original_trx_id= transaction->transaction_context().transaction_id();

    transaction->Clear();
    initTransactionMessage(*transaction, in_session, false);
    transaction->mutable_transaction_context()->set_transaction_id(original_trx_id);

    message::Statement *rollback_statement= transaction->add_statement();
    initStatementMessage(*rollback_statement, message::Statement::ROLLBACK, in_session);
    finalizeStatementMessage(*rollback_statement, in_session);

    finalizeTransactionMessage(*transaction, in_session);

    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
  }

  cleanupTransactionMessage(transaction, in_session);
}
1141
/*
 * Finds or creates the INSERT Statement message that the next inserted
 * record should be added to.
 *
 * Visible steps: the Session's current Statement message is fetched; a
 * message of another type is finalized; for an existing INSERT message the
 * Transaction size is compared with transaction_message_threshold and, when
 * reached, the current segment is marked as not being the end segment, the
 * Transaction message is committed and a new one is started under the same
 * transaction id; the table name in the existing InsertHeader is compared
 * with the current table's name; when no Statement message is left, a new
 * one is added to the Transaction, given an InsertHeader and attached to
 * the Session.
 *
 * next_segment_id is an out-parameter: it receives either the existing
 * segment id or that id plus one.
 *
 * NOTE(review): brace, else and return lines are not visible in this
 * listing, so the exact nesting of the steps should be confirmed.
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1143
uint32_t *next_segment_id)
1145
message::Statement *statement= in_session->getStatementMessage();
1146
message::Transaction *transaction= NULL;
1149
* Check the type for the current Statement message, if it is anything
1150
* other then INSERT we need to call finalize, this will ensure a
1151
* new InsertStatement is created. If it is of type INSERT check
1152
* what table the INSERT belongs to, if it is a different table
1153
* call finalize, so a new InsertStatement can be created.
1155
if (statement != NULL && statement->type() != message::Statement::INSERT)
1157
finalizeStatementMessage(*statement, in_session);
1158
/* Re-read the Session's Statement pointer after finalizing */
statement= in_session->getStatementMessage();
1160
else if (statement != NULL)
1162
transaction= getActiveTransactionMessage(in_session);
1165
* If we've passed our threshold for the statement size (possible for
1166
* a bulk insert), we'll finalize the Statement and Transaction (doing
1167
* the Transaction will keep it from getting huge).
1169
if (static_cast<size_t>(transaction->ByteSize()) >=
1170
in_session->variables.transaction_message_threshold)
1172
/* Remember the transaction ID so we can re-use it */
1173
uint64_t trx_id= transaction->transaction_context().transaction_id();
1175
message::InsertData *current_data= statement->mutable_insert_data();
1177
/* Caller should use this value when adding a new record */
1178
*next_segment_id= current_data->segment_id() + 1;
1180
/* More segments of this statement follow in a later message */
current_data->set_end_segment(false);
1183
* Send the trx message to replicators after finalizing the
1184
* statement and transaction. This will also set the Transaction
1185
* and Statement objects in Session to NULL.
1187
commitTransactionMessage(in_session);
1190
* Statement and Transaction should now be NULL, so new ones will get
1191
* created. We reuse the transaction id since we are segmenting
1194
statement= in_session->getStatementMessage();
1195
/* 'false': the new message is initialized without taking a new transaction id */
transaction= getActiveTransactionMessage(in_session, false);
1196
assert(transaction != NULL);
1198
/* Set the transaction ID to match the previous messages */
1199
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1203
const message::InsertHeader &insert_header= statement->insert_header();
1204
string old_table_name= insert_header.table_metadata().table_name();
1206
string current_table_name;
1207
(void) in_table->getShare()->getTableName(current_table_name);
1209
/* compare() is non-zero when the names differ; only the table name is compared here */
if (current_table_name.compare(old_table_name))
1211
finalizeStatementMessage(*statement, in_session);
1212
statement= in_session->getStatementMessage();
1216
/* append this INSERT query string */
1217
if (in_session->variables.replicate_query)
1219
string s(statement->sql());
1223
s.append(in_session->getQueryString()->c_str());
1224
statement->set_sql(s);
1227
statement->set_sql(in_session->getQueryString()->c_str());
1230
/* carry forward the existing segment id */
1231
const message::InsertData ¤t_data= statement->insert_data();
1232
*next_segment_id= current_data.segment_id();
1237
if (statement == NULL)
1240
* Transaction will be non-NULL only if we had to segment it due to
1241
* transaction size above.
1243
if (transaction == NULL)
1244
transaction= getActiveTransactionMessage(in_session);
1247
* Transaction message initialized and set, but no statement created
1248
* yet. We construct one and initialize it, here, then return the
1249
* message after attaching the new Statement message pointer to the
1250
* Session for easy retrieval later...
1252
statement= transaction->add_statement();
1253
setInsertHeader(*statement, in_session, in_table);
1254
in_session->setStatementMessage(statement);
1259
void TransactionServices::setInsertHeader(message::Statement &statement,
1260
Session *in_session,
1263
initStatementMessage(statement, message::Statement::INSERT, in_session);
1266
* Now we construct the specialized InsertHeader message inside
1267
* the generalized message::Statement container...
1269
/* Set up the insert header */
1270
message::InsertHeader *header= statement.mutable_insert_header();
1271
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1274
(void) in_table->getShare()->getSchemaName(schema_name);
1276
(void) in_table->getShare()->getTableName(table_name);
1278
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1279
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1281
Field *current_field;
1282
Field **table_fields= in_table->getFields();
1284
message::FieldMetadata *field_metadata;
1286
/* We will read all the table's fields... */
1287
in_table->setReadSet();
1289
while ((current_field= *table_fields++) != NULL)
1291
field_metadata= header->add_field_metadata();
1292
field_metadata->set_name(current_field->field_name);
1293
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1297
/**
 * Adds the row currently held by the table to the Session's INSERT
 * Statement message.
 *
 * Each field contributes an is_null flag and a value: NULL fields store an
 * empty value, other fields store their string representation.
 *
 * @param in_session Session performing the insert
 * @param in_table   Table the row is inserted into
 *
 * @return true if the table has no primary key (an error is raised and
 *         nothing is recorded), false otherwise
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
{
  if (not ReplicationServices::singleton().isActive())
    return false;

  /*
   * Checked up front so that no statement is even created when the table
   * has no primary key.
   *
   * @todo Multi-column primary keys are handled how exactly?
   */
  if (not in_table->getShare()->hasPrimaryKey())
  {
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
    return true;
  }

  uint32_t segment_id= 1;
  message::Statement &statement= getInsertStatement(in_session, in_table, &segment_id);

  message::InsertData *insert_data= statement.mutable_insert_data();
  insert_data->set_segment_id(segment_id);
  insert_data->set_end_segment(true);
  message::InsertRecord *row= insert_data->add_record();

  Field **field_iter= in_table->getFields();

  String *scratch= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  scratch->set_charset(system_charset_info);

  /* We will read all the table's fields... */
  in_table->setReadSet();

  for (; *field_iter != NULL; ++field_iter)
  {
    Field *field= *field_iter;

    if (field->is_null())
    {
      row->add_is_null(true);
      row->add_insert_value("", 0);
      continue;
    }

    scratch= field->val_str(scratch);
    row->add_is_null(false);
    row->add_insert_value(scratch->c_ptr(), scratch->length());
    scratch->free();
  }

  return false;
}
1351
/*
 * Finds or creates the UPDATE Statement message that the next updated
 * record should be added to.
 *
 * Visible steps: the Session's current Statement message is fetched; a
 * message of another type is finalized; for an existing UPDATE message the
 * Transaction size is compared with transaction_message_threshold and, when
 * reached, the current segment is marked as not being the end segment, the
 * Transaction message is committed and a new one is started under the same
 * transaction id; useExistingUpdateHeader() decides whether the existing
 * message can be re-used; when no Statement message is left, a new one is
 * added to the Transaction, given an UpdateHeader and attached to the
 * Session.
 *
 * next_segment_id is an out-parameter: it receives either the existing
 * segment id or that id plus one.
 *
 * NOTE(review): brace, else and return lines are not visible in this
 * listing, so the exact nesting of the steps should be confirmed.
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1353
const unsigned char *old_record,
1354
const unsigned char *new_record,
1355
uint32_t *next_segment_id)
1357
message::Statement *statement= in_session->getStatementMessage();
1358
message::Transaction *transaction= NULL;
1361
* Check the type for the current Statement message, if it is anything
1362
* other then UPDATE we need to call finalize, this will ensure a
1363
* new UpdateStatement is created. If it is of type UPDATE check
1364
* what table the UPDATE belongs to, if it is a different table
1365
* call finalize, so a new UpdateStatement can be created.
1367
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1369
finalizeStatementMessage(*statement, in_session);
1370
/* Re-read the Session's Statement pointer after finalizing */
statement= in_session->getStatementMessage();
1372
else if (statement != NULL)
1374
transaction= getActiveTransactionMessage(in_session);
1377
* If we've passed our threshold for the statement size (possible for
1378
* a bulk insert), we'll finalize the Statement and Transaction (doing
1379
* the Transaction will keep it from getting huge).
1381
if (static_cast<size_t>(transaction->ByteSize()) >=
1382
in_session->variables.transaction_message_threshold)
1384
/* Remember the transaction ID so we can re-use it */
1385
uint64_t trx_id= transaction->transaction_context().transaction_id();
1387
message::UpdateData *current_data= statement->mutable_update_data();
1389
/* Caller should use this value when adding a new record */
1390
*next_segment_id= current_data->segment_id() + 1;
1392
/* More segments of this statement follow in a later message */
current_data->set_end_segment(false);
1395
* Send the trx message to replicators after finalizing the
1396
* statement and transaction. This will also set the Transaction
1397
* and Statement objects in Session to NULL.
1399
commitTransactionMessage(in_session);
1402
* Statement and Transaction should now be NULL, so new ones will get
1403
* created. We reuse the transaction id since we are segmenting
1406
statement= in_session->getStatementMessage();
1407
/* 'false': the new message is initialized without taking a new transaction id */
transaction= getActiveTransactionMessage(in_session, false);
1408
assert(transaction != NULL);
1410
/* Set the transaction ID to match the previous messages */
1411
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1415
/* Re-use the existing message only if its header matches this table and these changed fields */
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1417
/* append this UPDATE query string */
1418
if (in_session->variables.replicate_query)
1420
string s(statement->sql());
1424
s.append(in_session->getQueryString()->c_str());
1425
statement->set_sql(s);
1428
statement->set_sql(in_session->getQueryString()->c_str());
1431
/* carry forward the existing segment id */
1432
const message::UpdateData ¤t_data= statement->update_data();
1433
*next_segment_id= current_data.segment_id();
1437
finalizeStatementMessage(*statement, in_session);
1438
statement= in_session->getStatementMessage();
1443
if (statement == NULL)
1446
* Transaction will be non-NULL only if we had to segment it due to
1447
* transaction size above.
1449
if (transaction == NULL)
1450
transaction= getActiveTransactionMessage(in_session);
1453
* Transaction message initialized and set, but no statement created
1454
* yet. We construct one and initialize it, here, then return the
1455
* message after attaching the new Statement message pointer to the
1456
* Session for easy retrieval later...
1458
statement= transaction->add_statement();
1459
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1460
in_session->setStatementMessage(statement);
1465
/*
 * Decides whether an existing UPDATE Statement message can take another
 * record for the given table and old/new row images.
 *
 * Visible checks: the table name stored in the UpdateHeader is compared
 * with the current table's name, and each field reported as changed by
 * isFieldUpdated() is looked up by name among the header's SET field
 * metadata; the number of matches is finally compared with the number of
 * SET fields in the header.
 *
 * NOTE(review): the return statements, the handling of 'found' and the
 * loop exits are not visible in this listing; presumably the function
 * returns true only when both checks pass -- confirm against the full file.
 */
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1467
const unsigned char *old_record,
1468
const unsigned char *new_record)
1470
const message::UpdateHeader &update_header= statement.update_header();
1471
string old_table_name= update_header.table_metadata().table_name();
1473
string current_table_name;
1474
(void) in_table->getShare()->getTableName(current_table_name);
1475
/* compare() is non-zero when the names differ; only the table name is compared here */
if (current_table_name.compare(old_table_name))
1481
/* Compare the set fields in the existing UpdateHeader and see if they
1482
* match the updated fields in the new record, if they do not we must
1483
* create a new UpdateHeader
1485
size_t num_set_fields= update_header.set_field_metadata_size();
1487
Field *current_field;
1488
Field **table_fields= in_table->getFields();
1489
in_table->setReadSet();
1491
/* Counts changed fields that were also found in the header */
size_t num_calculated_updated_fields= 0;
1493
while ((current_field= *table_fields++) != NULL)
1495
/* More changed fields than the header lists: it cannot match */
if (num_calculated_updated_fields > num_set_fields)
1500
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1502
/* check that this field exists in the UpdateHeader record */
1505
for (size_t x= 0; x < num_set_fields; ++x)
1507
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1508
string name= field_metadata.name();
1509
if (name.compare(current_field->field_name) == 0)
1512
++num_calculated_updated_fields;
1523
if ((num_calculated_updated_fields == num_set_fields) && found)
1534
void TransactionServices::setUpdateHeader(message::Statement &statement,
1535
Session *in_session,
1537
const unsigned char *old_record,
1538
const unsigned char *new_record)
1540
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1543
* Now we construct the specialized UpdateHeader message inside
1544
* the generalized message::Statement container...
1546
/* Set up the update header */
1547
message::UpdateHeader *header= statement.mutable_update_header();
1548
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1551
(void) in_table->getShare()->getSchemaName(schema_name);
1553
(void) in_table->getShare()->getTableName(table_name);
1555
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1556
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1558
Field *current_field;
1559
Field **table_fields= in_table->getFields();
1561
message::FieldMetadata *field_metadata;
1563
/* We will read all the table's fields... */
1564
in_table->setReadSet();
1566
while ((current_field= *table_fields++) != NULL)
1569
* We add the "key field metadata" -- i.e. the fields which is
1570
* the primary key for the table.
1572
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1574
field_metadata= header->add_key_field_metadata();
1575
field_metadata->set_name(current_field->field_name);
1576
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1579
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1581
/* Field is changed from old to new */
1582
field_metadata= header->add_set_field_metadata();
1583
field_metadata->set_name(current_field->field_name);
1584
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1588
/*
 * Adds an UpdateRecord for the changed row to the Session's UPDATE
 * Statement message.
 *
 * Visible steps: the Statement is obtained through getUpdateStatement(),
 * its UpdateData gets the segment id and is marked as the end segment, and
 * a new record is added. For every field that isFieldUpdated() reports as
 * changed, an is_null flag and an "after" value are appended; for every
 * primary key field a key value is appended.
 *
 * NOTE(review): brace/else lines, the early return and one argument line
 * of the key-value val_str() call are not visible in this listing.
 */
void TransactionServices::updateRecord(Session *in_session,
1590
const unsigned char *old_record,
1591
const unsigned char *new_record)
1593
ReplicationServices &replication_services= ReplicationServices::singleton();
1594
if (! replication_services.isActive())
1597
/* Default segment id; getUpdateStatement() may overwrite it through the pointer */
uint32_t next_segment_id= 1;
1598
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1600
message::UpdateData *data= statement.mutable_update_data();
1601
data->set_segment_id(next_segment_id);
1602
data->set_end_segment(true);
1603
message::UpdateRecord *record= data->add_record();
1605
Field *current_field;
1606
Field **table_fields= in_table->getFields();
1607
/* Scratch String, allocated on the Session's mem_root, re-used for every field value */
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1608
string_value->set_charset(system_charset_info);
1610
while ((current_field= *table_fields++) != NULL)
1613
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1614
* but then realized that an UPDATE statement could potentially have different values for
1615
* the SET field. For instance, imagine this SQL scenario:
1617
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1618
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1619
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1621
* We will generate two UpdateRecord messages with different set_value byte arrays.
1623
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1625
/* Store the original "read bit" for this field */
1626
bool is_read_set= current_field->isReadSet();
1628
/* We need to mark that we will "read" this field... */
1629
in_table->setReadSet(current_field->field_index);
1631
/* Read the string value of this field's contents */
1632
string_value= current_field->val_str(string_value);
1635
* Reset the read bit after reading field to its original state. This
1636
* prevents the field from being included in the WHERE clause
1638
current_field->setReadSet(is_read_set);
1640
if (current_field->is_null())
1642
/* NULL is recorded as a flag plus an empty value */
record->add_is_null(true);
1643
record->add_after_value("", 0);
1647
record->add_is_null(false);
1648
record->add_after_value(string_value->c_ptr(), string_value->length());
1650
string_value->free();
1654
* Add the WHERE clause values now...for now, this means the
1655
* primary key field value. Replication only supports tables
1656
* with a primary key.
1658
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1661
* To say the below is ugly is an understatement. But it works.
1663
* @todo Move this crap into a real Record API.
1665
string_value= current_field->val_str(string_value,
1667
current_field->offset(const_cast<unsigned char *>(new_record)));
1668
record->add_key_value(string_value->c_ptr(), string_value->length());
1669
string_value->free();
1675
bool TransactionServices::isFieldUpdated(Field *current_field,
1677
const unsigned char *old_record,
1678
const unsigned char *new_record)
1681
* The below really should be moved into the Field API and Record API. But for now
1682
* we do this crazy pointer fiddling to figure out if the current field
1683
* has been updated in the supplied record raw byte pointers.
1685
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1686
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1688
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1690
bool old_value_is_null= current_field->is_null_in_record(old_record);
1691
bool new_value_is_null= current_field->is_null_in_record(new_record);
1693
bool isUpdated= false;
1694
if (old_value_is_null != new_value_is_null)
1696
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1700
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1708
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1716
/*
 * Finds or creates the DELETE Statement message that the next deleted
 * record should be added to.
 *
 * Visible steps: the Session's current Statement message is fetched; a
 * message of another type is finalized; for an existing DELETE message the
 * Transaction size is compared with transaction_message_threshold and, when
 * reached, the current segment is marked as not being the end segment, the
 * Transaction message is committed and a new one is started under the same
 * transaction id; the table name in the existing DeleteHeader is compared
 * with the current table's name; when no Statement message is left, a new
 * one is added to the Transaction, given a DeleteHeader and attached to
 * the Session.
 *
 * next_segment_id is an out-parameter: it receives either the existing
 * segment id or that id plus one.
 *
 * NOTE(review): brace, else and return lines are not visible in this
 * listing, so the exact nesting of the steps should be confirmed.
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1718
uint32_t *next_segment_id)
1720
message::Statement *statement= in_session->getStatementMessage();
1721
message::Transaction *transaction= NULL;
1724
* Check the type for the current Statement message, if it is anything
1725
* other then DELETE we need to call finalize, this will ensure a
1726
* new DeleteStatement is created. If it is of type DELETE check
1727
* what table the DELETE belongs to, if it is a different table
1728
* call finalize, so a new DeleteStatement can be created.
1730
if (statement != NULL && statement->type() != message::Statement::DELETE)
1732
finalizeStatementMessage(*statement, in_session);
1733
/* Re-read the Session's Statement pointer after finalizing */
statement= in_session->getStatementMessage();
1735
else if (statement != NULL)
1737
transaction= getActiveTransactionMessage(in_session);
1740
* If we've passed our threshold for the statement size (possible for
1741
* a bulk insert), we'll finalize the Statement and Transaction (doing
1742
* the Transaction will keep it from getting huge).
1744
if (static_cast<size_t>(transaction->ByteSize()) >=
1745
in_session->variables.transaction_message_threshold)
1747
/* Remember the transaction ID so we can re-use it */
1748
uint64_t trx_id= transaction->transaction_context().transaction_id();
1750
message::DeleteData *current_data= statement->mutable_delete_data();
1752
/* Caller should use this value when adding a new record */
1753
*next_segment_id= current_data->segment_id() + 1;
1755
/* More segments of this statement follow in a later message */
current_data->set_end_segment(false);
1758
* Send the trx message to replicators after finalizing the
1759
* statement and transaction. This will also set the Transaction
1760
* and Statement objects in Session to NULL.
1762
commitTransactionMessage(in_session);
1765
* Statement and Transaction should now be NULL, so new ones will get
1766
* created. We reuse the transaction id since we are segmenting
1769
statement= in_session->getStatementMessage();
1770
/* 'false': the new message is initialized without taking a new transaction id */
transaction= getActiveTransactionMessage(in_session, false);
1771
assert(transaction != NULL);
1773
/* Set the transaction ID to match the previous messages */
1774
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1778
const message::DeleteHeader &delete_header= statement->delete_header();
1779
string old_table_name= delete_header.table_metadata().table_name();
1781
string current_table_name;
1782
(void) in_table->getShare()->getTableName(current_table_name);
1783
/* compare() is non-zero when the names differ; only the table name is compared here */
if (current_table_name.compare(old_table_name))
1785
finalizeStatementMessage(*statement, in_session);
1786
statement= in_session->getStatementMessage();
1790
/* append this DELETE query string */
1791
if (in_session->variables.replicate_query)
1793
string s(statement->sql());
1797
s.append(in_session->getQueryString()->c_str());
1798
statement->set_sql(s);
1801
statement->set_sql(in_session->getQueryString()->c_str());
1804
/* carry forward the existing segment id */
1805
const message::DeleteData ¤t_data= statement->delete_data();
1806
*next_segment_id= current_data.segment_id();
1811
if (statement == NULL)
1814
* Transaction will be non-NULL only if we had to segment it due to
1815
* transaction size above.
1817
if (transaction == NULL)
1818
transaction= getActiveTransactionMessage(in_session);
1821
* Transaction message initialized and set, but no statement created
1822
* yet. We construct one and initialize it, here, then return the
1823
* message after attaching the new Statement message pointer to the
1824
* Session for easy retrieval later...
1826
statement= transaction->add_statement();
1827
setDeleteHeader(*statement, in_session, in_table);
1828
in_session->setStatementMessage(statement);
1833
void TransactionServices::setDeleteHeader(message::Statement &statement,
1834
Session *in_session,
1837
initStatementMessage(statement, message::Statement::DELETE, in_session);
1840
* Now we construct the specialized DeleteHeader message inside
1841
* the generalized message::Statement container...
1843
message::DeleteHeader *header= statement.mutable_delete_header();
1844
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1847
(void) in_table->getShare()->getSchemaName(schema_name);
1849
(void) in_table->getShare()->getTableName(table_name);
1851
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1852
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1854
Field *current_field;
1855
Field **table_fields= in_table->getFields();
1857
message::FieldMetadata *field_metadata;
1859
while ((current_field= *table_fields++) != NULL)
1862
* Add the WHERE clause values now...for now, this means the
1863
* primary key field value. Replication only supports tables
1864
* with a primary key.
1866
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1868
field_metadata= header->add_key_field_metadata();
1869
field_metadata->set_name(current_field->field_name);
1870
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1875
/**
 * Adds a DeleteRecord holding the primary key value(s) of the deleted row
 * to the Session's DELETE Statement message.
 *
 * @param in_session        Session performing the delete
 * @param in_table          Table the row is deleted from
 * @param use_update_record When true the key values are read from the
 *                          table's update record instead of the insert
 *                          record
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
{
  if (not ReplicationServices::singleton().isActive())
    return;

  uint32_t segment_id= 1;
  message::Statement &statement= getDeleteStatement(in_session, in_table, &segment_id);

  message::DeleteData *delete_data= statement.mutable_delete_data();
  delete_data->set_segment_id(segment_id);
  delete_data->set_end_segment(true);
  message::DeleteRecord *row= delete_data->add_record();

  Field **field_iter= in_table->getFields();

  String *scratch= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  scratch->set_charset(system_charset_info);

  for (; *field_iter != NULL; ++field_iter)
  {
    Field *field= *field_iter;

    /*
     * Only the WHERE clause values are stored, which for now means the
     * primary key. Replication only supports tables with a primary key.
     */
    if (not in_table->getShare()->fieldInPrimaryKey(field))
      continue;

    if (use_update_record)
    {
      /*
       * Hack: point the field at the same offset inside the update record
       * for the duration of the read, then put the pointer back.
       * Field::val_str() should not change anything in Field::ptr, so
       * this should be safe.
       */
      unsigned char *saved_ptr= field->ptr;
      field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(saved_ptr - in_table->getInsertRecord());
      scratch= field->val_str(scratch);
      field->ptr= saved_ptr;
    }
    else
    {
      scratch= field->val_str(scratch);
      /**
       * @TODO Store optional old record value in the before data member
       */
    }

    row->add_key_value(scratch->c_ptr(), scratch->length());
    scratch->free();
  }
}
1932
* Template for removing Statement records of different types.
1934
* The code for removing records from different Statement message types
1935
* is identical except for the class types that are embedded within the
1938
* There are 3 scenarios we need to look for:
1939
* - We've been asked to remove more records than exist in the Statement
1940
* - We've been asked to remove fewer records than exist in the Statement
1941
* - We've been asked to remove ALL records that exist in the Statement
1943
* If we are removing ALL records, then effectively we would be left with
1944
* an empty Statement message, so we should just remove it and clean up
1945
* message pointers in the Session object.
1947
/**
 * Removes trailing records from a Statement data message.
 *
 * Compares the number of records held by data (data->record_size())
 * with count and handles three cases:
 *  - fewer records available than requested: treated as an error
 *  - exactly count records: the last Statement is removed from the
 *    Transaction message of the session, and the Statement pointer of the
 *    session is reset to the new last Statement, or to NULL if none remain
 *  - more records available than requested: records are removed from the
 *    end of the record list via RemoveLast()
 *
 * @tparam DataType    message type exposing record_size() and mutable_record()
 * @tparam RecordType  element type of the repeated record field of DataType
 * @param session      Session whose Transaction/Statement messages are adjusted
 *
 * NOTE(review): data and count are used below but their declarations
 * (presumably the remaining parameters) are not visible in this listing,
 * nor are the return statements or the loop around RemoveLast() - confirm
 * against the complete source.
 */
template <class DataType, class RecordType>
1948
static bool removeStatementRecordsWithType(Session *session,
1952
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1954
/* If there aren't enough records to remove 'count' of them, error. */
1955
if (num_avail_recs < count)
1959
* If we are removing all of the data records, we'll just remove this
1960
* entire Statement message.
1962
if (num_avail_recs == count)
1964
message::Transaction *transaction= session->getTransactionMessage();
1965
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1966
statements->RemoveLast();
1969
* Now need to set the Session Statement pointer to either the previous
1970
* Statement, or NULL if there isn't one.
1972
if (statements->size() == 0)
1974
session->setStatementMessage(NULL);
1979
* There isn't a great way to get a pointer to the previous Statement
1980
* message using the RepeatedPtrField object, so we'll just get to it
1981
* using the Transaction message.
1983
int last_stmt_idx= transaction->statement_size() - 1;
1984
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1987
/* We only need to remove 'count' records */
1988
else if (num_avail_recs > count)
1990
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1992
records->RemoveLast();
1999
/**
 * Removes records from the most current Statement message of a session.
 *
 * Checks that replication services are active, fetches the current
 * Statement via session->getStatementMessage(), and - when one exists -
 * dispatches on its type (INSERT, UPDATE or DELETE) to
 * removeStatementRecordsWithType() with the matching data/record message
 * types. The outcome of that call is stored in retval.
 *
 * @param session  Session whose current Statement message is trimmed
 *
 * NOTE(review): the declarations of count and retval, the handling of any
 * other statement type, and every return statement are not visible in
 * this listing - confirm against the complete source.
 */
bool TransactionServices::removeStatementRecords(Session *session,
2002
ReplicationServices &replication_services= ReplicationServices::singleton();
2003
if (! replication_services.isActive())
2006
/* Get the most current Statement */
2007
message::Statement *statement= session->getStatementMessage();
2009
/* Make sure we have work to do */
2010
if (statement == NULL)
2015
// Each case hands the type-specific data message to the shared template helper.
switch (statement->type())
2017
case message::Statement::INSERT:
2019
message::InsertData *data= statement->mutable_insert_data();
2020
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
2024
case message::Statement::UPDATE:
2026
message::UpdateData *data= statement->mutable_update_data();
2027
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
2031
case message::Statement::DELETE: /* not sure if this one is possible... */
2033
message::DeleteData *data= statement->mutable_delete_data();
2034
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
2047
/**
 * Records a CREATE_TABLE statement and pushes it to the replication services.
 *
 * Adds a Statement of type CREATE_TABLE to the Transaction message returned
 * by getActiveTransactionMessage(), copies the supplied table definition
 * into its CreateTableStatement, finalizes the Statement and the
 * Transaction, pushes the Transaction through the ReplicationServices
 * singleton and finally calls cleanupTransactionMessage().
 *
 * @param in_session  Session on whose behalf the statement is recorded
 * @param table       Table definition message copied into the statement
 *
 * NOTE(review): what happens when replication services are inactive is not
 * visible in this listing - presumably an early return; confirm.
 */
void TransactionServices::createTable(Session *in_session,
2048
const message::Table &table)
2050
ReplicationServices &replication_services= ReplicationServices::singleton();
2051
if (! replication_services.isActive())
2054
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2055
message::Statement *statement= transaction->add_statement();
2057
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
2060
* Construct the specialized CreateTableStatement message and attach
2061
* it to the generic Statement message
2063
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
2064
message::Table *new_table_message= create_table_statement->mutable_table();
2065
// Copy the caller-supplied definition into the message owned by the statement.
*new_table_message= table;
2067
finalizeStatementMessage(*statement, in_session);
2069
finalizeTransactionMessage(*transaction, in_session);
2071
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2073
cleanupTransactionMessage(transaction, in_session);
2077
/**
 * Records a CREATE_SCHEMA statement and pushes it to the replication services.
 *
 * Adds a Statement of type CREATE_SCHEMA to the Transaction message returned
 * by getActiveTransactionMessage(), copies the supplied schema definition
 * into its CreateSchemaStatement, finalizes the Statement and the
 * Transaction, pushes the Transaction through the ReplicationServices
 * singleton and finally calls cleanupTransactionMessage().
 *
 * @param in_session  Session on whose behalf the statement is recorded
 * @param schema      Schema definition message copied into the statement
 *
 * NOTE(review): what happens when replication services are inactive is not
 * visible in this listing - presumably an early return; confirm.
 */
void TransactionServices::createSchema(Session *in_session,
2078
const message::Schema &schema)
2080
ReplicationServices &replication_services= ReplicationServices::singleton();
2081
if (! replication_services.isActive())
2084
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2085
message::Statement *statement= transaction->add_statement();
2087
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
2090
* Construct the specialized CreateSchemaStatement message and attach
2091
* it to the generic Statement message
2093
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
2094
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2095
// Copy the caller-supplied definition into the message owned by the statement.
*new_schema_message= schema;
2097
finalizeStatementMessage(*statement, in_session);
2099
finalizeTransactionMessage(*transaction, in_session);
2101
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2103
cleanupTransactionMessage(transaction, in_session);
2107
/**
 * Records a DROP_SCHEMA statement and pushes it to the replication services.
 *
 * Adds a Statement of type DROP_SCHEMA to the Transaction message returned
 * by getActiveTransactionMessage(), stores the schema name in its
 * DropSchemaStatement, finalizes the Statement and the Transaction, pushes
 * the Transaction through the ReplicationServices singleton and finally
 * calls cleanupTransactionMessage().
 *
 * @param in_session   Session on whose behalf the statement is recorded
 * @param schema_name  Name written into the DropSchemaStatement
 *
 * NOTE(review): what happens when replication services are inactive is not
 * visible in this listing - presumably an early return; confirm.
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
2109
ReplicationServices &replication_services= ReplicationServices::singleton();
2110
if (! replication_services.isActive())
2113
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2114
message::Statement *statement= transaction->add_statement();
2116
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
2119
* Construct the specialized DropSchemaStatement message and attach
2120
* it to the generic Statement message
2122
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2124
drop_schema_statement->set_schema_name(schema_name);
2126
finalizeStatementMessage(*statement, in_session);
2128
finalizeTransactionMessage(*transaction, in_session);
2130
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2132
cleanupTransactionMessage(transaction, in_session);
2135
/**
 * Records a DROP_TABLE statement and pushes it to the replication services.
 *
 * Adds a Statement of type DROP_TABLE to the Transaction message returned
 * by getActiveTransactionMessage(), fills its DropTableStatement with the
 * if-exists flag and the schema/table names, finalizes the Statement and
 * the Transaction, pushes the Transaction through the ReplicationServices
 * singleton and finally calls cleanupTransactionMessage().
 *
 * @param in_session   Session on whose behalf the statement is recorded
 * @param schema_name  Schema name written into the table metadata
 * @param table_name   Table name written into the table metadata
 *
 * NOTE(review): if_exists is used below but its declaration (presumably a
 * trailing parameter) is not visible in this listing; the handling of
 * inactive replication services is not visible either - confirm.
 */
void TransactionServices::dropTable(Session *in_session,
2136
const string &schema_name,
2137
const string &table_name,
2140
ReplicationServices &replication_services= ReplicationServices::singleton();
2141
if (! replication_services.isActive())
2144
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2145
message::Statement *statement= transaction->add_statement();
2147
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2150
* Construct the specialized DropTableStatement message and attach
2151
* it to the generic Statement message
2153
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2155
drop_table_statement->set_if_exists_clause(if_exists);
2157
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2159
table_metadata->set_schema_name(schema_name);
2160
table_metadata->set_table_name(table_name);
2162
finalizeStatementMessage(*statement, in_session);
2164
finalizeTransactionMessage(*transaction, in_session);
2166
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2168
cleanupTransactionMessage(transaction, in_session);
2171
/**
 * Records a TRUNCATE_TABLE statement and pushes it to the replication services.
 *
 * Adds a Statement of type TRUNCATE_TABLE to the Transaction message
 * returned by getActiveTransactionMessage(), writes the schema and table
 * names obtained from the share of in_table into the table metadata of the
 * TruncateTableStatement, finalizes the Statement and the Transaction,
 * pushes the Transaction through the ReplicationServices singleton and
 * finally calls cleanupTransactionMessage().
 *
 * @param in_session  Session on whose behalf the statement is recorded
 * @param in_table    Table whose share supplies the schema and table names
 *
 * NOTE(review): the declarations of the locals schema_name and table_name
 * and the handling of inactive replication services are not visible in
 * this listing - confirm against the complete source.
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2173
ReplicationServices &replication_services= ReplicationServices::singleton();
2174
if (! replication_services.isActive())
2177
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2178
message::Statement *statement= transaction->add_statement();
2180
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2183
* Construct the specialized TruncateTableStatement message and attach
2184
* it to the generic Statement message
2186
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2187
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2190
// Return values of the name getters are explicitly discarded; the names
// are read back from the schema_name / table_name arguments below.
(void) in_table->getShare()->getSchemaName(schema_name);
2192
(void) in_table->getShare()->getTableName(table_name);
2194
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2195
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2197
finalizeStatementMessage(*statement, in_session);
2199
finalizeTransactionMessage(*transaction, in_session);
2201
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2203
cleanupTransactionMessage(transaction, in_session);
2206
/**
 * Records a RAW_SQL statement and pushes it to the replication services.
 *
 * Adds a Statement of type RAW_SQL to the Transaction message returned by
 * getActiveTransactionMessage(), stores the query text in its sql field,
 * finalizes the Statement and the Transaction, pushes the Transaction
 * through the ReplicationServices singleton and finally calls
 * cleanupTransactionMessage().
 *
 * @param in_session  Session on whose behalf the statement is recorded
 * @param query       SQL text stored verbatim in the Statement
 *
 * NOTE(review): what happens when replication services are inactive is not
 * visible in this listing - presumably an early return; confirm.
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
2208
ReplicationServices &replication_services= ReplicationServices::singleton();
2209
if (! replication_services.isActive())
2212
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2213
message::Statement *statement= transaction->add_statement();
2215
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2216
statement->set_sql(query);
2217
finalizeStatementMessage(*statement, in_session);
2219
finalizeTransactionMessage(*transaction, in_session);
2221
// The result of the push is explicitly discarded.
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2223
cleanupTransactionMessage(transaction, in_session);
2226
/**
 * Wraps an Event in a freshly allocated Transaction message and pushes it
 * to the replication services.
 *
 * Unlike the statement-recording methods, this does not use
 * getActiveTransactionMessage(); it allocates its own Transaction,
 * initializes and finalizes it, copies the event into its event field and
 * pushes it through the ReplicationServices singleton.
 *
 * @param session  Session passed to the init/finalize/push calls
 * @param event    Event message copied into the Transaction
 * @return the plugin::ReplicationReturnCode of the push, cast to int
 *
 * NOTE(review): new (nothrow) can yield a null pointer and no check is
 * visible before *transaction is dereferenced. The release of transaction
 * and the value returned when replication services are inactive are also
 * not visible in this listing - confirm against the complete source.
 */
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2228
ReplicationServices &replication_services= ReplicationServices::singleton();
2229
if (! replication_services.isActive())
2232
message::Transaction *transaction= new (nothrow) message::Transaction();
2234
// set server id, start timestamp
2235
initTransactionMessage(*transaction, session, true);
2237
// set end timestamp
2238
finalizeTransactionMessage(*transaction, session);
2240
message::Event *trx_event= transaction->mutable_event();
2242
trx_event->CopyFrom(event);
2244
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2248
return static_cast<int>(result);
2251
/**
 * Sends an Event of type STARTUP through sendEvent().
 *
 * @param session  Session forwarded to sendEvent()
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably false is returned when sendEvent() yields a non-zero code and
 * true otherwise - confirm.
 */
bool TransactionServices::sendStartupEvent(Session *session)
2253
message::Event event;
2254
event.set_type(message::Event::STARTUP);
2255
if (sendEvent(session, event) != 0)
2260
/**
 * Sends an Event of type SHUTDOWN through sendEvent().
 *
 * @param session  Session forwarded to sendEvent()
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably false is returned when sendEvent() yields a non-zero code and
 * true otherwise - confirm.
 */
bool TransactionServices::sendShutdownEvent(Session *session)
2262
message::Event event;
2263
event.set_type(message::Event::SHUTDOWN);
2264
if (sendEvent(session, event) != 0)
2269
} /* namespace drizzled */