~drizzle-trunk/drizzle/development

1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008 Sun Microsystems
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
6
 *
7
 *  This program is free software; you can redistribute it and/or modify
8
 *  it under the terms of the GNU General Public License as published by
9
 *  the Free Software Foundation; version 2 of the License.
10
 *
11
 *  This program is distributed in the hope that it will be useful,
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 *  GNU General Public License for more details.
15
 *
16
 *  You should have received a copy of the GNU General Public License
17
 *  along with this program; if not, write to the Free Software
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
/**
22
 * @file Transaction processing code
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
23
 *
24
 * @note
25
 *
26
 * The TransactionServices component takes internal events (for instance the start of a 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 * component and used during replication.
30
 *
31
 * The reason for this functionality is to encapsulate all communication
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 * Instead of the plugin having to understand the (often fluidly changing)
34
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 * these messages.
37
 *
38
 * @see /drizzled/message/transaction.proto
39
 *
40
 * @todo
41
 *
42
 * We really should store the raw bytes in the messages, not the
43
 * String value of the Field.  But, to do that, the
44
 * statement_transform library needs first to be updated
45
 * to include the transformation code to convert raw
46
 * Drizzle-internal Field byte representation into something
47
 * plugins can understand.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
48
 */
49
50
#include "config.h"
51
#include "drizzled/my_hash.h"
52
#include "drizzled/error.h"
53
#include "drizzled/gettext.h"
54
#include "drizzled/probes.h"
55
#include "drizzled/sql_parse.h"
56
#include "drizzled/session.h"
57
#include "drizzled/sql_base.h"
58
#include "drizzled/replication_services.h"
59
#include "drizzled/transaction_services.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
60
#include "drizzled/transaction_context.h"
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
61
#include "drizzled/message/transaction.pb.h"
62
#include "drizzled/message/statement_transform.h"
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
63
#include "drizzled/resource_context.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
64
#include "drizzled/lock.h"
65
#include "drizzled/item/int.h"
66
#include "drizzled/item/empty_string.h"
67
#include "drizzled/field/timestamp.h"
68
#include "drizzled/plugin/client.h"
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
#include "drizzled/plugin/xa_resource_manager.h"
1856.2.6 by Joseph Daly
merge trunk
72
#include "drizzled/plugin/xa_storage_engine.h"
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
73
#include "drizzled/internal/my_sys.h"
74
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
75
#include <vector>
76
#include <algorithm>
77
#include <functional>
1861.6.3 by David Shrewsbury
Move GPB manipulation code out of Session and into TransactionServices.
78
#include <google/protobuf/repeated_field.h>
79
80
using namespace std;
81
using namespace google;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
82
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
83
namespace drizzled
84
{
85
86
/**
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
87
 * @defgroup Transactions
88
 *
89
 * @brief
90
 *
91
 * Transaction handling in the server
92
 *
93
 * @detail
94
 *
95
 * In each client connection, Drizzle maintains two transaction
96
 * contexts representing the state of the:
97
 *
98
 * 1) Statement Transaction
99
 * 2) Normal Transaction
100
 *
101
 * These two transaction contexts represent the transactional
102
 * state of a Session's SQL and XA transactions for a single
103
 * SQL statement or a series of SQL statements.
104
 *
105
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 * is no practical difference between the statement and the
107
 * normal transaction, as each SQL statement is committed or
108
 * rolled back depending on the success or failure of the
109
 * indvidual SQL statement.
110
 *
111
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 * the Session has explicitly begun a normal SQL transaction using
113
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 * transaction context tracks the aggregate transaction state of
115
 * the SQL transaction's individual statements, and the SQL
116
 * transaction's commit or rollback is done atomically for all of
117
 * the SQL transaction's statement's data changes.
118
 *
119
 * Technically, a statement transaction can be viewed as a savepoint 
120
 * which is maintained automatically in order to make effects of one
121
 * statement atomic.
122
 *
123
 * The normal transaction is started by the user and is typically
124
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
125
 * The exception to this is that DDL statements implicitly COMMIT
126
 * any previously active normal transaction before they begin executing.
127
 *
128
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 * plugins will automatically be monitored by Drizzle's transaction 
131
 * manager (implemented in this source file), as will all plugins which
132
 * implement plugin::XaResourceManager and register with the transaction
133
 * manager.
134
 *
135
 * If Drizzle's transaction manager sees that more than one resource
136
 * manager (transactional storage engine or XA resource manager) has modified
137
 * data state during a statement or normal transaction, the transaction
138
 * manager will automatically use a two-phase commit protocol for all
139
 * resources which support XA's distributed transaction protocol.  Unlike
140
 * MySQL, storage engines need not manually register with the transaction
141
 * manager during a statement's execution.  Previously, in MySQL, all
142
 * handlertons would have to call trans_register_ha() at some point after
143
 * modifying data state in order to have MySQL include that handler in
144
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 * scenes for the storage engine implementers.
146
 *
147
 * When a connection is closed, the current normal transaction, if
148
 * any is currently active, is rolled back.
149
 *
150
 * Transaction life cycle
151
 * ----------------------
152
 *
153
 * When a new connection is established, session->transaction
154
 * members are initialized to an empty state. If a statement uses any tables, 
155
 * all affected engines are registered in the statement engine list automatically
156
 * in plugin::StorageEngine::startStatement() and 
157
 * plugin::TransactionalStorageEngine::startTransaction().
158
 *
159
 * You can view the lifetime of a normal transaction in the following
160
 * call-sequence:
161
 *
162
 * drizzled::statement::Statement::execute()
163
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 *     drizzled::plugin::StorageEngine::startStatement()
167
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 *     drizzled::plugin::StorageEngine::endStatement()
169
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 *
173
 * Roles and responsibilities
174
 * --------------------------
175
 *
176
 * Beginning of SQL Statement (and Statement Transaction)
177
 * ------------------------------------------------------
178
 *
179
 * At the start of each SQL statement, for each storage engine
180
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
182
 * engine needs to track some data for the statement, it should use
183
 * this method invocation to initialize this data.  This is the
184
 * beginning of what is called the "statement transaction".
185
 *
186
 * <strong>For transaction storage engines (those storage engines
187
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 * kernel automatically determines if the start of the SQL statement 
189
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 * This occurs when the connection is in NOT in autocommit mode. If
191
 * the kernel detects this, then the kernel automatically starts the
192
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 * method and then calls plugin::StorageEngine::startStatement()
194
 * afterwards.
195
 *
196
 * Beginning of an SQL "Normal" Transaction
197
 * ----------------------------------------
198
 *
199
 * As noted above, a "normal SQL transaction" may be started when
200
 * an SQL statement is started in a connection and the connection is
201
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 *
203
 * In addition, when a user executes a START TRANSACTION or
204
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 * calls each transactional storage engine's startTransaction() method.
206
 *
207
 * Ending of an SQL Statement (and Statement Transaction)
208
 * ------------------------------------------------------
209
 *
210
 * At the end of each SQL statement, for each of the aforementioned
211
 * involved storage engines, the kernel calls the engine's
212
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 * has initialized or modified some internal data about the
214
 * statement transaction, it should use this method to reset or destroy
215
 * this data appropriately.
216
 *
217
 * Ending of an SQL "Normal" Transaction
218
 * -------------------------------------
219
 *
220
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 * depending on the success or failure of the statement transaction(s) 
222
 * it encloses.
223
 *
224
 * The end of a "normal transaction" occurs when any of the following
225
 * occurs:
226
 *
227
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 *    then the normal transaction which encloses the statement
229
 *    transaction ends
230
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 *    commit the active normal transaction
233
 *
234
 * Transactions and Non-transactional Storage Engines
235
 * --------------------------------------------------
236
 *
237
 * For non-transactional engines, this call can be safely ignored, an
238
 * the kernel tracks whether a non-transactional engine has changed
239
 * any data state, and warns the user appropriately if a transaction
240
 * (statement or normal) is rolled back after such non-transactional
241
 * data changes have been made.
242
 *
243
 * XA Two-phase Commit Protocol
244
 * ----------------------------
245
 *
246
 * During statement execution, whenever any of data-modifying
247
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 * Cursor::update_row(), the read-write flag is raised in the
249
 * statement transaction for the involved engine.
250
 * Currently All PSEA calls are "traced", and the data can not be
251
 * changed in a way other than issuing a PSEA call. Important:
252
 * unless this invariant is preserved the server will not know that
253
 * a transaction in a given engine is read-write and will not
254
 * involve the two-phase commit protocol!
255
 *
256
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 * is invoked. This call in turn
258
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
259
 * resource manager.
260
 *
261
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 * is...)
264
 * 
265
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 * At statement commit, the statement-related read-write engine
268
 * flag is propagated to the corresponding flag in the normal
269
 * transaction.  When the commit is complete, the list of registered
270
 * engines is cleared.
271
 *
272
 * Rollback is handled in a similar fashion.
273
 *
274
 * Additional notes on DDL and the normal transaction.
275
 * ---------------------------------------------------
276
 *
277
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 * because of the fact that SELECTs on a transactional storage
279
 * engine participate in the normal SQL transaction (due to
280
 * isolation level issues and consistent read views).
281
 *
282
 * Behaviour of the server in this case is currently badly
283
 * defined.
284
 *
285
 * DDL statements use a form of "semantic" logging
286
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 * the newly created table is deleted.
288
 * 
289
 * In addition, some DDL statements issue interim transaction
290
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 * from the original table to the internal temporary table. Other
292
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 * after itself.
294
 *
295
 * And finally there is a group of DDL statements such as
296
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 * and doesn't commit.
298
 *
299
 * A consistent behaviour is perhaps to always commit the normal
300
 * transaction after all DDLs, just like the statement transaction
301
 * is always committed at the end of all statements.
302
 */
1856.2.6 by Joseph Daly
merge trunk
303
TransactionServices::TransactionServices()
304
{
305
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
  if (engine)
307
  {
308
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
  }
310
  else 
311
  {
312
    xa_storage_engine= NULL;
313
  }
314
}
315
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
316
void TransactionServices::registerResourceForStatement(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
317
                                                       plugin::MonitoredInTransaction *monitored,
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
318
                                                       plugin::TransactionalStorageEngine *engine)
319
{
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
320
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
  {
322
    /* 
323
     * Now we automatically register this resource manager for the
324
     * normal transaction.  This is fine because a statement
325
     * transaction registration should always enlist the resource
326
     * in the normal transaction which contains the statement
327
     * transaction.
328
     */
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
329
    registerResourceForTransaction(session, monitored, engine);
330
  }
331
332
  TransactionContext *trans= &session->transaction.stmt;
333
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
334
335
  if (resource_context->isStarted())
336
    return; /* already registered, return */
337
338
  assert(monitored->participatesInSqlTransaction());
339
  assert(not monitored->participatesInXaTransaction());
340
341
  resource_context->setMonitored(monitored);
342
  resource_context->setTransactionalStorageEngine(engine);
343
  trans->registerResource(resource_context);
344
345
  trans->no_2pc|= true;
346
}
347
348
void TransactionServices::registerResourceForStatement(Session *session,
349
                                                       plugin::MonitoredInTransaction *monitored,
350
                                                       plugin::TransactionalStorageEngine *engine,
351
                                                       plugin::XaResourceManager *resource_manager)
352
{
353
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
  {
355
    /* 
356
     * Now we automatically register this resource manager for the
357
     * normal transaction.  This is fine because a statement
358
     * transaction registration should always enlist the resource
359
     * in the normal transaction which contains the statement
360
     * transaction.
361
     */
362
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
  }
364
365
  TransactionContext *trans= &session->transaction.stmt;
366
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
367
368
  if (resource_context->isStarted())
369
    return; /* already registered, return */
370
371
  assert(monitored->participatesInXaTransaction());
372
  assert(monitored->participatesInSqlTransaction());
373
374
  resource_context->setMonitored(monitored);
375
  resource_context->setTransactionalStorageEngine(engine);
376
  resource_context->setXaResourceManager(resource_manager);
377
  trans->registerResource(resource_context);
378
379
  trans->no_2pc|= false;
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
380
}
381
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
382
void TransactionServices::registerResourceForTransaction(Session *session,
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
383
                                                         plugin::MonitoredInTransaction *monitored,
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
384
                                                         plugin::TransactionalStorageEngine *engine)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
385
{
1273.1.22 by Jay Pipes
Automates registration of statement transaction resources. No more need for storage engines to call TransactionServices::trans_register_ha(session, false, engine). yeah \o/
386
  TransactionContext *trans= &session->transaction.all;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
387
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
388
389
  if (resource_context->isStarted())
390
    return; /* already registered, return */
391
392
  session->server_status|= SERVER_STATUS_IN_TRANS;
393
394
  trans->registerResource(resource_context);
395
396
  assert(monitored->participatesInSqlTransaction());
397
  assert(not monitored->participatesInXaTransaction());
398
399
  resource_context->setMonitored(monitored);
400
  resource_context->setTransactionalStorageEngine(engine);
401
  trans->no_2pc|= true;
402
403
  if (session->transaction.xid_state.xid.is_null())
404
    session->transaction.xid_state.xid.set(session->getQueryId());
405
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
    registerResourceForStatement(session, monitored, engine);
409
}
410
411
void TransactionServices::registerResourceForTransaction(Session *session,
412
                                                         plugin::MonitoredInTransaction *monitored,
413
                                                         plugin::TransactionalStorageEngine *engine,
414
                                                         plugin::XaResourceManager *resource_manager)
415
{
416
  TransactionContext *trans= &session->transaction.all;
417
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
418
419
  if (resource_context->isStarted())
420
    return; /* already registered, return */
421
422
  session->server_status|= SERVER_STATUS_IN_TRANS;
423
424
  trans->registerResource(resource_context);
425
426
  assert(monitored->participatesInSqlTransaction());
427
428
  resource_context->setMonitored(monitored);
429
  resource_context->setXaResourceManager(resource_manager);
430
  resource_context->setTransactionalStorageEngine(engine);
431
  trans->no_2pc|= true;
432
433
  if (session->transaction.xid_state.xid.is_null())
434
    session->transaction.xid_state.xid.set(session->getQueryId());
435
1333.1.1 by Jay Pipes
Manually issue a call to TransactionStorageEngine::startTransaction() inside
436
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
437
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
438
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
  if (! session->getResourceContext(monitored, 0)->isStarted())
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
441
}
442
1856.2.8 by Joseph Daly
working alter, drop, create schema
443
void TransactionServices::allocateNewTransactionId()
1856.2.7 by Joseph Daly
create schema changes
444
{
1856.2.8 by Joseph Daly
working alter, drop, create schema
445
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
  if (! replication_services.isActive())
447
  {
448
    return;
449
  }
450
451
  Session *my_session= current_session;
452
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
  my_session->setXaId(xa_id);
1856.2.7 by Joseph Daly
create schema changes
454
}
455
1856.2.1 by Joseph Daly
use transaction_id in innodb for transaction log
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
{
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
458
  if (session->getXaId() == 0)
1856.2.1 by Joseph Daly
use transaction_id in innodb for transaction log
459
  {
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
1856.2.1 by Joseph Daly
use transaction_id in innodb for transaction log
461
  }
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
462
1856.2.7 by Joseph Daly
create schema changes
463
  return session->getXaId();
1856.2.1 by Joseph Daly
use transaction_id in innodb for transaction log
464
}
465
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
466
/**
467
  @retval
468
    0   ok
469
  @retval
470
    1   transaction was rolled back
471
  @retval
472
    2   error during commit, data may be inconsistent
473
474
  @todo
475
    Since we don't support nested statement transactions in 5.0,
476
    we can't commit or rollback stmt transactions while we are inside
477
    stored functions or triggers. So we simply do nothing now.
478
    TODO: This should be fixed in later ( >= 5.1) releases.
479
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
480
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
481
{
482
  int error= 0, cookie= 0;
483
  /*
484
    'all' means that this is either an explicit commit issued by
485
    user, or an implicit commit issued by a DDL.
486
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
487
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
488
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
490
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
491
492
  /*
493
    We must not commit the normal transaction if a statement
494
    transaction is pending. Otherwise statement transaction
495
    flags will not get propagated to its normal transaction's
496
    counterpart.
497
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
498
  assert(session->transaction.stmt.getResourceContexts().empty() ||
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
499
              trans == &session->transaction.stmt);
500
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
501
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
502
  {
1910.2.3 by Brian Aker
Second pass on move code to global lock encapsulation.
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
504
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
505
      rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
506
      return 1;
507
    }
508
1405.3.6 by Jay Pipes
Here, we do two main things:
509
    /*
510
     * If replication is on, we do a PREPARE on the resource managers, push the
511
     * Transaction message across the replication stream, and then COMMIT if the
512
     * replication stream returned successfully.
513
     */
514
    if (shouldConstructMessages())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
515
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
516
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
517
           it != resource_contexts.end() && ! error;
518
           ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
519
      {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
520
        ResourceContext *resource_context= *it;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
521
        int err;
522
        /*
523
          Do not call two-phase commit if this particular
524
          transaction is read-only. This allows for simpler
525
          implementation in engines that are always read-only.
526
        */
1273.1.12 by Jay Pipes
Cleanup style and documentation for ResourceContext and setTransactionReadWrite
527
        if (! resource_context->hasModifiedData())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
528
          continue;
1273.1.15 by Jay Pipes
This patch completes the first step in the splitting of
529
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
530
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
531
532
        if (resource->participatesInXaTransaction())
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
533
        {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
534
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
535
          {
536
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
            error= 1;
538
          }
539
          else
540
          {
1689.5.1 by Joseph Daly
remove increment calls
541
            session->status_var.ha_prepare_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
542
          }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
543
        }
544
      }
1405.3.6 by Jay Pipes
Here, we do two main things:
545
      if (error == 0 && is_real_trans)
546
      {
547
        /*
548
         * Push the constructed Transaction messages across to
549
         * replicators and appliers.
550
         */
551
        error= commitTransactionMessage(session);
552
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
553
      if (error)
554
      {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
555
        rollbackTransaction(session, normal_transaction);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
556
        error= 1;
557
        goto end;
558
      }
559
    }
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
561
end:
562
    if (is_real_trans)
1910.2.2 by Brian Aker
First pass through the global lock refactor merge.
563
      session->startWaitingGlobalReadLock();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
564
  }
