~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2008-09-16 00:00:48 UTC
  • mto: This revision was merged to the branch mainline in revision 391.
  • Revision ID: monty@inaugust.com-20080916000048-3rvrv3gv9l0ad3gs
Fixed copyright headers in drizzled/

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  This program is free software; you can redistribute it and/or modify
8
 
 *  it under the terms of the GNU General Public License as published by
9
 
 *  the Free Software Foundation; version 2 of the License.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
/**
22
 
 * @file Transaction processing code
23
 
 *
24
 
 * @note
25
 
 *
26
 
 * The TransactionServices component takes internal events (for instance the start of a 
27
 
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 
 * component and used during replication.
30
 
 *
31
 
 * The reason for this functionality is to encapsulate all communication
32
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 
 * Instead of the plugin having to understand the (often fluidly changing)
34
 
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 
 * these messages.
37
 
 *
38
 
 * @see /drizzled/message/transaction.proto
39
 
 *
40
 
 * @todo
41
 
 *
42
 
 * We really should store the raw bytes in the messages, not the
43
 
 * String value of the Field.  But, to do that, the
44
 
 * statement_transform library needs first to be updated
45
 
 * to include the transformation code to convert raw
46
 
 * Drizzle-internal Field byte representation into something
47
 
 * plugins can understand.
48
 
 */
49
 
 
50
 
#include "config.h"
51
 
#include "drizzled/my_hash.h"
52
 
#include "drizzled/error.h"
53
 
#include "drizzled/gettext.h"
54
 
#include "drizzled/probes.h"
55
 
#include "drizzled/sql_parse.h"
56
 
#include "drizzled/session.h"
57
 
#include "drizzled/sql_base.h"
58
 
#include "drizzled/replication_services.h"
59
 
#include "drizzled/transaction_services.h"
60
 
#include "drizzled/transaction_context.h"
61
 
#include "drizzled/message/transaction.pb.h"
62
 
#include "drizzled/message/statement_transform.h"
63
 
#include "drizzled/resource_context.h"
64
 
#include "drizzled/lock.h"
65
 
#include "drizzled/item/int.h"
66
 
#include "drizzled/item/empty_string.h"
67
 
#include "drizzled/field/epoch.h"
68
 
#include "drizzled/plugin/client.h"
69
 
#include "drizzled/plugin/monitored_in_transaction.h"
70
 
#include "drizzled/plugin/transactional_storage_engine.h"
71
 
#include "drizzled/plugin/xa_resource_manager.h"
72
 
#include "drizzled/plugin/xa_storage_engine.h"
73
 
#include "drizzled/internal/my_sys.h"
74
 
 
75
 
#include <vector>
76
 
#include <algorithm>
77
 
#include <functional>
78
 
#include <google/protobuf/repeated_field.h>
79
 
 
80
 
using namespace std;
81
 
using namespace google;
82
 
 
83
 
namespace drizzled
84
 
