~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Mark Atwood
  • Date: 2008-10-03 01:39:40 UTC
  • mto: This revision was merged to the branch mainline in revision 437.
  • Revision ID: mark@fallenpegasus.com-20081003013940-mvefjo725dltz41h
rename logging_noop to logging_query

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  This program is free software; you can redistribute it and/or modify
8
 
 *  it under the terms of the GNU General Public License as published by
9
 
 *  the Free Software Foundation; version 2 of the License.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
 
 */
20
 
 
21
 
/**
22
 
 * @file Transaction processing code
23
 
 *
24
 
 * @note
25
 
 *
26
 
 * The TransactionServices component takes internal events (for instance the start of a 
27
 
 * transaction, the changing of a record, or the rollback of a transaction) 
28
 
 * and constructs GPB Messages that are passed to the ReplicationServices
29
 
 * component and used during replication.
30
 
 *
31
 
 * The reason for this functionality is to encapsulate all communication
32
 
 * between the kernel and the replicator/applier plugins into GPB Messages.
33
 
 * Instead of the plugin having to understand the (often fluidly changing)
34
 
 * mechanics of the kernel, all the plugin needs to understand is the message
35
 
 * format, and GPB messages provide a nice, clear, and versioned format for 
36
 
 * these messages.
37
 
 *
38
 
 * @see /drizzled/message/transaction.proto
39
 
 *
40
 
 * @todo
41
 
 *
42
 
 * We really should store the raw bytes in the messages, not the
43
 
 * String value of the Field.  But, to do that, the
44
 
 * statement_transform library needs first to be updated
45
 
 * to include the transformation code to convert raw
46
 
 * Drizzle-internal Field byte representation into something
47
 
 * plugins can understand.
48
 
 */
49
 
 
50
 
#include <config.h>
51
 
#include <drizzled/current_session.h>
52
 
#include <drizzled/my_hash.h>
53
 
#include <drizzled/error.h>
54
 
#include <drizzled/gettext.h>
55
 
#include <drizzled/probes.h>
56
 
#include <drizzled/sql_parse.h>
57
 
#include <drizzled/session.h>
58
 
#include <drizzled/sql_base.h>
59
 
#include <drizzled/replication_services.h>
60
 
#include <drizzled/transaction_services.h>
61
 
#include <drizzled/transaction_context.h>
62
 
#include <drizzled/message/transaction.pb.h>
63
 
#include <drizzled/message/statement_transform.h>
64
 
#include <drizzled/resource_context.h>
65
 
#include <drizzled/lock.h>
66
 
#include <drizzled/item/int.h>
67
 
#include <drizzled/item/empty_string.h>
68
 
#include <drizzled/field/epoch.h>
69
 
#include <drizzled/plugin/client.h>
70
 
#include <drizzled/plugin/monitored_in_transaction.h>
71
 
#include <drizzled/plugin/transactional_storage_engine.h>
72
 
#include <drizzled/plugin/xa_resource_manager.h>
73
 
#include <drizzled/plugin/xa_storage_engine.h>
74
 
#include <drizzled/internal/my_sys.h>
75
 
 
76
 
#include <vector>
77
 
#include <algorithm>
78
 
#include <functional>
79
 
#include <google/protobuf/repeated_field.h>
80
 
 
81
 
using namespace std;
82
 
using namespace google;
83
 
 
84
 
namespace drizzled
85
 
