~drizzle-trunk/drizzle/development

1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008 Sun Microsystems
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
6
 *
7
 *  This program is free software; you can redistribute it and/or modify
8
 *  it under the terms of the GNU General Public License as published by
9
 *  the Free Software Foundation; version 2 of the License.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
/**
22
 * @file Transaction processing code
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
23
 *
24
 * @note
25
 *
26
 * The TransactionServices component takes internal events (for instance the start of a 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 * component and used during replication.
30
 *
31
 * The reason for this functionality is to encapsulate all communication
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 * Instead of the plugin having to understand the (often fluidly changing)
34
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 * these messages.
37
 *
38
 * @see /drizzled/message/transaction.proto
39
 *
40
 * @todo
41
 *
42
 * We really should store the raw bytes in the messages, not the
43
 * String value of the Field.  But, to do that, the
44
 * statement_transform library needs first to be updated
45
 * to include the transformation code to convert raw
46
 * Drizzle-internal Field byte representation into something
47
 * plugins can understand.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
48
 */
49
50
#include "config.h"
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
60
#include "drizzled/transaction_context.h"
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
63
#include "drizzled/resource_context.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
72
#include "drizzled/internal/my_sys.h"
73
74
using namespace std;
75
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
76
#include <vector>
77
#include <algorithm>
78
#include <functional>
79
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
80
namespace drizzled
81
{
82
83
/**
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
84
 * @defgroup Transactions
85
 *
86
 * @brief
87
 *
88
 * Transaction handling in the server
89
 *
90
 * @detail
91
 *
92
 * In each client connection, Drizzle maintains two transaction
93
 * contexts representing the state of the:
94
 *
95
 * 1) Statement Transaction
96
 * 2) Normal Transaction
97
 *
98
 * These two transaction contexts represent the transactional
99
 * state of a Session's SQL and XA transactions for a single
100
 * SQL statement or a series of SQL statements.
101
 *
102
 * When the Session's connection is in AUTOCOMMIT mode, there
103
 * is no practical difference between the statement and the
104
 * normal transaction, as each SQL statement is committed or
105
 * rolled back depending on the success or failure of the
106
 * individual SQL statement.
107
 *
108
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
109
 * the Session has explicitly begun a normal SQL transaction using
110
 * a BEGIN WORK/START TRANSACTION statement, then the normal
111
 * transaction context tracks the aggregate transaction state of
112
 * the SQL transaction's individual statements, and the SQL
113
 * transaction's commit or rollback is done atomically for all of
114
 * the SQL transaction's statement's data changes.
115
 *
116
 * Technically, a statement transaction can be viewed as a savepoint 
117
 * which is maintained automatically in order to make effects of one
118
 * statement atomic.
119
 *
120
 * The normal transaction is started by the user and is typically
121
 * ended (COMMIT or ROLLBACK) upon an explicit user request as well.
122
 * The exception to this is that DDL statements implicitly COMMIT
123
 * any previously active normal transaction before they begin executing.
124
 *
125
 * In Drizzle, unlike MySQL, plugins other than a storage engine
126
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
127
 * plugins will automatically be monitored by Drizzle's transaction 
128
 * manager (implemented in this source file), as will all plugins which
129
 * implement plugin::XaResourceManager and register with the transaction
130
 * manager.
131
 *
132
 * If Drizzle's transaction manager sees that more than one resource
133
 * manager (transactional storage engine or XA resource manager) has modified
134
 * data state during a statement or normal transaction, the transaction
135
 * manager will automatically use a two-phase commit protocol for all
136
 * resources which support XA's distributed transaction protocol.  Unlike
137
 * MySQL, storage engines need not manually register with the transaction
138
 * manager during a statement's execution.  Previously, in MySQL, all
139
 * handlertons would have to call trans_register_ha() at some point after
140
 * modifying data state in order to have MySQL include that handler in
141
 * an XA transaction.  Drizzle does all of this grunt work behind the
142
 * scenes for the storage engine implementers.
143
 *
144
 * When a connection is closed, the current normal transaction, if
145
 * any is currently active, is rolled back.
146
 *
147
 * Transaction life cycle
148
 * ----------------------
149
 *
150
 * When a new connection is established, session->transaction
151
 * members are initialized to an empty state. If a statement uses any tables, 
152
 * all affected engines are registered in the statement engine list automatically
153
 * in plugin::StorageEngine::startStatement() and 
154
 * plugin::TransactionalStorageEngine::startTransaction().
155
 *
156
 * You can view the lifetime of a normal transaction in the following
157
 * call-sequence:
158
 *
159
 * drizzled::statement::Statement::execute()
160
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
161
 *     drizzled::TransactionServices::registerResourceForTransaction()
162
 *     drizzled::TransactionServices::registerResourceForStatement()
163
 *     drizzled::plugin::StorageEngine::startStatement()
164
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
165
 *     drizzled::plugin::StorageEngine::endStatement()
166
 *   drizzled::TransactionServices::autocommitOrRollback()
167
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
168
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
169
 *
170
 * Roles and responsibilities
171
 * --------------------------
172
 *
173
 * Beginning of SQL Statement (and Statement Transaction)
174
 * ------------------------------------------------------
175
 *
176
 * At the start of each SQL statement, for each storage engine
177
 * <strong>that is involved in the SQL statement</strong>, the kernel 
178
 * calls the engine's plugin::StorageEngine::startStatement() method.  If the
179
 * engine needs to track some data for the statement, it should use
180
 * this method invocation to initialize this data.  This is the
181
 * beginning of what is called the "statement transaction".
182
 *
183
 * <strong>For transaction storage engines (those storage engines
184
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
185
 * kernel automatically determines if the start of the SQL statement 
186
 * transaction should <em>also</em> begin the normal SQL transaction.
187
 * This occurs when the connection is NOT in autocommit mode. If
188
 * the kernel detects this, then the kernel automatically starts the
189
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
190
 * method and then calls plugin::StorageEngine::startStatement()
191
 * afterwards.
192
 *
193
 * Beginning of an SQL "Normal" Transaction
194
 * ----------------------------------------
195
 *
196
 * As noted above, a "normal SQL transaction" may be started when
197
 * an SQL statement is started in a connection and the connection is
198
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
199
 *
200
 * In addition, when a user executes a START TRANSACTION or
201
 * BEGIN WORK statement in a connection, the kernel explicitly
202
 * calls each transactional storage engine's startTransaction() method.
203
 *
204
 * Ending of an SQL Statement (and Statement Transaction)
205
 * ------------------------------------------------------
206
 *
207
 * At the end of each SQL statement, for each of the aforementioned
208
 * involved storage engines, the kernel calls the engine's
209
 * plugin::StorageEngine::endStatement() method.  If the engine
210
 * has initialized or modified some internal data about the
211
 * statement transaction, it should use this method to reset or destroy
212
 * this data appropriately.
213
 *
214
 * Ending of an SQL "Normal" Transaction
215
 * -------------------------------------
216
 *
217
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
218
 * depending on the success or failure of the statement transaction(s) 
219
 * it encloses.
220
 *
221
 * The end of a "normal transaction" occurs when any of the following
222
 * occurs:
223
 *
224
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
225
 *    then the normal transaction which encloses the statement
226
 *    transaction ends
227
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
228
 * 3) Just before a DDL operation occurs, the kernel will implicitly
229
 *    commit the active normal transaction
230
 *
231
 * Transactions and Non-transactional Storage Engines
232
 * --------------------------------------------------
233
 *
234
 * For non-transactional engines, this call can be safely ignored, and
235
 * the kernel tracks whether a non-transactional engine has changed
236
 * any data state, and warns the user appropriately if a transaction
237
 * (statement or normal) is rolled back after such non-transactional
238
 * data changes have been made.
239
 *
240
 * XA Two-phase Commit Protocol
241
 * ----------------------------
242
 *
243
 * During statement execution, whenever any of the data-modifying
244
 * PSEA API methods is used, e.g. Cursor::write_row() or
245
 * Cursor::update_row(), the read-write flag is raised in the
246
 * statement transaction for the involved engine.
247
 * Currently all PSEA calls are "traced", and the data can not be
248
 * changed in a way other than issuing a PSEA call. Important:
249
 * unless this invariant is preserved the server will not know that
250
 * a transaction in a given engine is read-write and will not
251
 * involve the two-phase commit protocol!
252
 *
253
 * At the end of a statement, TransactionServices::autocommitOrRollback()
254
 * is invoked. This call in turn
255
 * invokes plugin::XaResourceManager::xaPrepare() for every involved XA
256
 * resource manager.
257
 *
258
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
259
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
260
 * is...)
261
 * 
262
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
263
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
264
 * At statement commit, the statement-related read-write engine
265
 * flag is propagated to the corresponding flag in the normal
266
 * transaction.  When the commit is complete, the list of registered
267
 * engines is cleared.
268
 *
269
 * Rollback is handled in a similar fashion.
270
 *
271
 * Additional notes on DDL and the normal transaction.
272
 * ---------------------------------------------------
273
 *
274
 * CREATE TABLE .. SELECT can start a *new* normal transaction
275
 * because of the fact that SELECTs on a transactional storage
276
 * engine participate in the normal SQL transaction (due to
277
 * isolation level issues and consistent read views).
278
 *
279
 * Behaviour of the server in this case is currently badly
280
 * defined.
281
 *
282
 * DDL statements use a form of "semantic" logging
283
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
284
 * the newly created table is deleted.
285
 * 
286
 * In addition, some DDL statements issue interim transaction
287
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
288
 * from the original table to the internal temporary table. Other
289
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
290
 * after itself.
291
 *
292
 * And finally there is a group of DDL statements such as
293
 * RENAME/DROP TABLE that doesn't start a new transaction
294
 * and doesn't commit.
295
 *
296
 * A consistent behaviour is perhaps to always commit the normal
297
 * transaction after all DDLs, just like the statement transaction
298
 * is always committed at the end of all statements.
299
 */
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
300
void TransactionServices::registerResourceForStatement(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
301
                                                       plugin::MonitoredInTransaction *monitored,
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
302
                                                       plugin::TransactionalStorageEngine *engine)
303
{
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
304
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
305
  {
306
    /* 
307
     * Now we automatically register this resource manager for the
308
     * normal transaction.  This is fine because a statement
309
     * transaction registration should always enlist the resource
310
     * in the normal transaction which contains the statement
311
     * transaction.
312
     */
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
313
    registerResourceForTransaction(session, monitored, engine);
314
  }
315
316
  TransactionContext *trans= &session->transaction.stmt;
317
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
318
319
  if (resource_context->isStarted())
320
    return; /* already registered, return */
321
322
  assert(monitored->participatesInSqlTransaction());
323
  assert(not monitored->participatesInXaTransaction());
324
325
  resource_context->setMonitored(monitored);
326
  resource_context->setTransactionalStorageEngine(engine);
327
  trans->registerResource(resource_context);
328
329
  trans->no_2pc|= true;
330
}
331
332
void TransactionServices::registerResourceForStatement(Session *session,
333
                                                       plugin::MonitoredInTransaction *monitored,
334
                                                       plugin::TransactionalStorageEngine *engine,
335
                                                       plugin::XaResourceManager *resource_manager)
336
{
337
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
338
  {
339
    /* 
340
     * Now we automatically register this resource manager for the
341
     * normal transaction.  This is fine because a statement
342
     * transaction registration should always enlist the resource
343
     * in the normal transaction which contains the statement
344
     * transaction.
345
     */
346
    registerResourceForTransaction(session, monitored, engine, resource_manager);
347
  }
348
349
  TransactionContext *trans= &session->transaction.stmt;
350
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
351
352
  if (resource_context->isStarted())
353
    return; /* already registered, return */
354
355
  assert(monitored->participatesInXaTransaction());
356
  assert(monitored->participatesInSqlTransaction());
357
358
  resource_context->setMonitored(monitored);
359
  resource_context->setTransactionalStorageEngine(engine);
360
  resource_context->setXaResourceManager(resource_manager);
361
  trans->registerResource(resource_context);
362
363
  trans->no_2pc|= false;
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
364
}
365
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
366
void TransactionServices::registerResourceForTransaction(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
367
                                                         plugin::MonitoredInTransaction *monitored,
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
368
                                                         plugin::TransactionalStorageEngine *engine)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
369
{
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
370
  TransactionContext *trans= &session->transaction.all;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
371
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
372
373
  if (resource_context->isStarted())
374
    return; /* already registered, return */
375
376
  session->server_status|= SERVER_STATUS_IN_TRANS;
377
378
  trans->registerResource(resource_context);
379
380
  assert(monitored->participatesInSqlTransaction());
381
  assert(not monitored->participatesInXaTransaction());
382
383
  resource_context->setMonitored(monitored);
384
  resource_context->setTransactionalStorageEngine(engine);
385
  trans->no_2pc|= true;
386
387
  if (session->transaction.xid_state.xid.is_null())
388
    session->transaction.xid_state.xid.set(session->getQueryId());
389
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
390
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
391
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
392
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
393
  if (! session->getResourceContext(monitored, 0)->isStarted())
394
    registerResourceForStatement(session, monitored, engine);
395
}
396
397
void TransactionServices::registerResourceForTransaction(Session *session,
398
                                                         plugin::MonitoredInTransaction *monitored,
399
                                                         plugin::TransactionalStorageEngine *engine,
400
                                                         plugin::XaResourceManager *resource_manager)
401
{
402
  TransactionContext *trans= &session->transaction.all;
403
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
404
405
  if (resource_context->isStarted())
406
    return; /* already registered, return */
407
408
  session->server_status|= SERVER_STATUS_IN_TRANS;
409
410
  trans->registerResource(resource_context);
411
412
  assert(monitored->participatesInSqlTransaction());
413
414
  resource_context->setMonitored(monitored);
415
  resource_context->setXaResourceManager(resource_manager);
416
  resource_context->setTransactionalStorageEngine(engine);
417
  trans->no_2pc|= true;
418
419
  if (session->transaction.xid_state.xid.is_null())
420
    session->transaction.xid_state.xid.set(session->getQueryId());
421
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
422
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
423
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
424
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
425
  if (! session->getResourceContext(monitored, 0)->isStarted())
426
    registerResourceForStatement(session, monitored, engine, resource_manager);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
427
}
428
429
/**
430
  @retval
431
    0   ok
432
  @retval
433
    1   transaction was rolled back
434
  @retval
435
    2   error during commit, data may be inconsistent
436
437
  @todo
438
    Since we don't support nested statement transactions in 5.0,
439
    we can't commit or rollback stmt transactions while we are inside
440
    stored functions or triggers. So we simply do nothing now.
441
    TODO: This should be fixed in later ( >= 5.1) releases.
442
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
443
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
444
{
445
  int error= 0, cookie= 0;
446
  /*
447
    'all' means that this is either an explicit commit issued by
448
    user, or an implicit commit issued by a DDL.
449
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
450
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
451
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
452
453
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
454
455
  /*
456
    We must not commit the normal transaction if a statement
457
    transaction is pending. Otherwise statement transaction
458
    flags will not get propagated to its normal transaction's
459
    counterpart.
460
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
461
  assert(session->transaction.stmt.getResourceContexts().empty() ||
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
462
              trans == &session->transaction.stmt);
463
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
464
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
465
  {
466
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
467
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
468
      rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
469
      return 1;
470
    }
471
1405.3.6 by Jay Pipes
Here, we do two main things:
472
    /*
473
     * If replication is on, we do a PREPARE on the resource managers, push the
474
     * Transaction message across the replication stream, and then COMMIT if the
475
     * replication stream returned successfully.
476
     */
477
    if (shouldConstructMessages())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
478
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
479
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
480
           it != resource_contexts.end() && ! error;
481
           ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
482
      {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
483
        ResourceContext *resource_context= *it;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
484
        int err;
485
        /*
486
          Do not call two-phase commit if this particular
487
          transaction is read-only. This allows for simpler
488
          implementation in engines that are always read-only.
489
        */
1273.1.12 by Jay Pipes
Cleanup style and documentation for ResourceContext and setTransactionReadWrite
490
        if (! resource_context->hasModifiedData())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
491
          continue;
1273.1.15 by Jay Pipes
This patch completes the first step in the splitting of
492
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
493
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
494
495
        if (resource->participatesInXaTransaction())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
496
        {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
497
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
498
          {
499
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
500
            error= 1;
501
          }
502
          else
503
          {
504
            status_var_increment(session->status_var.ha_prepare_count);
505
          }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
506
        }
507
      }
1405.3.6 by Jay Pipes
Here, we do two main things:
508
      if (error == 0 && is_real_trans)
509
      {
510
        /*
511
         * Push the constructed Transaction messages across to
512
         * replicators and appliers.
513
         */
514
        error= commitTransactionMessage(session);
515
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
516
      if (error)
517
      {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
518
        rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
519
        error= 1;
520
        goto end;
521
      }
522
    }
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
523
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
524
end:
525
    if (is_real_trans)
526
      start_waiting_global_read_lock(session);
527
  }