{
85
 
 
86
 
/**
87
 
 * @defgroup Transactions
88
 
 *
89
 
 * @brief
90
 
 *
91
 
 * Transaction handling in the server
92
 
 *
93
 
 * @detail
94
 
 *
95
 
 * In each client connection, Drizzle maintains two transaction
96
 
 * contexts representing the state of the:
97
 
 *
98
 
 * 1) Statement Transaction
99
 
 * 2) Normal Transaction
100
 
 *
101
 
 * These two transaction contexts represent the transactional
102
 
 * state of a Session's SQL and XA transactions for a single
103
 
 * SQL statement or a series of SQL statements.
104
 
 *
105
 
 * When the Session's connection is in AUTOCOMMIT mode, there
106
 
 * is no practical difference between the statement and the
107
 
 * normal transaction, as each SQL statement is committed or
108
 
 * rolled back depending on the success or failure of the
109
 
 * indvidual SQL statement.
110
 
 *
111
 
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
112
 
 * the Session has explicitly begun a normal SQL transaction using
113
 
 * a BEGIN WORK/START TRANSACTION statement, then the normal
114
 
 * transaction context tracks the aggregate transaction state of
115
 
 * the SQL transaction's individual statements, and the SQL
116
 
 * transaction's commit or rollback is done atomically for all of
117
 
 * the SQL transaction's statement's data changes.
118
 
 *
119
 
 * Technically, a statement transaction can be viewed as a savepoint 
120
 
 * which is maintained automatically in order to make effects of one
121
 
 * statement atomic.
122
 
 *
123
 
 * The normal transaction is started by the user and is typically
124
 
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
125
 
 * The exception to this is that DDL statements implicitly COMMIT
126
 
 * any previously active normal transaction before they begin executing.
127
 
 *
128
 
 * In Drizzle, unlike MySQL, plugins other than a storage engine
129
 
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
130
 
 * plugins will automatically be monitored by Drizzle's transaction 
131
 
 * manager (implemented in this source file), as will all plugins which
132
 
 * implement plugin::XaResourceManager and register with the transaction
133
 
 * manager.
134
 
 *
135
 
 * If Drizzle's transaction manager sees that more than one resource
136
 
 * manager (transactional storage engine or XA resource manager) has modified
137
 
 * data state during a statement or normal transaction, the transaction
138
 
 * manager will automatically use a two-phase commit protocol for all
139
 
 * resources which support XA's distributed transaction protocol.  Unlike
140
 
 * MySQL, storage engines need not manually register with the transaction
141
 
 * manager during a statement's execution.  Previously, in MySQL, all
142
 
 * handlertons would have to call trans_register_ha() at some point after
143
 
 * modifying data state in order to have MySQL include that handler in
144
 
 * an XA transaction.  Drizzle does all of this grunt work behind the
145
 
 * scenes for the storage engine implementers.
146
 
 *
147
 
 * When a connection is closed, the current normal transaction, if
148
 
 * any is currently active, is rolled back.
149
 
 *
150
 
 * Transaction life cycle
151
 
 * ----------------------
152
 
 *
153
 
 * When a new connection is established, session->transaction
154
 
 * members are initialized to an empty state. If a statement uses any tables, 
155
 
 * all affected engines are registered in the statement engine list automatically
156
 
 * in plugin::StorageEngine::startStatement() and 
157
 
 * plugin::TransactionalStorageEngine::startTransaction().
158
 
 *
159
 
 * You can view the lifetime of a normal transaction in the following
160
 
 * call-sequence:
161
 
 *
162
 
 * drizzled::statement::Statement::execute()
163
 
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
164
 
 *     drizzled::TransactionServices::registerResourceForTransaction()
165
 
 *     drizzled::TransactionServices::registerResourceForStatement()
166
 
 *     drizzled::plugin::StorageEngine::startStatement()
167
 
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
168
 
 *     drizzled::plugin::StorageEngine::endStatement()
169
 
 *   drizzled::TransactionServices::autocommitOrRollback()
170
 
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
171
 
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
172
 
 *
173
 
 * Roles and responsibilities
174
 
 * --------------------------
175
 
 *
176
 
 * Beginning of SQL Statement (and Statement Transaction)
177
 
 * ------------------------------------------------------
178
 
 *
179
 
 * At the start of each SQL statement, for each storage engine
180
 
 * <strong>that is involved in the SQL statement</strong>, the kernel 
181
 
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
182
 
 * engine needs to track some data for the statement, it should use
183
 
 * this method invocation to initialize this data.  This is the
184
 
 * beginning of what is called the "statement transaction".
185
 
 *
186
 
 * <strong>For transaction storage engines (those storage engines
187
 
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
188
 
 * kernel automatically determines if the start of the SQL statement 
189
 
 * transaction should <em>also</em> begin the normal SQL transaction.
190
 
 * This occurs when the connection is in NOT in autocommit mode. If
191
 
 * the kernel detects this, then the kernel automatically starts the
192
 
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
193
 
 * method and then calls plugin::StorageEngine::startStatement()
194
 
 * afterwards.
195
 
 *
196
 
 * Beginning of an SQL "Normal" Transaction
197
 
 * ----------------------------------------
198
 
 *
199
 
 * As noted above, a "normal SQL transaction" may be started when
200
 
 * an SQL statement is started in a connection and the connection is
201
 
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
202
 
 *
203
 
 * In addition, when a user executes a START TRANSACTION or
204
 
 * BEGIN WORK statement in a connection, the kernel explicitly
205
 
 * calls each transactional storage engine's startTransaction() method.
206
 
 *
207
 
 * Ending of an SQL Statement (and Statement Transaction)
208
 
 * ------------------------------------------------------
209
 
 *
210
 
 * At the end of each SQL statement, for each of the aforementioned
211
 
 * involved storage engines, the kernel calls the engine's
212
 
 * plugin::StorageEngine::endStatement() method.  If the engine
213
 
 * has initialized or modified some internal data about the
214
 
 * statement transaction, it should use this method to reset or destroy
215
 
 * this data appropriately.
216
 
 *
217
 
 * Ending of an SQL "Normal" Transaction
218
 
 * -------------------------------------
219
 
 *
220
 
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
221
 
 * depending on the success or failure of the statement transaction(s) 
222
 
 * it encloses.
223
 
 *
224
 
 * The end of a "normal transaction" occurs when any of the following
225
 
 * occurs:
226
 
 *
227
 
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
228
 
 *    then the normal transaction which encloses the statement
229
 
 *    transaction ends
230
 
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
231
 
 * 3) Just before a DDL operation occurs, the kernel will implicitly
232
 
 *    commit the active normal transaction
233
 
 *
234
 
 * Transactions and Non-transactional Storage Engines
235
 
 * --------------------------------------------------
236
 
 *
237
 
 * For non-transactional engines, this call can be safely ignored, an
238
 
 * the kernel tracks whether a non-transactional engine has changed
239
 
 * any data state, and warns the user appropriately if a transaction
240
 
 * (statement or normal) is rolled back after such non-transactional
241
 
 * data changes have been made.
242
 
 *
243
 
 * XA Two-phase Commit Protocol
244
 
 * ----------------------------
245
 
 *
246
 
 * During statement execution, whenever any of data-modifying
247
 
 * PSEA API methods is used, e.g. Cursor::write_row() or
248
 
 * Cursor::update_row(), the read-write flag is raised in the
249
 
 * statement transaction for the involved engine.
250
 
 * Currently All PSEA calls are "traced", and the data can not be
251
 
 * changed in a way other than issuing a PSEA call. Important:
252
 
 * unless this invariant is preserved the server will not know that
253
 
 * a transaction in a given engine is read-write and will not
254
 
 * involve the two-phase commit protocol!
255
 
 *
256
 
 * At the end of a statement, TransactionServices::autocommitOrRollback()
257
 
 * is invoked. This call in turn
258
 
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
259
 
 * resource manager.
260
 
 *
261
 
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
262
 
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
263
 
 * is...)
264
 
 * 
265
 
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
266
 
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
267
 
 * At statement commit, the statement-related read-write engine
268
 
 * flag is propagated to the corresponding flag in the normal
269
 
 * transaction.  When the commit is complete, the list of registered
270
 
 * engines is cleared.
271
 
 *
272
 
 * Rollback is handled in a similar fashion.
273
 
 *
274
 
 * Additional notes on DDL and the normal transaction.
275
 
 * ---------------------------------------------------
276
 
 *
277
 
 * CREATE TABLE .. SELECT can start a *new* normal transaction
278
 
 * because of the fact that SELECTs on a transactional storage
279
 
 * engine participate in the normal SQL transaction (due to
280
 
 * isolation level issues and consistent read views).
281
 
 *
282
 
 * Behaviour of the server in this case is currently badly
283
 
 * defined.
284
 
 *
285
 
 * DDL statements use a form of "semantic" logging
286
 
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
287
 
 * the newly created table is deleted.
288
 
 * 
289
 
 * In addition, some DDL statements issue interim transaction
290
 
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
291
 
 * from the original table to the internal temporary table. Other
292
 
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
293
 
 * after itself.
294
 
 *
295
 
 * And finally there is a group of DDL statements such as
296
 
 * RENAME/DROP TABLE that doesn't start a new transaction
297
 
 * and doesn't commit.
298
 
 *
299
 
 * A consistent behaviour is perhaps to always commit the normal
300
 
 * transaction after all DDLs, just like the statement transaction
301
 
 * is always committed at the end of all statements.
302
 
 */
303
 
TransactionServices::TransactionServices()
304
 
{
305
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
 
  if (engine)
307
 
  {
308
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
 
  }
310
 
  else 
311
 
  {
312
 
    xa_storage_engine= NULL;
313
 
  }
314
 
}
315
 
 
316
 
void TransactionServices::registerResourceForStatement(Session::reference session,
317
 
                                                       plugin::MonitoredInTransaction *monitored,
318
 
                                                       plugin::TransactionalStorageEngine *engine)
319
 
{
320
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
321
 
  {
322
 
    /* 
323
 
     * Now we automatically register this resource manager for the
324
 
     * normal transaction.  This is fine because a statement
325
 
     * transaction registration should always enlist the resource
326
 
     * in the normal transaction which contains the statement
327
 
     * transaction.
328
 
     */
329
 
    registerResourceForTransaction(session, monitored, engine);
330
 
  }
331
 
 
332
 
  TransactionContext *trans= &session.transaction.stmt;
333
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
334
 
 
335
 
  if (resource_context->isStarted())
336
 
    return; /* already registered, return */
337
 
 
338
 
  assert(monitored->participatesInSqlTransaction());
339
 
  assert(not monitored->participatesInXaTransaction());
340
 
 
341
 
  resource_context->setMonitored(monitored);
342
 
  resource_context->setTransactionalStorageEngine(engine);
343
 
  trans->registerResource(resource_context);
344
 
 
345
 
  trans->no_2pc|= true;
346
 
}
347
 
 
348
 
void TransactionServices::registerResourceForStatement(Session::reference session,
349
 
                                                       plugin::MonitoredInTransaction *monitored,
350
 
                                                       plugin::TransactionalStorageEngine *engine,
351
 
                                                       plugin::XaResourceManager *resource_manager)
352
 
{
353
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
354
 
  {
355
 
    /* 
356
 
     * Now we automatically register this resource manager for the
357
 
     * normal transaction.  This is fine because a statement
358
 
     * transaction registration should always enlist the resource
359
 
     * in the normal transaction which contains the statement
360
 
     * transaction.
361
 
     */
362
 
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
 
  }
364
 
 
365
 
  TransactionContext *trans= &session.transaction.stmt;
366
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
367
 
 
368
 
  if (resource_context->isStarted())
369
 
    return; /* already registered, return */
370
 
 
371
 
  assert(monitored->participatesInXaTransaction());
372
 
  assert(monitored->participatesInSqlTransaction());
373
 
 
374
 
  resource_context->setMonitored(monitored);
375
 
  resource_context->setTransactionalStorageEngine(engine);
376
 
  resource_context->setXaResourceManager(resource_manager);
377
 
  trans->registerResource(resource_context);
378
 
 
379
 
  trans->no_2pc|= false;
380
 
}
381
 
 
382
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
383
 
                                                         plugin::MonitoredInTransaction *monitored,
384
 
                                                         plugin::TransactionalStorageEngine *engine)
385
 
{
386
 
  TransactionContext *trans= &session.transaction.all;
387
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
388
 
 
389
 
  if (resource_context->isStarted())
390
 
    return; /* already registered, return */
391
 
 
392
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
 
 
394
 
  trans->registerResource(resource_context);
395
 
 
396
 
  assert(monitored->participatesInSqlTransaction());
397
 
  assert(not monitored->participatesInXaTransaction());
398
 
 
399
 
  resource_context->setMonitored(monitored);
400
 
  resource_context->setTransactionalStorageEngine(engine);
401
 
  trans->no_2pc|= true;
402
 
 
403
 
  if (session.transaction.xid_state.xid.is_null())
404
 
    session.transaction.xid_state.xid.set(session.getQueryId());
405
 
 
406
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
408
 
    registerResourceForStatement(session, monitored, engine);
409
 
}
410
 
 
411
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
412
 
                                                         plugin::MonitoredInTransaction *monitored,
413
 
                                                         plugin::TransactionalStorageEngine *engine,
414
 
                                                         plugin::XaResourceManager *resource_manager)
415
 
{
416
 
  TransactionContext *trans= &session.transaction.all;
417
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
418
 
 
419
 
  if (resource_context->isStarted())
420
 
    return; /* already registered, return */
421
 
 
422
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
 
 
424
 
  trans->registerResource(resource_context);
425
 
 
426
 
  assert(monitored->participatesInSqlTransaction());
427
 
 
428
 
  resource_context->setMonitored(monitored);
429
 
  resource_context->setXaResourceManager(resource_manager);
430
 
  resource_context->setTransactionalStorageEngine(engine);
431
 
  trans->no_2pc|= true;
432
 
 
433
 
  if (session.transaction.xid_state.xid.is_null())
434
 
    session.transaction.xid_state.xid.set(session.getQueryId());
435
 
 
436
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
 
 
438
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
440
 
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
 
}
442
 
 
443
 
void TransactionServices::allocateNewTransactionId()
444
 
{
445
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
 
  if (! replication_services.isActive())
447
 
  {
448
 
    return;
449
 
  }
450
 
 
451
 
  Session *my_session= current_session;
452
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
 
  my_session->setXaId(xa_id);
454
 
}
455
 
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
457
 
{
458
 
  if (session.getXaId() == 0)
459
 
  {
460
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
461
 
  }
462
 
 
463
 
  return session.getXaId();
464
 
}
465
 
 
466
 
int TransactionServices::commitTransaction(Session::reference session,
467
 
                                           bool normal_transaction)
468
 
