~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2009-03-06 03:33:24 UTC
  • mfrom: (916.1.2 merge)
  • Revision ID: mordred@inaugust.com-20090306033324-dcedf80g9qzywbvu
Merged Brian's merge... re-rotate the tree.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  This program is free software; you can redistribute it and/or modify
8
 
 *  it under the terms of the GNU General Public License as published by
9
 
 *  the Free Software Foundation; version 2 of the License.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
/**
22
 
 * @file Transaction processing code
23
 
 *
24
 
 * @note
25
 
 *
26
 
 * The TransactionServices component takes internal events (for instance the start of a 
27
 
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 
 * component and used during replication.
30
 
 *
31
 
 * The reason for this functionality is to encapsulate all communication
32
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 
 * Instead of the plugin having to understand the (often fluidly changing)
34
 
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 
 * these messages.
37
 
 *
38
 
 * @see /drizzled/message/transaction.proto
39
 
 *
40
 
 * @todo
41
 
 *
42
 
 * We really should store the raw bytes in the messages, not the
43
 
 * String value of the Field.  But, to do that, the
44
 
 * statement_transform library needs first to be updated
45
 
 * to include the transformation code to convert raw
46
 
 * Drizzle-internal Field byte representation into something
47
 
 * plugins can understand.
48
 
 */
49
 
 
50
 
#include "config.h"
51
 
#include "drizzled/my_hash.h"
52
 
#include "drizzled/error.h"
53
 
#include "drizzled/gettext.h"
54
 
#include "drizzled/probes.h"
55
 
#include "drizzled/sql_parse.h"
56
 
#include "drizzled/session.h"
57
 
#include "drizzled/sql_base.h"
58
 
#include "drizzled/replication_services.h"
59
 
#include "drizzled/transaction_services.h"
60
 
#include "drizzled/transaction_context.h"
61
 
#include "drizzled/message/transaction.pb.h"
62
 
#include "drizzled/message/statement_transform.h"
63
 
#include "drizzled/resource_context.h"
64
 
#include "drizzled/lock.h"
65
 
#include "drizzled/item/int.h"
66
 
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/timestamp.h"
68
 
#include "drizzled/plugin/client.h"
69
 
#include "drizzled/plugin/monitored_in_transaction.h"
70
 
#include "drizzled/plugin/transactional_storage_engine.h"
71
 
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/plugin/xa_storage_engine.h"
73
 
#include "drizzled/internal/my_sys.h"
74
 
 
75
 
#include <vector>
76
 
#include <algorithm>
77
 
#include <functional>
78
 
#include <google/protobuf/repeated_field.h>
79
 
 
80
 
using namespace std;
81
 
using namespace google;
82
 
 
83
 
namespace drizzled
84
 
{
85
 
 
86
 
/**
87
 
 * @defgroup Transactions
88
 
 *
89
 
 * @brief
90
 
 *
91
 
 * Transaction handling in the server
92
 
 *
93
 
 * @detail
94
 
 *
95
 
 * In each client connection, Drizzle maintains two transaction
96
 
 * contexts representing the state of the:
97
 
 *
98
 
 * 1) Statement Transaction
99
 
 * 2) Normal Transaction
100
 
 *
101
 
 * These two transaction contexts represent the transactional
102
 
 * state of a Session's SQL and XA transactions for a single
103
 
 * SQL statement or a series of SQL statements.
104
 
 *
105
 
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 
 * is no practical difference between the statement and the
107
 
 * normal transaction, as each SQL statement is committed or
108
 
 * rolled back depending on the success or failure of the
109
 
 * indvidual SQL statement.
110
 
 *
111
 
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 
 * the Session has explicitly begun a normal SQL transaction using
113
 
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 
 * transaction context tracks the aggregate transaction state of
115
 
 * the SQL transaction's individual statements, and the SQL
116
 
 * transaction's commit or rollback is done atomically for all of
117
 
 * the SQL transaction's statement's data changes.
118
 
 *
119
 
 * Technically, a statement transaction can be viewed as a savepoint 
120
 
 * which is maintained automatically in order to make effects of one
121
 
 * statement atomic.
122
 
 *
123
 
 * The normal transaction is started by the user and is typically
124
 
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
125
 
 * The exception to this is that DDL statements implicitly COMMIT
126
 
 * any previously active normal transaction before they begin executing.
127
 
 *
128
 
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 
 * plugins will automatically be monitored by Drizzle's transaction 
131
 
 * manager (implemented in this source file), as will all plugins which
132
 
 * implement plugin::XaResourceManager and register with the transaction
133
 
 * manager.
134
 
 *
135
 
 * If Drizzle's transaction manager sees that more than one resource
136
 
 * manager (transactional storage engine or XA resource manager) has modified
137
 
 * data state during a statement or normal transaction, the transaction
138
 
 * manager will automatically use a two-phase commit protocol for all
139
 
 * resources which support XA's distributed transaction protocol.  Unlike
140
 
 * MySQL, storage engines need not manually register with the transaction
141
 
 * manager during a statement's execution.  Previously, in MySQL, all
142
 
 * handlertons would have to call trans_register_ha() at some point after
143
 
 * modifying data state in order to have MySQL include that handler in
144
 
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 
 * scenes for the storage engine implementers.
146
 
 *
147
 
 * When a connection is closed, the current normal transaction, if
148
 
 * any is currently active, is rolled back.
149
 
 *
150
 
 * Transaction life cycle
151
 
 * ----------------------
152
 
 *
153
 
 * When a new connection is established, session->transaction
154
 
 * members are initialized to an empty state. If a statement uses any tables, 
155
 
 * all affected engines are registered in the statement engine list automatically
156
 
 * in plugin::StorageEngine::startStatement() and 
157
 
 * plugin::TransactionalStorageEngine::startTransaction().
158
 
 *
159
 
 * You can view the lifetime of a normal transaction in the following
160
 
 * call-sequence:
161
 
 *
162
 
 * drizzled::statement::Statement::execute()
163
 
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 
 *     drizzled::plugin::StorageEngine::startStatement()
167
 
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 
 *     drizzled::plugin::StorageEngine::endStatement()
169
 
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 
 *
173
 
 * Roles and responsibilities
174
 
 * --------------------------
175
 
 *
176
 
 * Beginning of SQL Statement (and Statement Transaction)
177
 
 * ------------------------------------------------------
178
 
 *
179
 
 * At the start of each SQL statement, for each storage engine
180
 
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
182
 
 * engine needs to track some data for the statement, it should use
183
 
 * this method invocation to initialize this data.  This is the
184
 
 * beginning of what is called the "statement transaction".
185
 
 *
186
 
 * <strong>For transaction storage engines (those storage engines
187
 
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 
 * kernel automatically determines if the start of the SQL statement 
189
 
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 
 * This occurs when the connection is in NOT in autocommit mode. If
191
 
 * the kernel detects this, then the kernel automatically starts the
192
 
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 
 * method and then calls plugin::StorageEngine::startStatement()
194
 
 * afterwards.
195
 
 *
196
 
 * Beginning of an SQL "Normal" Transaction
197
 
 * ----------------------------------------
198
 
 *
199
 
 * As noted above, a "normal SQL transaction" may be started when
200
 
 * an SQL statement is started in a connection and the connection is
201
 
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 
 *
203
 
 * In addition, when a user executes a START TRANSACTION or
204
 
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 
 * calls each transactional storage engine's startTransaction() method.
206
 
 *
207
 
 * Ending of an SQL Statement (and Statement Transaction)
208
 
 * ------------------------------------------------------
209
 
 *
210
 
 * At the end of each SQL statement, for each of the aforementioned
211
 
 * involved storage engines, the kernel calls the engine's
212
 
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 
 * has initialized or modified some internal data about the
214
 
 * statement transaction, it should use this method to reset or destroy
215
 
 * this data appropriately.
216
 
 *
217
 
 * Ending of an SQL "Normal" Transaction
218
 
 * -------------------------------------
219
 
 *
220
 
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 
 * depending on the success or failure of the statement transaction(s) 
222
 
 * it encloses.
223
 
 *
224
 
 * The end of a "normal transaction" occurs when any of the following
225
 
 * occurs:
226
 
 *
227
 
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 
 *    then the normal transaction which encloses the statement
229
 
 *    transaction ends
230
 
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 
 *    commit the active normal transaction
233
 
 *
234
 
 * Transactions and Non-transactional Storage Engines
235
 
 * --------------------------------------------------
236
 
 *
237
 
 * For non-transactional engines, this call can be safely ignored, an
238
 
 * the kernel tracks whether a non-transactional engine has changed
239
 
 * any data state, and warns the user appropriately if a transaction
240
 
 * (statement or normal) is rolled back after such non-transactional
241
 
 * data changes have been made.
242
 
 *
243
 
 * XA Two-phase Commit Protocol
244
 
 * ----------------------------
245
 
 *
246
 
 * During statement execution, whenever any of data-modifying
247
 
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 
 * Cursor::update_row(), the read-write flag is raised in the
249
 
 * statement transaction for the involved engine.
250
 
 * Currently All PSEA calls are "traced", and the data can not be
251
 
 * changed in a way other than issuing a PSEA call. Important:
252
 
 * unless this invariant is preserved the server will not know that
253
 
 * a transaction in a given engine is read-write and will not
254
 
 * involve the two-phase commit protocol!
255
 
 *
256
 
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 
 * is invoked. This call in turn
258
 
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
259
 
 * resource manager.
260
 
 *
261
 
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 
 * is...)
264
 
 * 
265
 
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 
 * At statement commit, the statement-related read-write engine
268
 
 * flag is propagated to the corresponding flag in the normal
269
 
 * transaction.  When the commit is complete, the list of registered
270
 
 * engines is cleared.
271
 
 *
272
 
 * Rollback is handled in a similar fashion.
273
 
 *
274
 
 * Additional notes on DDL and the normal transaction.
275
 
 * ---------------------------------------------------
276
 
 *
277
 
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 
 * because of the fact that SELECTs on a transactional storage
279
 
 * engine participate in the normal SQL transaction (due to
280
 
 * isolation level issues and consistent read views).
281
 
 *
282
 
 * Behaviour of the server in this case is currently badly
283
 
 * defined.
284
 
 *
285
 
 * DDL statements use a form of "semantic" logging
286
 
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 
 * the newly created table is deleted.
288
 
 * 
289
 
 * In addition, some DDL statements issue interim transaction
290
 
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 
 * from the original table to the internal temporary table. Other
292
 
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 
 * after itself.
294
 
 *
295
 
 * And finally there is a group of DDL statements such as
296
 
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 
 * and doesn't commit.
298
 
 *
299
 
 * A consistent behaviour is perhaps to always commit the normal
300
 
 * transaction after all DDLs, just like the statement transaction
301
 
 * is always committed at the end of all statements.
302
 
 */
303
 
TransactionServices::TransactionServices()
304
 
{
305
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
 
  if (engine)
307
 
  {
308
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
 
  }
310
 
  else 
311
 
  {
312
 
    xa_storage_engine= NULL;
313
 
  }
314
 
}
315
 
 
316
 
void TransactionServices::registerResourceForStatement(Session *session,
317
 
                                                       plugin::MonitoredInTransaction *monitored,
318
 
                                                       plugin::TransactionalStorageEngine *engine)
319
 
{
320
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
 
  {
322
 
    /* 
323
 
     * Now we automatically register this resource manager for the
324
 
     * normal transaction.  This is fine because a statement
325
 
     * transaction registration should always enlist the resource
326
 
     * in the normal transaction which contains the statement
327
 
     * transaction.
328
 
     */
329
 
    registerResourceForTransaction(session, monitored, engine);
330
 
  }
331
 
 
332
 
  TransactionContext *trans= &session->transaction.stmt;
333
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
334
 
 
335
 
  if (resource_context->isStarted())
336
 
    return; /* already registered, return */
337
 
 
338
 
  assert(monitored->participatesInSqlTransaction());
339
 
  assert(not monitored->participatesInXaTransaction());
340
 
 
341
 
  resource_context->setMonitored(monitored);
342
 
  resource_context->setTransactionalStorageEngine(engine);
343
 
  trans->registerResource(resource_context);
344
 
 
345
 
  trans->no_2pc|= true;
346
 
}
347
 
 
348
 
void TransactionServices::registerResourceForStatement(Session *session,
349
 
                                                       plugin::MonitoredInTransaction *monitored,
350
 
                                                       plugin::TransactionalStorageEngine *engine,
351
 
                                                       plugin::XaResourceManager *resource_manager)
352
 
{
353
 
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
 
  {
355
 
    /* 
356
 
     * Now we automatically register this resource manager for the
357
 
     * normal transaction.  This is fine because a statement
358
 
     * transaction registration should always enlist the resource
359
 
     * in the normal transaction which contains the statement
360
 
     * transaction.
361
 
     */
362
 
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
 
  }
364
 
 
365
 
  TransactionContext *trans= &session->transaction.stmt;
366
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 0);
367
 
 
368
 
  if (resource_context->isStarted())
369
 
    return; /* already registered, return */
370
 
 
371
 
  assert(monitored->participatesInXaTransaction());
372
 
  assert(monitored->participatesInSqlTransaction());
373
 
 
374
 
  resource_context->setMonitored(monitored);
375
 
  resource_context->setTransactionalStorageEngine(engine);
376
 
  resource_context->setXaResourceManager(resource_manager);
377
 
  trans->registerResource(resource_context);
378
 
 
379
 
  trans->no_2pc|= false;
380
 
}
381
 
 
382
 