528
  return error;
529
}
530
531
/**
532
  @note
533
  This function does not care about global read lock. A caller should.
534
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
535
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
536
{
537
  int error=0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
538
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
539
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
540
541
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
542
543
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
544
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
545
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
546
         it != resource_contexts.end();
547
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
548
    {
549
      int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
550
      ResourceContext *resource_context= *it;
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
551
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
552
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
553
554
      if (resource->participatesInXaTransaction())
555
      {
556
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
557
        {
558
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
559
          error= 1;
560
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
561
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
562
        {
563
          status_var_increment(session->status_var.ha_commit_count);
564
        }
565
      }
566
      else if (resource->participatesInSqlTransaction())
567
      {
568
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
569
        {
570
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
571
          error= 1;
572
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
573
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
574
        {
575
          status_var_increment(session->status_var.ha_commit_count);
576
        }
577
      }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
578
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
579
    }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
580
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
581
    if (is_real_trans)
582
      session->transaction.xid_state.xid.null();
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
583
584
    if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
585
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
586
      session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
587
      session->transaction.cleanup();
588
    }
589
  }
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
590
  trans->reset();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
591
  return error;
592
}
593
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
594
/**
 * Rolls back the statement-level or the normal (session-level) transaction
 * in every resource registered with the selected transaction context.
 *
 * @param session             Session whose transaction is rolled back
 * @param normal_transaction  true selects session->transaction.all,
 *                            false selects session->transaction.stmt
 *
 * @retval 0  no participating resource reported an error
 * @retval 1  at least one resource reported an error (raised via my_error())
 */
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
{
  int result= 0;
  TransactionContext *context= normal_transaction ? &session->transaction.all
                                                  : &session->transaction.stmt;
  TransactionContext::ResourceContexts &participants= context->getResourceContexts();

  /* Evaluated up front, before any resource context gets reset below. */
  bool is_real_trans= normal_transaction ||
                      session->transaction.all.getResourceContexts().empty();

  /*
   * The normal transaction must not be rolled back while a statement
   * transaction is still pending.
   */
  assert(session->transaction.stmt.getResourceContexts().empty() ||
         context == &session->transaction.stmt);

  if (! participants.empty())
  {
    TransactionContext::ResourceContexts::iterator iter= participants.begin();
    while (iter != participants.end())
    {
      ResourceContext *resource_context= *iter;
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      /*
       * XA participation is checked first; the SQL-transaction check is
       * only evaluated for resources which are not XA participants.
       */
      const bool via_xa= resource->participatesInXaTransaction();
      if (via_xa || resource->participatesInSqlTransaction())
      {
        int err;
        if (via_xa)
          err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction);
        else
          err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction);

        if (err)
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          result= 1;
        }
        else if (normal_transaction)
        {
          /* Only rollbacks of the normal transaction are counted. */
          status_var_increment(session->status_var.ha_rollback_count);
        }
      }

      resource_context->reset(); /* keep it conveniently zero-filled */
      ++iter;
    }

    /*
     * The ROLLBACK has to be signalled to ReplicationServices BEFORE the
     * transaction ID is set to NULL: if a bulk segment was already sent
     * to replicators, a rollback statement carrying the corresponding
     * transaction ID must be sent as well.
     */
    rollbackTransactionMessage(session);

    if (is_real_trans)
      session->transaction.xid_state.xid.null();

    if (normal_transaction)
    {
      session->variables.tx_isolation= session->session_tx_isolation;
      session->transaction.cleanup();
    }
  }

  if (normal_transaction)
    session->transaction_rollback_request= false;

  /* Warn the user if a non-transactional table was updated. */
  if (is_real_trans &&
      session->transaction.all.hasModifiedNonTransData() &&
      session->killed != Session::KILL_CONNECTION)
  {
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
  }

  context->reset();
  return result;
}
681
682
/**
683
  This is used to commit or rollback a single statement depending on
684
  the value of error.
685
686
  @note
687
    Note that if the autocommit is on, then the following call inside
688
    InnoDB will commit or rollback the whole transaction (= the statement). The
689
    autocommit mechanism built into InnoDB is based on counting locks, but if
690
    the user has used LOCK TABLES then that mechanism does not know to do the
691
    commit.
692
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
693
int TransactionServices::autocommitOrRollback(Session *session, int error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
694
{
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
695
  if (session->transaction.stmt.getResourceContexts().empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
696
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
697
    if (! error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
698
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
699
      if (commitTransaction(session, false))
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
700
        error= 1;
701
    }
702
    else
703
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
704
      (void) rollbackTransaction(session, false);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
705
      if (session->transaction_rollback_request)
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
706
        (void) rollbackTransaction(session, true);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
707
    }
708
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
709
    session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
710
  }
711
  return error;
712
}
713
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
714
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
715
{
716
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
717
  {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
718
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
719
     * resources aren't the same... */
