~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/handler.cc

  • Committer: Monty Taylor
  • Date: 2008-12-06 22:41:03 UTC
  • mto: (656.1.7 devel)
  • mto: This revision was merged to the branch mainline in revision 665.
  • Revision ID: monty@inaugust.com-20081206224103-jdouqwt9hb0f01y1
Moved non-working tests into broken suite for easier running of working tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
  Handler-calling-functions
24
24
*/
25
25
 
26
 
#include "drizzled/server_includes.h"
27
 
#include "mysys/hash.h"
28
 
#include "drizzled/error.h"
29
 
#include "drizzled/gettext.h"
30
 
#include "drizzled/data_home.h"
31
 
#include "drizzled/probes.h"
32
 
#include "drizzled/sql_parse.h"
33
 
#include "drizzled/cost_vect.h"
34
 
#include "drizzled/session.h"
35
 
#include "drizzled/sql_base.h"
36
 
#include "drizzled/replicator.h"
37
 
#include "drizzled/lock.h"
38
 
#include "drizzled/item/int.h"
39
 
#include "drizzled/item/empty_string.h"
40
 
#include "drizzled/unireg.h" // for mysql_frm_type
41
 
#include "drizzled/field/timestamp.h"
42
 
#include "drizzled/serialize/table.pb.h"
43
 
 
44
 
using namespace std;
 
26
#include <drizzled/server_includes.h>
 
27
#include <libdrizzle/libdrizzle.h>
 
28
#include <mysys/hash.h>
 
29
#include <drizzled/error.h>
 
30
#include <drizzled/gettext.h>
 
31
#include <drizzled/data_home.h>
 
32
#include <drizzled/probes.h>
 
33
#include <drizzled/sql_parse.h>
 
34
#include <drizzled/cost_vect.h>
 
35
#include CMATH_H
 
36
#include <drizzled/session.h>
 
37
#include <drizzled/sql_base.h>
 
38
 
 
39
#if defined(CMATH_NAMESPACE)
 
40
using namespace CMATH_NAMESPACE;
 
41
#endif
 
42
 
 
43
 
 
44
extern HASH open_cache;
45
45
 
46
46
KEY_CREATE_INFO default_key_create_info= { HA_KEY_ALG_UNDEF, 0, {NULL,0}, {NULL,0} };
47
47
 
48
 
/* number of entries in storage_engines[] */
 
48
/* number of entries in handlertons[] */
49
49
uint32_t total_ha= 0;
50
 
/* number of storage engines (from storage_engines[]) that support 2pc */
 
50
/* number of storage engines (from handlertons[]) that support 2pc */
51
51
uint32_t total_ha_2pc= 0;
52
52
/* size of savepoint storage area (see ha_init) */
53
53
uint32_t savepoint_alloc_size= 0;
63
63
TYPELIB tx_isolation_typelib= {array_elements(tx_isolation_names)-1,"",
64
64
                               tx_isolation_names, NULL};
65
65
 
 
66
static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
 
67
uint32_t known_extensions_id= 0;
 
68
 
66
69
 
67
70
/**
68
71
  Register handler error messages for use with my_error().
161
164
    binary log (which is considered a transaction-capable storage engine in
162
165
    counting total_ha)
163
166
  */
 
167
  opt_using_transactions= total_ha>(uint32_t)opt_bin_log;
164
168
  savepoint_alloc_size+= sizeof(SAVEPOINT);
165
169
  return(error);
166
170
}
180
184
  return(error);
181
185
}
182
186
 
183
 
 
 
187
static bool dropdb_handlerton(Session *unused1 __attribute__((unused)),
 
188
                              plugin_ref plugin,
 
189
                              void *path)
 
190
{
 
191
  handlerton *hton= plugin_data(plugin, handlerton *);
 
192
  if (hton->state == SHOW_OPTION_YES && hton->drop_database)
 
193
    hton->drop_database(hton, (char *)path);
 
194
  return false;
 
195
}
 
196
 
 
197
 
 
198
void ha_drop_database(char* path)
 
199
{
 
200
  plugin_foreach(NULL, dropdb_handlerton, DRIZZLE_STORAGE_ENGINE_PLUGIN, path);
 
201
}
 
202
 
 
203
 
 
204
static bool closecon_handlerton(Session *session, plugin_ref plugin,
 
205
                                void *unused __attribute__((unused)))
 
206
{
 
207
  handlerton *hton= plugin_data(plugin, handlerton *);
 
208
  /*
 
209
    there's no need to rollback here as all transactions must
 
210
    be rolled back already
 
211
  */
 
212
  if (hton->state == SHOW_OPTION_YES && hton->close_connection &&
 
213
      session_get_ha_data(session, hton))
 
214
    hton->close_connection(hton, session);
 
215
  return false;
 
216
}
 
217
 
 
218
 
 
219
/**
 
220
  @note
 
221
    don't bother to rollback here, it's done already
 
222
*/
 
223
void ha_close_connection(Session* session)
 
224
{
 
225
  plugin_foreach(session, closecon_handlerton, DRIZZLE_STORAGE_ENGINE_PLUGIN, 0);
 
226
}
184
227
 
185
228
/* ========================================================================
186
229
 ======================= TRANSACTIONS ===================================*/
362
405
  in each engine independently. The two-phase commit protocol
363
406
  is used only if:
364
407
  - all participating engines support two-phase commit (provide
365
 
    StorageEngine::prepare PSEA API call) and
 
408
    handlerton::prepare PSEA API call) and
366
409
  - transactions in at least two engines modify data (i.e. are
367
410
  not read-only).
368
411
 
426
469
 
427
470
  At the end of a statement, server call
428
471
  ha_autocommit_or_rollback() is invoked. This call in turn
429
 
  invokes StorageEngine::prepare() for every involved engine.
430
 
  Prepare is followed by a call to StorageEngine::commit_one_phase()
431
 
  If a one-phase commit will suffice, StorageEngine::prepare() is not
432
 
  invoked and the server only calls StorageEngine::commit_one_phase().
 
472
  invokes handlerton::prepare() for every involved engine.
 
473
  Prepare is followed by a call to handlerton::commit_one_phase()
 
474
  If a one-phase commit will suffice, handlerton::prepare() is not
 
475
  invoked and the server only calls handlerton::commit_one_phase().
433
476
  At statement commit, the statement-related read-write engine
434
477
  flag is propagated to the corresponding flag in the normal
435
478
  transaction.  When the commit is complete, the list of registered
444
487
  do not "register" in session->transaction lists, and thus do not
445
488
  modify the transaction state. Besides, each DDL in
446
489
  MySQL is prefixed with an implicit normal transaction commit
447
 
  (a call to Session::endActiveTransaction()), and thus leaves nothing
 
490
  (a call to end_active_trans()), and thus leaves nothing
448
491
  to modify.
449
492
  However, as it has been pointed out with CREATE TABLE .. SELECT,
450
493
  some DDL statements can start a *new* transaction.
488
531
    times per transaction.
489
532
 
490
533
*/
491
 
void trans_register_ha(Session *session, bool all, StorageEngine *engine)
 
534
void trans_register_ha(Session *session, bool all, handlerton *ht_arg)
492
535
{
493
536
  Session_TRANS *trans;
494
537
  Ha_trx_info *ha_info;
501
544
  else
502
545
    trans= &session->transaction.stmt;
503
546
 
504
 
  ha_info= session->ha_data[engine->slot].ha_info + static_cast<unsigned>(all);
 
547
  ha_info= session->ha_data[ht_arg->slot].ha_info + static_cast<unsigned>(all);
505
548
 
506
549
  if (ha_info->is_started())
507
550
    return; /* already registered, return */
508
551
 
509
 
  ha_info->register_ha(trans, engine);
 
552
  ha_info->register_ha(trans, ht_arg);
510
553
 
511
 
  trans->no_2pc|= not engine->has_2pc();
 
554
  trans->no_2pc|=(ht_arg->prepare==0);
512
555
  if (session->transaction.xid_state.xid.is_null())
513
556
    session->transaction.xid_state.xid.set(session->query_id);
514
557
 
531
574
    for (; ha_info; ha_info= ha_info->next())
532
575
    {
533
576
      int err;
534
 
      StorageEngine *engine= ha_info->engine();
 
577
      handlerton *ht= ha_info->ht();
535
578
      status_var_increment(session->status_var.ha_prepare_count);
536
 
      if ((err= engine->prepare(session, all)))
 
579
      if (ht->prepare)
537
580
      {
538
 
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
539
 
        ha_rollback_trans(session, all);
540
 
        error=1;
541
 
        break;
 
581
        if ((err= ht->prepare(ht, session, all)))
 
582
        {
 
583
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
 
584
          ha_rollback_trans(session, all);
 
585
          error=1;
 
586
          break;
 
587
        }
542
588
      }
543
589
      else
544
590
      {
545
591
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
546
592
                            ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
547
 
                            engine->getName().c_str());
 
593
                            ha_resolve_storage_engine_name(ht));
548
594
      }
549
595
    }
550
596
  }
581
627
 
582
628
    if (! all)