{
86
 
 
87
 
/**
88
 
 * @defgroup Transactions
89
 
 *
90
 
 * @brief
91
 
 *
92
 
 * Transaction handling in the server
93
 
 *
94
 
 * @detail
95
 
 *
96
 
 * In each client connection, Drizzle maintains two transaction
97
 
 * contexts representing the state of the:
98
 
 *
99
 
 * 1) Statement Transaction
100
 
 * 2) Normal Transaction
101
 
 *
102
 
 * These two transaction contexts represent the transactional
103
 
 * state of a Session's SQL and XA transactions for a single
104
 
 * SQL statement or a series of SQL statements.
105
 
 *
106
 
 * When the Session's connection is in AUTOCOMMIT mode, there
107
 
 * is no practical difference between the statement and the
108
 
 * normal transaction, as each SQL statement is committed or
109
 
 * rolled back depending on the success or failure of the
110
 
 * indvidual SQL statement.
111
 
 *
112
 
 * When the Session's connection is NOT in AUTOCOMMIT mode, OR
113
 
 * the Session has explicitly begun a normal SQL transaction using
114
 
 * a BEGIN WORK/START TRANSACTION statement, then the normal
115
 
 * transaction context tracks the aggregate transaction state of
116
 
 * the SQL transaction's individual statements, and the SQL
117
 
 * transaction's commit or rollback is done atomically for all of
118
 
 * the SQL transaction's statement's data changes.
119
 
 *
120
 
 * Technically, a statement transaction can be viewed as a savepoint 
121
 
 * which is maintained automatically in order to make effects of one
122
 
 * statement atomic.
123
 
 *
124
 
 * The normal transaction is started by the user and is typically
125
 
 * ended (COMMIT or ROLLBACK) upon an explicity user request as well.
126
 
 * The exception to this is that DDL statements implicitly COMMIT
127
 
 * any previously active normal transaction before they begin executing.
128
 
 *
129
 
 * In Drizzle, unlike MySQL, plugins other than a storage engine
130
 
 * may participate in a transaction.  All plugin::TransactionalStorageEngine
131
 
 * plugins will automatically be monitored by Drizzle's transaction 
132
 
 * manager (implemented in this source file), as will all plugins which
133
 
 * implement plugin::XaResourceManager and register with the transaction
134
 
 * manager.
135
 
 *
136
 
 * If Drizzle's transaction manager sees that more than one resource
137
 
 * manager (transactional storage engine or XA resource manager) has modified
138
 
 * data state during a statement or normal transaction, the transaction
139
 
 * manager will automatically use a two-phase commit protocol for all
140
 
 * resources which support XA's distributed transaction protocol.  Unlike
141
 
 * MySQL, storage engines need not manually register with the transaction
142
 
 * manager during a statement's execution.  Previously, in MySQL, all
143
 
 * handlertons would have to call trans_register_ha() at some point after
144
 
 * modifying data state in order to have MySQL include that handler in
145
 
 * an XA transaction.  Drizzle does all of this grunt work behind the
146
 
 * scenes for the storage engine implementers.
147
 
 *
148
 
 * When a connection is closed, the current normal transaction, if
149
 
 * any is currently active, is rolled back.
150
 
 *
151
 
 * Transaction life cycle
152
 
 * ----------------------
153
 
 *
154
 
 * When a new connection is established, session->transaction
155
 
 * members are initialized to an empty state. If a statement uses any tables, 
156
 
 * all affected engines are registered in the statement engine list automatically
157
 
 * in plugin::StorageEngine::startStatement() and 
158
 
 * plugin::TransactionalStorageEngine::startTransaction().
159
 
 *
160
 
 * You can view the lifetime of a normal transaction in the following
161
 
 * call-sequence:
162
 
 *
163
 
 * drizzled::statement::Statement::execute()
164
 
 *   drizzled::plugin::TransactionalStorageEngine::startTransaction()
165
 
 *     drizzled::TransactionServices::registerResourceForTransaction()
166
 
 *     drizzled::TransactionServices::registerResourceForStatement()
167
 
 *     drizzled::plugin::StorageEngine::startStatement()
168
 
 *       drizzled::Cursor::write_row() <-- example...could be update_row(), etc
169
 
 *     drizzled::plugin::StorageEngine::endStatement()
170
 
 *   drizzled::TransactionServices::autocommitOrRollback()
171
 
 *     drizzled::TransactionalStorageEngine::commit() <-- or ::rollback()
172
 
 *     drizzled::XaResourceManager::xaCommit() <-- or rollback()
173
 
 *
174
 
 * Roles and responsibilities
175
 
 * --------------------------
176
 
 *
177
 
 * Beginning of SQL Statement (and Statement Transaction)
178
 
 * ------------------------------------------------------
179
 
 *
180
 
 * At the start of each SQL statement, for each storage engine
181
 
 * <strong>that is involved in the SQL statement</strong>, the kernel 
182
 
 * calls the engine's plugin::StoragEngine::startStatement() method.  If the
183
 
 * engine needs to track some data for the statement, it should use
184
 
 * this method invocation to initialize this data.  This is the
185
 
 * beginning of what is called the "statement transaction".
186
 
 *
187
 
 * <strong>For transaction storage engines (those storage engines
188
 
 * that inherit from plugin::TransactionalStorageEngine)</strong>, the
189
 
 * kernel automatically determines if the start of the SQL statement 
190
 
 * transaction should <em>also</em> begin the normal SQL transaction.
191
 
 * This occurs when the connection is in NOT in autocommit mode. If
192
 
 * the kernel detects this, then the kernel automatically starts the
193
 
 * normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
194
 
 * method and then calls plugin::StorageEngine::startStatement()
195
 
 * afterwards.
196
 
 *
197
 
 * Beginning of an SQL "Normal" Transaction
198
 
 * ----------------------------------------
199
 
 *
200
 
 * As noted above, a "normal SQL transaction" may be started when
201
 
 * an SQL statement is started in a connection and the connection is
202
 
 * NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
203
 
 *
204
 
 * In addition, when a user executes a START TRANSACTION or
205
 
 * BEGIN WORK statement in a connection, the kernel explicitly
206
 
 * calls each transactional storage engine's startTransaction() method.
207
 
 *
208
 
 * Ending of an SQL Statement (and Statement Transaction)
209
 
 * ------------------------------------------------------
210
 
 *
211
 
 * At the end of each SQL statement, for each of the aforementioned
212
 
 * involved storage engines, the kernel calls the engine's
213
 
 * plugin::StorageEngine::endStatement() method.  If the engine
214
 
 * has initialized or modified some internal data about the
215
 
 * statement transaction, it should use this method to reset or destroy
216
 
 * this data appropriately.
217
 
 *
218
 
 * Ending of an SQL "Normal" Transaction
219
 
 * -------------------------------------
220
 
 *
221
 
 * The end of a normal transaction is either a ROLLBACK or a COMMIT, 
222
 
 * depending on the success or failure of the statement transaction(s) 
223
 
 * it encloses.
224
 
 *
225
 
 * The end of a "normal transaction" occurs when any of the following
226
 
 * occurs:
227
 
 *
228
 
 * 1) If a statement transaction has completed and AUTOCOMMIT is ON,
229
 
 *    then the normal transaction which encloses the statement
230
 
 *    transaction ends
231
 
 * 2) If a COMMIT or ROLLBACK statement occurs on the connection
232
 
 * 3) Just before a DDL operation occurs, the kernel will implicitly
233
 
 *    commit the active normal transaction
234
 
 *
235
 
 * Transactions and Non-transactional Storage Engines
236
 
 * --------------------------------------------------
237
 
 *
238
 
 * For non-transactional engines, this call can be safely ignored, an
239
 
 * the kernel tracks whether a non-transactional engine has changed
240
 
 * any data state, and warns the user appropriately if a transaction
241
 
 * (statement or normal) is rolled back after such non-transactional
242
 
 * data changes have been made.
243
 
 *
244
 
 * XA Two-phase Commit Protocol
245
 
 * ----------------------------
246
 
 *
247
 
 * During statement execution, whenever any of data-modifying
248
 
 * PSEA API methods is used, e.g. Cursor::write_row() or
249
 
 * Cursor::update_row(), the read-write flag is raised in the
250
 
 * statement transaction for the involved engine.
251
 
 * Currently All PSEA calls are "traced", and the data can not be
252
 
 * changed in a way other than issuing a PSEA call. Important:
253
 
 * unless this invariant is preserved the server will not know that
254
 
 * a transaction in a given engine is read-write and will not
255
 
 * involve the two-phase commit protocol!
256
 
 *
257
 
 * At the end of a statement, TransactionServices::autocommitOrRollback()
258
 
 * is invoked. This call in turn
259
 
 * invokes plugin::XaResourceManager::xapPepare() for every involved XA
260
 
 * resource manager.
261
 
 *
262
 
 * Prepare is followed by a call to plugin::TransactionalStorageEngine::commit()
263
 
 * or plugin::XaResourceManager::xaCommit() (depending on what the resource
264
 
 * is...)
265
 
 * 
266
 
 * If a one-phase commit will suffice, plugin::StorageEngine::prepare() is not
267
 
 * invoked and the server only calls plugin::StorageEngine::commit_one_phase().
268
 
 * At statement commit, the statement-related read-write engine
269
 
 * flag is propagated to the corresponding flag in the normal
270
 
 * transaction.  When the commit is complete, the list of registered
271
 
 * engines is cleared.
272
 
 *
273
 
 * Rollback is handled in a similar fashion.
274
 
 *
275
 
 * Additional notes on DDL and the normal transaction.
276
 
 * ---------------------------------------------------
277
 
 *
278
 
 * CREATE TABLE .. SELECT can start a *new* normal transaction
279
 
 * because of the fact that SELECTs on a transactional storage
280
 
 * engine participate in the normal SQL transaction (due to
281
 
 * isolation level issues and consistent read views).
282
 
 *
283
 
 * Behaviour of the server in this case is currently badly
284
 
 * defined.
285
 
 *
286
 
 * DDL statements use a form of "semantic" logging
287
 
 * to maintain atomicity: if CREATE TABLE .. SELECT failed,
288
 
 * the newly created table is deleted.
289
 
 * 
290
 
 * In addition, some DDL statements issue interim transaction
291
 
 * commits: e.g. ALTER TABLE issues a COMMIT after data is copied
292
 
 * from the original table to the internal temporary table. Other
293
 
 * statements, e.g. CREATE TABLE ... SELECT do not always commit
294
 
 * after itself.
295
 
 *
296
 
 * And finally there is a group of DDL statements such as
297
 
 * RENAME/DROP TABLE that doesn't start a new transaction
298
 
 * and doesn't commit.
299
 
 *
300
 
 * A consistent behaviour is perhaps to always commit the normal
301
 
 * transaction after all DDLs, just like the statement transaction
302
 
 * is always committed at the end of all statements.
303
 
 */
304
 
TransactionServices::TransactionServices()
305
 
{
306
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
307
 
  if (engine)
308
 
  {
309
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
310
 
  }
311
 
  else 
312
 
  {
313
 
    xa_storage_engine= NULL;
314
 
  }
315
 
}
316
 
 
317
 
void TransactionServices::registerResourceForStatement(Session::reference session,
318
 
                                                       plugin::MonitoredInTransaction *monitored,
319
 
                                                       plugin::TransactionalStorageEngine *engine)
320
 
{
321
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
322
 
  {
323
 
    /* 
324
 
     * Now we automatically register this resource manager for the
325
 
     * normal transaction.  This is fine because a statement
326
 
     * transaction registration should always enlist the resource
327
 
     * in the normal transaction which contains the statement
328
 
     * transaction.
329
 
     */
330
 
    registerResourceForTransaction(session, monitored, engine);
331
 
  }
332
 
 
333
 
  TransactionContext *trans= &session.transaction.stmt;
334
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
335
 
 
336
 
  if (resource_context->isStarted())
337
 
    return; /* already registered, return */
338
 
 
339
 
  assert(monitored->participatesInSqlTransaction());
340
 
  assert(not monitored->participatesInXaTransaction());
341
 
 
342
 
  resource_context->setMonitored(monitored);
343
 
  resource_context->setTransactionalStorageEngine(engine);
344
 
  trans->registerResource(resource_context);
345
 
 
346
 
  trans->no_2pc|= true;
347
 
}
348
 
 
349
 
void TransactionServices::registerResourceForStatement(Session::reference session,
350
 
                                                       plugin::MonitoredInTransaction *monitored,
351
 
                                                       plugin::TransactionalStorageEngine *engine,
352
 
                                                       plugin::XaResourceManager *resource_manager)
353
 
{
354
 
  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
355
 
  {
356
 
    /* 
357
 
     * Now we automatically register this resource manager for the
358
 
     * normal transaction.  This is fine because a statement
359
 
     * transaction registration should always enlist the resource
360
 
     * in the normal transaction which contains the statement
361
 
     * transaction.
362
 
     */
363
 
    registerResourceForTransaction(session, monitored, engine, resource_manager);
364
 
  }
365
 
 
366
 
  TransactionContext *trans= &session.transaction.stmt;
367
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
368
 
 
369
 
  if (resource_context->isStarted())
370
 
    return; /* already registered, return */
371
 
 
372
 
  assert(monitored->participatesInXaTransaction());
373
 
  assert(monitored->participatesInSqlTransaction());
374
 
 
375
 
  resource_context->setMonitored(monitored);
376
 
  resource_context->setTransactionalStorageEngine(engine);
377
 
  resource_context->setXaResourceManager(resource_manager);
378
 
  trans->registerResource(resource_context);
379
 
 
380
 
  trans->no_2pc|= false;
381
 
}
382
 
 
383
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
384
 
                                                         plugin::MonitoredInTransaction *monitored,
385
 
                                                         plugin::TransactionalStorageEngine *engine)
386
 
{
387
 
  TransactionContext *trans= &session.transaction.all;
388
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
389
 
 
390
 
  if (resource_context->isStarted())
391
 
    return; /* already registered, return */
392
 
 
393
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
394
 
 
395
 
  trans->registerResource(resource_context);
396
 
 
397
 
  assert(monitored->participatesInSqlTransaction());
398
 
  assert(not monitored->participatesInXaTransaction());
399
 
 
400
 
  resource_context->setMonitored(monitored);
401
 
  resource_context->setTransactionalStorageEngine(engine);
402
 
  trans->no_2pc|= true;
403
 
 
404
 
  if (session.transaction.xid_state.xid.is_null())
405
 
    session.transaction.xid_state.xid.set(session.getQueryId());
406
 
 
407
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
408
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
409
 
    registerResourceForStatement(session, monitored, engine);
410
 
}
411
 
 
412
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
413
 
                                                         plugin::MonitoredInTransaction *monitored,
414
 
                                                         plugin::TransactionalStorageEngine *engine,
415
 
                                                         plugin::XaResourceManager *resource_manager)
416
 
{
417
 
  TransactionContext *trans= &session.transaction.all;
418
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
419
 
 
420
 
  if (resource_context->isStarted())
421
 
    return; /* already registered, return */
422
 
 
423
 
  session.server_status|= SERVER_STATUS_IN_TRANS;
424
 
 
425
 
  trans->registerResource(resource_context);
426
 
 
427
 
  assert(monitored->participatesInSqlTransaction());
428
 
 
429
 
  resource_context->setMonitored(monitored);
430
 
  resource_context->setXaResourceManager(resource_manager);
431
 
  resource_context->setTransactionalStorageEngine(engine);
432
 
  trans->no_2pc|= true;
433
 
 
434
 
  if (session.transaction.xid_state.xid.is_null())
435
 
    session.transaction.xid_state.xid.set(session.getQueryId());
436
 
 
437
 
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
 
 
439
 
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
440
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
441
 
    registerResourceForStatement(session, monitored, engine, resource_manager);
442
 
}
443
 
 
444
 
void TransactionServices::allocateNewTransactionId()
445
 
