~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/cursor.cc

  • Committer: Monty Taylor
  • Date: 2010-02-05 08:11:15 UTC
  • mfrom: (1283 build)
  • mto: (1273.13.43 fix_is)
  • mto: This revision was merged to the branch mainline in revision 1300.
  • Revision ID: mordred@inaugust.com-20100205081115-dr82nvrwv4lvw7sd
Merged trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
32
32
#include "drizzled/gettext.h"
33
33
#include "drizzled/probes.h"
34
34
#include "drizzled/sql_parse.h"
35
 
#include "drizzled/optimizer/cost_vector.h"
 
35
#include "drizzled/cost_vect.h"
36
36
#include "drizzled/session.h"
37
37
#include "drizzled/sql_base.h"
38
 
#include "drizzled/transaction_services.h"
 
38
#include "drizzled/replication_services.h"
39
39
#include "drizzled/lock.h"
40
40
#include "drizzled/item/int.h"
41
41
#include "drizzled/item/empty_string.h"
43
43
#include "drizzled/message/table.pb.h"
44
44
#include "drizzled/plugin/client.h"
45
45
#include "drizzled/internal/my_sys.h"
46
 
#include "drizzled/plugin/event_observer.h"
 
46
#include "drizzled/transaction_services.h"
47
47
 
48
48
using namespace std;
49
49
 
54
54
** General Cursor functions
55
55
****************************************************************************/
56
56
Cursor::Cursor(plugin::StorageEngine &engine_arg,
57
 
               Table &arg)
58
 
  : table(arg),
59
 
    engine(engine_arg),
60
 
    estimation_rows_to_insert(0),
61
 
    ref(0),
 
57
               TableShare &share_arg)
 
58
  : table_share(&share_arg), table(0),
 
59
    estimation_rows_to_insert(0), engine(&engine_arg),
 
60
    ref(0), in_range_check_pushed_down(false),
62
61
    key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
63
62
    ref_length(sizeof(internal::my_off_t)),
64
63
    inited(NONE),
65
 
    locked(false),
 
64
    locked(false), implicit_emptied(0),
66
65
    next_insert_id(0), insert_id_for_cur_row(0)
67
66
{ }
68
67
 
73
72
}
74
73
 
75
74
 
76
 
/*
77
 
 * @note this only used in
78
 
 * optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
79
 
 * of the writing of this comment. -Brian
80
 
 */
