~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

Merged trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008 Sun Microsystems
 
5
 *  Copyright (c) Jay Pipes <jaypipes@gmail.com>
5
6
 *
6
7
 *  This program is free software; you can redistribute it and/or modify
7
8
 *  it under the terms of the GNU General Public License as published by
31
32
#include "drizzled/sql_base.h"
32
33
#include "drizzled/replication_services.h"
33
34
#include "drizzled/transaction_services.h"
 
35
#include "drizzled/transaction_context.h"
 
36
#include "drizzled/resource_context.h"
34
37
#include "drizzled/lock.h"
35
38
#include "drizzled/item/int.h"
36
39
#include "drizzled/item/empty_string.h"
37
40
#include "drizzled/field/timestamp.h"
38
41
#include "drizzled/plugin/client.h"
 
42
#include "drizzled/plugin/xa_storage_engine.h"
39
43
#include "drizzled/internal/my_sys.h"
40
44
 
41
45
using namespace std;
42
46
 
 
47
#include <vector>
 
48
#include <algorithm>
 
49
#include <functional>
 
50
 
43
51
namespace drizzled
44
52
{
45
53
 
143
151
 
144
152
  The server stores its transaction-related data in
145
153
  session->transaction. This structure has two members of type
146
 
  Session_TRANS. These members correspond to the statement and
 
154
  TransactionContext. These members correspond to the statement and
147
155
  normal transactions respectively:
148
156
 
149
157
  - session->transaction.stmt contains a list of engines
238
246
  session->transaction.all list is cleared.
239
247
 
240
248
  When a connection is closed, the current normal transaction, if
241
 
  any, is rolled back.
 
249
  any is currently active, is rolled back.
242
250
 
243
251
  Roles and responsibilities
244
252
  --------------------------
245
253
 
246
 
  The server has no way to know that an engine participates in
247
 
  the statement and a transaction has been started
248
 
  in it unless the engine says so. Thus, in order to be
249
 
  a part of a transaction, the engine must "register" itself.
250
 
  This is done by invoking trans_register_ha() server call.
251
 
  Normally the engine registers itself whenever Cursor::external_lock()
252
 
  is called. trans_register_ha() can be invoked many times: if
253
 
  an engine is already registered, the call does nothing.
254
 
  In case autocommit is not set, the engine must register itself
255
 
  twice -- both in the statement list and in the normal transaction
256
 
  list.
257
 
  In which list to register is a parameter of trans_register_ha().
258
 
 
259
 
  Note, that although the registration interface in itself is
260
 
  fairly clear, the current usage practice often leads to undesired
261
 
  effects. E.g. since a call to trans_register_ha() in most engines
262
 
  is embedded into implementation of Cursor::external_lock(), some
263
 
  DDL statements start a transaction (at least from the server
264
 
  point of view) even though they are not expected to. E.g.
265
 
  CREATE TABLE does not start a transaction, since
266
 
  Cursor::external_lock() is never called during CREATE TABLE. But
267
 
  CREATE TABLE ... SELECT does, since Cursor::external_lock() is
268
 
  called for the table that is being selected from. This has no
269
 
  practical effects currently, but must be kept in mind
270
 
  nevertheless.
271
 
 
272
 
  Once an engine is registered, the server will do the rest
273
 
  of the work.
 
254
  Beginning of SQL Statement (and Statement Transaction)
 
255
  ------------------------------------------------------
 
256
 
 
257
  At the start of each SQL statement, for each storage engine
 
258
  <strong>that is involved in the SQL statement</strong>, the kernel 
 
259
  calls the engine's plugin::StoragEngine::startStatement() method.  If the
 
260
  engine needs to track some data for the statement, it should use
 
261
  this method invocation to initialize this data.  This is the
 
262
  beginning of what is called the "statement transaction".
 
263
 
 
264
  <strong>For transaction storage engines (those storage engines
 
265
  that inherit from plugin::TransactionalStorageEngine)</strong>, the
 
266
  kernel automatically determines if the start of the SQL statement 
 
267
  transaction should <em>also</em> begin the normal SQL transaction.
 
268
  This occurs when the connection is in NOT in autocommit mode. If
 
269
  the kernel detects this, then the kernel automatically starts the
 
270
  normal transaction w/ plugin::TransactionalStorageEngine::startTransaction()
 
271
  method and then calls plugin::StorageEngine::startStatement()
 
272
  afterwards.
 
273
 
 
274
  Beginning of an SQL "Normal" Transaction
 
275
  ----------------------------------------
 
276
 
 
277
  As noted above, a "normal SQL transaction" may be started when
 
278
  an SQL statement is started in a connection and the connection is
 
279
  NOT in AUTOCOMMIT mode.  This is automatically done by the kernel.
 
280
 
 
281
  In addition, when a user executes a START TRANSACTION or
 
282
  BEGIN WORK statement in a connection, the kernel explicitly
 
283
  calls each transactional storage engine's startTransaction() method.
 
284
 
 
285
  Ending of an SQL Statement (and Statement Transaction)
 
286
  ------------------------------------------------------
 
287
 
 
288
  At the end of each SQL statement, for each of the aforementioned
 
289
  involved storage engines, the kernel calls the engine's
 
290
  plugin::StorageEngine::endStatement() method.  If the engine
 
291
  has initialized or modified some internal data about the
 
292
  statement transaction, it should use this method to reset or destroy
 
293
  this data appropriately.
 
294
 
 
295
  Ending of an SQL "Normal" Transaction
 
296
  -------------------------------------
 
297
  
 
298
  The end of a normal transaction is either a ROLLBACK or a COMMIT, 
 
299
  depending on the success or failure of the statement transaction(s) 
 
300
  it encloses.
 
301
  
 
302
  The end of a "normal transaction" occurs when any of the following
 
303
  occurs:
 
304
 
 
305
  1) If a statement transaction has completed and AUTOCOMMIT is ON,
 
