~drizzle-trunk/drizzle/development

1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008 Sun Microsystems
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
6
 *
7
 *  This program is free software; you can redistribute it and/or modify
8
 *  it under the terms of the GNU General Public License as published by
9
 *  the Free Software Foundation; version 2 of the License.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
/**
22
 * @file Transaction processing code
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
23
 *
24
 * @note
25
 *
26
 * The TransactionServices component takes internal events (for instance the start of a 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 * component and used during replication.
30
 *
31
 * The reason for this functionality is to encapsulate all communication
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 * Instead of the plugin having to understand the (often fluidly changing)
34
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 * these messages.
37
 *
38
 * @see /drizzled/message/transaction.proto
39
 *
40
 * @todo
41
 *
42
 * We really should store the raw bytes in the messages, not the
43
 * String value of the Field.  But, to do that, the
44
 * statement_transform library needs first to be updated
45
 * to include the transformation code to convert raw
46
 * Drizzle-internal Field byte representation into something
47
 * plugins can understand.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
48
 */
49
50
#include "config.h"
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
60
#include "drizzled/transaction_context.h"
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
63
#include "drizzled/resource_context.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
72
#include "drizzled/internal/my_sys.h"
73
74
using namespace std;
75
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
76
#include <vector>
77
#include <algorithm>
78
#include <functional>
79
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
80
namespace drizzled
81
{
82
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
83
/** @TODO: Make this a system variable */
84
/*
 * 1024 * 1024 = 1,048,576.  Presumably a size limit, in bytes, applied to
 * Transaction messages -- TODO confirm against the code that reads this
 * constant, which is not visible in this part of the file.
 */
static const size_t trx_msg_threshold= 1024 * 1024;
85
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
86
/**
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
87
 * @defgroup Transactions
88
 *
89
 * @brief
90
 *
91
 * Transaction handling in the server
92
 *
93
 * @detail
94
 *
95
 * In each client connection, Drizzle maintains two transaction
96
 * contexts representing the state of the:
97
 *
98
 * 1) Statement Transaction
99
 * 2) Normal Transaction
100
 *
101
 * These two transaction contexts represent the transactional
102
 * state of a Session's SQL and XA transactions for a single
103
 * SQL statement or a series of SQL statements.
104
 *
105
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 * is no practical difference between the statement and the
107
 * normal transaction, as each SQL statement is committed or
108
 * rolled back depending on the success or failure of the
109
 * individual SQL statement.
110
 *
111
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 * the Session has explicitly begun a normal SQL transaction using
113
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 * transaction context tracks the aggregate transaction state of
115
 * the SQL transaction's individual statements, and the SQL
116
 * transaction's commit or rollback is done atomically for all of
117
 * the SQL transaction's statement's data changes.
118
 *
119
 * Technically, a statement transaction can be viewed as a savepoint 
120
 * which is maintained automatically in order to make effects of one
121
 * statement atomic.
122
 *
123
 * The normal transaction is started by the user and is typically
124
 * ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
 * The exception to this is that DDL statements implicitly COMMIT
126
 * any previously active normal transaction before they begin executing.
127
 *
128
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 * plugins will automatically be monitored by Drizzle's transaction 
131
 * manager (implemented in this source file), as will all plugins which
132
 * implement plugin::XaResourceManager and register with the transaction
133
 * manager.
134
 *
135
 * If Drizzle's transaction manager sees that more than one resource
136
 * manager (transactional storage engine or XA resource manager) has modified
137
 * data state during a statement or normal transaction, the transaction
138
 * manager will automatically use a two-phase commit protocol for all
139
 * resources which support XA's distributed transaction protocol.  Unlike
140
 * MySQL, storage engines need not manually register with the transaction
141
 * manager during a statement's execution.  Previously, in MySQL, all
142
 * handlertons would have to call trans_register_ha() at some point after
143
 * modifying data state in order to have MySQL include that handler in
144
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 * scenes for the storage engine implementers.
146
 *
147
 * When a connection is closed, the current normal transaction, if
148
 * any is currently active, is rolled back.
149
 *
150
 * Transaction life cycle
151
 * ----------------------
152
 *
153
 * When a new connection is established, session->transaction
154
 * members are initialized to an empty state. If a statement uses any tables, 
155
 * all affected engines are registered in the statement engine list automatically
156
 * in plugin::StorageEngine::startStatement() and 
157
 * plugin::TransactionalStorageEngine::startTransaction().
158
 *
159
 * You can view the lifetime of a normal transaction in the following
160
 * call-sequence:
161
 *
162
 * drizzled::statement::Statement::execute()
163
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 *     drizzled::plugin::StorageEngine::startStatement()
167
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 *     drizzled::plugin::StorageEngine::endStatement()
169
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 *
173
 * Roles and responsibilities
174
 * --------------------------
175
 *
176
 * Beginning of SQL Statement (and Statement Transaction)
177
 * ------------------------------------------------------
178
 *
179
 * At the start of each SQL statement, for each storage engine
180
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 * calls the engine's plugin::StorageEngine::startStatement() method.  If the
182
 * engine needs to track some data for the statement, it should use
183
 * this method invocation to initialize this data.  This is the
184
 * beginning of what is called the "statement transaction".
185
 *
186
 * <strong>For transaction storage engines (those storage engines
187
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 * kernel automatically determines if the start of the SQL statement 
189
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 * This occurs when the connection is NOT in autocommit mode. If
191
 * the kernel detects this, then the kernel automatically starts the
192
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 * method and then calls plugin::StorageEngine::startStatement()
194
 * afterwards.
195
 *
196
 * Beginning of an SQL "Normal" Transaction
197
 * ----------------------------------------
198
 *
199
 * As noted above, a "normal SQL transaction" may be started when
200
 * an SQL statement is started in a connection and the connection is
201
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 *
203
 * In addition, when a user executes a START TRANSACTION or
204
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 * calls each transactional storage engine's startTransaction() method.
206
 *
207
 * Ending of an SQL Statement (and Statement Transaction)
208
 * ------------------------------------------------------
209
 *
210
 * At the end of each SQL statement, for each of the aforementioned
211
 * involved storage engines, the kernel calls the engine's
212
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 * has initialized or modified some internal data about the
214
 * statement transaction, it should use this method to reset or destroy
215
 * this data appropriately.
216
 *
217
 * Ending of an SQL "Normal" Transaction
218
 * -------------------------------------
219
 *
220
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 * depending on the success or failure of the statement transaction(s) 
222
 * it encloses.
223
 *
224
 * The end of a "normal transaction" occurs when any of the following
225
 * occurs:
226
 *
227
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 *    then the normal transaction which encloses the statement
229
 *    transaction ends
230
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 *    commit the active normal transaction
233
 *
234
 * Transactions and Non-transactional Storage Engines
235
 * --------------------------------------------------
236
 *
237
 * For non-transactional engines, this call can be safely ignored, and
238
 * the kernel tracks whether a non-transactional engine has changed
239
 * any data state, and warns the user appropriately if a transaction
240
 * (statement or normal) is rolled back after such non-transactional
241
 * data changes have been made.
242
 *
243
 * XA Two-phase Commit Protocol
244
 * ----------------------------
245
 *
246
 * During statement execution, whenever any of data-modifying
247
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 * Cursor::update_row(), the read-write flag is raised in the
249
 * statement transaction for the involved engine.
250
 * Currently all PSEA calls are "traced", and the data can not be
251
 * changed in a way other than issuing a PSEA call. Important:
252
 * unless this invariant is preserved the server will not know that
253
 * a transaction in a given engine is read-write and will not
254
 * involve the two-phase commit protocol!
255
 *
256
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 * is invoked. This call in turn
258
 * invokes plugin::XaResourceManager::xaPrepare() for every involved XA
259
 * resource manager.
260
 *
261
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 * is...)
264
 * 
265
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 * At statement commit, the statement-related read-write engine
268
 * flag is propagated to the corresponding flag in the normal
269
 * transaction.  When the commit is complete, the list of registered
270
 * engines is cleared.
271
 *
272
 * Rollback is handled in a similar fashion.
273
 *
274
 * Additional notes on DDL and the normal transaction.
275
 * ---------------------------------------------------
276
 *
277
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 * because of the fact that SELECTs on a transactional storage
279
 * engine participate in the normal SQL transaction (due to
280
 * isolation level issues and consistent read views).
281
 *
282
 * Behaviour of the server in this case is currently badly
283
 * defined.
284
 *
285
 * DDL statements use a form of "semantic" logging
286
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 * the newly created table is deleted.
288
 * 
289
 * In addition, some DDL statements issue interim transaction
290
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 * from the original table to the internal temporary table. Other
292
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 * after itself.
294
 *
295
 * And finally there is a group of DDL statements such as
296
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 * and doesn't commit.
298
 *
299
 * A consistent behaviour is perhaps to always commit the normal
300
 * transaction after all DDLs, just like the statement transaction
301
 * is always committed at the end of all statements.
302
 */
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
303
void TransactionServices::registerResourceForStatement(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
304
                                                       plugin::MonitoredInTransaction *monitored,
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
305
                                                       plugin::TransactionalStorageEngine *engine)
306
{
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
307
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
308
  {
309
    /* 
310
     * Now we automatically register this resource manager for the
311
     * normal transaction.  This is fine because a statement
312
     * transaction registration should always enlist the resource
313
     * in the normal transaction which contains the statement
314
     * transaction.
315
     */
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
316
    registerResourceForTransaction(session, monitored, engine);
317
  }
318
319
  TransactionContext *trans= &session->transaction.stmt;
320
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
321
322
  if (resource_context->isStarted())
323
    return; /* already registered, return */
324
325
  assert(monitored->participatesInSqlTransaction());
326
  assert(not monitored->participatesInXaTransaction());
327
328
  resource_context->setMonitored(monitored);
329
  resource_context->setTransactionalStorageEngine(engine);
330
  trans->registerResource(resource_context);
331
332
  trans->no_2pc|= true;
333
}
334
335
void TransactionServices::registerResourceForStatement(Session *session,
336
                                                       plugin::MonitoredInTransaction *monitored,
337
                                                       plugin::TransactionalStorageEngine *engine,
338
                                                       plugin::XaResourceManager *resource_manager)
339
{
340
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
341
  {
342
    /* 
343
     * Now we automatically register this resource manager for the
344
     * normal transaction.  This is fine because a statement
345
     * transaction registration should always enlist the resource
346
     * in the normal transaction which contains the statement
347
     * transaction.
348
     */
349
    registerResourceForTransaction(session, monitored, engine, resource_manager);
350
  }
351
352
  TransactionContext *trans= &session->transaction.stmt;
353
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
354
355
  if (resource_context->isStarted())
356
    return; /* already registered, return */
357
358
  assert(monitored->participatesInXaTransaction());
359
  assert(monitored->participatesInSqlTransaction());
360
361
  resource_context->setMonitored(monitored);
362
  resource_context->setTransactionalStorageEngine(engine);
363
  resource_context->setXaResourceManager(resource_manager);
364
  trans->registerResource(resource_context);
365
366
  trans->no_2pc|= false;
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
367
}
368
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
369
void TransactionServices::registerResourceForTransaction(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
370
                                                         plugin::MonitoredInTransaction *monitored,
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
371
                                                         plugin::TransactionalStorageEngine *engine)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
372
{
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
373
  TransactionContext *trans= &session->transaction.all;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
374
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
375
376
  if (resource_context->isStarted())
377
    return; /* already registered, return */
378
379
  session->server_status|= SERVER_STATUS_IN_TRANS;
380
381
  trans->registerResource(resource_context);
382
383
  assert(monitored->participatesInSqlTransaction());
384
  assert(not monitored->participatesInXaTransaction());
385
386
  resource_context->setMonitored(monitored);
387
  resource_context->setTransactionalStorageEngine(engine);
388
  trans->no_2pc|= true;
389
390
  if (session->transaction.xid_state.xid.is_null())
391
    session->transaction.xid_state.xid.set(session->getQueryId());
392
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
393
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
394
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
395
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
396
  if (! session->getResourceContext(monitored, 0)->isStarted())
397
    registerResourceForStatement(session, monitored, engine);
398
}
399
400
void TransactionServices::registerResourceForTransaction(Session *session,
401
                                                         plugin::MonitoredInTransaction *monitored,
402
                                                         plugin::TransactionalStorageEngine *engine,
403
                                                         plugin::XaResourceManager *resource_manager)
404
{
405
  TransactionContext *trans= &session->transaction.all;
406
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
407
408
  if (resource_context->isStarted())
409
    return; /* already registered, return */
410
411
  session->server_status|= SERVER_STATUS_IN_TRANS;
412
413
  trans->registerResource(resource_context);
414
415
  assert(monitored->participatesInSqlTransaction());
416
417
  resource_context->setMonitored(monitored);
418
  resource_context->setXaResourceManager(resource_manager);
419
  resource_context->setTransactionalStorageEngine(engine);
420
  trans->no_2pc|= true;
421
422
  if (session->transaction.xid_state.xid.is_null())
423
    session->transaction.xid_state.xid.set(session->getQueryId());
424
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
425
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
426
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
427
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
428
  if (! session->getResourceContext(monitored, 0)->isStarted())
429
    registerResourceForStatement(session, monitored, engine, resource_manager);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
430
}
431
432
/**
433
  @retval
434
    0   ok
435
  @retval
436
    1   transaction was rolled back
437
  @retval
438
    2   error during commit, data may be inconsistent
439
440
  @todo
441
    Since we don't support nested statement transactions in 5.0,
442
    we can't commit or rollback stmt transactions while we are inside
443
    stored functions or triggers. So we simply do nothing now.
444
    TODO: This should be fixed in later ( >= 5.1) releases.
445
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
446
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
447
{
448
  int error= 0, cookie= 0;
449
  /*
450
    'all' means that this is either an explicit commit issued by
451
    user, or an implicit commit issued by a DDL.
452
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
453
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
454
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
455
456
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
457
458
  /*
459
    We must not commit the normal transaction if a statement
460
    transaction is pending. Otherwise statement transaction
461
    flags will not get propagated to its normal transaction's
462
    counterpart.
463
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
464
  assert(session->transaction.stmt.getResourceContexts().empty() ||
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
465
              trans == &session->transaction.stmt);
466
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
467
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
468
  {
469
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
470
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
471
      rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
472
      return 1;
473
    }
474
1405.3.6 by Jay Pipes
Here, we do two main things:
475
    /*
476
     * If replication is on, we do a PREPARE on the resource managers, push the
477
     * Transaction message across the replication stream, and then COMMIT if the
478
     * replication stream returned successfully.
479
     */
480
    if (shouldConstructMessages())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
481
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
482
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
483
           it != resource_contexts.end() && ! error;
484
           ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
485
      {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
486
        ResourceContext *resource_context= *it;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
487
        int err;
488
        /*
489
          Do not call two-phase commit if this particular
490
          transaction is read-only. This allows for simpler
491
          implementation in engines that are always read-only.
492
        */
1273.1.12 by Jay Pipes
Cleanup style and documentation for ResourceContext and setTransactionReadWrite
493
        if (! resource_context->hasModifiedData())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
494
          continue;
1273.1.15 by Jay Pipes
This patch completes the first step in the splitting of
495
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
496
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
497
498
        if (resource->participatesInXaTransaction())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
499
        {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
500
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
501
          {
502
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
503
            error= 1;
504
          }
505
          else
506
          {
1689.5.1 by Joseph Daly
remove increment calls
507
            session->status_var.ha_prepare_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
508
          }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
509
        }
510
      }
1405.3.6 by Jay Pipes
Here, we do two main things:
511
      if (error == 0 && is_real_trans)
512
      {
513
        /*
514
         * Push the constructed Transaction messages across to
515
         * replicators and appliers.
516
         */
517
        error= commitTransactionMessage(session);
518
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
519
      if (error)
520
      {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
521
        rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
522
        error= 1;
523
        goto end;
524
      }
525
    }
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
526
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
527
end:
528
    if (is_real_trans)
529
      start_waiting_global_read_lock(session);
530
  }
