1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
78
#include <google/protobuf/repeated_field.h>
81
using namespace google;
87
* @defgroup Transactions
91
* Transaction handling in the server
95
* In each client connection, Drizzle maintains two transaction
96
* contexts representing the state of the:
98
* 1) Statement Transaction
99
* 2) Normal Transaction
101
* These two transaction contexts represent the transactional
102
* state of a Session's SQL and XA transactions for a single
103
* SQL statement or a series of SQL statements.
105
* When the Session's connection is in AUTOCOMMIT mode, there
106
* is no practical difference between the statement and the
107
* normal transaction, as each SQL statement is committed or
108
* rolled back depending on the success or failure of the
109
* individual SQL statement.
111
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
* the Session has explicitly begun a normal SQL transaction using
113
* a BEGIN WORK/START TRANSACTION statement, then the normal
114
* transaction context tracks the aggregate transaction state of
115
* the SQL transaction's individual statements, and the SQL
116
* transaction's commit or rollback is done atomically for all of
117
* the SQL transaction's statement's data changes.
119
* Technically, a statement transaction can be viewed as a savepoint
120
* which is maintained automatically in order to make effects of one
123
* The normal transaction is started by the user and is typically
124
* ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
* The exception to this is that DDL statements implicitly COMMIT
126
* any previously active normal transaction before they begin executing.
128
* In Drizzle, unlike MySQL, plugins other than a storage engine
129
* may participate in a transaction. All plugin::TransactionalStorageEngine
130
* plugins will automatically be monitored by Drizzle's transaction
131
* manager (implemented in this source file), as will all plugins which
132
* implement plugin::XaResourceManager and register with the transaction
135
* If Drizzle's transaction manager sees that more than one resource
136
* manager (transactional storage engine or XA resource manager) has modified
137
* data state during a statement or normal transaction, the transaction
138
* manager will automatically use a two-phase commit protocol for all
139
* resources which support XA's distributed transaction protocol. Unlike
140
* MySQL, storage engines need not manually register with the transaction
141
* manager during a statement's execution. Previously, in MySQL, all
142
* handlertons would have to call trans_register_ha() at some point after
143
* modifying data state in order to have MySQL include that handler in
144
* an XA transaction. Drizzle does all of this grunt work behind the
145
* scenes for the storage engine implementers.
147
* When a connection is closed, the current normal transaction, if
148
* any is currently active, is rolled back.
150
* Transaction life cycle
151
* ----------------------
153
* When a new connection is established, session->transaction
154
* members are initialized to an empty state. If a statement uses any tables,
155
* all affected engines are registered in the statement engine list automatically
156
* in plugin::StorageEngine::startStatement() and
157
* plugin::TransactionalStorageEngine::startTransaction().
159
* You can view the lifetime of a normal transaction in the following
162
* drizzled::statement::Statement::execute()
163
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
* drizzled::TransactionServices::registerResourceForTransaction()
165
* drizzled::TransactionServices::registerResourceForStatement()
166
* drizzled::plugin::StorageEngine::startStatement()
167
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
* drizzled::plugin::StorageEngine::endStatement()
169
* drizzled::TransactionServices::autocommitOrRollback()
170
* drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
* drizzled::XaResourceManager::xaCommit() <-- or rollback()
173
* Roles and responsibilities
174
* --------------------------
176
* Beginning of SQL Statement (and Statement Transaction)
177
* ------------------------------------------------------
179
* At the start of each SQL statement, for each storage engine
180
* <strong>that is involved in the SQL statement</strong>, the kernel
181
* calls the engine's plugin::StorageEngine::startStatement() method. If the
182
* engine needs to track some data for the statement, it should use
183
* this method invocation to initialize this data. This is the
184
* beginning of what is called the "statement transaction".
186
* <strong>For transaction storage engines (those storage engines
187
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
* kernel automatically determines if the start of the SQL statement
189
* transaction should <em>also</em> begin the normal SQL transaction.
190
* This occurs when the connection is NOT in autocommit mode. If
191
* the kernel detects this, then the kernel automatically starts the
192
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
* method and then calls plugin::StorageEngine::startStatement()
196
* Beginning of an SQL "Normal" Transaction
197
* ----------------------------------------
199
* As noted above, a "normal SQL transaction" may be started when
200
* an SQL statement is started in a connection and the connection is
201
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
203
* In addition, when a user executes a START TRANSACTION or
204
* BEGIN WORK statement in a connection, the kernel explicitly
205
* calls each transactional storage engine's startTransaction() method.
207
* Ending of an SQL Statement (and Statement Transaction)
208
* ------------------------------------------------------
210
* At the end of each SQL statement, for each of the aforementioned
211
* involved storage engines, the kernel calls the engine's
212
* plugin::StorageEngine::endStatement() method. If the engine
213
* has initialized or modified some internal data about the
214
* statement transaction, it should use this method to reset or destroy
215
* this data appropriately.
217
* Ending of an SQL "Normal" Transaction
218
* -------------------------------------
220
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
221
* depending on the success or failure of the statement transaction(s)
224
* The end of a "normal transaction" occurs when any of the following
227
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
* then the normal transaction which encloses the statement
230
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
* 3) Just before a DDL operation occurs, the kernel will implicitly
232
* commit the active normal transaction
234
* Transactions and Non-transactional Storage Engines
235
* --------------------------------------------------
237
* For non-transactional engines, this call can be safely ignored, and
238
* the kernel tracks whether a non-transactional engine has changed
239
* any data state, and warns the user appropriately if a transaction
240
* (statement or normal) is rolled back after such non-transactional
241
* data changes have been made.
243
* XA Two-phase Commit Protocol
244
* ----------------------------
246
* During statement execution, whenever any of the data-modifying
247
* PSEA API methods is used, e.g. Cursor::write_row() or
248
* Cursor::update_row(), the read-write flag is raised in the
249
* statement transaction for the involved engine.
250
* Currently all PSEA calls are "traced", and the data cannot be
251
* changed in a way other than issuing a PSEA call. Important:
252
* unless this invariant is preserved the server will not know that
253
* a transaction in a given engine is read-write and will not
254
* involve the two-phase commit protocol!
256
* At the end of a statement, TransactionServices::autocommitOrRollback()
257
* is invoked. This call in turn
258
* invokes plugin::XaResourceManager::xaPrepare() for every involved XA
261
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
265
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
* At statement commit, the statement-related read-write engine
268
* flag is propagated to the corresponding flag in the normal
269
* transaction. When the commit is complete, the list of registered
270
* engines is cleared.
272
* Rollback is handled in a similar fashion.
274
* Additional notes on DDL and the normal transaction.
275
* ---------------------------------------------------
277
* CREATE TABLE .. SELECT can start a *new* normal transaction
278
* because of the fact that SELECTs on a transactional storage
279
* engine participate in the normal SQL transaction (due to
280
* isolation level issues and consistent read views).
282
* Behaviour of the server in this case is currently badly
285
* DDL statements use a form of "semantic" logging
286
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
* the newly created table is deleted.
289
* In addition, some DDL statements issue interim transaction
290
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
* from the original table to the internal temporary table. Other
292
* statements, e.g. CREATE TABLE ... SELECT do not always commit
295
* And finally there is a group of DDL statements such as
296
* RENAME/DROP TABLE that doesn't start a new transaction
297
* and doesn't commit.
299
* A consistent behaviour is perhaps to always commit the normal
300
* transaction after all DDLs, just like the statement transaction
301
* is always committed at the end of all statements.
303
/**
 * Constructs the transaction services object and caches the storage
 * engine registered under the name "InnoDB" as the XA storage engine.
 * When no engine with that name is found, the cached pointer is NULL.
 */