{
446
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
447
 
  if (! replication_services.isActive())
448
 
  {
449
 
    return;
450
 
  }
451
 
 
452
 
  Session *my_session= current_session;
453
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
454
 
  my_session->setXaId(xa_id);
455
 
}
456
 
 
457
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
458
 
{
459
 
  if (session.getXaId() == 0)
460
 
  {
461
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
462
 
  }
463
 
 
464
 
  return session.getXaId();
465
 
}
466
 
 
467
 
int TransactionServices::commitTransaction(Session::reference session,
468
 
                                           bool normal_transaction)
469
 
{
470
 
  int error= 0, cookie= 0;
471
 
  /*
472
 
    'all' means that this is either an explicit commit issued by
473
 
    user, or an implicit commit issued by a DDL.
474
 
  */
475
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
476
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
477
 
 
478
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
479
 
 
480
 
  /*
481
 
    We must not commit the normal transaction if a statement
482
 
    transaction is pending. Otherwise statement transaction
483
 
    flags will not get propagated to its normal transaction's
484
 
    counterpart.
485
 
  */
486
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
487
 
              trans == &session.transaction.stmt);
488
 
 
489
 
  if (resource_contexts.empty() == false)
490
 
  {
491
 
    if (is_real_trans && session.wait_if_global_read_lock(false, false))
492
 
    {
493
 
      rollbackTransaction(session, normal_transaction);
494
 
      return 1;
495
 
    }
496
 
 
497
 
    /*
498
 
     * If replication is on, we do a PREPARE on the resource managers, push the
499
 
     * Transaction message across the replication stream, and then COMMIT if the
500
 
     * replication stream returned successfully.
501
 
     */
502
 
    if (shouldConstructMessages())
503
 
    {
504
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
505
 
           it != resource_contexts.end() && ! error;
506
 
           ++it)
507
 
      {
508
 
        ResourceContext *resource_context= *it;
509
 
        int err;
510
 
        /*
511
 
          Do not call two-phase commit if this particular
512
 
          transaction is read-only. This allows for simpler
513
 
          implementation in engines that are always read-only.
514
 
        */
515
 
        if (! resource_context->hasModifiedData())
516
 
          continue;
517
 
 
518
 
        plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
519
 
 
520
 
        if (resource->participatesInXaTransaction())
521
 
        {
522
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
523
 
          {
524
 
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
525
 
            error= 1;
526
 
          }
527
 
          else
528
 
          {
529
 
            session.status_var.ha_prepare_count++;
530
 
          }
531
 
        }
532
 
      }
533
 
      if (error == 0 && is_real_trans)
534
 
      {
535
 
        /*
536
 
         * Push the constructed Transaction messages across to
537
 
         * replicators and appliers.
538
 
         */
539
 
        error= commitTransactionMessage(session);
540
 
      }
541
 
      if (error)
542
 
      {
543
 
        rollbackTransaction(session, normal_transaction);
544
 
        error= 1;
545
 
        goto end;
546
 
      }
547
 
    }
548
 
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
549
 
end:
550
 
    if (is_real_trans)
551
 
      session.startWaitingGlobalReadLock();
552
 
  }
553
 
  return error;
554
 
}
555
 
 
556
 
/**
557
 
  @note
558
 
  This function does not care about global read lock. A caller should.
559
 
*/
560
 
int TransactionServices::commitPhaseOne(Session::reference session,
561
 
                                        bool normal_transaction)
562
 
{
563
 
  int error=0;
564
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
565
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
566
 
 
567
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
568
 
  bool all= normal_transaction;
569
 
 
570
 
  /* If we're in autocommit then we have a real transaction to commit
571
 
     (except if it's BEGIN)
572
 
  */
573
 
  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
574
 
    all= true;
575
 
 
576
 
  if (resource_contexts.empty() == false)
577
 
  {
578
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
579
 
         it != resource_contexts.end();
580
 
         ++it)
581
 
    {
582
 
      int err;
583
 
      ResourceContext *resource_context= *it;
584
 
 
585
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
586
 
 
587
 
      if (resource->participatesInXaTransaction())
588
 
      {
589
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
590
 
        {
591
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
592
 
          error= 1;
593
 
        }
594
 
        else if (normal_transaction)
595
 
        {
596
 
          session.status_var.ha_commit_count++;
597
 
        }
598
 
      }
599
 
      else if (resource->participatesInSqlTransaction())
600
 
      {
601
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
602
 
        {
603
 
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
604
 
          error= 1;
605
 
        }
606
 
        else if (normal_transaction)
607
 
        {
608
 
          session.status_var.ha_commit_count++;
609
 
        }
610
 
      }
611
 
      resource_context->reset(); /* keep it conveniently zero-filled */
612
 
    }
613
 
 
614
 
    if (is_real_trans)
615
 
      session.transaction.xid_state.xid.null();
616
 
 
617
 
    if (normal_transaction)
618
 
    {
619
 
      session.variables.tx_isolation= session.session_tx_isolation;
620
 
      session.transaction.cleanup();
621
 
    }
622
 
  }
623
 
  trans->reset();
624
 
  return error;
625
 
}
626
 
 
627
 
int TransactionServices::rollbackTransaction(Session::reference session,
628
 
                                             bool normal_transaction)
629
 
{
630
 
  int error= 0;
631
 
  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
632
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
633
 
 
634
 
  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
635
 
  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
636
 
 
637
 
  /*
638
 
    We must not rollback the normal transaction if a statement
639
 
    transaction is pending.
640
 
  */
641
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
642
 
              trans == &session.transaction.stmt);
643
 
 
644
 
  if (resource_contexts.empty() == false)
645
 
  {
646
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
647
 
         it != resource_contexts.end();
648
 
         ++it)
649
 
    {
650
 
      int err;
651
 
      ResourceContext *resource_context= *it;
652
 
 
653
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
654
 
 
655
 
      if (resource->participatesInXaTransaction())
656
 
      {
657
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
658
 
        {
659
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
660
 
          error= 1;
661
 
        }
662
 
        else if (normal_transaction)
663
 
        {
664
 
          session.status_var.ha_rollback_count++;
665
 
        }
666
 
      }
667
 
      else if (resource->participatesInSqlTransaction())
668
 
      {
669
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
670
 
        {
671
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
672
 
          error= 1;
673
 
        }
674
 
        else if (normal_transaction)
675
 
        {
676
 
          session.status_var.ha_rollback_count++;
677
 
        }
678
 
      }
679
 
      resource_context->reset(); /* keep it conveniently zero-filled */
680
 
    }
681
 
    
682
 
    /* 
683
 
     * We need to signal the ROLLBACK to ReplicationServices here
684
 
     * BEFORE we set the transaction ID to NULL.  This is because
685
 
     * if a bulk segment was sent to replicators, we need to send
686
 
     * a rollback statement with the corresponding transaction ID
687
 
     * to rollback.
688
 
     */
689
 
    if (all)
690
 
      rollbackTransactionMessage(session);
691
 
    else
692
 
      rollbackStatementMessage(session);
693
 
 
694
 
    if (is_real_trans)
695
 
      session.transaction.xid_state.xid.null();
696
 
    if (normal_transaction)
697
 
    {
698
 
      session.variables.tx_isolation=session.session_tx_isolation;
699
 
      session.transaction.cleanup();
700
 
    }
701
 
  }
702
 
  if (normal_transaction)
703
 
    session.transaction_rollback_request= false;
704
 
 
705
 
  /*
706
 
   * If a non-transactional table was updated, warn the user
707
 
   */
708
 
  if (is_real_trans &&
709
 
      session.transaction.all.hasModifiedNonTransData() &&
710
 
      session.getKilled() != Session::KILL_CONNECTION)
711
 
  {
712
 
    push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
713
 
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
714
 
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
715
 
  }
716
 
  trans->reset();
717
 
  return error;
718
 
}
719
 
 
720
 
int TransactionServices::autocommitOrRollback(Session::reference session,
721
 
                                              int error)
722
 
{
723
 
  /* One GPB Statement message per SQL statement */
724
 
  message::Statement *statement= session.getStatementMessage();
725
 
  if ((statement != NULL) && (! error))
726
 
    finalizeStatementMessage(*statement, session);
727
 
 
728
 
  if (session.transaction.stmt.getResourceContexts().empty() == false)
729
 
  {
730
 
    TransactionContext *trans = &session.transaction.stmt;
731
 
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
732
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
733
 
         it != resource_contexts.end();
734
 
         ++it)
735
 
    {
736
 
      ResourceContext *resource_context= *it;
737
 
 
738
 
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
739
 
    }
740
 
 
741
 
    if (! error)
742
 
    {
743
 
      if (commitTransaction(session, false))
744
 
        error= 1;
745
 
    }
746
 
    else
747
 
    {
748
 
      (void) rollbackTransaction(session, false);
749
 
      if (session.transaction_rollback_request)
750
 
      {
751
 
        (void) rollbackTransaction(session, true);
752
 
        session.server_status&= ~SERVER_STATUS_IN_TRANS;
753
 
      }
754
 
    }
755
 
 
756
 
    session.variables.tx_isolation= session.session_tx_isolation;
757
 
  }
758
 
 
759
 
  return error;
760
 
}
761
 
 
762
 
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
763
 
{
764
 
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
765
 
  {
766
 
    /* The below is perfectly fine, since we're simply comparing addresses for the underlying
767
 
     * resources aren't the same... */
768
 
    return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
769
 
  }
770
 
};
771
 
 
772
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
773
 
                                             NamedSavepoint &sv)
774
 
{
775
 
  int error= 0;
776
 
  TransactionContext *trans= &session.transaction.all;
777
 
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
778
 
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
779
 
 
780
 
  trans->no_2pc= false;
781
 
  /*
782
 
    rolling back to savepoint in all storage engines that were part of the
783
 
    transaction when the savepoint was set
784
 
  */
785
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
786
 
       it != sv_resource_contexts.end();
787
 
       ++it)