720
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
721
  }
722
};
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
723
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
724
/**
 * Rolls the normal transaction back to a named savepoint.
 *
 * Resources which were registered when the savepoint was set are rolled
 * back to the savepoint.  Resources registered with the transaction but
 * not with the savepoint are rolled back entirely.  Afterwards the
 * transaction's list of resource contexts is replaced by the savepoint's.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint to roll back to
 *
 * @retval 0  no storage engine reported an error
 * @retval 1  at least one storage engine reported an error (raised via my_error())
 */
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext *trans= &session->transaction.all;
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();

  trans->no_2pc= false;
  /*
    rolling back to savepoint in all storage engines that were part of the
    transaction when the savepoint was set
  */
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
       it != sv_resource_contexts.end();
       ++it)
  {
    int err;
    ResourceContext *resource_context= *it;

    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
      {
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error= 1;
      }
      else
      {
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
      }
    }
    /* no_2pc becomes set as soon as one resource is not an XA participant */
    trans->no_2pc|= not resource->participatesInXaTransaction();
  }
  /*
    rolling back the transaction in all storage engines that were not part of
    the transaction when the savepoint was set
  */
  {
    /* set_difference() needs sorted input, so work on sorted copies. */
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);

    sort(sorted_tran_resource_contexts.begin(),
         sorted_tran_resource_contexts.end(),
         ResourceContextCompare());
    sort(sorted_sv_resource_contexts.begin(),
         sorted_sv_resource_contexts.end(),
         ResourceContextCompare());

    /*
     * set_difference() writes through its output iterator without growing
     * the destination, so the destination must already be large enough.
     * The difference can never hold more elements than the first input
     * range; size the container for that worst case and trim it to the
     * end iterator returned by the algorithm.
     */
    TransactionContext::ResourceContexts set_difference_contexts(sorted_tran_resource_contexts.size());
    TransactionContext::ResourceContexts::iterator difference_end=
      set_difference(sorted_tran_resource_contexts.begin(),
                     sorted_tran_resource_contexts.end(),
                     sorted_sv_resource_contexts.begin(),
                     sorted_sv_resource_contexts.end(),
                     set_difference_contexts.begin(),
                     ResourceContextCompare());
    set_difference_contexts.erase(difference_end, set_difference_contexts.end());
    /*
     * set_difference_contexts now contains all resource contexts
     * which are in the transaction context but were NOT in the
     * savepoint's resource contexts.
     */

    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
         it != set_difference_contexts.end();
         ++it)
    {
      ResourceContext *resource_context= *it;
      int err;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      if (resource->participatesInSqlTransaction())
      {
        /* second argument true: roll back the normal transaction */
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, true)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else
        {
          status_var_increment(session->status_var.ha_rollback_count);
        }
      }
      resource_context->reset(); /* keep it conveniently zero-filled */
    }
  }
  trans->setResourceContexts(sv_resource_contexts);
  return error;
}
813
814
/**
815
  @note
816
  according to the sql standard (ISO/IEC 9075-2:2003)
817
  section "4.33.4 SQL-statements and transaction states",
1273.1.4 by Jay Pipes
This patch significantly reworks the way that
818
  NamedSavepoint is *not* transaction-initiating SQL-statement
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
819
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
820
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
821
{
822
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
823
  TransactionContext *trans= &session->transaction.all;
824
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
825
826
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
827
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
828
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
829
         it != resource_contexts.end();
830
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
831
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
832
      ResourceContext *resource_context= *it;
833
      int err;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
834
835
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
836
837
      if (resource->participatesInSqlTransaction())
838
      {
839
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
840
        {
841
          my_error(ER_GET_ERRNO, MYF(0), err);
842
          error= 1;
843
        }
844
        else
845
        {
846
          status_var_increment(session->status_var.ha_savepoint_count);
847
        }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
848
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
849
    }
850
  }
851
  /*
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
852
    Remember the list of registered storage engines.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
853
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
854
  sv.setResourceContexts(resource_contexts);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
855
  return error;
856
}
857
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
858
/**
 * Releases a named savepoint in every storage engine that was recorded
 * in the savepoint's list of resource contexts.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint being released
 *
 * @retval 0  no storage engine reported an error
 * @retval 1  at least one storage engine reported an error (raised via my_error())
 */
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
{
  int result= 0;
  TransactionContext::ResourceContexts &recorded= sv.getResourceContexts();

  TransactionContext::ResourceContexts::iterator iter= recorded.begin();
  while (iter != recorded.end())
  {
    ResourceContext *resource_context= *iter;
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      const int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv);
      if (err)
      {
        my_error(ER_GET_ERRNO, MYF(0), err);
        result= 1;
      }
    }
    ++iter;
  }
  return result;
}
884
1405.4.10 by Jay Pipes
OK, Sun Studio still didn't like that...seems to think that inline means something different than other compilers think it is...
885
bool TransactionServices::shouldConstructMessages()
886
{
887
  ReplicationServices &replication_services= ReplicationServices::singleton();
888
  return replication_services.isActive();
889
}
890
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
891
/**
 * Returns the Transaction message attached to the session, creating,
 * initializing and attaching a new one when the session has none.
 *
 * @param in_session  Session to fetch the transaction message for
 *
 * @return the session's active Transaction message
 */
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
{
  message::Transaction *active= in_session->getTransactionMessage();

  if (unlikely(active == NULL))
  {
    /*
     * Allocate and initialize a new transaction message
     * for this Session object.  Session is responsible for
     * deleting transaction message when done with it.
     *
     * NOTE(review): the result of the nothrow allocation is dereferenced
     * without a NULL check -- confirm how allocation failure should be
     * handled here.
     */
    active= new (nothrow) message::Transaction();
    initTransactionMessage(*active, in_session);
    in_session->setTransactionMessage(active);
  }

  return active;
}
910
911
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
912
                                          Session *in_session)