void TransactionServices::registerResourceForTransaction(Session *session,
383
 
                                                         plugin::MonitoredInTransaction *monitored,
384
 
                                                         plugin::TransactionalStorageEngine *engine)
385
 
{
386
 
  TransactionContext *trans= &session->transaction.all;
387
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
388
 
 
389
 
  if (resource_context->isStarted())
390
 
    return; /* already registered, return */
391
 
 
392
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
393
 
 
394
 
  trans->registerResource(resource_context);
395
 
 
396
 
  assert(monitored->participatesInSqlTransaction());
397
 
  assert(not monitored->participatesInXaTransaction());
398
 
 
399
 
  resource_context->setMonitored(monitored);
400
 
  resource_context->setTransactionalStorageEngine(engine);
401
 
  trans->no_2pc|= true;
402
 
 
403
 
  if (session->transaction.xid_state.xid.is_null())
404
 
    session->transaction.xid_state.xid.set(session->getQueryId());
405
 
 
406
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
408
 
    registerResourceForStatement(session, monitored, engine);
409
 
}
410
 
 
411
 
void TransactionServices::registerResourceForTransaction(Session *session,
412
 
                                                         plugin::MonitoredInTransaction *monitored,
413
 
                                                         plugin::TransactionalStorageEngine *engine,
414
 
                                                         plugin::XaResourceManager *resource_manager)
415
 
{
416
 
  TransactionContext *trans= &session->transaction.all;
417
 
  ResourceContext *resource_context= session->getResourceContext(monitored, 1);
418
 
 
419
 
  if (resource_context->isStarted())
420
 
    return; /* already registered, return */
421
 
 
422
 
  session->server_status|= SERVER_STATUS_IN_TRANS;
423
 
 
424
 
  trans->registerResource(resource_context);
425
 
 
426
 
  assert(monitored->participatesInSqlTransaction());
427
 
 
428
 
  resource_context->setMonitored(monitored);
429
 
  resource_context->setXaResourceManager(resource_manager);
430
 
  resource_context->setTransactionalStorageEngine(engine);
431
 
  trans->no_2pc|= true;
432
 
 
433
 
  if (session->transaction.xid_state.xid.is_null())
434
 
    session->transaction.xid_state.xid.set(session->getQueryId());
435
 
 
436
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
437
 
 
438
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session->getResourceContext(monitored, 0)->isStarted())
440
 
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
 
}
442
 
 
443
 
void TransactionServices::allocateNewTransactionId()
444
 
{
445
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
 
  if (! replication_services.isActive())
447
 
  {
448
 
    return;
449
 
  }
450
 
 
451
 
  Session *my_session= current_session;
452
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
 
  my_session->setXaId(xa_id);
454
 
}
455
 
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
457
 
{
458
 
  if (session->getXaId() == 0)
459
 
  {
460
 
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
461
 
  }
462
 
 
463
 
  return session->getXaId();
464
 
}
465
 
 
466
 
/**
467
 
  @retval
468
 
    0   ok
469
 
  @retval
470
 
    1   transaction was rolled back
471
 
  @retval
472
 
    2   error during commit, data may be inconsistent
473
 
 
474
 
  @todo
475
 
    Since we don't support nested statement transactions in 5.0,
476
 
    we can't commit or rollback stmt transactions while we are inside
477
 
    stored functions or triggers. So we simply do nothing now.
478
 
    TODO: This should be fixed in later ( >= 5.1) releases.
479
 
*/
480
 
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
481
 
{
482
 
  int error= 0, cookie= 0;
483
 
  /*
484
 
    'all' means that this is either an explicit commit issued by
485
 
    user, or an implicit commit issued by a DDL.
486
 
  */
487
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
488
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
489
 
 
490
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
491
 
 
492
 
  /*
493
 
    We must not commit the normal transaction if a statement
494
 
    transaction is pending. Otherwise statement transaction
495
 
    flags will not get propagated to its normal transaction's
496
 
    counterpart.
497
 
  */
498
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
499
 
              trans == &session->transaction.stmt);
500
 
 
501
 
  if (resource_contexts.empty() == false)