788
 
  {
789
 
    int err;
790
 
    ResourceContext *resource_context= *it;
791
 
 
792
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
793
 
 
794
 
    if (resource->participatesInSqlTransaction())
795
 
    {
796
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
797
 
      {
798
 
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
799
 
        error= 1;
800
 
      }
801
 
      else
802
 
      {
803
 
        session.status_var.ha_savepoint_rollback_count++;
804
 
      }
805
 
    }
806
 
    trans->no_2pc|= not resource->participatesInXaTransaction();
807
 
  }
808
 
  /*
809
 
    rolling back the transaction in all storage engines that were not part of
810
 
    the transaction when the savepoint was set
811
 
  */
812
 
  {
813
 
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
814
 
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
815
 
    TransactionContext::ResourceContexts set_difference_contexts;
816
 
 
817
 
    /* 
818
 
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
819
 
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
820
 
     * here
821
 
     */
822
 
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
823
 
 
824
 
    sort(sorted_tran_resource_contexts.begin(),
825
 
         sorted_tran_resource_contexts.end(),
826
 
         ResourceContextCompare());
827
 
    sort(sorted_sv_resource_contexts.begin(),
828
 
         sorted_sv_resource_contexts.end(),
829
 
         ResourceContextCompare());
830
 
    set_difference(sorted_tran_resource_contexts.begin(),
831
 
                   sorted_tran_resource_contexts.end(),
832
 
                   sorted_sv_resource_contexts.begin(),
833
 
                   sorted_sv_resource_contexts.end(),
834
 
                   set_difference_contexts.begin(),
835
 
                   ResourceContextCompare());
836
 
    /* 
837
 
     * set_difference_contexts now contains all resource contexts
838
 
     * which are in the transaction context but were NOT in the
839
 
     * savepoint's resource contexts.
840
 
     */
841
 
        
842
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
843
 
         it != set_difference_contexts.end();
844
 
         ++it)
845
 
    {
846
 
      ResourceContext *resource_context= *it;
847
 
      int err;
848
 
 
849
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
850
 
 
851
 
      if (resource->participatesInSqlTransaction())
852
 
      {
853
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
854
 
        {
855
 
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
856
 
          error= 1;
857
 
        }
858
 
        else
859
 
        {
860
 
          session.status_var.ha_rollback_count++;
861
 
        }
862
 
      }
863
 
      resource_context->reset(); /* keep it conveniently zero-filled */
864
 
    }
865
 
  }
866
 
  trans->setResourceContexts(sv_resource_contexts);
867
 
 
868
 
  if (shouldConstructMessages())
869
 
  {
870
 
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
871
 
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
872
 
    if (savepoint_transaction != NULL)
873
 
    {
874
 
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
875
 
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
876
 
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
877
 
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
878
 
      */ 
879
 
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
880
 
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
881
 
      if (num_statements == 0)
882
 
      {    
883
 
        session.setStatementMessage(NULL);
884
 
      }    
885
 
      else 
886
 
      {
887
 
        session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
888
 
      }    
889
 
      session.setTransactionMessage(savepoint_transaction_copy);
890
 
    }
891
 
  }
892
 
 
893
 
  return error;
894
 
}
895
 
 
896
 
/**
897
 
  @note
898
 
  according to the sql standard (ISO/IEC 9075-2:2003)
899
 
  section "4.33.4 SQL-statements and transaction states",
900
 
  NamedSavepoint is *not* transaction-initiating SQL-statement
901
 
*/
902
 
int TransactionServices::setSavepoint(Session::reference session,
903
 
                                      NamedSavepoint &sv)
904
 
{
905
 
  int error= 0;
906
 
  TransactionContext *trans= &session.transaction.all;
907
 
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
908
 
 
909
 
  if (resource_contexts.empty() == false)
910
 
  {
911
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
912
 
         it != resource_contexts.end();
913
 
         ++it)
914
 
    {
915
 
      ResourceContext *resource_context= *it;
916
 
      int err;
917
 
 
918
 
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
919
 
 
920
 
      if (resource->participatesInSqlTransaction())
921
 
      {
922
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
923
 
        {
924
 
          my_error(ER_GET_ERRNO, MYF(0), err);
925
 
          error= 1;
926
 
        }
927
 
        else
928
 
        {
929
 
          session.status_var.ha_savepoint_count++;
930
 
        }
931
 
      }
932
 
    }
933
 
  }
934
 
  /*
935
 
    Remember the list of registered storage engines.
936
 
  */
937
 
  sv.setResourceContexts(resource_contexts);
938
 
 
939
 
  if (shouldConstructMessages())
940
 
  {
941
 
    message::Transaction *transaction= session.getTransactionMessage();
942
 
                  
943
 
    if (transaction != NULL)
944
 
    {
945
 
      message::Transaction *transaction_savepoint= 
946
 
        new message::Transaction(*transaction);
947
 
      sv.setTransactionMessage(transaction_savepoint);
948
 
    }
949
 
  } 
950
 
 
951
 
  return error;
952
 
}
953
 
 
954
 
int TransactionServices::releaseSavepoint(Session::reference session,
955
 
                                          NamedSavepoint &sv)
956
 
{
957
 
  int error= 0;
958
 
 
959
 
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
960
 
 
961
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
962
 
       it != resource_contexts.end();
963
 
       ++it)
964
 
  {
965
 
    int err;
966
 
    ResourceContext *resource_context= *it;
967
 
 
968
 
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
969
 
 
970
 
    if (resource->participatesInSqlTransaction())
971
 
    {
972
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
973
 
      {
974
 
        my_error(ER_GET_ERRNO, MYF(0), err);
975
 
        error= 1;
976
 
      }
977
 
    }
978
 
  }
979
 
  
980
 
  return error;
981
 
}
982
 
 
983
 
bool TransactionServices::shouldConstructMessages()
984
 
{
985
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
986
 
  return replication_services.isActive();
987
 
}
988
 
 
989
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
990
 
                                                                       bool should_inc_trx_id)
991
 
{
992
 
  message::Transaction *transaction= session.getTransactionMessage();
993
 
 
994
 
  if (unlikely(transaction == NULL))
995
 
  {
996
 
    /* 
997
 
     * Allocate and initialize a new transaction message 
998
 
     * for this Session object.  Session is responsible for
999
 
     * deleting transaction message when done with it.
1000
 
     */
1001
 
    transaction= new (nothrow) message::Transaction();
1002
 
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1003
 
    session.setTransactionMessage(transaction);
1004
 
    return transaction;
1005
 
  }
1006
 
  else
1007
 
    return transaction;
1008
 
}
1009
 
 
1010
 
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1011
 
                                                 Session::reference session,
1012
 
                                                 bool should_inc_trx_id)
1013
 
{
1014
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1015
 
  trx->set_server_id(session.getServerId());
1016
 
 
1017
 
  if (should_inc_trx_id)
1018
 
  {
1019
 
    trx->set_transaction_id(getCurrentTransactionId(session));
1020
 
    session.setXaId(0);
1021
 
  }
1022
 
  else
1023
 
  {
1024
 
    /* trx and seg id will get set properly elsewhere */
1025
 
    trx->set_transaction_id(0);
1026
 
  }
1027
 
 
1028
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
1029
 
  
1030
 
  /* segment info may get set elsewhere as needed */
1031
 
  transaction.set_segment_id(1);
1032
 
  transaction.set_end_segment(true);
1033
 
}
1034
 
 
1035
 
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1036
 
                                                     Session::const_reference session)
1037
 
{
1038
 
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1039
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
1040
 
}
1041
 
 
1042
 
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1043
 
                                                    Session::reference session)
1044
 
{
1045
 
  delete transaction;
1046
 
  session.setStatementMessage(NULL);
1047
 
  session.setTransactionMessage(NULL);
1048
 
  session.setXaId(0);
1049
 
}
1050
 
 
1051
 
int TransactionServices::commitTransactionMessage(Session::reference session)
1052
 
{
1053
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1054
 
  if (! replication_services.isActive())
1055
 
    return 0;
1056
 
 
1057
 
  /*
1058
 
   * If no Transaction message was ever created, then no data modification
1059
 
   * occurred inside the transaction, so nothing to do.
1060
 
   */
1061
 
  if (session.getTransactionMessage() == NULL)
1062
 
    return 0;
1063
 
  
1064
 
  /* If there is an active statement message, finalize it. */
1065
 
  message::Statement *statement= session.getStatementMessage();
1066
 
 
1067
 
  if (statement != NULL)
1068
 
  {
1069
 
    finalizeStatementMessage(*statement, session);
1070
 
  }
1071
 
 
1072
 
  message::Transaction* transaction= getActiveTransactionMessage(session);
1073
 
 
1074
 
  /*
1075
 
   * It is possible that we could have a Transaction without any Statements
1076
 
   * if we had created a Statement but had to roll it back due to it failing
1077
 
   * mid-execution, and no subsequent Statements were added to the Transaction
1078
 
   * message. In this case, we simply clean up the message and not push it.
1079
 
   */
1080
 
  if (transaction->statement_size() == 0)
1081
 
  {
1082
 
    cleanupTransactionMessage(transaction, session);
1083
 
    return 0;
1084
 
  }
1085
 
  
1086
 
  finalizeTransactionMessage(*transaction, session);
1087
 
  
1088
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
1089
 
 
1090
 
  cleanupTransactionMessage(transaction, session);
1091
 
 
1092
 
  return static_cast<int>(result);
1093
 
}
1094
 
 
1095
 