913
{
914
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
915
  trx->set_server_id(in_session->getServerId());
1405.4.7 by Jay Pipes
* Fixes drizzled's atomics:
916
  trx->set_transaction_id(getNextTransactionId());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
917
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
918
}
919
920
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
921
                                              Session *in_session)
922
{
923
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
924
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
925
}
926
927
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
928
                                             Session *in_session)
929
{
930
  delete in_transaction;
931
  in_session->setStatementMessage(NULL);
932
  in_session->setTransactionMessage(NULL);
933
}
934
1405.3.6 by Jay Pipes
Here, we do two main things:
935
/**
 * Finalizes the session's active statement and transaction messages,
 * pushes the Transaction message to ReplicationServices and cleans the
 * message up afterwards.
 *
 * @param in_session  Session whose transaction message is committed
 *
 * @return 0 when replication is inactive or the session has no active
 *         statement message; otherwise the replication return code
 *         converted to int
 */
int TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return 0;

  message::Statement *active_statement= in_session->getStatementMessage();
  if (active_statement == NULL)
    return 0; /* No data modification occurred inside the transaction */

  /* There is an active statement message, so finalize it */
  finalizeStatementMessage(*active_statement, in_session);

  message::Transaction *active_transaction= getActiveTransactionMessage(in_session);
  finalizeTransactionMessage(*active_transaction, in_session);

  plugin::ReplicationReturnCode push_result=
    replication_services.pushTransactionMessage(*in_session, *active_transaction);

  cleanupTransactionMessage(active_transaction, in_session);

  return static_cast<int>(push_result);
}
961
962
void TransactionServices::initStatementMessage(message::Statement &statement,
963
                                        message::Statement::Type in_type,
964
                                        Session *in_session)