565
  return error;
566
}
567
568
/**
569
  @note
570
  This function does not care about global read lock. A caller should.
571
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
572
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
573
{
574
  int error=0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
575
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
576
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
578
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
579
580
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
581
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
582
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
583
         it != resource_contexts.end();
584
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
585
    {
586
      int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
587
      ResourceContext *resource_context= *it;
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
588
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
589
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
590
591
      if (resource->participatesInXaTransaction())
592
      {
593
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
594
        {
595
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
596
          error= 1;
597
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
598
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
599
        {
1689.5.1 by Joseph Daly
remove increment calls
600
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
601
        }
602
      }
603
      else if (resource->participatesInSqlTransaction())
604
      {
605
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
606
        {
607
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
608
          error= 1;
609
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
610
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
611
        {
1689.5.1 by Joseph Daly
remove increment calls
612
          session->status_var.ha_commit_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
613
        }
614
      }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
615
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
616
    }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
617
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
618
    if (is_real_trans)
619
      session->transaction.xid_state.xid.null();
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
620
621
    if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
622
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
623
      session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
624
      session->transaction.cleanup();
625
    }
626
  }
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
627
  trans->reset();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
628
  return error;
629
}
630
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
631
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
632
{
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
633
  int error= 0;
634
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
635
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
636
637
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
638
639
  /*
640
    We must not rollback the normal transaction if a statement
641
    transaction is pending.
642
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
643
  assert(session->transaction.stmt.getResourceContexts().empty() ||
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
644
              trans == &session->transaction.stmt);
645
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
646
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
647
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
648
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
649
         it != resource_contexts.end();
650
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
651
    {
652
      int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
653
      ResourceContext *resource_context= *it;
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
654
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
655
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
656
657
      if (resource->participatesInXaTransaction())
658
      {
659
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
660
        {
661
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
          error= 1;
663
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
664
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
665
        {
1689.5.1 by Joseph Daly
remove increment calls
666
          session->status_var.ha_rollback_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
667
        }
668
      }
669
      else if (resource->participatesInSqlTransaction())
670
      {
671
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
672
        {
673
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
          error= 1;
675
        }
1324.1.1 by Jay Pipes
Fixes Bug #535296 by only incrementing ha_commit_count when its a normal transaction commit.
676
        else if (normal_transaction)
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
677
        {
1689.5.1 by Joseph Daly
remove increment calls
678
          session->status_var.ha_rollback_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
679
        }
680
      }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
681
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
682
    }
683
    
684
    /* 
685
     * We need to signal the ROLLBACK to ReplicationServices here
686
     * BEFORE we set the transaction ID to NULL.  This is because
687
     * if a bulk segment was sent to replicators, we need to send
688
     * a rollback statement with the corresponding transaction ID
689
     * to rollback.
690
     */