void TransactionServices::initStatementMessage(message::Statement &statement,
1096
 
                                               message::Statement::Type type,
1097
 
                                               Session::const_reference session)
1098
 
{
1099
 
  statement.set_type(type);
1100
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
1101
 
 
1102
 
  if (session.variables.replicate_query)
1103
 
    statement.set_sql(session.getQueryString()->c_str());
1104
 
}
1105
 
 
1106
 
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1107
 
                                                   Session::reference session)
1108
 
{
1109
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
1110
 
  session.setStatementMessage(NULL);
1111
 
}
1112
 
 
1113
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1114
 
{
1115
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1116
 
  if (! replication_services.isActive())
1117
 
    return;
1118
 
  
1119
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1120
 
 
1121
 
  /*
1122
 
   * OK, so there are two situations that we need to deal with here:
1123
 
   *
1124
 
   * 1) We receive an instruction to ROLLBACK the current transaction
1125
 
   *    and the currently-stored Transaction message is *self-contained*, 
1126
 
   *    meaning that no Statement messages in the Transaction message
1127
 
   *    contain a message having its segment_id member greater than 1.  If
1128
 
   *    no non-segment ID 1 members are found, we can simply clear the
1129
 
   *    current Transaction message and remove it from memory.
1130
 
   *
1131
 
   * 2) If the Transaction message does indeed have a non-end segment, that
1132
 
   *    means that a bulk update/delete/insert Transaction message segment
1133
 
   *    has previously been sent over the wire to replicators.  In this case, 
1134
 
   *    we need to package a Transaction with a Statement message of type
1135
 
   *    ROLLBACK to indicate to replicators that previously-transmitted
1136
 
   *    messages must be un-applied.
1137
 
   */
1138
 
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1139
 
  {
1140
 
    /* Remember the transaction ID so we can re-use it */
1141
 
    uint64_t trx_id= transaction->transaction_context().transaction_id();
1142
 
    uint32_t seg_id= transaction->segment_id();
1143
 
 
1144
 
    /*
1145
 
     * Clear the transaction, create a Rollback statement message, 
1146
 
     * attach it to the transaction, and push it to replicators.
1147
 
     */
1148
 
    transaction->Clear();
1149
 
    initTransactionMessage(*transaction, session, false);
1150
 
 
1151
 
    /* Set the transaction ID to match the previous messages */
1152
 
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1153
 
    transaction->set_segment_id(seg_id);
1154
 
    transaction->set_end_segment(true);
1155
 
 
1156
 
    message::Statement *statement= transaction->add_statement();
1157
 
 
1158
 
    initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1159
 
    finalizeStatementMessage(*statement, session);
1160
 
 
1161
 
    finalizeTransactionMessage(*transaction, session);
1162
 
    
1163
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
1164
 
  }
1165
 
 
1166
 
  cleanupTransactionMessage(transaction, session);
1167
 
}
1168
 
 
1169
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
1170
 
{
1171
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1172
 
  if (! replication_services.isActive())
1173
 
    return;
1174
 
 
1175
 
  message::Statement *current_statement= session.getStatementMessage();
1176
 
 
1177
 
  /* If we never added a Statement message, nothing to undo. */
1178
 
  if (current_statement == NULL)
1179
 
    return;
1180
 
 
1181
 
  /*
1182
 
   * If the Statement has been segmented, then we've already pushed a portion
1183
 
   * of this Statement's row changes through the replication stream and we
1184
 
   * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1185
 
   * delete the current Statement message.
1186
 
   */
1187
 
  bool is_segmented= false;
1188
 
 
1189
 
  switch (current_statement->type())
1190
 
  {
1191
 
    case message::Statement::INSERT:
1192
 
      if (current_statement->insert_data().segment_id() > 1)
1193
 
        is_segmented= true;
1194
 
      break;
1195
 
 
1196
 
    case message::Statement::UPDATE:
1197
 
      if (current_statement->update_data().segment_id() > 1)
1198
 
        is_segmented= true;
1199
 
      break;
1200
 
 
1201
 
    case message::Statement::DELETE:
1202
 
      if (current_statement->delete_data().segment_id() > 1)
1203
 
        is_segmented= true;
1204
 
      break;
1205
 
 
1206
 
    default:
1207
 
      break;
1208
 
  }
1209
 
 
1210
 
  /*
1211
 
   * Remove the Statement message we've been working with (same as
1212
 
   * current_statement).
1213
 
   */
1214
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1215
 
  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1216
 
  statements_in_txn= transaction->mutable_statement();
1217
 
  statements_in_txn->RemoveLast();
1218
 
  session.setStatementMessage(NULL);
1219
 
  
1220
 
  /*
1221
 
   * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1222
 
   * an indicator to cancel the previous Statement message which should have
1223
 
   * had its end_segment attribute set to false.
1224
 
   */
1225
 
  if (is_segmented)
1226
 
  {
1227
 
    current_statement= transaction->add_statement();
1228
 
    initStatementMessage(*current_statement,
1229
 
                         message::Statement::ROLLBACK_STATEMENT,
1230
 
                         session);
1231
 
    finalizeStatementMessage(*current_statement, session);
1232
 
  }
1233
 
}
1234
 
 
1235
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
1236
 
                                                                     message::Transaction *transaction)
1237
 
{
1238
 
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1239
 
  uint32_t seg_id= transaction->segment_id();
1240
 
  
1241
 
  transaction->set_end_segment(false);
1242
 
  commitTransactionMessage(session);
1243
 
  transaction= getActiveTransactionMessage(session, false);
1244
 
  
1245
 
  /* Set the transaction ID to match the previous messages */
1246
 
  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1247
 
  transaction->set_segment_id(seg_id + 1);
1248
 
  transaction->set_end_segment(true);
1249
 
 
1250
 
  return transaction;
1251
 
}
1252
 
 
1253
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
1254
 
                                                            Table &table,
1255
 
                                                            uint32_t *next_segment_id)
1256
 
{
1257
 
  message::Statement *statement= session.getStatementMessage();
1258
 
  message::Transaction *transaction= NULL;
1259
 
  
1260
 
  /*
1261
 
   * If statement is NULL, this is a new statement.
1262
 
   * If statement is NOT NULL, this a continuation of the same statement.
1263
 
   * This is because autocommitOrRollback() finalizes the statement so that
1264
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1265
 
   * share a single GPB message for multiple statements).
1266
 
   */
1267
 
  if (statement == NULL)
1268
 
  {
1269
 
    transaction= getActiveTransactionMessage(session);
1270
 
 
1271
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1272
 
        transaction_message_threshold)
1273
 
    {
1274
 
      transaction= segmentTransactionMessage(session, transaction);
1275
 
    }
1276
 
 
1277
 
    statement= transaction->add_statement();
1278
 
    setInsertHeader(*statement, session, table);
1279
 
    session.setStatementMessage(statement);
1280
 
  }
1281
 
  else
1282
 
  {
1283
 
    transaction= getActiveTransactionMessage(session);
1284
 
    
1285
 
    /*
1286
 
     * If we've passed our threshold for the statement size (possible for
1287
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1288
 
     * the Transaction will keep it from getting huge).
1289
 
     */
1290
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1291
 
        transaction_message_threshold)
1292
 
    {
1293
 
      /* Remember the transaction ID so we can re-use it */
1294
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1295
 
      uint32_t seg_id= transaction->segment_id();
1296
 
      
1297
 
      message::InsertData *current_data= statement->mutable_insert_data();
1298
 
      
1299
 
      /* Caller should use this value when adding a new record */
1300
 
      *next_segment_id= current_data->segment_id() + 1;
1301
 
      
1302
 
      current_data->set_end_segment(false);
1303
 
      transaction->set_end_segment(false);
1304
 
      
1305
 
      /* 
1306
 
       * Send the trx message to replicators after finalizing the 
1307
 
       * statement and transaction. This will also set the Transaction
1308
 
       * and Statement objects in Session to NULL.
1309
 
       */
1310
 
      commitTransactionMessage(session);
1311
 
      
1312
 
      /*
1313
 
       * Statement and Transaction should now be NULL, so new ones will get
1314
 
       * created. We reuse the transaction id since we are segmenting
1315
 
       * one transaction.
1316
 
       */
1317
 
      transaction= getActiveTransactionMessage(session, false);
1318
 
      assert(transaction != NULL);
1319
 
 
1320
 
      statement= transaction->add_statement();
1321
 
      setInsertHeader(*statement, session, table);
1322
 
      session.setStatementMessage(statement);
1323
 
            
1324
 
      /* Set the transaction ID to match the previous messages */
1325
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1326
 
      transaction->set_segment_id(seg_id + 1);
1327
 
      transaction->set_end_segment(true);
1328
 
    }
1329
 
    else
1330
 
    {
1331
 
      /*
1332
 
       * Continuation of the same statement. Carry forward the existing
1333
 
       * segment id.
1334
 
       */
1335
 
      const message::InsertData &current_data= statement->insert_data();
1336
 
      *next_segment_id= current_data.segment_id();
1337
 
    }
1338
 
  }
1339
 
  
1340
 
  return *statement;
1341
 
}
1342
 
 
1343
 
void TransactionServices::setInsertHeader(message::Statement &statement,
1344
 
                                          Session::const_reference session,
1345
 
                                          Table &table)
1346
 