TransactionServices::TransactionServices()
{
  plugin::StorageEngine *innodb= plugin::StorageEngine::findByName("InnoDB");

  /* The lookup result is cast without any type check, as before. */
  xa_storage_engine= (innodb != NULL) ? (plugin::XaStorageEngine*)innodb : NULL;
}
316
void TransactionServices::registerResourceForStatement(Session *session,
317
plugin::MonitoredInTransaction *monitored,
318
plugin::TransactionalStorageEngine *engine)
320
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
323
* Now we automatically register this resource manager for the
324
* normal transaction. This is fine because a statement
325
* transaction registration should always enlist the resource
326
* in the normal transaction which contains the statement
329
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session->transaction.stmt;
333
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
335
if (resource_context->isStarted())
336
return; /* already registered, return */
338
assert(monitored->participatesInSqlTransaction());
339
assert(not monitored->participatesInXaTransaction());
341
resource_context->setMonitored(monitored);
342
resource_context->setTransactionalStorageEngine(engine);
343
trans->registerResource(resource_context);
345
trans->no_2pc|= true;
348
void TransactionServices::registerResourceForStatement(Session *session,
349
plugin::MonitoredInTransaction *monitored,
350
plugin::TransactionalStorageEngine *engine,
351
plugin::XaResourceManager *resource_manager)
353
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
356
* Now we automatically register this resource manager for the
357
* normal transaction. This is fine because a statement
358
* transaction registration should always enlist the resource
359
* in the normal transaction which contains the statement
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session->transaction.stmt;
366
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
368
if (resource_context->isStarted())
369
return; /* already registered, return */
371
assert(monitored->participatesInXaTransaction());
372
assert(monitored->participatesInSqlTransaction());
374
resource_context->setMonitored(monitored);
375
resource_context->setTransactionalStorageEngine(engine);
376
resource_context->setXaResourceManager(resource_manager);
377
trans->registerResource(resource_context);
379
trans->no_2pc|= false;
382
void TransactionServices::registerResourceForTransaction(Session *session,
383
plugin::MonitoredInTransaction *monitored,
384
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session->transaction.all;
387
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
389
if (resource_context->isStarted())
390
return; /* already registered, return */
392
session->server_status|= SERVER_STATUS_IN_TRANS;
394
trans->registerResource(resource_context);
396
assert(monitored->participatesInSqlTransaction());
397
assert(not monitored->participatesInXaTransaction());
399
resource_context->setMonitored(monitored);
400
resource_context->setTransactionalStorageEngine(engine);
401
trans->no_2pc|= true;
403
if (session->transaction.xid_state.xid.is_null())
404
session->transaction.xid_state.xid.set(session->getQueryId());
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session->getResourceContext(monitored, 0)->isStarted())
408
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
plugin::MonitoredInTransaction *monitored,
413
plugin::TransactionalStorageEngine *engine,
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session->transaction.all;
417
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
419
if (resource_context->isStarted())
420
return; /* already registered, return */
422
session->server_status|= SERVER_STATUS_IN_TRANS;
424
trans->registerResource(resource_context);
426
assert(monitored->participatesInSqlTransaction());
428
resource_context->setMonitored(monitored);
429
resource_context->setXaResourceManager(resource_manager);
430
resource_context->setTransactionalStorageEngine(engine);
431
trans->no_2pc|= true;
433
if (session->transaction.xid_state.xid.is_null())
434
session->transaction.xid_state.xid.set(session->getQueryId());
436
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session->getResourceContext(monitored, 0)->isStarted())
440
registerResourceForStatement(session, monitored, engine, resource_manager);
443
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
451
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
my_session->setXaId(xa_id);
456
/**
 * Returns the XA id of the given session.  If the session's id is still 0,
 * a new id is first obtained from the XA storage engine and stored on the
 * session.
 *
 * @param session  Session whose transaction id is requested
 * @return the session's XA id after any allocation
 */
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
{
  const bool id_missing= (session->getXaId() == 0);
  if (id_missing)
  {
    const uint64_t fresh_id= xa_storage_engine->getNewTransactionId(session);
    session->setXaId(fresh_id);
  }

  return session->getXaId();
}
470
1 transaction was rolled back
472
2 error during commit, data may be inconsistent
475
Since we don't support nested statement transactions in 5.0,
476
we can't commit or rollback stmt transactions while we are inside
477
stored functions or triggers. So we simply do nothing now.
478
TODO: This should be fixed in later ( >= 5.1) releases.
480
/**
 * Commits either the normal transaction (normal_transaction is true:
 * session->transaction.all) or the statement transaction
 * (session->transaction.stmt) of the given session.
 *
 * From the visible statements, when resource contexts are registered:
 *  - for a real transaction, session->wait_if_global_read_lock() is
 *    consulted and rollbackTransaction() is called if it returns non-zero;
 *  - when shouldConstructMessages() is true, xaPrepare() is called on each
 *    XA participant whose context reports modified data, the loop stopping
 *    at the first error, and for a real transaction the Transaction
 *    message is then pushed via commitTransactionMessage();
 *  - the commit itself is delegated to commitPhaseOne().
 *
 * cookie is initialised to 0 and never reassigned in the visible lines, so
 * the (cookie ? 2 : 1) selection yields 1 when commitPhaseOne() fails.
 *
 * NOTE(review): brace-only and other short lines (else / return / loop
 * increment) are absent from this listing; confirm exact branch boundaries
 * and return values against the complete file.
 *
 * @param session             Session whose transaction is committed
 * @param normal_transaction  true for the normal transaction, false for the
 *                            statement transaction
 * @return int status; see the result-code notes preceding this function
 */
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
482
int error= 0, cookie= 0;
484
'all' means that this is either an explicit commit issued by
485
user, or an implicit commit issued by a DDL.
487
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
488
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
490
/* True for a normal-transaction commit, or when the normal transaction has no registered resources. */
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
493
We must not commit the normal transaction if a statement
494
transaction is pending. Otherwise statement transaction
495
flags will not get propagated to its normal transaction's
498
assert(session->transaction.stmt.getResourceContexts().empty() ||
499
trans == &session->transaction.stmt);
501
if (resource_contexts.empty() == false)
503
if (is_real_trans && session->wait_if_global_read_lock(false, false))
505
rollbackTransaction(session, normal_transaction);
510
* If replication is on, we do a PREPARE on the resource managers, push the
511
* Transaction message across the replication stream, and then COMMIT if the
512
* replication stream returned successfully.
514
if (shouldConstructMessages())
516
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
517
it != resource_contexts.end() && ! error;
520
ResourceContext *resource_context= *it;
523
Do not call two-phase commit if this particular
524
transaction is read-only. This allows for simpler
525
implementation in engines that are always read-only.
527
if (! resource_context->hasModifiedData())
530
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
532
if (resource->participatesInXaTransaction())
534
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
536
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
541
session->status_var.ha_prepare_count++;
545
if (error == 0 && is_real_trans)
548
* Push the constructed Transaction messages across to
549
* replicators and appliers.
551
error= commitTransactionMessage(session);
555
rollbackTransaction(session, normal_transaction);
560
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
563
session->startWaitingGlobalReadLock();
570
This function does not care about global read lock. A caller should.
572
/**
 * Performs the actual commit on every resource registered in the selected
 * transaction context (session->transaction.all when normal_transaction is
 * true, otherwise session->transaction.stmt).
 *
 * For each registered resource context: XA participants are committed via
 * their XaResourceManager::xaCommit(); otherwise SQL-transaction
 * participants are committed via TransactionalStorageEngine::commit().  A
 * non-zero engine result is reported through my_error() with
 * ER_ERROR_DURING_COMMIT; on success of a normal-transaction commit,
 * ha_commit_count is incremented.  Each context is reset afterwards.
 *
 * After the loop the visible lines null the transaction XID and, for a
 * normal transaction, restore session->variables.tx_isolation from
 * session_tx_isolation and call session->transaction.cleanup().
 *
 * NOTE(review): brace-only and other short lines are absent from this
 * listing (the declarations of err and of the returned value are not
 * visible); confirm the guard on the XID reset and the return value
 * against the complete file.
 *
 * @param session             Session whose transaction is committed
 * @param normal_transaction  true for the normal transaction, false for the
 *                            statement transaction
 */
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
575
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
576
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
578
/* True for a normal-transaction commit, or when the normal transaction has no registered resources. */
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
580
if (resource_contexts.empty() == false)
582
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
583
it != resource_contexts.end();
587
ResourceContext *resource_context= *it;
589
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
591
/* XA participants commit through their resource manager ... */
if (resource->participatesInXaTransaction())
593
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
595
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
598
else if (normal_transaction)
600
session->status_var.ha_commit_count++;
603
/* ... all other SQL-transaction participants commit through the storage engine. */
else if (resource->participatesInSqlTransaction())
605
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
607
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
610
else if (normal_transaction)
612
session->status_var.ha_commit_count++;
615
resource_context->reset(); /* keep it conveniently zero-filled */
619
session->transaction.xid_state.xid.null();
621
if (normal_transaction)
623
/* Restore the session-level isolation level once the normal transaction ends. */
session->variables.tx_isolation= session->session_tx_isolation;
624
session->transaction.cleanup();
631
/**
 * Rolls back either the normal transaction (normal_transaction is true:
 * session->transaction.all) or the statement transaction
 * (session->transaction.stmt) of the given session.
 *
 * For each registered resource context: XA participants are rolled back
 * via XaResourceManager::xaRollback(); otherwise SQL-transaction
 * participants are rolled back via TransactionalStorageEngine::rollback().
 * A non-zero engine result is reported through my_error() with
 * ER_ERROR_DURING_ROLLBACK; on success of a normal-transaction rollback,
 * ha_rollback_count is incremented.  Each context is reset afterwards.
 *
 * The rollback is then signalled to the message layer
 * (rollbackTransactionMessage() / rollbackStatementMessage()) before the
 * transaction XID is nulled.  For a normal transaction the isolation level
 * is restored, session->transaction.cleanup() is called and
 * transaction_rollback_request is cleared.  A
 * ER_WARNING_NOT_COMPLETE_ROLLBACK warning is pushed when the normal
 * transaction has modified non-transactional data and the session was not
 * killed with KILL_CONNECTION.
 *
 * NOTE(review): brace-only and other short lines are absent from this
 * listing (including the first line of the warning condition and the
 * return statement); confirm exact guards and the return value against
 * the complete file.
 *
 * @param session             Session whose transaction is rolled back
 * @param normal_transaction  true for the normal transaction, false for the
 *                            statement transaction
 */
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
634
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
635
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
637
/* True for a normal-transaction rollback, or when the normal transaction has no registered resources. */
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
640
We must not rollback the normal transaction if a statement
641
transaction is pending.
643
assert(session->transaction.stmt.getResourceContexts().empty() ||
644
trans == &session->transaction.stmt);
646
if (resource_contexts.empty() == false)
648
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
649
it != resource_contexts.end();
653
ResourceContext *resource_context= *it;
655
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
657
/* XA participants roll back through their resource manager ... */
if (resource->participatesInXaTransaction())
659
if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
661
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
664
else if (normal_transaction)
666
session->status_var.ha_rollback_count++;
669
/* ... all other SQL-transaction participants roll back through the storage engine. */
else if (resource->participatesInSqlTransaction())
671
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
673
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
676
else if (normal_transaction)
678
session->status_var.ha_rollback_count++;
681
resource_context->reset(); /* keep it conveniently zero-filled */
685
* We need to signal the ROLLBACK to ReplicationServices here
686
* BEFORE we set the transaction ID to NULL. This is because
687
* if a bulk segment was sent to replicators, we need to send
688
* a rollback statement with the corresponding transaction ID
691
if (normal_transaction)
692
rollbackTransactionMessage(session);
694
rollbackStatementMessage(session);
697
session->transaction.xid_state.xid.null();
698
if (normal_transaction)
700
/* Restore the session-level isolation level once the normal transaction ends. */
session->variables.tx_isolation=session->session_tx_isolation;
701
session->transaction.cleanup();
704
if (normal_transaction)
705
session->transaction_rollback_request= false;
708
* If a non-transactional table was updated, warn the user
711
session->transaction.all.hasModifiedNonTransData() &&
712
session->getKilled() != Session::KILL_CONNECTION)
714
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
723
This is used to commit or rollback a single statement depending on
727
Note that if the autocommit is on, then the following call inside
728
InnoDB will commit or rollback the whole transaction (= the statement). The
729
autocommit mechanism built into InnoDB is based on counting locks, but if
730
the user has used LOCK TABLES then that mechanism does not know to do the
733
/**
 * Ends the statement transaction of a session after an SQL statement has
 * run, committing or rolling it back.
 *
 * If the session has a Statement message and error is zero, the message is
 * finalized first.  When the statement transaction has registered resource
 * contexts, endStatement() is called on each context's transactional
 * storage engine.  The visible lines then show a statement-level
 * commitTransaction(session, false), and a rollback path that rolls back
 * the statement transaction and, if session->transaction_rollback_request
 * is set, the normal transaction as well.  Finally
 * session->variables.tx_isolation is restored from session_tx_isolation.
 *
 * NOTE(review): brace-only and other short lines are absent from this
 * listing, so the condition choosing between the commit and rollback paths
 * and the return statement are not visible; confirm against the complete
 * file.
 *
 * @param session  Session whose statement has finished
 * @param error    Non-zero if the statement failed
 */