81
75
Cursor *Cursor::clone(memory::Root *mem_root)
82
76
{
83
 
  Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
 
77
  Cursor *new_handler= table->s->db_type()->getCursor(*table->s, mem_root);
84
78
 
85
79
  /*
86
80
    Allocate Cursor->ref here because otherwise ha_open will allocate it
87
81
    on this->table->mem_root and we will not be able to reclaim that memory
88
82
    when the clone Cursor object is destroyed.
89
83
  */
90
 
  if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
 
84
  if (!(new_handler->ref= (unsigned char*) alloc_root(mem_root, ALIGN_SIZE(ref_length)*2)))
91
85
    return NULL;
92
 
 
93
 
  TableIdentifier identifier(getTable()->getShare()->getSchemaName(),
94
 
                             getTable()->getShare()->getTableName(),
95
 
                             getTable()->getShare()->getType());
96
 
 
97
 
  if (new_handler && !new_handler->ha_open(identifier,
98
 
                                           getTable()->getDBStat(),
 
86
  if (new_handler && !new_handler->ha_open(table,
 
87
                                           table->s->normalized_path.str,
 
88
                                           table->getDBStat(),
99
89
                                           HA_OPEN_IGNORE_IF_LOCKED))
100
90
    return new_handler;
101
91
  return NULL;
102
92
}
103
93
 
104
 
/*
105
 
  DESCRIPTION
106
 
    given a buffer with a key value, and a map of keyparts
107
 
    that are present in this value, returns the length of the value
108
 
*/
109
 
uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
110
 
{
111
 
  /* works only with key prefixes */
112
 
  assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
113
 
 
114
 
  const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
115
 
  const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
116
 
  uint32_t length= 0;
117
 
 
118
 
  while (key_part_found < end_key_part_found && keypart_map_arg)
119
 
  {
120
 
    length+= key_part_found->store_length;
121
 
    keypart_map_arg >>= 1;
122
 
    key_part_found++;
123
 
  }
124
 
  return length;
125
 
}
126
 
 
127
 
int Cursor::startIndexScan(uint32_t idx, bool sorted)
 
94
int Cursor::ha_index_init(uint32_t idx, bool sorted)
128
95
{
129
96
  int result;
130
97
  assert(inited == NONE);
131
 
  if (!(result= doStartIndexScan(idx, sorted)))
 
98
  if (!(result= index_init(idx, sorted)))
132
99
    inited=INDEX;
133
100
  end_range= NULL;
134
101
  return result;
135
102
}
136
103
 
137
 
int Cursor::endIndexScan()
 
104
int Cursor::ha_index_end()
138
105
{
139
106
  assert(inited==INDEX);
140
107
  inited=NONE;
141
108
  end_range= NULL;
142
 
  return(doEndIndexScan());
 
109
  return(index_end());
143
110
}
144
111
 
145
 
int Cursor::startTableScan(bool scan)
 
112
int Cursor::ha_rnd_init(bool scan)
146
113
{
147
114
  int result;
148
115
  assert(inited==NONE || (inited==RND && scan));
149
 
  inited= (result= doStartTableScan(scan)) ? NONE: RND;
 
116
  inited= (result= rnd_init(scan)) ? NONE: RND;
150
117
 
151
118
  return result;
152
119
}
153
120
 
154
 
int Cursor::endTableScan()
 
121
int Cursor::ha_rnd_end()
155
122
{
156
123
  assert(inited==RND);
157
124
  inited=NONE;
158
 
  return(doEndTableScan());
 
125
  return(rnd_end());
159
126
}
160
127
 
161
128
int Cursor::ha_index_or_rnd_end()
162
129
{
163
 
  return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
 
130
  return inited == INDEX ? ha_index_end() : inited == RND ? ha_rnd_end() : 0;
164
131
}
165
132
 
166
133
void Cursor::ha_start_bulk_insert(ha_rows rows)
175
142
  return end_bulk_insert();
176
143
}
177
144
 
 
145
void Cursor::change_table_ptr(Table *table_arg, TableShare *share)
 
146
{
 
147
  table= table_arg;
 
148
  table_share= share;
 
149
}
 
150
 
178
151
const key_map *Cursor::keys_to_use_for_scanning()
179
152
{
180
153
  return &key_map_empty;
182
155
 
183
156
bool Cursor::has_transactions()
184
157
{
185
 
  return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
 
158
  return (table->s->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
186
159
}
187
160
 
188
 
void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
 
161
void Cursor::ha_statistic_increment(ulong SSV::*offset) const
189
162
{
190
 
  (getTable()->in_use->status_var.*offset)++;
 
163
  status_var_increment(table->in_use->status_var.*offset);
191
164
}
192
165
 
193
166
void **Cursor::ha_data(Session *session) const
194
167
{
195
 
  return session->getEngineData(getEngine());
196
 
}
 
168
  return session->getEngineData(engine);
 
169
}
 
170
 
 
171
Session *Cursor::ha_session(void) const
 
172
{
 
173
  assert(!table || !table->in_use || table->in_use == current_session);
 
174
  return (table && table->in_use) ? table->in_use : current_session;
 
175
}
 
176
 
197
177
 
198
178
bool Cursor::is_fatal_error(int error, uint32_t flags)
199
179
{
207
187
 
208
188
 
209
189
ha_rows Cursor::records() { return stats.records; }
210
 
uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
211
 
uint64_t Cursor::rowSize() { return getTable()->getRecordLength() + getTable()->sizeFields(); }
212
 
 
213
 
int Cursor::doOpen(const TableIdentifier &identifier, int mode, uint32_t test_if_locked)
214
 
{
215
 
  return open(identifier.getPath().c_str(), mode, test_if_locked);
216
 
}
217
190
 
218
191
/**
219
192
  Open database-Cursor.
221
194
  Try O_RDONLY if cannot open as O_RDWR
222
195
  Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set
223
196
*/
224
 
int Cursor::ha_open(const TableIdentifier &identifier,
225
 
                    int mode,
226
 
                    int test_if_locked)
 
197
int Cursor::ha_open(Table *table_arg, const char *name, int mode,
 
198
                     int test_if_locked)
227
199
{
228
200
  int error;
229
201
 
230
 
  if ((error= doOpen(identifier, mode, test_if_locked)))
 
202
  table= table_arg;
 
203
  assert(table->s == table_share);
 
204
  assert(alloc_root_inited(&table->mem_root));
 
205
 
 
206
  if ((error=open(name,mode,test_if_locked)))
231
207
  {
232
208
    if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
233
 
        (getTable()->db_stat & HA_TRY_READ_ONLY))
 
209
        (table->db_stat & HA_TRY_READ_ONLY))
234
210
    {
235
 
      getTable()->db_stat|=HA_READ_ONLY;
236
 
      error= doOpen(identifier, O_RDONLY,test_if_locked);
 
211
      table->db_stat|=HA_READ_ONLY;
 
212
      error=open(name,O_RDONLY,test_if_locked);
237
213
    }
238
214
  }
239
215
  if (error)
242
218
  }
243
219
  else
244
220
  {
245
 
    if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
246
 
      getTable()->db_stat|=HA_READ_ONLY;
 
221
    if (table->s->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
 
222
      table->db_stat|=HA_READ_ONLY;
247
223
    (void) extra(HA_EXTRA_NO_READCHECK);        // Not needed in SQL
248
224
 
249
225
    /* ref is already allocated for us if we're called from Cursor::clone() */
250
 
    if (!ref && !(ref= (unsigned char*) getTable()->alloc_root(ALIGN_SIZE(ref_length)*2)))
 
226
    if (!ref && !(ref= (unsigned char*) alloc_root(&table->mem_root,
 
227
                                          ALIGN_SIZE(ref_length)*2)))
251
228
    {
252
229
      close();
253
230
      error=HA_ERR_OUT_OF_MEM;
259
236
}
260
237
 
261
238
/**
 
239
  one has to use this method when to find
 
240
  random position by record as the plain
 
241
  position() call doesn't work for some
 
242
  handlers for random position
 
243
*/
 
244
 
 
245
int Cursor::rnd_pos_by_record(unsigned char *record)
 
246
{
 
247
  register int error;
 
248
 
 
249
  position(record);
 
250
  if (inited && (error= ha_index_end()))
 
251
    return error;
 
252
  if ((error= ha_rnd_init(false)))
 
253
    return error;
 
254
 
 
255
  return rnd_pos(record, ref);
 
256
}
 
257
 
 
258
/**
262
259
  Read first row (only) from a table.
263
260
 
264
261
  This is never called for InnoDB tables, as these table types
268
265
{
269
266
  register int error;
270
267
 
271
 
  ha_statistic_increment(&system_status_var::ha_read_first_count);
 
268
  ha_statistic_increment(&SSV::ha_read_first_count);
272
269
 
273
270
  /*
274
271
    If there is very few deleted rows in the table, find the first row by
276
273
    TODO remove the test for HA_READ_ORDER
277
274
  */
278
275
  if (stats.deleted < 10 || primary_key >= MAX_KEY ||
279
 
      !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
 
276
      !(table->index_flags(primary_key) & HA_READ_ORDER))
280
277
  {
281
 
    (void) startTableScan(1);
 
278
    (void) ha_rnd_init(1);
282
279
    while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
283
 
    (void) endTableScan();
 
280
    (void) ha_rnd_end();
284
281
  }
285
282
  else
286
283
  {
287
284
    /* Find the first row through the primary key */
288
 
    (void) startIndexScan(primary_key, 0);
 
285
    (void) ha_index_init(primary_key, 0);
289
286
    error=index_first(buf);
290
 
    (void) endIndexScan();
 
287
    (void) ha_index_end();
291
288
  }
292
289
  return error;
293
290
}
304
301
  @verbatim 1,5,15,25,35,... @endverbatim
305
302
*/
306
303
inline uint64_t
307
 
compute_next_insert_id(uint64_t nr, drizzle_system_variables *variables)
 
304
compute_next_insert_id(uint64_t nr,struct system_variables *variables)
308
305
{
309
306
  if (variables->auto_increment_increment == 1)
310
307
    return (nr+1); // optimization of the formula below
324
321
    Session::next_insert_id to be greater than the explicit value.
325
322
  */
326
323
  if ((next_insert_id > 0) && (nr >= next_insert_id))
327
 
    set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
 
324
    set_next_insert_id(compute_next_insert_id(nr, &table->in_use->variables));
328
325
}
329
326
 
330
327
 
344
341
    The number X if it exists, "nr" otherwise.
345
342
*/
346
343
inline uint64_t
347
 
prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
 
344
prev_insert_id(uint64_t nr, struct system_variables *variables)
348
345
{
349
346
  if (unlikely(nr < variables->auto_increment_offset))
350
347
  {
441
438
{
442
439
  uint64_t nr, nb_reserved_values;
443
440
  bool append= false;
444
 
  Session *session= getTable()->in_use;
445
 
  drizzle_system_variables *variables= &session->variables;
 
441
  Session *session= table->in_use;
 
442
  struct system_variables *variables= &session->variables;
446
443
 
447
444
  /*
448
445
    next_insert_id is a "cursor" into the reserved interval, it may go greater
454
451
     for an auto increment column, not a magic value like NULL is.
455
452
     same as sql_mode=NO_AUTO_VALUE_ON_ZERO */
456
453
 
457
 
  if ((nr= getTable()->next_number_field->val_int()) != 0
458
 
      || getTable()->auto_increment_field_not_null)
 
454
  if ((nr= table->next_number_field->val_int()) != 0
 
455
      || table->auto_increment_field_not_null)
459
456
  {
460
457
    /*
461
458
      Update next_insert_id if we had already generated a value in this
533
530
      nr= compute_next_insert_id(nr-1, variables);
534
531
    }
535
532
 
536
 
    if (getTable()->getShare()->next_number_keypart == 0)
 
533
    if (table->s->next_number_keypart == 0)
537
534
    {
538
535
      /* We must defer the appending until "nr" has been possibly truncated */
539
536
      append= true;
540
537
    }
541
538
  }
542
539
 
543
 
  if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
 
540
  if (unlikely(table->next_number_field->store((int64_t) nr, true)))
544
541
  {
545
542
    /*
546
543
      first test if the query was aborted due to strict mode constraints
547
544
    */
548
 
    if (session->getKilled() == Session::KILL_BAD_DATA)
 
545
    if (session->killed == Session::KILL_BAD_DATA)
549
546
      return HA_ERR_AUTOINC_ERANGE;
550
547
 
551
548
    /*
556
553
      bother shifting the right bound (anyway any other value from this
557
554
      interval will cause a duplicate key).
558
555
    */
559
 
    nr= prev_insert_id(getTable()->next_number_field->val_int(), variables);
560
 
    if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
561
 
      nr= getTable()->next_number_field->val_int();
 
556
    nr= prev_insert_id(table->next_number_field->val_int(), variables);
 
557
    if (unlikely(table->next_number_field->store((int64_t) nr, true)))
 
558
      nr= table->next_number_field->val_int();
562
559
  }
563
560
  if (append)
564
561
  {
599
596
  @param first_value         (OUT) the first value reserved by the Cursor
600
597
  @param nb_reserved_values  (OUT) how many values the Cursor reserved
601
598
*/
 
599
void Cursor::get_auto_increment(uint64_t ,
 
600
                                 uint64_t ,
 
601
                                 uint64_t ,
 
602
                                 uint64_t *first_value,
 
603
                                 uint64_t *nb_reserved_values)
 
604
{
 
605
  uint64_t nr;
 
606
  int error;
 
607
 
 
608
  (void) extra(HA_EXTRA_KEYREAD);
 
609
  table->mark_columns_used_by_index_no_reset(table->s->next_number_index);
 
610
  index_init(table->s->next_number_index, 1);
 
611
  if (table->s->next_number_keypart == 0)
 
612
  {                                             // Autoincrement at key-start
 
613
    error=index_last(table->record[1]);
 
614
    /*
 
615
      MySQL implicitely assumes such method does locking (as MySQL decides to
 
616
      use nr+increment without checking again with the Cursor, in
 
617
      Cursor::update_auto_increment()), so reserves to infinite.
 
618
    */
 
619
    *nb_reserved_values= UINT64_MAX;
 
620
  }
 
621
  else
 
622
  {
 
623
    unsigned char key[MAX_KEY_LENGTH];
 
624
    key_copy(key, table->record[0],
 
625
             table->key_info + table->s->next_number_index,
 
626
             table->s->next_number_key_offset);
 
627
    error= index_read_map(table->record[1], key,
 
628
                          make_prev_keypart_map(table->s->next_number_keypart),
 
629
                          HA_READ_PREFIX_LAST);
 
630
    /*
 
631
      MySQL needs to call us for next row: assume we are inserting ("a",null)
 
632
      here, we return 3, and next this statement will want to insert
 
633
      ("b",null): there is no reason why ("b",3+1) would be the good row to
 
634
      insert: maybe it already exists, maybe 3+1 is too large...
 
635
    */
 
636
    *nb_reserved_values= 1;
 
637
  }
 
638
 
 
639
  if (error)
 
640
    nr=1;
 
641
  else
 
642
    nr= ((uint64_t) table->next_number_field->
 
643
         val_int_offset(table->s->rec_buff_length)+1);
 
644
  index_end();
 
645
  (void) extra(HA_EXTRA_NO_KEYREAD);
 
646
  *first_value= nr;
 
647
}
 
648
 
602
649
 
603
650
void Cursor::ha_release_auto_increment()
604
651
{
612
659
      this statement used forced auto_increment values if there were some,
613
660
      wipe them away for other statements.
614
661
    */
615
 
    getTable()->in_use->auto_inc_intervals_forced.empty();
 
662
    table->in_use->auto_inc_intervals_forced.empty();
616
663
  }
617
664
}
618
665
 
649
696
 
650
697
inline
651
698
void
652
 
Cursor::setTransactionReadWrite()
 
699
Cursor::mark_trx_read_write()
653
700
{
654
 
  ResourceContext *resource_context;
655
 
 
656
 
  /*
657
 
   * If the cursor has not context for execution then there should be no
658
 
   * possible resource to gain (and if there is... then there is a bug such
659
 
   * that in_use should have been set.
660
 
 */
661
 
  if (not getTable()->in_use)
662
 
    return;
663
 
 
664
 
  resource_context= getTable()->in_use->getResourceContext(getEngine());
 
701
  Ha_trx_info *ha_info= ha_session()->getEngineInfo(engine);
665
702
  /*
666
703
    When a storage engine method is called, the transaction must
667
704
    have been started, unless it's a DDL call, for which the
670
707
    Unfortunately here we can't know know for sure if the engine
671
708
    has registered the transaction or not, so we must check.
672
709
  */
673
 
  if (resource_context->isStarted())
 
710
  if (ha_info->is_started())
674
711
  {
675
 
    resource_context->markModifiedData();
 
712
      ha_info->set_trx_read_write();
676
713
  }
677
714
}
678
715
 
690
727
int
691
728
Cursor::ha_delete_all_rows()
692
729
{
693
 
  setTransactionReadWrite();
 
730
  mark_trx_read_write();
694
731
 
695
732
  int result= delete_all_rows();
696
733
 
699
736
    /** 
700
737
     * Trigger post-truncate notification to plugins... 
701
738
     *
702
 
     * @todo Make TransactionServices generic to AfterTriggerServices
 
739
     * @todo Make ReplicationServices generic to AfterTriggerServices
703
740
     * or similar...
704
741
     */
705
 
    Session *const session= getTable()->in_use;
706
 
    TransactionServices &transaction_services= TransactionServices::singleton();
707
 
    transaction_services.truncateTable(session, getTable());
 
742
    Session *const session= table->in_use;
 
743
    ReplicationServices &replication_services= ReplicationServices::singleton();
 
744
    replication_services.truncateTable(session, table);
708
745
  }
709
746
 
710
747
  return result;
720
757
int
721
758
Cursor::ha_reset_auto_increment(uint64_t value)
722
759
{
723
 
  setTransactionReadWrite();
 
760
  mark_trx_read_write();
724
761
 
725
762
  return reset_auto_increment(value);
726
763
}
735
772
int
736
773
Cursor::ha_analyze(Session* session, HA_CHECK_OPT*)
737
774
{
738
 
  setTransactionReadWrite();
 
775
  mark_trx_read_write();
739
776
 
740
777
  return analyze(session);
741
778
}
749
786
int
750
787
Cursor::ha_disable_indexes(uint32_t mode)
751
788
{
752
 
  setTransactionReadWrite();
 
789
  mark_trx_read_write();
753
790
 
754
791
  return disable_indexes(mode);
755
792
}
764
801
int
765
802
Cursor::ha_enable_indexes(uint32_t mode)
766
803
{
767
 
  setTransactionReadWrite();
 
804
  mark_trx_read_write();
768
805
 
769
806
  return enable_indexes(mode);
770
807
}
779
816
int
780
817
Cursor::ha_discard_or_import_tablespace(bool discard)
781
818
{
782
 
  setTransactionReadWrite();
 
819
  mark_trx_read_write();
783
820
 
784
821
  return discard_or_import_tablespace(discard);
785
822
}
793
830
void
794
831
Cursor::closeMarkForDelete(const char *name)
795
832
{
796
 
  setTransactionReadWrite();
 
833
  mark_trx_read_write();
797
834
 
798
835
  return drop_table(name);
799
836
}
803
840
  int error;
804
841
  if (!(error=index_next(buf)))
805
842
  {
806
 
    ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
 
843
    ptrdiff_t ptrdiff= buf - table->record[0];
807
844
    unsigned char *save_record_0= NULL;
808
 
    KeyInfo *key_info= NULL;
809
 
    KeyPartInfo *key_part;
810
 
    KeyPartInfo *key_part_end= NULL;
 
845
    KEY *key_info= NULL;
 
846
    KEY_PART_INFO *key_part;
 
847
    KEY_PART_INFO *key_part_end= NULL;
811
848
 
812
849
    /*
813
 
      key_cmp_if_same() compares table->getInsertRecord() against 'key'.
814
 
      In parts it uses table->getInsertRecord() directly, in parts it uses
815
 
      field objects with their local pointers into table->getInsertRecord().
816
 
      If 'buf' is distinct from table->getInsertRecord(), we need to move
817
 
      all record references. This is table->getInsertRecord() itself and
 
850
      key_cmp_if_same() compares table->record[0] against 'key'.
 
851
      In parts it uses table->record[0] directly, in parts it uses
 
852
      field objects with their local pointers into table->record[0].
 
853
      If 'buf' is distinct from table->record[0], we need to move
 
854
      all record references. This is table->record[0] itself and
818
855
      the field pointers of the fields used in this key.
819
856
    */
820
857
    if (ptrdiff)
821
858
    {
822
 
      save_record_0= getTable()->getInsertRecord();
823
 
      getTable()->record[0]= buf;
824
 
      key_info= getTable()->key_info + active_index;
 
859
      save_record_0= table->record[0];
 
860
      table->record[0]= buf;
 
861
      key_info= table->key_info + active_index;
825
862
      key_part= key_info->key_part;
826
863
      key_part_end= key_part + key_info->key_parts;
827
864
      for (; key_part < key_part_end; key_part++)
831
868
      }
832
869
    }
833
870
 
834
 
    if (key_cmp_if_same(getTable(), key, active_index, keylen))
 
871
    if (key_cmp_if_same(table, key, active_index, keylen))
835
872
    {
836
 
      getTable()->status=STATUS_NOT_FOUND;
 
873
      table->status=STATUS_NOT_FOUND;
837
874
      error=HA_ERR_END_OF_FILE;
838
875
    }
839
876
 
840
877
    /* Move back if necessary. */
841
878
    if (ptrdiff)
842
879
    {
843
 
      getTable()->record[0]= save_record_0;
 
880
      table->record[0]= save_record_0;
844
881
      for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
845
882
        key_part->field->move_field_offset(-ptrdiff);
846
883
    }
877
914
double Cursor::index_only_read_time(uint32_t keynr, double key_records)
878
915
{
879
916
  uint32_t keys_per_block= (stats.block_size/2/
880
 
                        (getTable()->key_info[keynr].key_length + ref_length) + 1);
 
917
                        (table->key_info[keynr].key_length + ref_length) + 1);
881
918
  return ((double) (key_records + keys_per_block-1) /
882
919
          (double) keys_per_block);
883
920
}
906
943
 
907
944
  @note
908
945
    This method (or an overriding one in a derived class) must check for
909
 
    session->getKilled() and return HA_POS_ERROR if it is not zero. This is required
 
946
    session->killed and return HA_POS_ERROR if it is not zero. This is required
910
947
    for a user to be able to interrupt the calculation by killing the
911
948
    connection/query.
912
949
 
922
959
Cursor::multi_range_read_info_const(uint32_t keyno, RANGE_SEQ_IF *seq,
923
960
                                     void *seq_init_param,
924
961
                                     uint32_t ,
925
 
                                     uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
 
962
                                     uint32_t *bufsz, uint32_t *flags, COST_VECT *cost)
926
963
{
927
964
  KEY_MULTI_RANGE range;
928
965
  range_seq_t seq_it;
929
966
  ha_rows rows, total_rows= 0;
930
967
  uint32_t n_ranges=0;
 
968
  Session *session= current_session;
931
969
 
932
970
  /* Default MRR implementation doesn't need buffer */
933
971
  *bufsz= 0;
935
973
  seq_it= seq->init(seq_init_param, n_ranges, *flags);
936
974
  while (!seq->next(seq_it, &range))
937
975
  {
 
976
    if (unlikely(session->killed != 0))
 
977
      return HA_POS_ERROR;
 
978
 
938
979
    n_ranges++;
939
980
    key_range *min_endp, *max_endp;
940
981
    {
961
1002
    /* The following calculation is the same as in multi_range_read_info(): */
962
1003
    *flags |= HA_MRR_USE_DEFAULT_IMPL;
963
1004
    cost->zero();
964
 
    cost->setAvgIOCost(1); /* assume random seeks */
 
1005
    cost->avg_io_cost= 1; /* assume random seeks */
965
1006
    if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
966
 
      cost->setIOCount(index_only_read_time(keyno, (uint32_t)total_rows));
 
1007
      cost->io_count= index_only_read_time(keyno, (uint32_t)total_rows);
967
1008
    else
968
 
      cost->setIOCount(read_time(keyno, n_ranges, total_rows));
969
 
    cost->setCpuCost((double) total_rows / TIME_FOR_COMPARE + 0.01);
 
1009
      cost->io_count= read_time(keyno, n_ranges, total_rows);
 
1010
    cost->cpu_cost= (double) total_rows / TIME_FOR_COMPARE + 0.01;
970
1011
  }
971
1012
  return total_rows;
972
1013
}
1007
1048
*/
1008
1049
 
1009
1050
int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
1010
 
                                   uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
 
1051
                                   uint32_t *bufsz, uint32_t *flags, COST_VECT *cost)
1011
1052
{
1012
1053
  *bufsz= 0; /* Default implementation doesn't need a buffer */
1013
1054
 
1014
1055
  *flags |= HA_MRR_USE_DEFAULT_IMPL;
1015
1056
 
1016
1057
  cost->zero();
1017
 
  cost->setAvgIOCost(1); /* assume random seeks */
 
1058
  cost->avg_io_cost= 1; /* assume random seeks */
1018
1059
 
1019
1060
  /* Produce the same cost as non-MRR code does */
1020
1061
  if (*flags & HA_MRR_INDEX_ONLY)
1021
 
    cost->setIOCount(index_only_read_time(keyno, n_rows));
 
1062
    cost->io_count= index_only_read_time(keyno, n_rows);
1022
1063
  else
1023
 
    cost->setIOCount(read_time(keyno, n_ranges, n_rows));
 
1064
    cost->io_count= read_time(keyno, n_ranges, n_rows);
1024
1065
  return 0;
1025
1066
}
1026
1067
 
1044
1085
  @param buf             INOUT: memory buffer to be used
1045
1086
 
1046
1087
  @note
1047
 
    One must have called doStartIndexScan() before calling this function. Several
 
1088
    One must have called index_init() before calling this function. Several
1048
1089
    multi_range_read_init() calls may be made in course of one query.
1049
1090
 
1050
1091
    Until WL#2623 is done (see its text, section 3.2), the following will
1059
1100
    The callee consumes all or some fraction of the provided buffer space, and
1060
1101
    sets the HANDLER_BUFFER members accordingly.
1061
1102
    The callee may use the buffer memory until the next multi_range_read_init()
1062
 
    call is made, all records have been read, or until doEndIndexScan() call is
 
1103
    call is made, all records have been read, or until index_end() call is
1063
1104
    made, whichever comes first.
1064
1105
 
1065
1106
  @retval 0  OK
1068
1109
 
1069
1110
int
1070
1111
Cursor::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
1071
 
                               uint32_t n_ranges, uint32_t mode)
 
1112
                               uint32_t n_ranges, uint32_t mode,
 
1113
                               HANDLER_BUFFER *)
1072
1114
{
1073
1115
  mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
1074
1116
  mrr_funcs= *seq_funcs;
1075
1117
  mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
1076
1118
  mrr_have_range= false;
1077
 
 
1078
1119
  return 0;
1079
1120
}
1080
1121
 
1097
1138
  int result= 0;
1098
1139
  int range_res= 0;
1099
1140
 
1100
 
  if (not mrr_have_range)
 
1141
  if (!mrr_have_range)
1101
1142
  {
1102
1143
    mrr_have_range= true;
1103
1144
    goto start;
1159
1200
  @param sorted         Set to 1 if result should be sorted per key
1160
1201
 
1161
1202
  @note
1162
 
    Record is read into table->getInsertRecord()
 
1203
    Record is read into table->record[0]
1163
1204
 
1164
1205
  @retval
1165
1206
    0                   Found row
1184
1225
    key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
1185
1226
                                  (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
1186
1227
  }
1187
 
  range_key_part= getTable()->key_info[active_index].key_part;
 
1228
  range_key_part= table->key_info[active_index].key_part;
1188
1229
 
1189
1230
  if (!start_key)                       // Read first record
1190
 
    result= index_first(getTable()->getInsertRecord());
 
1231
    result= index_first(table->record[0]);
1191
1232
  else
1192
 
    result= index_read_map(getTable()->getInsertRecord(),
 
1233
    result= index_read_map(table->record[0],
1193
1234
                           start_key->key,
1194
1235
                           start_key->keypart_map,
1195
1236
                           start_key->flag);
1206
1247
  Read next row between two endpoints.
1207
1248
 
1208
1249
  @note
1209
 
    Record is read into table->getInsertRecord()
 
1250
    Record is read into table->record[0]
1210
1251
 
1211
1252
  @retval
1212
1253
    0                   Found row
1222
1263
  if (eq_range)
1223
1264
  {
1224
1265
    /* We trust that index_next_same always gives a row in range */
1225
 
    return(index_next_same(getTable()->getInsertRecord(),
 
1266
    return(index_next_same(table->record[0],
1226
1267
                                end_range->key,
1227
1268
                                end_range->length));
1228
1269
  }
1229
 
  result= index_next(getTable()->getInsertRecord());
 
1270
  result= index_next(table->record[0]);
1230
1271
  if (result)
1231
1272
    return result;
1232
1273
  return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
1251
1292
int Cursor::compare_key(key_range *range)
1252
1293
{
1253
1294
  int cmp;
1254
 
  if (not range)
 
1295
  if (!range || in_range_check_pushed_down)
1255
1296
    return 0;                                   // No max range
1256
1297
  cmp= key_cmp(range_key_part, range->key, range->length);
1257
1298
  if (!cmp)
1259
1300
  return cmp;
1260
1301
}
1261
1302
 
 
1303
 
 
1304
/*
 
1305
  Same as compare_key() but doesn't check have in_range_check_pushed_down.
 
1306
  This is used by index condition pushdown implementation.
 
1307
*/
 
1308
 
 
1309
int Cursor::compare_key2(key_range *range)
 
1310
{
 
1311
  int cmp;
 
1312
  if (!range)
 
1313
    return 0;                                   // no max range
 
1314
  cmp= key_cmp(range_key_part, range->key, range->length);
 
1315
  if (!cmp)
 
1316
    cmp= key_compare_result_on_equal;
 
1317
  return cmp;
 
1318
}
 
1319
 
1262
1320
int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
1263
1321
                                const unsigned char * key,
1264
1322
                                key_part_map keypart_map,
1265
1323
                                enum ha_rkey_function find_flag)
1266
1324
{
1267
1325
  int error, error1;
1268
 
  error= doStartIndexScan(index, 0);
 
1326
  error= index_init(index, 0);
1269
1327
  if (!error)
1270
1328
  {
1271
1329
    error= index_read_map(buf, key, keypart_map, find_flag);
1272
 
    error1= doEndIndexScan();
 
1330
    error1= index_end();
1273
1331
  }
1274
1332
  return error ?  error : error1;
1275
1333
}
1285
1343
                                    const unsigned char *before_record,
1286
1344
                                    const unsigned char *after_record)
1287
1345
{
1288
 
  TransactionServices &transaction_services= TransactionServices::singleton();
 
1346
  ReplicationServices &replication_services= ReplicationServices::singleton();
1289
1347
  Session *const session= table->in_use;
1290
1348
 
1291
 
  if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
 
1349
  if (table->s->tmp_table || ! replication_services.isActive())
1292
1350
    return false;
1293
1351
 
1294
1352
  bool result= false;
1295
1353
 
1296
1354
  switch (session->lex->sql_command)
1297
1355
  {
1298
 
  case SQLCOM_CREATE_TABLE:
1299
 
    /*
1300
 
     * We are in a CREATE TABLE ... SELECT statement
1301
 
     * and the kernel has already created the table
1302
 
     * and put a CreateTableStatement in the active
1303
 
     * Transaction message.  Here, we add a new InsertRecord
1304
 
     * to a new Transaction message (because the above
1305
 
     * CREATE TABLE will commit the transaction containing
1306
 
     * it).
1307
 
     */
1308
 
    result= transaction_services.insertRecord(session, table);
1309
 
    break;
1310
1356
  case SQLCOM_REPLACE:
1311
1357
  case SQLCOM_REPLACE_SELECT:
1312
1358
    /*
1313
1359
     * This is a total hack because of the code that is
1314
1360
     * in write_record() in sql_insert.cc. During
1315
 
     * a REPLACE statement, a call to insertRecord() is
1316
 
     * called.  If it fails, then a call to deleteRecord()
 
1361
     * a REPLACE statement, a call to ha_write_row() is
 
1362
     * called.  If it fails, then a call to ha_delete_row()
1317
1363
     * is called, followed by a repeat of the original
1318
 
     * call to insertRecord().  So, log_row_for_replication
1319
 
     * could be called multiple times for a REPLACE
 
1364
     * call to ha_write_row().  So, log_row_for_replication
 
1365
     * could be called either once or twice for a REPLACE
1320
1366
     * statement.  The below looks at the values of before_record
1321
1367
     * and after_record to determine which call to this
1322
1368
     * function is for the delete or the insert, since NULL
1329
1375
     */
1330
1376
    if (after_record == NULL)
1331
1377
    {
1332
 
      /*
1333
 
       * The storage engine is passed the record in table->record[1]
1334
 
       * as the row to delete (this is the conflicting row), so
1335
 
       * we need to notify TransactionService to use that row.
1336
 
       */
1337
 
      transaction_services.deleteRecord(session, table, true);
 
1378
      replication_services.deleteRecord(session, table);
1338
1379
      /* 
1339
1380
       * We set the "current" statement message to NULL.  This triggers
1340
1381
       * the replication services component to generate a new statement
1341
1382
       * message for the inserted record which will come next.
1342
1383
       */
1343
 
      transaction_services.finalizeStatementMessage(*session->getStatementMessage(), session);
 
1384
      replication_services.finalizeStatement(*session->getStatementMessage(), session);
1344
1385
    }
1345
1386
    else
1346
1387
    {
1347
1388
      if (before_record == NULL)
1348
 
        result= transaction_services.insertRecord(session, table);
 
1389
        result= replication_services.insertRecord(session, table);
1349
1390
      else
1350
 
        transaction_services.updateRecord(session, table, before_record, after_record);
 
1391
        replication_services.updateRecord(session, table, before_record, after_record);
1351
1392
    }
1352
1393
    break;
1353
1394
  case SQLCOM_INSERT:
1354
1395
  case SQLCOM_INSERT_SELECT:
1355
 
  case SQLCOM_LOAD:
1356
1396
    /*
1357
1397
     * The else block below represents an 
1358
1398
     * INSERT ... ON DUPLICATE KEY UPDATE that
1360
1400
     * an update.
1361
1401
     */
1362
1402
    if (before_record == NULL)
1363
 
      result= transaction_services.insertRecord(session, table);
 
1403
      result= replication_services.insertRecord(session, table);
1364
1404
    else
1365
 
      transaction_services.updateRecord(session, table, before_record, after_record);
 
1405
      replication_services.updateRecord(session, table, before_record, after_record);
1366
1406
    break;
1367
1407
 
1368
1408
  case SQLCOM_UPDATE:
1369
 
    transaction_services.updateRecord(session, table, before_record, after_record);
 
1409
    replication_services.updateRecord(session, table, before_record, after_record);
1370
1410
    break;
1371
1411
 
1372
1412
  case SQLCOM_DELETE:
1373
 
    transaction_services.deleteRecord(session, table);
 
1413
    replication_services.deleteRecord(session, table);
1374
1414
    break;
1375
1415
  default:
1376
1416
    break;
1394
1434
  {
1395
1435
    if (lock_type == F_RDLCK)
1396
1436
    {
1397
 
      DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
1398
 
                                  getTable()->getShare()->getTableName());
 
1437
      DRIZZLE_CURSOR_RDLOCK_START(table_share->db.str,
 
1438
                                  table_share->table_name.str);
1399
1439
    }
1400
1440
    else if (lock_type == F_WRLCK)
1401
1441
    {
1402
 
      DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
1403
 
                                  getTable()->getShare()->getTableName());
 
1442
      DRIZZLE_CURSOR_WRLOCK_START(table_share->db.str,
 
1443
                                  table_share->table_name.str);
1404
1444
    }
1405
1445
    else if (lock_type == F_UNLCK)
1406
1446
    {
1407
 
      DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
1408
 
                                  getTable()->getShare()->getTableName());
 
1447
      DRIZZLE_CURSOR_UNLOCK_START(table_share->db.str,
 
1448
                                  table_share->table_name.str);
1409
1449
    }
1410
1450
  }
1411
1451
 
1444
1484
int Cursor::ha_reset()
1445
1485
{
1446
1486
  /* Check that we have called all proper deallocation functions */
1447
 
  assert(! getTable()->getShare()->all_set.none());
1448
 
  assert(getTable()->key_read == 0);
1449
 
  /* ensure that ha_index_end / endTableScan has been called */
 
1487
  assert((unsigned char*) table->def_read_set.getBitmap() +
 
1488
              table->s->column_bitmap_size ==
 
1489
              (unsigned char*) table->def_write_set.getBitmap());
 
1490
  assert(table->s->all_set.isSetAll());
 
1491
  assert(table->key_read == 0);
 
1492
  /* ensure that ha_index_end / ha_rnd_end has been called */
1450
1493
  assert(inited == NONE);
1451
1494
  /* Free cache used by filesort */
1452
 
  getTable()->free_io_cache();
 
1495
  table->free_io_cache();
1453
1496
  /* reset the bitmaps to point to defaults */
1454
 
  getTable()->default_column_bitmaps();
 
1497
  table->default_column_bitmaps();
1455
1498
  return(reset());
1456
1499
}
1457
1500
 
1458
1501
 
1459
 
int Cursor::insertRecord(unsigned char *buf)
 
1502
int Cursor::ha_write_row(unsigned char *buf)
1460
1503
{
1461
1504
  int error;
1462
1505
 
1466
1509
   * @TODO Technically, the below two lines can be take even further out of the
1467
1510
   * Cursor interface and into the fill_record() method.
1468
1511
   */
1469
 
  if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1470
 
  {
1471
 
    getTable()->timestamp_field->set_time();
1472
 
  }
1473
 
 
1474
 
  DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1475
 
  setTransactionReadWrite();
1476
 
  
1477
 
  if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
1478
 
  {
1479
 
    error= ER_EVENT_OBSERVER_PLUGIN;
1480
 
  }
1481
 
  else
1482
 
  {
1483
 
    error= doInsertRecord(buf);
1484
 
    if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error))) 
1485
 
    {
1486
 
      error= ER_EVENT_OBSERVER_PLUGIN;
1487
 
    }
1488
 
  }