{
1347
 
  initStatementMessage(statement, message::Statement::INSERT, session);
1348
 
 
1349
 
  /* 
1350
 
   * Now we construct the specialized InsertHeader message inside
1351
 
   * the generalized message::Statement container...
1352
 
   */
1353
 
  /* Set up the insert header */
1354
 
  message::InsertHeader *header= statement.mutable_insert_header();
1355
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1356
 
 
1357
 
  string schema_name;
1358
 
  (void) table.getShare()->getSchemaName(schema_name);
1359
 
  string table_name;
1360
 
  (void) table.getShare()->getTableName(table_name);
1361
 
 
1362
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1363
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1364
 
 
1365
 
  Field *current_field;
1366
 
  Field **table_fields= table.getFields();
1367
 
 
1368
 
  message::FieldMetadata *field_metadata;
1369
 
 
1370
 
  /* We will read all the table's fields... */
1371
 
  table.setReadSet();
1372
 
 
1373
 
  while ((current_field= *table_fields++) != NULL) 
1374
 
  {
1375
 
    field_metadata= header->add_field_metadata();
1376
 
    field_metadata->set_name(current_field->field_name);
1377
 
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1378
 
  }
1379
 
}
1380
 
 
1381
 
bool TransactionServices::insertRecord(Session::reference session,
1382
 
                                       Table &table)
1383
 
{
1384
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1385
 
  if (! replication_services.isActive())
1386
 
    return false;
1387
 
 
1388
 
  if (not table.getShare()->replicate())
1389
 
    return false;
1390
 
 
1391
 
  /**
1392
 
   * We do this check here because we don't want to even create a 
1393
 
   * statement if there isn't a primary key on the table...
1394
 
   *
1395
 
   * @todo
1396
 
   *
1397
 
   * Multi-column primary keys are handled how exactly?
1398
 
   */
1399
 
  if (not table.getShare()->hasPrimaryKey())
1400
 
  {
1401
 
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1402
 
    return true;
1403
 
  }
1404
 
 
1405
 
  uint32_t next_segment_id= 1;
1406
 
  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1407
 
 
1408
 
  message::InsertData *data= statement.mutable_insert_data();
1409
 
  data->set_segment_id(next_segment_id);
1410
 
  data->set_end_segment(true);
1411
 
  message::InsertRecord *record= data->add_record();
1412
 
 
1413
 
  Field *current_field;
1414
 
  Field **table_fields= table.getFields();
1415
 
 
1416
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1417
 
  string_value->set_charset(system_charset_info);
1418
 
 
1419
 
  /* We will read all the table's fields... */
1420
 
  table.setReadSet();
1421
 
 
1422
 
  while ((current_field= *table_fields++) != NULL) 
1423
 
  {
1424
 
    if (current_field->is_null())
1425
 
    {
1426
 
      record->add_is_null(true);
1427
 
      record->add_insert_value("", 0);
1428
 
    } 
1429
 
    else 
1430
 
    {
1431
 
      string_value= current_field->val_str_internal(string_value);
1432
 
      record->add_is_null(false);
1433
 
      record->add_insert_value(string_value->c_ptr(), string_value->length());
1434
 
      string_value->free();
1435
 
    }
1436
 
  }
1437
 
  return false;
1438
 
}
1439
 
 
1440
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
1441
 
                                                            Table &table,
1442
 
                                                            const unsigned char *old_record, 
1443
 
                                                            const unsigned char *new_record,
1444
 
                                                            uint32_t *next_segment_id)
1445
 
{
1446
 
  message::Statement *statement= session.getStatementMessage();
1447
 
  message::Transaction *transaction= NULL;
1448
 
 
1449
 
  /*
1450
 
   * If statement is NULL, this is a new statement.
1451
 
   * If statement is NOT NULL, this a continuation of the same statement.
1452
 
   * This is because autocommitOrRollback() finalizes the statement so that
1453
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1454
 
   * share a single GPB message for multiple statements).
1455
 
   */
1456
 
  if (statement == NULL)
1457
 
  {
1458
 
    transaction= getActiveTransactionMessage(session);
1459
 
    
1460
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1461
 
        transaction_message_threshold)
1462
 
    {
1463
 
      transaction= segmentTransactionMessage(session, transaction);
1464
 
    }
1465
 
    
1466
 
    statement= transaction->add_statement();
1467
 
    setUpdateHeader(*statement, session, table, old_record, new_record);
1468
 
    session.setStatementMessage(statement);
1469
 
  }
1470
 
  else
1471
 
  {
1472
 
    transaction= getActiveTransactionMessage(session);
1473
 
    
1474
 
    /*
1475
 
     * If we've passed our threshold for the statement size (possible for
1476
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1477
 
     * the Transaction will keep it from getting huge).
1478
 
     */
1479
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1480
 
        transaction_message_threshold)
1481
 
    {
1482
 
      /* Remember the transaction ID so we can re-use it */
1483
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1484
 
      uint32_t seg_id= transaction->segment_id();
1485
 
      
1486
 
      message::UpdateData *current_data= statement->mutable_update_data();
1487
 
      
1488
 
      /* Caller should use this value when adding a new record */
1489
 
      *next_segment_id= current_data->segment_id() + 1;
1490
 
      
1491
 
      current_data->set_end_segment(false);
1492
 
      transaction->set_end_segment(false);
1493
 
      
1494
 
      /* 
1495
 
       * Send the trx message to replicators after finalizing the 
1496
 
       * statement and transaction. This will also set the Transaction
1497
 
       * and Statement objects in Session to NULL.
1498
 
       */
1499
 
      commitTransactionMessage(session);
1500
 
      
1501
 
      /*
1502
 
       * Statement and Transaction should now be NULL, so new ones will get
1503
 
       * created. We reuse the transaction id since we are segmenting
1504
 
       * one transaction.
1505
 
       */
1506
 
      transaction= getActiveTransactionMessage(session, false);
1507
 
      assert(transaction != NULL);
1508
 
      
1509
 
      statement= transaction->add_statement();
1510
 
      setUpdateHeader(*statement, session, table, old_record, new_record);
1511
 
      session.setStatementMessage(statement);
1512
 
      
1513
 
      /* Set the transaction ID to match the previous messages */
1514
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1515
 
      transaction->set_segment_id(seg_id + 1);
1516
 
      transaction->set_end_segment(true);
1517
 
    }
1518
 
    else
1519
 
    {
1520
 
      /*
1521
 
       * Continuation of the same statement. Carry forward the existing
1522
 
       * segment id.
1523
 
       */
1524
 
      const message::UpdateData &current_data= statement->update_data();
1525
 
      *next_segment_id= current_data.segment_id();
1526
 
    }
1527
 
  }
1528
 
  
1529
 
  return *statement;
1530
 
}
1531
 
 
1532
 
void TransactionServices::setUpdateHeader(message::Statement &statement,
1533
 
                                          Session::const_reference session,
1534
 
                                          Table &table,
1535
 
                                          const unsigned char *old_record, 
1536
 
                                          const unsigned char *new_record)
1537
 
{
1538
 
  initStatementMessage(statement, message::Statement::UPDATE, session);
1539
 
 
1540
 
  /* 
1541
 
   * Now we construct the specialized UpdateHeader message inside
1542
 
   * the generalized message::Statement container...
1543
 
   */
1544
 
  /* Set up the update header */
1545
 
  message::UpdateHeader *header= statement.mutable_update_header();
1546
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1547
 
 
1548
 
  string schema_name;
1549
 
  (void) table.getShare()->getSchemaName(schema_name);
1550
 
  string table_name;
1551
 
  (void) table.getShare()->getTableName(table_name);
1552
 
 
1553
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1554
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1555
 
 
1556
 
  Field *current_field;
1557
 
  Field **table_fields= table.getFields();
1558
 
 
1559
 
  message::FieldMetadata *field_metadata;
1560
 
 
1561
 
  /* We will read all the table's fields... */
1562
 
  table.setReadSet();
1563
 
 
1564
 
  while ((current_field= *table_fields++) != NULL) 
1565
 
  {
1566
 
    /*
1567
 
     * We add the "key field metadata" -- i.e. the fields which is
1568
 
     * the primary key for the table.
1569
 
     */
1570
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1571
 
    {
1572
 
      field_metadata= header->add_key_field_metadata();
1573
 
      field_metadata->set_name(current_field->field_name);
1574
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1575
 
    }
1576
 
 
1577
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
1578
 
    {
1579
 
      /* Field is changed from old to new */
1580
 
      field_metadata= header->add_set_field_metadata();
1581
 
      field_metadata->set_name(current_field->field_name);
1582
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1583
 
    }
1584
 
  }
1585
 
}
1586
 
 
1587
 
void TransactionServices::updateRecord(Session::reference session,
1588
 
                                       Table &table, 
1589
 
                                       const unsigned char *old_record, 
1590
 
                                       const unsigned char *new_record)
1591
 