965
{
966
  statement.set_type(in_type);
967
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
968
  /** @TODO Set sql string optionally */
969
}
970
971
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
972
                                            Session *in_session)
973
{
974
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
975
  in_session->setStatementMessage(NULL);
976
}
977
978
/**
 * Discards the session's Transaction message on ROLLBACK and, if part of
 * the transaction was already sent to replicators, first pushes a
 * ROLLBACK statement to them.
 *
 * Does nothing when replication is inactive.
 *
 * @param in_session  Session whose transaction is being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *active_transaction= getActiveTransactionMessage(in_session);

  /*
   * Two situations have to be dealt with here:
   *
   * 1) The currently-stored Transaction message is *self-contained*,
   *    meaning that none of its Statement messages contains a message
   *    whose segment_id member is greater than 1.  In that case the
   *    Transaction message is simply cleaned up and removed from memory.
   *
   * 2) The Transaction message does have a non-end segment, which means
   *    that a bulk update/delete/insert Transaction message segment has
   *    previously been sent over the wire to replicators.  In that case a
   *    Transaction with a Statement message of type ROLLBACK is packaged
   *    to tell replicators that previously-transmitted messages must be
   *    un-applied.
   */
  const bool bulk_segment_sent= unlikely(message::transactionContainsBulkSegment(*active_transaction));
  if (bulk_segment_sent)
  {
    /*
     * Clear the transaction, create a Rollback statement message,
     * attach it to the transaction, and push it to replicators.
     */
    active_transaction->Clear();
    initTransactionMessage(*active_transaction, in_session);

    message::Statement *rollback_statement= active_transaction->add_statement();
    initStatementMessage(*rollback_statement, message::Statement::ROLLBACK, in_session);
    finalizeStatementMessage(*rollback_statement, in_session);

    finalizeTransactionMessage(*active_transaction, in_session);

    (void) replication_services.pushTransactionMessage(*in_session, *active_transaction);
  }

  cleanupTransactionMessage(active_transaction, in_session);
}
1023
1024
/**
 * Returns the session's active INSERT Statement message, creating one
 * when the session has no active statement or its active statement is
 * of another type.
 *
 * @param in_session  Session to fetch the statement message for
 * @param in_table    Table being inserted into; used to build the header
 *                    of a newly created statement
 *
 * @return reference to the session's INSERT Statement message
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
                                                            Table *in_table)
{
  message::Statement *current= in_session->getStatementMessage();

  /*
   * An active Statement message of any type other than INSERT is
   * finalized, and the session is queried again for its statement
   * message so that a new InsertStatement gets created below.
   */
  if (current != NULL && current->type() != message::Statement::INSERT)
  {
    finalizeStatementMessage(*current, in_session);
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * Transaction message initialized and set, but no statement created
   * yet.  Construct and initialize one, and attach the new Statement
   * message pointer to the Session for easy retrieval later...
   */
  message::Transaction *active_transaction= getActiveTransactionMessage(in_session);
  current= active_transaction->add_statement();
  setInsertHeader(*current, in_session, in_table);
  in_session->setStatementMessage(current);

  return *current;
}
1055
1056
void TransactionServices::setInsertHeader(message::Statement &statement,
1057
                                          Session *in_session,
1058
                                          Table *in_table)