502
 
  {
503
 
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
504
 
    {
505
 
      rollbackTransaction(session, normal_transaction);
506
 
      return 1;
507
 
    }
508
 
 
509
 
    /*
510
 
     * If replication is on, we do a PREPARE on the resource managers, push the
511
 
     * Transaction message across the replication stream, and then COMMIT if the
512
 
     * replication stream returned successfully.
513
 
     */
514
 
    if (shouldConstructMessages())
515
 
    {
516
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
517
 
           it != resource_contexts.end() && ! error;
518
 
           ++it)
519
 
      {
520
 
        ResourceContext *resource_context= *it;
521
 
        int err;
522
 
        /*
523
 
          Do not call two-phase commit if this particular
524
 
          transaction is read-only. This allows for simpler
525
 
          implementation in engines that are always read-only.
526
 
        */
527
 
        if (! resource_context->hasModifiedData())
528
 
          continue;
529
 
 
530
 
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
531
 
 
532
 
        if (resource->participatesInXaTransaction())
533
 
        {
534
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(session, normal_transaction)))
535
 
          {
536
 
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
537
 
            error= 1;
538
 
          }
539
 
          else
540
 
          {
541
 
            session->status_var.ha_prepare_count++;
542
 
          }
543
 
        }
544
 
      }
545
 
      if (error == 0 && is_real_trans)
546
 
      {
547
 
        /*
548
 
         * Push the constructed Transaction messages across to
549
 
         * replicators and appliers.
550
 
         */
551
 
        error= commitTransactionMessage(session);
552
 
      }
553
 
      if (error)
554
 
      {
555
 
        rollbackTransaction(session, normal_transaction);
556
 
        error= 1;
557
 
        goto end;
558
 
      }
559
 
    }
560
 
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
561
 
end:
562
 
    if (is_real_trans)
563
 
      session->startWaitingGlobalReadLock();
564
 
  }
565
 
  return error;
566
 
}
567
 
 
568
 
/**
569
 
  @note
570
 
  This function does not care about global read lock. A caller should.
571
 
*/
572
 
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
573
 
{
574
 
  int error=0;
575
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
576
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
577
 
 
578
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
579
 
 
580
 
  if (resource_contexts.empty() == false)
581
 
  {
582
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
583
 
         it != resource_contexts.end();
584
 
         ++it)
585
 
    {
586
 
      int err;
587
 
      ResourceContext *resource_context= *it;
588
 
 
589
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
590
 
 
591
 
      if (resource->participatesInXaTransaction())
592
 
      {
593
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(session, normal_transaction)))
594
 
        {
595
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
596
 
          error= 1;
597
 
        }
598
 
        else if (normal_transaction)
599
 
        {
600
 
          session->status_var.ha_commit_count++;
601
 
        }
602
 
      }
603
 
      else if (resource->participatesInSqlTransaction())
604
 
      {
605
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(session, normal_transaction)))
606
 
        {
607
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
608
 
          error= 1;
609
 
        }
610
 
        else if (normal_transaction)
611
 
        {
612
 
          session->status_var.ha_commit_count++;
613
 
        }
614
 
      }
615
 
      resource_context->reset(); /* keep it conveniently zero-filled */
616
 
    }
617
 
 
618
 
    if (is_real_trans)
619
 
      session->transaction.xid_state.xid.null();
620
 
 
621
 
    if (normal_transaction)
622
 
    {
623
 
      session->variables.tx_isolation= session->session_tx_isolation;
624
 
      session->transaction.cleanup();
625
 
    }
626
 
  }
627
 
  trans->reset();
628
 
  return error;
629
 
}
630
 
 
631
 
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
632
 
{
633
 
  int error= 0;
634
 
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
635
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
636
 
 
637
 
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
638
 
 
639
 
  /*
640
 
    We must not rollback the normal transaction if a statement
641
 
    transaction is pending.
642
 
  */
643
 
  assert(session->transaction.stmt.getResourceContexts().empty() ||
644
 
              trans == &session->transaction.stmt);
645
 
 
646
 
  if (resource_contexts.empty() == false)
647
 
  {
648
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
649
 
         it != resource_contexts.end();
650
 
         ++it)
651
 
    {
652
 
      int err;
653
 
      ResourceContext *resource_context= *it;
654
 
 
655
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
656
 
 
657
 
      if (resource->participatesInXaTransaction())
658
 
      {
659
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(session, normal_transaction)))
660
 
        {
661
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
662
 
          error= 1;
663
 
        }
664
 
        else if (normal_transaction)
665
 
        {
666
 
          session->status_var.ha_rollback_count++;
667
 
        }
668
 
      }
669
 
      else if (resource->participatesInSqlTransaction())
670
 
      {
671
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, normal_transaction)))
672
 
        {
673
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
674
 
          error= 1;
675
 
        }
676
 
        else if (normal_transaction)
677
 
        {
678
 
          session->status_var.ha_rollback_count++;
679
 
        }
680
 
      }
681
 
      resource_context->reset(); /* keep it conveniently zero-filled */
682
 
    }
683
 
    
684
 
    /* 
685
 
     * We need to signal the ROLLBACK to ReplicationServices here
686
 
     * BEFORE we set the transaction ID to NULL.  This is because
687
 
     * if a bulk segment was sent to replicators, we need to send
688
 
     * a rollback statement with the corresponding transaction ID
689
 
     * to rollback.
690
 
     */
691
 
    if (normal_transaction)
692
 
      rollbackTransactionMessage(session);
693
 
    else
694
 
      rollbackStatementMessage(session);
695
 
 
696
 
    if (is_real_trans)
697
 
      session->transaction.xid_state.xid.null();
698
 
    if (normal_transaction)
699
 
    {
700
 
      session->variables.tx_isolation=session->session_tx_isolation;
701
 
      session->transaction.cleanup();
702
 
    }
703
 
  }
704
 
  if (normal_transaction)
705
 
    session->transaction_rollback_request= false;
706
 
 
707
 
  /*
708
 
   * If a non-transactional table was updated, warn the user
709
 
   */
710
 
  if (is_real_trans &&
711
 
      session->transaction.all.hasModifiedNonTransData() &&
712
 
      session->getKilled() != Session::KILL_CONNECTION)
713
 
  {
714
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
715
 
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
716
 
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
717
 
  }
718
 
  trans->reset();
719
 
  return error;
720
 
}
721
 
 
722
 
/**
723
 
  This is used to commit or rollback a single statement depending on
724
 
  the value of error.
725
 
 
726
 
  @note
727
 
    Note that if the autocommit is on, then the following call inside
728
 
    InnoDB will commit or rollback the whole transaction (= the statement). The
729
 
    autocommit mechanism built into InnoDB is based on counting locks, but if
730
 
    the user has used LOCK TABLES then that mechanism does not know to do the
731
 
    commit.
732
 
*/
733
 
int TransactionServices::autocommitOrRollback(Session *session, int error)
734
 
{
735
 
  /* One GPB Statement message per SQL statement */
736
 
  message::Statement *statement= session->getStatementMessage();
737
 
  if ((statement != NULL) && (! error))
738
 
    finalizeStatementMessage(*statement, session);
739
 
 
740
 
  if (session->transaction.stmt.getResourceContexts().empty() == false)
741
 
  {
742
 
    TransactionContext *trans = &session->transaction.stmt;
743
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
744
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
745
 
         it != resource_contexts.end();
746
 
         ++it)
747
 
    {
748
 
      ResourceContext *resource_context= *it;
749
 
 
750
 
      resource_context->getTransactionalStorageEngine()->endStatement(session);
751
 
    }
752
 
 
753
 
    if (! error)
754
 
    {
755
 
      if (commitTransaction(session, false))
756
 
        error= 1;
757
 
    }
758
 
    else
759
 
    {
760
 
      (void) rollbackTransaction(session, false);
761
 
      if (session->transaction_rollback_request)
762
 
        (void) rollbackTransaction(session, true);
763
 
    }
764
 
 
765
 
    session->variables.tx_isolation= session->session_tx_isolation;
766
 
  }
767
 
  return error;
768
 
}
769
 
 
770
 
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
771
 
{
772
 
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
773
 
  {
774
 
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
775
 
     * resources aren't the same... */
776
 
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
777
 
  }
778
 
};
779
 
 
780
 
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
781
 
{
782
 
  int error= 0;
783
 
  TransactionContext *trans= &session->transaction.all;
784
 
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
785
 
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
786
 
 
787
 
  trans->no_2pc= false;
788
 
  /*
789
 
    rolling back to savepoint in all storage engines that were part of the
790
 
    transaction when the savepoint was set
791
 
  */
792
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
793
 
       it != sv_resource_contexts.end();
794
 
       ++it)
795
 
  {
796
 
    int err;
797
 
    ResourceContext *resource_context= *it;
798
 
 
799
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
800
 
 
801
 
    if (resource->participatesInSqlTransaction())
802
 
    {
803
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(session, sv)))
804
 
      {
805
 
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
806
 
        error= 1;
807
 
      }
808
 
      else
809
 
      {
810
 
        session->status_var.ha_savepoint_rollback_count++;
811
 
      }
812
 
    }
813
 
    trans->no_2pc|= not resource->participatesInXaTransaction();
814
 
  }
815
 
  /*
816
 
    rolling back the transaction in all storage engines that were not part of
817
 
    the transaction when the savepoint was set
818
 
  */
