1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
7
* This program is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; version 2 of the License.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22
* @file Transaction processing code
26
* The TransactionServices component takes internal events (for instance the start of a
27
* transaction, the changing of a record, or the rollback of a transaction)
28
* and constructs GPB Messages that are passed to the ReplicationServices
29
* component and used during replication.
31
* The reason for this functionality is to encapsulate all communication
32
* between the kernel and the replicator/applier plugins into GPB Messages.
33
* Instead of the plugin having to understand the (often fluidly changing)
34
* mechanics of the kernel, all the plugin needs to understand is the message
35
* format, and GPB messages provide a nice, clear, and versioned format for
38
* @see /drizzled/message/transaction.proto
42
* We really should store the raw bytes in the messages, not the
43
* String value of the Field. But, to do that, the
44
* statement_transform library needs first to be updated
45
* to include the transformation code to convert raw
46
* Drizzle-internal Field byte representation into something
47
* plugins can understand.
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
60
#include "drizzled/transaction_context.h"
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
63
#include "drizzled/resource_context.h"
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
72
#include "drizzled/internal/my_sys.h"
84
* @defgroup Transactions
88
* Transaction handling in the server
92
* In each client connection, Drizzle maintains two transaction
93
* contexts representing the state of the:
95
* 1) Statement Transaction
96
* 2) Normal Transaction
98
* These two transaction contexts represent the transactional
99
* state of a Session's SQL and XA transactions for a single
100
* SQL statement or a series of SQL statements.
102
* When the Session's connection is in AUTOCOMMIT mode, there
103
* is no practical difference between the statement and the
104
* normal transaction, as each SQL statement is committed or
105
* rolled back depending on the success or failure of the
106
* indvidual SQL statement.
108
* When the Session's connection is NOT in AUTOCOMMIT mode, OR
109
* the Session has explicitly begun a normal SQL transaction using
110
* a BEGIN WORK/START TRANSACTION statement, then the normal
111
* transaction context tracks the aggregate transaction state of
112
* the SQL transaction's individual statements, and the SQL
113
* transaction's commit or rollback is done atomically for all of
114
* the SQL transaction's statement's data changes.
116
* Technically, a statement transaction can be viewed as a savepoint
117
* which is maintained automatically in order to make effects of one
120
* The normal transaction is started by the user and is typically
121
* ended (COMMIT or ROLLBACK) upon an explicity user request as well.
122
* The exception to this is that DDL statements implicitly COMMIT
123
* any previously active normal transaction before they begin executing.
125
* In Drizzle, unlike MySQL, plugins other than a storage engine
126
* may participate in a transaction. All plugin::TransactionalStorageEngine
127
* plugins will automatically be monitored by Drizzle's transaction
128
* manager (implemented in this source file), as will all plugins which
129
* implement plugin::XaResourceManager and register with the transaction
132
* If Drizzle's transaction manager sees that more than one resource
133
* manager (transactional storage engine or XA resource manager) has modified
134
* data state during a statement or normal transaction, the transaction
135
* manager will automatically use a two-phase commit protocol for all
136
* resources which support XA's distributed transaction protocol. Unlike
137
* MySQL, storage engines need not manually register with the transaction
138
* manager during a statement's execution. Previously, in MySQL, all
139
* handlertons would have to call trans_register_ha() at some point after
140
* modifying data state in order to have MySQL include that handler in
141
* an XA transaction. Drizzle does all of this grunt work behind the
142
* scenes for the storage engine implementers.
144
* When a connection is closed, the current normal transaction, if
145
* any is currently active, is rolled back.
147
* Transaction life cycle
148
* ----------------------
150
* When a new connection is established, session->transaction
151
* members are initialized to an empty state. If a statement uses any tables,
152
* all affected engines are registered in the statement engine list automatically
153
* in plugin::StorageEngine::startStatement() and
154
* plugin::TransactionalStorageEngine::startTransaction().
156
* You can view the lifetime of a normal transaction in the following
159
* drizzled::statement::Statement::execute()
160
* drizzled::plugin::TransactionalStorageEngine::startTransaction()
161
* drizzled::TransactionServices::registerResourceForTransaction()
162
* drizzled::TransactionServices::registerResourceForStatement()
163
* drizzled::plugin::StorageEngine::startStatement()
164
* drizzled::Cursor::write_row() <-- example...could be update_row(), etc
165
* drizzled::plugin::StorageEngine::endStatement()
166
* drizzled::TransactionServices::autocommitOrRollback()
167
* drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
168
* drizzled::XaResourceManager::xaCommit() <-- or rollback()
170
* Roles and responsibilities
171
* --------------------------
173
* Beginning of SQL Statement (and Statement Transaction)
174
* ------------------------------------------------------
176
* At the start of each SQL statement, for each storage engine
177
* <strong>that is involved in the SQL statement</strong>, the kernel
178
* calls the engine's plugin::StoragEngine::startStatement() method. If the
179
* engine needs to track some data for the statement, it should use
180
* this method invocation to initialize this data. This is the
181
* beginning of what is called the "statement transaction".
183
* <strong>For transaction storage engines (those storage engines
184
* that inherit from plugin::TransactionalStorageEngine)</strong>, the
185
* kernel automatically determines if the start of the SQL statement
186
* transaction should <em>also</em> begin the normal SQL transaction.
187
* This occurs when the connection is in NOT in autocommit mode. If
188
* the kernel detects this, then the kernel automatically starts the
189
* normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
190
* method and then calls plugin::StorageEngine::startStatement()
193
* Beginning of an SQL "Normal" Transaction
194
* ----------------------------------------
196
* As noted above, a "normal SQL transaction" may be started when
197
* an SQL statement is started in a connection and the connection is
198
* NOT in AUTOCOMMIT mode. This is automatically done by the kernel.
200
* In addition, when a user executes a START TRANSACTION or
201
* BEGIN WORK statement in a connection, the kernel explicitly
202
* calls each transactional storage engine's startTransaction() method.
204
* Ending of an SQL Statement (and Statement Transaction)
205
* ------------------------------------------------------
207
* At the end of each SQL statement, for each of the aforementioned
208
* involved storage engines, the kernel calls the engine's
209
* plugin::StorageEngine::endStatement() method. If the engine
210
* has initialized or modified some internal data about the
211
* statement transaction, it should use this method to reset or destroy
212
* this data appropriately.
214
* Ending of an SQL "Normal" Transaction
215
* -------------------------------------
217
* The end of a normal transaction is either a ROLLBACK or a COMMIT,
218
* depending on the success or failure of the statement transaction(s)
221
* The end of a "normal transaction" occurs when any of the following
224
* 1) If a statement transaction has completed and AUTOCOMMIT is ON,
225
* then the normal transaction which encloses the statement
227
* 2) If a COMMIT or ROLLBACK statement occurs on the connection
228
* 3) Just before a DDL operation occurs, the kernel will implicitly
229
* commit the active normal transaction
231
* Transactions and Non-transactional Storage Engines
232
* --------------------------------------------------
234
* For non-transactional engines, this call can be safely ignored, an
235
* the kernel tracks whether a non-transactional engine has changed
236
* any data state, and warns the user appropriately if a transaction
237
* (statement or normal) is rolled back after such non-transactional
238
* data changes have been made.
240
* XA Two-phase Commit Protocol
241
* ----------------------------
243
* During statement execution, whenever any of data-modifying
244
* PSEA API methods is used, e.g. Cursor::write_row() or
245
* Cursor::update_row(), the read-write flag is raised in the
246
* statement transaction for the involved engine.
247
* Currently All PSEA calls are "traced", and the data can not be
248
* changed in a way other than issuing a PSEA call. Important:
249
* unless this invariant is preserved the server will not know that
250
* a transaction in a given engine is read-write and will not
251
* involve the two-phase commit protocol!
253
* At the end of a statement, TransactionServices::autocommitOrRollback()
254
* is invoked. This call in turn
255
* invokes plugin::XaResourceManager::xapPepare() for every involved XA
258
* Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
259
* or plugin::XaResourceManager::xaCommit() (depending on what the resource
262
* If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
263
* invoked and the server only calls plugin::StorageEngine::commit_one_phase().
264
* At statement commit, the statement-related read-write engine
265
* flag is propagated to the corresponding flag in the normal
266
* transaction. When the commit is complete, the list of registered
267
* engines is cleared.
269
* Rollback is handled in a similar fashion.
271
* Additional notes on DDL and the normal transaction.
272
* ---------------------------------------------------
274
* CREATE TABLE .. SELECT can start a *new* normal transaction
275
* because of the fact that SELECTs on a transactional storage
276
* engine participate in the normal SQL transaction (due to
277
* isolation level issues and consistent read views).
279
* Behaviour of the server in this case is currently badly
282
* DDL statements use a form of "semantic" logging
283
* to maintain atomicity: if CREATE TABLE .. SELECT failed,
284
* the newly created table is deleted.
286
* In addition, some DDL statements issue interim transaction
287
* commits: e.g. ALTER TABLE issues a COMMIT after data is copied
288
* from the original table to the internal temporary table. Other
289
* statements, e.g. CREATE TABLE ... SELECT do not always commit
292
* And finally there is a group of DDL statements such as
293
* RENAME/DROP TABLE that doesn't start a new transaction
294
* and doesn't commit.
296
* A consistent behaviour is perhaps to always commit the normal
297
* transaction after all DDLs, just like the statement transaction
298
* is always committed at the end of all statements.
300
void TransactionServices::registerResourceForStatement(Session *session,
301
plugin::MonitoredInTransaction *monitored,
302
plugin::TransactionalStorageEngine *engine)
304
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
307
* Now we automatically register this resource manager for the
308
* normal transaction. This is fine because a statement
309
* transaction registration should always enlist the resource
310
* in the normal transaction which contains the statement
313
registerResourceForTransaction(session, monitored, engine);
316
TransactionContext *trans= &session->transaction.stmt;
317
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
319
if (resource_context->isStarted())
320
return; /* already registered, return */
322
assert(monitored->participatesInSqlTransaction());
323
assert(not monitored->participatesInXaTransaction());
325
resource_context->setMonitored(monitored);
326
resource_context->setTransactionalStorageEngine(engine);
327
trans->registerResource(resource_context);
329
trans->no_2pc|= true;
332
void TransactionServices::registerResourceForStatement(Session *session,
333
plugin::MonitoredInTransaction *monitored,
334
plugin::TransactionalStorageEngine *engine,
335
plugin::XaResourceManager *resource_manager)
337
if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
340
* Now we automatically register this resource manager for the
341
* normal transaction. This is fine because a statement
342
* transaction registration should always enlist the resource
343
* in the normal transaction which contains the statement
346
registerResourceForTransaction(session, monitored, engine, resource_manager);
349
TransactionContext *trans= &session->transaction.stmt;
350
ResourceContext *resource_context= session->getResourceContext(monitored, 0);
352
if (resource_context->isStarted())
353
return; /* already registered, return */
355
assert(monitored->participatesInXaTransaction());
356
assert(monitored->participatesInSqlTransaction());
358
resource_context->setMonitored(monitored);
359
resource_context->setTransactionalStorageEngine(engine);
360
resource_context->setXaResourceManager(resource_manager);
361
trans->registerResource(resource_context);
363
trans->no_2pc|= false;
366
void TransactionServices::registerResourceForTransaction(Session *session,
367
plugin::MonitoredInTransaction *monitored,
368
plugin::TransactionalStorageEngine *engine)
370
TransactionContext *trans= &session->transaction.all;
371
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
373
if (resource_context->isStarted())
374
return; /* already registered, return */
376
session->server_status|= SERVER_STATUS_IN_TRANS;
378
trans->registerResource(resource_context);
380
assert(monitored->participatesInSqlTransaction());
381
assert(not monitored->participatesInXaTransaction());
383
resource_context->setMonitored(monitored);
384
resource_context->setTransactionalStorageEngine(engine);
385
trans->no_2pc|= true;
387
if (session->transaction.xid_state.xid.is_null())
388
session->transaction.xid_state.xid.set(session->getQueryId());
390
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
392
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
393
if (! session->getResourceContext(monitored, 0)->isStarted())
394
registerResourceForStatement(session, monitored, engine);
397
void TransactionServices::registerResourceForTransaction(Session *session,
398
plugin::MonitoredInTransaction *monitored,
399
plugin::TransactionalStorageEngine *engine,
400
plugin::XaResourceManager *resource_manager)
402
TransactionContext *trans= &session->transaction.all;
403
ResourceContext *resource_context= session->getResourceContext(monitored, 1);
405
if (resource_context->isStarted())
406
return; /* already registered, return */
408
session->server_status|= SERVER_STATUS_IN_TRANS;
410
trans->registerResource(resource_context);
412
assert(monitored->participatesInSqlTransaction());
414
resource_context->setMonitored(monitored);
415
resource_context->setXaResourceManager(resource_manager);
416
resource_context->setTransactionalStorageEngine(engine);
417
trans->no_2pc|= true;
419
if (session->transaction.xid_state.xid.is_null())
420
session->transaction.xid_state.xid.set(session->getQueryId());
422
engine->startTransaction(session, START_TRANS_NO_OPTIONS);
424
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
425
if (! session->getResourceContext(monitored, 0)->isStarted())
426
registerResourceForStatement(session, monitored, engine, resource_manager);
430
Check if we can skip the two-phase commit.
432
A helper function to evaluate if two-phase commit is mandatory.
433
As a side effect, propagates the read-only/read-write flags
434
of the statement transaction to its enclosing normal transaction.
436
@retval true we must run a two-phase commit. Returned
437
if we have at least two engines with read-write changes.
438
@retval false Don't need two-phase commit. Even if we have two
439
transactional engines, we can run two independent
440
commits if changes in one of the engines are read-only.
444
ha_check_and_coalesce_trx_read_only(Session *session,
445
TransactionContext::ResourceContexts &resource_contexts,
446
bool normal_transaction)
448
/* The number of storage engines that have actual changes. */
449
unsigned num_resources_modified_data= 0;
450
ResourceContext *resource_context;
452
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453
it != resource_contexts.end();
456
resource_context= *it;
457
if (resource_context->hasModifiedData())
458
++num_resources_modified_data;
460
if (! normal_transaction)
462
ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463
assert(resource_context != resource_context_normal);
465
Merge read-only/read-write information about statement
466
transaction to its enclosing normal transaction. Do this
467
only if in a real transaction -- that is, if we know
468
that resource_context_all is registered in session->transaction.all.
469
Since otherwise we only clutter the normal transaction flags.
471
if (resource_context_normal->isStarted()) /* false if autocommit. */
472
resource_context_normal->coalesceWith(resource_context);
474
else if (num_resources_modified_data > 1)
477
It is a normal transaction, so we don't need to merge read/write
478
information up, and the need for two-phase commit has been
479
already established. Break the loop prematurely.
484
return num_resources_modified_data > 1;
492
1 transaction was rolled back
494
2 error during commit, data may be inconsistent
497
Since we don't support nested statement transactions in 5.0,
498
we can't commit or rollback stmt transactions while we are inside
499
stored functions or triggers. So we simply do nothing now.
500
TODO: This should be fixed in later ( >= 5.1) releases.
502
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
504
int error= 0, cookie= 0;
506
'all' means that this is either an explicit commit issued by
507
user, or an implicit commit issued by a DDL.
509
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
510
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
512
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
515
We must not commit the normal transaction if a statement
516
transaction is pending. Otherwise statement transaction
517
flags will not get propagated to its normal transaction's
520
assert(session->transaction.stmt.getResourceContexts().empty() ||
521
trans == &session->transaction.stmt);
523
if (resource_contexts.empty() == false)
527
if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
529
ha_rollback_trans(session, normal_transaction);
533
must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
535
if (! trans->no_2pc && must_2pc)
537
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538
it != resource_contexts.end() && ! error;
541
ResourceContext *resource_context= *it;
544
Do not call two-phase commit if this particular
545
transaction is read-only. This allows for simpler
546
implementation in engines that are always read-only.
548
if (! resource_context->hasModifiedData())
551
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
553
if (resource->participatesInXaTransaction())
555
if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
557
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
562
status_var_increment(session->status_var.ha_prepare_count);
568
ha_rollback_trans(session, normal_transaction);
573
error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
576
start_waiting_global_read_lock(session);
583
This function does not care about global read lock. A caller should.
585
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
588
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
589
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
591
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
593
if (resource_contexts.empty() == false)
595
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
596
it != resource_contexts.end();
600
ResourceContext *resource_context= *it;
602
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
604
if (resource->participatesInXaTransaction())
606
if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
608
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
611
else if (normal_transaction)
613
status_var_increment(session->status_var.ha_commit_count);
616
else if (resource->participatesInSqlTransaction())
618
if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
620
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
623
else if (normal_transaction)
625
status_var_increment(session->status_var.ha_commit_count);
628
resource_context->reset(); /* keep it conveniently zero-filled */
632
session->transaction.xid_state.xid.null();
634
if (normal_transaction)
636
session->variables.tx_isolation= session->session_tx_isolation;
637
session->transaction.cleanup();
646
* We commit the normal transaction by finalizing the transaction message
647
* and propogating the message to all registered replicators.
649
commitTransactionMessage(session);
655
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
658
TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
659
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
661
bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
664
We must not rollback the normal transaction if a statement
665
transaction is pending.
667
assert(session->transaction.stmt.getResourceContexts().empty() ||
668
trans == &session->transaction.stmt);
670
if (resource_contexts.empty() == false)
672
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
673
it != resource_contexts.end();
677
ResourceContext *resource_context= *it;
679
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
681
if (resource->participatesInXaTransaction())
683
if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
685
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
688
else if (normal_transaction)
690
status_var_increment(session->status_var.ha_rollback_count);
693
else if (resource->participatesInSqlTransaction())
695
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
697
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
700
else if (normal_transaction)
702
status_var_increment(session->status_var.ha_rollback_count);
705
resource_context->reset(); /* keep it conveniently zero-filled */
709
* We need to signal the ROLLBACK to ReplicationServices here
710
* BEFORE we set the transaction ID to NULL. This is because
711
* if a bulk segment was sent to replicators, we need to send
712
* a rollback statement with the corresponding transaction ID
715
rollbackTransactionMessage(session);
718
session->transaction.xid_state.xid.null();
719
if (normal_transaction)
721
session->variables.tx_isolation=session->session_tx_isolation;
722
session->transaction.cleanup();
725
if (normal_transaction)
726
session->transaction_rollback_request= false;
729
* If a non-transactional table was updated, warn the user
732
session->transaction.all.hasModifiedNonTransData() &&
733
session->killed != Session::KILL_CONNECTION)
735
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
736
ER_WARNING_NOT_COMPLETE_ROLLBACK,
737
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
744
This is used to commit or rollback a single statement depending on
748
Note that if the autocommit is on, then the following call inside
749
InnoDB will commit or rollback the whole transaction (= the statement). The
750
autocommit mechanism built into InnoDB is based on counting locks, but if
751
the user has used LOCK TABLES then that mechanism does not know to do the
754
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
756
if (session->transaction.stmt.getResourceContexts().empty() == false)
760
if (ha_commit_trans(session, false))
765
(void) ha_rollback_trans(session, false);
766
if (session->transaction_rollback_request)
767
(void) ha_rollback_trans(session, true);
770
session->variables.tx_isolation= session->session_tx_isolation;
776
return the list of XID's to a client, the same way SHOW commands do.
779
I didn't find in XA specs that an RM cannot return the same XID twice,
780
so mysql_xa_recover does not filter XID's to ensure uniqueness.
781
It can be easily fixed later, if necessary.
783
bool TransactionServices::mysql_xa_recover(Session *session)
785
List<Item> field_list;
789
field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790
field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791
field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792
field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
794
if (session->client->sendFields(&field_list))
797
pthread_mutex_lock(&LOCK_xid_cache);
798
while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
800
if (xs->xa_state==XA_PREPARED)
802
session->client->store((int64_t)xs->xid.formatID);
803
session->client->store((int64_t)xs->xid.gtrid_length);
804
session->client->store((int64_t)xs->xid.bqual_length);
805
session->client->store(xs->xid.data,
806
xs->xid.gtrid_length+xs->xid.bqual_length);
807
if (session->client->flush())
809
pthread_mutex_unlock(&LOCK_xid_cache);
815
pthread_mutex_unlock(&LOCK_xid_cache);
820
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
822
result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
824
/* The below is perfectly fine, since we're simply comparing addresses for the underlying
825
* resources aren't the same... */
826
return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
830
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
833
TransactionContext *trans= &session->transaction.all;
834
TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
835
TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
837
trans->no_2pc= false;
839
rolling back to savepoint in all storage engines that were part of the
840
transaction when the savepoint was set
842
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
843
it != sv_resource_contexts.end();
847
ResourceContext *resource_context= *it;
849
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
851
if (resource->participatesInSqlTransaction())
853
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
855
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
860
status_var_increment(session->status_var.ha_savepoint_rollback_count);
863
trans->no_2pc|= not resource->participatesInXaTransaction();
866
rolling back the transaction in all storage engines that were not part of
867
the transaction when the savepoint was set
870
TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
871
TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
872
TransactionContext::ResourceContexts set_difference_contexts;
874
sort(sorted_tran_resource_contexts.begin(),
875
sorted_tran_resource_contexts.end(),
876
ResourceContextCompare());
877
sort(sorted_sv_resource_contexts.begin(),
878
sorted_sv_resource_contexts.end(),
879
ResourceContextCompare());
880
set_difference(sorted_tran_resource_contexts.begin(),
881
sorted_tran_resource_contexts.end(),
882
sorted_sv_resource_contexts.begin(),
883
sorted_sv_resource_contexts.end(),
884
set_difference_contexts.begin(),
885
ResourceContextCompare());
887
* set_difference_contexts now contains all resource contexts
888
* which are in the transaction context but were NOT in the
889
* savepoint's resource contexts.
892
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
893
it != set_difference_contexts.end();
896
ResourceContext *resource_context= *it;
899
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
901
if (resource->participatesInSqlTransaction())
903
if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
905
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
910
status_var_increment(session->status_var.ha_rollback_count);
913
resource_context->reset(); /* keep it conveniently zero-filled */
916
trans->setResourceContexts(sv_resource_contexts);
922
according to the sql standard (ISO/IEC 9075-2:2003)
923
section "4.33.4 SQL-statements and transaction states",
924
NamedSavepoint is *not* transaction-initiating SQL-statement
926
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
929
TransactionContext *trans= &session->transaction.all;
930
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
932
if (resource_contexts.empty() == false)
934
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
935
it != resource_contexts.end();
938
ResourceContext *resource_context= *it;
941
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
943
if (resource->participatesInSqlTransaction())
945
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
947
my_error(ER_GET_ERRNO, MYF(0), err);
952
status_var_increment(session->status_var.ha_savepoint_count);
958
Remember the list of registered storage engines.
960
sv.setResourceContexts(resource_contexts);
964
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
968
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
970
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
971
it != resource_contexts.end();
975
ResourceContext *resource_context= *it;
977
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
979
if (resource->participatesInSqlTransaction())
981
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
983
my_error(ER_GET_ERRNO, MYF(0), err);
991
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
993
message::Transaction *transaction= in_session->getTransactionMessage();
995
if (unlikely(transaction == NULL))
998
* Allocate and initialize a new transaction message
999
* for this Session object. Session is responsible for
1000
* deleting transaction message when done with it.
1002
transaction= new (nothrow) message::Transaction();
1003
initTransactionMessage(*transaction, in_session);
1004
in_session->setTransactionMessage(transaction);
1011
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1012
Session *in_session)
1014
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015
trx->set_server_id(in_session->getServerId());
1016
trx->set_transaction_id(in_session->getQueryId());
1017
trx->set_start_timestamp(in_session->getCurrentTimestamp());
1020
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1021
Session *in_session)
1023
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1024
trx->set_end_timestamp(in_session->getCurrentTimestamp());
1027
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1028
Session *in_session)
1030
delete in_transaction;
1031
in_session->setStatementMessage(NULL);
1032
in_session->setTransactionMessage(NULL);
1035
void TransactionServices::commitTransactionMessage(Session *in_session)
1037
ReplicationServices &replication_services= ReplicationServices::singleton();
1038
if (! replication_services.isActive())
1041
/* If there is an active statement message, finalize it */
1042
message::Statement *statement= in_session->getStatementMessage();
1044
if (statement != NULL)
1046
finalizeStatementMessage(*statement, in_session);
1049
return; /* No data modification occurred inside the transaction */
1051
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1053
finalizeTransactionMessage(*transaction, in_session);
1055
replication_services.pushTransactionMessage(*transaction);
1057
cleanupTransactionMessage(transaction, in_session);
1060
void TransactionServices::initStatementMessage(message::Statement &statement,
1061
message::Statement::Type in_type,
1062
Session *in_session)
1064
statement.set_type(in_type);
1065
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1066
/** @TODO Set sql string optionally */
1069
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1070
Session *in_session)
1072
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1073
in_session->setStatementMessage(NULL);
1076
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1078
ReplicationServices &replication_services= ReplicationServices::singleton();
1079
if (! replication_services.isActive())
1082
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1085
* OK, so there are two situations that we need to deal with here:
1087
* 1) We receive an instruction to ROLLBACK the current transaction
1088
* and the currently-stored Transaction message is *self-contained*,
1089
* meaning that no Statement messages in the Transaction message
1090
* contain a message having its segment_id member greater than 1. If
1091
* no non-segment ID 1 members are found, we can simply clear the
1092
* current Transaction message and remove it from memory.
1094
* 2) If the Transaction message does indeed have a non-end segment, that
1095
* means that a bulk update/delete/insert Transaction message segment
1096
* has previously been sent over the wire to replicators. In this case,
1097
* we need to package a Transaction with a Statement message of type
1098
* ROLLBACK to indicate to replicators that previously-transmitted
1099
* messages must be un-applied.
1101
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1104
* Clear the transaction, create a Rollback statement message,
1105
* attach it to the transaction, and push it to replicators.
1107
transaction->Clear();
1108
initTransactionMessage(*transaction, in_session);
1110
message::Statement *statement= transaction->add_statement();
1112
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1113
finalizeStatementMessage(*statement, in_session);
1115
finalizeTransactionMessage(*transaction, in_session);
1117
replication_services.pushTransactionMessage(*transaction);
1119
cleanupTransactionMessage(transaction, in_session);
1122
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1125
message::Statement *statement= in_session->getStatementMessage();
1127
* We check to see if the current Statement message is of type INSERT.
1128
* If it is not, we finalize the current Statement and ensure a new
1129
* InsertStatement is created.
1131
if (statement != NULL &&
1132
statement->type() != message::Statement::INSERT)
1134
finalizeStatementMessage(*statement, in_session);
1135
statement= in_session->getStatementMessage();
1138
if (statement == NULL)
1140
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1142
* Transaction message initialized and set, but no statement created
1143
* yet. We construct one and initialize it, here, then return the
1144
* message after attaching the new Statement message pointer to the
1145
* Session for easy retrieval later...
1147
statement= transaction->add_statement();
1148
setInsertHeader(*statement, in_session, in_table);
1149
in_session->setStatementMessage(statement);
1154
void TransactionServices::setInsertHeader(message::Statement &statement,
1155
Session *in_session,
1158
initStatementMessage(statement, message::Statement::INSERT, in_session);
1161
* Now we construct the specialized InsertHeader message inside
1162
* the generalized message::Statement container...
1164
/* Set up the insert header */
1165
message::InsertHeader *header= statement.mutable_insert_header();
1166
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1169
(void) in_table->getShare()->getSchemaName(schema_name);
1171
(void) in_table->getShare()->getTableName(table_name);
1173
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1174
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1176
Field *current_field;
1177
Field **table_fields= in_table->field;
1179
message::FieldMetadata *field_metadata;
1181
/* We will read all the table's fields... */
1182
in_table->setReadSet();
1184
while ((current_field= *table_fields++) != NULL)
1186
field_metadata= header->add_field_metadata();
1187
field_metadata->set_name(current_field->field_name);
1188
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1192
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1194
ReplicationServices &replication_services= ReplicationServices::singleton();
1195
if (! replication_services.isActive())
1198
* We do this check here because we don't want to even create a
1199
* statement if there isn't a primary key on the table...
1203
* Multi-column primary keys are handled how exactly?
1205
if (in_table->s->primary_key == MAX_KEY)
1207
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1211
message::Statement &statement= getInsertStatement(in_session, in_table);
1213
message::InsertData *data= statement.mutable_insert_data();
1214
data->set_segment_id(1);
1215
data->set_end_segment(true);
1216
message::InsertRecord *record= data->add_record();
1218
Field *current_field;
1219
Field **table_fields= in_table->field;
1221
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1222
string_value->set_charset(system_charset_info);
1224
/* We will read all the table's fields... */
1225
in_table->setReadSet();
1227
while ((current_field= *table_fields++) != NULL)
1229
string_value= current_field->val_str(string_value);
1230
record->add_insert_value(string_value->c_ptr(), string_value->length());
1231
string_value->free();
1236
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1238
const unsigned char *old_record,
1239
const unsigned char *new_record)
1241
message::Statement *statement= in_session->getStatementMessage();
1243
* We check to see if the current Statement message is of type UPDATE.
1244
* If it is not, we finalize the current Statement and ensure a new
1245
* UpdateStatement is created.
1247
if (statement != NULL &&
1248
statement->type() != message::Statement::UPDATE)
1250
finalizeStatementMessage(*statement, in_session);
1251
statement= in_session->getStatementMessage();
1254
if (statement == NULL)
1256
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1258
* Transaction message initialized and set, but no statement created
1259
* yet. We construct one and initialize it, here, then return the
1260
* message after attaching the new Statement message pointer to the
1261
* Session for easy retrieval later...
1263
statement= transaction->add_statement();
1264
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1265
in_session->setStatementMessage(statement);
1270
void TransactionServices::setUpdateHeader(message::Statement &statement,
1271
Session *in_session,
1273
const unsigned char *old_record,
1274
const unsigned char *new_record)
1276
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1279
* Now we construct the specialized UpdateHeader message inside
1280
* the generalized message::Statement container...
1282
/* Set up the update header */
1283
message::UpdateHeader *header= statement.mutable_update_header();
1284
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1287
(void) in_table->getShare()->getSchemaName(schema_name);
1289
(void) in_table->getShare()->getTableName(table_name);
1291
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1292
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1294
Field *current_field;
1295
Field **table_fields= in_table->field;
1297
message::FieldMetadata *field_metadata;
1299
/* We will read all the table's fields... */
1300
in_table->setReadSet();
1302
while ((current_field= *table_fields++) != NULL)
1305
* We add the "key field metadata" -- i.e. the fields which is
1306
* the primary key for the table.
1308
if (in_table->s->fieldInPrimaryKey(current_field))
1310
field_metadata= header->add_key_field_metadata();
1311
field_metadata->set_name(current_field->field_name);
1312
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1316
* The below really should be moved into the Field API and Record API. But for now
1317
* we do this crazy pointer fiddling to figure out if the current field
1318
* has been updated in the supplied record raw byte pointers.
1320
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1321
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1323
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1325
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1327
/* Field is changed from old to new */
1328
field_metadata= header->add_set_field_metadata();
1329
field_metadata->set_name(current_field->field_name);
1330
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1334
void TransactionServices::updateRecord(Session *in_session,
1336
const unsigned char *old_record,
1337
const unsigned char *new_record)
1339
ReplicationServices &replication_services= ReplicationServices::singleton();
1340
if (! replication_services.isActive())
1343
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1345
message::UpdateData *data= statement.mutable_update_data();
1346
data->set_segment_id(1);
1347
data->set_end_segment(true);
1348
message::UpdateRecord *record= data->add_record();
1350
Field *current_field;
1351
Field **table_fields= in_table->field;
1352
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1353
string_value->set_charset(system_charset_info);
1355
while ((current_field= *table_fields++) != NULL)
1358
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1359
* but then realized that an UPDATE statement could potentially have different values for
1360
* the SET field. For instance, imagine this SQL scenario:
1362
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1363
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1364
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1366
* We will generate two UpdateRecord messages with different set_value byte arrays.
1368
* The below really should be moved into the Field API and Record API. But for now
1369
* we do this crazy pointer fiddling to figure out if the current field
1370
* has been updated in the supplied record raw byte pointers.
1372
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1373
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]);
1375
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1377
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1379
/* Store the original "read bit" for this field */
1380
bool is_read_set= current_field->isReadSet();
1382
/* We need to mark that we will "read" this field... */
1383
in_table->setReadSet(current_field->field_index);
1385
/* Read the string value of this field's contents */
1386
string_value= current_field->val_str(string_value);
1389
* Reset the read bit after reading field to its original state. This
1390
* prevents the field from being included in the WHERE clause
1392
current_field->setReadSet(is_read_set);
1394
record->add_after_value(string_value->c_ptr(), string_value->length());
1395
string_value->free();
1399
* Add the WHERE clause values now...for now, this means the
1400
* primary key field value. Replication only supports tables
1401
* with a primary key.
1403
if (in_table->s->fieldInPrimaryKey(current_field))
1406
* To say the below is ugly is an understatement. But it works.
1408
* @todo Move this crap into a real Record API.
1410
string_value= current_field->val_str(string_value,
1412
current_field->offset(const_cast<unsigned char *>(new_record)));
1413
record->add_key_value(string_value->c_ptr(), string_value->length());
1414
string_value->free();
1420
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1423
message::Statement *statement= in_session->getStatementMessage();
1425
* We check to see if the current Statement message is of type DELETE.
1426
* If it is not, we finalize the current Statement and ensure a new
1427
* DeleteStatement is created.
1429
if (statement != NULL &&
1430
statement->type() != message::Statement::DELETE)
1432
finalizeStatementMessage(*statement, in_session);
1433
statement= in_session->getStatementMessage();
1436
if (statement == NULL)
1438
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1440
* Transaction message initialized and set, but no statement created
1441
* yet. We construct one and initialize it, here, then return the
1442
* message after attaching the new Statement message pointer to the
1443
* Session for easy retrieval later...
1445
statement= transaction->add_statement();
1446
setDeleteHeader(*statement, in_session, in_table);
1447
in_session->setStatementMessage(statement);
1452
void TransactionServices::setDeleteHeader(message::Statement &statement,
1453
Session *in_session,
1456
initStatementMessage(statement, message::Statement::DELETE, in_session);
1459
* Now we construct the specialized DeleteHeader message inside
1460
* the generalized message::Statement container...
1462
message::DeleteHeader *header= statement.mutable_delete_header();
1463
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1466
(void) in_table->getShare()->getSchemaName(schema_name);
1468
(void) in_table->getShare()->getTableName(table_name);
1470
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1471
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1473
Field *current_field;
1474
Field **table_fields= in_table->field;
1476
message::FieldMetadata *field_metadata;
1478
while ((current_field= *table_fields++) != NULL)
1481
* Add the WHERE clause values now...for now, this means the
1482
* primary key field value. Replication only supports tables
1483
* with a primary key.
1485
if (in_table->s->fieldInPrimaryKey(current_field))
1487
field_metadata= header->add_key_field_metadata();
1488
field_metadata->set_name(current_field->field_name);
1489
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1494
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
1496
ReplicationServices &replication_services= ReplicationServices::singleton();
1497
if (! replication_services.isActive())
1500
message::Statement &statement= getDeleteStatement(in_session, in_table);
1502
message::DeleteData *data= statement.mutable_delete_data();
1503
data->set_segment_id(1);
1504
data->set_end_segment(true);
1505
message::DeleteRecord *record= data->add_record();
1507
Field *current_field;
1508
Field **table_fields= in_table->field;
1509
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1510
string_value->set_charset(system_charset_info);
1512
while ((current_field= *table_fields++) != NULL)
1515
* Add the WHERE clause values now...for now, this means the
1516
* primary key field value. Replication only supports tables
1517
* with a primary key.
1519
if (in_table->s->fieldInPrimaryKey(current_field))
1521
string_value= current_field->val_str(string_value);
1522
record->add_key_value(string_value->c_ptr(), string_value->length());
1524
* @TODO Store optional old record value in the before data member
1526
string_value->free();
1531
void TransactionServices::createTable(Session *in_session,
1532
const message::Table &table)
1534
ReplicationServices &replication_services= ReplicationServices::singleton();
1535
if (! replication_services.isActive())
1538
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1539
message::Statement *statement= transaction->add_statement();
1541
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1544
* Construct the specialized CreateTableStatement message and attach
1545
* it to the generic Statement message
1547
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1548
message::Table *new_table_message= create_table_statement->mutable_table();
1549
*new_table_message= table;
1551
finalizeStatementMessage(*statement, in_session);
1553
finalizeTransactionMessage(*transaction, in_session);
1555
replication_services.pushTransactionMessage(*transaction);
1557
cleanupTransactionMessage(transaction, in_session);
1561
void TransactionServices::createSchema(Session *in_session,
1562
const message::Schema &schema)
1564
ReplicationServices &replication_services= ReplicationServices::singleton();
1565
if (! replication_services.isActive())
1568
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1569
message::Statement *statement= transaction->add_statement();
1571
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1574
* Construct the specialized CreateSchemaStatement message and attach
1575
* it to the generic Statement message
1577
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1578
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1579
*new_schema_message= schema;
1581
finalizeStatementMessage(*statement, in_session);
1583
finalizeTransactionMessage(*transaction, in_session);
1585
replication_services.pushTransactionMessage(*transaction);
1587
cleanupTransactionMessage(transaction, in_session);
1591
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1593
ReplicationServices &replication_services= ReplicationServices::singleton();
1594
if (! replication_services.isActive())
1597
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1598
message::Statement *statement= transaction->add_statement();
1600
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1603
* Construct the specialized DropSchemaStatement message and attach
1604
* it to the generic Statement message
1606
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1608
drop_schema_statement->set_schema_name(schema_name);
1610
finalizeStatementMessage(*statement, in_session);
1612
finalizeTransactionMessage(*transaction, in_session);
1614
replication_services.pushTransactionMessage(*transaction);
1616
cleanupTransactionMessage(transaction, in_session);
1619
void TransactionServices::dropTable(Session *in_session,
1620
const string &schema_name,
1621
const string &table_name,
1624
ReplicationServices &replication_services= ReplicationServices::singleton();
1625
if (! replication_services.isActive())
1628
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1629
message::Statement *statement= transaction->add_statement();
1631
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1634
* Construct the specialized DropTableStatement message and attach
1635
* it to the generic Statement message
1637
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1639
drop_table_statement->set_if_exists_clause(if_exists);
1641
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1643
table_metadata->set_schema_name(schema_name);
1644
table_metadata->set_table_name(table_name);
1646
finalizeStatementMessage(*statement, in_session);
1648
finalizeTransactionMessage(*transaction, in_session);
1650
replication_services.pushTransactionMessage(*transaction);
1652
cleanupTransactionMessage(transaction, in_session);
1655
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
1657
ReplicationServices &replication_services= ReplicationServices::singleton();
1658
if (! replication_services.isActive())
1661
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1662
message::Statement *statement= transaction->add_statement();
1664
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
1667
* Construct the specialized TruncateTableStatement message and attach
1668
* it to the generic Statement message
1670
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
1671
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
1674
(void) in_table->getShare()->getSchemaName(schema_name);
1676
(void) in_table->getShare()->getTableName(table_name);
1678
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1679
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1681
finalizeStatementMessage(*statement, in_session);
1683
finalizeTransactionMessage(*transaction, in_session);
1685
replication_services.pushTransactionMessage(*transaction);
1687
cleanupTransactionMessage(transaction, in_session);
1690
void TransactionServices::rawStatement(Session *in_session, const string &query)
1692
ReplicationServices &replication_services= ReplicationServices::singleton();
1693
if (! replication_services.isActive())
1696
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1697
message::Statement *statement= transaction->add_statement();
1699
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
1700
statement->set_sql(query);
1701
finalizeStatementMessage(*statement, in_session);
1703
finalizeTransactionMessage(*transaction, in_session);
1705
replication_services.pushTransactionMessage(*transaction);
1707
cleanupTransactionMessage(transaction, in_session);
1710
} /* namespace drizzled */