1059
{
1060
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1061
1062
  /* 
1063
   * Now we construct the specialized InsertHeader message inside
1064
   * the generalized message::Statement container...
1065
   */
1066
  /* Set up the insert header */
1067
  message::InsertHeader *header= statement.mutable_insert_header();
1068
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1069
1336.2.3 by Jay Pipes
Merge trunk and resolve
1070
  string schema_name;
1071
  (void) in_table->getShare()->getSchemaName(schema_name);
1072
  string table_name;
1073
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1074
1336.2.3 by Jay Pipes
Merge trunk and resolve
1075
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1076
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1077
1078
  Field *current_field;
1079
  Field **table_fields= in_table->field;
1080
1081
  message::FieldMetadata *field_metadata;
1082
1083
  /* We will read all the table's fields... */
1084
  in_table->setReadSet();
1085
1086
  while ((current_field= *table_fields++) != NULL) 
1087
  {
1088
    field_metadata= header->add_field_metadata();
1089
    field_metadata->set_name(current_field->field_name);
1090
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1091
  }
1092
}
1093
1094
/**
 * Appends one record, holding the string value of every field of the
 * table, to the session's active INSERT Statement message.
 *
 * @param in_session  Session performing the insert
 * @param in_table    Table being inserted into
 *
 * @retval false  replication is inactive, or the record was added
 * @retval true   the table has no primary key; an error was raised via
 *                my_error() and nothing was added
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return false;

  /**
   * The primary key check comes first because no statement should even
   * be created if there isn't a primary key on the table...
   *
   * @todo
   *
   * Multi-column primary keys are handled how exactly?
   */
  if (in_table->s->primary_key == MAX_KEY)
  {
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
    return true;
  }

  message::Statement &insert_statement= getInsertStatement(in_session, in_table);

  message::InsertData *insert_data= insert_statement.mutable_insert_data();
  insert_data->set_segment_id(1);
  insert_data->set_end_segment(true);
  message::InsertRecord *new_record= insert_data->add_record();

  Field **field_cursor= in_table->field;

  /* Scratch string allocated on the session's mem_root. */
  String *value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  value->set_charset(system_charset_info);

  /* We will read all the table's fields... */
  in_table->setReadSet();

  /* The field array is walked until its terminating NULL entry. */
  for (; *field_cursor != NULL; ++field_cursor)
  {
    Field *field= *field_cursor;

    /* val_str() hands back the String to read from; it is reused as the next buffer. */
    value= field->val_str(value);
    new_record->add_insert_value(value->c_ptr(), value->length());
    value->free();
  }
  return false;
}
1137
1138
/**
 * Returns the session's active UPDATE Statement message, creating one
 * when the session has no active statement or its active statement is
 * of another type.
 *
 * @param in_session  Session to fetch the statement message for
 * @param in_table    Table being updated
 * @param old_record  Passed through to setUpdateHeader() for a new statement
 * @param new_record  Passed through to setUpdateHeader() for a new statement
 *
 * @return reference to the session's UPDATE Statement message
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
                                                            Table *in_table,
                                                            const unsigned char *old_record,
                                                            const unsigned char *new_record)
{
  message::Statement *current= in_session->getStatementMessage();

  /*
   * An active Statement message of any type other than UPDATE is
   * finalized, and the session is queried again for its statement
   * message so that a new UpdateStatement gets created below.
   */
  if (current != NULL && current->type() != message::Statement::UPDATE)
  {
    finalizeStatementMessage(*current, in_session);
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * Transaction message initialized and set, but no statement created
   * yet.  Construct and initialize one, and attach the new Statement
   * message pointer to the Session for easy retrieval later...
   */
  message::Transaction *active_transaction= getActiveTransactionMessage(in_session);
  current= active_transaction->add_statement();
  setUpdateHeader(*current, in_session, in_table, old_record, new_record);
  in_session->setStatementMessage(current);

  return *current;
}
1171
1172
void TransactionServices::setUpdateHeader(message::Statement &statement,
1173
                                          Session *in_session,
1174
                                          Table *in_table,
1175
                                          const unsigned char *old_record, 
1176
                                          const unsigned char *new_record)
