~drizzle-trunk/drizzle/development

1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008 Sun Microsystems
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
6
 *
7
 *  This program is free software; you can redistribute it and/or modify
8
 *  it under the terms of the GNU General Public License as published by
9
 *  the Free Software Foundation; version 2 of the License.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
/**
22
 * @file Transaction processing code
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
23
 *
24
 * @note
25
 *
26
 * The TransactionServices component takes internal events (for instance the start of a 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 * component and used during replication.
30
 *
31
 * The reason for this functionality is to encapsulate all communication
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 * Instead of the plugin having to understand the (often fluidly changing)
34
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 * these messages.
37
 *
38
 * @see /drizzled/message/transaction.proto
39
 *
40
 * @todo
41
 *
42
 * We really should store the raw bytes in the messages, not the
43
 * String value of the Field.  But, to do that, the
44
 * statement_transform library needs first to be updated
45
 * to include the transformation code to convert raw
46
 * Drizzle-internal Field byte representation into something
47
 * plugins can understand.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
48
 */
49
50
#include "config.h"
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
60
#include "drizzled/transaction_context.h"
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
63
#include "drizzled/resource_context.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
72
#include "drizzled/internal/my_sys.h"
73
74
using namespace std;
75
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
76
#include <vector>
77
#include <algorithm>
78
#include <functional>
79
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
80
namespace drizzled
81
{
82
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
83
/** @TODO: Make this a system variable */
84
static const size_t trx_msg_threshold= static_cast<size_t>(1) << 20; /* == 1024 * 1024 */
85
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
86
/**
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
87
 * @defgroup Transactions
88
 *
89
 * @brief
90
 *
91
 * Transaction handling in the server
92
 *
93
 * @details
94
 *
95
 * In each client connection, Drizzle maintains two transaction
96
 * contexts representing the state of the:
97
 *
98
 * 1) Statement Transaction
99
 * 2) Normal Transaction
100
 *
101
 * These two transaction contexts represent the transactional
102
 * state of a Session's SQL and XA transactions for a single
103
 * SQL statement or a series of SQL statements.
104
 *
105
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 * is no practical difference between the statement and the
107
 * normal transaction, as each SQL statement is committed or
108
 * rolled back depending on the success or failure of the
109
 * individual SQL statement.
110
 *
111
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 * the Session has explicitly begun a normal SQL transaction using
113
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 * transaction context tracks the aggregate transaction state of
115
 * the SQL transaction's individual statements, and the SQL
116
 * transaction's commit or rollback is done atomically for all of
117
 * the SQL transaction's statement's data changes.
118
 *
119
 * Technically, a statement transaction can be viewed as a savepoint 
120
 * which is maintained automatically in order to make effects of one
121
 * statement atomic.
122
 *
123
 * The normal transaction is started by the user and is typically
124
 * ended (COMMIT or ROLLBACK) upon an explicit user request as well.
125
 * The exception to this is that DDL statements implicitly COMMIT
126
 * any previously active normal transaction before they begin executing.
127
 *
128
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 * plugins will automatically be monitored by Drizzle's transaction 
131
 * manager (implemented in this source file), as will all plugins which
132
 * implement plugin::XaResourceManager and register with the transaction
133
 * manager.
134
 *
135
 * If Drizzle's transaction manager sees that more than one resource
136
 * manager (transactional storage engine or XA resource manager) has modified
137
 * data state during a statement or normal transaction, the transaction
138
 * manager will automatically use a two-phase commit protocol for all
139
 * resources which support XA's distributed transaction protocol.  Unlike
140
 * MySQL, storage engines need not manually register with the transaction
141
 * manager during a statement's execution.  Previously, in MySQL, all
142
 * handlertons would have to call trans_register_ha() at some point after
143
 * modifying data state in order to have MySQL include that handler in
144
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 * scenes for the storage engine implementers.
146
 *
147
 * When a connection is closed, the current normal transaction, if
148
 * any is currently active, is rolled back.
149
 *
150
 * Transaction life cycle
151
 * ----------------------
152
 *
153
 * When a new connection is established, session->transaction
154
 * members are initialized to an empty state. If a statement uses any tables, 
155
 * all affected engines are registered in the statement engine list automatically
156
 * in plugin::StorageEngine::startStatement() and 
157
 * plugin::TransactionalStorageEngine::startTransaction().
158
 *
159
 * You can view the lifetime of a normal transaction in the following
160
 * call-sequence:
161
 *
162
 * drizzled::statement::Statement::execute()
163
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 *     drizzled::plugin::StorageEngine::startStatement()
167
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 *     drizzled::plugin::StorageEngine::endStatement()
169
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 *
173
 * Roles and responsibilities
174
 * --------------------------
175
 *
176
 * Beginning of SQL Statement (and Statement Transaction)
177
 * ------------------------------------------------------
178
 *
179
 * At the start of each SQL statement, for each storage engine
180
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 * calls the engine's plugin::StorageEngine::startStatement() method.  If the
182
 * engine needs to track some data for the statement, it should use
183
 * this method invocation to initialize this data.  This is the
184
 * beginning of what is called the "statement transaction".
185
 *
186
 * <strong>For transaction storage engines (those storage engines
187
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 * kernel automatically determines if the start of the SQL statement 
189
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 * This occurs when the connection is NOT in autocommit mode. If
191
 * the kernel detects this, then the kernel automatically starts the
192
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 * method and then calls plugin::StorageEngine::startStatement()
194
 * afterwards.
195
 *
196
 * Beginning of an SQL "Normal" Transaction
197
 * ----------------------------------------
198
 *
199
 * As noted above, a "normal SQL transaction" may be started when
200
 * an SQL statement is started in a connection and the connection is
201
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 *
203
 * In addition, when a user executes a START TRANSACTION or
204
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 * calls each transactional storage engine's startTransaction() method.
206
 *
207
 * Ending of an SQL Statement (and Statement Transaction)
208
 * ------------------------------------------------------
209
 *
210
 * At the end of each SQL statement, for each of the aforementioned
211
 * involved storage engines, the kernel calls the engine's
212
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 * has initialized or modified some internal data about the
214
 * statement transaction, it should use this method to reset or destroy
215
 * this data appropriately.
216
 *
217
 * Ending of an SQL "Normal" Transaction
218
 * -------------------------------------
219
 *
220
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 * depending on the success or failure of the statement transaction(s) 
222
 * it encloses.
223
 *
224
 * The end of a "normal transaction" occurs when any of the following
225
 * occurs:
226
 *
227
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 *    then the normal transaction which encloses the statement
229
 *    transaction ends
230
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 *    commit the active normal transaction
233
 *
234
 * Transactions and Non-transactional Storage Engines
235
 * --------------------------------------------------
236
 *
237
 * For non-transactional engines, this call can be safely ignored, and
238
 * the kernel tracks whether a non-transactional engine has changed
239
 * any data state, and warns the user appropriately if a transaction
240
 * (statement or normal) is rolled back after such non-transactional
241
 * data changes have been made.
242
 *
243
 * XA Two-phase Commit Protocol
244
 * ----------------------------
245
 *
246
 * During statement execution, whenever any of the data-modifying
247
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 * Cursor::update_row(), the read-write flag is raised in the
249
 * statement transaction for the involved engine.
250
 * Currently, all PSEA calls are "traced", and the data can not be
251
 * changed in a way other than issuing a PSEA call. Important:
252
 * unless this invariant is preserved the server will not know that
253
 * a transaction in a given engine is read-write and will not
254
 * involve the two-phase commit protocol!
255
 *
256
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 * is invoked. This call in turn
258
 * invokes plugin::XaResourceManager::xaPrepare() for every involved XA
259
 * resource manager.
260
 *
261
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 * is...)
264
 * 
265
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 * At statement commit, the statement-related read-write engine
268
 * flag is propagated to the corresponding flag in the normal
269
 * transaction.  When the commit is complete, the list of registered
270
 * engines is cleared.
271
 *
272
 * Rollback is handled in a similar fashion.
273
 *
274
 * Additional notes on DDL and the normal transaction.
275
 * ---------------------------------------------------
276
 *
277
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 * because of the fact that SELECTs on a transactional storage
279
 * engine participate in the normal SQL transaction (due to
280
 * isolation level issues and consistent read views).
281
 *
282
 * Behaviour of the server in this case is currently badly
283
 * defined.
284
 *
285
 * DDL statements use a form of "semantic" logging
286
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 * the newly created table is deleted.
288
 * 
289
 * In addition, some DDL statements issue interim transaction
290
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 * from the original table to the internal temporary table. Other
292
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 * after themselves.
294
 *
295
 * And finally there is a group of DDL statements such as
296
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 * and doesn't commit.
298
 *
299
 * A consistent behaviour is perhaps to always commit the normal
300
 * transaction after all DDLs, just like the statement transaction
301
 * is always committed at the end of all statements.
302
 */
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
303
void TransactionServices::registerResourceForStatement(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
304
                                                       plugin::MonitoredInTransaction *monitored,
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
305
                                                       plugin::TransactionalStorageEngine *engine)
306
{
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
307
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
308
  {
309
    /* 
310
     * Now we automatically register this resource manager for the
311
     * normal transaction.  This is fine because a statement
312
     * transaction registration should always enlist the resource
313
     * in the normal transaction which contains the statement
314
     * transaction.
315
     */
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
316
    registerResourceForTransaction(session, monitored, engine);
317
  }
318
319
  TransactionContext *trans= &session->transaction.stmt;
320
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
321
322
  if (resource_context->isStarted())
323
    return; /* already registered, return */
324
325
  assert(monitored->participatesInSqlTransaction());
326
  assert(not monitored->participatesInXaTransaction());
327
328
  resource_context->setMonitored(monitored);
329
  resource_context->setTransactionalStorageEngine(engine);
330
  trans->registerResource(resource_context);
331
332
  trans->no_2pc|= true;
333
}
334
335
void TransactionServices::registerResourceForStatement(Session *session,
336
                                                       plugin::MonitoredInTransaction *monitored,
337
                                                       plugin::TransactionalStorageEngine *engine,
338
                                                       plugin::XaResourceManager *resource_manager)
339
{
340
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
341
  {
342
    /* 
343
     * Now we automatically register this resource manager for the
344
     * normal transaction.  This is fine because a statement
345
     * transaction registration should always enlist the resource
346
     * in the normal transaction which contains the statement
347
     * transaction.
348
     */
349
    registerResourceForTransaction(session, monitored, engine, resource_manager);
350
  }
351
352
  TransactionContext *trans= &session->transaction.stmt;
353
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
354
355
  if (resource_context->isStarted())
356
    return; /* already registered, return */
357
358
  assert(monitored->participatesInXaTransaction());
359
  assert(monitored->participatesInSqlTransaction());
360
361
  resource_context->setMonitored(monitored);
362
  resource_context->setTransactionalStorageEngine(engine);
363
  resource_context->setXaResourceManager(resource_manager);
364
  trans->registerResource(resource_context);
365
366
  trans->no_2pc|= false;
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
367
}
368
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
369
void TransactionServices::registerResourceForTransaction(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
370
                                                         plugin::MonitoredInTransaction *monitored,
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
371
                                                         plugin::TransactionalStorageEngine *engine)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
372
{
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
373
  TransactionContext *trans= &session->transaction.all;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
374
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
375
376
  if (resource_context->isStarted())
377
    return; /* already registered, return */
378
379
  session->server_status|= SERVER_STATUS_IN_TRANS;
380
381
  trans->registerResource(resource_context);
382
383
  assert(monitored->participatesInSqlTransaction());
384
  assert(not monitored->participatesInXaTransaction());
385
386
  resource_context->setMonitored(monitored);
387
  resource_context->setTransactionalStorageEngine(engine);
388
  trans->no_2pc|= true;
389
390
  if (session->transaction.xid_state.xid.is_null())
391
    session->transaction.xid_state.xid.set(session->getQueryId());
392
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
393
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
394
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
395
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
396
  if (! session->getResourceContext(monitored, 0)->isStarted())
397
    registerResourceForStatement(session, monitored, engine);
398
}
399
400
void TransactionServices::registerResourceForTransaction(Session *session,
401
                                                         plugin::MonitoredInTransaction *monitored,
402
                                                         plugin::TransactionalStorageEngine *engine,
403
                                                         plugin::XaResourceManager *resource_manager)
404
{
405
  TransactionContext *trans= &session->transaction.all;
406
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
407
408
  if (resource_context->isStarted())
409
    return; /* already registered, return */
410
411
  session->server_status|= SERVER_STATUS_IN_TRANS;
412
413
  trans->registerResource(resource_context);
414
415
  assert(monitored->participatesInSqlTransaction());
416
417
  resource_context->setMonitored(monitored);
418
  resource_context->setXaResourceManager(resource_manager);
419
  resource_context->setTransactionalStorageEngine(engine);
420
  trans->no_2pc|= true;
421
422
  if (session->transaction.xid_state.xid.is_null())
423
    session->transaction.xid_state.xid.set(session->getQueryId());
424
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
425
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
426
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
427
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
428
  if (! session->getResourceContext(monitored, 0)->isStarted())
429
    registerResourceForStatement(session, monitored, engine, resource_manager);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
430
}
431
432
/**
433
  @retval
434
    0   ok
435
  @retval
436
    1   transaction was rolled back
437
  @retval
438
    2   error during commit, data may be inconsistent
439
440
  @todo
441
    Since we don't support nested statement transactions in 5.0,
442
    we can't commit or rollback stmt transactions while we are inside
443
    stored functions or triggers. So we simply do nothing now.
444
    TODO: This should be fixed in later ( >= 5.1) releases.
445
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
446
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
447
{
448
  int error= 0, cookie= 0;
449
  /*
450
    'all' means that this is either an explicit commit issued by
451
    user, or an implicit commit issued by a DDL.
452
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
453
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
454
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
455
456
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
457
458
  /*
459
    We must not commit the normal transaction if a statement
460
    transaction is pending. Otherwise statement transaction
461
    flags will not get propagated to its normal transaction's
462
    counterpart.
463
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
464
  assert(session->transaction.stmt.getResourceContexts().empty() ||
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
465
              trans == &session->transaction.stmt);
466
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
467
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
468
  {
469
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
470
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
471
      rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
472
      return 1;
473
    }
474
1405.3.6 by Jay Pipes
Here, we do two main things:
475
    /*
476
     * If replication is on, we do a PREPARE on the resource managers, push the
477
     * Transaction message across the replication stream, and then COMMIT if the
478
     * replication stream returned successfully.
479
     */
480
    if (shouldConstructMessages())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
481
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
482
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
483
           it != resource_contexts.end() && ! error;
484
           ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
485
      {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
486
        ResourceContext *resource_context= *it;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
487
        int err;
488
        /*
489
          Do not call two-phase commit if this particular
490
          transaction is read-only. This allows for simpler
491
          implementation in engines that are always read-only.
492
        */
1273.1.12 by Jay Pipes
Cleanup style and documentation for ResourceContext and setTransactionReadWrite
493
        if (! resource_context->hasModifiedData())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
494
          continue;
1273.1.15 by Jay Pipes
This patch completes the first step in the splitting of
495
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
496
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
497
498
        if (resource->participatesInXaTransaction())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
499
        {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
500
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
501
          {
502
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
503
            error= 1;
504
          }
505
          else
506
          {
1689.5.1 by Joseph Daly
remove increment calls
507
            session->status_var.ha_prepare_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
508
          }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
509
        }
510
      }
1405.3.6 by Jay Pipes
Here, we do two main things:
511
      if (error == 0 && is_real_trans)
512
      {
513
        /*
514
         * Push the constructed Transaction messages across to
515
         * replicators and appliers.
516
         */
517
        error= commitTransactionMessage(session);
518
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
519
      if (error)
520
      {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
521
        rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
522
        error= 1;
523
        goto end;
524
      }
525
    }
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
526
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
527
end:
528
    if (is_real_trans)
529
      start_waiting_global_read_lock(session);
530
  }