306
     then the normal transaction which encloses the statement
 
307
     transaction ends
 
308
  2) If a COMMIT or ROLLBACK statement occurs on the connection
 
309
  3) Just before a DDL operation occurs, the kernel will implicitly
 
310
     commit the active normal transaction
 
311
  
 
312
  Transactions and Non-transactional Storage Engines
 
313
  --------------------------------------------------
 
314
  
 
315
  For non-transactional engines, this call can be safely ignored, and
 
316
  the kernel tracks whether a non-transactional engine has changed
 
317
  any data state, and warns the user appropriately if a transaction
 
318
  (statement or normal) is rolled back after such non-transactional
 
319
  data changes have been made.
 
320
 
 
321
  XA Two-phase Commit Protocol
 
322
  ----------------------------
274
323
 
275
324
  During statement execution, whenever any of data-modifying
276
325
  PSEA API methods is used, e.g. Cursor::write_row() or
298
347
  Additional notes on DDL and the normal transaction.
299
348
  ---------------------------------------------------
300
349
 
301
 
  DDLs and operations with non-transactional engines
302
 
  do not "register" in session->transaction lists, and thus do not
303
 
  modify the transaction state. Besides, each DDL in
304
 
  MySQL is prefixed with an implicit normal transaction commit
305
 
  (a call to Session::endActiveTransaction()), and thus leaves nothing
306
 
  to modify.
307
 
  However, as it has been pointed out with CREATE TABLE .. SELECT,
308
 
  some DDL statements can start a *new* transaction.
 
350
  CREATE TABLE .. SELECT can start a *new* normal transaction
 
351
  because of the fact that SELECTs on a transactional storage
 
352
  engine participate in the normal SQL transaction (due to
 
353
  isolation level issues and consistent read views).
309
354
 
310
355
  Behaviour of the server in this case is currently badly
311
356
  defined.
 
357
 
312
358
  DDL statements use a form of "semantic" logging
313
359
  to maintain atomicity: if CREATE TABLE .. SELECT failed,
314
360
  the newly created table is deleted.
 
361
 
315
362
  In addition, some DDL statements issue interim transaction
316
 
  commits: e.g. ALTER Table issues a commit after data is copied
 