531
  return error;
532
}
533
534
/**
535
  @note
536
  This function does not care about global read lock. A caller should.
537
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
538
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
539
{
540
  int error=0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
541
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
542
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
543
544
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
545
546
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
547
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
548
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
549
         it != resource_contexts.end();
550
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
551
    {
552
      int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
553
      ResourceContext *resource_context= *it;
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
554
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
555
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
556
557
      if (resource->participatesInXaTransaction())
558
      {
559
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
560
        {
561
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
562
          error= 1;
563
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
564
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
565
        {
1689.5.1 by Joseph Daly
remove increment calls
566
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
567
        }
568
      }
569
      else if (resource->participatesInSqlTransaction())
570
      {
571
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
572
        {
573
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
574
          error= 1;
575
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
576
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
577
        {
1689.5.1 by Joseph Daly
remove increment calls
578
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
579
        }
580
      }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
581
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
582
    }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
583
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
584
    if (is_real_trans)
585
      session->transaction.xid_state.xid.null();
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
586
587
    if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
588
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
589
      session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
590
      session->transaction.cleanup();
591
    }
592
  }
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
593
  trans->reset();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
594
  return error;
595
}
596
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
597
/**
 * Rolls back either the normal transaction or the statement transaction
 * in every resource registered with it.
 *
 * @param session             Session whose transaction is rolled back
 * @param normal_transaction  true to roll back session->transaction.all,
 *                            false to roll back session->transaction.stmt
 *
 * @return 0 if every resource rolled back cleanly, 1 if at least one
 *         resource reported an error (the error is also raised through
 *         my_error()).
 */
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
{
  int error= 0;
  TransactionContext *trans= normal_transaction ? &session->transaction.all
                                                : &session->transaction.stmt;
  TransactionContext::ResourceContexts &contexts= trans->getResourceContexts();

  /* Evaluated up front, before any resource context is touched. */
  const bool is_real_trans= normal_transaction ||
                            session->transaction.all.getResourceContexts().empty();

  /*
   * The normal transaction must not be rolled back while a statement
   * transaction is still pending.
   */
  assert(session->transaction.stmt.getResourceContexts().empty() ||
         trans == &session->transaction.stmt);

  if (! contexts.empty())
  {
    for (TransactionContext::ResourceContexts::iterator ctx_iter= contexts.begin();
         ctx_iter != contexts.end();
         ++ctx_iter)
    {
      ResourceContext *context= *ctx_iter;
      plugin::MonitoredInTransaction *monitored= context->getMonitored();

      /* XA participation takes precedence over plain SQL transactions. */
      bool attempted= true;
      int rollback_err= 0;

      if (monitored->participatesInXaTransaction())
      {
        rollback_err= context->getXaResourceManager()->xaRollback(session, normal_transaction);
      }
      else if (monitored->participatesInSqlTransaction())
      {
        rollback_err= context->getTransactionalStorageEngine()->rollback(session, normal_transaction);
      }
      else
      {
        attempted= false;
      }

      if (attempted)
      {
        if (rollback_err != 0)
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), rollback_err);
          error= 1;
        }
        else if (normal_transaction)
        {
          /* Only successful rollbacks of a normal transaction are counted. */
          session->status_var.ha_rollback_count++;
        }
      }

      context->reset(); /* keep it conveniently zero-filled */
    }

    /*
     * The ROLLBACK has to be signalled to ReplicationServices BEFORE the
     * transaction ID is set to NULL: if a bulk segment was already sent to
     * replicators, a rollback statement carrying the corresponding
     * transaction ID must follow it.
     */
    if (normal_transaction)
    {
      rollbackTransactionMessage(session);
    }

    if (is_real_trans)
    {
      session->transaction.xid_state.xid.null();
    }

    if (normal_transaction)
    {
      session->variables.tx_isolation= session->session_tx_isolation;
      session->transaction.cleanup();
    }
  }

  if (normal_transaction)
  {
    session->transaction_rollback_request= false;
  }

  /* If a non-transactional table was updated, warn the user. */
  const bool warn_incomplete= is_real_trans &&
                              session->transaction.all.hasModifiedNonTransData() &&
                              session->killed != Session::KILL_CONNECTION;
  if (warn_incomplete)
  {
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
  }

  trans->reset();
  return error;
}
685
686
/**
687
  This is used to commit or rollback a single statement depending on
688
  the value of error.
689
690
  @note
691
    Note that if the autocommit is on, then the following call inside
692
    InnoDB will commit or rollback the whole transaction (= the statement). The
693
    autocommit mechanism built into InnoDB is based on counting locks, but if
694
    the user has used LOCK TABLES then that mechanism does not know to do the
695
    commit.
696
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
697
int TransactionServices::autocommitOrRollback(Session *session, int error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
698
{
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
699
  if (session->transaction.stmt.getResourceContexts().empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
700
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
701
    if (! error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
702
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
703
      if (commitTransaction(session, false))
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
704
        error= 1;
705
    }
706
    else
707
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
708
      (void) rollbackTransaction(session, false);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
709
      if (session->transaction_rollback_request)
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
710
        (void) rollbackTransaction(session, true);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
711
    }
712
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
713
    session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
714
  }
715
  return error;
716
}
717
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
718
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
719
{
720
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
721
  {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
722
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
723
     * resources aren't the same... */