531
  return error;
532
}
533
534
/**
535
  @note
536
  This function does not care about global read lock. A caller should.
537
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
538
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
539
{
540
  int error=0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
541
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
542
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
543
544
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
545
546
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
547
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
548
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
549
         it != resource_contexts.end();
550
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
551
    {
552
      int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
553
      ResourceContext *resource_context= *it;
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
554
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
555
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
556
557
      if (resource->participatesInXaTransaction())
558
      {
559
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
560
        {
561
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
562
          error= 1;
563
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
564
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
565
        {
1689.5.1 by Joseph Daly
remove increment calls
566
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
567
        }
568
      }
569
      else if (resource->participatesInSqlTransaction())
570
      {
571
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
572
        {
573
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
574
          error= 1;
575
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
576
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
577
        {
1689.5.1 by Joseph Daly
remove increment calls
578
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
579
        }
580
      }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
581
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
582
    }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
583
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
584
    if (is_real_trans)
585
      session->transaction.xid_state.xid.null();
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
586
587
    if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
588
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
589
      session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
590
      session->transaction.cleanup();
591
    }
592
  }
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
593
  trans->reset();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
594
  return error;
595
}
596
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
597
/**
 * Rolls back either the statement transaction or the normal transaction
 * of a session in every resource registered with it.
 *
 * @param session             Session whose transaction is rolled back
 * @param normal_transaction  true to roll back session->transaction.all,
 *                            false to roll back session->transaction.stmt
 *
 * @return 0 on success, 1 if at least one resource reported an error
 */
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
{
  typedef TransactionContext::ResourceContexts::iterator ContextIterator;

  int result= 0;
  TransactionContext *target= normal_transaction ? &session->transaction.all
                                                 : &session->transaction.stmt;
  TransactionContext::ResourceContexts &participants= target->getResourceContexts();

  /* Computed up front, before any of the rollback work below runs. */
  bool is_real_trans= normal_transaction ||
                      session->transaction.all.getResourceContexts().empty();

  /*
    The normal transaction must not be rolled back while a statement
    transaction is still pending.
  */
  assert(session->transaction.stmt.getResourceContexts().empty() ||
         target == &session->transaction.stmt);

  if (! participants.empty())
  {
    for (ContextIterator it= participants.begin(); it != participants.end(); ++it)
    {
      ResourceContext *resource_context= *it;
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      int err= 0;
      bool attempted= true;

      /* XA-capable resources are preferred over plain SQL transactional engines. */
      if (resource->participatesInXaTransaction())
        err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction);
      else if (resource->participatesInSqlTransaction())
        err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction);
      else
        attempted= false;

      if (attempted)
      {
        if (err)
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          result= 1;
        }
        else if (normal_transaction)
        {
          /* Only successful rollbacks of the normal transaction are counted. */
          session->status_var.ha_rollback_count++;
        }
      }

      /* This resource has been handled; reset its context. */
      resource_context->reset();
    }

    /*
     * The ROLLBACK must be signalled to ReplicationServices BEFORE the
     * transaction ID is set to NULL: if a bulk segment was already sent
     * to replicators, the rollback statement has to carry the matching
     * transaction ID.
     */
    rollbackTransactionMessage(session);

    if (is_real_trans)
      session->transaction.xid_state.xid.null();

    if (normal_transaction)
    {
      session->variables.tx_isolation= session->session_tx_isolation;
      session->transaction.cleanup();
    }
  }

  if (normal_transaction)
    session->transaction_rollback_request= false;

  /*
   * Warn the user if a non-transactional table was updated, unless the
   * connection is being killed.
   */
  if (is_real_trans &&
      session->transaction.all.hasModifiedNonTransData() &&
      session->killed != Session::KILL_CONNECTION)
  {
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
  }

  target->reset();
  return result;
}
684
685
/**
686
  This is used to commit or rollback a single statement depending on
687
  the value of error.
688
689
  @note
690
    Note that if the autocommit is on, then the following call inside
691
    InnoDB will commit or rollback the whole transaction (= the statement). The
692
    autocommit mechanism built into InnoDB is based on counting locks, but if
693
    the user has used LOCK TABLES then that mechanism does not know to do the
694
    commit.
695
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
696
int TransactionServices::autocommitOrRollback(Session *session, int error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
697
{
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
698
  if (session->transaction.stmt.getResourceContexts().empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
699
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
700
    if (! error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
701
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
702
      if (commitTransaction(session, false))
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
703
        error= 1;
704
    }
705
    else
706
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
707
      (void) rollbackTransaction(session, false);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
708
      if (session->transaction_rollback_request)
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
709
        (void) rollbackTransaction(session, true);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
710
    }
711
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
712
    session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
713
  }
714
  return error;
715
}
716
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
717
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
718
{
719
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
720
  {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
721
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
722
     * resources aren't the same... */