{
469
 
  int error= 0, cookie= 0;
470
 
  /*
471
 
    'all' means that this is either an explicit commit issued by
472
 
    user, or an implicit commit issued by a DDL.
473
 
  */
474
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
475
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
476
 
 
477
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
478
 
 
479
 
  /*
480
 
    We must not commit the normal transaction if a statement
481
 
    transaction is pending. Otherwise statement transaction
482
 
    flags will not get propagated to its normal transaction's
483
 
    counterpart.
484
 
  */
485
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
486
 
              trans == &session.transaction.stmt);
487
 
 
488
 
  if (resource_contexts.empty() == false)
489
 
  {
490
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
491
 
    {
492
 
      rollbackTransaction(session, normal_transaction);
493
 
      return 1;
494
 
    }
495
 
 
496
 
    /*
497
 
     * If replication is on, we do a PREPARE on the resource managers, push the
498
 
     * Transaction message across the replication stream, and then COMMIT if the
499
 
     * replication stream returned successfully.
500
 
     */
501
 
    if (shouldConstructMessages())
502
 
    {
503
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
504
 
           it != resource_contexts.end() && ! error;
505
 
           ++it)
506
 
      {
507
 
        ResourceContext *resource_context= *it;
508
 
        int err;
509
 
        /*
510
 
          Do not call two-phase commit if this particular
511
 
          transaction is read-only. This allows for simpler
512
 
          implementation in engines that are always read-only.
513
 
        */
514
 
        if (! resource_context->hasModifiedData())
515
 
          continue;
516
 
 
517
 
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
518
 
 
519
 
        if (resource->participatesInXaTransaction())
520
 
        {
521
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
522
 
          {
523
 
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
524
 
            error= 1;
525
 
          }
526
 
          else
527
 
          {
528
 
            session.status_var.ha_prepare_count++;
529
 
          }
530
 
        }
531
 
      }
532
 
      if (error == 0 && is_real_trans)
533
 
      {
534
 
        /*
535
 
         * Push the constructed Transaction messages across to
536
 
         * replicators and appliers.
537
 
         */
538
 
        error= commitTransactionMessage(session);
539
 
      }
540
 
      if (error)
541
 
      {
542
 
        rollbackTransaction(session, normal_transaction);
543
 
        error= 1;
544
 
        goto end;
545
 
      }
546
 
    }
547
 
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
548
 
end:
549
 
    if (is_real_trans)
550
 
      session.startWaitingGlobalReadLock();
551
 
  }
552
 
  return error;
553
 
}
554
 
 
555
 
/**
556
 
  @note
557
 
  This function does not care about global read lock. A caller should.
558
 
*/
559
 
int TransactionServices::commitPhaseOne(Session::reference session,
560
 
                                        bool normal_transaction)
561
 
{
562
 
  int error=0;
563
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
564
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
565
 
 
566
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
567
 
  bool all= normal_transaction;
568
 
 
569
 
  /* If we're in autocommit then we have a real transaction to commit
570
 
     (except if it's BEGIN)
571
 
  */
572
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
573
 
    all= true;
574
 
 
575
 
  if (resource_contexts.empty() == false)
576
 
  {
577
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
578
 
         it != resource_contexts.end();
579
 
         ++it)
580
 
    {
581
 
      int err;
582
 
      ResourceContext *resource_context= *it;
583
 
 
584
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
585
 
 
586
 
      if (resource->participatesInXaTransaction())
587
 
      {
588
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
589
 
        {
590
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591
 
          error= 1;
592
 
        }
593
 
        else if (normal_transaction)
594
 
        {
595
 
          session.status_var.ha_commit_count++;
596
 
        }
597
 
      }
598
 
      else if (resource->participatesInSqlTransaction())
599
 
      {
600
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
601
 
        {
602
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
 
          error= 1;
604
 
        }
605
 
        else if (normal_transaction)
606
 
        {
607
 
          session.status_var.ha_commit_count++;
608
 
        }
609
 
      }
610
 
      resource_context->reset(); /* keep it conveniently zero-filled */
611
 
    }
612
 
 
613
 
    if (is_real_trans)
614
 
      session.transaction.xid_state.xid.null();
615
 
 
616
 
    if (normal_transaction)
617
 
    {
618
 
      session.variables.tx_isolation= session.session_tx_isolation;
619
 
      session.transaction.cleanup();
620
 
    }
621
 
  }
622
 
  trans->reset();
623
 
  return error;
624
 
}
625
 
 
626
 
int TransactionServices::rollbackTransaction(Session::reference session,
627
 
                                             bool normal_transaction)
628
 
{
629
 
  int error= 0;
630
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
631
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
632
 
 
633
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
634
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
635
 
 
636
 
  /*
637
 
    We must not rollback the normal transaction if a statement
638
 
    transaction is pending.
639
 
  */
640
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
641
 
              trans == &session.transaction.stmt);
642
 
 
643
 
  if (resource_contexts.empty() == false)
644
 
  {
645
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
646
 
         it != resource_contexts.end();
647
 
         ++it)
648
 
    {
649
 
      int err;
650
 
      ResourceContext *resource_context= *it;
651
 
 
652
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
653
 
 
654
 
      if (resource->participatesInXaTransaction())
655
 
      {
656
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
657
 
        {
658
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
659
 
          error= 1;
660
 
        }
661
 
        else if (normal_transaction)
662
 
        {
663
 
          session.status_var.ha_rollback_count++;
664
 
        }
665
 
      }
666
 
      else if (resource->participatesInSqlTransaction())
667
 
      {
668
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
669
 
        {
670
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
671
 
          error= 1;
672
 
        }
673
 
        else if (normal_transaction)
674
 
        {
675
 
          session.status_var.ha_rollback_count++;
676
 
        }
677
 
      }
678
 
      resource_context->reset(); /* keep it conveniently zero-filled */
679
 
    }
680
 
    
681
 
    /* 
682
 
     * We need to signal the ROLLBACK to ReplicationServices here
683
 
     * BEFORE we set the transaction ID to NULL.  This is because
684
 
     * if a bulk segment was sent to replicators, we need to send
685
 
     * a rollback statement with the corresponding transaction ID
686
 
     * to rollback.
687
 
     */
688
 
    if (all)
689
 
      rollbackTransactionMessage(session);
690
 
    else
691
 
      rollbackStatementMessage(session);
692
 
 
693
 
    if (is_real_trans)
694
 
      session.transaction.xid_state.xid.null();
695
 
    if (normal_transaction)
696
 
    {
697
 
      session.variables.tx_isolation=session.session_tx_isolation;
698
 
      session.transaction.cleanup();
699
 
    }
700
 
  }
701
 
  if (normal_transaction)
702
 
    session.transaction_rollback_request= false;
703
 
 
704
 
  /*
705
 
   * If a non-transactional table was updated, warn the user
706
 
   */
707
 
  if (is_real_trans &&
708
 
      session.transaction.all.hasModifiedNonTransData() &&
709
 
      session.getKilled() != Session::KILL_CONNECTION)
710
 
  {
711
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
712
 
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
713
 
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
714
 
  }
715
 
  trans->reset();
716
 
  return error;
717
 
}
718
 
 
719
 
int TransactionServices::autocommitOrRollback(Session::reference session,
720
 
                                              int error)
721
 
{
722
 
  /* One GPB Statement message per SQL statement */
723
 
  message::Statement *statement= session.getStatementMessage();
724
 
  if ((statement != NULL) && (! error))
725
 
    finalizeStatementMessage(*statement, session);
726
 
 
727
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
728
 
  {
729
 
    TransactionContext *trans = &session.transaction.stmt;
730
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
 
         it != resource_contexts.end();
733
 
         ++it)
734
 
    {
735
 
      ResourceContext *resource_context= *it;
736
 
 
737
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
738
 
    }
739
 
 
740
 
    if (! error)
741
 
    {
742
 
      if (commitTransaction(session, false))
743
 
        error= 1;
744
 
    }
745
 
    else
746
 
    {
747
 
      (void) rollbackTransaction(session, false);
748
 
      if (session.transaction_rollback_request)
749
 
      {
750
 
        (void) rollbackTransaction(session, true);
751
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
752
 
      }
753
 
    }
754
 
 
755
 
    session.variables.tx_isolation= session.session_tx_isolation;
756
 
  }
757
 
 
758
 
  return error;
759
 
}
760
 
 
761
 
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
762
 
{
763
 
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
764
 
  {
765
 
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
766
 
     * resources aren't the same... */
767
 
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
768
 
  }
769
 
};
770
 
 
771
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
772
 
                                             NamedSavepoint &sv)
773
 
{
774
 
  int error= 0;
775
 
  TransactionContext *trans= &session.transaction.all;
776
 
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
777
 
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
778
 
 
779
 
  trans->no_2pc= false;
780
 
  /*
781
 
    rolling back to savepoint in all storage engines that were part of the
782
 
    transaction when the savepoint was set
783
 
  */
784
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
785
 
       it != sv_resource_contexts.end();
786
 
       ++it)