1177
{
1178
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1179
1180
  /* 
1181
   * Now we construct the specialized UpdateHeader message inside
1182
   * the generalized message::Statement container...
1183
   */
1184
  /* Set up the update header */
1185
  message::UpdateHeader *header= statement.mutable_update_header();
1186
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1187
1336.2.3 by Jay Pipes
Merge trunk and resolve
1188
  string schema_name;
1189
  (void) in_table->getShare()->getSchemaName(schema_name);
1190
  string table_name;
1191
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1192
1336.2.3 by Jay Pipes
Merge trunk and resolve
1193
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1194
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1195
1196
  Field *current_field;
1197
  Field **table_fields= in_table->field;
1198
1199
  message::FieldMetadata *field_metadata;
1200
1201
  /* We will read all the table's fields... */
1202
  in_table->setReadSet();
1203
1204
  while ((current_field= *table_fields++) != NULL) 
1205
  {
1206
    /*
1207
     * We add the "key field metadata" -- i.e. the fields which is
1208
     * the primary key for the table.
1209
     */
1210
    if (in_table->s->fieldInPrimaryKey(current_field))
1211
    {
1212
      field_metadata= header->add_key_field_metadata();
1213
      field_metadata->set_name(current_field->field_name);
1214
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1215
    }
1216
1217
    /*
1218
     * The below really should be moved into the Field API and Record API.  But for now
1219
     * we do this crazy pointer fiddling to figure out if the current field
1220
     * has been updated in the supplied record raw byte pointers.
1221
     */
1222
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1223
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1224
1225
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1226
1227
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1228
    {
1229
      /* Field is changed from old to new */
1230
      field_metadata= header->add_set_field_metadata();
1231
      field_metadata->set_name(current_field->field_name);
1232
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1233
    }
1234
  }
1235
}
1236
void TransactionServices::updateRecord(Session *in_session,
1237
                                       Table *in_table, 
1238
                                       const unsigned char *old_record, 
1239
                                       const unsigned char *new_record)
1240
{
1241
  ReplicationServices &replication_services= ReplicationServices::singleton();
1242
  if (! replication_services.isActive())
1243
    return;
1244
1245
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
1246
1247
  message::UpdateData *data= statement.mutable_update_data();
1248
  data->set_segment_id(1);
1249
  data->set_end_segment(true);
1250
  message::UpdateRecord *record= data->add_record();
1251
1252
  Field *current_field;
1253
  Field **table_fields= in_table->field;
1254
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1255
  string_value->set_charset(system_charset_info);
1256
1257
  while ((current_field= *table_fields++) != NULL) 
1258
  {
1259
    /*
1260
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1261
     * but then realized that an UPDATE statement could potentially have different values for
1262
     * the SET field.  For instance, imagine this SQL scenario:
1263
     *
1264
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1265
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1266
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1267
     *
1268
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1269
     *
1270
     * The below really should be moved into the Field API and Record API.  But for now
1271
     * we do this crazy pointer fiddling to figure out if the current field
1272
     * has been updated in the supplied record raw byte pointers.
1273
     */
1274
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1275
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->record[0]); 
1276
1277
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1278
1279
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1280
    {
1281
      /* Store the original "read bit" for this field */
1282
      bool is_read_set= current_field->isReadSet();
1283
1284
      /* We need to mark that we will "read" this field... */
1285
      in_table->setReadSet(current_field->field_index);
1286
1287
      /* Read the string value of this field's contents */
1288
      string_value= current_field->val_str(string_value);
1289
1290
      /* 
1291
       * Reset the read bit after reading field to its original state.  This 
1292
       * prevents the field from being included in the WHERE clause
1293
       */
1294
      current_field->setReadSet(is_read_set);
1295
1296
      record->add_after_value(string_value->c_ptr(), string_value->length());
1297
      string_value->free();
1298
    }
1299
1300
    /* 
1301
     * Add the WHERE clause values now...for now, this means the
1302
     * primary key field value.  Replication only supports tables
1303
     * with a primary key.
1304
     */
1305
    if (in_table->s->fieldInPrimaryKey(current_field))
1306
    {
1307
      /**
1308
       * To say the below is ugly is an understatement. But it works.
1309
       * 
1310
       * @todo Move this crap into a real Record API.
1311
       */
1312
      string_value= current_field->val_str(string_value,
1313
                                           old_record + 
1314
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1315
      record->add_key_value(string_value->c_ptr(), string_value->length());
1316
      string_value->free();
1317
    }
1318
1319
  }
1320
}
1321
1322
/**
 * Returns the Statement message that DELETE changes for the supplied
 * Session should be recorded in, creating one if needed.
 *
 * When the Session's current Statement message is not of type DELETE it
 * is finalized first.  When the Session then has no Statement message, a
 * new one is added to the active Transaction message, given a delete
 * header via setDeleteHeader(), and attached to the Session.
 *
 * @param in_session  Session whose Statement message is fetched or set
 * @param in_table    Table rows are being deleted from
 *
 * @return Reference to the Statement message to record the delete in
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
                                                            Table *in_table)
{
  message::Statement *current= in_session->getStatementMessage();

  /* A Statement of some other type is in progress: close it out first. */
  if (current != NULL && current->type() != message::Statement::DELETE)
  {
    finalizeStatementMessage(*current, in_session);
    /* Ask the Session again for whatever is current after finalizing. */
    current= in_session->getStatementMessage();
  }

  if (current != NULL)
    return *current;

  /*
   * No Statement message is attached to the Session.  Build a fresh one
   * inside the active Transaction message and remember it on the Session
   * so that later calls can find it.
   */
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  message::Statement *created= transaction->add_statement();
  setDeleteHeader(*created, in_session, in_table);
  in_session->setStatementMessage(created);

  return *created;
}
1353
1354
void TransactionServices::setDeleteHeader(message::Statement &statement,
1355
                                          Session *in_session,
1356
                                          Table *in_table)
1357
{
1358
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1359
1360
  /* 
1361
   * Now we construct the specialized DeleteHeader message inside
1362
   * the generalized message::Statement container...
1363
   */
1364
  message::DeleteHeader *header= statement.mutable_delete_header();
1365
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1366
1336.2.3 by Jay Pipes
Merge trunk and resolve
1367
  string schema_name;
1368
  (void) in_table->getShare()->getSchemaName(schema_name);
1369
  string table_name;
1370
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1371
1336.2.3 by Jay Pipes
Merge trunk and resolve
1372
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1373
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1374
1375
  Field *current_field;
1376
  Field **table_fields= in_table->field;
1377
1378
  message::FieldMetadata *field_metadata;
1379
1380
  while ((current_field= *table_fields++) != NULL) 
1381
  {
1382
    /* 
1383
     * Add the WHERE clause values now...for now, this means the
1384
     * primary key field value.  Replication only supports tables
1385
     * with a primary key.
1386
     */
1387
    if (in_table->s->fieldInPrimaryKey(current_field))
1388
    {
1389
      field_metadata= header->add_key_field_metadata();
1390
      field_metadata->set_name(current_field->field_name);
1391
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1392
    }
1393
  }
1394
}
1395
1396
/**
 * Records the deletion of a single row in the Session's DELETE Statement
 * message.  Returns immediately when the replication services are not
 * active.
 *
 * One DeleteRecord is added per call, holding the string value of every
 * primary key field.
 *
 * @param in_session  Session performing the delete
 * @param in_table    Table the row is being deleted from
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Statement &statement= getDeleteStatement(in_session, in_table);

  message::DeleteData *data= statement.mutable_delete_data();
  data->set_segment_id(1);
  data->set_end_segment(true);
  message::DeleteRecord *record= data->add_record();

  Field **field_cursor= in_table->field;
  String *scratch= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  scratch->set_charset(system_charset_info);

  /*
   * WHERE clause values: for now just the primary key field values.
   * Replication only supports tables with a primary key.
   */
  for (; *field_cursor != NULL; ++field_cursor)
  {
    Field *field= *field_cursor;

    if (! in_table->s->fieldInPrimaryKey(field))
      continue;

    scratch= field->val_str(scratch);
    record->add_key_value(scratch->c_ptr(), scratch->length());
    /**
     * @TODO Store optional old record value in the before data member
     */
    scratch->free();
  }
}
1432
1433
void TransactionServices::createTable(Session *in_session,
1434
                                      const message::Table &table)