723
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
724
  }
725
};
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
726
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
727
/**
 * Rolls the normal transaction of a session back to a named savepoint.
 *
 * Resources that were registered when the savepoint was set are asked to
 * roll back to the savepoint.  Resources that joined the transaction after
 * the savepoint was set are rolled back completely.  Afterwards the
 * transaction's list of registered resources is replaced by the savepoint's
 * list and, if replication messages are being constructed, the session's
 * Transaction message is replaced by the one stored in the savepoint.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint to roll back to
 *
 * @return 0 on success, 1 if at least one resource reported an error
 */
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
{
  int error= 0;
  TransactionContext *trans= &session->transaction.all;
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();

  trans->no_2pc= false;
  /*
    rolling back to savepoint in all storage engines that were part of the
    transaction when the savepoint was set
  */
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
       it != sv_resource_contexts.end();
       ++it)
  {
    int err;
    ResourceContext *resource_context= *it;

    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    if (resource->participatesInSqlTransaction())
    {
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
      {
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error= 1;
      }
      else
      {
        session->status_var.ha_savepoint_rollback_count++;
      }
    }
    /* Two-phase commit is ruled out as soon as one resource is not XA-capable. */
    trans->no_2pc|= not resource->participatesInXaTransaction();
  }
  /*
    rolling back the transaction in all storage engines that were not part of
    the transaction when the savepoint was set
  */
  {
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
    TransactionContext::ResourceContexts set_difference_contexts;

    /* set_difference() requires both input ranges to be sorted with the same comparator. */
    sort(sorted_tran_resource_contexts.begin(),
         sorted_tran_resource_contexts.end(),
         ResourceContextCompare());
    sort(sorted_sv_resource_contexts.begin(),
         sorted_sv_resource_contexts.end(),
         ResourceContextCompare());

    /*
     * Bug #542299: set_difference() writes through the output iterator and
     * never grows the target container.  reserve() is not enough for that:
     * it leaves size() at zero, so the writes land outside the vector's
     * elements and the loop below would never see them.  The target is
     * therefore resized to the largest possible result (every context of
     * the transaction) and trimmed to the end iterator that
     * set_difference() returns.
     */
    set_difference_contexts.resize(sorted_tran_resource_contexts.size());

    TransactionContext::ResourceContexts::iterator difference_end=
      set_difference(sorted_tran_resource_contexts.begin(),
                     sorted_tran_resource_contexts.end(),
                     sorted_sv_resource_contexts.begin(),
                     sorted_sv_resource_contexts.end(),
                     set_difference_contexts.begin(),
                     ResourceContextCompare());

    set_difference_contexts.erase(difference_end, set_difference_contexts.end());
    /*
     * set_difference_contexts now contains all resource contexts
     * which are in the transaction context but were NOT in the
     * savepoint's resource contexts.
     */

    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
         it != set_difference_contexts.end();
         ++it)
    {
      ResourceContext *resource_context= *it;
      int err;

      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

      if (resource->participatesInSqlTransaction())
      {
        /* These engines joined after the savepoint: roll back their whole transaction. */
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, true)))
        {
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
          error= 1;
        }
        else
        {
          session->status_var.ha_rollback_count++;
        }
      }
      /* This resource has been handled; reset its context. */
      resource_context->reset();
    }
  }
  trans->setResourceContexts(sv_resource_contexts);

  if (shouldConstructMessages())
  {
    /* Discard whatever Transaction message was built since the savepoint. */
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
    message::Transaction *savepoint_transaction= sv.getTransactionSavepoint();

    /*
     * The savepoint carries no Transaction message when none existed at
     * the time it was set.  In that case the session is left without an
     * active Transaction or Statement message.
     *
     * NOTE(review): the session and the savepoint refer to the same
     * Transaction object after this point -- confirm which of them is
     * expected to delete it.
     */
    if (savepoint_transaction != NULL)
    {
      google::protobuf::RepeatedPtrField< message::Statement> *statements=
        savepoint_transaction->mutable_statement();

      /*
       * Walk to the last Statement of the saved message; the pointer stays
       * NULL when the saved message contains no statements at all.
       */
      message::Statement *last_statement= NULL;

      for (google::protobuf::RepeatedPtrField< message::Statement>::iterator it= statements->begin();
           it != statements->end();
           ++it)
      {
        last_statement= &*it;
      }

      session->setTransactionMessage(savepoint_transaction);
      session->setStatementMessage(last_statement);
    }
  }
  return error;
}
852
853
/**
854
  @note
855
  according to the sql standard (ISO/IEC 9075-2:2003)
856
  section "4.33.4 SQL-statements and transaction states",
1273.1.4 by Jay Pipes
This patch significantly reworks the way that
857
  NamedSavepoint is *not* transaction-initiating SQL-statement
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
858
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
859
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
860
{
861
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
862
  TransactionContext *trans= &session->transaction.all;
863
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
864
865
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
866
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
867
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
868
         it != resource_contexts.end();
869
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
870
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
871
      ResourceContext *resource_context= *it;
872
      int err;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
873
874
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
875
876
      if (resource->participatesInSqlTransaction())
877
      {
878
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
879
        {
880
          my_error(ER_GET_ERRNO, MYF(0), err);
881
          error= 1;
882
        }
883
        else
884
        {
1689.5.1 by Joseph Daly
remove increment calls
885
          session->status_var.ha_savepoint_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
886
        }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
887
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
888
    }
889
  }
890
  /*
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
891
    Remember the list of registered storage engines.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
892
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
893
  sv.setResourceContexts(resource_contexts);
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
894
1746.7.4 by Joseph Daly
add test, and use shouldConstructMessages() rather then calling the singleton each time
895
  if (shouldConstructMessages())
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
896
  {
1746.7.5 by Joseph Daly
fix to get tests working with release savepoint
897
    message::Transaction *transaction= session->getTransactionMessage();
898
                  
899
    if (transaction != NULL)
900
    {
901
      message::Transaction *transaction_savepoint= 
902
        new message::Transaction(*transaction);
903
      sv.setTransactionSavepoint(transaction_savepoint);
904
    }
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
905
  } 
906
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
907
  return error;
908
}
909
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
910
/**
 * Releases a named savepoint in every SQL transactional engine that was
 * registered when the savepoint was set and, when replication messages are
 * being constructed, frees the Transaction message stored in the savepoint.
 *
 * @param session  Session owning the transaction
 * @param sv       Savepoint being released
 *
 * @return 0 on success, 1 if at least one engine reported an error
 */
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
{
  typedef TransactionContext::ResourceContexts::iterator ContextIterator;

  int result= 0;
  TransactionContext::ResourceContexts &participants= sv.getResourceContexts();

  for (ContextIterator it= participants.begin(); it != participants.end(); ++it)
  {
    ResourceContext *resource_context= *it;
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();

    /* Only SQL transactional engines know about the savepoint. */
    if (! resource->participatesInSqlTransaction())
      continue;

    int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv);
    if (err)
    {
      my_error(ER_GET_ERRNO, MYF(0), err);
      result= 1;
    }
  }

  if (shouldConstructMessages())
  {
    /* Free the stored Transaction message and clear the savepoint's pointer to it. */
    delete sv.getTransactionSavepoint();
    sv.setTransactionSavepoint(NULL);
  }

  return result;
}
943
1405.4.10 by Jay Pipes
OK, Sun Studio still didn't like that...seems to think that inline means something different than other compilers think it is...
944
bool TransactionServices::shouldConstructMessages()
945
{
946
  ReplicationServices &replication_services= ReplicationServices::singleton();
947
  return replication_services.isActive();
948
}
949
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
950
/**
 * Returns the Transaction message attached to a session, creating,
 * initializing and attaching a new one if the session has none.
 *
 * @param in_session         Session whose Transaction message is wanted
 * @param should_inc_trx_id  Passed on to initTransactionMessage() when a new
 *                           message has to be created
 *
 * @return The session's active Transaction message
 */
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
{
  message::Transaction *active= in_session->getTransactionMessage();

  if (unlikely(active == NULL))
  {
    /*
     * Allocate and initialize a new transaction message for this
     * Session object.  Session is responsible for deleting the
     * transaction message when done with it.
     *
     * NOTE(review): the result of the nothrow allocation is dereferenced
     * without a NULL check -- confirm how allocation failure should be
     * handled here.
     */
    active= new (nothrow) message::Transaction();
    initTransactionMessage(*active, in_session, should_inc_trx_id);
    in_session->setTransactionMessage(active);
  }

  return active;
}
969
970
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
971
                                                 Session *in_session,