1489
 
 
1490
 
  ha_statistic_increment(&system_status_var::ha_write_count);
1491
 
 
 
1512
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
1513
    table->timestamp_field->set_time();
 
1514
 
 
1515
  DRIZZLE_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
 
1516
  mark_trx_read_write();
 
1517
  error= write_row(buf);
1492
1518
  DRIZZLE_INSERT_ROW_DONE(error);
1493
1519
 
1494
1520
  if (unlikely(error))
1496
1522
    return error;
1497
1523
  }
1498
1524
 
1499
 
  if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
 
1525
  if (unlikely(log_row_for_replication(table, NULL, buf)))
1500
1526
    return HA_ERR_RBR_LOGGING_FAILED;
1501
1527
 
1502
1528
  return 0;
1503
1529
}
1504
1530
 
1505
1531
 
1506
 
int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
 
1532
int Cursor::ha_update_row(const unsigned char *old_data, unsigned char *new_data)
1507
1533
{
1508
1534
  int error;
1509
1535
 
1510
1536
  /*
1511
 
    Some storage engines require that the new record is in getInsertRecord()
1512
 
    (and the old record is in getUpdateRecord()).
 
1537
    Some storage engines require that the new record is in record[0]
 
1538
    (and the old record is in record[1]).
1513
1539
   */
1514
 
  assert(new_data == getTable()->getInsertRecord());
1515
 
 
1516
 
  DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1517
 
  setTransactionReadWrite();
1518
 
  if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
1519
 
  {
1520
 
    error= ER_EVENT_OBSERVER_PLUGIN;
1521
 
  }
1522
 
  else
1523
 
  {
1524
 
    if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1525
 
    {
1526
 
      getTable()->timestamp_field->set_time();
1527
 
    }
1528
 
 
1529
 
    error= doUpdateRecord(old_data, new_data);
1530
 
    if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
1531
 
    {
1532
 
      error= ER_EVENT_OBSERVER_PLUGIN;
1533
 
    }
1534
 
  }
1535
 
 
1536
 
  ha_statistic_increment(&system_status_var::ha_update_count);
1537
 
 
 
1540
  assert(new_data == table->record[0]);
 
1541
 
 
1542
  DRIZZLE_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
 
1543
  mark_trx_read_write();
 
1544
  error= update_row(old_data, new_data);
1538
1545
  DRIZZLE_UPDATE_ROW_DONE(error);
1539
1546
 
1540
1547
  if (unlikely(error))
1542
1549
    return error;
1543
1550
  }