{
1592
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1593
 
  if (! replication_services.isActive())
1594
 
    return;
1595
 
 
1596
 
  if (not table.getShare()->replicate())
1597
 
    return;
1598
 
 
1599
 
  uint32_t next_segment_id= 1;
1600
 
  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1601
 
 
1602
 
  message::UpdateData *data= statement.mutable_update_data();
1603
 
  data->set_segment_id(next_segment_id);
1604
 
  data->set_end_segment(true);
1605
 
  message::UpdateRecord *record= data->add_record();
1606
 
 
1607
 
  Field *current_field;
1608
 
  Field **table_fields= table.getFields();
1609
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1610
 
  string_value->set_charset(system_charset_info);
1611
 
 
1612
 
  while ((current_field= *table_fields++) != NULL) 
1613
 
  {
1614
 
    /*
1615
 
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
1616
 
     * but then realized that an UPDATE statement could potentially have different values for
1617
 
     * the SET field.  For instance, imagine this SQL scenario:
1618
 
     *
1619
 
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1620
 
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1621
 
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1622
 
     *
1623
 
     * We will generate two UpdateRecord messages with different set_value byte arrays.
1624
 
     */
1625
 
    if (isFieldUpdated(current_field, table, old_record, new_record))
1626
 
    {
1627
 
      /* Store the original "read bit" for this field */
1628
 
      bool is_read_set= current_field->isReadSet();
1629
 
 
1630
 
      /* We need to mark that we will "read" this field... */
1631
 
      table.setReadSet(current_field->position());
1632
 
 
1633
 
      /* Read the string value of this field's contents */
1634
 
      string_value= current_field->val_str_internal(string_value);
1635
 
 
1636
 
      /* 
1637
 
       * Reset the read bit after reading field to its original state.  This 
1638
 
       * prevents the field from being included in the WHERE clause
1639
 
       */
1640
 
      current_field->setReadSet(is_read_set);
1641
 
 
1642
 
      if (current_field->is_null())
1643
 
      {
1644
 
        record->add_is_null(true);
1645
 
        record->add_after_value("", 0);
1646
 
      }
1647
 
      else
1648
 
      {
1649
 
        record->add_is_null(false);
1650
 
        record->add_after_value(string_value->c_ptr(), string_value->length());
1651
 
      }
1652
 
      string_value->free();
1653
 
    }
1654
 
 
1655
 
    /* 
1656
 
     * Add the WHERE clause values now...for now, this means the
1657
 
     * primary key field value.  Replication only supports tables
1658
 
     * with a primary key.
1659
 
     */
1660
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1661
 
    {
1662
 
      /**
1663
 
       * To say the below is ugly is an understatement. But it works.
1664
 
       * 
1665
 
       * @todo Move this crap into a real Record API.
1666
 
       */
1667
 
      string_value= current_field->val_str_internal(string_value,
1668
 
                                                    old_record + 
1669
 
                                                    current_field->offset(const_cast<unsigned char *>(new_record)));
1670
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1671
 
      string_value->free();
1672
 
    }
1673
 
 
1674
 
  }
1675
 
}
1676
 
 
1677
 
bool TransactionServices::isFieldUpdated(Field *current_field,
1678
 
                                         Table &table,
1679
 
                                         const unsigned char *old_record,
1680
 
                                         const unsigned char *new_record)
1681
 
{
1682
 
  /*
1683
 
   * The below really should be moved into the Field API and Record API.  But for now
1684
 
   * we do this crazy pointer fiddling to figure out if the current field
1685
 
   * has been updated in the supplied record raw byte pointers.
1686
 
   */
1687
 
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1688
 
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1689
 
 
1690
 
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1691
 
 
1692
 
  bool old_value_is_null= current_field->is_null_in_record(old_record);
1693
 
  bool new_value_is_null= current_field->is_null_in_record(new_record);
1694
 
 
1695
 
  bool isUpdated= false;
1696
 
  if (old_value_is_null != new_value_is_null)
1697
 
  {
1698
 
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1699
 
    {
1700
 
      isUpdated= true;
1701
 
    }
1702
 
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1703
 
    {
1704
 
      isUpdated= true;
1705
 
    }
1706
 
  }
1707
 
 
1708
 
  if (! isUpdated)
1709
 
  {
1710
 
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
1711
 
    {
1712
 
      isUpdated= true;
1713
 
    }
1714
 
  }
1715
 
  return isUpdated;
1716
 
}  
1717
 
 
1718
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
1719
 
                                                            Table &table,
1720
 
                                                            uint32_t *next_segment_id)
1721
 
{
1722
 
  message::Statement *statement= session.getStatementMessage();
1723
 
  message::Transaction *transaction= NULL;
1724
 
 
1725
 
  /*
1726
 
   * If statement is NULL, this is a new statement.
1727
 
   * If statement is NOT NULL, this a continuation of the same statement.
1728
 
   * This is because autocommitOrRollback() finalizes the statement so that
1729
 
   * we guarantee only one Statement message per statement (i.e., we no longer
1730
 
   * share a single GPB message for multiple statements).
1731
 
   */
1732
 
  if (statement == NULL)
1733
 
  {
1734
 
    transaction= getActiveTransactionMessage(session);
1735
 
    
1736
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1737
 
        transaction_message_threshold)
1738
 
    {
1739
 
      transaction= segmentTransactionMessage(session, transaction);
1740
 
    }
1741
 
    
1742
 
    statement= transaction->add_statement();
1743
 
    setDeleteHeader(*statement, session, table);
1744
 
    session.setStatementMessage(statement);
1745
 
  }
1746
 
  else
1747
 
  {
1748
 
    transaction= getActiveTransactionMessage(session);
1749
 
    
1750
 
    /*
1751
 
     * If we've passed our threshold for the statement size (possible for
1752
 
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1753
 
     * the Transaction will keep it from getting huge).
1754
 
     */
1755
 
    if (static_cast<size_t>(transaction->ByteSize()) >= 
1756
 
        transaction_message_threshold)
1757
 
    {
1758
 
      /* Remember the transaction ID so we can re-use it */
1759
 
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1760
 
      uint32_t seg_id= transaction->segment_id();
1761
 
      
1762
 
      message::DeleteData *current_data= statement->mutable_delete_data();
1763
 
      
1764
 
      /* Caller should use this value when adding a new record */
1765
 
      *next_segment_id= current_data->segment_id() + 1;
1766
 
      
1767
 
      current_data->set_end_segment(false);
1768
 
      transaction->set_end_segment(false);
1769
 
      
1770
 
      /* 
1771
 
       * Send the trx message to replicators after finalizing the 
1772
 
       * statement and transaction. This will also set the Transaction
1773
 
       * and Statement objects in Session to NULL.
1774
 
       */
1775
 
      commitTransactionMessage(session);
1776
 
      
1777
 
      /*
1778
 
       * Statement and Transaction should now be NULL, so new ones will get
1779
 
       * created. We reuse the transaction id since we are segmenting
1780
 
       * one transaction.
1781
 
       */
1782
 
      transaction= getActiveTransactionMessage(session, false);
1783
 
      assert(transaction != NULL);
1784
 
      
1785
 
      statement= transaction->add_statement();
1786
 
      setDeleteHeader(*statement, session, table);
1787
 
      session.setStatementMessage(statement);
1788
 
      
1789
 
      /* Set the transaction ID to match the previous messages */
1790
 
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1791
 
      transaction->set_segment_id(seg_id + 1);
1792
 
      transaction->set_end_segment(true);
1793
 
    }
1794
 
    else
1795
 
    {
1796
 
      /*
1797
 
       * Continuation of the same statement. Carry forward the existing
1798
 
       * segment id.
1799
 
       */
1800
 
      const message::DeleteData &current_data= statement->delete_data();
1801
 
      *next_segment_id= current_data.segment_id();
1802
 
    }
1803
 
  }
1804
 
  
1805
 
  return *statement;
1806
 
}
1807
 
 
1808
 
void TransactionServices::setDeleteHeader(message::Statement &statement,
1809
 
                                          Session::const_reference session,
1810
 
                                          Table &table)
1811
 
{
1812
 
  initStatementMessage(statement, message::Statement::DELETE, session);
1813
 
 
1814
 
  /* 
1815
 
   * Now we construct the specialized DeleteHeader message inside
1816
 
   * the generalized message::Statement container...
1817
 
   */
1818
 
  message::DeleteHeader *header= statement.mutable_delete_header();
1819
 
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1820
 
 
1821
 
  string schema_name;
1822
 
  (void) table.getShare()->getSchemaName(schema_name);
1823
 
  string table_name;
1824
 
  (void) table.getShare()->getTableName(table_name);
1825
 
 
1826
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1827
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1828
 
 
1829
 
  Field *current_field;
1830
 
  Field **table_fields= table.getFields();
1831
 
 
1832
 
  message::FieldMetadata *field_metadata;
1833
 
 
1834
 
  while ((current_field= *table_fields++) != NULL) 
1835
 
  {
1836
 
    /* 
1837
 
     * Add the WHERE clause values now...for now, this means the
1838
 
     * primary key field value.  Replication only supports tables
1839
 
     * with a primary key.
1840
 
     */
1841
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1842
 
    {
1843
 
      field_metadata= header->add_key_field_metadata();
1844
 
      field_metadata->set_name(current_field->field_name);
1845
 
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1846
 
    }
1847
 
  }
1848
 
}
1849
 
 
1850
 
void TransactionServices::deleteRecord(Session::reference session,
1851
 
                                       Table &table,
1852
 
                                       bool use_update_record)
1853
 
{
1854
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1855
 
  if (! replication_services.isActive())
1856
 
    return;
1857
 
 
1858
 
  if (not table.getShare()->replicate())
1859
 
    return;
1860
 
 
1861
 
  uint32_t next_segment_id= 1;
1862
 
  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1863
 
 
1864
 
  message::DeleteData *data= statement.mutable_delete_data();
1865
 
  data->set_segment_id(next_segment_id);
1866
 
  data->set_end_segment(true);
1867
 
  message::DeleteRecord *record= data->add_record();
1868
 
 
1869
 
  Field *current_field;
1870
 
  Field **table_fields= table.getFields();
1871
 
  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1872
 
  string_value->set_charset(system_charset_info);
1873
 
 
1874
 
  while ((current_field= *table_fields++) != NULL) 
1875
 
  {
1876
 
    /* 
1877
 
     * Add the WHERE clause values now...for now, this means the
1878
 
     * primary key field value.  Replication only supports tables
1879
 
     * with a primary key.
1880
 
     */
1881
 
    if (table.getShare()->fieldInPrimaryKey(current_field))
1882
 
    {
1883
 
      if (use_update_record)
1884
 
      {
1885
 
        /*
1886
 
         * Temporarily point to the update record to get its value.
1887
 
         * This is pretty much a hack in order to get the PK value from
1888
 
         * the update record rather than the insert record. Field::val_str()
1889
 
         * should not change anything in Field::ptr, so this should be safe.
1890
 
         * We are careful not to change anything in old_ptr.
1891
 
         */
1892
 
        const unsigned char *old_ptr= current_field->ptr;
1893
 
        current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1894
 
        string_value= current_field->val_str_internal(string_value);
1895
 
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
1896
 
      }
1897
 
      else
1898
 
      {
1899
 
        string_value= current_field->val_str_internal(string_value);
1900
 
        /**
1901
 
         * @TODO Store optional old record value in the before data member
1902
 
         */
1903
 
      }
1904
 
      record->add_key_value(string_value->c_ptr(), string_value->length());
1905
 
      string_value->free();
1906
 
    }
1907
 
  }
1908
 
}
1909
 
 
1910
 