583
629
    {
584
 
      Ha_trx_info *ha_info_all= &session->ha_data[ha_info->engine()->slot].ha_info[1];
 
630
      Ha_trx_info *ha_info_all= &session->ha_data[ha_info->ht()->slot].ha_info[1];
585
631
      assert(ha_info != ha_info_all);
586
632
      /*
587
633
        Merge read-only/read-write information about statement
631
677
  Session_TRANS *trans= all ? &session->transaction.all : &session->transaction.stmt;
632
678
  bool is_real_trans= all || session->transaction.all.ha_list == 0;
633
679
  Ha_trx_info *ha_info= trans->ha_list;
 
680
  my_xid xid= session->transaction.xid_state.xid.get_my_xid();
634
681
 
635
682
  /*
636
683
    We must not commit the normal transaction if a statement
651
698
      return(1);
652
699
    }
653
700
 
 
701
    if (   is_real_trans
 
702
        && opt_readonly
 
703
        && ! session->slave_thread
 
704
       )
 
705
    {
 
706
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
 
707
      ha_rollback_trans(session, all);
 
708
      error= 1;
 
709
      goto end;
 
710
    }
 
711
 
654
712
    must_2pc= ha_check_and_coalesce_trx_read_only(session, ha_info, all);
655
713
 
656
714
    if (!trans->no_2pc && must_2pc)
658
716
      for (; ha_info && !error; ha_info= ha_info->next())
659
717
      {
660
718
        int err;
661
 
        StorageEngine *engine= ha_info->engine();
 
719
        handlerton *ht= ha_info->ht();
662
720
        /*
663
721
          Do not call two-phase commit if this particular
664
722
          transaction is read-only. This allows for simpler
670
728
          Sic: we know that prepare() is not NULL since otherwise
671
729
          trans->no_2pc would have been set.
672
730
        */
673
 
        if ((err= engine->prepare(session, all)))
 
731
        if ((err= ht->prepare(ht, session, all)))
674
732
        {
675
733
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
676
734
          error= 1;
677
735
        }
678
736
        status_var_increment(session->status_var.ha_prepare_count);
679
737
      }
680
 
      if (error)
 
738
      if (error || (is_real_trans && xid &&
 
739
                    (error= !(cookie= tc_log->log_xid(session, xid)))))
681
740
      {
682
741
        ha_rollback_trans(session, all);
683
742
        error= 1;
685
744
      }
686
745
    }
687
746
    error=ha_commit_one_phase(session, all) ? (cookie ? 2 : 1) : 0;
 
747
    if (cookie)
 
748
      tc_log->unlog(cookie, xid);
688
749
end:
689
750
    if (is_real_trans)
690
751
      start_waiting_global_read_lock(session);
707
768
    for (; ha_info; ha_info= ha_info_next)