819
 
  {
820
 
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
821
 
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
822
 
    TransactionContext::ResourceContexts set_difference_contexts;
823
 
 
824
 
    /* 
825
 
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
826
 
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
827
 
     * here
828
 
     */
829
 
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
830
 
 
831
 
    sort(sorted_tran_resource_contexts.begin(),
832
 
         sorted_tran_resource_contexts.end(),
833
 
         ResourceContextCompare());
834
 
    sort(sorted_sv_resource_contexts.begin(),
835
 
         sorted_sv_resource_contexts.end(),
836
 
         ResourceContextCompare());
837
 
    set_difference(sorted_tran_resource_contexts.begin(),
838
 
                   sorted_tran_resource_contexts.end(),
839
 
                   sorted_sv_resource_contexts.begin(),
840
 
                   sorted_sv_resource_contexts.end(),
841
 
                   set_difference_contexts.begin(),
842
 
                   ResourceContextCompare());
843
 
    /* 
844
 
     * set_difference_contexts now contains all resource contexts
845
 
     * which are in the transaction context but were NOT in the
846
 
     * savepoint's resource contexts.
847
 
     */
848
 
        
849
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
850
 
         it != set_difference_contexts.end();
851
 
         ++it)
852
 
    {
853
 
      ResourceContext *resource_context= *it;
854
 
      int err;
855
 
 
856
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
857
 
 
858
 
      if (resource->participatesInSqlTransaction())
859
 
      {
860
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(session, !(0))))
861
 
        {
862
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
863
 
          error= 1;
864
 
        }
865
 
        else
866
 
        {
867
 
          session->status_var.ha_rollback_count++;
868
 
        }
869
 
      }
870
 
      resource_context->reset(); /* keep it conveniently zero-filled */
871
 
    }
872
 
  }
873
 
  trans->setResourceContexts(sv_resource_contexts);
874
 
 
875
 
  if (shouldConstructMessages())
876
 
  {
877
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
878
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
879
 
    if (savepoint_transaction != NULL)
880
 
    {
881
 
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
882
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
883
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
884
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
885
 
      */ 
886
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
887
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
888
 
      if (num_statements == 0)
889
 
      {    
890
 
        session->setStatementMessage(NULL);
891
 
      }    
892
 
      else 
893
 
      {
894
 
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
895
 
      }    
896
 
      session->setTransactionMessage(savepoint_transaction_copy);
897
 
    }
898
 
  }
899
 
 
900
 
  return error;
901
 
}
902
 
 
903
 
/**
904
 
  @note
905
 
  according to the sql standard (ISO/IEC 9075-2:2003)
906
 
  section "4.33.4 SQL-statements and transaction states",
907
 
  NamedSavepoint is *not* transaction-initiating SQL-statement
908
 
*/
909
 
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
910
 
{
911
 
  int error= 0;
912
 
  TransactionContext *trans= &session->transaction.all;
913
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
914
 
 
915
 
  if (resource_contexts.empty() == false)
916
 
  {
917
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
918
 
         it != resource_contexts.end();
919
 
         ++it)
920
 
    {
921
 
      ResourceContext *resource_context= *it;
922
 
      int err;
923
 
 
924
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
925
 
 
926
 
      if (resource->participatesInSqlTransaction())
927
 
      {
928
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(session, sv)))
929
 
        {
930
 
          my_error(ER_GET_ERRNO, MYF(0), err);
931
 
          error= 1;
932
 
        }
933
 
        else
934
 
        {
935
 
          session->status_var.ha_savepoint_count++;
936
 
        }
937
 
      }
938
 
    }
939
 
  }
940
 
  /*
941
 
    Remember the list of registered storage engines.
942
 
  */
943
 
  sv.setResourceContexts(resource_contexts);
944
 
 
945
 
  if (shouldConstructMessages())
946
 
  {
947
 
    message::Transaction *transaction= session->getTransactionMessage();
948
 
                  
949
 
    if (transaction != NULL)
950
 
    {
951
 
      message::Transaction *transaction_savepoint= 
952
 
        new message::Transaction(*transaction);
953
 
      sv.setTransactionMessage(transaction_savepoint);
954
 
    }
955
 
  } 
956
 
 
957
 
  return error;
958
 
}
959
 
 
960
 
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
961
 
{
962
 
  int error= 0;
963
 
 
964
 
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
965
 
 
966
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
967
 
       it != resource_contexts.end();
968
 
       ++it)
969
 
  {
970
 
    int err;
971
 
    ResourceContext *resource_context= *it;
972
 
 
973
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
974
 
 
975
 
    if (resource->participatesInSqlTransaction())
976
 
    {
977
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(session, sv)))
978
 
      {
979
 
        my_error(ER_GET_ERRNO, MYF(0), err);
980
 
        error= 1;
981
 
      }
982
 
    }
983
 
  }
984
 
  
985
 
  return error;
986
 
}
987
 
 
988
 
bool TransactionServices::shouldConstructMessages()
989
 
{
990
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
991
 
  return replication_services.isActive();
992
 
}
993
 
 
994
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
995
 
{
996
 
  message::Transaction *transaction= in_session->getTransactionMessage();
997
 
 
998
 
  if (unlikely(transaction == NULL))
999
 
  {
1000
 
    /* 
1001
 
     * Allocate and initialize a new transaction message 
1002
 
     * for this Session object.  Session is responsible for
1003
 
     * deleting transaction message when done with it.
1004
 
     */
1005
 
    transaction= new (nothrow) message::Transaction();
1006
 
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
1007
 
    in_session->setTransactionMessage(transaction);
1008
 
    return transaction;
1009
 
  }
1010
 
  else
1011
 
    return transaction;
1012
 
}
1013
 
 
1014
 
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
1015
 
                                                 Session *in_session,
1016
 
                                                 bool should_inc_trx_id)
1017
 
{
1018
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1019
 
  trx->set_server_id(in_session->getServerId());
1020
 
 
1021
 
  if (should_inc_trx_id)
1022
 
  {
1023
 
    trx->set_transaction_id(getCurrentTransactionId(in_session));
1024
 
    in_session->setXaId(0);
1025
 
  }  
1026
 
  else
1027
 
  { 
1028
 
    trx->set_transaction_id(0);
1029
 
  }
1030
 
 
1031
 
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
1035
 
                                              Session *in_session)
1036
 
{
1037
 
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
1042
 
                                             Session *in_session)
1043
 
{
1044
 
  delete in_transaction;
1045
 
  in_session->setStatementMessage(NULL);
1046
 
  in_session->setTransactionMessage(NULL);
1047
 
  in_session->setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session *in_session)
1051
 
{
1052
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
 
  if (! replication_services.isActive())
1054
 
    return 0;
1055
 
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (in_session->getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= in_session->getStatementMessage();
1065
 
 
1066
 
  if (statement != NULL)
1067
 
  {
1068
 
    finalizeStatementMessage(*statement, in_session);
1069
 
  }
1070
 
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
1072
 
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, in_session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
 
  finalizeTransactionMessage(*transaction, in_session);
1086
 
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1088
 
 
1089
 
  cleanupTransactionMessage(transaction, in_session);
1090
 
 
1091
 
  return static_cast<int>(result);
1092
 
}
1093
 
 
1094
 
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                        message::Statement::Type in_type,
1096
 
                                        Session *in_session)
1097
 
{
1098
 
  statement.set_type(in_type);
1099
 
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1100
 
 
1101
 
  if (in_session->variables.replicate_query)
1102
 
    statement.set_sql(in_session->getQueryString()->c_str());
1103
 
}
1104
 
 
1105
 
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                            Session *in_session)
1107
 
{
1108
 
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
1109
 
  in_session->setStatementMessage(NULL);
1110
 
}
1111
 
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1113
 
{
1114
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
 
  if (! replication_services.isActive())
1116
 
    return;
1117
 
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1119
 
 
1120
 
  /*
1121
 
   * OK, so there are two situations that we need to deal with here:
1122
 
   *
1123
 
   * 1) We receive an instruction to ROLLBACK the current transaction
1124
 
   *    and the currently-stored Transaction message is *self-contained*, 
1125
 
   *    meaning that no Statement messages in the Transaction message
1126
 
   *    contain a message having its segment_id member greater than 1.  If
1127
 
   *    no non-segment ID 1 members are found, we can simply clear the
1128
 
   *    current Transaction message and remove it from memory.
1129
 
   *
1130
 
   * 2) If the Transaction message does indeed have a non-end segment, that
1131
 
   *    means that a bulk update/delete/insert Transaction message segment
1132
 
   *    has previously been sent over the wire to replicators.  In this case, 
1133
 
   *    we need to package a Transaction with a Statement message of type
1134
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
1135
 
   *    messages must be un-applied.
1136
 
   */
1137
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1138
 
  {
1139
 
    /* Remember the transaction ID so we can re-use it */
1140
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
 
 
1142
 
    /*
1143
 
     * Clear the transaction, create a Rollback statement message, 
1144
 
     * attach it to the transaction, and push it to replicators.
1145
 
     */
1146
 
    transaction->Clear();
1147
 
    initTransactionMessage(*transaction, in_session, false);
1148
 
 
1149
 
    /* Set the transaction ID to match the previous messages */
1150
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1151
 
 
1152
 
    message::Statement *statement= transaction->add_statement();
1153
 
 
1154
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1155
 
    finalizeStatementMessage(*statement, in_session);
1156
 
 
1157
 
    finalizeTransactionMessage(*transaction, in_session);
1158
 
    
1159
 
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1160
 
  }
1161
 
  cleanupTransactionMessage(transaction, in_session);
1162
 
}
1163
 
 
1164
 