363
  commits: e.g. ALTER TABLE issues a COMMIT after data is copied
317
364
  from the original table to the internal temporary table. Other
318
365
  statements, e.g. CREATE TABLE ... SELECT do not always commit
319
366
  after itself.
 
367
 
320
368
  And finally there is a group of DDL statements such as
321
 
  RENAME/DROP Table that doesn't start a new transaction
 
369
  RENAME/DROP TABLE that doesn't start a new transaction
322
370
  and doesn't commit.
323
371
 
324
 
  This diversity makes it hard to say what will happen if
325
 
  by chance a stored function is invoked during a DDL --
326
 
  whether any modifications it makes will be committed or not
327
 
  is not clear. Fortunately, SQL grammar of few DDLs allows
328
 
  invocation of a stored function.
329
 
 
330
372
  A consistent behaviour is perhaps to always commit the normal
331
373
  transaction after all DDLs, just like the statement transaction
332
374
  is always committed at the end of all statements.
333
375
*/
334
 
 
335
 
/**
336
 
  Register a storage engine for a transaction.
337
 
 
338
 
  Every storage engine MUST call this function when it starts
339
 
  a transaction or a statement (that is it must be called both for the
340
 
  "beginning of transaction" and "beginning of statement").
341
 
  Only storage engines registered for the transaction/statement
342
 
  will know when to commit/rollback it.
343
 
 
344
 
  @note
345
 
    trans_register_ha is idempotent - storage engine may register many
346
 
    times per transaction.
347
 
 
348
 
*/
349
 
void TransactionServices::trans_register_ha(Session *session, bool all, plugin::StorageEngine *engine)
 
376
void TransactionServices::registerResourceForStatement(Session *session,
 
377
                                                       plugin::TransactionalStorageEngine *engine)