972
                                                 bool should_inc_trx_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
973
{
974
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
975
  trx->set_server_id(in_session->getServerId());
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
976
977
  if (should_inc_trx_id)
978
    trx->set_transaction_id(getNextTransactionId());
979
  else
980
    trx->set_transaction_id(getCurrentTransactionId());
981
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
982
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
983
}
984
985
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
986
                                              Session *in_session)
987
{
988
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
989
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
990
}
991
992
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
993
                                             Session *in_session)
994
{
995
  delete in_transaction;
996
  in_session->setStatementMessage(NULL);
997
  in_session->setTransactionMessage(NULL);
998
}
999
1405.3.6 by Jay Pipes
Here, we do two main things:
1000
/**
 * Finalizes the session's active Statement and Transaction messages,
 * pushes the Transaction message to ReplicationServices and cleans it up.
 *
 * @param in_session  Session whose transaction is being committed
 *
 * @return 0 if replication is inactive or the session has no active
 *         Statement message; otherwise the result of pushing the
 *         Transaction message, converted to int
 */
int TransactionServices::commitTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return 0;

  message::Statement *open_statement= in_session->getStatementMessage();

  /* No data modification occurred inside the transaction */
  if (open_statement == NULL)
    return 0;

  /* There is an active statement message: finalize it */
  finalizeStatementMessage(*open_statement, in_session);

  message::Transaction *transaction= getActiveTransactionMessage(in_session);

  finalizeTransactionMessage(*transaction, in_session);

  plugin::ReplicationReturnCode push_result=
    replication_services.pushTransactionMessage(*in_session, *transaction);

  /* The message is freed whether or not the push succeeded. */
  cleanupTransactionMessage(transaction, in_session);

  return static_cast<int>(push_result);
}
1026
1027
void TransactionServices::initStatementMessage(message::Statement &statement,
1028
                                        message::Statement::Type in_type,
1029
                                        Session *in_session)