void TransactionServices::rollbackStatementMessage(Session *in_session)
1165
 
{
1166
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1167
 
  if (! replication_services.isActive())
1168
 
    return;
1169
 
 
1170
 
  message::Statement *current_statement= in_session->getStatementMessage();
1171
 
 
1172
 
  /* If we never added a Statement message, nothing to undo. */
1173
 
  if (current_statement == NULL)
1174
 
    return;
1175
 
 
1176
 
  /*
1177
 
   * If the Statement has been segmented, then we've already pushed a portion
1178
 
   * of this Statement's row changes through the replication stream and we
1179
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1180
 
   * delete the current Statement message.
1181
 
   */
1182
 
  bool is_segmented= false;
1183
 
 
1184
 
  switch (current_statement->type())
1185
 
  {
1186
 
    case message::Statement::INSERT:
1187
 
      if (current_statement->insert_data().segment_id() > 1)
1188
 
        is_segmented= true;
1189
 
      break;
1190
 
 
1191
 
    case message::Statement::UPDATE:
1192
 
      if (current_statement->update_data().segment_id() > 1)
1193
 
        is_segmented= true;
1194
 
      break;
1195
 
 
1196
 
    case message::Statement::DELETE:
1197
 
      if (current_statement->delete_data().segment_id() > 1)
1198
 
        is_segmented= true;
1199
 
      break;
1200
 
 
1201
 
    default:
1202
 
      break;
1203
 
  }
1204
 
 
1205
 
  /*
1206
 
   * Remove the Statement message we've been working with (same as
1207
 
   * current_statement).
1208
 
   */
1209
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1210
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1211
 
  statements_in_txn= transaction->mutable_statement();
1212
 
  statements_in_txn->RemoveLast();
1213
 
  in_session->setStatementMessage(NULL);
1214
 
  
1215
 
  /*
1216
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1217
 
   * an indicator to cancel the previous Statement message which should have
1218
 
   * had its end_segment attribute set to false.
1219
 
   */
1220
 
  if (is_segmented)
1221
 
  {
1222
 
    current_statement= transaction->add_statement();
1223
 
    initStatementMessage(*current_statement,
1224
 
                         message::Statement::ROLLBACK_STATEMENT,
1225
 
                         in_session);
1226
 
    finalizeStatementMessage(*current_statement, in_session);
1227
 
  }
1228
 
}
1229
 
  
1230
 
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1231
 
                                                            Table *in_table,
1232
 
                                                            uint32_t *next_segment_id)
1233
 
{
1234
 
  message::Statement *statement= in_session->getStatementMessage();
1235
 
  message::Transaction *transaction= NULL;
1236
 
 
1237
 
  /* 
1238
 
   * Check the type for the current Statement message, if it is anything
1239
 
   * other then INSERT we need to call finalize, this will ensure a 
1240
 
   * new InsertStatement is created. If it is of type INSERT check
1241
 
   * what table the INSERT belongs to, if it is a different table
1242
 
   * call finalize, so a new InsertStatement can be created. 
1243
 
   */
1244
 
  if (statement != NULL && statement->type() != message::Statement::INSERT)
1245
 
  {
1246
 
    finalizeStatementMessage(*statement, in_session);
1247
 
    statement= in_session->getStatementMessage();
1248
 
  } 
1249
 
  else if (statement != NULL)
1250
 
  {
1251
 
    transaction= getActiveTransactionMessage(in_session);
1252
 
 
1253
 
    /*
1254
 
     * If we've passed our threshold for the statement size (possible for
1255
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1256
 
     * the Transaction will keep it from getting huge).
1257
 
     */
1258
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1259
 
      in_session->variables.transaction_message_threshold)
1260
 
    {
1261
 
      /* Remember the transaction ID so we can re-use it */
1262
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1263
 
 
1264
 
      message::InsertData *current_data= statement->mutable_insert_data();
1265
 
 
1266
 
      /* Caller should use this value when adding a new record */
1267
 
      *next_segment_id= current_data->segment_id() + 1;
1268
 
 
1269
 
      current_data->set_end_segment(false);
1270
 
 
1271
 
      /* 
1272
 
       * Send the trx message to replicators after finalizing the 
1273
 
       * statement and transaction. This will also set the Transaction
1274
 
       * and Statement objects in Session to NULL.
1275
 
       */
1276
 
      commitTransactionMessage(in_session);
1277
 
 
1278
 
      /*
1279
 
       * Statement and Transaction should now be NULL, so new ones will get
1280
 
       * created. We reuse the transaction id since we are segmenting
1281
 
       * one transaction.
1282
 
       */
1283
 
      statement= in_session->getStatementMessage();
1284
 
      transaction= getActiveTransactionMessage(in_session, false);
1285
 
      assert(transaction != NULL);
1286
 
 
1287
 
      /* Set the transaction ID to match the previous messages */
1288
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1289
 
    }
1290
 
    else
1291
 
    {
1292
 
      const message::InsertHeader &insert_header= statement->insert_header();
1293
 
      string old_table_name= insert_header.table_metadata().table_name();
1294
 
     
1295
 
      string current_table_name;
1296
 
      (void) in_table->getShare()->getTableName(current_table_name);
1297
 
 
1298
 
      if (current_table_name.compare(old_table_name))
1299
 
      {
1300
 
        finalizeStatementMessage(*statement, in_session);
1301
 
        statement= in_session->getStatementMessage();
1302
 
      }
1303
 
      else
1304
 
      {
1305
 
        /* carry forward the existing segment id */
1306
 
        const message::InsertData &current_data= statement->insert_data();
1307
 
        *next_segment_id= current_data.segment_id();
1308
 
      }
1309
 
    }
1310
 
  } 
1311
 
 
1312
 
  if (statement == NULL)
1313
 
  {
1314
 
    /*
1315
 
     * Transaction will be non-NULL only if we had to segment it due to
1316
 
     * transaction size above.
1317
 
     */
1318
 
    if (transaction == NULL)
1319
 
      transaction= getActiveTransactionMessage(in_session);
1320
 
 
1321
 
    /* 
1322
 
     * Transaction message initialized and set, but no statement created
1323
 
     * yet.  We construct one and initialize it, here, then return the
1324
 
     * message after attaching the new Statement message pointer to the 
1325
 
     * Session for easy retrieval later...
1326
 
     */
1327
 
    statement= transaction->add_statement();
1328
 
    setInsertHeader(*statement, in_session, in_table);
1329
 
    in_session->setStatementMessage(statement);
1330
 
  }
1331
 
  return *statement;
1332
 
}
1333
 
 
1334
 
void TransactionServices::setInsertHeader(message::Statement &statement,
1335
 
                                          Session *in_session,
1336
 
                                          Table *in_table)
1337
 
{
1338
 
  initStatementMessage(statement, message::Statement::INSERT, in_session);
1339
 
 
1340
 
  /* 
1341
 
   * Now we construct the specialized InsertHeader message inside
1342
 
   * the generalized message::Statement container...
1343
 
   */
1344
 
  /* Set up the insert header */
1345
 
  message::InsertHeader *header= statement.mutable_insert_header();
1346
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1347
 
 
1348
 
  string schema_name;
1349
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1350
 
  string table_name;
1351
 
  (void) in_table->getShare()->getTableName(table_name);
1352
 
 
1353
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1354
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1355
 
 
1356
 
  Field *current_field;
1357
 
  Field **table_fields= in_table->getFields();
1358
 
 
1359
 
  message::FieldMetadata *field_metadata;
1360
 
 
1361
 
  /* We will read all the table's fields... */
1362
 
  in_table->setReadSet();
1363
 
 
1364
 
  while ((current_field= *table_fields++) != NULL) 
1365
 
  {
1366
 
    field_metadata= header->add_field_metadata();
1367
 
    field_metadata->set_name(current_field->field_name);
1368
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1369
 
  }
1370
 
}
1371
 
 
1372
 
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1373
 
{
1374
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1375
 
  if (! replication_services.isActive())
1376
 
    return false;
1377
 
  /**
1378
 
   * We do this check here because we don't want to even create a 
1379
 
   * statement if there isn't a primary key on the table...
1380
 
   *
1381
 
   * @todo
1382
 
   *
1383
 
   * Multi-column primary keys are handled how exactly?
1384
 
   */
1385
 
  if (not in_table->getShare()->hasPrimaryKey())
1386
 
  {
1387
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1388
 
    return true;
1389
 
  }
1390
 
 
1391
 
  uint32_t next_segment_id= 1;
1392
 
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1393
 
 
1394
 
  message::InsertData *data= statement.mutable_insert_data();
1395
 
  data->set_segment_id(next_segment_id);
1396
 
  data->set_end_segment(true);
1397
 
  message::InsertRecord *record= data->add_record();
1398
 
 
1399
 
  Field *current_field;
1400
 
  Field **table_fields= in_table->getFields();
1401
 
 
1402
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1403
 
  string_value->set_charset(system_charset_info);
1404
 
 
1405
 
  /* We will read all the table's fields... */
1406
 
  in_table->setReadSet();
1407
 
 
1408
 
  while ((current_field= *table_fields++) != NULL) 
1409
 
  {
1410
 
    if (current_field->is_null())
1411
 
    {
1412
 
      record->add_is_null(true);
1413
 
      record->add_insert_value("", 0);
1414
 
    } 
1415
 
    else 
1416
 
    {
1417
 
      string_value= current_field->val_str_internal(string_value);
1418
 
      record->add_is_null(false);
1419
 
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1420
 
      string_value->free();
1421
 
    }
1422
 
  }
1423
 
  return false;
1424
 
}
1425
 
 
1426
 
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1427
 
                                                            Table *in_table,