int TransactionServices::autocommitOrRollback(Session *session, int error)
735
/* One GPB Statement message per SQL statement */
736
message::Statement *statement= session->getStatementMessage();
737
if ((statement != NULL) && (! error))
738
finalizeStatementMessage(*statement, session);
740
if (session->transaction.stmt.getResourceContexts().empty() == false)
742
TransactionContext *trans = &session->transaction.stmt;
743
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
/* Tell every engine enlisted in the statement transaction that the statement has ended. */
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
it != resource_contexts.end();
748
ResourceContext *resource_context= *it;
750
resource_context->getTransactionalStorageEngine()->endStatement(session);
755
if (commitTransaction(session, false))
760
(void) rollbackTransaction(session, false);
761
/* A pending rollback request escalates to the whole normal transaction. */
if (session->transaction_rollback_request)
762
(void) rollbackTransaction(session, true);
765
session->variables.tx_isolation= session->session_tx_isolation;
770
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
772
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
774
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
775
* resources aren't the same... */
776
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
780
/**
 * Rolls the session's normal transaction back to a named savepoint.
 *
 * Every resource that was registered when the savepoint was set is asked
 * to roll back to it.  Every resource that joined the transaction after
 * the savepoint is rolled back completely and its context reset.  The
 * transaction's resource list is then restored to the savepoint's list
 * and, when transaction messages are being constructed, the session's
 * Transaction message is replaced by a copy of the one stored in the
 * savepoint.
 *
 * @param session  Session whose normal transaction is rolled back
 * @param sv       Savepoint to roll back to
 * @return 0 on success, 1 if any engine reported a rollback error
 */
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext *trans= &session->transaction.all;
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();

  trans->no_2pc= false;
  /*
    rolling back to savepoint in all storage engines that were part of the
    transaction when the savepoint was set
  */
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
       it != sv_resource_contexts.end();
       ++it)
  {
    int err;
    ResourceContext *resource_context= *it;

    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
      {
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error= 1;
      }
      else
      {
        session->status_var.ha_savepoint_rollback_count++;
      }
    }
    trans->no_2pc|= not resource->participatesInXaTransaction();
  }
  /*
    rolling back the transaction in all storage engines that were not part of
    the transaction when the savepoint was set
  */
  {
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
    TransactionContext::ResourceContexts set_difference_contexts;

    std::sort(sorted_tran_resource_contexts.begin(),
              sorted_tran_resource_contexts.end(),
              ResourceContextCompare());
    std::sort(sorted_sv_resource_contexts.begin(),
              sorted_sv_resource_contexts.end(),
              ResourceContextCompare());

    /*
     * Bug #542299: set_difference() writes through the output iterator, so
     * the destination must already hold real elements.  reserve() alone
     * leaves size() at zero, which makes the writes undefined and leaves
     * the loop below with nothing to iterate.  The difference can never be
     * larger than the first input range, so size the destination to that
     * and trim it to what set_difference() actually produced.
     */
    set_difference_contexts.resize(sorted_tran_resource_contexts.size());
    TransactionContext::ResourceContexts::iterator difference_end=
      std::set_difference(sorted_tran_resource_contexts.begin(),
                          sorted_tran_resource_contexts.end(),
                          sorted_sv_resource_contexts.begin(),
                          sorted_sv_resource_contexts.end(),
                          set_difference_contexts.begin(),
                          ResourceContextCompare());
    set_difference_contexts.erase(difference_end, set_difference_contexts.end());

    /*
     * set_difference_contexts now contains all resource contexts
     * which are in the transaction context but were NOT in the
     * savepoint's resource contexts.
     */
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
         it != set_difference_contexts.end();
         ++it)
    {
      ResourceContext *resource_context= *it;
      int err;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      if (resource->participatesInSqlTransaction())
      {
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else
        {
          session->status_var.ha_rollback_count++;
        }
      }
      resource_context->reset(); /* keep it conveniently zero-filled */
    }
  }
  trans->setResourceContexts(sv_resource_contexts);

  if (shouldConstructMessages())
  {
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
    if (savepoint_transaction != NULL)
    {
      /*
       * Make a copy of the savepoint transaction, this is necessary to
       * assure proper cleanup.  Upon commit the copy will be cleaned up by
       * a call to cleanupTransactionMessage().  The Transaction message in
       * NamedSavepoint will be cleaned up when the savepoint is cleaned up.
       * This avoids calling delete twice on the Transaction.
       */
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
      if (num_statements == 0)
      {
        session->setStatementMessage(NULL);
      }
      else
      {
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
      }
      session->setTransactionMessage(savepoint_transaction_copy);
    }
  }

  return error;
}
905
according to the sql standard (ISO/IEC 9075-2:2003)
906
section "4.33.4 SQL-statements and transaction states",
907
NamedSavepoint is *not* transaction-initiating SQL-statement
909
/**
 * Sets a named savepoint in the session's normal transaction.
 *
 * Each registered resource that participates in SQL transactions is asked
 * to set the savepoint.  The list of currently registered resource
 * contexts is then remembered in the savepoint and, when transaction
 * messages are being constructed, a copy of the session's current
 * Transaction message (if any) is stored in it too.
 *
 * @param session  Session whose normal transaction receives the savepoint
 * @param sv       Savepoint being set
 * @return 0 on success, 1 if any engine reported an error
 */
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
{
  typedef TransactionContext::ResourceContexts::iterator ContextIterator;

  int result= 0;
  TransactionContext::ResourceContexts &registered= session->transaction.all.getResourceContexts();

  /* An empty list simply yields zero iterations. */
  for (ContextIterator it= registered.begin(); it != registered.end(); ++it)
  {
    ResourceContext *context= *it;
    if (not context->getMonitored()->participatesInSqlTransaction())
    {
      continue;
    }

    const int engine_error= context->getTransactionalStorageEngine()->setSavepoint(session, sv);
    if (engine_error != 0)
    {
      my_error(ER_GET_ERRNO, MYF(0), engine_error);
      result= 1;
    }
    else
    {
      session->status_var.ha_savepoint_count++;
    }
  }

  /* Remember the list of registered storage engines. */
  sv.setResourceContexts(registered);

  if (shouldConstructMessages())
  {
    message::Transaction *current_message= session->getTransactionMessage();
    if (current_message != NULL)
    {
      sv.setTransactionMessage(new message::Transaction(*current_message));
    }
  }

  return result;
}
960
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
964
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
966
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
967
it != resource_contexts.end();
971
ResourceContext *resource_context= *it;
973
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
975
if (resource->participatesInSqlTransaction())
977
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
979
my_error(ER_GET_ERRNO, MYF(0), err);
988
bool TransactionServices::shouldConstructMessages()
990
ReplicationServices &replication_services= ReplicationServices::singleton();
991
return replication_services.isActive();
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
996
message::Transaction *transaction= in_session->getTransactionMessage();
998
if (unlikely(transaction == NULL))
1001
* Allocate and initialize a new transaction message
1002
* for this Session object. Session is responsible for
1003
* deleting transaction message when done with it.
1005
transaction= new (nothrow) message::Transaction();
1006
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
in_session->setTransactionMessage(transaction);
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
Session *in_session,
1016
bool should_inc_trx_id)
1018
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
trx->set_server_id(in_session->getServerId());
1021
if (should_inc_trx_id)
1023
trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
in_session->setXaId(0);
1028
trx->set_transaction_id(0);
1031
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
Session *in_session)
1037
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
Session *in_session)
1044
delete in_transaction;
1045
in_session->setStatementMessage(NULL);
1046
in_session->setTransactionMessage(NULL);
1047
in_session->setXaId(0);
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
if (! replication_services.isActive())
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (in_session->getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1064
message::Statement *statement= in_session->getStatementMessage();
1066
if (statement != NULL)
1068
finalizeStatementMessage(*statement, in_session);
1071
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, in_session);
1085
finalizeTransactionMessage(*transaction, in_session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1089
cleanupTransactionMessage(transaction, in_session);
1091
return static_cast<int>(result);
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type in_type,
1096
Session *in_session)
1098
statement.set_type(in_type);
1099
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1101
if (in_session->variables.replicate_query)
1102
statement.set_sql(in_session->getQueryString()->c_str());
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session *in_session)
1108
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
in_session->setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1121
* OK, so there are two situations that we need to deal with here:
1123
* 1) We receive an instruction to ROLLBACK the current transaction
1124
* and the currently-stored Transaction message is *self-contained*,
1125
* meaning that no Statement messages in the Transaction message
1126
* contain a message having its segment_id member greater than 1. If
1127
* no non-segment ID 1 members are found, we can simply clear the
1128
* current Transaction message and remove it from memory.
1130
* 2) If the Transaction message does indeed have a non-end segment, that
1131
* means that a bulk update/delete/insert Transaction message segment
1132
* has previously been sent over the wire to replicators. In this case,
1133
* we need to package a Transaction with a Statement message of type
1134
* ROLLBACK to indicate to replicators that previously-transmitted
1135
* messages must be un-applied.
1137
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1139
/* Remember the transaction ID so we can re-use it */
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1143
* Clear the transaction, create a Rollback statement message,
1144
* attach it to the transaction, and push it to replicators.
1146
transaction->Clear();
1147
initTransactionMessage(*transaction, in_session, false);
1149
/* Set the transaction ID to match the previous messages */
1150
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
message::Statement *statement= transaction->add_statement();
1154
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
finalizeStatementMessage(*statement, in_session);
1157
finalizeTransactionMessage(*transaction, in_session);
1159
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1161
cleanupTransactionMessage(transaction, in_session);
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1166
ReplicationServices &replication_services= ReplicationServices::singleton();
1167
if (! replication_services.isActive())
1170
message::Statement *current_statement= in_session->getStatementMessage();
1172
/* If we never added a Statement message, nothing to undo. */
1173
if (current_statement == NULL)
1177
* If the Statement has been segmented, then we've already pushed a portion
1178
* of this Statement's row changes through the replication stream and we
1179
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
* delete the current Statement message.
1182
bool is_segmented= false;
1184
switch (current_statement->type())
1186
case message::Statement::INSERT:
1187
if (current_statement->insert_data().segment_id() > 1)
1191
case message::Statement::UPDATE:
1192
if (current_statement->update_data().segment_id() > 1)
1196
case message::Statement::DELETE:
1197
if (current_statement->delete_data().segment_id() > 1)
1206
* Remove the Statement message we've been working with (same as
1207
* current_statement).
1209
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
statements_in_txn= transaction->mutable_statement();
1212
statements_in_txn->RemoveLast();
1213
in_session->setStatementMessage(NULL);
1216
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
* an indicator to cancel the previous Statement message which should have
1218
* had its end_segment attribute set to false.
1222
current_statement= transaction->add_statement();
1223
initStatementMessage(*current_statement,
1224
message::Statement::ROLLBACK_STATEMENT,
1226
finalizeStatementMessage(*current_statement, in_session);
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1232
uint32_t *next_segment_id)
1234
message::Statement *statement= in_session->getStatementMessage();
1235
message::Transaction *transaction= NULL;
1238
* Check the type for the current Statement message, if it is anything
1239
* other then INSERT we need to call finalize, this will ensure a
1240
* new InsertStatement is created. If it is of type INSERT check
1241
* what table the INSERT belongs to, if it is a different table
1242
* call finalize, so a new InsertStatement can be created.
1244
if (statement != NULL && statement->type() != message::Statement::INSERT)
1246
finalizeStatementMessage(*statement, in_session);
1247
statement= in_session->getStatementMessage();
1249
else if (statement != NULL)
1251
transaction= getActiveTransactionMessage(in_session);
1254
* If we've passed our threshold for the statement size (possible for
1255
* a bulk insert), we'll finalize the Statement and Transaction (doing
1256
* the Transaction will keep it from getting huge).
1258
if (static_cast<size_t>(transaction->ByteSize()) >=
1259
in_session->variables.transaction_message_threshold)
1261
/* Remember the transaction ID so we can re-use it */
1262
uint64_t trx_id= transaction->transaction_context().transaction_id();
1264
message::InsertData *current_data= statement->mutable_insert_data();
1266
/* Caller should use this value when adding a new record */
1267
*next_segment_id= current_data->segment_id() + 1;
1269
current_data->set_end_segment(false);
1272
* Send the trx message to replicators after finalizing the
1273
* statement and transaction. This will also set the Transaction
1274
* and Statement objects in Session to NULL.
1276
commitTransactionMessage(in_session);
1279
* Statement and Transaction should now be NULL, so new ones will get
1280
* created. We reuse the transaction id since we are segmenting
1283
statement= in_session->getStatementMessage();
1284
transaction= getActiveTransactionMessage(in_session, false);
1285
assert(transaction != NULL);
1287
/* Set the transaction ID to match the previous messages */
1288
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1292
const message::InsertHeader &insert_header= statement->insert_header();
1293
string old_table_name= insert_header.table_metadata().table_name();
1295
string current_table_name;
1296
(void) in_table->getShare()->getTableName(current_table_name);
1298
if (current_table_name.compare(old_table_name))
1300
finalizeStatementMessage(*statement, in_session);
1301
statement= in_session->getStatementMessage();
1305
/* carry forward the existing segment id */
1306
const message::InsertData ¤t_data= statement->insert_data();
1307
*next_segment_id= current_data.segment_id();
1312
if (statement == NULL)
1315
* Transaction will be non-NULL only if we had to segment it due to
1316
* transaction size above.
1318
if (transaction == NULL)
1319
transaction= getActiveTransactionMessage(in_session);
1322
* Transaction message initialized and set, but no statement created
1323
* yet. We construct one and initialize it, here, then return the
1324
* message after attaching the new Statement message pointer to the
1325
* Session for easy retrieval later...
1327
statement= transaction->add_statement();
1328
setInsertHeader(*statement, in_session, in_table);
1329
in_session->setStatementMessage(statement);
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1335
Session *in_session,
1338
initStatementMessage(statement, message::Statement::INSERT, in_session);
1341
* Now we construct the specialized InsertHeader message inside
1342
* the generalized message::Statement container...
1344
/* Set up the insert header */
1345
message::InsertHeader *header= statement.mutable_insert_header();
1346
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1349
(void) in_table->getShare()->getSchemaName(schema_name);
1351
(void) in_table->getShare()->getTableName(table_name);
1353
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1354
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1356
Field *current_field;
1357
Field **table_fields= in_table->getFields();
1359
message::FieldMetadata *field_metadata;
1361
/* We will read all the table's fields... */
1362
in_table->setReadSet();
1364
while ((current_field= *table_fields++) != NULL)
1366
field_metadata= header->add_field_metadata();
1367
field_metadata->set_name(current_field->field_name);
1368
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1372
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1374
ReplicationServices &replication_services= ReplicationServices::singleton();
1375
if (! replication_services.isActive())
1378
* We do this check here because we don't want to even create a
1379
* statement if there isn't a primary key on the table...
1383
* Multi-column primary keys are handled how exactly?
1385
if (not in_table->getShare()->hasPrimaryKey())
1387
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1391
uint32_t next_segment_id= 1;
1392
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1394
message::InsertData *data= statement.mutable_insert_data();
1395
data->set_segment_id(next_segment_id);
1396
data->set_end_segment(true);
1397
message::InsertRecord *record= data->add_record();
1399
Field *current_field;
1400
Field **table_fields= in_table->getFields();
1402
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1403
string_value->set_charset(system_charset_info);
1405
/* We will read all the table's fields... */
1406
in_table->setReadSet();
1408
while ((current_field= *table_fields++) != NULL)
1410
if (current_field->is_null())
1412
record->add_is_null(true);
1413
record->add_insert_value("", 0);
1417
string_value= current_field->val_str_internal(string_value);
1418
record->add_is_null(false);
1419
record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
string_value->free();
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1428
const unsigned char *old_record,
1429
const unsigned char *new_record,
1430
uint32_t *next_segment_id)
1432
message::Statement *statement= in_session->getStatementMessage();
1433
message::Transaction *transaction= NULL;
1436
* Check the type for the current Statement message, if it is anything
1437
* other then UPDATE we need to call finalize, this will ensure a
1438
* new UpdateStatement is created. If it is of type UPDATE check
1439
* what table the UPDATE belongs to, if it is a different table
1440
* call finalize, so a new UpdateStatement can be created.
1442
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1444
finalizeStatementMessage(*statement, in_session);
1445
statement= in_session->getStatementMessage();
1447
else if (statement != NULL)
1449
transaction= getActiveTransactionMessage(in_session);
1452
* If we've passed our threshold for the statement size (possible for
1453
* a bulk insert), we'll finalize the Statement and Transaction (doing
1454
* the Transaction will keep it from getting huge).
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
in_session->variables.transaction_message_threshold)
1459
/* Remember the transaction ID so we can re-use it */
1460
uint64_t trx_id= transaction->transaction_context().transaction_id();
1462
message::UpdateData *current_data= statement->mutable_update_data();
1464
/* Caller should use this value when adding a new record */
1465
*next_segment_id= current_data->segment_id() + 1;
1467
current_data->set_end_segment(false);
1470
* Send the trx message to replicators after finalizing the
1471
* statement and transaction. This will also set the Transaction
1472
* and Statement objects in Session to NULL.
1474
commitTransactionMessage(in_session);
1477
* Statement and Transaction should now be NULL, so new ones will get
1478
* created. We reuse the transaction id since we are segmenting
1481
statement= in_session->getStatementMessage();
1482
transaction= getActiveTransactionMessage(in_session, false);
1483
assert(transaction != NULL);
1485
/* Set the transaction ID to match the previous messages */
1486
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1490
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1492
/* carry forward the existing segment id */
1493
const message::UpdateData ¤t_data= statement->update_data();
1494
*next_segment_id= current_data.segment_id();
1498
finalizeStatementMessage(*statement, in_session);
1499
statement= in_session->getStatementMessage();
1504
if (statement == NULL)
1507
* Transaction will be non-NULL only if we had to segment it due to
1508
* transaction size above.
1510
if (transaction == NULL)
1511
transaction= getActiveTransactionMessage(in_session);
1514
* Transaction message initialized and set, but no statement created
1515
* yet. We construct one and initialize it, here, then return the
1516
* message after attaching the new Statement message pointer to the
1517
* Session for easy retrieval later...
1519
statement= transaction->add_statement();
1520
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
in_session->setStatementMessage(statement);
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1528
const unsigned char *old_record,
1529
const unsigned char *new_record)
1531
const message::UpdateHeader &update_header= statement.update_header();
1532
string old_table_name= update_header.table_metadata().table_name();
1534
string current_table_name;
1535
(void) in_table->getShare()->getTableName(current_table_name);
1536
if (current_table_name.compare(old_table_name))
1542
/* Compare the set fields in the existing UpdateHeader and see if they
1543
* match the updated fields in the new record, if they do not we must
1544
* create a new UpdateHeader
1546
size_t num_set_fields= update_header.set_field_metadata_size();
1548
Field *current_field;
1549
Field **table_fields= in_table->getFields();
1550
in_table->setReadSet();
1552
size_t num_calculated_updated_fields= 0;
1554
while ((current_field= *table_fields++) != NULL)
1556
if (num_calculated_updated_fields > num_set_fields)
1561
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1563
/* check that this field exists in the UpdateHeader record */
1566
for (size_t x= 0; x < num_set_fields; ++x)
1568
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
string name= field_metadata.name();
1570
if (name.compare(current_field->field_name) == 0)
1573
++num_calculated_updated_fields;
1584
if ((num_calculated_updated_fields == num_set_fields) && found)
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1596
Session *in_session,
1598
const unsigned char *old_record,
1599
const unsigned char *new_record)
1601
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1604
* Now we construct the specialized UpdateHeader message inside
1605
* the generalized message::Statement container...
1607
/* Set up the update header */
1608
message::UpdateHeader *header= statement.mutable_update_header();
1609
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1612
(void) in_table->getShare()->getSchemaName(schema_name);
1614
(void) in_table->getShare()->getTableName(table_name);
1616
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1617
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1619
Field *current_field;
1620
Field **table_fields= in_table->getFields();
1622
message::FieldMetadata *field_metadata;
1624
/* We will read all the table's fields... */
1625
in_table->setReadSet();
1627
while ((current_field= *table_fields++) != NULL)
1630
* We add the "key field metadata" -- i.e. the fields which is
1631
* the primary key for the table.
1633
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1635
field_metadata= header->add_key_field_metadata();
1636
field_metadata->set_name(current_field->field_name);
1637
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1640
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1642
/* Field is changed from old to new */
1643
field_metadata= header->add_set_field_metadata();
1644
field_metadata->set_name(current_field->field_name);
1645
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1649
void TransactionServices::updateRecord(Session *in_session,
1651
const unsigned char *old_record,
1652
const unsigned char *new_record)
1654
ReplicationServices &replication_services= ReplicationServices::singleton();
1655
if (! replication_services.isActive())
1658
uint32_t next_segment_id= 1;
1659
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1661
message::UpdateData *data= statement.mutable_update_data();
1662
data->set_segment_id(next_segment_id);
1663
data->set_end_segment(true);
1664
message::UpdateRecord *record= data->add_record();
1666
Field *current_field;
1667
Field **table_fields= in_table->getFields();
1668
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1669
string_value->set_charset(system_charset_info);
1671
while ((current_field= *table_fields++) != NULL)
1674
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1675
* but then realized that an UPDATE statement could potentially have different values for
1676
* the SET field. For instance, imagine this SQL scenario:
1678
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1679
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1680
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1682
* We will generate two UpdateRecord messages with different set_value byte arrays.
1684
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1686
/* Store the original "read bit" for this field */
1687
bool is_read_set= current_field->isReadSet();
1689
/* We need to mark that we will "read" this field... */
1690
in_table->setReadSet(current_field->position());
1692
/* Read the string value of this field's contents */
1693
string_value= current_field->val_str_internal(string_value);
1696
* Reset the read bit after reading field to its original state. This
1697
* prevents the field from being included in the WHERE clause
1699
current_field->setReadSet(is_read_set);
1701
if (current_field->is_null())
1703
record->add_is_null(true);
1704
record->add_after_value("", 0);
1708
record->add_is_null(false);
1709
record->add_after_value(string_value->c_ptr(), string_value->length());
1711
string_value->free();
1715
* Add the WHERE clause values now...for now, this means the
1716
* primary key field value. Replication only supports tables
1717
* with a primary key.
1719
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1722
* To say the below is ugly is an understatement. But it works.
1724
* @todo Move this crap into a real Record API.
1726
string_value= current_field->val_str_internal(string_value,
1728
current_field->offset(const_cast<unsigned char *>(new_record)));
1729
record->add_key_value(string_value->c_ptr(), string_value->length());
1730
string_value->free();
1736
bool TransactionServices::isFieldUpdated(Field *current_field,
1738
const unsigned char *old_record,
1739
const unsigned char *new_record)
1742
* The below really should be moved into the Field API and Record API. But for now
1743
* we do this crazy pointer fiddling to figure out if the current field
1744
* has been updated in the supplied record raw byte pointers.
1746
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1747
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1749
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1751
bool old_value_is_null= current_field->is_null_in_record(old_record);
1752
bool new_value_is_null= current_field->is_null_in_record(new_record);
1754
bool isUpdated= false;
1755
if (old_value_is_null != new_value_is_null)
1757
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1761
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1769
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1779
uint32_t *next_segment_id)
1781
message::Statement *statement= in_session->getStatementMessage();
1782
message::Transaction *transaction= NULL;
1785
* Check the type for the current Statement message, if it is anything
1786
* other then DELETE we need to call finalize, this will ensure a
1787
* new DeleteStatement is created. If it is of type DELETE check
1788
* what table the DELETE belongs to, if it is a different table
1789
* call finalize, so a new DeleteStatement can be created.
1791
if (statement != NULL && statement->type() != message::Statement::DELETE)
1793
finalizeStatementMessage(*statement, in_session);
1794
statement= in_session->getStatementMessage();
1796
else if (statement != NULL)
1798
transaction= getActiveTransactionMessage(in_session);
1801
* If we've passed our threshold for the statement size (possible for
1802
* a bulk insert), we'll finalize the Statement and Transaction (doing
1803
* the Transaction will keep it from getting huge).
1805
if (static_cast<size_t>(transaction->ByteSize()) >=
1806
in_session->variables.transaction_message_threshold)
1808
/* Remember the transaction ID so we can re-use it */
1809
uint64_t trx_id= transaction->transaction_context().transaction_id();
1811
message::DeleteData *current_data= statement->mutable_delete_data();
1813
/* Caller should use this value when adding a new record */
1814
*next_segment_id= current_data->segment_id() + 1;
1816
current_data->set_end_segment(false);
1819
* Send the trx message to replicators after finalizing the
1820
* statement and transaction. This will also set the Transaction
1821
* and Statement objects in Session to NULL.
1823
commitTransactionMessage(in_session);
1826
* Statement and Transaction should now be NULL, so new ones will get
1827
* created. We reuse the transaction id since we are segmenting
1830
statement= in_session->getStatementMessage();
1831
transaction= getActiveTransactionMessage(in_session, false);
1832
assert(transaction != NULL);
1834
/* Set the transaction ID to match the previous messages */
1835
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1839
const message::DeleteHeader &delete_header= statement->delete_header();
1840
string old_table_name= delete_header.table_metadata().table_name();
1842
string current_table_name;
1843
(void) in_table->getShare()->getTableName(current_table_name);
1844
if (current_table_name.compare(old_table_name))
1846
finalizeStatementMessage(*statement, in_session);
1847
statement= in_session->getStatementMessage();
1851
/* carry forward the existing segment id */
1852
const message::DeleteData ¤t_data= statement->delete_data();
1853
*next_segment_id= current_data.segment_id();
1858
if (statement == NULL)
1861
* Transaction will be non-NULL only if we had to segment it due to
1862
* transaction size above.
1864
if (transaction == NULL)
1865
transaction= getActiveTransactionMessage(in_session);
1868
* Transaction message initialized and set, but no statement created
1869
* yet. We construct one and initialize it, here, then return the
1870
* message after attaching the new Statement message pointer to the
1871
* Session for easy retrieval later...
1873
statement= transaction->add_statement();
1874
setDeleteHeader(*statement, in_session, in_table);
1875
in_session->setStatementMessage(statement);
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1881
Session *in_session,
1884
initStatementMessage(statement, message::Statement::DELETE, in_session);
1887
* Now we construct the specialized DeleteHeader message inside
1888
* the generalized message::Statement container...
1890
message::DeleteHeader *header= statement.mutable_delete_header();
1891
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1894
(void) in_table->getShare()->getSchemaName(schema_name);
1896
(void) in_table->getShare()->getTableName(table_name);
1898
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1899
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1901
Field *current_field;
1902
Field **table_fields= in_table->getFields();
1904
message::FieldMetadata *field_metadata;
1906
while ((current_field= *table_fields++) != NULL)
1909
* Add the WHERE clause values now...for now, this means the
1910
* primary key field value. Replication only supports tables
1911
* with a primary key.
1913
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1915
field_metadata= header->add_key_field_metadata();
1916
field_metadata->set_name(current_field->field_name);
1917
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1922
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1924
ReplicationServices &replication_services= ReplicationServices::singleton();
1925
if (! replication_services.isActive())
1928
uint32_t next_segment_id= 1;
1929
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1931
message::DeleteData *data= statement.mutable_delete_data();
1932
data->set_segment_id(next_segment_id);
1933
data->set_end_segment(true);
1934
message::DeleteRecord *record= data->add_record();
1936
Field *current_field;
1937
Field **table_fields= in_table->getFields();
1938
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1939
string_value->set_charset(system_charset_info);
1941
while ((current_field= *table_fields++) != NULL)
1944
* Add the WHERE clause values now...for now, this means the
1945
* primary key field value. Replication only supports tables
1946
* with a primary key.
1948
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1950
if (use_update_record)
1953
* Temporarily point to the update record to get its value.
1954
* This is pretty much a hack in order to get the PK value from
1955
* the update record rather than the insert record. Field::val_str()
1956
* should not change anything in Field::ptr, so this should be safe.
1957
* We are careful not to change anything in old_ptr.
1959
const unsigned char *old_ptr= current_field->ptr;
1960
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
string_value= current_field->val_str_internal(string_value);
1962
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1966
string_value= current_field->val_str_internal(string_value);
1968
* @TODO Store optional old record value in the before data member
1971
record->add_key_value(string_value->c_ptr(), string_value->length());
1972
string_value->free();
1977
void TransactionServices::createTable(Session *in_session,
1978
const message::Table &table)
1980
ReplicationServices &replication_services= ReplicationServices::singleton();
1981
if (! replication_services.isActive())
1984
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1985
message::Statement *statement= transaction->add_statement();
1987
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1990
* Construct the specialized CreateTableStatement message and attach
1991
* it to the generic Statement message
1993
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1994
message::Table *new_table_message= create_table_statement->mutable_table();
1995
*new_table_message= table;
1997
finalizeStatementMessage(*statement, in_session);
1999
finalizeTransactionMessage(*transaction, in_session);
2001
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2003
cleanupTransactionMessage(transaction, in_session);
2007
void TransactionServices::createSchema(Session *in_session,
2008
const message::Schema &schema)
2010
ReplicationServices &replication_services= ReplicationServices::singleton();
2011
if (! replication_services.isActive())
2014
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2015
message::Statement *statement= transaction->add_statement();
2017
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
2020
* Construct the specialized CreateSchemaStatement message and attach
2021
* it to the generic Statement message
2023
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
2024
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2025
*new_schema_message= schema;
2027
finalizeStatementMessage(*statement, in_session);
2029
finalizeTransactionMessage(*transaction, in_session);
2031
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2033
cleanupTransactionMessage(transaction, in_session);
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
2039
ReplicationServices &replication_services= ReplicationServices::singleton();
2040
if (! replication_services.isActive())
2043
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2044
message::Statement *statement= transaction->add_statement();
2046
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
2049
* Construct the specialized DropSchemaStatement message and attach
2050
* it to the generic Statement message
2052
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2054
drop_schema_statement->set_schema_name(schema_name);
2056
finalizeStatementMessage(*statement, in_session);
2058
finalizeTransactionMessage(*transaction, in_session);
2060
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2062
cleanupTransactionMessage(transaction, in_session);
2065
void TransactionServices::dropTable(Session *in_session,
2066
const string &schema_name,
2067
const string &table_name,
2070
ReplicationServices &replication_services= ReplicationServices::singleton();
2071
if (! replication_services.isActive())
2074
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2075
message::Statement *statement= transaction->add_statement();
2077
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2080
* Construct the specialized DropTableStatement message and attach
2081
* it to the generic Statement message
2083
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2085
drop_table_statement->set_if_exists_clause(if_exists);
2087
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2089
table_metadata->set_schema_name(schema_name);
2090
table_metadata->set_table_name(table_name);
2092
finalizeStatementMessage(*statement, in_session);
2094
finalizeTransactionMessage(*transaction, in_session);
2096
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2098
cleanupTransactionMessage(transaction, in_session);
2101
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2103
ReplicationServices &replication_services= ReplicationServices::singleton();
2104
if (! replication_services.isActive())
2107
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2108
message::Statement *statement= transaction->add_statement();
2110
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2113
* Construct the specialized TruncateTableStatement message and attach
2114
* it to the generic Statement message
2116
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2117
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2120
(void) in_table->getShare()->getSchemaName(schema_name);
2122
(void) in_table->getShare()->getTableName(table_name);
2124
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2125
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2127
finalizeStatementMessage(*statement, in_session);
2129
finalizeTransactionMessage(*transaction, in_session);
2131
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2133
cleanupTransactionMessage(transaction, in_session);
2136
void TransactionServices::rawStatement(Session *in_session, const string &query)
2138
ReplicationServices &replication_services= ReplicationServices::singleton();
2139
if (! replication_services.isActive())
2142
message::Transaction *transaction= getActiveTransactionMessage(in_session);
2143
message::Statement *statement= transaction->add_statement();
2145
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2146
statement->set_sql(query);
2147
finalizeStatementMessage(*statement, in_session);
2149
finalizeTransactionMessage(*transaction, in_session);
2151
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
2153
cleanupTransactionMessage(transaction, in_session);
2156
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2158
ReplicationServices &replication_services= ReplicationServices::singleton();
2159
if (! replication_services.isActive())
2162
message::Transaction *transaction= new (nothrow) message::Transaction();
2164
// set server id, start timestamp
2165
initTransactionMessage(*transaction, session, true);
2167
// set end timestamp
2168
finalizeTransactionMessage(*transaction, session);
2170
message::Event *trx_event= transaction->mutable_event();
2172
trx_event->CopyFrom(event);
2174
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2178
return static_cast<int>(result);
2181
bool TransactionServices::sendStartupEvent(Session *session)
2183
message::Event event;
2184
event.set_type(message::Event::STARTUP);
2185
if (sendEvent(session, event) != 0)
2190
bool TransactionServices::sendShutdownEvent(Session *session)
2192
message::Event event;
2193
event.set_type(message::Event::SHUTDOWN);
2194
if (sendEvent(session, event) != 0)
2199
} /* namespace drizzled */