1796.3.1 by David Shrewsbury
Fix for accidentally deleting all GPB Transaction message contents recorded before a single failing statement contained within the transaction.
691
    if (normal_transaction)
692
      rollbackTransactionMessage(session);
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
693
    else
694
      rollbackStatementMessage(session);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
695
696
    if (is_real_trans)
697
      session->transaction.xid_state.xid.null();
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
698
    if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
699
    {
700
      session->variables.tx_isolation=session->session_tx_isolation;
701
      session->transaction.cleanup();
702
    }
703
  }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
704
  if (normal_transaction)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
705
    session->transaction_rollback_request= false;
706
707
  /*
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
708
   * If a non-transactional table was updated, warn the user
709
   */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
710
  if (is_real_trans &&
1273.1.13 by Jay Pipes
Style cleanup around TransactionContext::modified_non_trans_table and dead code removal
711
      session->transaction.all.hasModifiedNonTransData() &&
1910.2.8 by Brian Aker
Enapsulate Kill.
712
      session->getKilled() != Session::KILL_CONNECTION)
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
713
  {
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
714
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
717
  }
1273.1.27 by Jay Pipes
Completes the work of removing the weirdness around transaction
718
  trans->reset();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
719
  return error;
720
}
721
722
/**
723
  This is used to commit or rollback a single statement depending on
724
  the value of error.
725
726
  @note
727
    Note that if the autocommit is on, then the following call inside
728
    InnoDB will commit or rollback the whole transaction (= the statement). The
729
    autocommit mechanism built into InnoDB is based on counting locks, but if
730
    the user has used LOCK TABLES then that mechanism does not know to do the
731
    commit.
732
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
733
int TransactionServices::autocommitOrRollback(Session *session, int error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
734
{
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
735
  /* One GPB Statement message per SQL statement */
736
  message::Statement *statement= session->getStatementMessage();
737
  if ((statement != NULL) && (! error))
738
    finalizeStatementMessage(*statement, session);
1860.2.9 by Stewart Smith
endStatement() not being called at end of statement, breaking READ COMMITTED isolation level in innobase plugin
739
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
740
  if (session->transaction.stmt.getResourceContexts().empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
741
  {
1860.2.9 by Stewart Smith
endStatement() not being called at end of statement, breaking READ COMMITTED isolation level in innobase plugin
742
    TransactionContext *trans = &session->transaction.stmt;
743
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
         it != resource_contexts.end();
746
         ++it)
747
    {
748
      ResourceContext *resource_context= *it;
749
750
      resource_context->getTransactionalStorageEngine()->endStatement(session);
751
    }
752
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
753
    if (! error)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
754
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
755
      if (commitTransaction(session, false))
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
756
        error= 1;
757
    }
758
    else
759
    {
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
760
      (void) rollbackTransaction(session, false);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
761
      if (session->transaction_rollback_request)
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
762
        (void) rollbackTransaction(session, true);
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
763
    }
764
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
765
    session->variables.tx_isolation= session->session_tx_isolation;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
766
  }
767
  return error;
768
}
769
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
770
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
771
{
772
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
773
  {
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
774
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
775
     * resources aren't the same... */
776
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
777
  }
778
};
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
779
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
780
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
781
{
782
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
783
  TransactionContext *trans= &session->transaction.all;
784
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
785
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
786
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
787
  trans->no_2pc= false;
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
788
  /*
789
    rolling back to savepoint in all storage engines that were part of the
790
    transaction when the savepoint was set
791
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
792
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
793
       it != sv_resource_contexts.end();
794
       ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
795
  {
796
    int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
797
    ResourceContext *resource_context= *it;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
798
799
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
800
801
    if (resource->participatesInSqlTransaction())
802
    {
803
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
804
      {
805
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
806
        error= 1;
807
      }
808
      else
809
      {
1689.5.1 by Joseph Daly
remove increment calls
810
        session->status_var.ha_savepoint_rollback_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
811
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
812
    }
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
813
    trans->no_2pc|= not resource->participatesInXaTransaction();
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
814
  }
815
  /*
816
    rolling back the transaction in all storage engines that were not part of
817
    the transaction when the savepoint was set
818
  */
819
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
820
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
821
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
822
    TransactionContext::ResourceContexts set_difference_contexts;
823
1491.2.3 by Jay Pipes
Fix for Bug #542299. The cause of the segfault was no pre-allocation of the target vector in set_difference(). Depending on STL implementations, set_difference() calls the STL's copy(), which requires pre-allocation of all targets and destinations. This meant the default constructor for set_difference_contexts was not adequate, and we should call vector<>::reserve() to allocate enough memory for pointers to the target elements.
824
    /* 
825
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
826
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
827
     * here
828
     */
829
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
830
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
831
    sort(sorted_tran_resource_contexts.begin(),
832
         sorted_tran_resource_contexts.end(),
833
         ResourceContextCompare());
834
    sort(sorted_sv_resource_contexts.begin(),
835
         sorted_sv_resource_contexts.end(),
836
         ResourceContextCompare());
837
    set_difference(sorted_tran_resource_contexts.begin(),
838
                   sorted_tran_resource_contexts.end(),
839
                   sorted_sv_resource_contexts.begin(),
840
                   sorted_sv_resource_contexts.end(),
841
                   set_difference_contexts.begin(),
842
                   ResourceContextCompare());
843
    /* 
844
     * set_difference_contexts now contains all resource contexts
845
     * which are in the transaction context but were NOT in the
846
     * savepoint's resource contexts.
847
     */
848
        
849
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
850
         it != set_difference_contexts.end();
851
         ++it)
