1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/internal/my_sys.h"
/**
 * Size threshold applied to Transaction messages (presumably in bytes, i.e.
 * 1 MiB -- TODO confirm against the code that consumes this constant).
 *
 * @todo Make this a system variable.
 */
static const size_t trx_msg_threshold= 1024 * 1024;
87
* @defgroup Transactions
91
* Transaction handling in the server
95
* In each client connection, Drizzle maintains two transaction
96
* contexts representing the state of the:
98
* 1) Statement Transaction
99
* 2) Normal Transaction
101
* These two transaction contexts represent the transactional
102
* state of a Session's SQL and XA transactions for a single
103
* SQL statement or a series of SQL statements.
105
* When the Session's connection is in AUTOCOMMIT mode, there
106
* is no practical difference between the statement and the
107
* normal transaction, as each SQL statement is committed or
108
* rolled back depending on the success or failure of the
109
* individual SQL statement.
111
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
* the Session has explicitly begun a normal SQL transaction using
113
* a BEGIN WORK/START TRANSACTION statement, then the normal
114
* transaction context tracks the aggregate transaction state of
115
* the SQL transaction's individual statements, and the SQL
116
* transaction's commit or rollback is done atomically for all of
117
* the SQL transaction's statement's data changes.
119
* Technically, a statement transaction can be viewed as a savepoint
120
* which is maintained automatically in order to make effects of one
123
* The normal transaction is started by the user and is typically
124
* ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
* The exception to this is that DDL statements implicitly COMMIT
126
* any previously active normal transaction before they begin executing.
128
* In Drizzle, unlike MySQL, plugins other than a storage engine
129
* may participate in a transaction. All plugin::TransactionalStorageEngine
130
* plugins will automatically be monitored by Drizzle's transaction
131
* manager (implemented in this source file), as will all plugins which
132
* implement plugin::XaResourceManager and register with the transaction
135
* If Drizzle's transaction manager sees that more than one resource
136
* manager (transactional storage engine or XA resource manager) has modified
137
* data state during a statement or normal transaction, the transaction
138
* manager will automatically use a two-phase commit protocol for all
139
* resources which support XA's distributed transaction protocol. Unlike
140
* MySQL, storage engines need not manually register with the transaction
141
* manager during a statement's execution. Previously, in MySQL, all
142
* handlertons would have to call trans_register_ha() at some point after
143
* modifying data state in order to have MySQL include that handler in
144
* an XA transaction. Drizzle does all of this grunt work behind the
145
* scenes for the storage engine implementers.
147
* When a connection is closed, the current normal transaction, if
148
* any is currently active, is rolled back.
150
* Transaction life cycle
151
* ----------------------
153
* When a new connection is established, session->transaction
154
* members are initialized to an empty state. If a statement uses any tables,
155
* all affected engines are registered in the statement engine list automatically
156
* in plugin::StorageEngine::startStatement() and
157
* plugin::TransactionalStorageEngine::startTransaction().
159
* You can view the lifetime of a normal transaction in the following
162
* drizzled::statement::Statement::execute()
163
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
* drizzled::TransactionServices::registerResourceForTransaction()
165
* drizzled::TransactionServices::registerResourceForStatement()
166
* drizzled::plugin::StorageEngine::startStatement()
167
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
* drizzled::plugin::StorageEngine::endStatement()
169
* drizzled::TransactionServices::autocommitOrRollback()
170
* drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
* drizzled::XaResourceManager::xaCommit() <-- or rollback()
173
* Roles and responsibilities
174
* --------------------------
176
* Beginning of SQL Statement (and Statement Transaction)
177
* ------------------------------------------------------
179
* At the start of each SQL statement, for each storage engine
180
* <strong>that is involved in the SQL statement</strong>, the kernel
181
* calls the engine's plugin::StorageEngine::startStatement() method. If the
182
* engine needs to track some data for the statement, it should use
183
* this method invocation to initialize this data. This is the
184
* beginning of what is called the "statement transaction".
186
* <strong>For transaction storage engines (those storage engines
187
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
* kernel automatically determines if the start of the SQL statement
189
* transaction should <em>also</em> begin the normal SQL transaction.
190
* This occurs when the connection is NOT in autocommit mode. If
191
* the kernel detects this, then the kernel automatically starts the
192
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
* method and then calls plugin::StorageEngine::startStatement()
196
* Beginning of an SQL "Normal" Transaction
197
* ----------------------------------------
199
* As noted above, a "normal SQL transaction" may be started when
200
* an SQL statement is started in a connection and the connection is
201
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
203
* In addition, when a user executes a START TRANSACTION or
204
* BEGIN WORK statement in a connection, the kernel explicitly
205
* calls each transactional storage engine's startTransaction() method.
207
* Ending of an SQL Statement (and Statement Transaction)
208
* ------------------------------------------------------
210
* At the end of each SQL statement, for each of the aforementioned
211
* involved storage engines, the kernel calls the engine's
212
* plugin::StorageEngine::endStatement() method. If the engine
213
* has initialized or modified some internal data about the
214
* statement transaction, it should use this method to reset or destroy
215
* this data appropriately.
217
* Ending of an SQL "Normal" Transaction
218
* -------------------------------------
220
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
221
* depending on the success or failure of the statement transaction(s)
224
* The end of a "normal transaction" occurs when any of the following
227
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
* then the normal transaction which encloses the statement
230
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
* 3) Just before a DDL operation occurs, the kernel will implicitly
232
* commit the active normal transaction
234
* Transactions and Non-transactional Storage Engines
235
* --------------------------------------------------
237
* For non-transactional engines, this call can be safely ignored, and
238
* the kernel tracks whether a non-transactional engine has changed
239
* any data state, and warns the user appropriately if a transaction
240
* (statement or normal) is rolled back after such non-transactional
241
* data changes have been made.
243
* XA Two-phase Commit Protocol
244
* ----------------------------
246
* During statement execution, whenever any of the data-modifying
247
* PSEA API methods is used, e.g. Cursor::write_row() or
248
* Cursor::update_row(), the read-write flag is raised in the
249
* statement transaction for the involved engine.
250
* Currently all PSEA calls are "traced", and the data can not be
251
* changed in a way other than issuing a PSEA call. Important:
252
* unless this invariant is preserved the server will not know that
253
* a transaction in a given engine is read-write and will not
254
* involve the two-phase commit protocol!
256
* At the end of a statement, TransactionServices::autocommitOrRollback()
257
* is invoked. This call in turn
258
* invokes plugin::XaResourceManager::xaPrepare() for every involved XA
261
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
265
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
* At statement commit, the statement-related read-write engine
268
* flag is propagated to the corresponding flag in the normal
269
* transaction. When the commit is complete, the list of registered
270
* engines is cleared.
272
* Rollback is handled in a similar fashion.
274
* Additional notes on DDL and the normal transaction.
275
* ---------------------------------------------------
277
* CREATE TABLE .. SELECT can start a *new* normal transaction
278
* because of the fact that SELECTs on a transactional storage
279
* engine participate in the normal SQL transaction (due to
280
* isolation level issues and consistent read views).
282
* Behaviour of the server in this case is currently badly
285
* DDL statements use a form of "semantic" logging
286
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
* the newly created table is deleted.
289
* In addition, some DDL statements issue interim transaction
290
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
* from the original table to the internal temporary table. Other
292
* statements, e.g. CREATE TABLE ... SELECT do not always commit
295
* And finally there is a group of DDL statements such as
296
* RENAME/DROP TABLE that doesn't start a new transaction
297
* and doesn't commit.
299
* A consistent behaviour is perhaps to always commit the normal
300
* transaction after all DDLs, just like the statement transaction
301
* is always committed at the end of all statements.
303
void TransactionServices::registerResourceForStatement(Session *session,
304
plugin::MonitoredInTransaction *monitored,
305
plugin::TransactionalStorageEngine *engine)
307
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
310
* Now we automatically register this resource manager for the
311
* normal transaction. This is fine because a statement
312
* transaction registration should always enlist the resource
313
* in the normal transaction which contains the statement
316
registerResourceForTransaction(session, monitored, engine);
319
TransactionContext *trans= &session->transaction.stmt;
320
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
322
if (resource_context->isStarted())
323
return; /* already registered, return */
325
assert(monitored->participatesInSqlTransaction());
326
assert(not monitored->participatesInXaTransaction());
328
resource_context->setMonitored(monitored);
329
resource_context->setTransactionalStorageEngine(engine);
330
trans->registerResource(resource_context);
332
trans->no_2pc|= true;
335
void TransactionServices::registerResourceForStatement(Session *session,
336
plugin::MonitoredInTransaction *monitored,
337
plugin::TransactionalStorageEngine *engine,
338
plugin::XaResourceManager *resource_manager)
340
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
343
* Now we automatically register this resource manager for the
344
* normal transaction. This is fine because a statement
345
* transaction registration should always enlist the resource
346
* in the normal transaction which contains the statement
349
registerResourceForTransaction(session, monitored, engine, resource_manager);
352
TransactionContext *trans= &session->transaction.stmt;
353
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
355
if (resource_context->isStarted())
356
return; /* already registered, return */
358
assert(monitored->participatesInXaTransaction());
359
assert(monitored->participatesInSqlTransaction());
361
resource_context->setMonitored(monitored);
362
resource_context->setTransactionalStorageEngine(engine);
363
resource_context->setXaResourceManager(resource_manager);
364
trans->registerResource(resource_context);
366
trans->no_2pc|= false;
369
void TransactionServices::registerResourceForTransaction(Session *session,
370
plugin::MonitoredInTransaction *monitored,
371
plugin::TransactionalStorageEngine *engine)
373
TransactionContext *trans= &session->transaction.all;
374
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
376
if (resource_context->isStarted())
377
return; /* already registered, return */
379
session->server_status|= SERVER_STATUS_IN_TRANS;
381
trans->registerResource(resource_context);
383
assert(monitored->participatesInSqlTransaction());
384
assert(not monitored->participatesInXaTransaction());
386
resource_context->setMonitored(monitored);
387
resource_context->setTransactionalStorageEngine(engine);
388
trans->no_2pc|= true;
390
if (session->transaction.xid_state.xid.is_null())
391
session->transaction.xid_state.xid.set(session->getQueryId());
393
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
395
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
396
if (! session->getResourceContext(monitored, 0)->isStarted())
397
registerResourceForStatement(session, monitored, engine);
400
void TransactionServices::registerResourceForTransaction(Session *session,
401
plugin::MonitoredInTransaction *monitored,
402
plugin::TransactionalStorageEngine *engine,
403
plugin::XaResourceManager *resource_manager)
405
TransactionContext *trans= &session->transaction.all;
406
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
408
if (resource_context->isStarted())
409
return; /* already registered, return */
411
session->server_status|= SERVER_STATUS_IN_TRANS;
413
trans->registerResource(resource_context);
415
assert(monitored->participatesInSqlTransaction());
417
resource_context->setMonitored(monitored);
418
resource_context->setXaResourceManager(resource_manager);
419
resource_context->setTransactionalStorageEngine(engine);
420
trans->no_2pc|= true;
422
if (session->transaction.xid_state.xid.is_null())
423
session->transaction.xid_state.xid.set(session->getQueryId());
425
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
427
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
428
if (! session->getResourceContext(monitored, 0)->isStarted())
429
registerResourceForStatement(session, monitored, engine, resource_manager);
436
1 transaction was rolled back
438
2 error during commit, data may be inconsistent
441
Since we don't support nested statement transactions in 5.0,
442
we can't commit or rollback stmt transactions while we are inside
443
stored functions or triggers. So we simply do nothing now.
444
TODO: This should be fixed in later ( >= 5.1) releases.
446
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
448
int error= 0, cookie= 0;
450
'all' means that this is either an explicit commit issued by
451
user, or an implicit commit issued by a DDL.
453
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
454
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
456
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
459
We must not commit the normal transaction if a statement
460
transaction is pending. Otherwise statement transaction
461
flags will not get propagated to its normal transaction's
464
assert(session->transaction.stmt.getResourceContexts().empty() ||
465
trans == &session->transaction.stmt);
467
if (resource_contexts.empty() == false)
469
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
471
rollbackTransaction(session, normal_transaction);
476
* If replication is on, we do a PREPARE on the resource managers, push the
477
* Transaction message across the replication stream, and then COMMIT if the
478
* replication stream returned successfully.
480
if (shouldConstructMessages())
482
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
483
it != resource_contexts.end() && ! error;
486
ResourceContext *resource_context= *it;
489
Do not call two-phase commit if this particular
490
transaction is read-only. This allows for simpler
491
implementation in engines that are always read-only.
493
if (! resource_context->hasModifiedData())
496
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
498
if (resource->participatesInXaTransaction())
500
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
502
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
507
session->status_var.ha_prepare_count++;
511
if (error == 0 && is_real_trans)
514
* Push the constructed Transaction messages across to
515
* replicators and appliers.
517
error= commitTransactionMessage(session);
521
rollbackTransaction(session, normal_transaction);
526
error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
529
start_waiting_global_read_lock(session);
536
This function does not care about global read lock. A caller should.
538
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
541
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
542
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
544
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
546
if (resource_contexts.empty() == false)
548
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
549
it != resource_contexts.end();
553
ResourceContext *resource_context= *it;
555
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
557
if (resource->participatesInXaTransaction())
559
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
561
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
564
else if (normal_transaction)
566
session->status_var.ha_commit_count++;
569
else if (resource->participatesInSqlTransaction())
571
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
573
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
576
else if (normal_transaction)
578
session->status_var.ha_commit_count++;
581
resource_context->reset(); /* keep it conveniently zero-filled */
585
session->transaction.xid_state.xid.null();
587
if (normal_transaction)
589
session->variables.tx_isolation= session->session_tx_isolation;
590
session->transaction.cleanup();
597
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
600
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
601
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
603
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
606
We must not rollback the normal transaction if a statement
607
transaction is pending.
609
assert(session->transaction.stmt.getResourceContexts().empty() ||
610
trans == &session->transaction.stmt);
612
if (resource_contexts.empty() == false)
614
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
615
it != resource_contexts.end();
619
ResourceContext *resource_context= *it;
621
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
623
if (resource->participatesInXaTransaction())
625
if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
627
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
630
else if (normal_transaction)
632
session->status_var.ha_rollback_count++;
635
else if (resource->participatesInSqlTransaction())
637
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
639
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
642
else if (normal_transaction)
644
session->status_var.ha_rollback_count++;
647
resource_context->reset(); /* keep it conveniently zero-filled */
651
* We need to signal the ROLLBACK to ReplicationServices here
652
* BEFORE we set the transaction ID to NULL. This is because
653
* if a bulk segment was sent to replicators, we need to send
654
* a rollback statement with the corresponding transaction ID
657
rollbackTransactionMessage(session);
660
session->transaction.xid_state.xid.null();
661
if (normal_transaction)
663
session->variables.tx_isolation=session->session_tx_isolation;
664
session->transaction.cleanup();
667
if (normal_transaction)
668
session->transaction_rollback_request= false;
671
* If a non-transactional table was updated, warn the user
674
session->transaction.all.hasModifiedNonTransData() &&
675
session->killed != Session::KILL_CONNECTION)
677
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
678
ER_WARNING_NOT_COMPLETE_ROLLBACK,
679
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
686
This is used to commit or rollback a single statement depending on
690
Note that if the autocommit is on, then the following call inside
691
InnoDB will commit or rollback the whole transaction (= the statement). The
692
autocommit mechanism built into InnoDB is based on counting locks, but if
693
the user has used LOCK TABLES then that mechanism does not know to do the
696
int TransactionServices::autocommitOrRollback(Session *session, int error)
698
if (session->transaction.stmt.getResourceContexts().empty() == false)
702
if (commitTransaction(session, false))
707
(void) rollbackTransaction(session, false);
708
if (session->transaction_rollback_request)
709
(void) rollbackTransaction(session, true);
712
session->variables.tx_isolation= session->session_tx_isolation;
717
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
719
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
721
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
722
* resources aren't the same... */
723
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
727
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
730
TransactionContext *trans= &session->transaction.all;
731
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
732
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
734
trans->no_2pc= false;
736
rolling back to savepoint in all storage engines that were part of the
737
transaction when the savepoint was set
739
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
740
it != sv_resource_contexts.end();
744
ResourceContext *resource_context= *it;
746
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
748
if (resource->participatesInSqlTransaction())
750
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
752
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
757
session->status_var.ha_savepoint_rollback_count++;
760
trans->no_2pc|= not resource->participatesInXaTransaction();
763
rolling back the transaction in all storage engines that were not part of
764
the transaction when the savepoint was set
767
TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
768
TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
769
TransactionContext::ResourceContexts set_difference_contexts;
772
* Bug #542299: segfault during set_difference() below. copy<>() requires pre-allocation
773
* of all elements, including the target, which is why we pre-allocate the set_difference_contexts
776
set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
778
sort(sorted_tran_resource_contexts.begin(),
779
sorted_tran_resource_contexts.end(),
780
ResourceContextCompare());
781
sort(sorted_sv_resource_contexts.begin(),
782
sorted_sv_resource_contexts.end(),
783
ResourceContextCompare());
784
set_difference(sorted_tran_resource_contexts.begin(),
785
sorted_tran_resource_contexts.end(),
786
sorted_sv_resource_contexts.begin(),
787
sorted_sv_resource_contexts.end(),
788
set_difference_contexts.begin(),
789
ResourceContextCompare());
791
* set_difference_contexts now contains all resource contexts
792
* which are in the transaction context but were NOT in the
793
* savepoint's resource contexts.
796
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
797
it != set_difference_contexts.end();
800
ResourceContext *resource_context= *it;
803
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
805
if (resource->participatesInSqlTransaction())
807
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
809
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
814
session->status_var.ha_rollback_count++;
817
resource_context->reset(); /* keep it conveniently zero-filled */
820
trans->setResourceContexts(sv_resource_contexts);
822
if (shouldConstructMessages())
824
cleanupTransactionMessage(getActiveTransactionMessage(session), session);
825
message::Transaction *savepoint_transaction= sv.getTransactionMessage();
826
if (savepoint_transaction != NULL)
828
/* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
829
Upon commit the savepoint_transaction_copy will be cleaned up by a call to
830
cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
831
up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
833
message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
834
uint32_t num_statements = savepoint_transaction_copy->statement_size();
835
if (num_statements == 0)
837
session->setStatementMessage(NULL);
841
session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
843
session->setTransactionMessage(savepoint_transaction_copy);
852
according to the sql standard (ISO/IEC 9075-2:2003)
853
section "4.33.4 SQL-statements and transaction states",
854
NamedSavepoint is *not* transaction-initiating SQL-statement
856
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
859
TransactionContext *trans= &session->transaction.all;
860
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
862
if (resource_contexts.empty() == false)
864
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
865
it != resource_contexts.end();
868
ResourceContext *resource_context= *it;
871
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
873
if (resource->participatesInSqlTransaction())
875
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
877
my_error(ER_GET_ERRNO, MYF(0), err);
882
session->status_var.ha_savepoint_count++;
888
Remember the list of registered storage engines.
890
sv.setResourceContexts(resource_contexts);
892
if (shouldConstructMessages())
894
message::Transaction *transaction= session->getTransactionMessage();
896
if (transaction != NULL)
898
message::Transaction *transaction_savepoint=
899
new message::Transaction(*transaction);
900
sv.setTransactionMessage(transaction_savepoint);
907
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
911
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
913
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
914
it != resource_contexts.end();
918
ResourceContext *resource_context= *it;
920
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
922
if (resource->participatesInSqlTransaction())
924
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
926
my_error(ER_GET_ERRNO, MYF(0), err);
935
bool TransactionServices::shouldConstructMessages()
937
ReplicationServices &replication_services= ReplicationServices::singleton();
938
return replication_services.isActive();
941
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
943
message::Transaction *transaction= in_session->getTransactionMessage();
945
if (unlikely(transaction == NULL))
948
* Allocate and initialize a new transaction message
949
* for this Session object. Session is responsible for
950
* deleting transaction message when done with it.
952
transaction= new (nothrow) message::Transaction();
953
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
954
in_session->setTransactionMessage(transaction);
961
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
963
bool should_inc_trx_id)
965
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
966
trx->set_server_id(in_session->getServerId());
968
if (should_inc_trx_id)
969
trx->set_transaction_id(getNextTransactionId());
971
trx->set_start_timestamp(in_session->getCurrentTimestamp());
974
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
977
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
978
trx->set_end_timestamp(in_session->getCurrentTimestamp());
981
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
984
delete in_transaction;
985
in_session->setStatementMessage(NULL);
986
in_session->setTransactionMessage(NULL);
989
/**
 * Finalizes the session's current Statement and Transaction messages,
 * pushes the Transaction message to the replication services, and then
 * cleans the message up.
 *
 * @param in_session  Session whose messages are committed
 *
 * @return 0 when there is nothing to push (replication inactive, or no
 *         active statement message); otherwise the
 *         plugin::ReplicationReturnCode from pushTransactionMessage(),
 *         cast to int
 */
int TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return 0; /* NOTE(review): this return line is reconstructed -- confirm */

  message::Statement *statement= in_session->getStatementMessage();

  if (statement == NULL)
    return 0; /* No data modification occurred inside the transaction */

  /* There is an active statement message, finalize it */
  finalizeStatementMessage(*statement, in_session);

  message::Transaction* transaction= getActiveTransactionMessage(in_session);

  finalizeTransactionMessage(*transaction, in_session);

  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);

  return static_cast<int>(result);
}
1016
void TransactionServices::initStatementMessage(message::Statement &statement,
1017
message::Statement::Type in_type,
1018
Session *in_session)
1020
statement.set_type(in_type);
1021
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1022
/** @TODO Set sql string optionally */
1025
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1026
Session *in_session)
1028
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1029
in_session->setStatementMessage(NULL);
1032
/**
 * Discards the session's Transaction message on rollback, first sending a
 * ROLLBACK statement to replicators if part of the transaction has
 * already been transmitted.
 *
 * Does nothing when the replication services are inactive.
 *
 * @param in_session  Session whose transaction is being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);

  /*
   * OK, so there are two situations that we need to deal with here:
   *
   * 1) We receive an instruction to ROLLBACK the current transaction
   *    and the currently-stored Transaction message is *self-contained*,
   *    meaning that no Statement messages in the Transaction message
   *    contain a message having its segment_id member greater than 1.  If
   *    no non-segment ID 1 members are found, we can simply clear the
   *    current Transaction message and remove it from memory.
   *
   * 2) If the Transaction message does indeed have a non-end segment, that
   *    means that a bulk update/delete/insert Transaction message segment
   *    has previously been sent over the wire to replicators.  In this case,
   *    we need to package a Transaction with a Statement message of type
   *    ROLLBACK to indicate to replicators that previously-transmitted
   *    messages must be un-applied.
   */
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
  {
    /* Remember the transaction ID so we can re-use it */
    uint64_t trx_id= transaction->transaction_context().transaction_id();

    /*
     * Clear the transaction, create a Rollback statement message,
     * attach it to the transaction, and push it to replicators.
     */
    transaction->Clear();
    initTransactionMessage(*transaction, in_session, false);

    /* Set the transaction ID to match the previous messages */
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);

    message::Statement *statement= transaction->add_statement();

    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
    finalizeStatementMessage(*statement, in_session);

    finalizeTransactionMessage(*transaction, in_session);

    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
  }

  /* In both situations the message is deleted and detached from the session */
  cleanupTransactionMessage(transaction, in_session);
}
1084
/**
 * Returns the INSERT Statement message that the next inserted record
 * should be added to, creating a new one when the session's current
 * statement cannot be reused.
 *
 * The current statement is reused only if it is an INSERT on the same
 * table and the transaction message is below trx_msg_threshold. When the
 * threshold is reached, the current transaction message is committed as a
 * non-final segment and a new one is started under the same transaction id.
 *
 * @param in_session       Session holding the current messages
 * @param in_table         Table being inserted into
 * @param next_segment_id  Out: segment id the caller should store on the
 *                         statement's InsertData. Left unchanged when a
 *                         new statement is created without segmenting.
 *
 * @return the statement message to add the record to
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type for the current Statement message, if it is anything
   * other than INSERT we need to call finalize, this will ensure a
   * new InsertStatement is created. If it is of type INSERT check
   * what table the INSERT belongs to, if it is a different table
   * call finalize, so a new InsertStatement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::INSERT)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If we've passed our threshold for the statement size (possible for
     * a bulk insert), we'll finalize the Statement and Transaction (doing
     * the Transaction will keep it from getting huge).
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::InsertData *current_data= statement->mutable_insert_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      const message::InsertHeader &insert_header= statement->insert_header();
      string old_table_name= insert_header.table_metadata().table_name();

      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      if (current_table_name.compare(old_table_name))
      {
        /* Different table: close this statement so a new one is built below */
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::InsertData &current_data= statement->insert_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction is already set if the branch above fetched it (for
     * instance after segmenting due to transaction size).
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setInsertHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1187
void TransactionServices::setInsertHeader(message::Statement &statement,
1188
Session *in_session,
1191
initStatementMessage(statement, message::Statement::INSERT, in_session);
1194
* Now we construct the specialized InsertHeader message inside
1195
* the generalized message::Statement container...
1197
/* Set up the insert header */
1198
message::InsertHeader *header= statement.mutable_insert_header();
1199
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1202
(void) in_table->getShare()->getSchemaName(schema_name);
1204
(void) in_table->getShare()->getTableName(table_name);
1206
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1207
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1209
Field *current_field;
1210
Field **table_fields= in_table->getFields();
1212
message::FieldMetadata *field_metadata;
1214
/* We will read all the table's fields... */
1215
in_table->setReadSet();
1217
while ((current_field= *table_fields++) != NULL)
1219
field_metadata= header->add_field_metadata();
1220
field_metadata->set_name(current_field->field_name);
1221
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1225
/**
 * Appends the table's current record to the session's INSERT statement
 * message as an InsertRecord.
 *
 * Raises ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE when the table has no
 * primary key. NULL fields are recorded with is_null set and an empty
 * value; other fields are recorded by their string value.
 *
 * @param in_session  Session holding the transaction/statement messages
 * @param in_table    Table whose current record was inserted
 *
 * @return NOTE(review): the three return statements of this function are
 *         not visible in this copy. They are reconstructed as "true when
 *         the primary-key error was raised, false otherwise" -- confirm
 *         against a clean copy and the callers.
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return false; /* NOTE(review): reconstructed value -- confirm */

  /**
   * We do this check here because we don't want to even create a
   * statement if there isn't a primary key on the table...
   *
   * @todo
   *
   * Multi-column primary keys are handled how exactly?
   */
  if (not in_table->getShare()->hasPrimaryKey())
  {
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
    return true; /* NOTE(review): reconstructed value -- confirm */
  }

  uint32_t next_segment_id= 1;
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);

  message::InsertData *data= statement.mutable_insert_data();
  data->set_segment_id(next_segment_id);
  data->set_end_segment(true);
  message::InsertRecord *record= data->add_record();

  Field *current_field;
  Field **table_fields= in_table->getFields();

  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  string_value->set_charset(system_charset_info);

  /* We will read all the table's fields... */
  in_table->setReadSet();

  /* The field array is NULL-terminated */
  while ((current_field= *table_fields++) != NULL)
  {
    if (current_field->is_null())
    {
      record->add_is_null(true);
      record->add_insert_value("", 0);
    }
    else
    {
      string_value= current_field->val_str(string_value);
      record->add_is_null(false);
      record->add_insert_value(string_value->c_ptr(), string_value->length());
      string_value->free();
    }
  }
  return false; /* NOTE(review): reconstructed value -- confirm */
}
1279
/**
 * Returns the UPDATE Statement message that the next updated record
 * should be added to, creating a new one when the session's current
 * statement cannot be reused.
 *
 * The current statement is reused only if it is an UPDATE on the same
 * table and the transaction message is below trx_msg_threshold. When the
 * threshold is reached, the current transaction message is committed as a
 * non-final segment and a new one is started under the same transaction id.
 *
 * @param in_session       Session holding the current messages
 * @param in_table         Table being updated
 * @param old_record       Raw bytes of the row before the update; passed
 *                         on to setUpdateHeader()
 * @param new_record       Raw bytes of the row after the update; passed
 *                         on to setUpdateHeader()
 * @param next_segment_id  Out: segment id the caller should store on the
 *                         statement's UpdateData. Left unchanged when a
 *                         new statement is created without segmenting.
 *
 * @return the statement message to add the record to
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
                                                            Table *in_table,
                                                            const unsigned char *old_record,
                                                            const unsigned char *new_record,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type for the current Statement message, if it is anything
   * other than UPDATE we need to call finalize, this will ensure a
   * new UpdateStatement is created. If it is of type UPDATE check
   * what table the UPDATE belongs to, if it is a different table
   * call finalize, so a new UpdateStatement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If we've passed our threshold for the statement size (possible for
     * a bulk update), we'll finalize the Statement and Transaction (doing
     * the Transaction will keep it from getting huge).
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::UpdateData *current_data= statement->mutable_update_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      const message::UpdateHeader &update_header= statement->update_header();
      string old_table_name= update_header.table_metadata().table_name();

      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);
      if (current_table_name.compare(old_table_name))
      {
        /* Different table: close this statement so a new one is built below */
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::UpdateData &current_data= statement->update_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction is already set if the branch above fetched it (for
     * instance after segmenting due to transaction size).
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1383
void TransactionServices::setUpdateHeader(message::Statement &statement,
1384
Session *in_session,
1386
const unsigned char *old_record,
1387
const unsigned char *new_record)
1389
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1392
* Now we construct the specialized UpdateHeader message inside
1393
* the generalized message::Statement container...
1395
/* Set up the update header */
1396
message::UpdateHeader *header= statement.mutable_update_header();
1397
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1400
(void) in_table->getShare()->getSchemaName(schema_name);
1402
(void) in_table->getShare()->getTableName(table_name);
1404
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1405
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1407
Field *current_field;
1408
Field **table_fields= in_table->getFields();
1410
message::FieldMetadata *field_metadata;
1412
/* We will read all the table's fields... */
1413
in_table->setReadSet();
1415
while ((current_field= *table_fields++) != NULL)
1418
* We add the "key field metadata" -- i.e. the fields which is
1419
* the primary key for the table.
1421
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1423
field_metadata= header->add_key_field_metadata();
1424
field_metadata->set_name(current_field->field_name);
1425
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1429
* The below really should be moved into the Field API and Record API. But for now
1430
* we do this crazy pointer fiddling to figure out if the current field
1431
* has been updated in the supplied record raw byte pointers.
1433
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1434
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1436
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1438
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1440
/* Field is changed from old to new */
1441
field_metadata= header->add_set_field_metadata();
1442
field_metadata->set_name(current_field->field_name);
1443
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1447
/**
 * Appends an UpdateRecord to the session's UPDATE statement message.
 *
 * For every field whose bytes differ between old_record and new_record,
 * the field's string value (or an empty value with is_null set) is added
 * as an "after" value. For every primary-key field a key value is added.
 * The statement's UpdateData is given the segment id obtained from
 * getUpdateStatement() and marked as the end segment.
 *
 * NOTE(review): the text of this definition is damaged in this copy. The
 * braces, comment delimiters, the `Table *in_table` parameter line, the
 * statement following the isActive() check, and one argument line of the
 * key-value val_str() call are absent, and bare numbers are interleaved
 * with the code. In particular, which record the key value is read from
 * cannot be determined from what is visible here -- restore the body from
 * a clean copy.
 */