1428
 
                                                            const unsigned char *old_record, 
1429
 
                                                            const unsigned char *new_record,
1430
 
                                                            uint32_t *next_segment_id)
1431
 
{
1432
 
  message::Statement *statement= in_session->getStatementMessage();
1433
 
  message::Transaction *transaction= NULL;
1434
 
 
1435
 
  /*
1436
 
   * Check the type for the current Statement message, if it is anything
1437
 
   * other then UPDATE we need to call finalize, this will ensure a
1438
 
   * new UpdateStatement is created. If it is of type UPDATE check
1439
 
   * what table the UPDATE belongs to, if it is a different table
1440
 
   * call finalize, so a new UpdateStatement can be created.
1441
 
   */
1442
 
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
1443
 
  {
1444
 
    finalizeStatementMessage(*statement, in_session);
1445
 
    statement= in_session->getStatementMessage();
1446
 
  }
1447
 
  else if (statement != NULL)
1448
 
  {
1449
 
    transaction= getActiveTransactionMessage(in_session);
1450
 
 
1451
 
    /*
1452
 
     * If we've passed our threshold for the statement size (possible for
1453
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1454
 
     * the Transaction will keep it from getting huge).
1455
 
     */
1456
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
      in_session->variables.transaction_message_threshold)
1458
 
    {
1459
 
      /* Remember the transaction ID so we can re-use it */
1460
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1461
 
 
1462
 
      message::UpdateData *current_data= statement->mutable_update_data();
1463
 
 
1464
 
      /* Caller should use this value when adding a new record */
1465
 
      *next_segment_id= current_data->segment_id() + 1;
1466
 
 
1467
 
      current_data->set_end_segment(false);
1468
 
 
1469
 
      /*
1470
 
       * Send the trx message to replicators after finalizing the 
1471
 
       * statement and transaction. This will also set the Transaction
1472
 
       * and Statement objects in Session to NULL.
1473
 
       */
1474
 
      commitTransactionMessage(in_session);
1475
 
 
1476
 
      /*
1477
 
       * Statement and Transaction should now be NULL, so new ones will get
1478
 
       * created. We reuse the transaction id since we are segmenting
1479
 
       * one transaction.
1480
 
       */
1481
 
      statement= in_session->getStatementMessage();
1482
 
      transaction= getActiveTransactionMessage(in_session, false);
1483
 
      assert(transaction != NULL);
1484
 
 
1485
 
      /* Set the transaction ID to match the previous messages */
1486
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1487
 
    }
1488
 
    else
1489
 
    {
1490
 
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1491
 
      {
1492
 
        /* carry forward the existing segment id */
1493
 
        const message::UpdateData &current_data= statement->update_data();
1494
 
        *next_segment_id= current_data.segment_id();
1495
 
      } 
1496
 
      else 
1497
 
      {
1498
 
        finalizeStatementMessage(*statement, in_session);
1499
 
        statement= in_session->getStatementMessage();
1500
 
      }
1501
 
    }
1502
 
  }
1503
 
 
1504
 
  if (statement == NULL)
1505
 
  {
1506
 
    /*
1507
 
     * Transaction will be non-NULL only if we had to segment it due to
1508
 
     * transaction size above.
1509
 
     */
1510
 
    if (transaction == NULL)
1511
 
      transaction= getActiveTransactionMessage(in_session);
1512
 
 
1513
 
    /* 
1514
 
     * Transaction message initialized and set, but no statement created
1515
 
     * yet.  We construct one and initialize it, here, then return the
1516
 
     * message after attaching the new Statement message pointer to the 
1517
 
     * Session for easy retrieval later...
1518
 
     */
1519
 
    statement= transaction->add_statement();
1520
 
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1521
 
    in_session->setStatementMessage(statement);
1522
 
  }
1523
 
  return *statement;
1524
 
}
1525
 
 
1526
 
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1527
 
                                                  Table *in_table,
1528
 
                                                  const unsigned char *old_record,
1529
 
                                                  const unsigned char *new_record)
1530
 
{
1531
 
  const message::UpdateHeader &update_header= statement.update_header();
1532
 
  string old_table_name= update_header.table_metadata().table_name();
1533
 
 
1534
 
  string current_table_name;
1535
 
  (void) in_table->getShare()->getTableName(current_table_name);
1536
 
  if (current_table_name.compare(old_table_name))
1537
 
  {
1538
 
    return false;
1539
 
  }
1540
 
  else
1541
 
  {
1542
 
    /* Compare the set fields in the existing UpdateHeader and see if they
1543
 
     * match the updated fields in the new record, if they do not we must
1544
 
     * create a new UpdateHeader 
1545
 
     */
1546
 
    size_t num_set_fields= update_header.set_field_metadata_size();
1547
 
 
1548
 
    Field *current_field;
1549
 
    Field **table_fields= in_table->getFields();
1550
 
    in_table->setReadSet();
1551
 
 
1552
 
    size_t num_calculated_updated_fields= 0;
1553
 
    bool found= false;
1554
 
    while ((current_field= *table_fields++) != NULL)
1555
 
    {
1556
 
      if (num_calculated_updated_fields > num_set_fields)
1557
 
      {
1558
 
        break;
1559
 
      }
1560
 
 
1561
 
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1562
 
      {
1563
 
        /* check that this field exists in the UpdateHeader record */
1564
 
        found= false;
1565
 
 
1566
 
        for (size_t x= 0; x < num_set_fields; ++x)
1567
 
        {
1568
 
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1569
 
          string name= field_metadata.name();
1570
 
          if (name.compare(current_field->field_name) == 0)
1571
 
          {
1572
 
            found= true;
1573
 
            ++num_calculated_updated_fields;
1574
 
            break;
1575
 
          } 
1576
 
        }
1577
 
        if (! found)
1578
 
        {
1579
 
          break;
1580
 
        } 
1581
 
      }
1582
 
    }
1583
 
 
1584
 
    if ((num_calculated_updated_fields == num_set_fields) && found)
1585
 
    {
1586
 
      return true;
1587
 
    } 
1588
 
    else 
1589
 
    {
1590
 
      return false;
1591
 
    }
1592
 
  }
1593
 
}  
1594
 
 
1595
 
void TransactionServices::setUpdateHeader(message::Statement &statement,
1596
 
                                          Session *in_session,
1597
 
                                          Table *in_table,
1598
 
                                          const unsigned char *old_record, 
1599
 
                                          const unsigned char *new_record)
1600
 
{
1601
 
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
1602
 
 
1603
 
  /* 
1604
 
   * Now we construct the specialized UpdateHeader message inside
1605
 
   * the generalized message::Statement container...
1606
 
   */
1607
 
  /* Set up the update header */
1608
 
  message::UpdateHeader *header= statement.mutable_update_header();
1609
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1610
 
 
1611
 
  string schema_name;
1612
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1613
 
  string table_name;
1614
 
  (void) in_table->getShare()->getTableName(table_name);
1615
 
 
1616
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1617
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1618
 
 
1619
 
  Field *current_field;
1620
 
  Field **table_fields= in_table->getFields();
1621
 
 
1622
 
  message::FieldMetadata *field_metadata;
1623
 
 
1624
 
  /* We will read all the table's fields... */
1625
 
  in_table->setReadSet();
1626
 
 
1627
 
  while ((current_field= *table_fields++) != NULL) 
1628
 
  {
1629
 
    /*
1630
 
     * We add the "key field metadata" -- i.e. the fields which is
1631
 
     * the primary key for the table.
1632
 
     */
1633
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1634
 
    {
1635
 
      field_metadata= header->add_key_field_metadata();
1636
 
      field_metadata->set_name(current_field->field_name);
1637
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1638
 
    }
1639
 
 
1640
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1641
 
    {
1642
 
      /* Field is changed from old to new */
1643
 
      field_metadata= header->add_set_field_metadata();
1644
 
      field_metadata->set_name(current_field->field_name);
1645
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1646
 
    }
1647
 
  }
1648
 
}
1649
 
void TransactionServices::updateRecord(Session *in_session,
1650
 
                                       Table *in_table, 
1651
 
                                       const unsigned char *old_record, 
1652
 
                                       const unsigned char *new_record)
1653
 