852
    {
853
      ResourceContext *resource_context= *it;
854
      int err;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
855
856
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
857
858
      if (resource->participatesInSqlTransaction())
859
      {
860
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
861
        {
862
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
863
          error= 1;
864
        }
865
        else
866
        {
1689.5.1 by Joseph Daly
remove increment calls
867
          session->status_var.ha_rollback_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
868
        }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
869
      }
870
      resource_context->reset(); /* keep it conveniently zero-filled */
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
871
    }
872
  }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
873
  trans->setResourceContexts(sv_resource_contexts);
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
874
1746.7.4 by Joseph Daly
add test, and use shouldConstructMessages() rather then calling the singleton each time
875
  if (shouldConstructMessages())
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
876
  {
877
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
878
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
879
    if (savepoint_transaction != NULL)
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
880
    {
1762.2.5 by Joseph Daly
add descriptive comments and use savepoint_transaction_copy to determine statement size
881
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
882
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
883
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
884
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
885
      */ 
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
886
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
1762.2.5 by Joseph Daly
add descriptive comments and use savepoint_transaction_copy to determine statement size
887
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
888
      if (num_statements == 0)
889
      {    
890
        session->setStatementMessage(NULL);
891
      }    
892
      else 
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
893
      {
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
894
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
895
      }    
896
      session->setTransactionMessage(savepoint_transaction_copy);
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
897
    }
898
  }
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
899
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
900
  return error;
901
}
902
903
/**
904
  @note
905
  according to the sql standard (ISO/IEC 9075-2:2003)
906
  section "4.33.4 SQL-statements and transaction states",
1273.1.4 by Jay Pipes
This patch significantly reworks the way that
907
  NamedSavepoint is *not* transaction-initiating SQL-statement
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
908
*/
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
909
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
910
{
911
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
912
  TransactionContext *trans= &session->transaction.all;
913
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
914
915
  if (resource_contexts.empty() == false)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
916
  {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
917
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
918
         it != resource_contexts.end();
919
         ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
920
    {
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
921
      ResourceContext *resource_context= *it;
922
      int err;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
923
924
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
925
926
      if (resource->participatesInSqlTransaction())
927
      {
928
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
929
        {
930
          my_error(ER_GET_ERRNO, MYF(0), err);
931
          error= 1;
932
        }
933
        else
934
        {
1689.5.1 by Joseph Daly
remove increment calls
935
          session->status_var.ha_savepoint_count++;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
936
        }
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
937
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
938
    }
939
  }
940
  /*
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
941
    Remember the list of registered storage engines.
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
942
  */
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
943
  sv.setResourceContexts(resource_contexts);
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
944
1746.7.4 by Joseph Daly
add test, and use shouldConstructMessages() rather then calling the singleton each time
945
  if (shouldConstructMessages())
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
946
  {
1746.7.5 by Joseph Daly
fix to get tests working with release savepoint
947
    message::Transaction *transaction= session->getTransactionMessage();
948
                  
949
    if (transaction != NULL)
950
    {
951
      message::Transaction *transaction_savepoint= 
952
        new message::Transaction(*transaction);
1762.2.1 by Joseph Daly
fix up bugs 638518, and memory leak problems
953
      sv.setTransactionMessage(transaction_savepoint);
1746.7.5 by Joseph Daly
fix to get tests working with release savepoint
954
    }
1746.7.1 by Joseph Daly
handle rollback to savepoint properly 600032
955
  } 
956
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
957
  return error;
958
}
959
1405.3.5 by Jay Pipes
TransactionServices method names now meet code style guidelines.
960
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
961
{
962
  int error= 0;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
963
964
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
965
966
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
967
       it != resource_contexts.end();
968
       ++it)
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
969
  {
970
    int err;
1273.1.10 by Jay Pipes
* Renames Ha_trx_info to drizzled::ResourceContext
971
    ResourceContext *resource_context= *it;
1273.1.30 by Jay Pipes
* Completes the blueprint for splitting the XA Resource Manager
972
973
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
974
975
    if (resource->participatesInSqlTransaction())
976
    {
977
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
978
      {
979
        my_error(ER_GET_ERRNO, MYF(0), err);
980
        error= 1;
981
      }
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
982
    }
983
  }
1746.7.4 by Joseph Daly
add test, and use shouldConstructMessages() rather then calling the singleton each time
984
  
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
985
  return error;
986
}
987
1405.4.10 by Jay Pipes
OK, Sun Studio still didn't like that...seems to think that inline means something different than other compilers think it is...
988
bool TransactionServices::shouldConstructMessages()
989
{
990
  ReplicationServices &replication_services= ReplicationServices::singleton();
991
  return replication_services.isActive();
992
}
993
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
994
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
995
{
996
  message::Transaction *transaction= in_session->getTransactionMessage();
997
998
  if (unlikely(transaction == NULL))
999
  {
1000
    /* 
1001
     * Allocate and initialize a new transaction message 
1002
     * for this Session object.  Session is responsible for
1003
     * deleting transaction message when done with it.
1004
     */
1005
    transaction= new (nothrow) message::Transaction();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1006
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1007
    in_session->setTransactionMessage(transaction);
1008
    return transaction;
1009
  }
1010
  else
1011
    return transaction;
1012
}
1013
1014
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1015
                                                 Session *in_session,
1016
                                                 bool should_inc_trx_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1017
{
1018
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
  trx->set_server_id(in_session->getServerId());
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1020
1021
  if (should_inc_trx_id)
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
1022
  {
1856.2.1 by Joseph Daly
use transaction_id in innodb for transaction log
1023
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1802.17.21 by Joseph Daly
revert change
1024
    in_session->setXaId(0);
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
1025
  }  
1026
  else
1027
  { 
1856.2.7 by Joseph Daly
create schema changes
1028
    trx->set_transaction_id(0);
1856.2.9 by Joseph Daly
zero out transaction id after its set in the GPB message
1029
  }
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1030
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1031
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1032
}
1033
1034
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
                                              Session *in_session)
1036
{
1037
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1039
}
1040
1041
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
                                             Session *in_session)
1043
{
1044
  delete in_transaction;
1045
  in_session->setStatementMessage(NULL);
1046
  in_session->setTransactionMessage(NULL);
1926.3.1 by Joseph Daly
merge changes to fix duplicate trx ids for raw sql 674588
1047
  in_session->setXaId(0);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1048
}
1049
1405.3.6 by Jay Pipes
Here, we do two main things:
1050
int TransactionServices::commitTransactionMessage(Session *in_session)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1051
{
1052
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
  if (! replication_services.isActive())
1405.3.6 by Jay Pipes
Here, we do two main things:
1054
    return 0;
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1055
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1056
  /*
1057
   * If no Transaction message was ever created, then no data modification
1058
   * occurred inside the transaction, so nothing to do.
1059
   */
1060
  if (in_session->getTransactionMessage() == NULL)
1061
    return 0;
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1062
  
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1063
  /* If there is an active statement message, finalize it. */
1064
  message::Statement *statement= in_session->getStatementMessage();
1065
1066
  if (statement != NULL)
1067
  {
1068
    finalizeStatementMessage(*statement, in_session);
1069
  }
1070
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1071
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1072
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1073
  /*
1074
   * It is possible that we could have a Transaction without any Statements
1075
   * if we had created a Statement but had to roll it back due to it failing
1076
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
   * message. In this case, we simply clean up the message and not push it.
1078
   */
1079
  if (transaction->statement_size() == 0)
1080
  {
1081
    cleanupTransactionMessage(transaction, in_session);
1082
    return 0;
1083
  }
1084
  
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1085
  finalizeTransactionMessage(*transaction, in_session);
1086
  
1405.3.6 by Jay Pipes
Here, we do two main things:
1087
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1088
1089
  cleanupTransactionMessage(transaction, in_session);
1405.3.6 by Jay Pipes
Here, we do two main things:
1090
1091
  return static_cast<int>(result);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1092
}
1093
1094
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
                                        message::Statement::Type in_type,
1096
                                        Session *in_session)
1097
{
1098
  statement.set_type(in_type);
1099
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1938.3.1 by David Shrewsbury
Add --replicate-query option.
1100
1101
  if (in_session->variables.replicate_query)
1102
    statement.set_sql(in_session->getQueryString()->c_str());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1103
}
1104
1105
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
                                            Session *in_session)