void TransactionServices::updateRecord(Session *in_session,
1449
const unsigned char *old_record,
1450
const unsigned char *new_record)
1452
ReplicationServices &replication_services= ReplicationServices::singleton();
1453
if (! replication_services.isActive())
1456
uint32_t next_segment_id= 1;
1457
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1459
message::UpdateData *data= statement.mutable_update_data();
1460
data->set_segment_id(next_segment_id);
1461
data->set_end_segment(true);
1462
message::UpdateRecord *record= data->add_record();
1464
Field *current_field;
1465
Field **table_fields= in_table->getFields();
1466
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1467
string_value->set_charset(system_charset_info);
1469
while ((current_field= *table_fields++) != NULL)
1472
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1473
* but then realized that an UPDATE statement could potentially have different values for
1474
* the SET field. For instance, imagine this SQL scenario:
1476
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1477
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1478
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1480
* We will generate two UpdateRecord messages with different set_value byte arrays.
1482
* The below really should be moved into the Field API and Record API. But for now
1483
* we do this crazy pointer fiddling to figure out if the current field
1484
* has been updated in the supplied record raw byte pointers.
1486
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1487
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1489
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1491
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1493
/* Store the original "read bit" for this field */
1494
bool is_read_set= current_field->isReadSet();
1496
/* We need to mark that we will "read" this field... */
1497
in_table->setReadSet(current_field->field_index);
1499
/* Read the string value of this field's contents */
1500
string_value= current_field->val_str(string_value);
1503
* Reset the read bit after reading field to its original state. This
1504
* prevents the field from being included in the WHERE clause
1506
current_field->setReadSet(is_read_set);
1508
if (current_field->is_null())
1510
record->add_is_null(true);
1511
record->add_after_value("", 0);
1515
record->add_is_null(false);
1516
record->add_after_value(string_value->c_ptr(), string_value->length());
1518
string_value->free();
1522
* Add the WHERE clause values now...for now, this means the
1523
* primary key field value. Replication only supports tables
1524
* with a primary key.
1526
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1529
* To say the below is ugly is an understatement. But it works.
1531
* @todo Move this crap into a real Record API.
1533
string_value= current_field->val_str(string_value,
1535
current_field->offset(const_cast<unsigned char *>(new_record)));
1536
record->add_key_value(string_value->c_ptr(), string_value->length());
1537
string_value->free();
1543
/**
 * Returns the DELETE Statement message that the next deleted record
 * should be added to, creating a new one when the session's current
 * statement cannot be reused.
 *
 * The current statement is reused only if it is a DELETE on the same
 * table and the transaction message is below trx_msg_threshold. When the
 * threshold is reached, the current transaction message is committed as a
 * non-final segment and a new one is started under the same transaction id.
 *
 * @param in_session       Session holding the current messages
 * @param in_table         Table being deleted from
 * @param next_segment_id  Out: segment id the caller should store on the
 *                         statement's DeleteData. Left unchanged when a
 *                         new statement is created without segmenting.
 *
 * @return the statement message to add the record to
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type for the current Statement message, if it is anything
   * other than DELETE we need to call finalize, this will ensure a
   * new DeleteStatement is created. If it is of type DELETE check
   * what table the DELETE belongs to, if it is a different table
   * call finalize, so a new DeleteStatement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::DELETE)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If we've passed our threshold for the statement size (possible for
     * a bulk delete), we'll finalize the Statement and Transaction (doing
     * the Transaction will keep it from getting huge).
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::DeleteData *current_data= statement->mutable_delete_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      const message::DeleteHeader &delete_header= statement->delete_header();
      string old_table_name= delete_header.table_metadata().table_name();

      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);
      if (current_table_name.compare(old_table_name))
      {
        /* Different table: close this statement so a new one is built below */
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::DeleteData &current_data= statement->delete_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction is already set if the branch above fetched it (for
     * instance after segmenting due to transaction size).
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setDeleteHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1645
void TransactionServices::setDeleteHeader(message::Statement &statement,
1646
Session *in_session,
1649
initStatementMessage(statement, message::Statement::DELETE, in_session);
1652
* Now we construct the specialized DeleteHeader message inside
1653
* the generalized message::Statement container...
1655
message::DeleteHeader *header= statement.mutable_delete_header();
1656
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1659
(void) in_table->getShare()->getSchemaName(schema_name);
1661
(void) in_table->getShare()->getTableName(table_name);
1663
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1664
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1666
Field *current_field;
1667
Field **table_fields= in_table->getFields();
1669
message::FieldMetadata *field_metadata;
1671
while ((current_field= *table_fields++) != NULL)
1674
* Add the WHERE clause values now...for now, this means the
1675
* primary key field value. Replication only supports tables
1676
* with a primary key.
1678
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1680
field_metadata= header->add_key_field_metadata();
1681
field_metadata->set_name(current_field->field_name);
1682
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1687
/**
 * Appends a DeleteRecord holding the primary-key field values of the
 * deleted row to the session's DELETE statement message.
 *
 * Does nothing when the replication services are inactive.
 *
 * @param in_session         Session holding the transaction/statement messages
 * @param in_table           Table the row was deleted from
 * @param use_update_record  When true, key values are read from the table's
 *                           update record instead of its insert record
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  uint32_t next_segment_id= 1;
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);

  message::DeleteData *data= statement.mutable_delete_data();
  data->set_segment_id(next_segment_id);
  data->set_end_segment(true);
  message::DeleteRecord *record= data->add_record();

  Field *current_field;
  Field **table_fields= in_table->getFields();
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  string_value->set_charset(system_charset_info);

  /* The field array is NULL-terminated */
  while ((current_field= *table_fields++) != NULL)
  {
    /*
     * Add the WHERE clause values now...for now, this means the
     * primary key field value.  Replication only supports tables
     * with a primary key.
     */
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
    {
      if (use_update_record)
      {
        /*
         * Temporarily point to the update record to get its value.
         * This is pretty much a hack in order to get the PK value from
         * the update record rather than the insert record. Field::val_str()
         * should not change anything in Field::ptr, so this should be safe.
         * We are careful not to change anything in old_ptr.
         */
        const unsigned char *old_ptr= current_field->ptr;
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
        string_value= current_field->val_str(string_value);
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
      }
      else
      {
        string_value= current_field->val_str(string_value);
        /**
         * @TODO Store optional old record value in the before data member
         */
      }
      record->add_key_value(string_value->c_ptr(), string_value->length());
      string_value->free();
    }
  }
}
1742
void TransactionServices::createTable(Session *in_session,
1743
const message::Table &table)
1745
ReplicationServices &replication_services= ReplicationServices::singleton();
1746
if (! replication_services.isActive())
1749
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1750
message::Statement *statement= transaction->add_statement();
1752
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1755
* Construct the specialized CreateTableStatement message and attach
1756
* it to the generic Statement message
1758
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1759
message::Table *new_table_message= create_table_statement->mutable_table();
1760
*new_table_message= table;
1762
finalizeStatementMessage(*statement, in_session);
1764
finalizeTransactionMessage(*transaction, in_session);
1766
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1768
cleanupTransactionMessage(transaction, in_session);
1772
void TransactionServices::createSchema(Session *in_session,
1773
const message::Schema &schema)
1775
ReplicationServices &replication_services= ReplicationServices::singleton();
1776
if (! replication_services.isActive())
1779
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1780
message::Statement *statement= transaction->add_statement();
1782
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1785
* Construct the specialized CreateSchemaStatement message and attach
1786
* it to the generic Statement message
1788
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1789
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1790
*new_schema_message= schema;
1792
finalizeStatementMessage(*statement, in_session);
1794
finalizeTransactionMessage(*transaction, in_session);
1796
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1798
cleanupTransactionMessage(transaction, in_session);
1802
/**
 * Builds a DROP_SCHEMA statement naming the given schema, pushes the
 * session's transaction message to the replication services, and cleans
 * the message up.
 *
 * Does nothing when the replication services are inactive.
 *
 * @param in_session   Session performing the DROP SCHEMA
 * @param schema_name  Name of the schema being dropped
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *statement= transaction->add_statement();

  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);

  /*
   * Construct the specialized DropSchemaStatement message and attach
   * it to the generic Statement message
   */
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();

  drop_schema_statement->set_schema_name(schema_name);

  finalizeStatementMessage(*statement, in_session);

  finalizeTransactionMessage(*transaction, in_session);

  (void) replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);
}
1830
void TransactionServices::dropTable(Session *in_session,
1831
const string &schema_name,
1832
const string &table_name,
1835
ReplicationServices &replication_services= ReplicationServices::singleton();
1836
if (! replication_services.isActive())
1839
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1840
message::Statement *statement= transaction->add_statement();
1842
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1845
* Construct the specialized DropTableStatement message and attach
1846
* it to the generic Statement message
1848
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1850
drop_table_statement->set_if_exists_clause(if_exists);
1852
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1854
table_metadata->set_schema_name(schema_name);
1855
table_metadata->set_table_name(table_name);
1857
finalizeStatementMessage(*statement, in_session);
1859
finalizeTransactionMessage(*transaction, in_session);
1861
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1863
cleanupTransactionMessage(transaction, in_session);
1866
/**
 * Builds a TRUNCATE_TABLE statement for the given table, pushes the
 * session's transaction message to the replication services, and cleans
 * the message up.
 *
 * Does nothing when the replication services are inactive.
 *
 * @param in_session  Session performing the TRUNCATE
 * @param in_table    Table being truncated; supplies schema and table name
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *statement= transaction->add_statement();

  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);

  /*
   * Construct the specialized TruncateTableStatement message and attach
   * it to the generic Statement message
   */
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();

  string schema_name;
  (void) in_table->getShare()->getSchemaName(schema_name);
  string table_name;
  (void) in_table->getShare()->getTableName(table_name);

  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
  table_metadata->set_table_name(table_name.c_str(), table_name.length());

  finalizeStatementMessage(*statement, in_session);

  finalizeTransactionMessage(*transaction, in_session);

  (void) replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);
}
1901
/**
 * Builds a RAW_SQL statement carrying the given query text, pushes the
 * session's transaction message to the replication services, and cleans
 * the message up.
 *
 * Does nothing when the replication services are inactive.
 *
 * @param in_session  Session executing the query
 * @param query       SQL text stored in the statement's sql field
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *statement= transaction->add_statement();

  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
  statement->set_sql(query);
  finalizeStatementMessage(*statement, in_session);

  finalizeTransactionMessage(*transaction, in_session);

  (void) replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);
}
1921
} /* namespace drizzled */