787
 
  {
788
 
    int err;
789
 
    ResourceContext *resource_context= *it;
790
 
 
791
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
792
 
 
793
 
    if (resource->participatesInSqlTransaction())
794
 
    {
795
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
796
 
      {
797
 
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
798
 
        error= 1;
799
 
      }
800
 
      else
801
 
      {
802
 
        session.status_var.ha_savepoint_rollback_count++;
803
 
      }
804
 
    }
805
 
    trans->no_2pc|= not resource->participatesInXaTransaction();
806
 
  }
807
 
  /*
808
 
    rolling back the transaction in all storage engines that were not part of
809
 
    the transaction when the savepoint was set
810
 
  */
811
 
  {
812
 
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
813
 
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
814
 
    TransactionContext::ResourceContexts set_difference_contexts;
815
 
 
816
 
    /* 
817
 
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
818
 
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
819
 
     * here
820
 
     */
821
 
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
822
 
 
823
 
    sort(sorted_tran_resource_contexts.begin(),
824
 
         sorted_tran_resource_contexts.end(),
825
 
         ResourceContextCompare());
826
 
    sort(sorted_sv_resource_contexts.begin(),
827
 
         sorted_sv_resource_contexts.end(),
828
 
         ResourceContextCompare());
829
 
    set_difference(sorted_tran_resource_contexts.begin(),
830
 
                   sorted_tran_resource_contexts.end(),
831
 
                   sorted_sv_resource_contexts.begin(),
832
 
                   sorted_sv_resource_contexts.end(),
833
 
                   set_difference_contexts.begin(),
834
 
                   ResourceContextCompare());
835
 
    /* 
836
 
     * set_difference_contexts now contains all resource contexts
837
 
     * which are in the transaction context but were NOT in the
838
 
     * savepoint's resource contexts.
839
 
     */
840
 
        
841
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
842
 
         it != set_difference_contexts.end();
843
 
         ++it)
844
 
    {
845
 
      ResourceContext *resource_context= *it;
846
 
      int err;
847
 
 
848
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
849
 
 
850
 
      if (resource->participatesInSqlTransaction())
851
 
      {
852
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
853
 
        {
854
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
855
 
          error= 1;
856
 
        }
857
 
        else
858
 
        {
859
 
          session.status_var.ha_rollback_count++;
860
 
        }
861
 
      }
862
 
      resource_context->reset(); /* keep it conveniently zero-filled */
863
 
    }
864
 
  }
865
 
  trans->setResourceContexts(sv_resource_contexts);
866
 
 
867
 
  if (shouldConstructMessages())
868
 
  {
869
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
870
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
871
 
    if (savepoint_transaction != NULL)
872
 
    {
873
 
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
874
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
875
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
876
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
877
 
      */ 
878
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
879
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
880
 
      if (num_statements == 0)
881
 
      {    
882
 
        session.setStatementMessage(NULL);
883
 
      }    
884
 
      else 
885
 
      {
886
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
887
 
      }    
888
 
      session.setTransactionMessage(savepoint_transaction_copy);
889
 
    }
890
 
  }
891
 
 
892
 
  return error;
893
 
}
894
 
 
895
 
/**
896
 
  @note
897
 
  according to the sql standard (ISO/IEC 9075-2:2003)
898
 
  section "4.33.4 SQL-statements and transaction states",
899
 
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
 
*/
901
 
int TransactionServices::setSavepoint(Session::reference session,
902
 
                                      NamedSavepoint &sv)
903
 
{
904
 
  int error= 0;
905
 
  TransactionContext *trans= &session.transaction.all;
906
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
907
 
 
908
 
  if (resource_contexts.empty() == false)
909
 
  {
910
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
911
 
         it != resource_contexts.end();
912
 
         ++it)
913
 
    {
914
 
      ResourceContext *resource_context= *it;
915
 
      int err;
916
 
 
917
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
918
 
 
919
 
      if (resource->participatesInSqlTransaction())
920
 
      {
921
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
922
 
        {
923
 
          my_error(ER_GET_ERRNO, MYF(0), err);
924
 
          error= 1;
925
 
        }
926
 
        else
927
 
        {
928
 
          session.status_var.ha_savepoint_count++;
929
 
        }
930
 
      }
931
 
    }
932
 
  }
933
 
  /*
934
 
    Remember the list of registered storage engines.
935
 
  */
936
 
  sv.setResourceContexts(resource_contexts);
937
 
 
938
 
  if (shouldConstructMessages())
939
 
  {
940
 
    message::Transaction *transaction= session.getTransactionMessage();
941
 
                  
942
 
    if (transaction != NULL)
943
 
    {
944
 
      message::Transaction *transaction_savepoint= 
945
 
        new message::Transaction(*transaction);
946
 
      sv.setTransactionMessage(transaction_savepoint);
947
 
    }
948
 
  } 
949
 
 
950
 
  return error;
951
 
}
952
 
 
953
 
int TransactionServices::releaseSavepoint(Session::reference session,
954
 
                                          NamedSavepoint &sv)
955
 
{
956
 
  int error= 0;
957
 
 
958
 
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
959
 
 
960
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
961
 
       it != resource_contexts.end();
962
 
       ++it)
963
 
  {
964
 
    int err;
965
 
    ResourceContext *resource_context= *it;
966
 
 
967
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
968
 
 
969
 
    if (resource->participatesInSqlTransaction())
970
 
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
972
 
      {
973
 
        my_error(ER_GET_ERRNO, MYF(0), err);
974
 
        error= 1;
975
 
      }
976
 
    }
977
 
  }
978
 
  
979
 
  return error;
980
 
}
981
 
 
982
 
bool TransactionServices::shouldConstructMessages()
983
 
{
984
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
985
 
  return replication_services.isActive();
986
 
}
987
 
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
989
 
                                                                       bool should_inc_trx_id)
990
 
{
991
 
  message::Transaction *transaction= session.getTransactionMessage();
992
 
 
993
 
  if (unlikely(transaction == NULL))
994
 
  {
995
 
    /* 
996
 
     * Allocate and initialize a new transaction message 
997
 
     * for this Session object.  Session is responsible for
998
 
     * deleting transaction message when done with it.
999
 
     */
1000
 
    transaction= new (nothrow) message::Transaction();
1001
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
 
    session.setTransactionMessage(transaction);
1003
 
    return transaction;
1004
 
  }
1005
 
  else
1006
 
    return transaction;
1007
 
}
1008
 
 
1009
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
 
                                                 Session::reference session,
1011
 
                                                 bool should_inc_trx_id)
1012
 
{
1013
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1014
 
  trx->set_server_id(session.getServerId());
1015
 
 
1016
 
  if (should_inc_trx_id)
1017
 
  {
1018
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1019
 
    session.setXaId(0);
1020
 
  }
1021
 
  else
1022
 
  {
1023
 
    /* trx and seg id will get set properly elsewhere */
1024
 
    trx->set_transaction_id(0);
1025
 
  }
1026
 
 
1027
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1028
 
  
1029
 
  /* segment info may get set elsewhere as needed */
1030
 
  transaction.set_segment_id(1);
1031
 
  transaction.set_end_segment(true);
1032
 
}
1033
 
 
1034
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
 
                                                     Session::const_reference session)
1036
 
{
1037
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1039
 
}
1040
 
 
1041
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
 
                                                    Session::reference session)
1043
 
{
1044
 
  delete transaction;
1045
 
  session.setStatementMessage(NULL);
1046
 
  session.setTransactionMessage(NULL);
1047
 
  session.setXaId(0);
1048
 
}
1049
 
 
1050
 
int TransactionServices::commitTransactionMessage(Session::reference session)
1051
 
{
1052
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
 
  if (! replication_services.isActive())
1054
 
    return 0;
1055
 
 
1056
 
  /*
1057
 
   * If no Transaction message was ever created, then no data modification
1058
 
   * occurred inside the transaction, so nothing to do.
1059
 
   */
1060
 
  if (session.getTransactionMessage() == NULL)
1061
 
    return 0;
1062
 
  
1063
 
  /* If there is an active statement message, finalize it. */
1064
 
  message::Statement *statement= session.getStatementMessage();
1065
 
 
1066
 
  if (statement != NULL)
1067
 
  {
1068
 
    finalizeStatementMessage(*statement, session);
1069
 
  }
1070
 
 
1071
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
1072
 
 
1073
 
  /*
1074
 
   * It is possible that we could have a Transaction without any Statements
1075
 
   * if we had created a Statement but had to roll it back due to it failing
1076
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1077
 
   * message. In this case, we simply clean up the message and not push it.
1078
 
   */
1079
 
  if (transaction->statement_size() == 0)
1080
 
  {
1081
 
    cleanupTransactionMessage(transaction, session);
1082
 
    return 0;
1083
 
  }
1084
 
  
1085
 
  finalizeTransactionMessage(*transaction, session);
1086
 
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1088
 
 
1089
 
  cleanupTransactionMessage(transaction, session);
1090
 
 
1091
 
  return static_cast<int>(result);
1092
 
}
1093
 
 
1094
 
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
 
                                               message::Statement::Type type,
1096
 
                                               Session::const_reference session)
1097
 