724
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
725
  }
726
};
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
727
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
728
/**
 * Rolls the normal transaction back to a named savepoint.
 *
 * Resources that were registered when the savepoint was set are rolled
 * back to the savepoint; resources that joined the transaction afterwards
 * are rolled back completely.  The transaction's resource context list is
 * then replaced by the savepoint's list and, if replication messages are
 * being constructed, the session's Transaction message is replaced by a
 * copy of the one stored in the savepoint.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint to roll back to
 *
 * @return 0 on success, 1 if any resource reported an error (the error
 *         is also raised through my_error()).
 */
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext *trans= &session->transaction.all;
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();

  trans->no_2pc= false;

  /*
   * Roll back to the savepoint in all storage engines that were part of
   * the transaction when the savepoint was set.
   */
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
       it != sv_resource_contexts.end();
       ++it)
  {
    int err;
    ResourceContext *resource_context= *it;

    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
      {
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error= 1;
      }
      else
      {
        session->status_var.ha_savepoint_rollback_count++;
      }
    }
    trans->no_2pc|= not resource->participatesInXaTransaction();
  }

  /*
   * Roll back the whole transaction in all storage engines that were NOT
   * part of the transaction when the savepoint was set.
   */
  {
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
    TransactionContext::ResourceContexts set_difference_contexts;

    /*
     * Bug #542299: set_difference() writes through its output iterator and
     * therefore needs real elements to assign to.  reserve() only changes
     * the capacity, not the size, so writing through begin() of a merely
     * reserved vector is undefined behaviour and leaves the vector looking
     * empty afterwards.  The difference can never contain more elements
     * than the first input range, so size the target accordingly and trim
     * it to the number of elements actually written.
     */
    set_difference_contexts.resize(sorted_tran_resource_contexts.size());

    sort(sorted_tran_resource_contexts.begin(),
         sorted_tran_resource_contexts.end(),
         ResourceContextCompare());
    sort(sorted_sv_resource_contexts.begin(),
         sorted_sv_resource_contexts.end(),
         ResourceContextCompare());

    TransactionContext::ResourceContexts::iterator difference_end=
      set_difference(sorted_tran_resource_contexts.begin(),
                     sorted_tran_resource_contexts.end(),
                     sorted_sv_resource_contexts.begin(),
                     sorted_sv_resource_contexts.end(),
                     set_difference_contexts.begin(),
                     ResourceContextCompare());
    set_difference_contexts.erase(difference_end, set_difference_contexts.end());

    /*
     * set_difference_contexts now contains all resource contexts
     * which are in the transaction context but were NOT in the
     * savepoint's resource contexts.
     */
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
         it != set_difference_contexts.end();
         ++it)
    {
      ResourceContext *resource_context= *it;
      int err;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      if (resource->participatesInSqlTransaction())
      {
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, true)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else
        {
          session->status_var.ha_rollback_count++;
        }
      }
      resource_context->reset(); /* keep it conveniently zero-filled */
    }
  }
  trans->setResourceContexts(sv_resource_contexts);

  if (shouldConstructMessages())
  {
    /* Discard whatever has been recorded since the savepoint was set. */
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
    if (savepoint_transaction != NULL)
    {
      /*
       * Make a copy of the savepoint transaction; this is necessary to
       * assure proper cleanup.  Upon commit the copy will be cleaned up by
       * a call to cleanupTransactionMessage().  The Transaction message in
       * the NamedSavepoint will be cleaned up when the savepoint is cleaned
       * up.  This avoids calling delete twice on the same Transaction.
       */
      message::Transaction *savepoint_transaction_copy=
        new message::Transaction(*savepoint_transaction);
      uint32_t num_statements= savepoint_transaction_copy->statement_size();
      if (num_statements == 0)
      {
        session->setStatementMessage(NULL);
      }
      else
      {
        /* The last statement of the copy becomes the active statement. */
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
      }
      session->setTransactionMessage(savepoint_transaction_copy);
    }
  }

  return error;
}
850
851
/**
852
  @note
853
  according to the sql standard (ISO/IEC 9075-2:2003)
854
  section "4.33.4 SQL-statements and transaction states",
1273.1.4 by Jay Pipes
This patch significantly reworks the way that
855
  NamedSavepoint is *not* transaction-initiating SQL-statement
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
856
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
857
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
858
{
859
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
860
  TransactionContext *trans= &session->transaction.all;
861
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
862
863
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
864
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
865
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
866
         it != resource_contexts.end();
867
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
868
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
869
      ResourceContext *resource_context= *it;
870
      int err;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
871
872
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
873
874
      if (resource->participatesInSqlTransaction())
875
      {
876
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
877
        {
878
          my_error(ER_GET_ERRNO, MYF(0), err);
879
          error= 1;
880
        }
881
        else
882
        {
1689.5.1 by Joseph Daly
remove increment calls
883
          session->status_var.ha_savepoint_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
884
        }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
885
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
886
    }
887
  }
888
  /*
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
889
    Remember the list of registered storage engines.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
890
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
891
  sv.setResourceContexts(resource_contexts);
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
892
1746.7.4 by Joseph Daly
add test, and use shouldConstructMessages() rather then calling the singleton each time
893
  if (shouldConstructMessages())
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
894
  {
1746.7.5 by Joseph Daly
fix to get tests working with release savepoint
895
    message::Transaction *transaction= session->getTransactionMessage();
896
                  
897
    if (transaction != NULL)
898
    {
899
      message::Transaction *transaction_savepoint= 
900
        new message::Transaction(*transaction);
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
901
      sv.setTransactionMessage(transaction_savepoint);
1746.7.5 by Joseph Daly
fix to get tests working with release savepoint
902
    }
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
903
  } 
904
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
905
  return error;
906
}
907
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
908
/**
 * Releases a named savepoint in every resource recorded in the savepoint
 * that participates in SQL transactions.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint being released
 *
 * @return 0 on success, 1 if any storage engine reported an error (the
 *         error is also raised through my_error()).
 */
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext::ResourceContexts &contexts= sv.getResourceContexts();

  for (TransactionContext::ResourceContexts::iterator ctx_iter= contexts.begin();
       ctx_iter != contexts.end();
       ++ctx_iter)
  {
    ResourceContext *context= *ctx_iter;
    plugin::MonitoredInTransaction *monitored= context->getMonitored();

    if (! monitored->participatesInSqlTransaction())
    {
      continue;
    }

    const int engine_err= context->getTransactionalStorageEngine()->releaseSavepoint(session, sv);
    if (engine_err != 0)
    {
      my_error(ER_GET_ERRNO, MYF(0), engine_err);
      error= 1;
    }
  }

  return error;
}
935
1405.4.10 by Jay Pipes
OK, Sun Studio still didn't like that...seems to think that inline means something different than other compilers think it is...
936
bool TransactionServices::shouldConstructMessages()
937
{
938
  ReplicationServices &replication_services= ReplicationServices::singleton();
939
  return replication_services.isActive();
940
}
941
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
942
/**
 * Returns the Transaction message attached to the session, creating,
 * initializing and attaching a new one if the session has none.
 *
 * @param in_session         Session whose Transaction message is wanted
 * @param should_inc_trx_id  Forwarded to initTransactionMessage() when a
 *                           new message has to be created
 *
 * @return The session's Transaction message, or NULL if a new message was
 *         needed but could not be allocated.
 */
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
{
  message::Transaction *transaction= in_session->getTransactionMessage();

  if (unlikely(transaction == NULL))
  {
    /*
     * Allocate and initialize a new transaction message
     * for this Session object.  Session is responsible for
     * deleting transaction message when done with it.
     */
    transaction= new (nothrow) message::Transaction();

    /*
     * The nothrow form of new reports failure by returning NULL, so the
     * pointer must be checked before it is dereferenced below.
     */
    if (transaction == NULL)
    {
      return NULL;
    }

    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
    in_session->setTransactionMessage(transaction);
  }

  return transaction;
}
961
962
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
963
                                                 Session *in_session,