1030
{
1031
  statement.set_type(in_type);
1032
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1033
  /** @TODO Set sql string optionally */
1034
}
1035
1036
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1037
                                            Session *in_session)
1038
{
1039
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1040
  in_session->setStatementMessage(NULL);
1041
}
1042
1043
/**
 * Handles a ROLLBACK of the session's current Transaction message.
 *
 * Does nothing when replication services are inactive.  Otherwise the
 * active Transaction message is either simply cleaned up, or -- when
 * message::transactionContainsBulkSegment() reports true for it -- first
 * replaced by a Transaction holding a single ROLLBACK Statement which is
 * pushed to the replication services before cleanup.
 *
 * @param in_session  Session whose transaction is being rolled back
 */
void TransactionServices::rollbackTransactionMessage(Session *in_session)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);

  /*
   * Two situations are possible:
   *
   * 1) The stored Transaction message is self-contained, i.e. none of its
   *    Statement messages carries a segment_id greater than 1.  Nothing
   *    was handed to replicators yet, so the message can simply be
   *    cleared and removed from memory.
   *
   * 2) The Transaction message has a non-end segment, meaning a bulk
   *    update/delete/insert segment was previously sent over the wire.
   *    Replicators must then be told, via a Statement of type ROLLBACK,
   *    to un-apply the previously-transmitted messages.
   */
  const bool segment_already_sent= message::transactionContainsBulkSegment(*trx_message);

  if (unlikely(segment_already_sent))
  {
    /* Start over with an empty, re-initialized Transaction message. */
    trx_message->Clear();
    initTransactionMessage(*trx_message, in_session, false);

    /* Its only content is the ROLLBACK statement. */
    message::Statement *rollback_statement= trx_message->add_statement();
    initStatementMessage(*rollback_statement, message::Statement::ROLLBACK, in_session);
    finalizeStatementMessage(*rollback_statement, in_session);

    finalizeTransactionMessage(*trx_message, in_session);

    /* The push result is deliberately ignored, as before. */
    (void) replication_services.pushTransactionMessage(*in_session, *trx_message);
  }

  cleanupTransactionMessage(trx_message, in_session);
}
1088
1089
/**
 * Returns the INSERT Statement message that the next inserted record
 * should be appended to, creating a new one when necessary.
 *
 * The session's current Statement message is reused only if it is an
 * INSERT against the same schema AND table as in_table and the active
 * Transaction message is still below trx_msg_threshold.  Otherwise the
 * current Statement is finalized (and, when the threshold is reached, the
 * whole Transaction segment is committed to replicators) and a fresh
 * INSERT Statement is added to the active Transaction message.
 *
 * @param in_session       Session performing the insert
 * @param in_table         Table being inserted into
 * @param next_segment_id  In/out: segment id the caller should store in the
 *                         statement's InsertData.  Left untouched when a
 *                         brand new statement is started without
 *                         segmenting; set to the existing segment id when
 *                         the current statement is reused; set to the
 *                         existing segment id + 1 when the transaction is
 *                         segmented.
 *
 * @return Reference to the Statement message to add the record to
 */
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type of the current Statement message.  If it is anything
   * other than INSERT we finalize it, which ensures a new INSERT
   * statement is created below.  If it is an INSERT, check which table it
   * belongs to; if it is a different table, finalize it as well so a new
   * INSERT statement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::INSERT)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If the Transaction message has reached the size threshold (possible
     * for a bulk insert), finalize the Statement and Transaction and ship
     * this segment, which keeps the Transaction message from getting huge.
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      message::InsertData *current_data= statement->mutable_insert_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      /* More segments follow the one being shipped now. */
      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
    }
    else
    {
      const message::TableMetadata &old_metadata= statement->insert_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      /*
       * The statement may only be reused for the very same table.  The
       * schema has to be compared too: two tables with the same name in
       * different schemas must not share one Statement, since its header
       * names a single schema/table pair.
       */
      if (current_table_name != old_metadata.table_name() ||
          current_schema_name != old_metadata.schema_name())
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::InsertData &current_data= statement->insert_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if we had to segment it due to
     * transaction size above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setInsertHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1184
1185
void TransactionServices::setInsertHeader(message::Statement &statement,
1186
                                          Session *in_session,
1187
                                          Table *in_table)
1188
{
1189
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1190
1191
  /* 
1192
   * Now we construct the specialized InsertHeader message inside
1193
   * the generalized message::Statement container...
1194
   */
1195
  /* Set up the insert header */
1196
  message::InsertHeader *header= statement.mutable_insert_header();
1197
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1198
1336.2.3 by Jay Pipes
Merge trunk and resolve
1199
  string schema_name;
1200
  (void) in_table->getShare()->getSchemaName(schema_name);
1201
  string table_name;
1202
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1203
1336.2.3 by Jay Pipes
Merge trunk and resolve
1204
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1205
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1206
1207
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1208
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1209
1210
  message::FieldMetadata *field_metadata;
1211
1212
  /* We will read all the table's fields... */
1213
  in_table->setReadSet();
1214
1215
  while ((current_field= *table_fields++) != NULL) 
1216
  {
1217
    field_metadata= header->add_field_metadata();
1218
    field_metadata->set_name(current_field->field_name);
1219
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1220
  }
1221
}
1222
1223
/**
 * Appends the row currently held in in_table's fields to the session's
 * INSERT Statement message.
 *
 * @param in_session  Session performing the insert
 * @param in_table    Table being inserted into
 *
 * @retval false  Replication is inactive, or the record was added
 * @retval true   The table has no primary key; the error
 *                ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE has been raised
 */
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (not replication_services.isActive())
    return false;

  /**
   * Checked up front because we don't even want to create a statement
   * if there isn't a primary key on the table...
   *
   * @todo
   *
   * Multi-column primary keys are handled how exactly?
   */
  const bool has_primary_key= in_table->getShare()->hasPrimaryKey();
  if (not has_primary_key)
  {
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
    return true;
  }

  /* Stays 1 unless getInsertStatement() overwrites it. */
  uint32_t segment_id= 1;
  message::Statement &insert_statement= getInsertStatement(in_session, in_table, &segment_id);

  message::InsertData *insert_data= insert_statement.mutable_insert_data();
  insert_data->set_segment_id(segment_id);
  insert_data->set_end_segment(true);
  message::InsertRecord *row= insert_data->add_record();

  Field **field_cursor= in_table->getFields();

  /* Scratch buffer handed to val_str() for each field's string form. */
  String *value_buffer= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  value_buffer->set_charset(system_charset_info);

  /* We will read all the table's fields... */
  in_table->setReadSet();

  /* The field array is terminated by a NULL entry. */
  for (Field *field= *field_cursor; field != NULL; field= *++field_cursor)
  {
    if (not field->is_null())
    {
      /* val_str() returns the String holding the value; keep using it. */
      value_buffer= field->val_str(value_buffer);
      row->add_is_null(false);
      row->add_insert_value(value_buffer->c_ptr(), value_buffer->length());
      value_buffer->free();
    }
    else
    {
      /* NULL fields are flagged and get an empty placeholder value. */
      row->add_is_null(true);
      row->add_insert_value("", 0);
    }
  }
  return false;
}
1276
1277
/**
 * Returns the UPDATE Statement message that the next updated record
 * should be appended to, creating a new one when necessary.
 *
 * The session's current Statement message is reused only if it is an
 * UPDATE against the same schema AND table as in_table and the active
 * Transaction message is still below trx_msg_threshold.  Otherwise the
 * current Statement is finalized (and, when the threshold is reached, the
 * whole Transaction segment is committed to replicators) and a fresh
 * UPDATE Statement is added to the active Transaction message.
 *
 * @param in_session       Session performing the update
 * @param in_table         Table being updated
 * @param old_record       Raw bytes of the record before the update
 * @param new_record       Raw bytes of the record after the update
 * @param next_segment_id  In/out: segment id the caller should store in the
 *                         statement's UpdateData.  Left untouched when a
 *                         brand new statement is started without
 *                         segmenting; set to the existing segment id when
 *                         the current statement is reused; set to the
 *                         existing segment id + 1 when the transaction is
 *                         segmented.
 *
 * @return Reference to the Statement message to add the record to
 */
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
                                                            Table *in_table,
                                                            const unsigned char *old_record, 
                                                            const unsigned char *new_record,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type of the current Statement message.  If it is anything
   * other than UPDATE we finalize it, which ensures a new UPDATE
   * statement is created below.  If it is an UPDATE, check which table it
   * belongs to; if it is a different table, finalize it as well so a new
   * UPDATE statement can be created.
   */
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If the Transaction message has reached the size threshold (possible
     * for a bulk update), finalize the Statement and Transaction and ship
     * this segment, which keeps the Transaction message from getting huge.
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      message::UpdateData *current_data= statement->mutable_update_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      /* More segments follow the one being shipped now. */
      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
    }
    else
    {
      const message::TableMetadata &old_metadata= statement->update_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      /*
       * The statement may only be reused for the very same table.  The
       * schema has to be compared too: two tables with the same name in
       * different schemas must not share one Statement, since its header
       * names a single schema/table pair.
       */
      if (current_table_name != old_metadata.table_name() ||
          current_schema_name != old_metadata.schema_name())
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::UpdateData &current_data= statement->update_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if we had to segment it due to
     * transaction size above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1373
1374
void TransactionServices::setUpdateHeader(message::Statement &statement,
1375
                                          Session *in_session,
1376
                                          Table *in_table,
1377
                                          const unsigned char *old_record, 
1378
                                          const unsigned char *new_record)
1379
{
1380
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1381
1382
  /* 
1383
   * Now we construct the specialized UpdateHeader message inside
1384
   * the generalized message::Statement container...
1385
   */
1386
  /* Set up the update header */
1387
  message::UpdateHeader *header= statement.mutable_update_header();
1388
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1389
1336.2.3 by Jay Pipes
Merge trunk and resolve
1390
  string schema_name;
1391
  (void) in_table->getShare()->getSchemaName(schema_name);
1392
  string table_name;
1393
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1394
1336.2.3 by Jay Pipes
Merge trunk and resolve
1395
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1396
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1397
1398
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1399
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1400
1401
  message::FieldMetadata *field_metadata;
1402
1403
  /* We will read all the table's fields... */
1404
  in_table->setReadSet();
1405
1406
  while ((current_field= *table_fields++) != NULL) 
1407
  {
1408
    /*
1409
     * We add the "key field metadata" -- i.e. the fields which is
1410
     * the primary key for the table.
1411
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1412
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1413
    {
1414
      field_metadata= header->add_key_field_metadata();
1415
      field_metadata->set_name(current_field->field_name);
1416
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1417
    }
1418
1419
    /*
1420
     * The below really should be moved into the Field API and Record API.  But for now
1421
     * we do this crazy pointer fiddling to figure out if the current field
1422
     * has been updated in the supplied record raw byte pointers.
1423
     */
1672.3.6 by Brian Aker
First pass in encapsulating row
1424
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1425
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1426
1427
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1428
1429
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1430
    {
1431
      /* Field is changed from old to new */
1432
      field_metadata= header->add_set_field_metadata();
1433
      field_metadata->set_name(current_field->field_name);
1434
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1435
    }
1436
  }
1437
}
1438
void TransactionServices::updateRecord(Session *in_session,
1439
                                       Table *in_table, 
1440
                                       const unsigned char *old_record, 
1441
                                       const unsigned char *new_record)
1442
{
1443
  ReplicationServices &replication_services= ReplicationServices::singleton();
1444
  if (! replication_services.isActive())
1445
    return;
1446
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1447
  uint32_t next_segment_id= 1;
1448
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1449
1450
  message::UpdateData *data= statement.mutable_update_data();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1451
  data->set_segment_id(next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1452
  data->set_end_segment(true);
1453
  message::UpdateRecord *record= data->add_record();
1454
1455
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1456
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1457
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1458
  string_value->set_charset(system_charset_info);
1459
1460
  while ((current_field= *table_fields++) != NULL) 
1461
  {
1462
    /*
1463
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1464
     * but then realized that an UPDATE statement could potentially have different values for
1465
     * the SET field.  For instance, imagine this SQL scenario:
1466
     *
1467
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1468
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1469
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1470
     *
1471
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1472
     *
1473
     * The below really should be moved into the Field API and Record API.  But for now
1474
     * we do this crazy pointer fiddling to figure out if the current field
1475
     * has been updated in the supplied record raw byte pointers.
1476
     */
1672.3.6 by Brian Aker
First pass in encapsulating row
1477
    const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1478
    const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord()); 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1479
1480
    uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1481
1482
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1483
    {
1484
      /* Store the original "read bit" for this field */
1485
      bool is_read_set= current_field->isReadSet();
1486
1487
      /* We need to mark that we will "read" this field... */
1488
      in_table->setReadSet(current_field->field_index);
1489
1490
      /* Read the string value of this field's contents */
1491
      string_value= current_field->val_str(string_value);
1492
1493
      /* 
1494
       * Reset the read bit after reading field to its original state.  This 
1495
       * prevents the field from being included in the WHERE clause
1496
       */
1497
      current_field->setReadSet(is_read_set);
1498
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1499
      if (current_field->is_null())
1500
      {
1501
        record->add_is_null(true);
1667.3.7 by Joe Daly
merge trunk
1502
        record->add_after_value("", 0);
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1503
      }
1504
      else
1505
      {
1506
        record->add_is_null(false);
1507
        record->add_after_value(string_value->c_ptr(), string_value->length());
1508
      }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1509
      string_value->free();
1510
    }
1511
1512
    /* 
1513
     * Add the WHERE clause values now...for now, this means the
1514
     * primary key field value.  Replication only supports tables
1515
     * with a primary key.
1516
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1517
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1518
    {
1519
      /**
1520
       * To say the below is ugly is an understatement. But it works.
1521
       * 
1522
       * @todo Move this crap into a real Record API.
1523
       */
1524
      string_value= current_field->val_str(string_value,
1525
                                           old_record + 
1526
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1527
      record->add_key_value(string_value->c_ptr(), string_value->length());
1528
      string_value->free();
1529
    }
1530
1531
  }
1532
}
1533
1534
/**
 * Returns the DELETE Statement message that the next deleted record of
 * in_table should be added to, creating and initializing a new one when
 * the Session's current Statement message cannot be reused.
 *
 * The current Statement message is reused only when it is a DELETE
 * against the same schema and table and the active Transaction message
 * is still below trx_msg_threshold.
 *
 * @param in_session            Session performing the delete
 * @param in_table              Table the record is deleted from
 * @param[out] next_segment_id  Segment id the caller should store in the
 *                              DeleteData message.  Only written when an
 *                              existing statement is continued or when the
 *                              transaction is segmented; otherwise the
 *                              caller's initial value is left untouched.
 *
 * @return Reference to the Statement message to append the record to
 */
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
                                                            Table *in_table,
                                                            uint32_t *next_segment_id)
{
  message::Statement *statement= in_session->getStatementMessage();
  message::Transaction *transaction= NULL;

  /*
   * Check the type of the current Statement message.  If it is anything
   * other than DELETE we finalize it so that a new DELETE statement gets
   * created below.  If it is a DELETE, check which table it belongs to;
   * a different table also requires a new statement.
   */
  if (statement != NULL && statement->type() != message::Statement::DELETE)
  {
    finalizeStatementMessage(*statement, in_session);
    statement= in_session->getStatementMessage();
  }
  else if (statement != NULL)
  {
    transaction= getActiveTransactionMessage(in_session);

    /*
     * If we've passed our threshold for the transaction size (possible
     * for a bulk operation), we'll finalize the Statement and Transaction
     * (doing the Transaction will keep it from getting huge).
     */
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
    {
      message::DeleteData *current_data= statement->mutable_delete_data();

      /* Caller should use this value when adding a new record */
      *next_segment_id= current_data->segment_id() + 1;

      /* More segments follow, so the one being closed is not the last */
      current_data->set_end_segment(false);

      /*
       * Send the trx message to replicators after finalizing the
       * statement and transaction. This will also set the Transaction
       * and Statement objects in Session to NULL.
       */
      commitTransactionMessage(in_session);

      /*
       * Statement and Transaction should now be NULL, so new ones will get
       * created. We reuse the transaction id since we are segmenting
       * one transaction.
       */
      statement= in_session->getStatementMessage();
      transaction= getActiveTransactionMessage(in_session, false);
    }
    else
    {
      const message::TableMetadata &old_metadata= statement->delete_header().table_metadata();

      string current_schema_name;
      (void) in_table->getShare()->getSchemaName(current_schema_name);
      string current_table_name;
      (void) in_table->getShare()->getTableName(current_table_name);

      /*
       * Both the schema and the table name must match: tables with the
       * same name in different schemas are different tables, and their
       * records must not end up under the same DeleteHeader.
       */
      if (current_table_name != old_metadata.table_name() ||
          current_schema_name != old_metadata.schema_name())
      {
        finalizeStatementMessage(*statement, in_session);
        statement= in_session->getStatementMessage();
      }
      else
      {
        /* carry forward the existing segment id */
        const message::DeleteData &current_data= statement->delete_data();
        *next_segment_id= current_data.segment_id();
      }
    }
  }

  if (statement == NULL)
  {
    /*
     * Transaction will be non-NULL only if we had to segment it due to
     * transaction size above.
     */
    if (transaction == NULL)
      transaction= getActiveTransactionMessage(in_session);

    /*
     * Transaction message initialized and set, but no statement created
     * yet.  We construct one and initialize it, here, then return the
     * message after attaching the new Statement message pointer to the
     * Session for easy retrieval later...
     */
    statement= transaction->add_statement();
    setDeleteHeader(*statement, in_session, in_table);
    in_session->setStatementMessage(statement);
  }
  return *statement;
}
1628
1629
void TransactionServices::setDeleteHeader(message::Statement &statement,
1630
                                          Session *in_session,
1631
                                          Table *in_table)