{
1098
 
  statement.set_type(type);
1099
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
1100
 
 
1101
 
  if (session.variables.replicate_query)
1102
 
    statement.set_sql(session.getQueryString()->c_str());
1103
 
}
1104
 
 
1105
 
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                                   Session::reference session)
1107
 
{
1108
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1109
 
  session.setStatementMessage(NULL);
1110
 
}
1111
 
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1113
 
{
1114
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
 
  if (! replication_services.isActive())
1116
 
    return;
1117
 
  
1118
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1119
 
 
1120
 
  /*
1121
 
   * OK, so there are two situations that we need to deal with here:
1122
 
   *
1123
 
   * 1) We receive an instruction to ROLLBACK the current transaction
1124
 
   *    and the currently-stored Transaction message is *self-contained*, 
1125
 
   *    meaning that no Statement messages in the Transaction message
1126
 
   *    contain a message having its segment_id member greater than 1.  If
1127
 
   *    no non-segment ID 1 members are found, we can simply clear the
1128
 
   *    current Transaction message and remove it from memory.
1129
 
   *
1130
 
   * 2) If the Transaction message does indeed have a non-end segment, that
1131
 
   *    means that a bulk update/delete/insert Transaction message segment
1132
 
   *    has previously been sent over the wire to replicators.  In this case, 
1133
 
   *    we need to package a Transaction with a Statement message of type
1134
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
1135
 
   *    messages must be un-applied.
1136
 
   */
1137
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1138
 
  {
1139
 
    /* Remember the transaction ID so we can re-use it */
1140
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1141
 
    uint32_t seg_id= transaction->segment_id();
1142
 
 
1143
 
    /*
1144
 
     * Clear the transaction, create a Rollback statement message, 
1145
 
     * attach it to the transaction, and push it to replicators.
1146
 
     */
1147
 
    transaction->Clear();
1148
 
    initTransactionMessage(*transaction, session, false);
1149
 
 
1150
 
    /* Set the transaction ID to match the previous messages */
1151
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1152
 
    transaction->set_segment_id(seg_id);
1153
 
    transaction->set_end_segment(true);
1154
 
 
1155
 
    message::Statement *statement= transaction->add_statement();
1156
 
 
1157
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1158
 
    finalizeStatementMessage(*statement, session);
1159
 
 
1160
 
    finalizeTransactionMessage(*transaction, session);
1161
 
    
1162
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
1163
 
  }
1164
 
 
1165
 
  cleanupTransactionMessage(transaction, session);
1166
 
}
1167
 
 
1168
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
1169
 
{
1170
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1171
 
  if (! replication_services.isActive())
1172
 
    return;
1173
 
 
1174
 
  message::Statement *current_statement= session.getStatementMessage();
1175
 
 
1176
 
  /* If we never added a Statement message, nothing to undo. */
1177
 
  if (current_statement == NULL)
1178
 
    return;
1179
 
 
1180
 
  /*
1181
 
   * If the Statement has been segmented, then we've already pushed a portion
1182
 
   * of this Statement's row changes through the replication stream and we
1183
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1184
 
   * delete the current Statement message.
1185
 
   */
1186
 
  bool is_segmented= false;
1187
 
 
1188
 
  switch (current_statement->type())
1189
 
  {
1190
 
    case message::Statement::INSERT:
1191
 
      if (current_statement->insert_data().segment_id() > 1)
1192
 
        is_segmented= true;
1193
 
      break;
1194
 
 
1195
 
    case message::Statement::UPDATE:
1196
 
      if (current_statement->update_data().segment_id() > 1)
1197
 
        is_segmented= true;
1198
 
      break;
1199
 
 
1200
 
    case message::Statement::DELETE:
1201
 
      if (current_statement->delete_data().segment_id() > 1)
1202
 
        is_segmented= true;
1203
 
      break;
1204
 
 
1205
 
    default:
1206
 
      break;
1207
 
  }
1208
 
 
1209
 
  /*
1210
 
   * Remove the Statement message we've been working with (same as
1211
 
   * current_statement).
1212
 
   */
1213
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1214
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1215
 
  statements_in_txn= transaction->mutable_statement();
1216
 
  statements_in_txn->RemoveLast();
1217
 
  session.setStatementMessage(NULL);
1218
 
  
1219
 
  /*
1220
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1221
 
   * an indicator to cancel the previous Statement message which should have
1222
 
   * had its end_segment attribute set to false.
1223
 
   */
1224
 
  if (is_segmented)
1225
 
  {
1226
 
    current_statement= transaction->add_statement();
1227
 
    initStatementMessage(*current_statement,
1228
 
                         message::Statement::ROLLBACK_STATEMENT,
1229
 
                         session);
1230
 
    finalizeStatementMessage(*current_statement, session);
1231
 
  }
1232
 
}
1233
 
 
1234
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1235
 
                                                                     message::Transaction *transaction)
1236
 
{
1237
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1238
 
  uint32_t seg_id= transaction->segment_id();
1239
 
  
1240
 
  transaction->set_end_segment(false);
1241
 
  commitTransactionMessage(session);
1242
 
  transaction= getActiveTransactionMessage(session, false);
1243
 
  
1244
 
  /* Set the transaction ID to match the previous messages */
1245
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1246
 
  transaction->set_segment_id(seg_id + 1);
1247
 
  transaction->set_end_segment(true);
1248
 
 
1249
 
  return transaction;
1250
 
}
1251
 
 
1252
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1253
 
                                                            Table &table,
1254
 
                                                            uint32_t *next_segment_id)
1255
 
{
1256
 
  message::Statement *statement= session.getStatementMessage();
1257
 
  message::Transaction *transaction= NULL;
1258
 
  
1259
 
  /*
1260
 
   * If statement is NULL, this is a new statement.
1261
 
   * If statement is NOT NULL, this a continuation of the same statement.
1262
 
   * This is because autocommitOrRollback() finalizes the statement so that
1263
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1264
 
   * share a single GPB message for multiple statements).
1265
 
   */
1266
 
  if (statement == NULL)
1267
 
  {
1268
 
    transaction= getActiveTransactionMessage(session);
1269
 
 
1270
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1271
 
        transaction_message_threshold)
1272
 
    {
1273
 
      transaction= segmentTransactionMessage(session, transaction);
1274
 
    }
1275
 
 
1276
 
    statement= transaction->add_statement();
1277
 
    setInsertHeader(*statement, session, table);
1278
 
    session.setStatementMessage(statement);
1279
 
  }
1280
 
  else
1281
 
  {
1282
 
    transaction= getActiveTransactionMessage(session);
1283
 
    
1284
 
    /*
1285
 
     * If we've passed our threshold for the statement size (possible for
1286
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1287
 
     * the Transaction will keep it from getting huge).
1288
 
     */
1289
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1290
 
        transaction_message_threshold)
1291
 
    {
1292
 
      /* Remember the transaction ID so we can re-use it */
1293
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1294
 
      uint32_t seg_id= transaction->segment_id();
1295
 
      
1296
 
      message::InsertData *current_data= statement->mutable_insert_data();
1297
 
      
1298
 
      /* Caller should use this value when adding a new record */
1299
 
      *next_segment_id= current_data->segment_id() + 1;
1300
 
      
1301
 
      current_data->set_end_segment(false);
1302
 
      transaction->set_end_segment(false);
1303
 
      
1304
 
      /* 
1305
 
       * Send the trx message to replicators after finalizing the 
1306
 
       * statement and transaction. This will also set the Transaction
1307
 
       * and Statement objects in Session to NULL.
1308
 
       */
1309
 
      commitTransactionMessage(session);
1310
 
      
1311
 
      /*
1312
 
       * Statement and Transaction should now be NULL, so new ones will get
1313
 
       * created. We reuse the transaction id since we are segmenting
1314
 
       * one transaction.
1315
 
       */
1316
 
      transaction= getActiveTransactionMessage(session, false);
1317
 
      assert(transaction != NULL);
1318
 
 
1319
 
      statement= transaction->add_statement();
1320
 
      setInsertHeader(*statement, session, table);
1321
 
      session.setStatementMessage(statement);
1322
 
            
1323
 
      /* Set the transaction ID to match the previous messages */
1324
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1325
 
      transaction->set_segment_id(seg_id + 1);
1326
 
      transaction->set_end_segment(true);
1327
 
    }
1328
 
    else
1329
 
    {
1330
 
      /*
1331
 
       * Continuation of the same statement. Carry forward the existing
1332
 
       * segment id.
1333
 
       */
1334
 
      const message::InsertData &current_data= statement->insert_data();
1335
 
      *next_segment_id= current_data.segment_id();
1336
 
    }
1337
 
  }
1338
 
  
1339
 
  return *statement;
1340
 
}
1341
 
 
1342
 
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
 
                                          Session::const_reference session,
1344
 
                                          Table &table)
1345
 