964
                                                 bool should_inc_trx_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
965
{
966
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
967
  trx->set_server_id(in_session->getServerId());
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
968
969
  if (should_inc_trx_id)
970
    trx->set_transaction_id(getNextTransactionId());
971
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
972
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
973
}
974
975
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
976
                                              Session *in_session)
977
{
978
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
979
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
980
}
981
982
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
983
                                             Session *in_session)
984
{
985
  delete in_transaction;
986
  in_session->setStatementMessage(NULL);
987
  in_session->setTransactionMessage(NULL);
988
}
989
1405.3.6 by Jay Pipes
Here, we do two main things:
990
/**
 * Finalizes the session's active Statement and Transaction messages,
 * pushes the Transaction message to ReplicationServices and cleans it up.
 *
 * @param in_session  Session whose transaction is being committed
 *
 * @return 0 if replication is inactive or the session has no active
 *         statement message; otherwise the return code of
 *         ReplicationServices::pushTransactionMessage() converted to int.
 */
int TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
  {
    return 0;
  }

  message::Statement *active_statement= in_session->getStatementMessage();
  if (active_statement == NULL)
  {
    return 0; /* No data modification occurred inside the transaction */
  }

  /* There is an active statement message: finalize it. */
  finalizeStatementMessage(*active_statement, in_session);

  message::Transaction *transaction= getActiveTransactionMessage(in_session);
  finalizeTransactionMessage(*transaction, in_session);

  const plugin::ReplicationReturnCode push_result=
    replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);

  return static_cast<int>(push_result);
}
1016
1017
void TransactionServices::initStatementMessage(message::Statement &statement,
1018
                                        message::Statement::Type in_type,
1019
                                        Session *in_session)
