1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems, Inc.
5
* Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/epoch.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/plugin/xa_storage_engine.h"
73
#include "drizzled/internal/my_sys.h"
78
#include <google/protobuf/repeated_field.h>
81
using namespace google;
87
* @defgroup Transactions
91
* Transaction handling in the server
95
* In each client connection, Drizzle maintains two transaction
96
* contexts representing the state of the:
98
* 1) Statement Transaction
99
* 2) Normal Transaction
101
* These two transaction contexts represent the transactional
102
* state of a Session's SQL and XA transactions for a single
103
* SQL statement or a series of SQL statements.
105
* When the Session's connection is in AUTOCOMMIT mode, there
106
* is no practical difference between the statement and the
107
* normal transaction, as each SQL statement is committed or
108
* rolled back depending on the success or failure of the
109
* individual SQL statement.
111
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
* the Session has explicitly begun a normal SQL transaction using
113
* a BEGIN WORK/START TRANSACTION statement, then the normal
114
* transaction context tracks the aggregate transaction state of
115
* the SQL transaction's individual statements, and the SQL
116
* transaction's commit or rollback is done atomically for all of
117
* the SQL transaction's statement's data changes.
119
* Technically, a statement transaction can be viewed as a savepoint
120
* which is maintained automatically in order to make effects of one
123
* The normal transaction is started by the user and is typically
124
* ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
* The exception to this is that DDL statements implicitly COMMIT
126
* any previously active normal transaction before they begin executing.
128
* In Drizzle, unlike MySQL, plugins other than a storage engine
129
* may participate in a transaction. All plugin::TransactionalStorageEngine
130
* plugins will automatically be monitored by Drizzle's transaction
131
* manager (implemented in this source file), as will all plugins which
132
* implement plugin::XaResourceManager and register with the transaction
135
* If Drizzle's transaction manager sees that more than one resource
136
* manager (transactional storage engine or XA resource manager) has modified
137
* data state during a statement or normal transaction, the transaction
138
* manager will automatically use a two-phase commit protocol for all
139
* resources which support XA's distributed transaction protocol. Unlike
140
* MySQL, storage engines need not manually register with the transaction
141
* manager during a statement's execution. Previously, in MySQL, all
142
* handlertons would have to call trans_register_ha() at some point after
143
* modifying data state in order to have MySQL include that handler in
144
* an XA transaction. Drizzle does all of this grunt work behind the
145
* scenes for the storage engine implementers.
147
* When a connection is closed, the current normal transaction, if
148
* any is currently active, is rolled back.
150
* Transaction life cycle
151
* ----------------------
153
* When a new connection is established, session->transaction
154
* members are initialized to an empty state. If a statement uses any tables,
155
* all affected engines are registered in the statement engine list automatically
156
* in plugin::StorageEngine::startStatement() and
157
* plugin::TransactionalStorageEngine::startTransaction().
159
* You can view the lifetime of a normal transaction in the following
162
* drizzled::statement::Statement::execute()
163
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
* drizzled::TransactionServices::registerResourceForTransaction()
165
* drizzled::TransactionServices::registerResourceForStatement()
166
* drizzled::plugin::StorageEngine::startStatement()
167
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
* drizzled::plugin::StorageEngine::endStatement()
169
* drizzled::TransactionServices::autocommitOrRollback()
170
* drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
* drizzled::XaResourceManager::xaCommit() <-- or rollback()
173
* Roles and responsibilities
174
* --------------------------
176
* Beginning of SQL Statement (and Statement Transaction)
177
* ------------------------------------------------------
179
* At the start of each SQL statement, for each storage engine
180
* <strong>that is involved in the SQL statement</strong>, the kernel
181
* calls the engine's plugin::StorageEngine::startStatement() method. If the
182
* engine needs to track some data for the statement, it should use
183
* this method invocation to initialize this data. This is the
184
* beginning of what is called the "statement transaction".
186
* <strong>For transaction storage engines (those storage engines
187
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
* kernel automatically determines if the start of the SQL statement
189
* transaction should <em>also</em> begin the normal SQL transaction.
190
* This occurs when the connection is NOT in autocommit mode. If
191
* the kernel detects this, then the kernel automatically starts the
192
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
* method and then calls plugin::StorageEngine::startStatement()
196
* Beginning of an SQL "Normal" Transaction
197
* ----------------------------------------
199
* As noted above, a "normal SQL transaction" may be started when
200
* an SQL statement is started in a connection and the connection is
201
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
203
* In addition, when a user executes a START TRANSACTION or
204
* BEGIN WORK statement in a connection, the kernel explicitly
205
* calls each transactional storage engine's startTransaction() method.
207
* Ending of an SQL Statement (and Statement Transaction)
208
* ------------------------------------------------------
210
* At the end of each SQL statement, for each of the aforementioned
211
* involved storage engines, the kernel calls the engine's
212
* plugin::StorageEngine::endStatement() method. If the engine
213
* has initialized or modified some internal data about the
214
* statement transaction, it should use this method to reset or destroy
215
* this data appropriately.
217
* Ending of an SQL "Normal" Transaction
218
* -------------------------------------
220
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
221
* depending on the success or failure of the statement transaction(s)
224
* The end of a "normal transaction" occurs when any of the following
227
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
* then the normal transaction which encloses the statement
230
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
* 3) Just before a DDL operation occurs, the kernel will implicitly
232
* commit the active normal transaction
234
* Transactions and Non-transactional Storage Engines
235
* --------------------------------------------------
237
* For non-transactional engines, this call can be safely ignored, and
238
* the kernel tracks whether a non-transactional engine has changed
239
* any data state, and warns the user appropriately if a transaction
240
* (statement or normal) is rolled back after such non-transactional
241
* data changes have been made.
243
* XA Two-phase Commit Protocol
244
* ----------------------------
246
* During statement execution, whenever any of the data-modifying
247
* PSEA API methods is used, e.g. Cursor::write_row() or
248
* Cursor::update_row(), the read-write flag is raised in the
249
* statement transaction for the involved engine.
250
* Currently all PSEA calls are "traced", and the data can not be
251
* changed in a way other than issuing a PSEA call. Important:
252
* unless this invariant is preserved the server will not know that
253
* a transaction in a given engine is read-write and will not
254
* involve the two-phase commit protocol!
256
* At the end of a statement, TransactionServices::autocommitOrRollback()
257
* is invoked. This call in turn
258
* invokes plugin::XaResourceManager::xaPrepare() for every involved XA
261
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
265
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
* At statement commit, the statement-related read-write engine
268
* flag is propagated to the corresponding flag in the normal
269
* transaction. When the commit is complete, the list of registered
270
* engines is cleared.
272
* Rollback is handled in a similar fashion.
274
* Additional notes on DDL and the normal transaction.
275
* ---------------------------------------------------
277
* CREATE TABLE .. SELECT can start a *new* normal transaction
278
* because of the fact that SELECTs on a transactional storage
279
* engine participate in the normal SQL transaction (due to
280
* isolation level issues and consistent read views).
282
* Behaviour of the server in this case is currently badly
285
* DDL statements use a form of "semantic" logging
286
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
* the newly created table is deleted.
289
* In addition, some DDL statements issue interim transaction
290
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
* from the original table to the internal temporary table. Other
292
* statements, e.g. CREATE TABLE ... SELECT do not always commit
295
* And finally there is a group of DDL statements such as
296
* RENAME/DROP TABLE that doesn't start a new transaction
297
* and doesn't commit.
299
* A consistent behaviour is perhaps to always commit the normal
300
* transaction after all DDLs, just like the statement transaction
301
* is always committed at the end of all statements.
303
// Constructor. Looks up the storage engine registered under the name InnoDB
// and stores it in xa_storage_engine after a C-style cast to
// plugin::XaStorageEngine*. A second visible assignment sets
// xa_storage_engine to NULL; the condition selecting between the two
// assignments is not visible in this view of the file.
TransactionServices::TransactionServices()
305
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
308
// NOTE(review): unchecked downcast -- confirm the engine found is really an XaStorageEngine.
xa_storage_engine= (plugin::XaStorageEngine*)engine;
312
xa_storage_engine= NULL;
316
// Registers a resource with the statement transaction of a session
// (overload without an XA resource manager).
//
// session   - session whose transaction.stmt context receives the resource
// monitored - the resource being tracked; asserted to participate in SQL
//             transactions and NOT in XA transactions
// engine    - transactional storage engine recorded in the resource context
//
// The resource context is fetched with index 0, which this file pairs with
// transaction.stmt (index 1 is paired with transaction.all). If that context
// is already started the function returns without further changes.
void TransactionServices::registerResourceForStatement(Session::reference session,
317
plugin::MonitoredInTransaction *monitored,
318
plugin::TransactionalStorageEngine *engine)
320
// Presumably true when either option bit is set -- verify session_test_options semantics.
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
323
* Now we automatically register this resource manager for the
324
* normal transaction. This is fine because a statement
325
* transaction registration should always enlist the resource
326
* in the normal transaction which contains the statement
329
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session.transaction.stmt;
333
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
335
if (resource_context->isStarted())
336
return; /* already registered, return */
338
assert(monitored->participatesInSqlTransaction());
339
assert(not monitored->participatesInXaTransaction());
341
resource_context->setMonitored(monitored);
342
resource_context->setTransactionalStorageEngine(engine);
343
trans->registerResource(resource_context);
345
// Raises the no_2pc flag on the statement context (presumably: two-phase commit not usable -- confirm).
trans->no_2pc|= true;
348
// Registers a resource with the statement transaction of a session
// (overload for resources that also have an XA resource manager).
//
// session          - session whose transaction.stmt context receives the resource
// monitored        - the resource being tracked; asserted to participate in
//                    both XA and SQL transactions
// engine           - transactional storage engine recorded in the resource context
// resource_manager - XA resource manager recorded in the resource context
//
// Returns early when the statement-level resource context (index 0) is
// already started.
void TransactionServices::registerResourceForStatement(Session::reference session,
349
plugin::MonitoredInTransaction *monitored,
350
plugin::TransactionalStorageEngine *engine,
351
plugin::XaResourceManager *resource_manager)
353
if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
356
* Now we automatically register this resource manager for the
357
* normal transaction. This is fine because a statement
358
* transaction registration should always enlist the resource
359
* in the normal transaction which contains the statement
362
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session.transaction.stmt;
366
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
368
if (resource_context->isStarted())
369
return; /* already registered, return */
371
assert(monitored->participatesInXaTransaction());
372
assert(monitored->participatesInSqlTransaction());
374
resource_context->setMonitored(monitored);
375
resource_context->setTransactionalStorageEngine(engine);
376
resource_context->setXaResourceManager(resource_manager);
377
trans->registerResource(resource_context);
379
// OR-ing with false leaves no_2pc unchanged; this statement has no effect on the flag.
trans->no_2pc|= false;
382
// Registers a resource with the normal transaction of a session
// (overload without an XA resource manager).
//
// session   - session whose transaction.all context receives the resource
// monitored - the resource being tracked; asserted to participate in SQL
//             transactions and NOT in XA transactions
// engine    - transactional storage engine recorded in the resource context
//
// Returns early when the transaction-level resource context (index 1) is
// already started. Otherwise it sets SERVER_STATUS_IN_TRANS on the session,
// registers the context, raises no_2pc, and assigns the transaction xid from
// the session query id when the xid is still null.
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
plugin::MonitoredInTransaction *monitored,
384
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session.transaction.all;
387
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
389
if (resource_context->isStarted())
390
return; /* already registered, return */
392
session.server_status|= SERVER_STATUS_IN_TRANS;
394
// Note: the context is registered before its monitored resource and engine are filled in below.
trans->registerResource(resource_context);
396
assert(monitored->participatesInSqlTransaction());
397
assert(not monitored->participatesInXaTransaction());
399
resource_context->setMonitored(monitored);
400
resource_context->setTransactionalStorageEngine(engine);
401
trans->no_2pc|= true;
403
if (session.transaction.xid_state.xid.is_null())
404
session.transaction.xid_state.xid.set(session.getQueryId());
406
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session.getResourceContext(monitored, 0)->isStarted())
408
registerResourceForStatement(session, monitored, engine);
411
// Registers a resource with the normal transaction of a session
// (overload for resources that also have an XA resource manager).
//
// session          - session whose transaction.all context receives the resource
// monitored        - the resource being tracked; asserted to participate in
//                    SQL transactions
// engine           - transactional storage engine; startTransaction() is
//                    called on it with START_TRANS_NO_OPTIONS
// resource_manager - XA resource manager recorded in the resource context
//
// Returns early when the transaction-level resource context (index 1) is
// already started.
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
plugin::MonitoredInTransaction *monitored,
413
plugin::TransactionalStorageEngine *engine,
414
plugin::XaResourceManager *resource_manager)
416
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
419
if (resource_context->isStarted())
420
return; /* already registered, return */
422
session.server_status|= SERVER_STATUS_IN_TRANS;
424
trans->registerResource(resource_context);
426
assert(monitored->participatesInSqlTransaction());
428
resource_context->setMonitored(monitored);
429
resource_context->setXaResourceManager(resource_manager);
430
resource_context->setTransactionalStorageEngine(engine);
431
// NOTE(review): no_2pc is raised here even though an XA resource manager is supplied,
// whereas the XA statement-level overload ORs in false -- confirm this is intended.
trans->no_2pc|= true;
433
if (session.transaction.xid_state.xid.is_null())
434
session.transaction.xid_state.xid.set(session.getQueryId());
436
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session.getResourceContext(monitored, 0)->isStarted())
440
registerResourceForStatement(session, monitored, engine, resource_manager);
443
// Obtains a new transaction id from xa_storage_engine for current_session and
// stores it on that session with setXaId().
// The action taken when replication services are not active is not visible in
// this view (presumably an early return -- confirm).
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
451
Session *my_session= current_session;
// NOTE(review): xa_storage_engine is dereferenced with no visible null check,
// although the constructor contains an assignment of NULL to it.
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
my_session->setXaId(xa_id);
456
// Returns the XA id stored on the session. When the stored id is 0, a new id
// is first requested from xa_storage_engine and stored on the session, so 0 is
// treated as the not-yet-assigned value.
//
// session - session whose XA id is read and possibly assigned
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
if (session.getXaId() == 0)
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
463
return session.getXaId();
466
// Commits either the normal transaction or the statement transaction of a
// session.
//
// session            - session whose transaction state is committed
// normal_transaction - true selects session.transaction.all, false selects
//                      session.transaction.stmt
//
// Visible steps: asserts that no statement transaction is pending unless the
// statement context itself is being committed; when resources are registered,
// may wait on the global read lock for a real transaction; when message
// construction is enabled, calls xaPrepare() on XA participants that modified
// data and then pushes the transaction message; finally delegates to
// commitPhaseOne(). rollbackTransaction() is called on the visible failure
// paths. Several branch and return lines are not visible in this view, so the
// exact nesting and the return statements cannot be confirmed from here.
//
// is_real_trans is true for a normal transaction, or when the normal
// transaction has no registered resources.
int TransactionServices::commitTransaction(Session::reference session,
467
bool normal_transaction)
469
// cookie is initialised to 0 and no visible line changes it.
int error= 0, cookie= 0;
471
'all' means that this is either an explicit commit issued by
472
user, or an implicit commit issued by a DDL.
474
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
475
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
480
We must not commit the normal transaction if a statement
481
transaction is pending. Otherwise statement transaction
482
flags will not get propagated to its normal transaction's
485
assert(session.transaction.stmt.getResourceContexts().empty() ||
486
trans == &session.transaction.stmt);
488
if (resource_contexts.empty() == false)
490
if (is_real_trans && session.wait_if_global_read_lock(false, false))
492
rollbackTransaction(session, normal_transaction);
497
* If replication is on, we do a PREPARE on the resource managers, push the
498
* Transaction message across the replication stream, and then COMMIT if the
499
* replication stream returned successfully.
501
if (shouldConstructMessages())
503
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
504
it != resource_contexts.end() && ! error;
507
ResourceContext *resource_context= *it;
510
Do not call two-phase commit if this particular
511
transaction is read-only. This allows for simpler
512
implementation in engines that are always read-only.
514
if (! resource_context->hasModifiedData())
517
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
519
if (resource->participatesInXaTransaction())
521
if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
523
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
528
session.status_var.ha_prepare_count++;
532
if (error == 0 && is_real_trans)
535
* Push the constructed Transaction messages across to
536
* replicators and appliers.
538
error= commitTransactionMessage(session);
542
rollbackTransaction(session, normal_transaction);
547
// A nonzero commitPhaseOne() result maps to 2 when cookie is nonzero, else 1; zero stays 0.
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
550
session.startWaitingGlobalReadLock();
557
This function does not care about global read lock. A caller should.
559
// Performs the commit on every resource registered in the selected
// transaction context.
//
// session            - session whose transaction state is committed
// normal_transaction - true selects session.transaction.all, false selects
//                      session.transaction.stmt
//
// For each resource context: XA participants are committed through
// xaCommit() on their XA resource manager; otherwise SQL-transaction
// participants are committed through commit() on their transactional storage
// engine. A nonzero result is reported with my_error(ER_ERROR_DURING_COMMIT);
// on success ha_commit_count is incremented only for a normal transaction.
// Each resource context is reset afterwards. The xid is nulled, and for a
// normal transaction the isolation level is restored from
// session_tx_isolation and session.transaction.cleanup() is called.
// Brace, return and some statement lines are not visible in this view, so the
// exact nesting and return value cannot be confirmed from here.
int TransactionServices::commitPhaseOne(Session::reference session,
560
bool normal_transaction)
563
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
564
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
566
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
567
// Flag passed as the second argument to xaCommit() and commit() below.
bool all= normal_transaction;
569
/* If we're in autocommit then we have a real transaction to commit
570
(except if it's BEGIN)
572
if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
575
if (resource_contexts.empty() == false)
577
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
578
it != resource_contexts.end();
582
ResourceContext *resource_context= *it;
584
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
586
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
590
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
593
else if (normal_transaction)
595
session.status_var.ha_commit_count++;
598
else if (resource->participatesInSqlTransaction())
600
if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
602
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
605
else if (normal_transaction)
607
session.status_var.ha_commit_count++;
610
resource_context->reset(); /* keep it conveniently zero-filled */
614
session.transaction.xid_state.xid.null();
616
if (normal_transaction)
618
session.variables.tx_isolation= session.session_tx_isolation;
619
session.transaction.cleanup();
626
// Rolls back either the normal transaction or the statement transaction of a
// session.
//
// session            - session whose transaction state is rolled back
// normal_transaction - true selects session.transaction.all, false selects
//                      session.transaction.stmt
//
// For each registered resource context: XA participants are rolled back
// through xaRollback(); otherwise SQL-transaction participants are rolled back
// through rollback() on their transactional storage engine. A nonzero result
// is reported with my_error(ER_ERROR_DURING_ROLLBACK); on success
// ha_rollback_count is incremented only for a normal transaction. Each
// resource context is then reset. Rollback messages are signalled before the
// xid is nulled. For a normal transaction the isolation level is restored,
// session.transaction.cleanup() is called and transaction_rollback_request is
// cleared. A warning is pushed when non-transactional data was modified and
// the session was not killed with KILL_CONNECTION.
// Brace, return and some condition lines are not visible in this view, so the
// exact nesting and return value cannot be confirmed from here.
int TransactionServices::rollbackTransaction(Session::reference session,
627
bool normal_transaction)
630
TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
631
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
// all is also true for a statement rollback when neither OPTION_NOT_AUTOCOMMIT nor
// OPTION_BEGIN tests true (presumably autocommit mode -- confirm).
bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
637
We must not rollback the normal transaction if a statement
638
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
643
if (resource_contexts.empty() == false)
645
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
646
it != resource_contexts.end();
650
ResourceContext *resource_context= *it;
652
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
654
if (resource->participatesInXaTransaction())
656
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
658
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
661
else if (normal_transaction)
663
session.status_var.ha_rollback_count++;
666
else if (resource->participatesInSqlTransaction())
668
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
670
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
673
else if (normal_transaction)
675
session.status_var.ha_rollback_count++;
678
resource_context->reset(); /* keep it conveniently zero-filled */
682
* We need to signal the ROLLBACK to ReplicationServices here
683
* BEFORE we set the transaction ID to NULL. This is because
684
* if a bulk segment was sent to replicators, we need to send
685
* a rollback statement with the corresponding transaction ID
689
rollbackTransactionMessage(session);
691
rollbackStatementMessage(session);
694
session.transaction.xid_state.xid.null();
695
if (normal_transaction)
697
session.variables.tx_isolation=session.session_tx_isolation;
698
session.transaction.cleanup();
701
if (normal_transaction)
702
session.transaction_rollback_request= false;
705
* If a non-transactional table was updated, warn the user
708
session.transaction.all.hasModifiedNonTransData() &&
709
session.getKilled() != Session::KILL_CONNECTION)
711
push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
712
ER_WARNING_NOT_COMPLETE_ROLLBACK,
713
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
719
// Ends the statement transaction of a session, committing or rolling it back.
//
// session - session whose statement transaction is finished
// (The remaining parameter line is not visible in this view; the body reads a
// variable named error, presumably that parameter -- confirm.)
//
// Visible steps: finalizes the current statement message when one exists and
// error is zero; calls endStatement() on the transactional storage engine of
// every resource registered in the statement context; commits the statement
// transaction with commitTransaction(session, false); rolls back the statement
// transaction and, when transaction_rollback_request is set, the normal
// transaction as well. The conditions joining these steps are partly missing
// from this view.
int TransactionServices::autocommitOrRollback(Session::reference session,
722
/* One GPB Statement message per SQL statement */
723
message::Statement *statement= session.getStatementMessage();
724
if ((statement != NULL) && (! error))
725
finalizeStatementMessage(*statement, session);
727
if (session.transaction.stmt.getResourceContexts().empty() == false)
729
TransactionContext *trans = &session.transaction.stmt;
730
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
it != resource_contexts.end();
735
ResourceContext *resource_context= *it;
737
resource_context->getTransactionalStorageEngine()->endStatement(&session);
742
if (commitTransaction(session, false))
747
(void) rollbackTransaction(session, false);
748
if (session.transaction_rollback_request)
750
(void) rollbackTransaction(session, true);
751
// Clears the in-transaction status bit after the normal transaction was rolled back.
session.server_status&= ~SERVER_STATUS_IN_TRANS;
755
session.variables.tx_isolation= session.session_tx_isolation;
761
// Ordering functor for ResourceContext pointers: compares the addresses of the
// monitored resources returned by getMonitored(), converted to uint64_t.
// Used in this file as the comparator for sort() and set_difference().
// NOTE(review): std::binary_function was deprecated in C++11 and removed in
// C++17; the base class only supplies the result_type typedef used below.
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
763
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
765
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
766
* resources aren't the same... */
767
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
771
// Rolls the normal transaction of a session back to a named savepoint.
//
// session - session whose normal transaction (transaction.all) is affected
// (The remaining parameter line is not visible in this view; the body uses a
// savepoint object named sv.)
//
// Visible steps:
//  1. Clears no_2pc, then for every resource recorded in the savepoint that
//     participates in SQL transactions calls rollbackToSavepoint() on its
//     engine, and ORs no_2pc with whether the resource does not participate
//     in XA transactions.
//  2. Computes the resources that are in the transaction but not in the
//     savepoint (sorted copies plus set_difference) and calls a full
//     rollback() on their engines, then resets each of those contexts.
//  3. Replaces the transaction resource list with the savepoint list.
//  4. When message construction is enabled, discards the active transaction
//     message and installs a copy of the message stored in the savepoint,
//     pointing the session statement message at its last statement (or NULL
//     when it has no statements).
// Brace, else and return lines are not visible in this view.
int TransactionServices::rollbackToSavepoint(Session::reference session,
775
TransactionContext *trans= &session.transaction.all;
776
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
777
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
779
trans->no_2pc= false;
781
rolling back to savepoint in all storage engines that were part of the
782
transaction when the savepoint was set
784
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
785
it != sv_resource_contexts.end();
789
ResourceContext *resource_context= *it;
791
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
793
if (resource->participatesInSqlTransaction())
795
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
797
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
802
session.status_var.ha_savepoint_rollback_count++;
805
trans->no_2pc|= not resource->participatesInXaTransaction();
808
rolling back the transaction in all storage engines that were not part of
809
the transaction when the savepoint was set
812
TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
813
TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
814
TransactionContext::ResourceContexts set_difference_contexts;
817
* Bug #542299: segfault during set_difference() below. copy<>() requires pre-allocation
818
* of all elements, including the target, which is why we pre-allocate the set_difference_contexts
821
// NOTE(review): assuming ResourceContexts is a std::vector, reserve() changes capacity but
// not size. set_difference below then writes through begin() of a container that still has
// size zero, and end() is never advanced, so the loop over set_difference_contexts further
// down would see an empty range. Confirm, and consider an inserting output iterator.
set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
823
sort(sorted_tran_resource_contexts.begin(),
824
sorted_tran_resource_contexts.end(),
825
ResourceContextCompare());
826
sort(sorted_sv_resource_contexts.begin(),
827
sorted_sv_resource_contexts.end(),
828
ResourceContextCompare());
829
set_difference(sorted_tran_resource_contexts.begin(),
830
sorted_tran_resource_contexts.end(),
831
sorted_sv_resource_contexts.begin(),
832
sorted_sv_resource_contexts.end(),
833
set_difference_contexts.begin(),
834
ResourceContextCompare());
836
* set_difference_contexts now contains all resource contexts
837
* which are in the transaction context but were NOT in the
838
* savepoint's resource contexts.
841
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
842
it != set_difference_contexts.end();
845
ResourceContext *resource_context= *it;
848
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
850
if (resource->participatesInSqlTransaction())
852
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
854
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
859
session.status_var.ha_rollback_count++;
862
resource_context->reset(); /* keep it conveniently zero-filled */
865
trans->setResourceContexts(sv_resource_contexts);
867
if (shouldConstructMessages())
869
cleanupTransactionMessage(getActiveTransactionMessage(session), session);
870
message::Transaction *savepoint_transaction= sv.getTransactionMessage();
871
if (savepoint_transaction != NULL)
873
/* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
874
Upon commit the savepoint_transaction_copy will be cleaned up by a call to
875
cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
876
up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
878
message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
879
uint32_t num_statements = savepoint_transaction_copy->statement_size();
880
if (num_statements == 0)
882
session.setStatementMessage(NULL);
886
session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
888
session.setTransactionMessage(savepoint_transaction_copy);
897
according to the sql standard (ISO/IEC 9075-2:2003)
898
section "4.33.4 SQL-statements and transaction states",
899
NamedSavepoint is *not* transaction-initiating SQL-statement
901
// Creates a savepoint within the normal transaction of a session.
//
// session - session whose normal transaction (transaction.all) is used
// (The remaining parameter line is not visible in this view; the body uses a
// savepoint object named sv.)
//
// For every registered resource that participates in SQL transactions,
// setSavepoint() is called on its transactional storage engine; a nonzero
// result is reported with my_error(ER_GET_ERRNO). ha_savepoint_count is
// incremented (the exact condition is not visible here). The current resource
// list is stored in the savepoint, and when message construction is enabled a
// copy of the current transaction message, if any, is stored there as well.
int TransactionServices::setSavepoint(Session::reference session,
905
TransactionContext *trans= &session.transaction.all;
906
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
if (resource_contexts.empty() == false)
910
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
911
it != resource_contexts.end();
914
ResourceContext *resource_context= *it;
917
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
919
if (resource->participatesInSqlTransaction())
921
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
923
my_error(ER_GET_ERRNO, MYF(0), err);
928
session.status_var.ha_savepoint_count++;
934
Remember the list of registered storage engines.
936
sv.setResourceContexts(resource_contexts);
938
if (shouldConstructMessages())
940
message::Transaction *transaction= session.getTransactionMessage();
942
if (transaction != NULL)
944
message::Transaction *transaction_savepoint=
945
new message::Transaction(*transaction);
946
sv.setTransactionMessage(transaction_savepoint);
953
// Releases a savepoint in every resource recorded in that savepoint.
//
// session - session passed through to each engine
// (The remaining parameter line is not visible in this view; the body uses a
// savepoint object named sv.)
//
// For each recorded resource that participates in SQL transactions,
// releaseSavepoint() is called on its transactional storage engine; a nonzero
// result is reported with my_error(ER_GET_ERRNO). The return statement is not
// visible in this view.
int TransactionServices::releaseSavepoint(Session::reference session,
958
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
960
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
961
it != resource_contexts.end();
965
ResourceContext *resource_context= *it;
967
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
969
if (resource->participatesInSqlTransaction())
971
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
973
my_error(ER_GET_ERRNO, MYF(0), err);
982
bool TransactionServices::shouldConstructMessages()
984
ReplicationServices &replication_services= ReplicationServices::singleton();
985
return replication_services.isActive();
988
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
989
bool should_inc_trx_id)
991
message::Transaction *transaction= session.getTransactionMessage();
993
if (unlikely(transaction == NULL))
996
* Allocate and initialize a new transaction message
997
* for this Session object. Session is responsible for
998
* deleting transaction message when done with it.
1000
transaction= new (nothrow) message::Transaction();
1001
initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
session.setTransactionMessage(transaction);
1009
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
Session::reference session,
1011
bool should_inc_trx_id)
1013
message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
trx->set_server_id(session.getServerId());
1016
if (should_inc_trx_id)
1018
trx->set_transaction_id(getCurrentTransactionId(session));
1023
/* trx and seg id will get set properly elsewhere */
1024
trx->set_transaction_id(0);
1027
trx->set_start_timestamp(session.getCurrentTimestamp());
1029
/* segment info may get set elsewhere as needed */
1030
transaction.set_segment_id(1);
1031
transaction.set_end_segment(true);
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
Session::const_reference session)
1037
message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
trx->set_end_timestamp(session.getCurrentTimestamp());
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
Session::reference session)
1045
session.setStatementMessage(NULL);
1046
session.setTransactionMessage(NULL);
1050
int TransactionServices::commitTransactionMessage(Session::reference session)
1052
ReplicationServices &replication_services= ReplicationServices::singleton();
1053
if (! replication_services.isActive())
1057
* If no Transaction message was ever created, then no data modification
1058
* occurred inside the transaction, so nothing to do.
1060
if (session.getTransactionMessage() == NULL)
1063
/* If there is an active statement message, finalize it. */
1064
message::Statement *statement= session.getStatementMessage();
1066
if (statement != NULL)
1068
finalizeStatementMessage(*statement, session);
1071
message::Transaction* transaction= getActiveTransactionMessage(session);
1074
* It is possible that we could have a Transaction without any Statements
1075
* if we had created a Statement but had to roll it back due to it failing
1076
* mid-execution, and no subsequent Statements were added to the Transaction
1077
* message. In this case, we simply clean up the message and not push it.
1079
if (transaction->statement_size() == 0)
1081
cleanupTransactionMessage(transaction, session);
1085
finalizeTransactionMessage(*transaction, session);
1087
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
cleanupTransactionMessage(transaction, session);
1091
return static_cast<int>(result);
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
message::Statement::Type type,
1096
Session::const_reference session)
1098
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1101
if (session.variables.replicate_query)
1102
statement.set_sql(session.getQueryString()->c_str());
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1109
session.setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
if (! replication_services.isActive())
1118
message::Transaction *transaction= getActiveTransactionMessage(session);
1121
* OK, so there are two situations that we need to deal with here:
1123
* 1) We receive an instruction to ROLLBACK the current transaction
1124
* and the currently-stored Transaction message is *self-contained*,
1125
* meaning that no Statement messages in the Transaction message
1126
* contain a message having its segment_id member greater than 1. If
1127
* no non-segment ID 1 members are found, we can simply clear the
1128
* current Transaction message and remove it from memory.
1130
* 2) If the Transaction message does indeed have a non-end segment, that
1131
* means that a bulk update/delete/insert Transaction message segment
1132
* has previously been sent over the wire to replicators. In this case,
1133
* we need to package a Transaction with a Statement message of type
1134
* ROLLBACK to indicate to replicators that previously-transmitted
1135
* messages must be un-applied.
1137
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1139
/* Remember the transaction ID so we can re-use it */
1140
uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
uint32_t seg_id= transaction->segment_id();
1144
* Clear the transaction, create a Rollback statement message,
1145
* attach it to the transaction, and push it to replicators.
1147
transaction->Clear();
1148
initTransactionMessage(*transaction, session, false);
1150
/* Set the transaction ID to match the previous messages */
1151
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
transaction->set_segment_id(seg_id);
1153
transaction->set_end_segment(true);
1155
message::Statement *statement= transaction->add_statement();
1157
initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
finalizeStatementMessage(*statement, session);
1160
finalizeTransactionMessage(*transaction, session);
1162
(void) replication_services.pushTransactionMessage(session, *transaction);
1165
cleanupTransactionMessage(transaction, session);
1168
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
ReplicationServices &replication_services= ReplicationServices::singleton();
1171
if (! replication_services.isActive())
1174
message::Statement *current_statement= session.getStatementMessage();
1176
/* If we never added a Statement message, nothing to undo. */
1177
if (current_statement == NULL)
1181
* If the Statement has been segmented, then we've already pushed a portion
1182
* of this Statement's row changes through the replication stream and we
1183
* need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
* delete the current Statement message.
1186
bool is_segmented= false;
1188
switch (current_statement->type())
1190
case message::Statement::INSERT:
1191
if (current_statement->insert_data().segment_id() > 1)
1195
case message::Statement::UPDATE:
1196
if (current_statement->update_data().segment_id() > 1)
1200
case message::Statement::DELETE:
1201
if (current_statement->delete_data().segment_id() > 1)
1210
* Remove the Statement message we've been working with (same as
1211
* current_statement).
1213
message::Transaction *transaction= getActiveTransactionMessage(session);
1214
google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
statements_in_txn= transaction->mutable_statement();
1216
statements_in_txn->RemoveLast();
1217
session.setStatementMessage(NULL);
1220
* Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
* an indicator to cancel the previous Statement message which should have
1222
* had its end_segment attribute set to false.
1226
current_statement= transaction->add_statement();
1227
initStatementMessage(*current_statement,
1228
message::Statement::ROLLBACK_STATEMENT,
1230
finalizeStatementMessage(*current_statement, session);
1234
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
message::Transaction *transaction)
1237
uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
uint32_t seg_id= transaction->segment_id();
1240
transaction->set_end_segment(false);
1241
commitTransactionMessage(session);
1242
transaction= getActiveTransactionMessage(session, false);
1244
/* Set the transaction ID to match the previous messages */
1245
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
transaction->set_segment_id(seg_id + 1);
1247
transaction->set_end_segment(true);
1252
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1254
uint32_t *next_segment_id)
1256
message::Statement *statement= session.getStatementMessage();
1257
message::Transaction *transaction= NULL;
1260
* If statement is NULL, this is a new statement.
1261
* If statement is NOT NULL, this a continuation of the same statement.
1262
* This is because autocommitOrRollback() finalizes the statement so that
1263
* we guarantee only one Statement message per statement (i.e., we no longer
1264
* share a single GPB message for multiple statements).
1266
if (statement == NULL)
1268
transaction= getActiveTransactionMessage(session);
1270
if (static_cast<size_t>(transaction->ByteSize()) >=
1271
transaction_message_threshold)
1273
transaction= segmentTransactionMessage(session, transaction);
1276
statement= transaction->add_statement();
1277
setInsertHeader(*statement, session, table);
1278
session.setStatementMessage(statement);
1282
transaction= getActiveTransactionMessage(session);
1285
* If we've passed our threshold for the statement size (possible for
1286
* a bulk insert), we'll finalize the Statement and Transaction (doing
1287
* the Transaction will keep it from getting huge).
1289
if (static_cast<size_t>(transaction->ByteSize()) >=
1290
transaction_message_threshold)
1292
/* Remember the transaction ID so we can re-use it */
1293
uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
uint32_t seg_id= transaction->segment_id();
1296
message::InsertData *current_data= statement->mutable_insert_data();
1298
/* Caller should use this value when adding a new record */
1299
*next_segment_id= current_data->segment_id() + 1;
1301
current_data->set_end_segment(false);
1302
transaction->set_end_segment(false);
1305
* Send the trx message to replicators after finalizing the
1306
* statement and transaction. This will also set the Transaction
1307
* and Statement objects in Session to NULL.
1309
commitTransactionMessage(session);
1312
* Statement and Transaction should now be NULL, so new ones will get
1313
* created. We reuse the transaction id since we are segmenting
1316
transaction= getActiveTransactionMessage(session, false);
1317
assert(transaction != NULL);
1319
statement= transaction->add_statement();
1320
setInsertHeader(*statement, session, table);
1321
session.setStatementMessage(statement);
1323
/* Set the transaction ID to match the previous messages */
1324
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
transaction->set_segment_id(seg_id + 1);
1326
transaction->set_end_segment(true);
1331
* Continuation of the same statement. Carry forward the existing
1334
const message::InsertData ¤t_data= statement->insert_data();
1335
*next_segment_id= current_data.segment_id();
1342
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
Session::const_reference session,
1346
initStatementMessage(statement, message::Statement::INSERT, session);
1349
* Now we construct the specialized InsertHeader message inside
1350
* the generalized message::Statement container...
1352
/* Set up the insert header */
1353
message::InsertHeader *header= statement.mutable_insert_header();
1354
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1357
(void) table.getShare()->getSchemaName(schema_name);
1359
(void) table.getShare()->getTableName(table_name);
1361
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1364
Field *current_field;
1365
Field **table_fields= table.getFields();
1367
message::FieldMetadata *field_metadata;
1369
/* We will read all the table's fields... */
1372
while ((current_field= *table_fields++) != NULL)
1374
field_metadata= header->add_field_metadata();
1375
field_metadata->set_name(current_field->field_name);
1376
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1380
bool TransactionServices::insertRecord(Session::reference session,
1383
ReplicationServices &replication_services= ReplicationServices::singleton();
1384
if (! replication_services.isActive())
1388
* We do this check here because we don't want to even create a
1389
* statement if there isn't a primary key on the table...
1393
* Multi-column primary keys are handled how exactly?
1395
if (not table.getShare()->hasPrimaryKey())
1397
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1401
uint32_t next_segment_id= 1;
1402
message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1404
message::InsertData *data= statement.mutable_insert_data();
1405
data->set_segment_id(next_segment_id);
1406
data->set_end_segment(true);
1407
message::InsertRecord *record= data->add_record();
1409
Field *current_field;
1410
Field **table_fields= table.getFields();
1412
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1413
string_value->set_charset(system_charset_info);
1415
/* We will read all the table's fields... */
1418
while ((current_field= *table_fields++) != NULL)
1420
if (current_field->is_null())
1422
record->add_is_null(true);
1423
record->add_insert_value("", 0);
1427
string_value= current_field->val_str_internal(string_value);
1428
record->add_is_null(false);
1429
record->add_insert_value(string_value->c_ptr(), string_value->length());
1430
string_value->free();
1436
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1438
const unsigned char *old_record,
1439
const unsigned char *new_record,
1440
uint32_t *next_segment_id)
1442
message::Statement *statement= session.getStatementMessage();
1443
message::Transaction *transaction= NULL;
1446
* If statement is NULL, this is a new statement.
1447
* If statement is NOT NULL, this a continuation of the same statement.
1448
* This is because autocommitOrRollback() finalizes the statement so that
1449
* we guarantee only one Statement message per statement (i.e., we no longer
1450
* share a single GPB message for multiple statements).
1452
if (statement == NULL)
1454
transaction= getActiveTransactionMessage(session);
1456
if (static_cast<size_t>(transaction->ByteSize()) >=
1457
transaction_message_threshold)
1459
transaction= segmentTransactionMessage(session, transaction);
1462
statement= transaction->add_statement();
1463
setUpdateHeader(*statement, session, table, old_record, new_record);
1464
session.setStatementMessage(statement);
1468
transaction= getActiveTransactionMessage(session);
1471
* If we've passed our threshold for the statement size (possible for
1472
* a bulk insert), we'll finalize the Statement and Transaction (doing
1473
* the Transaction will keep it from getting huge).
1475
if (static_cast<size_t>(transaction->ByteSize()) >=
1476
transaction_message_threshold)
1478
/* Remember the transaction ID so we can re-use it */
1479
uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
uint32_t seg_id= transaction->segment_id();
1482
message::UpdateData *current_data= statement->mutable_update_data();
1484
/* Caller should use this value when adding a new record */
1485
*next_segment_id= current_data->segment_id() + 1;
1487
current_data->set_end_segment(false);
1488
transaction->set_end_segment(false);
1491
* Send the trx message to replicators after finalizing the
1492
* statement and transaction. This will also set the Transaction
1493
* and Statement objects in Session to NULL.
1495
commitTransactionMessage(session);
1498
* Statement and Transaction should now be NULL, so new ones will get
1499
* created. We reuse the transaction id since we are segmenting
1502
transaction= getActiveTransactionMessage(session, false);
1503
assert(transaction != NULL);
1505
statement= transaction->add_statement();
1506
setUpdateHeader(*statement, session, table, old_record, new_record);
1507
session.setStatementMessage(statement);
1509
/* Set the transaction ID to match the previous messages */
1510
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
transaction->set_segment_id(seg_id + 1);
1512
transaction->set_end_segment(true);
1517
* Continuation of the same statement. Carry forward the existing
1520
const message::UpdateData ¤t_data= statement->update_data();
1521
*next_segment_id= current_data.segment_id();
1528
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
Session::const_reference session,
1531
const unsigned char *old_record,
1532
const unsigned char *new_record)
1534
initStatementMessage(statement, message::Statement::UPDATE, session);
1537
* Now we construct the specialized UpdateHeader message inside
1538
* the generalized message::Statement container...
1540
/* Set up the update header */
1541
message::UpdateHeader *header= statement.mutable_update_header();
1542
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1545
(void) table.getShare()->getSchemaName(schema_name);
1547
(void) table.getShare()->getTableName(table_name);
1549
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1550
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1552
Field *current_field;
1553
Field **table_fields= table.getFields();
1555
message::FieldMetadata *field_metadata;
1557
/* We will read all the table's fields... */
1560
while ((current_field= *table_fields++) != NULL)
1563
* We add the "key field metadata" -- i.e. the fields which is
1564
* the primary key for the table.
1566
if (table.getShare()->fieldInPrimaryKey(current_field))
1568
field_metadata= header->add_key_field_metadata();
1569
field_metadata->set_name(current_field->field_name);
1570
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1573
if (isFieldUpdated(current_field, table, old_record, new_record))
1575
/* Field is changed from old to new */
1576
field_metadata= header->add_set_field_metadata();
1577
field_metadata->set_name(current_field->field_name);
1578
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1582
void TransactionServices::updateRecord(Session::reference session,
1584
const unsigned char *old_record,
1585
const unsigned char *new_record)
1587
ReplicationServices &replication_services= ReplicationServices::singleton();
1588
if (! replication_services.isActive())
1591
uint32_t next_segment_id= 1;
1592
message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1594
message::UpdateData *data= statement.mutable_update_data();
1595
data->set_segment_id(next_segment_id);
1596
data->set_end_segment(true);
1597
message::UpdateRecord *record= data->add_record();
1599
Field *current_field;
1600
Field **table_fields= table.getFields();
1601
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
string_value->set_charset(system_charset_info);
1604
while ((current_field= *table_fields++) != NULL)
1607
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1608
* but then realized that an UPDATE statement could potentially have different values for
1609
* the SET field. For instance, imagine this SQL scenario:
1611
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1612
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1613
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1615
* We will generate two UpdateRecord messages with different set_value byte arrays.
1617
if (isFieldUpdated(current_field, table, old_record, new_record))
1619
/* Store the original "read bit" for this field */
1620
bool is_read_set= current_field->isReadSet();
1622
/* We need to mark that we will "read" this field... */
1623
table.setReadSet(current_field->position());
1625
/* Read the string value of this field's contents */
1626
string_value= current_field->val_str_internal(string_value);
1629
* Reset the read bit after reading field to its original state. This
1630
* prevents the field from being included in the WHERE clause
1632
current_field->setReadSet(is_read_set);
1634
if (current_field->is_null())
1636
record->add_is_null(true);
1637
record->add_after_value("", 0);
1641
record->add_is_null(false);
1642
record->add_after_value(string_value->c_ptr(), string_value->length());
1644
string_value->free();
1648
* Add the WHERE clause values now...for now, this means the
1649
* primary key field value. Replication only supports tables
1650
* with a primary key.
1652
if (table.getShare()->fieldInPrimaryKey(current_field))
1655
* To say the below is ugly is an understatement. But it works.
1657
* @todo Move this crap into a real Record API.
1659
string_value= current_field->val_str_internal(string_value,
1661
current_field->offset(const_cast<unsigned char *>(new_record)));
1662
record->add_key_value(string_value->c_ptr(), string_value->length());
1663
string_value->free();
1669
bool TransactionServices::isFieldUpdated(Field *current_field,
1671
const unsigned char *old_record,
1672
const unsigned char *new_record)
1675
* The below really should be moved into the Field API and Record API. But for now
1676
* we do this crazy pointer fiddling to figure out if the current field
1677
* has been updated in the supplied record raw byte pointers.
1679
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1680
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1682
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1684
bool old_value_is_null= current_field->is_null_in_record(old_record);
1685
bool new_value_is_null= current_field->is_null_in_record(new_record);
1687
bool isUpdated= false;
1688
if (old_value_is_null != new_value_is_null)
1690
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1694
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1702
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1710
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1712
uint32_t *next_segment_id)
1714
message::Statement *statement= session.getStatementMessage();
1715
message::Transaction *transaction= NULL;
1718
* If statement is NULL, this is a new statement.
1719
* If statement is NOT NULL, this a continuation of the same statement.
1720
* This is because autocommitOrRollback() finalizes the statement so that
1721
* we guarantee only one Statement message per statement (i.e., we no longer
1722
* share a single GPB message for multiple statements).
1724
if (statement == NULL)
1726
transaction= getActiveTransactionMessage(session);
1728
if (static_cast<size_t>(transaction->ByteSize()) >=
1729
transaction_message_threshold)
1731
transaction= segmentTransactionMessage(session, transaction);
1734
statement= transaction->add_statement();
1735
setDeleteHeader(*statement, session, table);
1736
session.setStatementMessage(statement);
1740
transaction= getActiveTransactionMessage(session);
1743
* If we've passed our threshold for the statement size (possible for
1744
* a bulk insert), we'll finalize the Statement and Transaction (doing
1745
* the Transaction will keep it from getting huge).
1747
if (static_cast<size_t>(transaction->ByteSize()) >=
1748
transaction_message_threshold)
1750
/* Remember the transaction ID so we can re-use it */
1751
uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
uint32_t seg_id= transaction->segment_id();
1754
message::DeleteData *current_data= statement->mutable_delete_data();
1756
/* Caller should use this value when adding a new record */
1757
*next_segment_id= current_data->segment_id() + 1;
1759
current_data->set_end_segment(false);
1760
transaction->set_end_segment(false);
1763
* Send the trx message to replicators after finalizing the
1764
* statement and transaction. This will also set the Transaction
1765
* and Statement objects in Session to NULL.
1767
commitTransactionMessage(session);
1770
* Statement and Transaction should now be NULL, so new ones will get
1771
* created. We reuse the transaction id since we are segmenting
1774
transaction= getActiveTransactionMessage(session, false);
1775
assert(transaction != NULL);
1777
statement= transaction->add_statement();
1778
setDeleteHeader(*statement, session, table);
1779
session.setStatementMessage(statement);
1781
/* Set the transaction ID to match the previous messages */
1782
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
transaction->set_segment_id(seg_id + 1);
1784
transaction->set_end_segment(true);
1789
* Continuation of the same statement. Carry forward the existing
1792
const message::DeleteData ¤t_data= statement->delete_data();
1793
*next_segment_id= current_data.segment_id();
1800
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
Session::const_reference session,
1804
initStatementMessage(statement, message::Statement::DELETE, session);
1807
* Now we construct the specialized DeleteHeader message inside
1808
* the generalized message::Statement container...
1810
message::DeleteHeader *header= statement.mutable_delete_header();
1811
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1814
(void) table.getShare()->getSchemaName(schema_name);
1816
(void) table.getShare()->getTableName(table_name);
1818
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1819
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1821
Field *current_field;
1822
Field **table_fields= table.getFields();
1824
message::FieldMetadata *field_metadata;
1826
while ((current_field= *table_fields++) != NULL)
1829
* Add the WHERE clause values now...for now, this means the
1830
* primary key field value. Replication only supports tables
1831
* with a primary key.
1833
if (table.getShare()->fieldInPrimaryKey(current_field))
1835
field_metadata= header->add_key_field_metadata();
1836
field_metadata->set_name(current_field->field_name);
1837
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1842
void TransactionServices::deleteRecord(Session::reference session,
1844
bool use_update_record)
1846
ReplicationServices &replication_services= ReplicationServices::singleton();
1847
if (! replication_services.isActive())
1850
uint32_t next_segment_id= 1;
1851
message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1853
message::DeleteData *data= statement.mutable_delete_data();
1854
data->set_segment_id(next_segment_id);
1855
data->set_end_segment(true);
1856
message::DeleteRecord *record= data->add_record();
1858
Field *current_field;
1859
Field **table_fields= table.getFields();
1860
String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
string_value->set_charset(system_charset_info);
1863
while ((current_field= *table_fields++) != NULL)
1866
* Add the WHERE clause values now...for now, this means the
1867
* primary key field value. Replication only supports tables
1868
* with a primary key.
1870
if (table.getShare()->fieldInPrimaryKey(current_field))
1872
if (use_update_record)
1875
* Temporarily point to the update record to get its value.
1876
* This is pretty much a hack in order to get the PK value from
1877
* the update record rather than the insert record. Field::val_str()
1878
* should not change anything in Field::ptr, so this should be safe.
1879
* We are careful not to change anything in old_ptr.
1881
const unsigned char *old_ptr= current_field->ptr;
1882
current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1883
string_value= current_field->val_str_internal(string_value);
1884
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1888
string_value= current_field->val_str_internal(string_value);
1890
* @TODO Store optional old record value in the before data member
1893
record->add_key_value(string_value->c_ptr(), string_value->length());
1894
string_value->free();
1899
void TransactionServices::createTable(Session::reference session,
1900
const message::Table &table)
1902
ReplicationServices &replication_services= ReplicationServices::singleton();
1903
if (! replication_services.isActive())
1906
message::Transaction *transaction= getActiveTransactionMessage(session);
1907
message::Statement *statement= transaction->add_statement();
1909
initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1912
* Construct the specialized CreateTableStatement message and attach
1913
* it to the generic Statement message
1915
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1916
message::Table *new_table_message= create_table_statement->mutable_table();
1917
*new_table_message= table;
1919
finalizeStatementMessage(*statement, session);
1921
finalizeTransactionMessage(*transaction, session);
1923
(void) replication_services.pushTransactionMessage(session, *transaction);
1925
cleanupTransactionMessage(transaction, session);
1929
void TransactionServices::createSchema(Session::reference session,
1930
const message::Schema &schema)
1932
ReplicationServices &replication_services= ReplicationServices::singleton();
1933
if (! replication_services.isActive())
1936
message::Transaction *transaction= getActiveTransactionMessage(session);
1937
message::Statement *statement= transaction->add_statement();
1939
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1942
* Construct the specialized CreateSchemaStatement message and attach
1943
* it to the generic Statement message
1945
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1946
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
*new_schema_message= schema;
1949
finalizeStatementMessage(*statement, session);
1951
finalizeTransactionMessage(*transaction, session);
1953
(void) replication_services.pushTransactionMessage(session, *transaction);
1955
cleanupTransactionMessage(transaction, session);
1959
void TransactionServices::dropSchema(Session::reference session,
1960
identifier::Schema::const_reference identifier)
1962
ReplicationServices &replication_services= ReplicationServices::singleton();
1963
if (! replication_services.isActive())
1966
message::Transaction *transaction= getActiveTransactionMessage(session);
1967
message::Statement *statement= transaction->add_statement();
1969
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1972
* Construct the specialized DropSchemaStatement message and attach
1973
* it to the generic Statement message
1975
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1977
drop_schema_statement->set_schema_name(identifier.getSchemaName());
1979
finalizeStatementMessage(*statement, session);
1981
finalizeTransactionMessage(*transaction, session);
1983
(void) replication_services.pushTransactionMessage(session, *transaction);
1985
cleanupTransactionMessage(transaction, session);
1988
void TransactionServices::alterSchema(Session::reference session,
1989
const message::schema::shared_ptr &old_schema,
1990
const message::Schema &new_schema)
1992
ReplicationServices &replication_services= ReplicationServices::singleton();
1993
if (! replication_services.isActive())
1996
message::Transaction *transaction= getActiveTransactionMessage(session);
1997
message::Statement *statement= transaction->add_statement();
1999
initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2002
* Construct the specialized AlterSchemaStatement message and attach
2003
* it to the generic Statement message
2005
message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2007
message::Schema *before= alter_schema_statement->mutable_before();
2008
message::Schema *after= alter_schema_statement->mutable_after();
2010
*before= *old_schema;
2013
finalizeStatementMessage(*statement, session);
2015
finalizeTransactionMessage(*transaction, session);
2017
(void) replication_services.pushTransactionMessage(session, *transaction);
2019
cleanupTransactionMessage(transaction, session);
2022
void TransactionServices::dropTable(Session::reference session,
2023
const identifier::Table &table,
2026
ReplicationServices &replication_services= ReplicationServices::singleton();
2027
if (! replication_services.isActive())
2030
message::Transaction *transaction= getActiveTransactionMessage(session);
2031
message::Statement *statement= transaction->add_statement();
2033
initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2036
* Construct the specialized DropTableStatement message and attach
2037
* it to the generic Statement message
2039
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2041
drop_table_statement->set_if_exists_clause(if_exists);
2043
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2045
table_metadata->set_schema_name(table.getSchemaName());
2046
table_metadata->set_table_name(table.getTableName());
2048
finalizeStatementMessage(*statement, session);
2050
finalizeTransactionMessage(*transaction, session);
2052
(void) replication_services.pushTransactionMessage(session, *transaction);
2054
cleanupTransactionMessage(transaction, session);
2057
void TransactionServices::truncateTable(Session::reference session,
2060
ReplicationServices &replication_services= ReplicationServices::singleton();
2061
if (! replication_services.isActive())
2064
message::Transaction *transaction= getActiveTransactionMessage(session);
2065
message::Statement *statement= transaction->add_statement();
2067
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2070
* Construct the specialized TruncateTableStatement message and attach
2071
* it to the generic Statement message
2073
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2074
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2077
(void) table.getShare()->getSchemaName(schema_name);
2079
(void) table.getShare()->getTableName(table_name);
2081
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2084
finalizeStatementMessage(*statement, session);
2086
finalizeTransactionMessage(*transaction, session);
2088
(void) replication_services.pushTransactionMessage(session, *transaction);
2090
cleanupTransactionMessage(transaction, session);
2093
void TransactionServices::rawStatement(Session::reference session,
2094
const string &query)
2096
ReplicationServices &replication_services= ReplicationServices::singleton();
2097
if (! replication_services.isActive())
2100
message::Transaction *transaction= getActiveTransactionMessage(session);
2101
message::Statement *statement= transaction->add_statement();
2103
initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2104
statement->set_sql(query);
2105
finalizeStatementMessage(*statement, session);
2107
finalizeTransactionMessage(*transaction, session);
2109
(void) replication_services.pushTransactionMessage(session, *transaction);
2111
cleanupTransactionMessage(transaction, session);
2114
int TransactionServices::sendEvent(Session::reference session,
2115
const message::Event &event)
2117
ReplicationServices &replication_services= ReplicationServices::singleton();
2118
if (! replication_services.isActive())
2121
message::Transaction *transaction= new (nothrow) message::Transaction();
2123
// set server id, start timestamp
2124
initTransactionMessage(*transaction, session, true);
2126
// set end timestamp
2127
finalizeTransactionMessage(*transaction, session);
2129
message::Event *trx_event= transaction->mutable_event();
2131
trx_event->CopyFrom(event);
2133
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2137
return static_cast<int>(result);
2140
/**
 * Sends a STARTUP event through sendEvent().
 *
 * @param[in] session  Session on whose behalf the event is sent
 *
 * @return true when sendEvent() returns 0, false otherwise.
 */
bool TransactionServices::sendStartupEvent(Session::reference session)
{
  message::Event startup_event;
  startup_event.set_type(message::Event::STARTUP);

  return sendEvent(session, startup_event) == 0;
}
2149
/**
 * Sends a SHUTDOWN event through sendEvent().
 *
 * @param[in] session  Session on whose behalf the event is sent
 *
 * @return true when sendEvent() returns 0, false otherwise.
 */
bool TransactionServices::sendShutdownEvent(Session::reference session)
{
  message::Event shutdown_event;
  shutdown_event.set_type(message::Event::SHUTDOWN);

  return sendEvent(session, shutdown_event) == 0;
}
2158
} /* namespace drizzled */