1632
{
1633
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1634
1635
  /* 
1636
   * Now we construct the specialized DeleteHeader message inside
1637
   * the generalized message::Statement container...
1638
   */
1639
  message::DeleteHeader *header= statement.mutable_delete_header();
1640
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1641
1336.2.3 by Jay Pipes
Merge trunk and resolve
1642
  string schema_name;
1643
  (void) in_table->getShare()->getSchemaName(schema_name);
1644
  string table_name;
1645
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1646
1336.2.3 by Jay Pipes
Merge trunk and resolve
1647
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1648
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1649
1650
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1651
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1652
1653
  message::FieldMetadata *field_metadata;
1654
1655
  while ((current_field= *table_fields++) != NULL) 
1656
  {
1657
    /* 
1658
     * Add the WHERE clause values now...for now, this means the
1659
     * primary key field value.  Replication only supports tables
1660
     * with a primary key.
1661
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1662
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1663
    {
1664
      field_metadata= header->add_key_field_metadata();
1665
      field_metadata->set_name(current_field->field_name);
1666
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1667
    }
1668
  }
1669
}
1670
1730.5.1 by David Shrewsbury
Use the update record, not insert record, when record DELETE operations during REPLACE.
1671
/**
 * Appends one DeleteRecord, holding the primary key value(s) of the
 * deleted row, to the current DELETE Statement message.
 *
 * Does nothing when replication is not active.
 *
 * @param in_session         Session performing the delete
 * @param in_table           Table the record is deleted from
 * @param use_update_record  When true the key values are read from the
 *                           table's update record instead of its insert
 *                           record
 */
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  /* First segment unless getDeleteStatement() says otherwise */
  uint32_t segment_id= 1;
  message::Statement &delete_statement= getDeleteStatement(in_session, in_table, &segment_id);

  message::DeleteData *delete_data= delete_statement.mutable_delete_data();
  delete_data->set_segment_id(segment_id);
  delete_data->set_end_segment(true);
  message::DeleteRecord *delete_record= delete_data->add_record();

  Field **fields= in_table->getFields();
  String *key_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
  key_value->set_charset(system_charset_info);

  for (Field **field_iter= fields; *field_iter != NULL; ++field_iter)
  {
    Field *field= *field_iter;

    /*
     * Only WHERE clause values are recorded, which for now means the
     * primary key fields.  Replication only supports tables with a
     * primary key.
     */
    if (! in_table->getShare()->fieldInPrimaryKey(field))
      continue;

    if (use_update_record)
    {
      /*
       * Hack: aim the field at the same offset inside the update record
       * just long enough to read the key value from there, then put the
       * original pointer back.  Field::val_str() should not modify
       * Field::ptr, so the restore below is expected to be exact.
       */
      unsigned char *saved_ptr= current_field_ptr_placeholder(field);
      field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(saved_ptr - in_table->getInsertRecord());
      key_value= field->val_str(key_value);
      field->ptr= saved_ptr;
    }
    else
    {
      key_value= field->val_str(key_value);
      /**
       * @TODO Store optional old record value in the before data member
       */
    }

    delete_record->add_key_value(key_value->c_ptr(), key_value->length());
    key_value->free();
  }
}
1725
1726
void TransactionServices::createTable(Session *in_session,
1727
                                      const message::Table &table)
1728
{
1729
  ReplicationServices &replication_services= ReplicationServices::singleton();
1730
  if (! replication_services.isActive())
1731
    return;
1732
  
1733
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1734
  message::Statement *statement= transaction->add_statement();
1735
1736
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1737
1738
  /* 
1739
   * Construct the specialized CreateTableStatement message and attach
1740
   * it to the generic Statement message
1741
   */
1742
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1743
  message::Table *new_table_message= create_table_statement->mutable_table();
1744
  *new_table_message= table;
1745
1746
  finalizeStatementMessage(*statement, in_session);
1747
1748
  finalizeTransactionMessage(*transaction, in_session);
1749
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1750
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1751
1752
  cleanupTransactionMessage(transaction, in_session);
1753
1754
}
1755
1756
void TransactionServices::createSchema(Session *in_session,
1757
                                       const message::Schema &schema)
1758
{
1759
  ReplicationServices &replication_services= ReplicationServices::singleton();
1760
  if (! replication_services.isActive())
1761
    return;
1762
  
1763
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1764
  message::Statement *statement= transaction->add_statement();
1765
1766
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1767
1768
  /* 
1769
   * Construct the specialized CreateSchemaStatement message and attach
1770
   * it to the generic Statement message
1771
   */
1772
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1773
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1774
  *new_schema_message= schema;
1775
1776
  finalizeStatementMessage(*statement, in_session);
1777
1778
  finalizeTransactionMessage(*transaction, in_session);
1779
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1780
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1781
1782
  cleanupTransactionMessage(transaction, in_session);
1783
1784
}
1785
1786
/**
 * Builds a DROP_SCHEMA Statement message naming the dropped schema, then
 * finalizes the statement and its transaction and pushes the transaction
 * to the replication services.
 *
 * Does nothing when replication is not active.
 *
 * @param in_session   Session dropping the schema
 * @param schema_name  Name of the schema being dropped
 */
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  initStatementMessage(*stmt_message, message::Statement::DROP_SCHEMA, in_session);

  /* The specialized DropSchemaStatement only needs the schema name */
  stmt_message->mutable_drop_schema_statement()->set_schema_name(schema_name);

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately discarded */
  (void) replication_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1813
1814
void TransactionServices::dropTable(Session *in_session,
1815
                                    const string &schema_name,
1816
                                    const string &table_name,
1817
                                    bool if_exists)
1818
{
1819
  ReplicationServices &replication_services= ReplicationServices::singleton();
1820
  if (! replication_services.isActive())
1821
    return;
1822
  
1823
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1824
  message::Statement *statement= transaction->add_statement();
1825
1826
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1827
1828
  /* 
1829
   * Construct the specialized DropTableStatement message and attach
1830
   * it to the generic Statement message
1831
   */
1832
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1833
1834
  drop_table_statement->set_if_exists_clause(if_exists);
1835
1836
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1837
1838
  table_metadata->set_schema_name(schema_name);
1839
  table_metadata->set_table_name(table_name);
1840
1841
  finalizeStatementMessage(*statement, in_session);
1842
1843
  finalizeTransactionMessage(*transaction, in_session);
1844
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1845
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1846
1847
  cleanupTransactionMessage(transaction, in_session);
1848
}
1849
1850
/**
 * Builds a TRUNCATE_TABLE Statement message identifying in_table by its
 * schema and table name, then finalizes the statement and its
 * transaction and pushes the transaction to the replication services.
 *
 * Does nothing when replication is not active.
 *
 * @param in_session  Session truncating the table
 * @param in_table    Table being truncated
 */
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  initStatementMessage(*stmt_message, message::Statement::TRUNCATE_TABLE, in_session);

  /* Fill in the specialized TruncateTableStatement inside the generic Statement */
  message::TableMetadata *metadata=
    stmt_message->mutable_truncate_table_statement()->mutable_table_metadata();

  string schema;
  (void) in_table->getShare()->getSchemaName(schema);
  string table;
  (void) in_table->getShare()->getTableName(table);

  metadata->set_schema_name(schema.c_str(), schema.length());
  metadata->set_table_name(table.c_str(), table.length());

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately discarded */
  (void) replication_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1884
1885
/**
 * Builds a RAW_SQL Statement message containing the given query text,
 * then finalizes the statement and its transaction and pushes the
 * transaction to the replication services.
 *
 * Does nothing when replication is not active.
 *
 * @param in_session  Session executing the statement
 * @param query       SQL text stored in the Statement message
 */
void TransactionServices::rawStatement(Session *in_session, const string &query)
{
  ReplicationServices &replication_services= ReplicationServices::singleton();
  if (! replication_services.isActive())
    return;

  message::Transaction *trx_message= getActiveTransactionMessage(in_session);
  message::Statement *stmt_message= trx_message->add_statement();

  initStatementMessage(*stmt_message, message::Statement::RAW_SQL, in_session);
  stmt_message->set_sql(query);

  finalizeStatementMessage(*stmt_message, in_session);
  finalizeTransactionMessage(*trx_message, in_session);

  /* The result of the push is deliberately discarded */
  (void) replication_services.pushTransactionMessage(*in_session, *trx_message);

  cleanupTransactionMessage(trx_message, in_session);
}
1904
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1905
} /* namespace drizzled */