1107
{
1108
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
  in_session->setStatementMessage(NULL);
1110
}
1111
1112
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1113
{
1114
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
  if (! replication_services.isActive())
1116
    return;
1117
  
1118
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1119
1120
  /*
1121
   * OK, so there are two situations that we need to deal with here:
1122
   *
1123
   * 1) We receive an instruction to ROLLBACK the current transaction
1124
   *    and the currently-stored Transaction message is *self-contained*, 
1125
   *    meaning that no Statement messages in the Transaction message
1126
   *    contain a message having its segment_id member greater than 1.  If
1127
   *    no non-segment ID 1 members are found, we can simply clear the
1128
   *    current Transaction message and remove it from memory.
1129
   *
1130
   * 2) If the Transaction message does indeed have a non-end segment, that
1131
   *    means that a bulk update/delete/insert Transaction message segment
1132
   *    has previously been sent over the wire to replicators.  In this case, 
1133
   *    we need to package a Transaction with a Statement message of type
1134
   *    ROLLBACK to indicate to replicators that previously-transmitted
1135
   *    messages must be un-applied.
1136
   */
1137
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1138
  {
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1139
    /* Remember the transaction ID so we can re-use it */
1140
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1142
    /*
1143
     * Clear the transaction, create a Rollback statement message, 
1144
     * attach it to the transaction, and push it to replicators.
1145
     */
1146
    transaction->Clear();
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1147
    initTransactionMessage(*transaction, in_session, false);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1148
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1149
    /* Set the transaction ID to match the previous messages */
1150
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1151
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1152
    message::Statement *statement= transaction->add_statement();
1153
1154
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
    finalizeStatementMessage(*statement, in_session);
1156
1157
    finalizeTransactionMessage(*transaction, in_session);
1158
    
1405.3.3 by Jay Pipes
Adds Session reference to replication API
1159
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1160
  }
1161
  cleanupTransactionMessage(transaction, in_session);
1162
}
1163
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1164
void TransactionServices::rollbackStatementMessage(Session *in_session)
1165
{
1166
  ReplicationServices &replication_services= ReplicationServices::singleton();
1167
  if (! replication_services.isActive())
1168
    return;
1169
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1170
  message::Statement *current_statement= in_session->getStatementMessage();
1171
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1172
  /* If we never added a Statement message, nothing to undo. */
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1173
  if (current_statement == NULL)
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1174
    return;
1175
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1176
  /*
1177
   * If the Statement has been segmented, then we've already pushed a portion
1178
   * of this Statement's row changes through the replication stream and we
1179
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
   * delete the current Statement message.
1181
   */
1182
  bool is_segmented= false;
1183
1184
  switch (current_statement->type())
1185
  {
1186
    case message::Statement::INSERT:
1187
      if (current_statement->insert_data().segment_id() > 1)
1188
        is_segmented= true;
1189
      break;
1190
1191
    case message::Statement::UPDATE:
1192
      if (current_statement->update_data().segment_id() > 1)
1193
        is_segmented= true;
1194
      break;
1195
1196
    case message::Statement::DELETE:
1197
      if (current_statement->delete_data().segment_id() > 1)
1198
        is_segmented= true;
1199
      break;
1200
1201
    default:
1202
      break;
1203
  }
1204
1205
  /*
1206
   * Remove the Statement message we've been working with (same as
1207
   * current_statement).
1208
   */
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1209
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
  statements_in_txn= transaction->mutable_statement();
1212
  statements_in_txn->RemoveLast();
1213
  in_session->setStatementMessage(NULL);
1976.8.2 by David Shrewsbury
Support rollback of segmented statements ; removed unnecessary code for --replicate-query (didn't work correctly anyway).
1214
  
1215
  /*
1216
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
   * an indicator to cancel the previous Statement message which should have
1218
   * had its end_segment attribute set to false.
1219
   */
1220
  if (is_segmented)
1221
  {
1222
    current_statement= transaction->add_statement();
1223
    initStatementMessage(*current_statement,
1224
                         message::Statement::ROLLBACK_STATEMENT,
1225
                         in_session);
1226
    finalizeStatementMessage(*current_statement, in_session);
1227
  }
1976.8.1 by David Shrewsbury
Add ability to rollback a statement, change to use 1 GPB Statement message per SQL statement. NOTE: segmented statements not handled yet.
1228
}
1229
  
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1230
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1231
                                                            Table *in_table,
1232
                                                            uint32_t *next_segment_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1233
{
1234
  message::Statement *statement= in_session->getStatementMessage();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1235
  message::Transaction *transaction= NULL;
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1236
1237
  /* 
1238
   * Check the type for the current Statement message, if it is anything
1239
   * other then INSERT we need to call finalize, this will ensure a 
1240
   * new InsertStatement is created. If it is of type INSERT check
1241
   * what table the INSERT belongs to, if it is a different table
1242
   * call finalize, so a new InsertStatement can be created. 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1243
   */
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1244
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1245
  {
1246
    finalizeStatementMessage(*statement, in_session);
1247
    statement= in_session->getStatementMessage();
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1248
  } 
1249
  else if (statement != NULL)
1250
  {
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1251
    transaction= getActiveTransactionMessage(in_session);
1252
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1253
    /*
1254
     * If we've passed our threshold for the statement size (possible for
1255
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1256
     * the Transaction will keep it from getting huge).
1257
     */
1802.14.1 by Joseph Daly
add variable for gpb size
1258
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1259
      in_session->variables.transaction_message_threshold)
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1260
    {
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1261
      /* Remember the transaction ID so we can re-use it */
1262
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1263
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1264
      message::InsertData *current_data= statement->mutable_insert_data();
1265
1266
      /* Caller should use this value when adding a new record */
1267
      *next_segment_id= current_data->segment_id() + 1;
1268
1269
      current_data->set_end_segment(false);
1270
1271
      /* 
1272
       * Send the trx message to replicators after finalizing the 
1273
       * statement and transaction. This will also set the Transaction
1274
       * and Statement objects in Session to NULL.
1275
       */
1276
      commitTransactionMessage(in_session);
1277
1278
      /*
1279
       * Statement and Transaction should now be NULL, so new ones will get
1280
       * created. We reuse the transaction id since we are segmenting
1281
       * one transaction.
1282
       */
1283
      statement= in_session->getStatementMessage();
1284
      transaction= getActiveTransactionMessage(in_session, false);
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1285
      assert(transaction != NULL);
1286
1287
      /* Set the transaction ID to match the previous messages */
1288
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1289
    }
1290
    else
1291
    {
1292
      const message::InsertHeader &insert_header= statement->insert_header();
1293
      string old_table_name= insert_header.table_metadata().table_name();
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1294
     
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1295
      string current_table_name;
1296
      (void) in_table->getShare()->getTableName(current_table_name);
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1297
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1298
      if (current_table_name.compare(old_table_name))
1299
      {
1300
        finalizeStatementMessage(*statement, in_session);
1301
        statement= in_session->getStatementMessage();
1302
      }
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1303
      else
1304
      {
1305
        /* carry forward the existing segment id */
1306
        const message::InsertData &current_data= statement->insert_data();
1307
        *next_segment_id= current_data.segment_id();
1308
      }
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1309
    }
1310
  } 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1311
1312
  if (statement == NULL)
1313
  {
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1314
    /*
1315
     * Transaction will be non-NULL only if we had to segment it due to
1316
     * transaction size above.
1317
     */
1318
    if (transaction == NULL)
1319
      transaction= getActiveTransactionMessage(in_session);
1320
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1321
    /* 
1322
     * Transaction message initialized and set, but no statement created
1323
     * yet.  We construct one and initialize it, here, then return the
1324
     * message after attaching the new Statement message pointer to the 
1325
     * Session for easy retrieval later...
1326
     */
1327
    statement= transaction->add_statement();
1328
    setInsertHeader(*statement, in_session, in_table);
1329
    in_session->setStatementMessage(statement);
1330
  }
1331
  return *statement;
1332
}
1333
1334
void TransactionServices::setInsertHeader(message::Statement &statement,
1335
                                          Session *in_session,
1336
                                          Table *in_table)