{
1654
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1655
 
  if (! replication_services.isActive())
1656
 
    return;
1657
 
 
1658
 
  uint32_t next_segment_id= 1;
1659
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1660
 
 
1661
 
  message::UpdateData *data= statement.mutable_update_data();
1662
 
  data->set_segment_id(next_segment_id);
1663
 
  data->set_end_segment(true);
1664
 
  message::UpdateRecord *record= data->add_record();
1665
 
 
1666
 
  Field *current_field;
1667
 
  Field **table_fields= in_table->getFields();
1668
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1669
 
  string_value->set_charset(system_charset_info);
1670
 
 
1671
 
  while ((current_field= *table_fields++) != NULL) 
1672
 
  {
1673
 
    /*
1674
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1675
 
     * but then realized that an UPDATE statement could potentially have different values for
1676
 
     * the SET field.  For instance, imagine this SQL scenario:
1677
 
     *
1678
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1679
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1680
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1681
 
     *
1682
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1683
 
     */
1684
 
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
1685
 
    {
1686
 
      /* Store the original "read bit" for this field */
1687
 
      bool is_read_set= current_field->isReadSet();
1688
 
 
1689
 
      /* We need to mark that we will "read" this field... */
1690
 
      in_table->setReadSet(current_field->position());
1691
 
 
1692
 
      /* Read the string value of this field's contents */
1693
 
      string_value= current_field->val_str_internal(string_value);
1694
 
 
1695
 
      /* 
1696
 
       * Reset the read bit after reading field to its original state.  This 
1697
 
       * prevents the field from being included in the WHERE clause
1698
 
       */
1699
 
      current_field->setReadSet(is_read_set);
1700
 
 
1701
 
      if (current_field->is_null())
1702
 
      {
1703
 
        record->add_is_null(true);
1704
 
        record->add_after_value("", 0);
1705
 
      }
1706
 
      else
1707
 
      {
1708
 
        record->add_is_null(false);
1709
 
        record->add_after_value(string_value->c_ptr(), string_value->length());
1710
 
      }
1711
 
      string_value->free();
1712
 
    }
1713
 
 
1714
 
    /* 
1715
 
     * Add the WHERE clause values now...for now, this means the
1716
 
     * primary key field value.  Replication only supports tables
1717
 
     * with a primary key.
1718
 
     */
1719
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1720
 
    {
1721
 
      /**
1722
 
       * To say the below is ugly is an understatement. But it works.
1723
 
       * 
1724
 
       * @todo Move this crap into a real Record API.
1725
 
       */
1726
 
      string_value= current_field->val_str_internal(string_value,
1727
 
                                                    old_record + 
1728
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
1729
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1730
 
      string_value->free();
1731
 
    }
1732
 
 
1733
 
  }
1734
 
}
1735
 
 
1736
 
bool TransactionServices::isFieldUpdated(Field *current_field,
1737
 
                                         Table *in_table,
1738
 
                                         const unsigned char *old_record,
1739
 
                                         const unsigned char *new_record)
1740
 
{
1741
 
  /*
1742
 
   * The below really should be moved into the Field API and Record API.  But for now
1743
 
   * we do this crazy pointer fiddling to figure out if the current field
1744
 
   * has been updated in the supplied record raw byte pointers.
1745
 
   */
1746
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1747
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1748
 
 
1749
 
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1750
 
 
1751
 
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1752
 
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1753
 
 
1754
 
  bool isUpdated= false;
1755
 
  if (old_value_is_null != new_value_is_null)
1756
 
  {
1757
 
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1758
 
    {
1759
 
      isUpdated= true;
1760
 
    }
1761
 
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1762
 
    {
1763
 
      isUpdated= true;
1764
 
    }
1765
 
  }
1766
 
 
1767
 
  if (! isUpdated)
1768
 
  {
1769
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1770
 
    {
1771
 
      isUpdated= true;
1772
 
    }
1773
 
  }
1774
 
  return isUpdated;
1775
 
}  
1776
 
 
1777
 
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1778
 
                                                            Table *in_table,
1779
 
                                                            uint32_t *next_segment_id)
1780
 
{
1781
 
  message::Statement *statement= in_session->getStatementMessage();
1782
 
  message::Transaction *transaction= NULL;
1783
 
 
1784
 
  /*
1785
 
   * Check the type for the current Statement message, if it is anything
1786
 
   * other then DELETE we need to call finalize, this will ensure a
1787
 
   * new DeleteStatement is created. If it is of type DELETE check
1788
 
   * what table the DELETE belongs to, if it is a different table
1789
 
   * call finalize, so a new DeleteStatement can be created.
1790
 
   */
1791
 
  if (statement != NULL && statement->type() != message::Statement::DELETE)
1792
 
  {
1793
 
    finalizeStatementMessage(*statement, in_session);
1794
 
    statement= in_session->getStatementMessage();
1795
 
  }
1796
 
  else if (statement != NULL)
1797
 
  {
1798
 
    transaction= getActiveTransactionMessage(in_session);
1799
 
 
1800
 
    /*
1801
 
     * If we've passed our threshold for the statement size (possible for
1802
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1803
 
     * the Transaction will keep it from getting huge).
1804
 
     */
1805
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1806
 
      in_session->variables.transaction_message_threshold)
1807
 
    {
1808
 
      /* Remember the transaction ID so we can re-use it */
1809
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1810
 
 
1811
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1812
 
 
1813
 
      /* Caller should use this value when adding a new record */
1814
 
      *next_segment_id= current_data->segment_id() + 1;
1815
 
 
1816
 
      current_data->set_end_segment(false);
1817
 
 
1818
 
      /* 
1819
 
       * Send the trx message to replicators after finalizing the 
1820
 
       * statement and transaction. This will also set the Transaction
1821
 
       * and Statement objects in Session to NULL.
1822
 
       */
1823
 
      commitTransactionMessage(in_session);
1824
 
 
1825
 
      /*
1826
 
       * Statement and Transaction should now be NULL, so new ones will get
1827
 
       * created. We reuse the transaction id since we are segmenting
1828
 
       * one transaction.
1829
 
       */
1830
 
      statement= in_session->getStatementMessage();
1831
 
      transaction= getActiveTransactionMessage(in_session, false);
1832
 
      assert(transaction != NULL);
1833
 
 
1834
 
      /* Set the transaction ID to match the previous messages */
1835
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1836
 
    }
1837
 
    else
1838
 
    {
1839
 
      const message::DeleteHeader &delete_header= statement->delete_header();
1840
 
      string old_table_name= delete_header.table_metadata().table_name();
1841
 
 
1842
 
      string current_table_name;
1843
 
      (void) in_table->getShare()->getTableName(current_table_name);
1844
 
      if (current_table_name.compare(old_table_name))
1845
 
      {
1846
 
        finalizeStatementMessage(*statement, in_session);
1847
 
        statement= in_session->getStatementMessage();
1848
 
      }
1849
 
      else
1850
 
      {
1851
 
        /* carry forward the existing segment id */
1852
 
        const message::DeleteData &current_data= statement->delete_data();
1853
 
        *next_segment_id= current_data.segment_id();
1854
 
      }
1855
 
    }
1856
 
  }
1857
 
 
1858
 
  if (statement == NULL)
1859
 
  {
1860
 
    /*
1861
 
     * Transaction will be non-NULL only if we had to segment it due to
1862
 
     * transaction size above.
1863
 
     */
1864
 
    if (transaction == NULL)
1865
 
      transaction= getActiveTransactionMessage(in_session);
1866
 
 
1867
 
    /* 
1868
 
     * Transaction message initialized and set, but no statement created
1869
 
     * yet.  We construct one and initialize it, here, then return the
1870
 
     * message after attaching the new Statement message pointer to the 
1871
 
     * Session for easy retrieval later...
1872
 
     */
1873
 
    statement= transaction->add_statement();
1874
 
    setDeleteHeader(*statement, in_session, in_table);
1875
 
    in_session->setStatementMessage(statement);
1876
 
  }
1877
 
  return *statement;
1878
 
}
1879
 
 
1880
 
void TransactionServices::setDeleteHeader(message::Statement &statement,
1881
 
                                          Session *in_session,
1882
 
                                          Table *in_table)
1883
 
{
1884
 
  initStatementMessage(statement, message::Statement::DELETE, in_session);
1885
 
 
1886
 
  /* 
1887
 
   * Now we construct the specialized DeleteHeader message inside
1888
 
   * the generalized message::Statement container...
1889
 
   */
1890
 
  message::DeleteHeader *header= statement.mutable_delete_header();
1891
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1892
 
 
1893
 
  string schema_name;
1894
 
  (void) in_table->getShare()->getSchemaName(schema_name);
1895
 
  string table_name;
1896
 
  (void) in_table->getShare()->getTableName(table_name);
1897
 
 
1898
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1899
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1900
 
 
1901
 
  Field *current_field;
1902
 
  Field **table_fields= in_table->getFields();
1903
 
 
1904
 
  message::FieldMetadata *field_metadata;
1905
 
 
1906
 
  while ((current_field= *table_fields++) != NULL) 
1907
 
  {
1908
 
    /* 
1909
 
     * Add the WHERE clause values now...for now, this means the
1910
 
     * primary key field value.  Replication only supports tables
1911
 
     * with a primary key.
1912
 
     */
1913
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1914
 
    {
1915
 
      field_metadata= header->add_key_field_metadata();
1916
 
      field_metadata->set_name(current_field->field_name);
1917
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1918
 
    }
1919
 
  }
1920
 
}
1921
 
 
1922
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1923
 