void TransactionServices::createTable(Session::reference session,
1911
 
                                      const message::Table &table)
1912
 
{
1913
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1914
 
  if (! replication_services.isActive())
1915
 
    return;
1916
 
 
1917
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
1918
 
    return;
1919
 
 
1920
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1921
 
  message::Statement *statement= transaction->add_statement();
1922
 
 
1923
 
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1924
 
 
1925
 
  /* 
1926
 
   * Construct the specialized CreateTableStatement message and attach
1927
 
   * it to the generic Statement message
1928
 
   */
1929
 
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1930
 
  message::Table *new_table_message= create_table_statement->mutable_table();
1931
 
  *new_table_message= table;
1932
 
 
1933
 
  finalizeStatementMessage(*statement, session);
1934
 
 
1935
 
  finalizeTransactionMessage(*transaction, session);
1936
 
  
1937
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1938
 
 
1939
 
  cleanupTransactionMessage(transaction, session);
1940
 
 
1941
 
}
1942
 
 
1943
 
void TransactionServices::createSchema(Session::reference session,
1944
 
                                       const message::Schema &schema)
1945
 
{
1946
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1947
 
  if (! replication_services.isActive())
1948
 
    return;
1949
 
 
1950
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1951
 
    return;
1952
 
 
1953
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1954
 
  message::Statement *statement= transaction->add_statement();
1955
 
 
1956
 
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1957
 
 
1958
 
  /* 
1959
 
   * Construct the specialized CreateSchemaStatement message and attach
1960
 
   * it to the generic Statement message
1961
 
   */
1962
 
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1963
 
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1964
 
  *new_schema_message= schema;
1965
 
 
1966
 
  finalizeStatementMessage(*statement, session);
1967
 
 
1968
 
  finalizeTransactionMessage(*transaction, session);
1969
 
  
1970
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
1971
 
 
1972
 
  cleanupTransactionMessage(transaction, session);
1973
 
 
1974
 
}
1975
 
 
1976
 
void TransactionServices::dropSchema(Session::reference session,
1977
 
                                     identifier::Schema::const_reference identifier,
1978
 
                                     message::schema::const_reference schema)
1979
 
{
1980
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1981
 
  if (! replication_services.isActive())
1982
 
    return;
1983
 
 
1984
 
  if (schema.has_replication_options() and schema.replication_options().has_dont_replicate() and schema.replication_options().dont_replicate())
1985
 
    return;
1986
 
 
1987
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
1988
 
  message::Statement *statement= transaction->add_statement();
1989
 
 
1990
 
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1991
 
 
1992
 
  /* 
1993
 
   * Construct the specialized DropSchemaStatement message and attach
1994
 
   * it to the generic Statement message
1995
 
   */
1996
 
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1997
 
 
1998
 
  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1999
 
 
2000
 
  finalizeStatementMessage(*statement, session);
2001
 
 
2002
 
  finalizeTransactionMessage(*transaction, session);
2003
 
  
2004
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2005
 
 
2006
 
  cleanupTransactionMessage(transaction, session);
2007
 
}
2008
 
 
2009
 
void TransactionServices::alterSchema(Session::reference session,
2010
 
                                      const message::Schema &old_schema,
2011
 
                                      const message::Schema &new_schema)
2012
 
{
2013
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2014
 
  if (! replication_services.isActive())
2015
 
    return;
2016
 
 
2017
 
  if (old_schema.has_replication_options() and old_schema.replication_options().has_dont_replicate() and old_schema.replication_options().dont_replicate())
2018
 
    return;
2019
 
 
2020
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2021
 
  message::Statement *statement= transaction->add_statement();
2022
 
 
2023
 
  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
2024
 
 
2025
 
  /* 
2026
 
   * Construct the specialized AlterSchemaStatement message and attach
2027
 
   * it to the generic Statement message
2028
 
   */
2029
 
  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
2030
 
 
2031
 
  message::Schema *before= alter_schema_statement->mutable_before();
2032
 
  message::Schema *after= alter_schema_statement->mutable_after();
2033
 
 
2034
 
  *before= old_schema;
2035
 
  *after= new_schema;
2036
 
 
2037
 
  finalizeStatementMessage(*statement, session);
2038
 
 
2039
 
  finalizeTransactionMessage(*transaction, session);
2040
 
  
2041
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2042
 
 
2043
 
  cleanupTransactionMessage(transaction, session);
2044
 
}
2045
 
 
2046
 
void TransactionServices::dropTable(Session::reference session,
2047
 
                                    identifier::Table::const_reference identifier,
2048
 
                                    message::table::const_reference table,
2049
 
                                    bool if_exists)
2050
 
{
2051
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2052
 
  if (! replication_services.isActive())
2053
 
    return;
2054
 
 
2055
 
  if (table.has_options() and table.options().has_dont_replicate() and table.options().dont_replicate())
2056
 
    return;
2057
 
 
2058
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2059
 
  message::Statement *statement= transaction->add_statement();
2060
 
 
2061
 
  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
2062
 
 
2063
 
  /* 
2064
 
   * Construct the specialized DropTableStatement message and attach
2065
 
   * it to the generic Statement message
2066
 
   */
2067
 
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
2068
 
 
2069
 
  drop_table_statement->set_if_exists_clause(if_exists);
2070
 
 
2071
 
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
2072
 
 
2073
 
  table_metadata->set_schema_name(identifier.getSchemaName());
2074
 
  table_metadata->set_table_name(identifier.getTableName());
2075
 
 
2076
 
  finalizeStatementMessage(*statement, session);
2077
 
 
2078
 
  finalizeTransactionMessage(*transaction, session);
2079
 
  
2080
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2081
 
 
2082
 
  cleanupTransactionMessage(transaction, session);
2083
 
}
2084
 
 
2085
 
void TransactionServices::truncateTable(Session::reference session,
2086
 
                                        Table &table)
2087
 
{
2088
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2089
 
  if (! replication_services.isActive())
2090
 
    return;
2091
 
 
2092
 
  if (not table.getShare()->replicate())
2093
 
    return;
2094
 
 
2095
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2096
 
  message::Statement *statement= transaction->add_statement();
2097
 
 
2098
 
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2099
 
 
2100
 
  /* 
2101
 
   * Construct the specialized TruncateTableStatement message and attach
2102
 
   * it to the generic Statement message
2103
 
   */
2104
 
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2105
 
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2106
 
 
2107
 
  string schema_name;
2108
 
  (void) table.getShare()->getSchemaName(schema_name);
2109
 
 
2110
 
  string table_name;
2111
 
  (void) table.getShare()->getTableName(table_name);
2112
 
 
2113
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2114
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
2115
 
 
2116
 
  finalizeStatementMessage(*statement, session);
2117
 
 
2118
 
  finalizeTransactionMessage(*transaction, session);
2119
 
  
2120
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2121
 
 
2122
 
  cleanupTransactionMessage(transaction, session);
2123
 
}
2124
 
 
2125
 
void TransactionServices::rawStatement(Session::reference session,
2126
 
                                       const string &query)
2127
 
{
2128
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2129
 
  if (! replication_services.isActive())
2130
 
    return;
2131
 
 
2132
 
  message::Transaction *transaction= getActiveTransactionMessage(session);
2133
 
  message::Statement *statement= transaction->add_statement();
2134
 
 
2135
 
  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2136
 
  statement->set_sql(query);
2137
 
  finalizeStatementMessage(*statement, session);
2138
 
 
2139
 
  finalizeTransactionMessage(*transaction, session);
2140
 
  
2141
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
2142
 
 
2143
 
  cleanupTransactionMessage(transaction, session);
2144
 
}
2145
 
 
2146
 
int TransactionServices::sendEvent(Session::reference session,
2147
 
                                   const message::Event &event)
2148
 
{
2149
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2150
 
  if (! replication_services.isActive())
2151
 
    return 0;
2152
 
 
2153
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2154
 
 
2155
 
  // set server id, start timestamp
2156
 
  initTransactionMessage(*transaction, session, true);
2157
 
 
2158
 
  // set end timestamp
2159
 
  finalizeTransactionMessage(*transaction, session);
2160
 
 
2161
 
  message::Event *trx_event= transaction->mutable_event();
2162
 
 
2163
 
  trx_event->CopyFrom(event);
2164
 
 
2165
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2166
 
 
2167
 
  delete transaction;
2168
 
 
2169
 
  return static_cast<int>(result);
2170
 
}
2171
 
 
2172
 
bool TransactionServices::sendStartupEvent(Session::reference session)
2173
 
{
2174
 
  message::Event event;
2175
 
  event.set_type(message::Event::STARTUP);
2176
 
  if (sendEvent(session, event) != 0)
2177
 
    return false;
2178
 
  return true;
2179
 
}
2180
 
 
2181
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
2182
 
{
2183
 
  message::Event event;
2184
 
  event.set_type(message::Event::SHUTDOWN);
2185
 
  if (sendEvent(session, event) != 0)
2186
 
    return false;
2187
 
  return true;
2188
 
}
2189
 
 
2190
 
} /* namespace drizzled */