1020
{
1021
  statement.set_type(in_type);
1022
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1023
  /** @TODO Set sql string optionally */
1024
}
1025
1026
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1027
                                            Session *in_session)
1028
{
1029
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1030
  in_session->setStatementMessage(NULL);
1031
}
1032
1033
/**
 * Handles a ROLLBACK of the Session's active Transaction message.
 *
 * Does nothing when replication services are not active.  Otherwise
 * one of two things happens:
 *
 * 1) The Transaction message is self-contained (it contains no bulk
 *    segment): nothing was sent to replicators yet, so the message is
 *    simply cleaned up.
 *
 * 2) The Transaction message contains a bulk segment: an earlier
 *    segment of this transaction has already been sent over the wire.
 *    The message is cleared and re-used to carry a single Statement of
 *    type ROLLBACK, under the same transaction ID, so that replicators
 *    know the previously-transmitted messages must be un-applied.
 *
 * @param in_session  Session whose transaction is being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return;

  message::Transaction *transaction= getActiveTransactionMessage(in_session);

  /* Case 1: nothing has gone over the wire, just discard the message */
  if (not unlikely(message::transactionContainsBulkSegment(*transaction)))
  {
    cleanupTransactionMessage(transaction, in_session);
    return;
  }

  /*
   * Case 2: remember the transaction ID before wiping the message, so
   * the ROLLBACK carries the same ID as the segments already sent.
   */
  const uint64_t saved_trx_id= transaction->transaction_context().transaction_id();

  transaction->Clear();
  initTransactionMessage(*transaction, in_session, false);
  transaction->mutable_transaction_context()->set_transaction_id(saved_trx_id);

  /* The only content of the re-used message is the ROLLBACK statement */
  message::Statement *rollback_statement= transaction->add_statement();
  initStatementMessage(*rollback_statement, message::Statement::ROLLBACK, in_session);
  finalizeStatementMessage(*rollback_statement, in_session);

  finalizeTransactionMessage(*transaction, in_session);

  /* The push result is deliberately ignored, as in the commit-less path */
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);

  cleanupTransactionMessage(transaction, in_session);
}
1084
1085
/**
 * Returns the Statement message that the next inserted record should be
 * appended to, creating a new one when necessary.
 *
 * The Session's current Statement message is re-used only when it is an
 * INSERT for the same schema *and* table as in_table and the active
 * Transaction message is still below trx_msg_threshold.  Otherwise:
 *
 * - a Statement of another type, or an INSERT for a different table, is
 *   finalized and a new INSERT Statement is created;
 * - when the Transaction message has reached trx_msg_threshold, the
 *   current segment is marked as not being the end segment, the
 *   Transaction message is committed (sent to replicators), and a new
 *   Transaction message carrying the same transaction ID is started.
 *
 * @param in_session       Session holding the Statement/Transaction messages
 * @param in_table         Table the record is being inserted into
 * @param next_segment_id  [out] Segment ID the caller should set on the
 *                         InsertData.  Only written when an existing
 *                         INSERT Statement is continued or segmented;
 *                         otherwise the caller's initial value is kept.
 *
 * @return Reference to the Statement message to append the record to
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type for the current Statement message, if it is anything
   * other than INSERT we need to call finalize, this will ensure a
   * new InsertStatement is created. If it is of type INSERT check
   * what table the INSERT belongs to, if it is a different table
   * call finalize, so a new InsertStatement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::INSERT)
  {
    /* finalize resets the Session's pointer, so this re-read yields NULL */
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If the Transaction message has reached the size threshold
     * (possible for a bulk insert), we finalize the Statement and the
     * Transaction, which keeps the Transaction message from getting huge.
     *
     * NOTE(review): this branch does not check whether in_table matches
     * the table of the current Statement before marking it as a
     * non-end segment -- confirm that is intended.
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::InsertData *current_data= statement->mutable_insert_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      /*
       * Decide whether this record belongs to the table the current
       * Statement was opened for.  Both the schema name and the table
       * name recorded by setInsertHeader() must match: comparing the
       * table name alone would merge records of same-named tables that
       * live in different schemas into one Statement.
       */
      const message::TableMetadata &previous_table= statement->insert_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      const bool same_table= (current_schema_name == previous_table.schema_name()) &&
                             (current_table_name == previous_table.table_name());

      if (not same_table)
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::InsertData &current_data= statement->insert_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if we had to segment it due to
     * transaction size above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setInsertHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1187
1188
void TransactionServices::setInsertHeader(message::Statement &statement,
1189
                                          Session *in_session,
1190
                                          Table *in_table)