{
1346
 
  initStatementMessage(statement, message::Statement::INSERT, session);
1347
 
 
1348
 
  /* 
1349
 
   * Now we construct the specialized InsertHeader message inside
1350
 
   * the generalized message::Statement container...
1351
 
   */
1352
 
  /* Set up the insert header */
1353
 
  message::InsertHeader *header= statement.mutable_insert_header();
1354
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1355
 
 
1356
 
  string schema_name;
1357
 
  (void) table.getShare()->getSchemaName(schema_name);
1358
 
  string table_name;
1359
 
  (void) table.getShare()->getTableName(table_name);
1360
 
 
1361
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1363
 
 
1364
 
  Field *current_field;
1365
 
  Field **table_fields= table.getFields();
1366
 
 
1367
 
  message::FieldMetadata *field_metadata;
1368
 
 
1369
 
  /* We will read all the table's fields... */
1370
 
  table.setReadSet();
1371
 
 
1372
 
  while ((current_field= *table_fields++) != NULL) 
1373
 
  {
1374
 
    field_metadata= header->add_field_metadata();
1375
 
    field_metadata->set_name(current_field->field_name);
1376
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1377
 
  }
1378
 
}
1379
 
 
1380
 
bool TransactionServices::insertRecord(Session::reference session,
1381
 
                                       Table &table)
1382
 
{
1383
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1384
 
  if (! replication_services.isActive())
1385
 
    return false;
1386
 
 
1387
 
  /**
1388
 
   * We do this check here because we don't want to even create a 
1389
 
   * statement if there isn't a primary key on the table...
1390
 
   *
1391
 
   * @todo
1392
 
   *
1393
 
   * Multi-column primary keys are handled how exactly?
1394
 
   */
1395
 
  if (not table.getShare()->hasPrimaryKey())
1396
 
  {
1397
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1398
 
    return true;
1399
 
  }
1400
 
 
1401
 
  uint32_t next_segment_id= 1;
1402
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1403
 
 
1404
 
  message::InsertData *data= statement.mutable_insert_data();
1405
 
  data->set_segment_id(next_segment_id);
1406
 
  data->set_end_segment(true);
1407
 
  message::InsertRecord *record= data->add_record();
1408
 
 
1409
 
  Field *current_field;
1410
 
  Field **table_fields= table.getFields();
1411
 
 
1412
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1413
 
  string_value->set_charset(system_charset_info);
1414
 
 
1415
 
  /* We will read all the table's fields... */
1416
 
  table.setReadSet();
1417
 
 
1418
 
  while ((current_field= *table_fields++) != NULL) 
1419
 
  {
1420
 
    if (current_field->is_null())
1421
 
    {
1422
 
      record->add_is_null(true);
1423
 
      record->add_insert_value("", 0);
1424
 
    } 
1425
 
    else 
1426
 
    {
1427
 
      string_value= current_field->val_str_internal(string_value);
1428
 
      record->add_is_null(false);
1429
 
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1430
 
      string_value->free();
1431
 
    }
1432
 
  }
1433
 
  return false;
1434
 
}
1435
 
 
1436
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1437
 
                                                            Table &table,
1438
 
                                                            const unsigned char *old_record, 
1439
 
                                                            const unsigned char *new_record,
1440
 
                                                            uint32_t *next_segment_id)
1441
 
{
1442
 
  message::Statement *statement= session.getStatementMessage();
1443
 
  message::Transaction *transaction= NULL;
1444
 
 
1445
 
  /*
1446
 
   * If statement is NULL, this is a new statement.
1447
 
   * If statement is NOT NULL, this a continuation of the same statement.
1448
 
   * This is because autocommitOrRollback() finalizes the statement so that
1449
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1450
 
   * share a single GPB message for multiple statements).
1451
 
   */
1452
 
  if (statement == NULL)
1453
 
  {
1454
 
    transaction= getActiveTransactionMessage(session);
1455
 
    
1456
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1457
 
        transaction_message_threshold)
1458
 
    {
1459
 
      transaction= segmentTransactionMessage(session, transaction);
1460
 
    }
1461
 
    
1462
 
    statement= transaction->add_statement();
1463
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1464
 
    session.setStatementMessage(statement);
1465
 
  }
1466
 
  else
1467
 
  {
1468
 
    transaction= getActiveTransactionMessage(session);
1469
 
    
1470
 
    /*
1471
 
     * If we've passed our threshold for the statement size (possible for
1472
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1473
 
     * the Transaction will keep it from getting huge).
1474
 
     */
1475
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1476
 
        transaction_message_threshold)
1477
 
    {
1478
 
      /* Remember the transaction ID so we can re-use it */
1479
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1480
 
      uint32_t seg_id= transaction->segment_id();
1481
 
      
1482
 
      message::UpdateData *current_data= statement->mutable_update_data();
1483
 
      
1484
 
      /* Caller should use this value when adding a new record */
1485
 
      *next_segment_id= current_data->segment_id() + 1;
1486
 
      
1487
 
      current_data->set_end_segment(false);
1488
 
      transaction->set_end_segment(false);
1489
 
      
1490
 
      /* 
1491
 
       * Send the trx message to replicators after finalizing the 
1492
 
       * statement and transaction. This will also set the Transaction
1493
 
       * and Statement objects in Session to NULL.
1494
 
       */
1495
 
      commitTransactionMessage(session);
1496
 
      
1497
 
      /*
1498
 
       * Statement and Transaction should now be NULL, so new ones will get
1499
 
       * created. We reuse the transaction id since we are segmenting
1500
 
       * one transaction.
1501
 
       */
1502
 
      transaction= getActiveTransactionMessage(session, false);
1503
 
      assert(transaction != NULL);
1504
 
      
1505
 
      statement= transaction->add_statement();
1506
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1507
 
      session.setStatementMessage(statement);
1508
 
      
1509
 
      /* Set the transaction ID to match the previous messages */
1510
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1511
 
      transaction->set_segment_id(seg_id + 1);
1512
 
      transaction->set_end_segment(true);
1513
 
    }
1514
 
    else
1515
 
    {
1516
 
      /*
1517
 
       * Continuation of the same statement. Carry forward the existing
1518
 
       * segment id.
1519
 
       */
1520
 
      const message::UpdateData &current_data= statement->update_data();
1521
 
      *next_segment_id= current_data.segment_id();
1522
 
    }
1523
 
  }
1524
 
  
1525
 
  return *statement;
1526
 
}
1527
 
 
1528
 
void TransactionServices::setUpdateHeader(message::Statement &statement,
1529
 
                                          Session::const_reference session,
1530
 
                                          Table &table,
1531
 
                                          const unsigned char *old_record, 
1532
 
                                          const unsigned char *new_record)
1533
 
{
1534
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
1535
 
 
1536
 
  /* 
1537
 
   * Now we construct the specialized UpdateHeader message inside
1538
 
   * the generalized message::Statement container...
1539
 
   */
1540
 
  /* Set up the update header */
1541
 
  message::UpdateHeader *header= statement.mutable_update_header();
1542
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1543
 
 
1544
 
  string schema_name;
1545
 
  (void) table.getShare()->getSchemaName(schema_name);
1546
 
  string table_name;
1547
 
  (void) table.getShare()->getTableName(table_name);
1548
 
 
1549
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1550
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1551
 
 
1552
 
  Field *current_field;
1553
 
  Field **table_fields= table.getFields();
1554
 
 
1555
 
  message::FieldMetadata *field_metadata;
1556
 
 
1557
 
  /* We will read all the table's fields... */
1558
 
  table.setReadSet();
1559
 
 
1560
 
  while ((current_field= *table_fields++) != NULL) 
1561
 
  {
1562
 
    /*
1563
 
     * We add the "key field metadata" -- i.e. the fields which is
1564
 
     * the primary key for the table.
1565
 
     */
1566
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1567
 
    {
1568
 
      field_metadata= header->add_key_field_metadata();
1569
 
      field_metadata->set_name(current_field->field_name);
1570
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1571
 
    }
1572
 
 
1573
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
1574
 
    {
1575
 
      /* Field is changed from old to new */
1576
 
      field_metadata= header->add_set_field_metadata();
1577
 
      field_metadata->set_name(current_field->field_name);
1578
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1579
 
    }
1580
 
  }
1581
 
}
1582
 
void TransactionServices::updateRecord(Session::reference session,
1583
 
                                       Table &table, 
1584
 
                                       const unsigned char *old_record, 
1585
 
                                       const unsigned char *new_record)
1586
 