708
769
    {
709
770
      int err;
710
 
      StorageEngine *engine= ha_info->engine();
711
 
      if ((err= engine->commit(session, all)))
 
771
      handlerton *ht= ha_info->ht();
 
772
      if ((err= ht->commit(ht, session, all)))
712
773
      {
713
774
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
714
775
        error=1;
750
811
    for (; ha_info; ha_info= ha_info_next)
751
812
    {
752
813
      int err;
753
 
      StorageEngine *engine= ha_info->engine();
754
 
      if ((err= engine->rollback(session, all)))
 
814
      handlerton *ht= ha_info->ht();
 
815
      if ((err= ht->rollback(ht, session, all)))
755
816
      { // cannot happen
756
817
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
757
818
        error=1;
782
843
    the error log; but we don't want users to wonder why they have this
783
844
    message in the error log, so we don't send it.
784
845
  */
785
 
  if (is_real_trans && session->transaction.all.modified_non_trans_table && session->killed != Session::KILL_CONNECTION)
 
846
  if (is_real_trans && session->transaction.all.modified_non_trans_table &&
 
847
      !session->slave_thread && session->killed != Session::KILL_CONNECTION)
786
848
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
787
849
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
788
850
                 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
822
884
}
823
885
 
824
886
 
825
 
 
 
887
struct xahton_st {
 
888
  XID *xid;
 
889
  int result;
 
890
};
 
891
 
 
892
static bool xacommit_handlerton(Session *unused1 __attribute__((unused)),
 
893
                                plugin_ref plugin,
 
894
                                void *arg)
 
895
{
 
896
  handlerton *hton= plugin_data(plugin, handlerton *);
 
897
  if (hton->state == SHOW_OPTION_YES && hton->recover)
 
898
  {
 
899
    hton->commit_by_xid(hton, ((struct xahton_st *)arg)->xid);
 
900
    ((struct xahton_st *)arg)->result= 0;
 
901
  }
 
902
  return false;
 
903
}
 
904
 
 
905
static bool xarollback_handlerton(Session *unused1 __attribute__((unused)),
 
906
                                  plugin_ref plugin,
 
907
                                  void *arg)
 
908
{
 
909
  handlerton *hton= plugin_data(plugin, handlerton *);
 
910
  if (hton->state == SHOW_OPTION_YES && hton->recover)
 
911
  {
 
912
    hton->rollback_by_xid(hton, ((struct xahton_st *)arg)->xid);
 
913
    ((struct xahton_st *)arg)->result= 0;
 
914
  }
 
915
  return false;
 
916
}
 
917
 
 
918
 
 
919
int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
 
920
{
 
921
  struct xahton_st xaop;
 
922
  xaop.xid= xid;
 
923
  xaop.result= 1;
 
924
 
 
925
  plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
 
926
                 DRIZZLE_STORAGE_ENGINE_PLUGIN, &xaop);
 
927
 
 
928
  return xaop.result;
 
929
}
 
930
 
 
931
/**
 
932
  recover() step of xa.
 
933
 
 
934
  @note
 
935
    there are three modes of operation:
 
936
    - automatic recover after a crash
 
937
    in this case commit_list != 0, tc_heuristic_recover==0
 
938
    all xids from commit_list are committed, others are rolled back
 
939
    - manual (heuristic) recover
 
940
    in this case commit_list==0, tc_heuristic_recover != 0
 
941
    DBA has explicitly specified that all prepared transactions should
 
942
    be committed (or rolled back).
 
943
    - no recovery (MySQL did not detect a crash)
 
944
    in this case commit_list==0, tc_heuristic_recover == 0
 
945
    there should be no prepared transactions in this case.
 
946
*/
 
947
struct xarecover_st
 
948
{
 
949
  int len, found_foreign_xids, found_my_xids;
 
950
  XID *list;
 
951
  HASH *commit_list;
 
952
  bool dry_run;
 
953
};
 
954
 
 
955
static bool xarecover_handlerton(Session *unused __attribute__((unused)),
 
956
                                 plugin_ref plugin,
 
957
                                 void *arg)
 
958
{
 
959
  handlerton *hton= plugin_data(plugin, handlerton *);
 
960
  struct xarecover_st *info= (struct xarecover_st *) arg;
 
961
  int got;
 
962
 
 
963
  if (hton->state == SHOW_OPTION_YES && hton->recover)
 
964
  {
 
965
    while ((got= hton->recover(hton, info->list, info->len)) > 0 )
 
966
    {
 
967
      sql_print_information(_("Found %d prepared transaction(s) in %s"),
 
968
                            got, ha_resolve_storage_engine_name(hton));
 
969
      for (int i=0; i < got; i ++)
 
970
      {
 
971
        my_xid x=info->list[i].get_my_xid();
 
972
        if (!x) // not "mine" - that is generated by external TM
 
973
        {
 
974
          xid_cache_insert(info->list+i, XA_PREPARED);
 
975
          info->found_foreign_xids++;
 
976
          continue;
 
977
        }
 
978
        if (info->dry_run)
 
979
        {
 
980
          info->found_my_xids++;
 
981
          continue;
 
982
        }
 
983
        // recovery mode
 
984
        if (info->commit_list ?
 
985
            hash_search(info->commit_list, (unsigned char *)&x, sizeof(x)) != 0 :
 
986
            tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
 
987
        {
 
988
          hton->commit_by_xid(hton, info->list+i);
 
989
        }
 
990
        else
 
991
        {
 
992
          hton->rollback_by_xid(hton, info->list+i);
 
993
        }
 
994
      }
 
995
      if (got < info->len)
 
996
        break;
 
997
    }
 
998
  }
 
999
  return false;
 
1000
}
 
1001
 
 
1002
int ha_recover(HASH *commit_list)
 
1003
{
 
1004
  struct xarecover_st info;
 
1005
  info.found_foreign_xids= info.found_my_xids= 0;
 
1006
  info.commit_list= commit_list;
 
1007
  info.dry_run= (info.commit_list==0 && tc_heuristic_recover==0);
 
1008
  info.list= NULL;
 
1009
 
 
1010
  /* commit_list and tc_heuristic_recover cannot be set both */
 
1011
  assert(info.commit_list==0 || tc_heuristic_recover==0);
 
1012
  /* if either is set, total_ha_2pc must be set too */
 
1013
  assert(info.dry_run || total_ha_2pc>(uint32_t)opt_bin_log);
 
1014
 
 
1015
  if (total_ha_2pc <= (uint32_t)opt_bin_log)
 
1016
    return(0);
 
1017
 
 
1018
  if (info.commit_list)
 
1019
    sql_print_information(_("Starting crash recovery..."));
 
1020
 
 
1021
 
 
1022
#ifndef WILL_BE_DELETED_LATER
 
1023
 
 
1024
  /*
 
1025
    for now, only InnoDB supports 2pc. It means we can always safely
 
1026
    rollback all pending transactions, without risking inconsistent data
 
1027
  */
 
1028
 
 
1029
  assert(total_ha_2pc == (uint32_t) opt_bin_log+1); // only InnoDB and binlog
 
1030
  tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
 
1031
  info.dry_run=false;
 
1032
#endif
 
1033
 
 
1034
 
 
1035
  for (info.len= MAX_XID_LIST_SIZE ;
 
1036
       info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2)
 
1037
  {
 
1038
    info.list=(XID *)malloc(info.len*sizeof(XID));
 
1039
  }
 
1040
  if (!info.list)
 
1041
  {
 
1042
    sql_print_error(ER(ER_OUTOFMEMORY), info.len*sizeof(XID));
 
1043
    return(1);
 
1044
  }
 
1045
 
 
1046
  plugin_foreach(NULL, xarecover_handlerton,
 
1047
                 DRIZZLE_STORAGE_ENGINE_PLUGIN, &info);
 
1048
 
 
1049
  free((unsigned char*)info.list);
 
1050
  if (info.found_foreign_xids)
 
1051
    sql_print_warning(_("Found %d prepared XA transactions"),
 
1052
                      info.found_foreign_xids);
 
1053
  if (info.dry_run && info.found_my_xids)
 
1054
  {
 
1055
    sql_print_error(_("Found %d prepared transactions! It means that drizzled "
 
1056
                    "was not shut down properly last time and critical "
 
1057
                    "recovery information (last binlog or %s file) was "
 
1058
                    "manually deleted after a crash. You have to start "
 
1059
                    "drizzled with the --tc-heuristic-recover switch to "
 
1060
                    "commit or rollback pending transactions."),
 
1061
                    info.found_my_xids, opt_tc_log_file);
 
1062
    return(1);
 
1063
  }
 
1064
  if (info.commit_list)
 
1065
    sql_print_information(_("Crash recovery finished."));
 
1066
  return(0);
 
1067
}
826
1068
 
827
1069
/**
828
1070
  return the list of XID's to a client, the same way SHOW commands do.
868
1110
  }
869
1111
 
870
1112
  pthread_mutex_unlock(&LOCK_xid_cache);
871
 
  session->my_eof();
 
1113
  my_eof(session);
872
1114
  return(0);
873
1115
}
874
1116
 
 
1117
/**
 
1118
  @details
 
1119
  This function should be called when MySQL sends rows of a SELECT result set
 
1120
  or the EOF mark to the client. It releases a possible adaptive hash index
 
1121
  S-latch held by session in InnoDB and also releases a possible InnoDB query
 
1122
  FIFO ticket to enter InnoDB. To save CPU time, InnoDB allows a session to
 
1123
  keep them over several calls of the InnoDB handler interface when a join
 
1124
  is executed. But when we let the control to pass to the client they have
 
1125
  to be released because if the application program uses mysql_use_result(),
 
1126
  it may deadlock on the S-latch if the application on another connection
 
1127
  performs another SQL query. In MySQL-4.1 this is even more important because
 
1128
  there a connection can have several SELECT queries open at the same time.
 
1129
 
 
1130
  @param session           the thread handle of the current connection
 
1131
 
 
1132
  @return
 
1133
    always 0
 
1134
*/
 
1135
static bool release_temporary_latches(Session *session, plugin_ref plugin,
 
1136
                                      void *unused __attribute__((unused)))
 
1137
{
 
1138
  handlerton *hton= plugin_data(plugin, handlerton *);
 
1139
 
 
1140
  if (hton->state == SHOW_OPTION_YES && hton->release_temporary_latches)
 
1141
    hton->release_temporary_latches(hton, session);
 
1142
 
 
1143
  return false;
 
1144
}
 
1145
 
 
1146
 
 
1147
int ha_release_temporary_latches(Session *session)
 
1148
{
 
1149
  plugin_foreach(session, release_temporary_latches, DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1150
                 NULL);
 
1151
 
 
1152
  return 0;
 
1153
}
875
1154
 
876
1155
int ha_rollback_to_savepoint(Session *session, SAVEPOINT *sv)
877
1156
{
887
1166
  for (ha_info= sv->ha_list; ha_info; ha_info= ha_info->next())
888
1167
  {
889
1168
    int err;
890
 
    StorageEngine *engine= ha_info->engine();
891
 
    assert(engine);
892
 
    if ((err= engine->savepoint_rollback(session,
893
 
                                         (void *)(sv+1))))
 
1169
    handlerton *ht= ha_info->ht();
 
1170
    assert(ht);
 
1171
    assert(ht->savepoint_set != 0);
 
1172
    if ((err= ht->savepoint_rollback(ht, session,
 
1173
                                     (unsigned char *)(sv+1)+ht->savepoint_offset)))
894
1174
    { // cannot happen
895
1175
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
896
1176
      error=1;
897
1177
    }
898
1178
    status_var_increment(session->status_var.ha_savepoint_rollback_count);
899
 
    trans->no_2pc|= not engine->has_2pc();
 
1179
    trans->no_2pc|= ht->prepare == 0;
900
1180
  }
901
1181
  /*
902
1182
    rolling back the transaction in all storage engines that were not part of
906
1186
       ha_info= ha_info_next)
907
1187
  {
908
1188
    int err;
909
 
    StorageEngine *engine= ha_info->engine();
910
 
    if ((err= engine->rollback(session, !(0))))
 
1189
    handlerton *ht= ha_info->ht();
 
1190
    if ((err= ht->rollback(ht, session, !(0))))
911
1191
    { // cannot happen
912
1192
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
913
1193
      error=1;
934
1214
  for (; ha_info; ha_info= ha_info->next())
935
1215
  {
936
1216
    int err;
937
 
    StorageEngine *engine= ha_info->engine();
938
 
    assert(engine);
939
 
/*    if (! engine->savepoint_set)
 
1217
    handlerton *ht= ha_info->ht();
 
1218
    assert(ht);
 
1219
    if (! ht->savepoint_set)
940
1220
    {
941
1221
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "SAVEPOINT");
942
1222
      error=1;
943
1223
      break;
944
 
    } */
945
 
    if ((err= engine->savepoint_set(session, (void *)(sv+1))))
 
1224
    }
 
1225
    if ((err= ht->savepoint_set(ht, session, (unsigned char *)(sv+1)+ht->savepoint_offset)))
946
1226
    { // cannot happen
947
1227
      my_error(ER_GET_ERRNO, MYF(0), err);
948
1228
      error=1;
965
1245
  for (; ha_info; ha_info= ha_info->next())
966
1246
  {
967
1247
    int err;
968
 
    StorageEngine *engine= ha_info->engine();
 
1248
    handlerton *ht= ha_info->ht();
969
1249
    /* Savepoint life time is enclosed into transaction life time. */
970
 
    assert(engine);
971
 
    if ((err= engine->savepoint_release(session,
972
 
                                        (void *)(sv+1))))
 
1250
    assert(ht);
 
1251
    if (!ht->savepoint_release)
 
1252
      continue;
 
1253
    if ((err= ht->savepoint_release(ht, session,
 
1254
                                    (unsigned char *)(sv+1) + ht->savepoint_offset)))
973
1255
    { // cannot happen
974
1256
      my_error(ER_GET_ERRNO, MYF(0), err);
975
1257
      error=1;
979
1261
}
980
1262
 
981
1263
 
982
 
 
983
 
 
 
1264
static bool snapshot_handlerton(Session *session, plugin_ref plugin, void *arg)
 
1265
{
 
1266
  handlerton *hton= plugin_data(plugin, handlerton *);
 
1267
  if (hton->state == SHOW_OPTION_YES &&
 
1268
      hton->start_consistent_snapshot)
 
1269
  {
 
1270
    hton->start_consistent_snapshot(hton, session);
 
1271
    *((bool *)arg)= false;
 
1272
  }
 
1273
  return false;
 
1274
}
 
1275
 
 
1276
int ha_start_consistent_snapshot(Session *session)
 
1277
{
 
1278
  bool warn= true;
 
1279
 
 
1280
  plugin_foreach(session, snapshot_handlerton, DRIZZLE_STORAGE_ENGINE_PLUGIN, &warn);
 
1281
 
 
1282
  /*
 
1283
    Same idea as when one wants to CREATE TABLE in one engine which does not
 
1284
    exist:
 
1285
  */
 
1286
  if (warn)
 
1287
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
 
1288
                 "This MySQL server does not support any "
 
1289
                 "consistent-read capable storage engine");
 
1290
  return 0;
 
1291
}
 
1292
 
 
1293
 
 
1294
static bool flush_handlerton(Session *session __attribute__((unused)),
 
1295
                             plugin_ref plugin,
 
1296
                             void *arg __attribute__((unused)))
 
1297
{
 
1298
  handlerton *hton= plugin_data(plugin, handlerton *);
 
1299
  if (hton->state == SHOW_OPTION_YES && hton->flush_logs &&
 
1300
      hton->flush_logs(hton))
 
1301
    return true;
 
1302
  return false;
 
1303
}
 
1304
 
 
1305
 
 
1306
bool ha_flush_logs(handlerton *db_type)
 
1307
{
 
1308
  if (db_type == NULL)
 
1309
  {
 
1310
    if (plugin_foreach(NULL, flush_handlerton,
 
1311
                          DRIZZLE_STORAGE_ENGINE_PLUGIN, 0))
 
1312
      return true;
 
1313
  }
 
1314
  else
 
1315
  {
 
1316
    if (db_type->state != SHOW_OPTION_YES ||
 
1317
        (db_type->flush_logs && db_type->flush_logs(db_type)))
 
1318
      return true;
 
1319
  }
 
1320
  return false;
 
1321
}
 
1322
 
 
1323
static const char *check_lowercase_names(handler *file, const char *path,
 
1324
                                         char *tmp_path)
 
1325
{
 
1326
  if (lower_case_table_names != 2 || (file->ha_table_flags() & HA_FILE_BASED))
 
1327
    return path;
 
1328
 
 
1329
  /* Ensure that table handler get path in lower case */
 
1330
  if (tmp_path != path)
 
1331
    strcpy(tmp_path, path);
 
1332
 
 
1333
  /*
 
1334
    we only should turn into lowercase database/table part
 
1335
    so start the process after homedirectory
 
1336
  */
 
1337
  my_casedn_str(files_charset_info, tmp_path + drizzle_data_home_len);
 
1338
  return tmp_path;
 
1339
}
 
1340
 
 
1341
 
 
1342
/**
 
1343
  An interceptor to hijack the text of the error message without
 
1344
  setting an error in the thread. We need the text to present it
 
1345
  in the form of a warning to the user.
 
1346
*/
 
1347
 
 
1348
struct Ha_delete_table_error_handler: public Internal_error_handler
 
1349
{
 
1350
public:
 
1351
  virtual bool handle_error(uint32_t sql_errno,
 
1352
                            const char *message,
 
1353
                            DRIZZLE_ERROR::enum_warning_level level,
 
1354
                            Session *session);
 
1355
  char buff[DRIZZLE_ERRMSG_SIZE];
 
1356
};
 
1357
 
 
1358
 
 
1359
bool
 
1360
Ha_delete_table_error_handler::
 
1361
handle_error(uint32_t sql_errno  __attribute__((unused)),
 
1362
             const char *message,
 
1363
             DRIZZLE_ERROR::enum_warning_level level __attribute__((unused)),
 
1364
             Session *session __attribute__((unused)))
 
1365
{
 
1366
  /* Grab the error message */
 
1367
  strncpy(buff, message, sizeof(buff)-1);
 
1368
  return true;
 
1369
}
 
1370
 
 
1371
 
 
1372
struct handlerton_delete_table_args {
 
1373
  Session *session;
 
1374
  const char *path;
 
1375
  handler *file;
 
1376
  int error;
 
1377
};
 
1378
 
 
1379
static bool deletetable_handlerton(Session *unused1 __attribute__((unused)),
 
1380
                                   plugin_ref plugin,
 
1381
                                   void *args)
 
1382
{
 
1383
  struct handlerton_delete_table_args *dtargs= (struct handlerton_delete_table_args *) args;
 
1384
 
 
1385
  Session *session= dtargs->session;
 
1386
  const char *path= dtargs->path;
 
1387
 
 
1388
  handler *file;
 
1389
  char tmp_path[FN_REFLEN];
 
1390
 
 
1391
  if(dtargs->error!=ENOENT) /* already deleted table */
 
1392
    return false;
 
1393
 
 
1394
  handlerton *table_type= plugin_data(plugin, handlerton *);
 
1395
 
 
1396
  if(!table_type)
 
1397
    return false;
 
1398
 
 
1399
  if(!(table_type->state == SHOW_OPTION_YES && table_type->create))
 
1400
    return false;
 
1401
 
 
1402
  if ((file= table_type->create(table_type, NULL, session->mem_root)))
 
1403
    file->init();
 
1404
  else
 
1405
    return false;
 
1406
 
 
1407
  path= check_lowercase_names(file, path, tmp_path);
 
1408
  int error= file->ha_delete_table(path);
 
1409
 
 
1410
  if(error!=ENOENT)
 
1411
  {
 
1412
    dtargs->error= error;
 
1413
    if(dtargs->file)
 
1414
      delete dtargs->file;
 
1415
    dtargs->file= file;
 
1416
    return true;
 
1417
  }
 
1418
 
 
1419
  return false;
 
1420
}
 
1421
 
 
1422
/**
 
1423
  This should return ENOENT if the file doesn't exists.
 
1424
  The .frm file will be deleted only if we return 0 or ENOENT
 
1425
*/
 
1426
int ha_delete_table(Session *session, const char *path,
 
1427
                    const char *db, const char *alias, bool generate_warning)
 
1428
{
 
1429
  TABLE_SHARE dummy_share;
 
1430
  Table dummy_table;
 
1431
 
 
1432
  struct handlerton_delete_table_args dtargs;
 
1433
  dtargs.error= ENOENT;
 
1434
  dtargs.session= session;
 
1435
  dtargs.path= path;
 
1436
  dtargs.file= NULL;
 
1437
 
 
1438
  plugin_foreach(NULL, deletetable_handlerton, DRIZZLE_STORAGE_ENGINE_PLUGIN,
 
1439
                 &dtargs);
 
1440
 
 
1441
  memset(&dummy_table, 0, sizeof(dummy_table));
 
1442
  memset(&dummy_share, 0, sizeof(dummy_share));
 
1443
  dummy_table.s= &dummy_share;
 
1444
 
 
1445
  if (dtargs.error && generate_warning)
 
1446
  {
 
1447
    /*
 
1448
      Because file->print_error() use my_error() to generate the error message
 
1449
      we use an internal error handler to intercept it and store the text
 
1450
      in a temporary buffer. Later the message will be presented to user
 
1451
      as a warning.
 
1452
    */
 
1453
    Ha_delete_table_error_handler ha_delete_table_error_handler;
 
1454
 
 
1455
    /* Fill up strucutures that print_error may need */
 
1456
    dummy_share.path.str= (char*) path;
 
1457
    dummy_share.path.length= strlen(path);
 
1458
    dummy_share.db.str= (char*) db;
 
1459
    dummy_share.db.length= strlen(db);
 
1460
    dummy_share.table_name.str= (char*) alias;
 
1461
    dummy_share.table_name.length= strlen(alias);
 
1462
    dummy_table.alias= alias;
 
1463
 
 
1464
    handler *file= dtargs.file;
 
1465
    file->change_table_ptr(&dummy_table, &dummy_share);
 
1466
 
 
1467
    session->push_internal_handler(&ha_delete_table_error_handler);
 
1468
    file->print_error(dtargs.error, 0);
 
1469
 
 
1470
    session->pop_internal_handler();
 
1471
 
 
1472
    /*
 
1473
      XXX: should we convert *all* errors to warnings here?
 
1474
      What if the error is fatal?
 
1475
    */
 
1476
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR, dtargs.error,
 
1477
                ha_delete_table_error_handler.buff);
 
1478
  }
 
1479
 
 
1480
  if(dtargs.file)
 
1481
    delete dtargs.file;
 
1482
 
 
1483
  return dtargs.error;
 
1484
}
984
1485
 
985
1486
/****************************************************************************
986
1487
** General handler functions
1081
1582
 
1082
1583
void **handler::ha_data(Session *session) const
1083
1584
{
1084
 
  return session_ha_data(session, engine);
 
1585
  return session_ha_data(session, ht);
1085
1586
}
1086
1587
 
1087
1588
Session *handler::ha_session(void) const
1418
1919
        {
1419
1920
          nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
1420
1921
            (1 << nb_already_reserved_intervals);
1421
 
          set_if_smaller(nb_desired_values, (uint64_t)AUTO_INC_DEFAULT_NB_MAX);
 
1922
          set_if_smaller(nb_desired_values, AUTO_INC_DEFAULT_NB_MAX);
1422
1923
        }
1423
1924
        else
1424
1925
          nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
1526
2027
  @param first_value         (OUT) the first value reserved by the handler
1527
2028
  @param nb_reserved_values  (OUT) how many values the handler reserved
1528
2029
*/
1529
 
void handler::get_auto_increment(uint64_t ,
1530
 
                                 uint64_t ,
1531
 
                                 uint64_t ,
 
2030
void handler::get_auto_increment(uint64_t offset __attribute__((unused)),
 
2031
                                 uint64_t increment __attribute__((unused)),
 
2032
                                 uint64_t nb_desired_values __attribute__((unused)),
1532
2033
                                 uint64_t *first_value,
1533
2034
                                 uint64_t *nb_reserved_values)
1534
2035
{
1611
2112
  else
1612
2113
  {
1613
2114
    /* Table is opened and defined at this point */
1614
 
    key_unpack(&str,table,(uint32_t) key_nr);
1615
 
    uint32_t max_length=DRIZZLE_ERRMSG_SIZE-(uint32_t) strlen(msg);
 
2115
    key_unpack(&str,table,(uint) key_nr);
 
2116
    uint32_t max_length=DRIZZLE_ERRMSG_SIZE-(uint) strlen(msg);
1616
2117
    if (str.length() >= max_length)
1617
2118
    {
1618
2119
      str.length(max_length-4);
1675
2176
      char key[MAX_KEY_LENGTH];
1676
2177
      String str(key,sizeof(key),system_charset_info);
1677
2178
      /* Table is opened and defined at this point */
1678
 
      key_unpack(&str,table,(uint32_t) key_nr);
 
2179
      key_unpack(&str,table,(uint) key_nr);
1679
2180
      max_length= (DRIZZLE_ERRMSG_SIZE-
1680
 
                   (uint32_t) strlen(ER(ER_FOREIGN_DUPLICATE_KEY)));
 
2181
                   (uint) strlen(ER(ER_FOREIGN_DUPLICATE_KEY)));
1681
2182
      if (str.length() >= max_length)
1682
2183
      {
1683
2184
        str.length(max_length-4);
1791
2292
    my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
1792
2293
               ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
1793
2294
    return;
 
2295
    break;
1794
2296
  default:
1795
2297
    {
1796
2298
      /* The error was "unknown" to this function.
1800
2302
      temporary= get_error_message(error, &str);
1801
2303
      if (!str.is_empty())
1802
2304
      {
1803
 
              const char* engine_name= table_type();
1804
 
              if (temporary)
1805
 
                my_error(ER_GET_TEMPORARY_ERRMSG, MYF(0), error, str.ptr(),
1806
 
                   engine_name);
1807
 
              else
1808
 
                my_error(ER_GET_ERRMSG, MYF(0), error, str.ptr(), engine_name);
 
2305
        const char* engine= table_type();
 
2306
        if (temporary)
 
2307
          my_error(ER_GET_TEMPORARY_ERRMSG, MYF(0), error, str.ptr(), engine);
 
2308
        else
 
2309
          my_error(ER_GET_ERRMSG, MYF(0), error, str.ptr(), engine);
1809
2310
      }
1810
2311
      else
1811
 
      {
1812
 
              my_error(ER_GET_ERRNO,errflag,error);
1813
 
      }
 
2312
        my_error(ER_GET_ERRNO,errflag,error);
1814
2313
      return;
1815
2314
    }
1816
2315
  }
1828
2327
  @return
1829
2328
    Returns true if this is a temporary error
1830
2329
*/
1831
 
bool handler::get_error_message(int ,
1832
 
                                String* )
 
2330
bool handler::get_error_message(int error __attribute__((unused)),
 
2331
                                String* buf __attribute__((unused)))
1833
2332
{
1834
2333
  return false;
1835
2334
}
1856
2355
        Field *field= table->field[keypart->fieldnr-1];
1857
2356
        if (field->type() == DRIZZLE_TYPE_BLOB)
1858
2357
        {
 
2358
          if (check_opt->sql_flags & TT_FOR_UPGRADE)
 
2359
            check_opt->flags= T_MEDIUM;
1859
2360
          return HA_ADMIN_NEEDS_CHECK;
1860
2361
        }
1861
2362
      }
1877
2378
*/
1878
2379
uint32_t handler::get_dup_key(int error)
1879
2380
{
1880
 
  table->file->errkey  = (uint32_t) -1;
 
2381
  table->file->errkey  = (uint) -1;
1881
2382
  if (error == HA_ERR_FOUND_DUPP_KEY || error == HA_ERR_FOREIGN_DUPLICATE_KEY ||
1882
2383
      error == HA_ERR_FOUND_DUPP_UNIQUE ||
1883
2384
      error == HA_ERR_DROP_INDEX_FK)
1965
2466
{
1966
2467
  int error;
1967
2468
 
 
2469
  if ((table->s->mysql_version >= DRIZZLE_VERSION_ID) &&
 
2470
      (check_opt->sql_flags & TT_FOR_UPGRADE))
 
2471
    return 0;
 
2472
 
1968
2473
  if (table->s->mysql_version < DRIZZLE_VERSION_ID)
1969
2474
  {
1970
2475
    if ((error= check_old_types()))
1972
2477
    error= ha_check_for_upgrade(check_opt);
1973
2478
    if (error && (error != HA_ADMIN_NEEDS_CHECK))
1974
2479
      return error;
 
2480
    if (!error && (check_opt->sql_flags & TT_FOR_UPGRADE))
 
2481
      return 0;
1975
2482
  }
1976
2483
  if ((error= check(session, check_opt)))
1977
2484
    return error;
1987
2494
void
1988
2495
handler::mark_trx_read_write()
1989
2496
{
1990
 
  Ha_trx_info *ha_info= &ha_session()->ha_data[engine->slot].ha_info[0];
 
2497
  Ha_trx_info *ha_info= &ha_session()->ha_data[ht->slot].ha_info[0];
1991
2498
  /*
1992
2499
    When a storage engine method is called, the transaction must
1993
2500
    have been started, unless it's a DDL call, for which the
1998
2505
  */
1999
2506
  if (ha_info->is_started())
2000
2507
  {
 
2508
    assert(has_transactions());
2001
2509
    /*
2002
2510
      table_share can be NULL in ha_delete_table(). See implementation
2003
2511
      of standalone function ha_delete_table() in sql_base.cc.
2231
2739
*/
2232
2740
 
2233
2741
int
2234
 
handler::ha_create(const char *name, Table *form, HA_CREATE_INFO *create_info)
 
2742
handler::ha_create(const char *name, Table *form, HA_CREATE_INFO *info)
2235
2743
{
2236
2744
  mark_trx_read_write();
2237
2745
 
2238
 
  return create(name, form, create_info);
 
2746
  return create(name, form, info);
2239
2747
}
2240
2748
 
2241
2749
 
2247
2755
 
2248
2756
int
2249
2757
handler::ha_create_handler_files(const char *name, const char *old_name,
2250
 
                                 int action_flag, HA_CREATE_INFO *create_info)
 
2758
                        int action_flag, HA_CREATE_INFO *info)
2251
2759
{
2252
2760
  mark_trx_read_write();
2253
2761
 
2254
 
  return create_handler_files(name, old_name, action_flag, create_info);
 
2762
  return create_handler_files(name, old_name, action_flag, info);
2255
2763
}
2256
2764
 
2257
2765
 
2276
2784
      So, let's commit an open transaction (if any) now.
2277
2785
    */
2278
2786
    if (!(error= ha_commit_trans(session, 0)))
2279
 
      if (! session->endTransaction(COMMIT))
2280
 
        error= 1;
2281
 
 
 
2787
      error= end_trans(session, COMMIT);
2282
2788
  }
2283
2789
  return(error);
2284
2790
}
2338
2844
** Some general functions that isn't in the handler class
2339
2845
****************************************************************************/
2340
2846
 
 
2847
/**
 
2848
  Initiates table-file and calls appropriate database-creator.
 
2849
 
 
2850
  @retval
 
2851
   0  ok
 
2852
  @retval
 
2853
   1  error
 
2854
*/
 
2855
int ha_create_table(Session *session, const char *path,
 
2856
                    const char *db, const char *table_name,
 
2857
                    HA_CREATE_INFO *create_info,
 
2858
                    bool update_create_info)
 
2859
{
 
2860
  int error= 1;
 
2861
  Table table;
 
2862
  char name_buff[FN_REFLEN];
 
2863
  const char *name;
 
2864
  TABLE_SHARE share;
 
2865
 
 
2866
  init_tmp_table_share(session, &share, db, 0, table_name, path);
 
2867
  if (open_table_def(session, &share, 0) ||
 
2868
      open_table_from_share(session, &share, "", 0, (uint) READ_ALL, 0, &table,
 
2869
                            OTM_CREATE))
 
2870
    goto err;
 
2871
 
 
2872
  if (update_create_info)
 
2873
    table.updateCreateInfo(create_info);
 
2874
 
 
2875
  name= check_lowercase_names(table.file, share.path.str, name_buff);
 
2876
 
 
2877
  error= table.file->ha_create(name, &table, create_info);
 
2878
  closefrm(&table, 0);
 
2879
  if (error)
 
2880
  {
 
2881
    strxmov(name_buff, db, ".", table_name, NULL);
 
2882
    my_error(ER_CANT_CREATE_TABLE, MYF(ME_BELL+ME_WAITTANG), name_buff, error);
 
2883
  }
 
2884
err:
 
2885
  free_table_share(&share);
 
2886
  return(error != 0);
 
2887
}
 
2888
 
 
2889
/**
 
2890
  Try to discover table from engine.
 
2891
 
 
2892
  @note
 
2893
    If found, write the frm file to disk.
 
2894
 
 
2895
  @retval
 
2896
  -1    Table did not exists
 
2897
  @retval
 
2898
   0    Table created ok
 
2899
  @retval
 
2900
   > 0  Error, table existed but could not be created
 
2901
*/
 
2902
int ha_create_table_from_engine(Session* session, const char *db, const char *name)
 
2903
{
 
2904
  int error;
 
2905
  unsigned char *frmblob;
 
2906
  size_t frmlen;
 
2907
  char path[FN_REFLEN];
 
2908
  HA_CREATE_INFO create_info;
 
2909
  Table table;
 
2910
  TABLE_SHARE share;
 
2911
 
 
2912
  memset(&create_info, 0, sizeof(create_info));
 
2913
  if ((error= ha_discover(session, db, name, &frmblob, &frmlen)))
 
2914
  {
 
2915
    /* Table could not be discovered and thus not created */
 
2916
    return(error);
 
2917
  }
 
2918
 
 
2919
  /*
 
2920
    Table exists in handler and could be discovered
 
2921
    frmblob and frmlen are set, write the frm to disk
 
2922
  */
 
2923
 
 
2924
  build_table_filename(path, FN_REFLEN-1, db, name, "", 0);
 
2925
  // Save the frm file
 
2926
  error= writefrm(path, frmblob, frmlen);
 
2927
  free(frmblob);
 
2928
  if (error)
 
2929
    return(2);
 
2930
 
 
2931
  init_tmp_table_share(session, &share, db, 0, name, path);
 
2932
  if (open_table_def(session, &share, 0))
 
2933
  {
 
2934
    return(3);
 
2935
  }
 
2936
  if (open_table_from_share(session, &share, "" ,0, 0, 0, &table, OTM_OPEN))
 
2937
  {
 
2938
    free_table_share(&share);
 
2939
    return(3);
 
2940
  }
 
2941
 
 
2942
  table.updateCreateInfo(&create_info);
 
2943
  create_info.table_options|= HA_OPTION_CREATE_FROM_ENGINE;
 
2944
 
 
2945
  check_lowercase_names(table.file, path, path);
 
2946
  error=table.file->ha_create(path, &table, &create_info);
 
2947
  closefrm(&table, 1);
 
2948
 
 
2949
  return(error != 0);
 
2950
}
2341
2951
 
2342
2952
void st_ha_check_opt::init()
2343
2953
{
2344
 
  flags= 0; 
2345
 
  use_frm= false;
 
2954
  flags= sql_flags= 0;
 
2955
  sort_buffer_size = current_session->variables.myisam_sort_buff_size;
2346
2956
}
2347
2957
 
2348
2958
 
2359
2969
/**
2360
2970
  Init a key cache if it has not been initied before.
2361
2971
*/
2362
 
int ha_init_key_cache(const char *,
 
2972
int ha_init_key_cache(const char *name __attribute__((unused)),
2363
2973
                      KEY_CACHE *key_cache)
2364
2974
{
2365
2975
  if (!key_cache->key_cache_inited)
2366
2976
  {
2367
2977
    pthread_mutex_lock(&LOCK_global_system_variables);
2368
2978
    uint32_t tmp_buff_size= (uint32_t) key_cache->param_buff_size;
2369
 
    uint32_t tmp_block_size= (uint32_t) key_cache->param_block_size;
 
2979
    uint32_t tmp_block_size= (uint) key_cache->param_block_size;
2370
2980
    uint32_t division_limit= key_cache->param_division_limit;
2371
2981
    uint32_t age_threshold=  key_cache->param_age_threshold;
2372
2982
    pthread_mutex_unlock(&LOCK_global_system_variables);
2437
3047
 
2438
3048
 
2439
3049
/**
 
3050
  Try to discover one table from handler(s).
 
3051
 
 
3052
  @retval
 
3053
    -1   Table did not exists
 
3054
  @retval
 
3055
    0   OK. In this case *frmblob and *frmlen are set
 
3056
  @retval
 
3057
    >0   error.  frmblob and frmlen may not be set
 
3058
*/
 
3059
struct st_discover_args
 
3060
{
 
3061
  const char *db;
 
3062
  const char *name;
 
3063
  unsigned char **frmblob;
 
3064
  size_t *frmlen;
 
3065
};
 
3066
 
 
3067
static bool discover_handlerton(Session *session, plugin_ref plugin,
 
3068
                                void *arg)
 
3069
{
 
3070
  st_discover_args *vargs= (st_discover_args *)arg;
 
3071
  handlerton *hton= plugin_data(plugin, handlerton *);
 
3072
  if (hton->state == SHOW_OPTION_YES && hton->discover &&
 
3073
      (!(hton->discover(hton, session, vargs->db, vargs->name,
 
3074
                        vargs->frmblob,
 
3075
                        vargs->frmlen))))
 
3076
    return true;
 
3077
 
 
3078
  return false;
 
3079
}
 
3080
 
 
3081
int ha_discover(Session *session, const char *db, const char *name,
 
3082
                unsigned char **frmblob, size_t *frmlen)
 
3083
{
 
3084
  int error= -1; // Table does not exist in any handler
 
3085
  st_discover_args args= {db, name, frmblob, frmlen};
 
3086
 
 
3087
  if (is_prefix(name, TMP_FILE_PREFIX)) /* skip temporary tables */
 
3088
    return(error);
 
3089
 
 
3090
  if (plugin_foreach(session, discover_handlerton,
 
3091
                 DRIZZLE_STORAGE_ENGINE_PLUGIN, &args))
 
3092
    error= 0;
 
3093
 
 
3094
  if (!error)
 
3095
    status_var_increment(session->status_var.ha_discover_count);
 
3096
  return(error);
 
3097
}
 
3098
 
 
3099
 
 
3100
/**
 
3101
  Call this function in order to give the handler the possiblity
 
3102
  to ask engine if there are any new tables that should be written to disk
 
3103
  or any dropped tables that need to be removed from disk
 
3104
*/
 
3105
struct st_find_files_args
 
3106
{
 
3107
  const char *db;
 
3108
  const char *path;
 
3109
  const char *wild;
 
3110
  bool dir;
 
3111
  List<LEX_STRING> *files;
 
3112
};
 
3113
 
 
3114
/**
 
3115
  Ask handler if the table exists in engine.
 
3116
  @retval
 
3117
    HA_ERR_NO_SUCH_TABLE     Table does not exist
 
3118
  @retval
 
3119
    HA_ERR_TABLE_EXIST       Table exists
 
3120
  @retval
 
3121
    \#                  Error code
 
3122
*/
 
3123
struct st_table_exists_in_engine_args
 
3124
{
 
3125
  const char *db;
 
3126
  const char *name;
 
3127
  int err;
 
3128
};
 
3129
 
 
3130
static bool table_exists_in_engine_handlerton(Session *session, plugin_ref plugin,
 
3131
                                              void *arg)
 
3132
{
 
3133
  st_table_exists_in_engine_args *vargs= (st_table_exists_in_engine_args *)arg;
 
3134
  handlerton *hton= plugin_data(plugin, handlerton *);
 
3135
 
 
3136
  int err= HA_ERR_NO_SUCH_TABLE;
 
3137
 
 
3138
  if (hton->state == SHOW_OPTION_YES && hton->table_exists_in_engine)
 
3139
    err = hton->table_exists_in_engine(hton, session, vargs->db, vargs->name);
 
3140
 
 
3141
  vargs->err = err;
 
3142
  if (vargs->err == HA_ERR_TABLE_EXIST)
 
3143
    return true;
 
3144
 
 
3145
  return false;
 
3146
}
 
3147
 
 
3148
int ha_table_exists_in_engine(Session* session, const char* db, const char* name)
 
3149
{
 
3150
  st_table_exists_in_engine_args args= {db, name, HA_ERR_NO_SUCH_TABLE};
 
3151
  plugin_foreach(session, table_exists_in_engine_handlerton,
 
3152
                 DRIZZLE_STORAGE_ENGINE_PLUGIN, &args);
 
3153
  return(args.err);
 
3154
}
 
3155
 
 
3156
/**
2440
3157
  Calculate cost of 'index only' scan for given index and number of records
2441
3158
 
2442
3159
  @param keynr    Index number
2457
3174
    Estimated cost of 'index only' scan
2458
3175
*/
2459
3176
 
2460
 
double handler::index_only_read_time(uint32_t keynr, double key_records)
 
3177
double handler::index_only_read_time(uint32_t keynr, double records)
2461
3178
{
 
3179
  double read_time;
2462
3180
  uint32_t keys_per_block= (stats.block_size/2/
2463
3181
                        (table->key_info[keynr].key_length + ref_length) + 1);
2464
 
  return ((double) (key_records + keys_per_block-1) /
2465
 
          (double) keys_per_block);
 
3182
  read_time=((double) (records + keys_per_block-1) /
 
3183
             (double) keys_per_block);
 
3184
  return read_time;
2466
3185
}
2467
3186
 
2468
3187
 
2504
3223
ha_rows
2505
3224
handler::multi_range_read_info_const(uint32_t keyno, RANGE_SEQ_IF *seq,
2506
3225
                                     void *seq_init_param,
2507
 
                                     uint32_t ,
 
3226
                                     uint32_t n_ranges_arg __attribute__((unused)),
2508
3227
                                     uint32_t *bufsz, uint32_t *flags, COST_VECT *cost)
2509
3228
{
2510
3229
  KEY_MULTI_RANGE range;
2550
3269
    cost->zero();
2551
3270
    cost->avg_io_cost= 1; /* assume random seeks */
2552
3271
    if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
2553
 
      cost->io_count= index_only_read_time(keyno, (uint32_t)total_rows);
 
3272
      cost->io_count= index_only_read_time(keyno, (uint)total_rows);
2554
3273
    else
2555
3274
      cost->io_count= read_time(keyno, n_ranges, total_rows);
2556
3275
    cost->cpu_cost= (double) total_rows / TIME_FOR_COMPARE + 0.01;
2656
3375
int
2657
3376
handler::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
2658
3377
                               uint32_t n_ranges, uint32_t mode,
2659
 
                               HANDLER_BUFFER *)
 
3378
                               HANDLER_BUFFER *buf __attribute__((unused)))
2660
3379
{
2661
3380
  mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
2662
3381
  mrr_funcs= *seq_funcs;
2755
3474
  @retval other Error
2756
3475
*/
2757
3476
 
2758
 
int DsMrr_impl::dsmrr_init(handler *h_in, KEY *key,
 
3477
int DsMrr_impl::dsmrr_init(handler *h, KEY *key,
2759
3478
                           RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
2760
3479
                           uint32_t n_ranges, uint32_t mode, HANDLER_BUFFER *buf)
2761
3480
{
2763
3482
  uint32_t keyno;
2764
3483
  Item *pushed_cond= NULL;
2765
3484
  handler *new_h2;
2766
 
  keyno= h_in->active_index;
 
3485
  keyno= h->active_index;
2767
3486
  assert(h2 == NULL);
2768
3487
  if (mode & HA_MRR_USE_DEFAULT_IMPL || mode & HA_MRR_SORTED)
2769
3488
  {
2770
3489
    use_default_impl= true;
2771
 
    return(h_in->handler::multi_range_read_init(seq_funcs, seq_init_param,
 
3490
    return(h->handler::multi_range_read_init(seq_funcs, seq_init_param,
2772
3491
                                                  n_ranges, mode, buf));
2773
3492
  }
2774
3493
  rowids_buf= buf->buffer;
2775
3494
  //psergey-todo: don't add key_length as it is not needed anymore
2776
 
  rowids_buf += key->key_length + h_in->ref_length;
 
3495
  rowids_buf += key->key_length + h->ref_length;
2777
3496
 
2778
3497
  is_mrr_assoc= !test(mode & HA_MRR_NO_ASSOCIATION);
2779
3498
  rowids_buf_end= buf->buffer_end;
2780
3499
 
2781
 
  elem_size= h_in->ref_length + (int)is_mrr_assoc * sizeof(void*);
 
3500
  elem_size= h->ref_length + (int)is_mrr_assoc * sizeof(void*);
2782
3501
  rowids_buf_last= rowids_buf +
2783
3502
                      ((rowids_buf_end - rowids_buf)/ elem_size)*
2784
3503
                      elem_size;
2786
3505
 
2787
3506
  /* Create a separate handler object to do rndpos() calls. */
2788
3507
  Session *session= current_session;
2789
 
  if (!(new_h2= h_in->clone(session->mem_root)) ||
 
3508
  if (!(new_h2= h->clone(session->mem_root)) ||
2790
3509
      new_h2->ha_external_lock(session, F_RDLCK))
2791
3510
  {
2792
3511
    delete new_h2;
2793
3512
    return(1);
2794
3513
  }
2795
3514
 
2796
 
  if (keyno == h_in->pushed_idx_cond_keyno)
2797
 
    pushed_cond= h_in->pushed_idx_cond;
2798
 
  if (h_in->ha_index_end())
 
3515
  if (keyno == h->pushed_idx_cond_keyno)
 
3516
    pushed_cond= h->pushed_idx_cond;
 
3517
  if (h->ha_index_end())
2799
3518
  {
2800
3519
    new_h2= h2;
2801
3520
    goto error;
2823
3542
  if (dsmrr_eof)
2824
3543
    buf->end_of_used_area= rowids_buf_last;
2825
3544
 
2826
 
  if (h_in->ha_rnd_init(false))
 
3545
  if (h->ha_rnd_init(false))
2827
3546
    goto error;
2828
3547
 
2829
3548
  return(0);
2873
3592
  @retval other  Error
2874
3593
*/
2875
3594
 
2876
 
int DsMrr_impl::dsmrr_fill_buffer(handler *)
 
3595
int DsMrr_impl::dsmrr_fill_buffer(handler *unused __attribute__((unused)))
2877
3596
{
2878
3597
  char *range_info;
2879
3598
  int res = 0;
2914
3633
  DS-MRR implementation: multi_range_read_next() function
2915
3634
*/
2916
3635
 
2917
 
int DsMrr_impl::dsmrr_next(handler *h_in, char **range_info)
 
3636
int DsMrr_impl::dsmrr_next(handler *h, char **range_info)
2918
3637
{
2919
3638
  int res;
2920
3639
 
2921
3640
  if (use_default_impl)
2922
 
    return h_in->handler::multi_range_read_next(range_info);
 
3641
    return h->handler::multi_range_read_next(range_info);
2923
3642
 
2924
3643
  if (rowids_buf_cur == rowids_buf_last)
2925
3644
  {
2940
3659
    goto end;
2941
3660
  }
2942
3661
 
2943
 
  res= h_in->rnd_pos(table->record[0], rowids_buf_cur);
2944
 
  rowids_buf_cur += h_in->ref_length;
 
3662
  res= h->rnd_pos(table->record[0], rowids_buf_cur);
 
3663
  rowids_buf_cur += h->ref_length;
2945
3664
  if (is_mrr_assoc)
2946
3665
  {
2947
3666
    memcpy(range_info, rowids_buf_cur, sizeof(void*));
3157
3876
    return true; /* Buffer has not enough space for even 1 rowid */
3158
3877
 
3159
3878
  /* Number of iterations we'll make with full buffer */
3160
 
  n_full_steps= (uint32_t)floor(rows2double(rows) / max_buff_entries);
 
3879
  n_full_steps= (uint)floor(rows2double(rows) / max_buff_entries);
3161
3880
 
3162
3881
  /*
3163
3882
    Get numbers of rows we'll be processing in
3279
3998
  if (table->file->primary_key_is_clustered())
3280
3999
  {
3281
4000
    cost->io_count= table->file->read_time(table->s->primary_key,
3282
 
                                           (uint32_t) nrows, nrows);
 
4001
                                           (uint) nrows, nrows);
3283
4002
  }
3284
4003
  else
3285
4004
  {
3328
4047
int handler::read_range_first(const key_range *start_key,
3329
4048
                              const key_range *end_key,
3330
4049
                              bool eq_range_arg,
3331
 
                              bool )
 
4050
                              bool sorted  __attribute__((unused)))
3332
4051
{
3333
4052
  int result;
3334
4053
 
3449
4168
}
3450
4169
 
3451
4170
 
 
4171
/**
 
4172
  Returns a list of all known extensions.
 
4173
 
 
4174
    No mutexes, worst case race is a minor surplus memory allocation
 
4175
    We have to recreate the extension map if mysqld is restarted (for example
 
4176
    within libmysqld)
 
4177
 
 
4178
  @retval
 
4179
    pointer             pointer to TYPELIB structure
 
4180
*/
 
4181
static bool exts_handlerton(Session *unused __attribute__((unused)),
 
4182
                            plugin_ref plugin,
 
4183
                            void *arg)
 
4184
{
 
4185
  List<char> *found_exts= (List<char> *) arg;
 
4186
  handlerton *hton= plugin_data(plugin, handlerton *);
 
4187
  handler *file;
 
4188
  if (hton->state == SHOW_OPTION_YES && hton->create &&
 
4189
      (file= hton->create(hton, (TABLE_SHARE*) 0, current_session->mem_root)))
 
4190
  {
 
4191
    List_iterator_fast<char> it(*found_exts);
 
4192
    const char **ext, *old_ext;
 
4193
 
 
4194
    for (ext= file->bas_ext(); *ext; ext++)
 
4195
    {
 
4196
      while ((old_ext= it++))
 
4197
      {
 
4198
        if (!strcmp(old_ext, *ext))
 
4199
          break;
 
4200
      }
 
4201
      if (!old_ext)
 
4202
        found_exts->push_back((char *) *ext);
 
4203
 
 
4204
      it.rewind();
 
4205
    }
 
4206
    delete file;
 
4207
  }
 
4208
  return false;
 
4209
}
 
4210
 
 
4211
TYPELIB *ha_known_exts(void)
 
4212
{
 
4213
  if (!known_extensions.type_names || mysys_usage_id != known_extensions_id)
 
4214
  {
 
4215
    List<char> found_exts;
 
4216
    const char **ext, *old_ext;
 
4217
 
 
4218
    known_extensions_id= mysys_usage_id;
 
4219
 
 
4220
    plugin_foreach(NULL, exts_handlerton,
 
4221
                   DRIZZLE_STORAGE_ENGINE_PLUGIN, &found_exts);
 
4222
 
 
4223
    ext= (const char **) my_once_alloc(sizeof(char *)*
 
4224
                                       (found_exts.elements+1),
 
4225
                                       MYF(MY_WME | MY_FAE));
 
4226
 
 
4227
    assert(ext != 0);
 
4228
    known_extensions.count= found_exts.elements;
 
4229
    known_extensions.type_names= ext;
 
4230
 
 
4231
    List_iterator_fast<char> it(found_exts);
 
4232
    while ((old_ext= it++))
 
4233
      *ext++= old_ext;
 
4234
    *ext= 0;
 
4235
  }
 
4236
  return &known_extensions;
 
4237
}
 
4238
 
 
4239
 
3452
4240
static bool stat_print(Session *session, const char *type, uint32_t type_len,
3453
4241
                       const char *file, uint32_t file_len,
3454
4242
                       const char *status, uint32_t status_len)
3463
4251
  return false;
3464
4252
}
3465
4253
 
3466
 
bool ha_show_status(Session *session, StorageEngine *engine, enum ha_stat_type stat)
 
4254
bool ha_show_status(Session *session, handlerton *db_type, enum ha_stat_type stat)
3467
4255
{
3468
4256
  List<Item> field_list;
3469
4257
  Protocol *protocol= session->protocol;
3477
4265
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
3478
4266
    return true;
3479
4267
 
3480
 
  result= engine->show_status(session, stat_print, stat) ? 1 : 0;
 
4268
  result= db_type->show_status &&
 
4269
    db_type->show_status(db_type, session, stat_print, stat) ? 1 : 0;
3481
4270
 
3482
4271
  if (!result)
3483
 
    session->my_eof();
 
4272
    my_eof(session);
3484
4273
  return result;
3485
4274
}
3486
4275
 
3497
4286
  - table is not mysql.event
3498
4287
*/
3499
4288
 
3500
 
static bool binlog_log_row(Table* table,
3501
 
                           const unsigned char *before_record,
3502
 
                           const unsigned char *after_record)
3503
 
{
3504
 
  bool error= false;
 
4289
static bool check_table_binlog_row_based(Session *session, Table *table)
 
4290
{
 
4291
  if (table->s->cached_row_logging_check == -1)
 
4292
  {
 
4293
    int const check(table->s->tmp_table == NO_TMP_TABLE);
 
4294
    table->s->cached_row_logging_check= check;
 
4295
  }
 
4296
 
 
4297
  assert(table->s->cached_row_logging_check == 0 ||
 
4298
              table->s->cached_row_logging_check == 1);
 
4299
 
 
4300
  return (table->s->cached_row_logging_check &&
 
4301
          (session->options & OPTION_BIN_LOG) &&
 
4302
          drizzle_bin_log.is_open());
 
4303
}
 
4304
 
 
4305
 
 
4306
/**
 
4307
   Write table maps for all (manually or automatically) locked tables
 
4308
   to the binary log.
 
4309
 
 
4310
   This function will generate and write table maps for all tables
 
4311
   that are locked by the thread 'session'.  Either manually locked
 
4312
   (stored in Session::locked_tables) and automatically locked (stored
 
4313
   in Session::lock) are considered.
 
4314
 
 
4315
   @param session     Pointer to Session structure
 
4316
 
 
4317
   @retval 0   All OK
 
4318
   @retval 1   Failed to write all table maps
 
4319
 
 
4320
   @sa
 
4321
       Session::lock
 
4322
       Session::locked_tables
 
4323
*/
 
4324
 
 
4325
static int write_locked_table_maps(Session *session)
 
4326
{
 
4327
  if (session->get_binlog_table_maps() == 0)
 
4328
  {
 
4329
    DRIZZLE_LOCK *locks[3];
 
4330
    locks[0]= session->extra_lock;
 
4331
    locks[1]= session->lock;
 
4332
    locks[2]= session->locked_tables;
 
4333
    for (uint32_t i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
 
4334
    {
 
4335
      DRIZZLE_LOCK const *const lock= locks[i];
 
4336
      if (lock == NULL)
 
4337
        continue;
 
4338
 
 
4339
      Table **const end_ptr= lock->table + lock->table_count;
 
4340
      for (Table **table_ptr= lock->table ;
 
4341
           table_ptr != end_ptr ;
 
4342
           ++table_ptr)
 
4343
      {
 
4344
        Table *const table= *table_ptr;
 
4345
        if (table->current_lock == F_WRLCK &&
 
4346
            check_table_binlog_row_based(session, table))
 
4347
        {
 
4348
          int const has_trans= table->file->has_transactions();
 
4349
          int const error= session->binlog_write_table_map(table, has_trans);
 
4350
          /*
 
4351
            If an error occurs, it is the responsibility of the caller to
 
4352
            roll back the transaction.
 
4353
          */
 
4354
          if (unlikely(error))
 
4355
            return(1);
 
4356
        }
 
4357
      }
 
4358
    }
 
4359
  }
 
4360
  return(0);
 
4361
}
 
4362
 
 
4363
 
 
4364
typedef bool Log_func(Session*, Table*, bool, const unsigned char*, const unsigned char*);
 
4365
 
 
4366
static int binlog_log_row(Table* table,
 
4367
                          const unsigned char *before_record,
 
4368
                          const unsigned char *after_record,
 
4369
                          Log_func *log_func)
 
4370
{
 
4371
  if (table->no_replicate)
 
4372
    return 0;
 
4373
 
 
4374
  bool error= 0;
3505
4375
  Session *const session= table->in_use;
3506
4376
 
3507
 
  if (table->no_replicate == false)
3508
 
    return false;
3509
 
 
3510
 
  error= replicator_session_init(session);
3511
 
 
3512
 
  switch (session->lex->sql_command)
 
4377
  if (check_table_binlog_row_based(session, table))
3513
4378
  {
3514
 
  case SQLCOM_REPLACE:
3515
 
  case SQLCOM_INSERT:
3516
 
  case SQLCOM_REPLACE_SELECT:
3517
 
  case SQLCOM_INSERT_SELECT:
3518
 
  case SQLCOM_CREATE_TABLE:
3519
 
    error= replicator_write_row(session, table);
3520
 
    break;
3521
 
 
3522
 
  case SQLCOM_UPDATE:
3523
 
  case SQLCOM_UPDATE_MULTI:
3524
 
    error= replicator_update_row(session, table, before_record, after_record);
3525
 
    break;
3526
 
 
3527
 
  case SQLCOM_DELETE:
3528
 
  case SQLCOM_DELETE_MULTI:
3529
 
    error= replicator_delete_row(session, table);
3530
 
    break;
3531
 
 
3532
4379
    /*
3533
 
      For everything else we ignore the event (since it just involves a temp table)
 
4380
      If there are no table maps written to the binary log, this is
 
4381
      the first row handled in this statement. In that case, we need
 
4382
      to write table maps for all locked tables to the binary log.
3534
4383
    */
3535
 
  default:
3536
 
    break;
 
4384
    if (likely(!(error= write_locked_table_maps(session))))
 
4385
    {
 
4386
      bool const has_trans= table->file->has_transactions();
 
4387
      error= (*log_func)(session, table, has_trans, before_record, after_record);
 
4388
    }
3537
4389
  }
3538
4390
 
3539
 
  return error;
 
4391
  return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
3540
4392
}
3541
4393
 
3542
4394
int handler::ha_external_lock(Session *session, int lock_type)
3585
4437
int handler::ha_write_row(unsigned char *buf)
3586
4438
{
3587
4439
  int error;
 
4440
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
3588
4441
  DRIZZLE_INSERT_ROW_START();
3589
4442
 
3590
 
  /* 
3591
 
   * If we have a timestamp column, update it to the current time 
3592
 
   * 
3593
 
   * @TODO Technically, the below two lines can be take even further out of the
3594
 
   * handler interface and into the fill_record() method.
3595
 
   */
3596
 
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
3597
 
    table->timestamp_field->set_time();
3598
 
 
3599
4443
  mark_trx_read_write();
3600
4444
 
3601
4445
  if (unlikely(error= write_row(buf)))
3602
4446
    return(error);
3603
 
 
3604
 
  if (unlikely(binlog_log_row(table, 0, buf)))
3605
 
    return HA_ERR_RBR_LOGGING_FAILED; /* purecov: inspected */
3606
 
 
 
4447
  if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
 
4448
    return(error); /* purecov: inspected */
3607
4449
  DRIZZLE_INSERT_ROW_END();
3608
4450
  return(0);
3609
4451
}
3612
4454
int handler::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
3613
4455
{
3614
4456
  int error;
 
4457
  Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
3615
4458
 
3616
4459
  /*
3617
4460
    Some storage engines require that the new record is in record[0]
3623
4466
 
3624
4467
  if (unlikely(error= update_row(old_data, new_data)))
3625
4468
    return error;
3626
 
 
3627
 
  if (unlikely(binlog_log_row(table, old_data, new_data)))
3628
 
    return HA_ERR_RBR_LOGGING_FAILED;
3629
 
 
 
4469
  if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
 
4470
    return error;
3630
4471
  return 0;
3631
4472
}
3632
4473
 
3633
4474
int handler::ha_delete_row(const unsigned char *buf)
3634
4475
{
3635
4476
  int error;
 
4477
  Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
3636
4478
 
3637
4479
  mark_trx_read_write();
3638
4480
 
3639
4481
  if (unlikely(error= delete_row(buf)))
3640
4482
    return error;
3641
 
 
3642
 
  if (unlikely(binlog_log_row(table, buf, 0)))
3643
 
    return HA_ERR_RBR_LOGGING_FAILED;
3644
 
 
 
4483
  if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
 
4484
    return error;
3645
4485
  return 0;
3646
4486
}
3647
4487