1544
1551
 
1545
 
  if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
 
1552
  if (unlikely(log_row_for_replication(table, old_data, new_data)))
1546
1553
    return HA_ERR_RBR_LOGGING_FAILED;
1547
1554
 
1548
1555
  return 0;
1549
1556
}
1550
 
TableShare *Cursor::getShare()
1551
 
{
1552
 
  return getTable()->getMutableShare();
1553
 
}
1554
1557
 
1555
 
int Cursor::deleteRecord(const unsigned char *buf)
 
1558
int Cursor::ha_delete_row(const unsigned char *buf)
1556
1559
{
1557
1560
  int error;
1558
1561
 
1559
 
  DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1560
 
  setTransactionReadWrite();
1561
 
  if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
1562
 
  {
1563
 
    error= ER_EVENT_OBSERVER_PLUGIN;
1564
 
  }
1565
 
  else
1566
 
  {
1567
 
    error= doDeleteRecord(buf);
1568
 
    if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
1569
 
    {
1570
 
      error= ER_EVENT_OBSERVER_PLUGIN;
1571
 
    }
1572
 
  }
1573
 
 
1574
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
1575
 
 
 
1562
  DRIZZLE_DELETE_ROW_START(table_share->db.str, table_share->table_name.str);
 
1563
  mark_trx_read_write();
 
1564
  error= delete_row(buf);
1576
1565
  DRIZZLE_DELETE_ROW_DONE(error);
1577
1566
 
1578
1567
  if (unlikely(error))
1579
1568
    return error;
1580
1569
 
1581
 
  if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
 
1570
  if (unlikely(log_row_for_replication(table, buf, NULL)))
1582
1571
    return HA_ERR_RBR_LOGGING_FAILED;
1583
1572
 
1584
1573
  return 0;