{
1587
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1588
 
  if (! replication_services.isActive())
1589
 
    return;
1590
 
 
1591
 
  uint32_t next_segment_id= 1;
1592
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1593
 
 
1594
 
  message::UpdateData *data= statement.mutable_update_data();
1595
 
  data->set_segment_id(next_segment_id);
1596
 
  data->set_end_segment(true);
1597
 
  message::UpdateRecord *record= data->add_record();
1598
 
 
1599
 
  Field *current_field;
1600
 
  Field **table_fields= table.getFields();
1601
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1602
 
  string_value->set_charset(system_charset_info);
1603
 
 
1604
 
  while ((current_field= *table_fields++) != NULL) 
1605
 
  {
1606
 
    /*
1607
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1608
 
     * but then realized that an UPDATE statement could potentially have different values for
1609
 
     * the SET field.  For instance, imagine this SQL scenario:
1610
 
     *
1611
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1612
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1613
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1614
 
     *
1615
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1616
 
     */
1617
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
1618
 
    {
1619
 
      /* Store the original "read bit" for this field */
1620
 
      bool is_read_set= current_field->isReadSet();
1621
 
 
1622
 
      /* We need to mark that we will "read" this field... */
1623
 
      table.setReadSet(current_field->position());
1624
 
 
1625
 
      /* Read the string value of this field's contents */
1626
 
      string_value= current_field->val_str_internal(string_value);
1627
 
 
1628
 
      /* 
1629
 
       * Reset the read bit after reading field to its original state.  This 
1630
 
       * prevents the field from being included in the WHERE clause
1631
 
       */
1632
 
      current_field->setReadSet(is_read_set);
1633
 
 
1634
 
      if (current_field->is_null())
1635
 
      {
1636
 
        record->add_is_null(true);
1637
 
        record->add_after_value("", 0);
1638
 
      }
1639
 
      else
1640
 
      {
1641
 
        record->add_is_null(false);
1642
 
        record->add_after_value(string_value->c_ptr(), string_value->length());
1643
 
      }
1644
 
      string_value->free();
1645
 
    }
1646
 
 
1647
 
    /* 
1648
 
     * Add the WHERE clause values now...for now, this means the
1649
 
     * primary key field value.  Replication only supports tables
1650
 
     * with a primary key.
1651
 
     */
1652
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1653
 
    {
1654
 
      /**
1655
 
       * To say the below is ugly is an understatement. But it works.
1656
 
       * 
1657
 
       * @todo Move this crap into a real Record API.
1658
 
       */
1659
 
      string_value= current_field->val_str_internal(string_value,
1660
 
                                                    old_record + 
1661
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
1662
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1663
 
      string_value->free();
1664
 
    }
1665
 
 
1666
 
  }
1667
 
}
1668
 
 
1669
 
bool TransactionServices::isFieldUpdated(Field *current_field,
1670
 
                                         Table &table,
1671
 
                                         const unsigned char *old_record,
1672
 
                                         const unsigned char *new_record)
1673
 
{
1674
 
  /*
1675
 
   * The below really should be moved into the Field API and Record API.  But for now
1676
 
   * we do this crazy pointer fiddling to figure out if the current field
1677
 
   * has been updated in the supplied record raw byte pointers.
1678
 
   */
1679
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1680
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1681
 
 
1682
 
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1683
 
 
1684
 
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1685
 
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1686
 
 
1687
 
  bool isUpdated= false;
1688
 
  if (old_value_is_null != new_value_is_null)
1689
 
  {
1690
 
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1691
 
    {
1692
 
      isUpdated= true;
1693
 
    }
1694
 
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1695
 
    {
1696
 
      isUpdated= true;
1697
 
    }
1698
 
  }
1699
 
 
1700
 
  if (! isUpdated)
1701
 
  {
1702
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1703
 
    {
1704
 
      isUpdated= true;
1705
 
    }
1706
 
  }
1707
 
  return isUpdated;
1708
 
}  
1709
 
 
1710
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1711
 
                                                            Table &table,
1712
 
                                                            uint32_t *next_segment_id)
1713
 
{
1714
 
  message::Statement *statement= session.getStatementMessage();
1715
 
  message::Transaction *transaction= NULL;
1716
 
 
1717
 
  /*
1718
 
   * If statement is NULL, this is a new statement.
1719
 
   * If statement is NOT NULL, this a continuation of the same statement.
1720
 
   * This is because autocommitOrRollback() finalizes the statement so that
1721
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1722
 
   * share a single GPB message for multiple statements).
1723
 
   */
1724
 
  if (statement == NULL)
1725
 
  {
1726
 
    transaction= getActiveTransactionMessage(session);
1727
 
    
1728
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1729
 
        transaction_message_threshold)
1730
 
    {
1731
 
      transaction= segmentTransactionMessage(session, transaction);
1732
 
    }
1733
 
    
1734
 
    statement= transaction->add_statement();
1735
 
    setDeleteHeader(*statement, session, table);
1736
 
    session.setStatementMessage(statement);
1737
 
  }
1738
 
  else
1739
 
  {
1740
 
    transaction= getActiveTransactionMessage(session);
1741
 
    
1742
 
    /*
1743
 
     * If we've passed our threshold for the statement size (possible for
1744
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1745
 
     * the Transaction will keep it from getting huge).
1746
 
     */
1747
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1748
 
        transaction_message_threshold)
1749
 
    {
1750
 
      /* Remember the transaction ID so we can re-use it */
1751
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1752
 
      uint32_t seg_id= transaction->segment_id();
1753
 
      
1754
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1755
 
      
1756
 
      /* Caller should use this value when adding a new record */
1757
 
      *next_segment_id= current_data->segment_id() + 1;
1758
 
      
1759
 
      current_data->set_end_segment(false);
1760
 
      transaction->set_end_segment(false);
1761
 
      
1762
 
      /* 
1763
 
       * Send the trx message to replicators after finalizing the 
1764
 
       * statement and transaction. This will also set the Transaction
1765
 
       * and Statement objects in Session to NULL.
1766
 
       */
1767
 
      commitTransactionMessage(session);
1768
 
      
1769
 
      /*
1770
 
       * Statement and Transaction should now be NULL, so new ones will get
1771
 
       * created. We reuse the transaction id since we are segmenting
1772
 
       * one transaction.
1773
 
       */
1774
 
      transaction= getActiveTransactionMessage(session, false);
1775
 
      assert(transaction != NULL);
1776
 
      
1777
 
      statement= transaction->add_statement();
1778
 
      setDeleteHeader(*statement, session, table);
1779
 
      session.setStatementMessage(statement);
1780
 
      
1781
 
      /* Set the transaction ID to match the previous messages */
1782
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1783
 
      transaction->set_segment_id(seg_id + 1);
1784
 
      transaction->set_end_segment(true);
1785
 
    }
1786
 
    else
1787
 
    {
1788
 
      /*
1789
 
       * Continuation of the same statement. Carry forward the existing
1790
 
       * segment id.
1791
 
       */
1792
 
      const message::DeleteData &current_data= statement->delete_data();
1793
 
      *next_segment_id= current_data.segment_id();
1794
 
    }
1795
 
  }
1796
 
  
1797
 
  return *statement;
1798
 
}
1799
 
 
1800
 
void TransactionServices::setDeleteHeader(message::Statement &statement,
1801
 
                                          Session::const_reference session,
1802
 
                                          Table &table)
1803
 
{
1804
 
  initStatementMessage(statement, message::Statement::DELETE, session);
1805
 
 
1806
 
  /* 
1807
 
   * Now we construct the specialized DeleteHeader message inside
1808
 
   * the generalized message::Statement container...
1809
 
   */
1810
 
  message::DeleteHeader *header= statement.mutable_delete_header();
1811
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1812
 
 
1813
 
  string schema_name;
1814
 
  (void) table.getShare()->getSchemaName(schema_name);
1815
 
  string table_name;
1816
 
  (void) table.getShare()->getTableName(table_name);
1817
 
 
1818
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1819
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1820
 
 
1821
 
  Field *current_field;
1822
 
  Field **table_fields= table.getFields();
1823
 
 
1824
 
  message::FieldMetadata *field_metadata;
1825
 
 
1826
 
  while ((current_field= *table_fields++) != NULL) 
1827
 
  {
1828
 
    /* 
1829
 
     * Add the WHERE clause values now...for now, this means the
1830
 
     * primary key field value.  Replication only supports tables
1831
 
     * with a primary key.
1832
 
     */
1833
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1834
 
    {
1835
 
      field_metadata= header->add_key_field_metadata();
1836
 
      field_metadata->set_name(current_field->field_name);
1837
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1838
 
    }
1839
 
  }
1840
 
}
1841
 
 
1842
 
void TransactionServices::deleteRecord(Session::reference session,
1843
 
                                       Table &table,
1844
 
                                       bool use_update_record)
1845
 