350
378
{
351
 
  Session_TRANS *trans;
352
 
  Ha_trx_info *ha_info;
353
 
 
354
 
  if (all)
 
379
  if (session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
355
380
  {
356
 
    trans= &session->transaction.all;
357
 
    session->server_status|= SERVER_STATUS_IN_TRANS;
 
381
    /* 
 
382
     * Now we automatically register this resource manager for the
 
383
     * normal transaction.  This is fine because a statement
 
384
     * transaction registration should always enlist the resource
 
385
     * in the normal transaction which contains the statement
 
386
     * transaction.
 
387
     */
 
388
    registerResourceForTransaction(session, engine);
358
389
  }
359
 
  else
360
 
    trans= &session->transaction.stmt;
361
 
 
362
 
  ha_info= session->getEngineInfo(engine, all ? 1 : 0);
363
 
 
364
 
  if (ha_info->is_started())
365
 
    return; /* already registered, return */
366
 
 
367
 
  ha_info->register_ha(trans, engine);
368
 
 
369
 
  trans->no_2pc|= not engine->has_2pc();
 
390
 
 
391
  TransactionContext *trans= &session->transaction.stmt;
 
392
  ResourceContext *resource_context= session->getResourceContext(engine, 0);
 
393
 
 
394
  if (resource_context->isStarted())
 
395
    return; /* already registered, return */
 
396
 
 
397
  resource_context->setResource(engine);
 
398
  trans->registerResource(resource_context);
 
399
 
 
400
  trans->no_2pc|= not engine->hasTwoPhaseCommit();
 
401
}
 
402
 
 
403
void TransactionServices::registerResourceForTransaction(Session *session,
 
404
                                                         plugin::TransactionalStorageEngine *engine)
 
405
{
 
406
  TransactionContext *trans= &session->transaction.all;
 
407
  ResourceContext *resource_context= session->getResourceContext(engine, 1);
 
408
 
 
409
  if (resource_context->isStarted())
 
410
    return; /* already registered, return */
 
411
 
 
412
  session->server_status|= SERVER_STATUS_IN_TRANS;
 
413
 
 
414
  resource_context->setResource(engine);
 
415
  trans->registerResource(resource_context);
 
416
 
 
417
  trans->no_2pc|= not engine->hasTwoPhaseCommit();
 
418
 
370
419
  if (session->transaction.xid_state.xid.is_null())
371
420
    session->transaction.xid_state.xid.set(session->getQueryId());
 
421
 
 
422
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
 
423
  if (! session->getResourceContext(engine, 0)->isStarted())
 
424
    registerResourceForStatement(session, engine);
372
425
}
373
426
 
374
427
/**
386
439
*/
387
440
static
388
441
bool
389
 
ha_check_and_coalesce_trx_read_only(Session *session, Ha_trx_info *ha_list,
390
 
                                    bool all)
 
442
ha_check_and_coalesce_trx_read_only(Session *session,
 
443
                                    TransactionContext::ResourceContexts &resource_contexts,
 
444
                                    bool normal_transaction)
391
445
{
392
446
  /* The number of storage engines that have actual changes. */
393
 
  unsigned rw_ha_count= 0;
394
 
  Ha_trx_info *ha_info;
 
447
  unsigned num_resources_modified_data= 0;
 
448
  ResourceContext *resource_context;
395
449
 
396
 
  for (ha_info= ha_list; ha_info; ha_info= ha_info->next())
 
450
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
451
       it != resource_contexts.end();
 
452
       ++it)
397
453
  {
398
 
    if (ha_info->is_trx_read_write())
399
 
      ++rw_ha_count;
 
454
    resource_context= *it;
 
455
    if (resource_context->hasModifiedData())
 
456
      ++num_resources_modified_data;
400
457
 
401
 
    if (! all)
 
458
    if (! normal_transaction)
402
459
    {
403
 
      Ha_trx_info *ha_info_all= session->getEngineInfo(ha_info->engine(), 1);
404
 
      assert(ha_info != ha_info_all);
 
460
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getResource(), true);
 
461
      assert(resource_context != resource_context_normal);
405
462
      /*
406
463
        Merge read-only/read-write information about statement
407
464
        transaction to its enclosing normal transaction. Do this
408
465
        only if in a real transaction -- that is, if we know
409
 
        that ha_info_all is registered in session->transaction.all.
 
466
        that resource_context_all is registered in session->transaction.all.
410
467
        Since otherwise we only clutter the normal transaction flags.
411
468
      */
412
 
      if (ha_info_all->is_started()) /* false if autocommit. */
413
 
        ha_info_all->coalesce_trx_with(ha_info);
 
469
      if (resource_context_normal->isStarted()) /* false if autocommit. */
 
470
        resource_context_normal->coalesceWith(resource_context);
414
471
    }
415
 
    else if (rw_ha_count > 1)
 
472
    else if (num_resources_modified_data > 1)
416
473
    {
417
474
      /*
418
475
        It is a normal transaction, so we don't need to merge read/write
422
479
      break;
423
480
    }
424
481
  }
425
 
  return rw_ha_count > 1;
 
482
  return num_resources_modified_data > 1;
426
483
}
427
484
 
428
485
 
440
497
    stored functions or triggers. So we simply do nothing now.
441
498
    TODO: This should be fixed in later ( >= 5.1) releases.
442
499
*/
443
 
int TransactionServices::ha_commit_trans(Session *session, bool all)
 
500
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
444
501
{
445
502
  int error= 0, cookie= 0;
446
503
  /*
447
504
    'all' means that this is either an explicit commit issued by
448
505
    user, or an implicit commit issued by a DDL.
449
506
  */
450
 
  Session_TRANS *trans= all ? &session->transaction.all : &session->transaction.stmt;
451
 
  bool is_real_trans= all || session->transaction.all.ha_list == 0;
452
 
  Ha_trx_info *ha_info= trans->ha_list;
 
507
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
508
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
509
 
 
510
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
453
511
 
454
512
  /*
455
513
    We must not commit the normal transaction if a statement
457
515
    flags will not get propagated to its normal transaction's
458
516
    counterpart.
459
517
  */
460
 
  assert(session->transaction.stmt.ha_list == NULL ||
 
518
  assert(session->transaction.stmt.getResourceContexts().empty() ||
461
519
              trans == &session->transaction.stmt);
462
520
 
463
 
  if (ha_info)
 
521
  if (resource_contexts.empty() == false)
464
522
  {
465
523
    bool must_2pc;
466
524
 
467
525
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
468
526
    {
469
 
      ha_rollback_trans(session, all);
 
527
      ha_rollback_trans(session, normal_transaction);
470
528
      return 1;
471
529
    }
472
530
 
473
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, ha_info, all);
 
531
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
474
532
 
475
 
    if (!trans->no_2pc && must_2pc)
 
533
    if (! trans->no_2pc && must_2pc)
476
534
    {
477
 
      for (; ha_info && !error; ha_info= ha_info->next())
 
535
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
536
           it != resource_contexts.end() && ! error;
 
537
           ++it)
478
538
      {
 
539
        ResourceContext *resource_context= *it;
479
540
        int err;
480
 
        plugin::StorageEngine *engine= ha_info->engine();
481
541
        /*
482
542
          Do not call two-phase commit if this particular
483
543
          transaction is read-only. This allows for simpler
484
544
          implementation in engines that are always read-only.
485
545
        */
486
 
        if (! ha_info->is_trx_read_write())
 
546
        if (! resource_context->hasModifiedData())
487
547
          continue;
488
 
        /*
489
 
          Sic: we know that prepare() is not NULL since otherwise
490
 
          trans->no_2pc would have been set.
491
 
        */
492
 
        if ((err= engine->prepare(session, all)))
 
548
 
 
549
        plugin::StorageEngine *engine= resource_context->getResource();
 
550
        if ((err= static_cast<plugin::XaStorageEngine *>(engine)->prepare(session, normal_transaction)))
493
551
        {
494
552
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
495
553
          error= 1;
498
556
      }
499
557
      if (error)
500
558
      {
501
 
        ha_rollback_trans(session, all);
 
559
        ha_rollback_trans(session, normal_transaction);
502
560
        error= 1;
503
561
        goto end;
504
562
      }
505
563
    }
506
 
    error=ha_commit_one_phase(session, all) ? (cookie ? 2 : 1) : 0;
 
564
    error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
507
565
end:
508
566
    if (is_real_trans)
509
567
      start_waiting_global_read_lock(session);
515
573
  @note
516
574
  This function does not care about global read lock. A caller should.
517
575
*/
518
 
int TransactionServices::ha_commit_one_phase(Session *session, bool all)
 
576
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
519
577
{
520
578
  int error=0;
521
 
  Session_TRANS *trans=all ? &session->transaction.all : &session->transaction.stmt;
522
 
  bool is_real_trans=all || session->transaction.all.ha_list == 0;
523
 
  Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
524
 
  if (ha_info)
 
579
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
580
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
581
 
 
582
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
 
583
 
 
584
  if (resource_contexts.empty() == false)
525
585
  {
526
 
    for (; ha_info; ha_info= ha_info_next)
 
586
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
587
         it != resource_contexts.end();
 
588
         ++it)
527
589
    {
528
590
      int err;
529
 
      plugin::StorageEngine *engine= ha_info->engine();
530
 
      if ((err= engine->commit(session, all)))
 
591
      ResourceContext *resource_context= *it;
 
592
 
 
593
      plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
 
594
      if ((err= engine->commit(session, normal_transaction)))
531
595
      {
532
596
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
533
 
        error=1;
 
597
        error= 1;
534
598
      }
535
599
      status_var_increment(session->status_var.ha_commit_count);
536
 
      ha_info_next= ha_info->next();
537
 
      ha_info->reset(); /* keep it conveniently zero-filled */
 
600
      resource_context->reset(); /* keep it conveniently zero-filled */
538
601
    }
539
 
    trans->ha_list= 0;
540
 
    trans->no_2pc=0;
 
602
 
541
603
    if (is_real_trans)
542
604
      session->transaction.xid_state.xid.null();
543
 
    if (all)
 
605
 
 
606
    if (normal_transaction)
544
607
    {
545
 
      session->variables.tx_isolation=session->session_tx_isolation;
 
608
      session->variables.tx_isolation= session->session_tx_isolation;
546
609
      session->transaction.cleanup();
547
610
    }
548
611
  }
 
612
  trans->reset();
549
613
  if (error == 0)
550
614
  {
551
615
    if (is_real_trans)
552
616
    {
553
617
      /* 
554
 
        * We commit the normal transaction by finalizing the transaction message
555
 
        * and propogating the message to all registered replicators.
556
 
        */
 
618
       * We commit the normal transaction by finalizing the transaction message
 
619
       * and propogating the message to all registered replicators.
 
620
       */
557
621
      ReplicationServices &replication_services= ReplicationServices::singleton();
558
622
      replication_services.commitTransaction(session);
559
623
    }
561
625
  return error;
562
626
}
563
627
 
564
 
 
565
 
int TransactionServices::ha_rollback_trans(Session *session, bool all)
 
628
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
566
629
{
567
 
  int error=0;
568
 
  Session_TRANS *trans=all ? &session->transaction.all : &session->transaction.stmt;
569
 
  Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
570
 
  bool is_real_trans=all || session->transaction.all.ha_list == 0;
 
630
  int error= 0;
 
631
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
 
632
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
633
 
 
634
  bool is_real_trans= normal_transaction || session->transaction.all.getResourceContexts().empty();
571
635
 
572
636
  /*
573
637
    We must not rollback the normal transaction if a statement
574
638
    transaction is pending.
575
639
  */
576
 
  assert(session->transaction.stmt.ha_list == NULL ||
 
640
  assert(session->transaction.stmt.getResourceContexts().empty() ||
577
641
              trans == &session->transaction.stmt);
578
642
 
579
 
  if (ha_info)
 
643
  if (resource_contexts.empty() == false)
580
644
  {
581
 
    for (; ha_info; ha_info= ha_info_next)
 
645
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
646
         it != resource_contexts.end();
 
647
         ++it)
582
648
    {
583
649
      int err;
584
 
      plugin::StorageEngine *engine= ha_info->engine();
585
 
      if ((err= engine->rollback(session, all)))
586
 
      { // cannot happen
 
650
      ResourceContext *resource_context= *it;
 
651
 
 
652
      plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
 
653
      if ((err= engine->rollback(session, normal_transaction)))
 
654
      {
587
655
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
588
656
        error=1;
589
657
      }
590
658
      status_var_increment(session->status_var.ha_rollback_count);
591
 
      ha_info_next= ha_info->next();
592
 
      ha_info->reset(); /* keep it conveniently zero-filled */
 
659
      resource_context->reset(); /* keep it conveniently zero-filled */
593
660
    }
594
 
    trans->ha_list= 0;
595
 
    trans->no_2pc=0;
596
661
    
597
662
    /* 
598
663
     * We need to signal the ROLLBACK to ReplicationServices here
606
671
 
607
672
    if (is_real_trans)
608
673
      session->transaction.xid_state.xid.null();
609
 
    if (all)
 
674
    if (normal_transaction)
610
675
    {
611
676
      session->variables.tx_isolation=session->session_tx_isolation;
612
677
      session->transaction.cleanup();
613
678
    }
614
679
  }
615
 
  if (all)
 
680
  if (normal_transaction)
616
681
    session->transaction_rollback_request= false;
617
682
 
618
683
  /*
619
 
    If a non-transactional table was updated, warn; don't warn if this is a
620
 
    slave thread (because when a slave thread executes a ROLLBACK, it has
621
 
    been read from the binary log, so it's 100% sure and normal to produce
622
 
    error ER_WARNING_NOT_COMPLETE_ROLLBACK. If we sent the warning to the
623
 
    slave SQL thread, it would not stop the thread but just be printed in
624
 
    the error log; but we don't want users to wonder why they have this
625
 
    message in the error log, so we don't send it.
626
 
  */
627
 
  if (is_real_trans && session->transaction.all.modified_non_trans_table && session->killed != Session::KILL_CONNECTION)
 
684
   * If a non-transactional table was updated, warn the user
 
685
   */
 
686
  if (is_real_trans &&
 
687
      session->transaction.all.hasModifiedNonTransData() &&
 
688
      session->killed != Session::KILL_CONNECTION)
 
689
  {
628
690
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
629
691
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
630
692
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
 
693
  }
 
694
  trans->reset();
631
695
  return error;
632
696
}
633
697
 
644
708
*/
645
709
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
646
710
{
647
 
  if (session->transaction.stmt.ha_list)
 
711
  if (session->transaction.stmt.getResourceContexts().empty() == false)
648
712
  {
649
 
    if (!error)
 
713
    if (! error)
650
714
    {
651
715
      if (ha_commit_trans(session, false))
652
716
        error= 1;
658
722
        (void) ha_rollback_trans(session, true);
659
723
    }
660
724
 
661
 
    session->variables.tx_isolation=session->session_tx_isolation;
 
725
    session->variables.tx_isolation= session->session_tx_isolation;
662
726
  }
663
 
 
664
727
  return error;
665
728
}
666
729
 
709
772
  return 0;
710
773
}
711
774
 
 
775
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
 
776
{
 
777
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
 
778
  {
 
779
    return lhs->getResource()->getSlot() < rhs->getResource()->getSlot();
 
780
  }
 
781
};
712
782
 
713
783
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
714
784
{
715
785
  int error= 0;
716
 
  Session_TRANS *trans= &session->transaction.all;
717
 
  Ha_trx_info *ha_info, *ha_info_next;
 
786
  TransactionContext *trans= &session->transaction.all;
 
787
  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
 
788
  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
718
789
 
719
 
  trans->no_2pc=0;
 
790
  trans->no_2pc= false;
720
791
  /*
721
792
    rolling back to savepoint in all storage engines that were part of the
722
793
    transaction when the savepoint was set
723
794
  */
724
 
  for (ha_info= sv.ha_list; ha_info; ha_info= ha_info->next())
 
795
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
 
796
       it != sv_resource_contexts.end();
 
797
       ++it)
725
798
  {
726
799
    int err;
727
 
    plugin::StorageEngine *engine= ha_info->engine();
728
 
    assert(engine);
729
 
    if ((err= engine->savepoint_rollback(session, sv)))
 
800
    ResourceContext *resource_context= *it;
 
801
    plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
 
802
    assert(engine != NULL);
 
803
    if ((err= engine->rollbackToSavepoint(session, sv)))
730
804
    { // cannot happen
731
805
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
732
806
      error= 1;
733
807
    }
734
808
    status_var_increment(session->status_var.ha_savepoint_rollback_count);
735
 
    trans->no_2pc|= not engine->has_2pc();
 
809
    trans->no_2pc|= not engine->hasTwoPhaseCommit();
736
810
  }
737
811
  /*
738
812
    rolling back the transaction in all storage engines that were not part of
739
813
    the transaction when the savepoint was set
740
814
  */
741
 
  for (ha_info= trans->ha_list; ha_info != sv.ha_list;
742
 
       ha_info= ha_info_next)
743
815
  {
744
 
    int err;
745
 
    plugin::StorageEngine *engine= ha_info->engine();
746
 
    if ((err= engine->rollback(session, !(0))))
747
 
    { // cannot happen
748
 
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
749
 
      error= 1;
 
816
    TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
 
817
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
 
818
    TransactionContext::ResourceContexts set_difference_contexts;
 
819
 
 
820
    sort(sorted_tran_resource_contexts.begin(),
 
821
         sorted_tran_resource_contexts.end(),
 
822
         ResourceContextCompare());
 
823
    sort(sorted_sv_resource_contexts.begin(),
 
824
         sorted_sv_resource_contexts.end(),
 
825
         ResourceContextCompare());
 
826
    set_difference(sorted_tran_resource_contexts.begin(),
 
827
                   sorted_tran_resource_contexts.end(),
 
828
                   sorted_sv_resource_contexts.begin(),
 
829
                   sorted_sv_resource_contexts.end(),
 
830
                   set_difference_contexts.begin(),
 
831
                   ResourceContextCompare());
 
832
    /* 
 
833
     * set_difference_contexts now contains all resource contexts
 
834
     * which are in the transaction context but were NOT in the
 
835
     * savepoint's resource contexts.
 
836
     */
 
837
        
 
838
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
 
839
         it != set_difference_contexts.end();
 
840
         ++it)
 
841
    {
 
842
      ResourceContext *resource_context= *it;
 
843
      int err;
 
844
      plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
 
845
      if ((err= engine->rollback(session, !(0))))
 
846
      { // cannot happen
 
847
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
 
848
        error= 1;
 
849
      }
 
850
      status_var_increment(session->status_var.ha_rollback_count);
 
851
      resource_context->reset(); /* keep it conveniently zero-filled */
750
852
    }
751
 
    status_var_increment(session->status_var.ha_rollback_count);
752
 
    ha_info_next= ha_info->next();
753
 
    ha_info->reset(); /* keep it conveniently zero-filled */
754
853
  }
755
 
  trans->ha_list= sv.ha_list;
 
854
  trans->setResourceContexts(sv_resource_contexts);
756
855
  return error;
757
856
}
758
857
 
765
864
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
766
865
{
767
866
  int error= 0;
768
 
  Session_TRANS *trans= &session->transaction.all;
769
 
  Ha_trx_info *ha_info= trans->ha_list;
770
 
  for (; ha_info; ha_info= ha_info->next())
 
867
  TransactionContext *trans= &session->transaction.all;
 
868
  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
869
 
 
870
  if (resource_contexts.empty() == false)
771
871
  {
772
 
    int err;
773
 
    plugin::StorageEngine *engine= ha_info->engine();
774
 
    assert(engine);
775
 
#ifdef NOT_IMPLEMENTED /*- TODO (examine this againt the original code base) */
776
 
    if (! engine->savepoint_set)
 
872
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
873
         it != resource_contexts.end();
 
874
         ++it)
777
875
    {
778
 
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "NamedSavepoint");
779
 
      error= 1;
780
 
      break;
781
 
    } 
782
 
#endif
783
 
    if ((err= engine->savepoint_set(session, sv)))
784
 
    { // cannot happen
785
 
      my_error(ER_GET_ERRNO, MYF(0), err);
786
 
      error= 1;
 
876
      ResourceContext *resource_context= *it;
 
877
      int err;
 
878
      plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
 
879
      assert(engine);
 
880
      if ((err= engine->setSavepoint(session, sv)))
 
881
      { // cannot happen
 
882
        my_error(ER_GET_ERRNO, MYF(0), err);
 
883
        error= 1;
 
884
      }
 
885
      status_var_increment(session->status_var.ha_savepoint_count);
787
886
    }
788
 
    status_var_increment(session->status_var.ha_savepoint_count);
789
887
  }
790
888
  /*
791
 
    Remember the list of registered storage engines. All new
792
 
    engines are prepended to the beginning of the list.
 
889
    Remember the list of registered storage engines.
793
890
  */
794
 
  sv.ha_list= trans->ha_list;
 
891
  sv.setResourceContexts(resource_contexts);
795
892
  return error;
796
893
}
797
894
 
798
895
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
799
896
{
800
897
  int error= 0;
801
 
  Ha_trx_info *ha_info= sv.ha_list;
802
 
 
803
 
  for (; ha_info; ha_info= ha_info->next())
 
898
 
 
899
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
 
900
 
 
901
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
902
       it != resource_contexts.end();
 
903
       ++it)
804
904
  {
805
905
    int err;
806
 
    plugin::StorageEngine *engine= ha_info->engine();
807
 
    /* Savepoint life time is enclosed into transaction life time. */
 
906
    ResourceContext *resource_context= *it;
 
907
    plugin::TransactionalStorageEngine *engine= static_cast<plugin::TransactionalStorageEngine *>(resource_context->getResource());
808
908
    assert(engine);
809
 
    if ((err= engine->savepoint_release(session, sv)))
 
909
    if ((err= engine->releaseSavepoint(session, sv)))
810
910
    { // cannot happen
811
911
      my_error(ER_GET_ERRNO, MYF(0), err);
812
912
      error= 1;