{
1924
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1925
 
  if (! replication_services.isActive())
1926
 
    return;
1927
 
 
1928
 
  uint32_t next_segment_id= 1;
1929
 
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1930
 
 
1931
 
  message::DeleteData *data= statement.mutable_delete_data();
1932
 
  data->set_segment_id(next_segment_id);
1933
 
  data->set_end_segment(true);
1934
 
  message::DeleteRecord *record= data->add_record();
1935
 
 
1936
 
  Field *current_field;
1937
 
  Field **table_fields= in_table->getFields();
1938
 
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1939
 
  string_value->set_charset(system_charset_info);
1940
 
 
1941
 
  while ((current_field= *table_fields++) != NULL) 
1942
 
  {
1943
 
    /* 
1944
 
     * Add the WHERE clause values now...for now, this means the
1945
 
     * primary key field value.  Replication only supports tables
1946
 
     * with a primary key.
1947
 
     */
1948
 
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1949
 
    {
1950
 
      if (use_update_record)
1951
 
      {
1952
 
        /*
1953
 
         * Temporarily point to the update record to get its value.
1954
 
         * This is pretty much a hack in order to get the PK value from
1955
 
         * the update record rather than the insert record. Field::val_str()
1956
 
         * should not change anything in Field::ptr, so this should be safe.
1957
 
         * We are careful not to change anything in old_ptr.
1958
 
         */
1959
 
        const unsigned char *old_ptr= current_field->ptr;
1960
 
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1961
 
        string_value= current_field->val_str_internal(string_value);
1962
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1963
 
      }
1964
 
      else
1965
 
      {
1966
 
        string_value= current_field->val_str_internal(string_value);
1967
 
        /**
1968
 
         * @TODO Store optional old record value in the before data member
1969
 
         */
1970
 
      }
1971
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1972
 
      string_value->free();
1973
 
    }
1974
 
  }
1975
 
}
1976
 
 
1977
 
void TransactionServices::createTable(Session *in_session,
1978
 
                                      const message::Table &table)
1979
 
{
1980
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
 
  if (! replication_services.isActive())
1982
 
    return;
1983
 
  
1984
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
1985
 
  message::Statement *statement= transaction->add_statement();
1986
 
 
1987
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1988
 
 
1989
 
  /* 
1990
 
   * Construct the specialized CreateTableStatement message and attach
1991
 
   * it to the generic Statement message
1992
 
   */
1993
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1994
 
  message::Table *new_table_message= create_table_statement->mutable_table();
1995
 
  *new_table_message= table;
1996
 
 
1997
 
  finalizeStatementMessage(*statement, in_session);
1998
 
 
1999
 
  finalizeTransactionMessage(*transaction, in_session);
2000
 
  
2001
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2002
 
 
2003
 
  cleanupTransactionMessage(transaction, in_session);
2004
 
 
2005
 
}
2006
 
 
2007
 
void TransactionServices::createSchema(Session *in_session,
2008
 
                                       const message::Schema &schema)
2009
 
{
2010
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2011
 
  if (! replication_services.isActive())
2012
 
    return;
2013
 
  
2014
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2015
 
  message::Statement *statement= transaction->add_statement();
2016
 
 
2017
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
2018
 
 
2019
 
  /* 
2020
 
   * Construct the specialized CreateSchemaStatement message and attach
2021
 
   * it to the generic Statement message
2022
 
   */
2023
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
2024
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
2025
 
  *new_schema_message= schema;
2026
 
 
2027
 
  finalizeStatementMessage(*statement, in_session);
2028
 
 
2029
 
  finalizeTransactionMessage(*transaction, in_session);
2030
 
  
2031
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2032
 
 
2033
 
  cleanupTransactionMessage(transaction, in_session);
2034
 
 
2035
 
}
2036
 
 
2037
 
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
2038
 
{
2039
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2040
 
  if (! replication_services.isActive())
2041
 
    return;
2042
 
  
2043
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2044
 
  message::Statement *statement= transaction->add_statement();
2045
 
 
2046
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
2047
 
 
2048
 
  /* 
2049
 
   * Construct the specialized DropSchemaStatement message and attach
2050
 
   * it to the generic Statement message
2051
 
   */
2052
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
2053
 
 
2054
 
  drop_schema_statement->set_schema_name(schema_name);
2055
 
 
2056
 
  finalizeStatementMessage(*statement, in_session);
2057
 
 
2058
 
  finalizeTransactionMessage(*transaction, in_session);
2059
 
  
2060
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2061
 
 
2062
 
  cleanupTransactionMessage(transaction, in_session);
2063
 
}
2064
 
 
2065
 
void TransactionServices::dropTable(Session *in_session,
2066
 
                                    const string &schema_name,
2067
 
                                    const string &table_name,
2068
 
                                    bool if_exists)
2069
 
{
2070
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2071
 
  if (! replication_services.isActive())
2072
 
    return;
2073
 
  
2074
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2075
 
  message::Statement *statement= transaction->add_statement();
2076
 
 
2077
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
2078
 
 
2079
 
  /* 
2080
 
   * Construct the specialized DropTableStatement message and attach
2081
 
   * it to the generic Statement message
2082
 
   */
2083
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2084
 
 
2085
 
  drop_table_statement->set_if_exists_clause(if_exists);
2086
 
 
2087
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2088
 
 
2089
 
  table_metadata->set_schema_name(schema_name);
2090
 
  table_metadata->set_table_name(table_name);
2091
 
 
2092
 
  finalizeStatementMessage(*statement, in_session);
2093
 
 
2094
 
  finalizeTransactionMessage(*transaction, in_session);
2095
 
  
2096
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2097
 
 
2098
 
  cleanupTransactionMessage(transaction, in_session);
2099
 
}
2100
 
 
2101
 
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
2102
 
{
2103
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2104
 
  if (! replication_services.isActive())
2105
 
    return;
2106
 
  
2107
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2108
 
  message::Statement *statement= transaction->add_statement();
2109
 
 
2110
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
2111
 
 
2112
 
  /* 
2113
 
   * Construct the specialized TruncateTableStatement message and attach
2114
 
   * it to the generic Statement message
2115
 
   */
2116
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2117
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2118
 
 
2119
 
  string schema_name;
2120
 
  (void) in_table->getShare()->getSchemaName(schema_name);
2121
 
  string table_name;
2122
 
  (void) in_table->getShare()->getTableName(table_name);
2123
 
 
2124
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2125
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2126
 
 
2127
 
  finalizeStatementMessage(*statement, in_session);
2128
 
 
2129
 
  finalizeTransactionMessage(*transaction, in_session);
2130
 
  
2131
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2132
 
 
2133
 
  cleanupTransactionMessage(transaction, in_session);
2134
 
}
2135
 
 
2136
 
void TransactionServices::rawStatement(Session *in_session, const string &query)
2137
 
{
2138
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2139
 
  if (! replication_services.isActive())
2140
 
    return;
2141
 
 
2142
 
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
2143
 
  message::Statement *statement= transaction->add_statement();
2144
 
 
2145
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
2146
 
  statement->set_sql(query);
2147
 
  finalizeStatementMessage(*statement, in_session);
2148
 
 
2149
 
  finalizeTransactionMessage(*transaction, in_session);
2150
 
  
2151
 
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
2152
 
 
2153
 
  cleanupTransactionMessage(transaction, in_session);
2154
 
}
2155
 
 
2156
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2157
 
{
2158
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2159
 
  if (! replication_services.isActive())
2160
 
    return 0;
2161
 
 
2162
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2163
 
 
2164
 
  // set server id, start timestamp
2165
 
  initTransactionMessage(*transaction, session, true);
2166
 
 
2167
 
  // set end timestamp
2168
 
  finalizeTransactionMessage(*transaction, session);
2169
 
 
2170
 
  message::Event *trx_event= transaction->mutable_event();
2171
 
 
2172
 
  trx_event->CopyFrom(event);
2173
 
 
2174
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2175
 
 
2176
 
  delete transaction;
2177
 
 
2178
 
  return static_cast<int>(result);
2179
 
}
2180
 
 
2181
 
bool TransactionServices::sendStartupEvent(Session *session)
2182
 
{
2183
 
  message::Event event;
2184
 
  event.set_type(message::Event::STARTUP);
2185
 
  if (sendEvent(session, event) != 0)
2186
 
    return false;
2187
 
  return true;
2188
 
}
2189
 
 
2190
 
bool TransactionServices::sendShutdownEvent(Session *session)
2191
 
{
2192
 
  message::Event event;
2193
 
  event.set_type(message::Event::SHUTDOWN);
2194
 
  if (sendEvent(session, event) != 0)
2195
 
    return false;
2196
 
  return true;
2197
 
}
2198
 
 
2199
 
} /* namespace drizzled */