{
1846
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1847
 
  if (! replication_services.isActive())
1848
 
    return;
1849
 
 
1850
 
  uint32_t next_segment_id= 1;
1851
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1852
 
 
1853
 
  message::DeleteData *data= statement.mutable_delete_data();
1854
 
  data->set_segment_id(next_segment_id);
1855
 
  data->set_end_segment(true);
1856
 
  message::DeleteRecord *record= data->add_record();
1857
 
 
1858
 
  Field *current_field;
1859
 
  Field **table_fields= table.getFields();
1860
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1861
 
  string_value->set_charset(system_charset_info);
1862
 
 
1863
 
  while ((current_field= *table_fields++) != NULL) 
1864
 
  {
1865
 
    /* 
1866
 
     * Add the WHERE clause values now...for now, this means the
1867
 
     * primary key field value.  Replication only supports tables
1868
 
     * with a primary key.
1869
 
     */
1870
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1871
 
    {
1872
 
      if (use_update_record)
1873
 
      {
1874
 
        /*
1875
 
         * Temporarily point to the update record to get its value.
1876
 
         * This is pretty much a hack in order to get the PK value from
1877
 
         * the update record rather than the insert record. Field::val_str()
1878
 
         * should not change anything in Field::ptr, so this should be safe.
1879
 
         * We are careful not to change anything in old_ptr.
1880
 
         */
1881
 
        const unsigned char *old_ptr= current_field->ptr;
1882
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1883
 
        string_value= current_field->val_str_internal(string_value);
1884
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1885
 
      }
1886
 
      else
1887
 
      {
1888
 
        string_value= current_field->val_str_internal(string_value);
1889
 
        /**
1890
 
         * @TODO Store optional old record value in the before data member
1891
 
         */
1892
 
      }
1893
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1894
 
      string_value->free();
1895
 
    }
1896
 
  }
1897
 
}
1898
 
 
1899
 
void TransactionServices::createTable(Session::reference session,
1900
 
                                      const message::Table &table)
1901
 
{
1902
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1903
 
  if (! replication_services.isActive())
1904
 
    return;
1905
 
  
1906
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1907
 
  message::Statement *statement= transaction->add_statement();
1908
 
 
1909
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1910
 
 
1911
 
  /* 
1912
 
   * Construct the specialized CreateTableStatement message and attach
1913
 
   * it to the generic Statement message
1914
 
   */
1915
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1916
 
  message::Table *new_table_message= create_table_statement->mutable_table();
1917
 
  *new_table_message= table;
1918
 
 
1919
 
  finalizeStatementMessage(*statement, session);
1920
 
 
1921
 
  finalizeTransactionMessage(*transaction, session);
1922
 
  
1923
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1924
 
 
1925
 
  cleanupTransactionMessage(transaction, session);
1926
 
 
1927
 
}
1928
 
 
1929
 
void TransactionServices::createSchema(Session::reference session,
1930
 
                                       const message::Schema &schema)
1931
 
{
1932
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1933
 
  if (! replication_services.isActive())
1934
 
    return;
1935
 
  
1936
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1937
 
  message::Statement *statement= transaction->add_statement();
1938
 
 
1939
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1940
 
 
1941
 
  /* 
1942
 
   * Construct the specialized CreateSchemaStatement message and attach
1943
 
   * it to the generic Statement message
1944
 
   */
1945
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1946
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1947
 
  *new_schema_message= schema;
1948
 
 
1949
 
  finalizeStatementMessage(*statement, session);
1950
 
 
1951
 
  finalizeTransactionMessage(*transaction, session);
1952
 
  
1953
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1954
 
 
1955
 
  cleanupTransactionMessage(transaction, session);
1956
 
 
1957
 
}
1958
 
 
1959
 
void TransactionServices::dropSchema(Session::reference session,
1960
 
                                     identifier::Schema::const_reference identifier)
1961
 
{
1962
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1963
 
  if (! replication_services.isActive())
1964
 
    return;
1965
 
  
1966
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1967
 
  message::Statement *statement= transaction->add_statement();
1968
 
 
1969
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1970
 
 
1971
 
  /* 
1972
 
   * Construct the specialized DropSchemaStatement message and attach
1973
 
   * it to the generic Statement message
1974
 
   */
1975
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1976
 
 
1977
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1978
 
 
1979
 
  finalizeStatementMessage(*statement, session);
1980
 
 
1981
 
  finalizeTransactionMessage(*transaction, session);
1982
 
  
1983
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1984
 
 
1985
 
  cleanupTransactionMessage(transaction, session);
1986
 
}
1987
 
 
1988
 
void TransactionServices::alterSchema(Session::reference session,
1989
 
                                      const message::schema::shared_ptr &old_schema,
1990
 
                                      const message::Schema &new_schema)
1991
 
{
1992
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1993
 
  if (! replication_services.isActive())
1994
 
    return;
1995
 
  
1996
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1997
 
  message::Statement *statement= transaction->add_statement();
1998
 
 
1999
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2000
 
 
2001
 
  /* 
2002
 
   * Construct the specialized AlterSchemaStatement message and attach
2003
 
   * it to the generic Statement message
2004
 
   */
2005
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2006
 
 
2007
 
  message::Schema *before= alter_schema_statement->mutable_before();
2008
 
  message::Schema *after= alter_schema_statement->mutable_after();
2009
 
 
2010
 
  *before= *old_schema;
2011
 
  *after= new_schema;
2012
 
 
2013
 
  finalizeStatementMessage(*statement, session);
2014
 
 
2015
 
  finalizeTransactionMessage(*transaction, session);
2016
 
  
2017
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2018
 
 
2019
 
  cleanupTransactionMessage(transaction, session);
2020
 
}
2021
 
 
2022
 
void TransactionServices::dropTable(Session::reference session,
2023
 
                                    const identifier::Table &table,
2024
 
                                    bool if_exists)
2025
 
{
2026
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2027
 
  if (! replication_services.isActive())
2028
 
    return;
2029
 
  
2030
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2031
 
  message::Statement *statement= transaction->add_statement();
2032
 
 
2033
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2034
 
 
2035
 
  /* 
2036
 
   * Construct the specialized DropTableStatement message and attach
2037
 
   * it to the generic Statement message
2038
 
   */
2039
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2040
 
 
2041
 
  drop_table_statement->set_if_exists_clause(if_exists);
2042
 
 
2043
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2044
 
 
2045
 
  table_metadata->set_schema_name(table.getSchemaName());
2046
 
  table_metadata->set_table_name(table.getTableName());
2047
 
 
2048
 
  finalizeStatementMessage(*statement, session);
2049
 
 
2050
 
  finalizeTransactionMessage(*transaction, session);
2051
 
  
2052
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2053
 
 
2054
 
  cleanupTransactionMessage(transaction, session);
2055
 
}
2056
 
 
2057
 
void TransactionServices::truncateTable(Session::reference session,
2058
 
                                        Table &table)
2059
 
{
2060
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2061
 
  if (! replication_services.isActive())
2062
 
    return;
2063
 
  
2064
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2065
 
  message::Statement *statement= transaction->add_statement();
2066
 
 
2067
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2068
 
 
2069
 
  /* 
2070
 
   * Construct the specialized TruncateTableStatement message and attach
2071
 
   * it to the generic Statement message
2072
 
   */
2073
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2074
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2075
 
 
2076
 
  string schema_name;
2077
 
  (void) table.getShare()->getSchemaName(schema_name);
2078
 
  string table_name;
2079
 
  (void) table.getShare()->getTableName(table_name);
2080
 
 
2081
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2082
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2083
 
 
2084
 
  finalizeStatementMessage(*statement, session);
2085
 
 
2086
 
  finalizeTransactionMessage(*transaction, session);
2087
 
  
2088
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2089
 
 
2090
 
  cleanupTransactionMessage(transaction, session);
2091
 
}
2092
 
 
2093
 
void TransactionServices::rawStatement(Session::reference session,
2094
 
                                       const string &query)
2095
 
{
2096
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2097
 
  if (! replication_services.isActive())
2098
 
    return;
2099
 
 
2100
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2101
 
  message::Statement *statement= transaction->add_statement();
2102
 
 
2103
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2104
 
  statement->set_sql(query);
2105
 
  finalizeStatementMessage(*statement, session);
2106
 
 
2107
 
  finalizeTransactionMessage(*transaction, session);
2108
 
  
2109
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2110
 
 
2111
 
  cleanupTransactionMessage(transaction, session);
2112
 
}
2113
 
 
2114
 
int TransactionServices::sendEvent(Session::reference session,
2115
 
                                   const message::Event &event)
2116
 
{
2117
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2118
 
  if (! replication_services.isActive())
2119
 
    return 0;
2120
 
 
2121
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2122
 
 
2123
 
  // set server id, start timestamp
2124
 
  initTransactionMessage(*transaction, session, true);
2125
 
 
2126
 
  // set end timestamp
2127
 
  finalizeTransactionMessage(*transaction, session);
2128
 
 
2129
 
  message::Event *trx_event= transaction->mutable_event();
2130
 
 
2131
 
  trx_event->CopyFrom(event);
2132
 
 
2133
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2134
 
 
2135
 
  delete transaction;
2136
 
 
2137
 
  return static_cast<int>(result);
2138
 
}
2139
 
 
2140
 
bool TransactionServices::sendStartupEvent(Session::reference session)
2141
 
{
2142
 
  message::Event event;
2143
 
  event.set_type(message::Event::STARTUP);
2144
 
  if (sendEvent(session, event) != 0)
2145
 
    return false;
2146
 
  return true;
2147
 
}
2148
 
 
2149
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
2150
 
{
2151
 
  message::Event event;
2152
 
  event.set_type(message::Event::SHUTDOWN);
2153
 
  if (sendEvent(session, event) != 0)
2154
 
    return false;
2155
 
  return true;
2156
 
}
2157
 
 
2158
 
} /* namespace drizzled */