1337
{
1338
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1339
1340
  /* 
1341
   * Now we construct the specialized InsertHeader message inside
1342
   * the generalized message::Statement container...
1343
   */
1344
  /* Set up the insert header */
1345
  message::InsertHeader *header= statement.mutable_insert_header();
1346
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1347
1336.2.3 by Jay Pipes
Merge trunk and resolve
1348
  string schema_name;
1349
  (void) in_table->getShare()->getSchemaName(schema_name);
1350
  string table_name;
1351
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1352
1336.2.3 by Jay Pipes
Merge trunk and resolve
1353
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1354
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1355
1356
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1357
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1358
1359
  message::FieldMetadata *field_metadata;
1360
1361
  /* We will read all the table's fields... */
1362
  in_table->setReadSet();
1363
1364
  while ((current_field= *table_fields++) != NULL) 
1365
  {
1366
    field_metadata= header->add_field_metadata();
1367
    field_metadata->set_name(current_field->field_name);
1368
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1369
  }
1370
}
1371
1372
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1373
{
1374
  ReplicationServices &replication_services= ReplicationServices::singleton();
1375
  if (! replication_services.isActive())
1376
    return false;
1377
  /**
1378
   * We do this check here because we don't want to even create a 
1379
   * statement if there isn't a primary key on the table...
1380
   *
1381
   * @todo
1382
   *
1383
   * Multi-column primary keys are handled how exactly?
1384
   */
1618 by Brian Aker
This is a rollup set of patches for modifications to TableIdentifier to have
1385
  if (not in_table->getShare()->hasPrimaryKey())
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1386
  {
1387
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1388
    return true;
1389
  }
1390
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1391
  uint32_t next_segment_id= 1;
1392
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1393
1394
  message::InsertData *data= statement.mutable_insert_data();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1395
  data->set_segment_id(next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1396
  data->set_end_segment(true);
1397
  message::InsertRecord *record= data->add_record();
1398
1399
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1400
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1401
1402
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1403
  string_value->set_charset(system_charset_info);
1404
1405
  /* We will read all the table's fields... */
1406
  in_table->setReadSet();
1407
1408
  while ((current_field= *table_fields++) != NULL) 
1409
  {
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1410
    if (current_field->is_null())
1411
    {
1412
      record->add_is_null(true);
1667.3.7 by Joe Daly
merge trunk
1413
      record->add_insert_value("", 0);
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1414
    } 
1415
    else 
1416
    {
1417
      string_value= current_field->val_str(string_value);
1418
      record->add_is_null(false);
1419
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
      string_value->free();
1421
    }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1422
  }
1423
  return false;
1424
}
1425
1426
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1427
                                                            Table *in_table,
1428
                                                            const unsigned char *old_record, 
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1429
                                                            const unsigned char *new_record,
1430
                                                            uint32_t *next_segment_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1431
{
1432
  message::Statement *statement= in_session->getStatementMessage();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1433
  message::Transaction *transaction= NULL;
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1434
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1435
  /*
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1436
   * Check the type for the current Statement message, if it is anything
1437
   * other then UPDATE we need to call finalize, this will ensure a
1438
   * new UpdateStatement is created. If it is of type UPDATE check
1439
   * what table the UPDATE belongs to, if it is a different table
1440
   * call finalize, so a new UpdateStatement can be created.
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1441
   */
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1442
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1443
  {
1444
    finalizeStatementMessage(*statement, in_session);
1445
    statement= in_session->getStatementMessage();
1446
  }
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1447
  else if (statement != NULL)
1448
  {
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1449
    transaction= getActiveTransactionMessage(in_session);
1450
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1451
    /*
1452
     * If we've passed our threshold for the statement size (possible for
1453
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1454
     * the Transaction will keep it from getting huge).
1455
     */
1802.14.1 by Joseph Daly
add variable for gpb size
1456
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
      in_session->variables.transaction_message_threshold)
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1458
    {
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1459
      /* Remember the transaction ID so we can re-use it */
1460
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1461
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1462
      message::UpdateData *current_data= statement->mutable_update_data();
1463
1464
      /* Caller should use this value when adding a new record */
1465
      *next_segment_id= current_data->segment_id() + 1;
1466
1467
      current_data->set_end_segment(false);
1468
1469
      /*
1470
       * Send the trx message to replicators after finalizing the 
1471
       * statement and transaction. This will also set the Transaction
1472
       * and Statement objects in Session to NULL.
1473
       */
1474
      commitTransactionMessage(in_session);
1475
1476
      /*
1477
       * Statement and Transaction should now be NULL, so new ones will get
1478
       * created. We reuse the transaction id since we are segmenting
1479
       * one transaction.
1480
       */
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1481
      statement= in_session->getStatementMessage();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1482
      transaction= getActiveTransactionMessage(in_session, false);
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1483
      assert(transaction != NULL);
1484
1485
      /* Set the transaction ID to match the previous messages */
1486
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1487
    }
1488
    else
1489
    {
1821.2.1 by Joseph Daly
fix 655352
1490
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1491
      {
1492
        /* carry forward the existing segment id */
1493
        const message::UpdateData &current_data= statement->update_data();
1494
        *next_segment_id= current_data.segment_id();
1821.2.1 by Joseph Daly
fix 655352
1495
      } 
1496
      else 
1497
      {
1498
        finalizeStatementMessage(*statement, in_session);
1499
        statement= in_session->getStatementMessage();
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1500
      }
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1501
    }
1502
  }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1503
1504
  if (statement == NULL)
1505
  {
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1506
    /*
1507
     * Transaction will be non-NULL only if we had to segment it due to
1508
     * transaction size above.
1509
     */
1510
    if (transaction == NULL)
1511
      transaction= getActiveTransactionMessage(in_session);
1512
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1513
    /* 
1514
     * Transaction message initialized and set, but no statement created
1515
     * yet.  We construct one and initialize it, here, then return the
1516
     * message after attaching the new Statement message pointer to the 
1517
     * Session for easy retrieval later...
1518
     */
1519
    statement= transaction->add_statement();
1520
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
    in_session->setStatementMessage(statement);
1522
  }
1523
  return *statement;
1524
}
1525
1821.2.1 by Joseph Daly
fix 655352
1526
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1527
                                                  Table *in_table,
1528
                                                  const unsigned char *old_record,
1529
                                                  const unsigned char *new_record)
1530
{
1531
  const message::UpdateHeader &update_header= statement.update_header();
1532
  string old_table_name= update_header.table_metadata().table_name();
1533
1534
  string current_table_name;
1535
  (void) in_table->getShare()->getTableName(current_table_name);
1536
  if (current_table_name.compare(old_table_name))
1537
  {
1538
    return false;
1539
  }
1540
  else
1541
  {
1542
    /* Compare the set fields in the existing UpdateHeader and see if they
1543
     * match the updated fields in the new record, if they do not we must
1544
     * create a new UpdateHeader 
1545
     */
1546
    size_t num_set_fields= update_header.set_field_metadata_size();
1547
1548
    Field *current_field;
1549
    Field **table_fields= in_table->getFields();
1550
    in_table->setReadSet();
1551
1552
    size_t num_calculated_updated_fields= 0;
1821.2.2 by Joseph Daly
merge trunk, update test
1553
    bool found= false;
1821.2.1 by Joseph Daly
fix 655352
1554
    while ((current_field= *table_fields++) != NULL)
1555
    {
1556
      if (num_calculated_updated_fields > num_set_fields)
1557
      {
1558
        break;
1559
      }
1560
1561
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1562
      {
1563
        /* check that this field exists in the UpdateHeader record */
1821.2.2 by Joseph Daly
merge trunk, update test
1564
        found= false;
1821.2.1 by Joseph Daly
fix 655352
1565
1566
        for (size_t x= 0; x < num_set_fields; ++x)
1567
        {
1568
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
          string name= field_metadata.name();
1570
          if (name.compare(current_field->field_name) == 0)
1571
          {
1572
            found= true;
1573
            ++num_calculated_updated_fields;
1574
            break;
1575
          } 
1576
        }
1577
        if (! found)
1578
        {
1579
          break;
1580
        } 
1581
      }
1582
    }
1583
1821.2.2 by Joseph Daly
merge trunk, update test
1584
    if ((num_calculated_updated_fields == num_set_fields) && found)
1821.2.1 by Joseph Daly
fix 655352
1585
    {
1586
      return true;
1587
    } 
1588
    else 
1589
    {
1590
      return false;
1591
    }
1592
  }
1593
}  
1594
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1595
void TransactionServices::setUpdateHeader(message::Statement &statement,
1596
                                          Session *in_session,
1597
                                          Table *in_table,
1598
                                          const unsigned char *old_record, 
1599
                                          const unsigned char *new_record)
1600
{
1601
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1602
1603
  /* 
1604
   * Now we construct the specialized UpdateHeader message inside
1605
   * the generalized message::Statement container...
1606
   */
1607
  /* Set up the update header */
1608
  message::UpdateHeader *header= statement.mutable_update_header();
1609
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1610
1336.2.3 by Jay Pipes
Merge trunk and resolve
1611
  string schema_name;
1612
  (void) in_table->getShare()->getSchemaName(schema_name);
1613
  string table_name;
1614
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1615
1336.2.3 by Jay Pipes
Merge trunk and resolve
1616
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1617
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1618
1619
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1620
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1621
1622
  message::FieldMetadata *field_metadata;
1623
1624
  /* We will read all the table's fields... */
1625
  in_table->setReadSet();
1626
1627
  while ((current_field= *table_fields++) != NULL) 
1628
  {
1629
    /*
1630
     * We add the "key field metadata" -- i.e. the fields which is
1631
     * the primary key for the table.
1632
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1633
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1634
    {
1635
      field_metadata= header->add_key_field_metadata();
1636
      field_metadata->set_name(current_field->field_name);
1637
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1638
    }
1639
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1640
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1641
    {
1642
      /* Field is changed from old to new */
1643
      field_metadata= header->add_set_field_metadata();
1644
      field_metadata->set_name(current_field->field_name);
1645
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1646
    }
1647
  }
1648
}
1649
void TransactionServices::updateRecord(Session *in_session,
1650
                                       Table *in_table, 
1651
                                       const unsigned char *old_record, 
1652
                                       const unsigned char *new_record)