1191
{
1192
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1193
1194
  /* 
1195
   * Now we construct the specialized InsertHeader message inside
1196
   * the generalized message::Statement container...
1197
   */
1198
  /* Set up the insert header */
1199
  message::InsertHeader *header= statement.mutable_insert_header();
1200
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1201
1336.2.3 by Jay Pipes
Merge trunk and resolve
1202
  string schema_name;
1203
  (void) in_table->getShare()->getSchemaName(schema_name);
1204
  string table_name;
1205
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1206
1336.2.3 by Jay Pipes
Merge trunk and resolve
1207
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1208
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1209
1210
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1211
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1212
1213
  message::FieldMetadata *field_metadata;
1214
1215
  /* We will read all the table's fields... */
1216
  in_table->setReadSet();
1217
1218
  while ((current_field= *table_fields++) != NULL) 
1219
  {
1220
    field_metadata= header->add_field_metadata();
1221
    field_metadata->set_name(current_field->field_name);
1222
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1223
  }
1224
}
1225
1226
/**
 * Appends the record currently held in in_table's fields to the
 * Session's INSERT Statement message.
 *
 * For each field of the table one is_null flag and one insert value are
 * added to the new InsertRecord; NULL fields are recorded with an empty
 * value.
 *
 * @param in_session  Session performing the insert
 * @param in_table    Table whose fields hold the inserted record
 *
 * @retval false  Replication is not active (nothing done), or the
 *                record was added to the Statement message
 * @retval true   The table has no primary key; the error
 *                ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE has been raised
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return false;

  /**
   * Refuse tables without a primary key up front, before any Statement
   * message gets created for them.
   *
   * @todo
   *
   * Multi-column primary keys are handled how exactly?
   */
  if (not in_table->getShare()->hasPrimaryKey())
  {
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
    return true;
  }

  /* Stays at 1 unless getInsertStatement() continues or segments a statement */
  uint32_t segment_id= 1;
  message::Statement &statement= getInsertStatement(in_session, in_table, &segment_id);

  message::InsertData *insert_data= statement.mutable_insert_data();
  insert_data->set_segment_id(segment_id);
  insert_data->set_end_segment(true);
  message::InsertRecord *row= insert_data->add_record();

  /* NULL-terminated array of the table's fields */
  Field **field_cursor= in_table->getFields();

  /* Scratch buffer for the string form of each field value */
  String *value_buffer= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  value_buffer->set_charset(system_charset_info);

  /* Every field of the table is going to be read */
  in_table->setReadSet();

  for (; *field_cursor != NULL; ++field_cursor)
  {
    Field *field= *field_cursor;

    if (field->is_null())
    {
      /* NULL is flagged explicitly and carries an empty placeholder value */
      row->add_is_null(true);
      row->add_insert_value("", 0);
      continue;
    }

    /* val_str() returns the String holding the value; keep using that one */
    value_buffer= field->val_str(value_buffer);
    row->add_is_null(false);
    row->add_insert_value(value_buffer->c_ptr(), value_buffer->length());
    value_buffer->free();
  }
  return false;
}
1279
1280
/**
 * Returns the Statement message that the next updated record should be
 * appended to, creating a new one when necessary.
 *
 * The Session's current Statement message is re-used only when it is an
 * UPDATE for the same schema *and* table as in_table and the active
 * Transaction message is still below trx_msg_threshold.  Otherwise:
 *
 * - a Statement of another type, or an UPDATE for a different table, is
 *   finalized and a new UPDATE Statement is created;
 * - when the Transaction message has reached trx_msg_threshold, the
 *   current segment is marked as not being the end segment, the
 *   Transaction message is committed (sent to replicators), and a new
 *   Transaction message carrying the same transaction ID is started.
 *
 * @param in_session       Session holding the Statement/Transaction messages
 * @param in_table         Table the record is being updated in
 * @param old_record       Record image before the update; passed on to
 *                         setUpdateHeader() when a new Statement is created
 * @param new_record       Record image after the update; passed on to
 *                         setUpdateHeader() when a new Statement is created
 * @param next_segment_id  [out] Segment ID the caller should set on the
 *                         UpdateData.  Only written when an existing
 *                         UPDATE Statement is continued or segmented;
 *                         otherwise the caller's initial value is kept.
 *
 * @return Reference to the Statement message to append the record to
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
                                                            Table *in_table,
                                                            const unsigned char *old_record, 
                                                            const unsigned char *new_record,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type for the current Statement message, if it is anything
   * other than UPDATE we need to call finalize, this will ensure a
   * new UpdateStatement is created. If it is of type UPDATE check
   * what table the UPDATE belongs to, if it is a different table
   * call finalize, so a new UpdateStatement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
  {
    /* finalize resets the Session's pointer, so this re-read yields NULL */
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If the Transaction message has reached the size threshold
     * (possible for a bulk update), we finalize the Statement and the
     * Transaction, which keeps the Transaction message from getting huge.
     *
     * NOTE(review): this branch does not check whether in_table matches
     * the table of the current Statement before marking it as a
     * non-end segment -- confirm that is intended.
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::UpdateData *current_data= statement->mutable_update_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      /*
       * Decide whether this record belongs to the table the current
       * Statement was opened for.  Both the schema name and the table
       * name recorded by setUpdateHeader() must match: comparing the
       * table name alone would merge records of same-named tables that
       * live in different schemas into one Statement.
       */
      const message::TableMetadata &previous_table= statement->update_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      const bool same_table= (current_schema_name == previous_table.schema_name()) &&
                             (current_table_name == previous_table.table_name());

      if (not same_table)
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::UpdateData &current_data= statement->update_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if we had to segment it due to
     * transaction size above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1383
1384
void TransactionServices::setUpdateHeader(message::Statement &statement,
1385
                                          Session *in_session,
1386
                                          Table *in_table,
1387
                                          const unsigned char *old_record, 
1388
                                          const unsigned char *new_record)
1389
{
1390
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1391
1392
  /* 
1393
   * Now we construct the specialized UpdateHeader message inside
1394
   * the generalized message::Statement container...
1395
   */
1396
  /* Set up the update header */
1397
  message::UpdateHeader *header= statement.mutable_update_header();
1398
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1399
1336.2.3 by Jay Pipes
Merge trunk and resolve
1400
  string schema_name;
1401
  (void) in_table->getShare()->getSchemaName(schema_name);
1402
  string table_name;
1403
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1404
1336.2.3 by Jay Pipes
Merge trunk and resolve
1405
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1406
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1407
1408
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1409
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1410
1411
  message::FieldMetadata *field_metadata;
1412
1413
  /* We will read all the table's fields... */
1414
  in_table->setReadSet();
1415
1416
  while ((current_field= *table_fields++) != NULL) 
1417
  {
1418
    /*
1419
     * We add the "key field metadata" -- i.e. the fields which is
1420
     * the primary key for the table.
1421
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1422
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1423
    {
1424
      field_metadata= header->add_key_field_metadata();
1425
      field_metadata->set_name(current_field->field_name);
1426
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1427
    }
1428
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1429
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1430
    {
1431
      /* Field is changed from old to new */
1432
      field_metadata= header->add_set_field_metadata();
1433
      field_metadata->set_name(current_field->field_name);
1434
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1435
    }
1436
  }
1437
}
1438
void TransactionServices::updateRecord(Session *in_session,
1439
                                       Table *in_table, 
1440
                                       const unsigned char *old_record, 
1441
                                       const unsigned char *new_record)