1435
{
1436
  ReplicationServices &replication_services= ReplicationServices::singleton();
1437
  if (! replication_services.isActive())
1438
    return;
1439
  
1440
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1441
  message::Statement *statement= transaction->add_statement();
1442
1443
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1444
1445
  /* 
1446
   * Construct the specialized CreateTableStatement message and attach
1447
   * it to the generic Statement message
1448
   */
1449
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1450
  message::Table *new_table_message= create_table_statement->mutable_table();
1451
  *new_table_message= table;
1452
1453
  finalizeStatementMessage(*statement, in_session);
1454
1455
  finalizeTransactionMessage(*transaction, in_session);
1456
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1457
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1458
1459
  cleanupTransactionMessage(transaction, in_session);
1460
1461
}
1462
1463
void TransactionServices::createSchema(Session *in_session,
1464
                                       const message::Schema &schema)
1465
{
1466
  ReplicationServices &replication_services= ReplicationServices::singleton();
1467
  if (! replication_services.isActive())
1468
    return;
1469
  
1470
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1471
  message::Statement *statement= transaction->add_statement();
1472
1473
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1474
1475
  /* 
1476
   * Construct the specialized CreateSchemaStatement message and attach
1477
   * it to the generic Statement message
1478
   */
1479
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1480
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1481
  *new_schema_message= schema;
1482
1483
  finalizeStatementMessage(*statement, in_session);
1484
1485
  finalizeTransactionMessage(*transaction, in_session);
1486
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1487
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1488
1489
  cleanupTransactionMessage(transaction, in_session);
1490
1491
}
1492
1493
/**
 * Builds a DROP_SCHEMA Statement inside the Session's active Transaction
 * message, finalizes both messages, pushes the Transaction message to the
 * replication services and cleans it up.  Returns immediately when the
 * replication services are not active.
 *
 * @param in_session   Session dropping the schema
 * @param schema_name  Name of the schema being dropped
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *ddl_statement= trx_message->add_statement();

  initStatementMessage(*ddl_statement, message::Statement::DROP_SCHEMA, in_session);

  /* The specialized DropSchemaStatement only carries the schema name. */
  ddl_statement->mutable_drop_schema_statement()->set_schema_name(schema_name);

  finalizeStatementMessage(*ddl_statement, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is explicitly discarded. */
  (void) replicator.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1520
1521
void TransactionServices::dropTable(Session *in_session,
1522
                                    const string &schema_name,
1523
                                    const string &table_name,
1524
                                    bool if_exists)
1525
{
1526
  ReplicationServices &replication_services= ReplicationServices::singleton();
1527
  if (! replication_services.isActive())
1528
    return;
1529
  
1530
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1531
  message::Statement *statement= transaction->add_statement();
1532
1533
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1534
1535
  /* 
1536
   * Construct the specialized DropTableStatement message and attach
1537
   * it to the generic Statement message
1538
   */
1539
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1540
1541
  drop_table_statement->set_if_exists_clause(if_exists);
1542
1543
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1544
1545
  table_metadata->set_schema_name(schema_name);
1546
  table_metadata->set_table_name(table_name);
1547
1548
  finalizeStatementMessage(*statement, in_session);
1549
1550
  finalizeTransactionMessage(*transaction, in_session);
1551
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1552
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1553
1554
  cleanupTransactionMessage(transaction, in_session);
1555
}
1556
1557
/**
 * Builds a TRUNCATE_TABLE Statement inside the Session's active
 * Transaction message, finalizes both messages, pushes the Transaction
 * message to the replication services and cleans it up.  Returns
 * immediately when the replication services are not active.
 *
 * @param in_session  Session truncating the table
 * @param in_table    Table being truncated; its share supplies the schema
 *                    and table names stored in the statement
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *ddl_statement= trx_message->add_statement();

  initStatementMessage(*ddl_statement, message::Statement::TRUNCATE_TABLE, in_session);

  /* Fill in the specialized TruncateTableStatement of the generic Statement. */
  message::TableMetadata *truncated_table=
    ddl_statement->mutable_truncate_table_statement()->mutable_table_metadata();

  string schema_name;
  (void) in_table->getShare()->getSchemaName(schema_name);
  string table_name;
  (void) in_table->getShare()->getTableName(table_name);

  truncated_table->set_schema_name(schema_name.data(), schema_name.size());
  truncated_table->set_table_name(table_name.data(), table_name.size());

  finalizeStatementMessage(*ddl_statement, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is explicitly discarded. */
  (void) replicator.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1591
1592
/**
 * Builds a RAW_SQL Statement carrying the supplied query text inside the
 * Session's active Transaction message, finalizes both messages, pushes
 * the Transaction message to the replication services and cleans it up.
 * Returns immediately when the replication services are not active.
 *
 * @param in_session  Session that executed the query
 * @param query       SQL text stored in the statement
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
{
  ReplicationServices &replicator= ReplicationServices::singleton();
  if (! replicator.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *raw_statement= trx_message->add_statement();

  initStatementMessage(*raw_statement, message::Statement::RAW_SQL, in_session);
  raw_statement->set_sql(query);

  finalizeStatementMessage(*raw_statement, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is explicitly discarded. */
  (void) replicator.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1611
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1612
} /* namespace drizzled */