1653
{
1654
  ReplicationServices &replication_services= ReplicationServices::singleton();
1655
  if (! replication_services.isActive())
1656
    return;
1657
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1658
  uint32_t next_segment_id= 1;
1659
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1660
1661
  message::UpdateData *data= statement.mutable_update_data();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1662
  data->set_segment_id(next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1663
  data->set_end_segment(true);
1664
  message::UpdateRecord *record= data->add_record();
1665
1666
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1667
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1668
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1669
  string_value->set_charset(system_charset_info);
1670
1671
  while ((current_field= *table_fields++) != NULL) 
1672
  {
1673
    /*
1674
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1675
     * but then realized that an UPDATE statement could potentially have different values for
1676
     * the SET field.  For instance, imagine this SQL scenario:
1677
     *
1678
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1679
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1680
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1681
     *
1682
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1683
     */
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1684
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1685
    {
1686
      /* Store the original "read bit" for this field */
1687
      bool is_read_set= current_field->isReadSet();
1688
1689
      /* We need to mark that we will "read" this field... */
1690
      in_table->setReadSet(current_field->field_index);
1691
1692
      /* Read the string value of this field's contents */
1693
      string_value= current_field->val_str(string_value);
1694
1695
      /* 
1696
       * Reset the read bit after reading field to its original state.  This 
1697
       * prevents the field from being included in the WHERE clause
1698
       */
1699
      current_field->setReadSet(is_read_set);
1700
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1701
      if (current_field->is_null())
1702
      {
1703
        record->add_is_null(true);
1667.3.7 by Joe Daly
merge trunk
1704
        record->add_after_value("", 0);
1667.3.6 by Joe Daly
bug 594873 fix null handling in enum in the transaction log
1705
      }
1706
      else
1707
      {
1708
        record->add_is_null(false);
1709
        record->add_after_value(string_value->c_ptr(), string_value->length());
1710
      }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1711
      string_value->free();
1712
    }
1713
1714
    /* 
1715
     * Add the WHERE clause values now...for now, this means the
1716
     * primary key field value.  Replication only supports tables
1717
     * with a primary key.
1718
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1719
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1720
    {
1721
      /**
1722
       * To say the below is ugly is an understatement. But it works.
1723
       * 
1724
       * @todo Move this crap into a real Record API.
1725
       */
1726
      string_value= current_field->val_str(string_value,
1727
                                           old_record + 
1728
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
1729
      record->add_key_value(string_value->c_ptr(), string_value->length());
1730
      string_value->free();
1731
    }
1732
1733
  }
1734
}
1735
1795.2.2 by Joseph Daly
add test and add isFieldUpdated function
1736
bool TransactionServices::isFieldUpdated(Field *current_field,
1737
                                         Table *in_table,
1738
                                         const unsigned char *old_record,
1739
                                         const unsigned char *new_record)
1740
{
1741
  /*
1742
   * The below really should be moved into the Field API and Record API.  But for now
1743
   * we do this crazy pointer fiddling to figure out if the current field
1744
   * has been updated in the supplied record raw byte pointers.
1745
   */
1746
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1747
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1748
1749
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1750
1751
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1752
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1753
1754
  bool isUpdated= false;
1755
  if (old_value_is_null != new_value_is_null)
1756
  {
1757
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1758
    {
1759
      isUpdated= true;
1760
    }
1761
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1762
    {
1763
      isUpdated= true;
1764
    }
1765
  }
1766
1767
  if (! isUpdated)
1768
  {
1769
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1770
    {
1771
      isUpdated= true;
1772
    }
1773
  }
1774
  return isUpdated;
1775
}  
1776
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1777
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1778
                                                            Table *in_table,
1779
                                                            uint32_t *next_segment_id)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1780
{
1781
  message::Statement *statement= in_session->getStatementMessage();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1782
  message::Transaction *transaction= NULL;
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1783
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1784
  /*
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1785
   * Check the type for the current Statement message, if it is anything
1786
   * other then DELETE we need to call finalize, this will ensure a
1787
   * new DeleteStatement is created. If it is of type DELETE check
1788
   * what table the DELETE belongs to, if it is a different table
1789
   * call finalize, so a new DeleteStatement can be created.
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1790
   */
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1791
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1792
  {
1793
    finalizeStatementMessage(*statement, in_session);
1794
    statement= in_session->getStatementMessage();
1795
  }
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1796
  else if (statement != NULL)
1797
  {
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1798
    transaction= getActiveTransactionMessage(in_session);
1799
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1800
    /*
1801
     * If we've passed our threshold for the statement size (possible for
1802
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1803
     * the Transaction will keep it from getting huge).
1804
     */
1802.14.1 by Joseph Daly
add variable for gpb size
1805
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1806
      in_session->variables.transaction_message_threshold)
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1807
    {
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1808
      /* Remember the transaction ID so we can re-use it */
1809
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1810
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1811
      message::DeleteData *current_data= statement->mutable_delete_data();
1812
1813
      /* Caller should use this value when adding a new record */
1814
      *next_segment_id= current_data->segment_id() + 1;
1815
1816
      current_data->set_end_segment(false);
1817
1818
      /* 
1819
       * Send the trx message to replicators after finalizing the 
1820
       * statement and transaction. This will also set the Transaction
1821
       * and Statement objects in Session to NULL.
1822
       */
1823
      commitTransactionMessage(in_session);
1824
1825
      /*
1826
       * Statement and Transaction should now be NULL, so new ones will get
1827
       * created. We reuse the transaction id since we are segmenting
1828
       * one transaction.
1829
       */
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1830
      statement= in_session->getStatementMessage();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1831
      transaction= getActiveTransactionMessage(in_session, false);
1780.1.1 by David Shrewsbury
Set transaction ID properly on large, multi-Transaction message transactions. Also add bulk load tests.
1832
      assert(transaction != NULL);
1833
1834
      /* Set the transaction ID to match the previous messages */
1835
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1836
    }
1837
    else
1838
    {
1839
      const message::DeleteHeader &delete_header= statement->delete_header();
1840
      string old_table_name= delete_header.table_metadata().table_name();
1841
1842
      string current_table_name;
1843
      (void) in_table->getShare()->getTableName(current_table_name);
1844
      if (current_table_name.compare(old_table_name))
1845
      {
1846
        finalizeStatementMessage(*statement, in_session);
1847
        statement= in_session->getStatementMessage();
1848
      }
1760.2.1 by David Shrewsbury
Make sure that trx msg threshold is checked against Transaction message size; make sure that segment ids are not reset when they shouldn't be
1849
      else
1850
      {
1851
        /* carry forward the existing segment id */
1852
        const message::DeleteData &current_data= statement->delete_data();
1853
        *next_segment_id= current_data.segment_id();
1854
      }
1662.3.1 by Joe Daly
fix a problem with multiple tables being updated in the same transaction these should go into seperate records
1855
    }
1856
  }
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1857
1858
  if (statement == NULL)
1859
  {
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1860
    /*
1861
     * Transaction will be non-NULL only if we had to segment it due to
1862
     * transaction size above.
1863
     */
1864
    if (transaction == NULL)
1865
      transaction= getActiveTransactionMessage(in_session);
1866
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1867
    /* 
1868
     * Transaction message initialized and set, but no statement created
1869
     * yet.  We construct one and initialize it, here, then return the
1870
     * message after attaching the new Statement message pointer to the 
1871
     * Session for easy retrieval later...
1872
     */
1873
    statement= transaction->add_statement();
1874
    setDeleteHeader(*statement, in_session, in_table);
1875
    in_session->setStatementMessage(statement);
1876
  }
1877
  return *statement;
1878
}
1879
1880
void TransactionServices::setDeleteHeader(message::Statement &statement,
1881
                                          Session *in_session,
1882
                                          Table *in_table)