1442
{
1443
  ReplicationServices &replication_services= ReplicationServices::singleton();
1444
  if (! replication_services.isActive())
1445
    return;
1446
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1447
  uint32_t next_segment_id= 1;
1448
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1449
1450
  message::UpdateData *data= statement.mutable_update_data();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1451
  data->set_segment_id(next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1452
  data->set_end_segment(true);
1453
  message::UpdateRecord *record= data->add_record();
1454
1455
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1456
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1457
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1458
  string_value->set_charset(system_charset_info);
1459
1460
  while ((current_field= *table_fields++) != NULL) 
1461
  {
1462
    /*
1463
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1464
     * but then realized that an UPDATE statement could potentially have different values for
1465
     * the SET field.  For instance, imagine this SQL scenario:
1466
     *
1467
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1468
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1469
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1470
     *
1471
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1472
     */
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1473
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1474
    {
1475
      /* Store the original "read bit" for this field */
1476
      bool is_read_set= current_field->isReadSet();
1477
1478
      /* We need to mark that we will "read" this field... */
1479
      in_table->setReadSet(current_field->field_index);
1480
1481
      /* Read the string value of this field's contents */
1482
      string_value= current_field->val_str(string_value);
1483
1484
      /* 
1485
       * Reset the read bit after reading field to its original state.  This 
1486
       * prevents the field from being included in the WHERE clause
1487
       */
1488
      current_field->setReadSet(is_read_set);
1489
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1490
      if (current_field->is_null())
1491
      {
1492
        record->add_is_null(true);
1667.3.7 by Joe Daly
merge trunk
1493
        record->add_after_value("", 0);
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1494
      }
1495
      else
1496
      {
1497
        record->add_is_null(false);
1498
        record->add_after_value(string_value->c_ptr(), string_value->length());
1499
      }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1500
      string_value->free();
1501
    }
1502
1503
    /* 
1504
     * Add the WHERE clause values now...for now, this means the
1505
     * primary key field value.  Replication only supports tables
1506
     * with a primary key.
1507
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1508
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1509
    {
1510
      /**
1511
       * To say the below is ugly is an understatement. But it works.
1512
       * 
1513
       * @todo Move this crap into a real Record API.
1514
       */
1515
      string_value= current_field->val_str(string_value,
1516
                                           old_record + 
1517
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1518
      record->add_key_value(string_value->c_ptr(), string_value->length());
1519
      string_value->free();
1520
    }
1521
1522
  }
1523
}
1524
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1525
bool TransactionServices::isFieldUpdated(Field *current_field,
1526
                                         Table *in_table,
1527
                                         const unsigned char *old_record,
1528
                                         const unsigned char *new_record)
1529
{
1530
  /*
1531
   * The below really should be moved into the Field API and Record API.  But for now
1532
   * we do this crazy pointer fiddling to figure out if the current field
1533
   * has been updated in the supplied record raw byte pointers.
1534
   */
1535
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1536
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1537
1538
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1539
1540
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1541
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1542
1543
  bool isUpdated= false;
1544
  if (old_value_is_null != new_value_is_null)
1545
  {
1546
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1547
    {
1548
      isUpdated= true;
1549
    }
1550
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1551
    {
1552
      isUpdated= true;
1553
    }
1554
  }
1555
1556
  if (! isUpdated)
1557
  {
1558
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1559
    {
1560
      isUpdated= true;
1561
    }
1562
  }
1563
  return isUpdated;
1564
}  
1565
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1566
/**
 * Returns the DELETE Statement message that the next DeleteRecord should
 * be added to, creating a new one when the Session's current Statement
 * message cannot be reused.
 *
 * The current Statement message is reused only when it is a DELETE for the
 * same schema and table as in_table and the active Transaction message is
 * still below trx_msg_threshold bytes.
 *
 * @param in_session       Session owning the Statement/Transaction messages
 * @param in_table         Table the delete applies to
 * @param next_segment_id  In/out: left untouched when a fresh Statement is
 *                         started for another statement type or table; set
 *                         to the current segment id when the Statement is
 *                         reused; set to the current segment id + 1 when the
 *                         Transaction message is split because of its size
 *
 * @return Reference to the Statement message to append the record to
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type of the current Statement message.  If it is anything
   * other than DELETE it is finalized so that a new DELETE Statement gets
   * created below.  If it is a DELETE, it is only reused when it targets
   * the same table.
   */
  if (statement != NULL && statement->type() != message::Statement::DELETE)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If the Transaction message has passed the size threshold, finalize
     * the Statement and Transaction so the Transaction message does not
     * grow without bound.
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      /* Remember the transaction ID so we can re-use it */
      uint64_t trx_id= transaction->transaction_context().transaction_id();

      message::DeleteData *current_data= statement->mutable_delete_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      /* More segments follow for this statement */
      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction.  Per the original author's note this
       * also resets the Transaction and Statement objects in Session to
       * NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created.  The transaction id is reused since this is one
       * transaction split into segments.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
      assert(transaction != NULL);

      /* Set the transaction ID to match the previous messages */
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
    }
    else
    {
      const message::TableMetadata &previous_table= statement->delete_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      /*
       * A table is identified by schema AND table name.  Comparing the
       * table name alone would let deletes against same-named tables in
       * different schemas share one DeleteHeader, attributing the later
       * records to the wrong table.
       */
      const bool same_table= (current_table_name == previous_table.table_name()) &&
                             (current_schema_name == previous_table.schema_name());

      if (! same_table)
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::DeleteData &current_data= statement->delete_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if it was already fetched in the
     * DELETE branch above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  Construct and initialize one here, and attach the new
     * Statement message pointer to the Session for easy retrieval later.
     */
    statement= transaction->add_statement();
    setDeleteHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1667
1668
void TransactionServices::setDeleteHeader(message::Statement &statement,
1669
                                          Session *in_session,
1670
                                          Table *in_table)
1671
{
1672
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1673
1674
  /* 
1675
   * Now we construct the specialized DeleteHeader message inside
1676
   * the generalized message::Statement container...
1677
   */
1678
  message::DeleteHeader *header= statement.mutable_delete_header();
1679
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1680
1336.2.3 by Jay Pipes
Merge trunk and resolve
1681
  string schema_name;
1682
  (void) in_table->getShare()->getSchemaName(schema_name);
1683
  string table_name;
1684
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1685
1336.2.3 by Jay Pipes
Merge trunk and resolve
1686
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1687
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1688
1689
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1690
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1691
1692
  message::FieldMetadata *field_metadata;
1693
1694
  while ((current_field= *table_fields++) != NULL) 
1695
  {
1696
    /* 
1697
     * Add the WHERE clause values now...for now, this means the
1698
     * primary key field value.  Replication only supports tables
1699
     * with a primary key.
1700
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1701
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1702
    {
1703
      field_metadata= header->add_key_field_metadata();
1704
      field_metadata->set_name(current_field->field_name);
1705
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1706
    }
1707
  }
1708
}
1709
1730.5.1 by David Shrewsbury
Use the update record, not insert record, when record DELETE operations during REPLACE.
1710
/**
 * Appends one DeleteRecord, holding the primary key field values, to the
 * current DELETE Statement message.
 *
 * Does nothing when the replication services singleton reports that it is
 * not active.
 *
 * @param in_session         Session performing the delete; its mem_root
 *                           backs the scratch String used for conversion
 * @param in_table           Table the row is deleted from
 * @param use_update_record  When true, key values are read from the
 *                           table's update record instead of from where
 *                           Field::ptr currently points
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
{
  ReplicationServices &repl_services= ReplicationServices::singleton();
  if (! repl_services.isActive())
    return;

  /* getDeleteStatement() may overwrite this with a later segment id */
  uint32_t next_segment_id= 1;
  message::Statement &delete_statement= getDeleteStatement(in_session, in_table, &next_segment_id);

  message::DeleteData *delete_data= delete_statement.mutable_delete_data();
  delete_data->set_segment_id(next_segment_id);
  delete_data->set_end_segment(true);
  message::DeleteRecord *delete_record= delete_data->add_record();

  Field **field_cursor= in_table->getFields();

  /*
   * Scratch buffer for field-to-string conversion.  val_str() returns the
   * String that actually holds the value, so the pointer is re-assigned
   * from its return value on every call.
   */
  String *value_buffer= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  value_buffer->set_charset(system_charset_info);

  /* The field array is terminated by a NULL entry */
  for (Field *field= *field_cursor; field != NULL; field= *++field_cursor)
  {
    /*
     * WHERE clause values: currently just the primary key field values.
     * Replication only supports tables with a primary key.
     */
    if (! in_table->getShare()->fieldInPrimaryKey(field))
      continue;

    if (use_update_record)
    {
      /*
       * Temporarily aim the field at the same offset inside the update
       * record so that val_str() yields the PK value stored there rather
       * than the one in the insert record.  This is a hack; it relies on
       * Field::val_str() not changing Field::ptr.  The original pointer
       * is restored right after the read.
       */
      unsigned char *saved_ptr= field->ptr;
      field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(saved_ptr - in_table->getInsertRecord());
      value_buffer= field->val_str(value_buffer);
      field->ptr= saved_ptr;
    }
    else
    {
      value_buffer= field->val_str(value_buffer);
      /**
       * @TODO Store optional old record value in the before data member
       */
    }

    delete_record->add_key_value(value_buffer->c_ptr(), value_buffer->length());
    value_buffer->free();
  }
}
1764
1765
void TransactionServices::createTable(Session *in_session,
1766
                                      const message::Table &table)
1767
{
1768
  ReplicationServices &replication_services= ReplicationServices::singleton();
1769
  if (! replication_services.isActive())
1770
    return;
1771
  
1772
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1773
  message::Statement *statement= transaction->add_statement();
1774
1775
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1776
1777
  /* 
1778
   * Construct the specialized CreateTableStatement message and attach
1779
   * it to the generic Statement message
1780
   */
1781
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1782
  message::Table *new_table_message= create_table_statement->mutable_table();
1783
  *new_table_message= table;
1784
1785
  finalizeStatementMessage(*statement, in_session);
1786
1787
  finalizeTransactionMessage(*transaction, in_session);
1788
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1789
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1790
1791
  cleanupTransactionMessage(transaction, in_session);
1792
1793
}
1794
1795
void TransactionServices::createSchema(Session *in_session,
1796
                                       const message::Schema &schema)
1797
{
1798
  ReplicationServices &replication_services= ReplicationServices::singleton();
1799
  if (! replication_services.isActive())
1800
    return;
1801
  
1802
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1803
  message::Statement *statement= transaction->add_statement();
1804
1805
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1806
1807
  /* 
1808
   * Construct the specialized CreateSchemaStatement message and attach
1809
   * it to the generic Statement message
1810
   */
1811
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1812
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1813
  *new_schema_message= schema;
1814
1815
  finalizeStatementMessage(*statement, in_session);
1816
1817
  finalizeTransactionMessage(*transaction, in_session);
1818
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1819
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1820
1821
  cleanupTransactionMessage(transaction, in_session);
1822
1823
}
1824
1825
/**
 * Records a DROP SCHEMA as a Statement message in the active Transaction
 * message, then finalizes, pushes and cleans up that Transaction message.
 *
 * Does nothing when the replication services singleton reports that it is
 * not active.
 *
 * @param in_session   Session executing the DROP SCHEMA
 * @param schema_name  Name of the schema being dropped
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
{
  ReplicationServices &repl_services= ReplicationServices::singleton();
  if (! repl_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  initStatementMessage(*stmt_message, message::Statement::DROP_SCHEMA, in_session);

  /*
   * Attach the specialized DropSchemaStatement message to the generic
   * Statement message and record which schema is dropped.
   */
  message::DropSchemaStatement *drop_stmt= stmt_message->mutable_drop_schema_statement();
  drop_stmt->set_schema_name(schema_name);

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately ignored */
  (void) repl_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1852
1853
void TransactionServices::dropTable(Session *in_session,
1854
                                    const string &schema_name,
1855
                                    const string &table_name,
1856
                                    bool if_exists)
1857
{
1858
  ReplicationServices &replication_services= ReplicationServices::singleton();
1859
  if (! replication_services.isActive())
1860
    return;
1861
  
1862
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1863
  message::Statement *statement= transaction->add_statement();
1864
1865
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1866
1867
  /* 
1868
   * Construct the specialized DropTableStatement message and attach
1869
   * it to the generic Statement message
1870
   */
1871
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1872
1873
  drop_table_statement->set_if_exists_clause(if_exists);
1874
1875
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1876
1877
  table_metadata->set_schema_name(schema_name);
1878
  table_metadata->set_table_name(table_name);
1879
1880
  finalizeStatementMessage(*statement, in_session);
1881
1882
  finalizeTransactionMessage(*transaction, in_session);
1883
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1884
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1885
1886
  cleanupTransactionMessage(transaction, in_session);
1887
}
1888
1889
/**
 * Records a TRUNCATE TABLE as a Statement message in the active
 * Transaction message, then finalizes, pushes and cleans up that
 * Transaction message.
 *
 * Does nothing when the replication services singleton reports that it is
 * not active.
 *
 * @param in_session  Session executing the TRUNCATE TABLE
 * @param in_table    Table being truncated; its share supplies the schema
 *                    and table names
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
{
  ReplicationServices &repl_services= ReplicationServices::singleton();
  if (! repl_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  initStatementMessage(*stmt_message, message::Statement::TRUNCATE_TABLE, in_session);

  /*
   * Attach the specialized TruncateTableStatement message to the generic
   * Statement message.
   */
  message::TruncateTableStatement *truncate_stmt= stmt_message->mutable_truncate_table_statement();
  message::TableMetadata *metadata= truncate_stmt->mutable_table_metadata();

  string schema;
  (void) in_table->getShare()->getSchemaName(schema);
  metadata->set_schema_name(schema.c_str(), schema.length());

  string table;
  (void) in_table->getShare()->getTableName(table);
  metadata->set_table_name(table.c_str(), table.length());

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately ignored */
  (void) repl_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1923
1924
/**
 * Records a raw SQL string as a RAW_SQL Statement message in the active
 * Transaction message, then finalizes, pushes and cleans up that
 * Transaction message.
 *
 * Does nothing when the replication services singleton reports that it is
 * not active.
 *
 * @param in_session  Session executing the statement
 * @param query       SQL text stored verbatim in the Statement message
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
{
  ReplicationServices &repl_services= ReplicationServices::singleton();
  if (! repl_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  /* A RAW_SQL statement carries nothing but the query text */
  initStatementMessage(*stmt_message, message::Statement::RAW_SQL, in_session);
  stmt_message->set_sql(query);

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately ignored */
  (void) repl_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1943
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1944
} /* namespace drizzled */