1883
{
1884
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1885
1886
  /* 
1887
   * Now we construct the specialized DeleteHeader message inside
1888
   * the generalized message::Statement container...
1889
   */
1890
  message::DeleteHeader *header= statement.mutable_delete_header();
1891
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1892
1336.2.3 by Jay Pipes
Merge trunk and resolve
1893
  string schema_name;
1894
  (void) in_table->getShare()->getSchemaName(schema_name);
1895
  string table_name;
1896
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1897
1336.2.3 by Jay Pipes
Merge trunk and resolve
1898
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1899
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1900
1901
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1902
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1903
1904
  message::FieldMetadata *field_metadata;
1905
1906
  while ((current_field= *table_fields++) != NULL) 
1907
  {
1908
    /* 
1909
     * Add the WHERE clause values now...for now, this means the
1910
     * primary key field value.  Replication only supports tables
1911
     * with a primary key.
1912
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1913
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1914
    {
1915
      field_metadata= header->add_key_field_metadata();
1916
      field_metadata->set_name(current_field->field_name);
1917
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1918
    }
1919
  }
1920
}
1921
1730.5.1 by David Shrewsbury
Use the update record, not insert record, when record DELETE operations during REPLACE.
1922
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1923
{
1924
  ReplicationServices &replication_services= ReplicationServices::singleton();
1925
  if (! replication_services.isActive())
1926
    return;
1927
1746.1.1 by David Shrewsbury
Fix uninitialized variable.
1928
  uint32_t next_segment_id= 1;
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1929
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1930
1931
  message::DeleteData *data= statement.mutable_delete_data();
1719.3.1 by David Shrewsbury
Fix to capture INSERTs from a LOAD DATA command in the replication stream.
1932
  data->set_segment_id(next_segment_id);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1933
  data->set_end_segment(true);
1934
  message::DeleteRecord *record= data->add_record();
1935
1936
  Field *current_field;
1578.2.16 by Brian Aker
Merge in change to getTable() to private the field objects.
1937
  Field **table_fields= in_table->getFields();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1938
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1939
  string_value->set_charset(system_charset_info);
1940
1941
  while ((current_field= *table_fields++) != NULL) 
1942
  {
1667.3.5 by Joe Daly
fix up spacing again
1943
    /* 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1944
     * Add the WHERE clause values now...for now, this means the
1945
     * primary key field value.  Replication only supports tables
1946
     * with a primary key.
1947
     */
1574 by Brian Aker
Rollup patch for hiding tableshare.
1948
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1949
    {
1730.5.1 by David Shrewsbury
Use the update record, not insert record, when record DELETE operations during REPLACE.
1950
      if (use_update_record)
1951
      {
1952
        /*
1953
         * Temporarily point to the update record to get its value.
1954
         * This is pretty much a hack in order to get the PK value from
1955
         * the update record rather than the insert record. Field::val_str()
1956
         * should not change anything in Field::ptr, so this should be safe.
1957
         * We are careful not to change anything in old_ptr.
1958
         */
1959
        const unsigned char *old_ptr= current_field->ptr;
1960
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
        string_value= current_field->val_str(string_value);
1962
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1963
      }
1964
      else
1965
      {
1966
        string_value= current_field->val_str(string_value);
1967
        /**
1968
         * @TODO Store optional old record value in the before data member
1969
         */
1970
      }
1667.3.2 by Joe Daly
add a working test case, and more fixes so the test case works
1971
      record->add_key_value(string_value->c_ptr(), string_value->length());
1972
      string_value->free();
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
1973
    }
1974
  }
1975
}
1976
1977
void TransactionServices::createTable(Session *in_session,
1978
                                      const message::Table &table)
1979
{
1980
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
  if (! replication_services.isActive())
1982
    return;
1983
  
1984
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1985
  message::Statement *statement= transaction->add_statement();
1986
1987
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1988
1989
  /* 
1990
   * Construct the specialized CreateTableStatement message and attach
1991
   * it to the generic Statement message
1992
   */
1993
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1994
  message::Table *new_table_message= create_table_statement->mutable_table();
1995
  *new_table_message= table;
1996
1997
  finalizeStatementMessage(*statement, in_session);
1998
1999
  finalizeTransactionMessage(*transaction, in_session);
2000
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2001
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2002
2003
  cleanupTransactionMessage(transaction, in_session);
2004
2005
}
2006
2007
void TransactionServices::createSchema(Session *in_session,
2008
                                       const message::Schema &schema)
2009
{
2010
  ReplicationServices &replication_services= ReplicationServices::singleton();
2011
  if (! replication_services.isActive())
2012
    return;
2013
  
2014
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2015
  message::Statement *statement= transaction->add_statement();
2016
2017
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
2018
2019
  /* 
2020
   * Construct the specialized CreateSchemaStatement message and attach
2021
   * it to the generic Statement message
2022
   */
2023
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
2024
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2025
  *new_schema_message= schema;
2026
2027
  finalizeStatementMessage(*statement, in_session);
2028
2029
  finalizeTransactionMessage(*transaction, in_session);
2030
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2031
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2032
2033
  cleanupTransactionMessage(transaction, in_session);
2034
2035
}
2036
2037
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
2038
{
2039
  ReplicationServices &replication_services= ReplicationServices::singleton();
2040
  if (! replication_services.isActive())
2041
    return;
2042
  
2043
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2044
  message::Statement *statement= transaction->add_statement();
2045
2046
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
2047
2048
  /* 
2049
   * Construct the specialized DropSchemaStatement message and attach
2050
   * it to the generic Statement message
2051
   */
2052
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2053
2054
  drop_schema_statement->set_schema_name(schema_name);
2055
2056
  finalizeStatementMessage(*statement, in_session);
2057
2058
  finalizeTransactionMessage(*transaction, in_session);
2059
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2060
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2061
2062
  cleanupTransactionMessage(transaction, in_session);
2063
}
2064
2065
void TransactionServices::dropTable(Session *in_session,
2066
                                    const string &schema_name,
2067
                                    const string &table_name,
2068
                                    bool if_exists)
2069
{
2070
  ReplicationServices &replication_services= ReplicationServices::singleton();
2071
  if (! replication_services.isActive())
2072
    return;
2073
  
2074
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2075
  message::Statement *statement= transaction->add_statement();
2076
2077
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2078
2079
  /* 
2080
   * Construct the specialized DropTableStatement message and attach
2081
   * it to the generic Statement message
2082
   */
2083
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2084
2085
  drop_table_statement->set_if_exists_clause(if_exists);
2086
2087
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2088
2089
  table_metadata->set_schema_name(schema_name);
2090
  table_metadata->set_table_name(table_name);
2091
2092
  finalizeStatementMessage(*statement, in_session);
2093
2094
  finalizeTransactionMessage(*transaction, in_session);
2095
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2096
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2097
2098
  cleanupTransactionMessage(transaction, in_session);
2099
}
2100
2101
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2102
{
2103
  ReplicationServices &replication_services= ReplicationServices::singleton();
2104
  if (! replication_services.isActive())
2105
    return;
2106
  
2107
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2108
  message::Statement *statement= transaction->add_statement();
2109
2110
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2111
2112
  /* 
2113
   * Construct the specialized TruncateTableStatement message and attach
2114
   * it to the generic Statement message
2115
   */
2116
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2117
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2118
1336.2.3 by Jay Pipes
Merge trunk and resolve
2119
  string schema_name;
2120
  (void) in_table->getShare()->getSchemaName(schema_name);
2121
  string table_name;
2122
  (void) in_table->getShare()->getTableName(table_name);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2123
1336.2.3 by Jay Pipes
Merge trunk and resolve
2124
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2125
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2126
2127
  finalizeStatementMessage(*statement, in_session);
2128
2129
  finalizeTransactionMessage(*transaction, in_session);
2130
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2131
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2132
2133
  cleanupTransactionMessage(transaction, in_session);
2134
}
2135
2136
void TransactionServices::rawStatement(Session *in_session, const string &query)
2137
{
2138
  ReplicationServices &replication_services= ReplicationServices::singleton();
2139
  if (! replication_services.isActive())
2140
    return;
1926.3.1 by Joseph Daly
merge changes to fix duplicate trx ids for raw sql 674588
2141
 
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2142
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2143
  message::Statement *statement= transaction->add_statement();
2144
2145
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2146
  statement->set_sql(query);
2147
  finalizeStatementMessage(*statement, in_session);
2148
2149
  finalizeTransactionMessage(*transaction, in_session);
2150
  
1405.3.3 by Jay Pipes
Adds Session reference to replication API
2151
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1336.2.2 by Jay Pipes
NO CODE CHANGES - Simply moves pieces of ReplicationServices to TransactionServices. Preparation for making the ReplicationServices component only responsible for communication between replicators, appliers, publishers, and subscribers.
2152
2153
  cleanupTransactionMessage(transaction, in_session);
2154
}
2155
1819.4.1 by David Shrewsbury
Add server startup and shutdown events to the replication stream.
2156
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2157
{
2158
  ReplicationServices &replication_services= ReplicationServices::singleton();
2159
  if (! replication_services.isActive())
2160
    return 0;
2161
2162
  message::Transaction *transaction= new (nothrow) message::Transaction();
2163
2164
  // set server id, start timestamp
1856.2.14 by Joseph Daly
allow events to use a transaction id
2165
  initTransactionMessage(*transaction, session, true);
1819.4.1 by David Shrewsbury
Add server startup and shutdown events to the replication stream.
2166
2167
  // set end timestamp
2168
  finalizeTransactionMessage(*transaction, session);
2169
2170
  message::Event *trx_event= transaction->mutable_event();
2171
2172
  trx_event->CopyFrom(event);
2173
2174
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2175
2176
  delete transaction;
2177
2178
  return static_cast<int>(result);
2179
}
2180
2181
bool TransactionServices::sendStartupEvent(Session *session)
2182
{
2183
  message::Event event;
2184
  event.set_type(message::Event::STARTUP);
2185
  if (sendEvent(session, event) != 0)
2186
    return false;
2187
  return true;
2188
}
2189
2190
bool TransactionServices::sendShutdownEvent(Session *session)
2191
{
2192
  message::Event event;
2193
  event.set_type(message::Event::SHUTDOWN);
2194
  if (sendEvent(session, event) != 0)
2195
    return false;
2196
  return true;
2197
}
2198
1273.1.2 by Jay Pipes
This patch does not change any algorithms or code paths,
2199
} /* namespace drizzled */