~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/cursor.cc

  • Committer: Stewart Smith
  • Date: 2010-06-04 02:51:04 UTC
  • mto: This revision was merged to the branch mainline in revision 1590.
  • Revision ID: stewart@flamingspork.com-20100604025104-ilf5mrvwbpd5crzv
have a constant for the maximum number of enum elements

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
4
 *  Copyright (C) 2008 Sun Microsystems
5
5
 *
6
6
 *  This program is free software; you can redistribute it and/or modify
7
7
 *  it under the terms of the GNU General Public License as published by
23
23
  Handler-calling-functions
24
24
*/
25
25
 
26
 
#include <config.h>
 
26
#include "config.h"
27
27
 
28
28
#include <fcntl.h>
29
29
 
30
 
#include <drizzled/error.h>
31
 
#include <drizzled/field/epoch.h>
32
 
#include <drizzled/gettext.h>
33
 
#include <drizzled/internal/my_sys.h>
34
 
#include <drizzled/item/empty_string.h>
35
 
#include <drizzled/item/int.h>
36
 
#include <drizzled/lock.h>
37
 
#include <drizzled/message/table.h>
38
 
#include <drizzled/my_hash.h>
39
 
#include <drizzled/optimizer/cost_vector.h>
40
 
#include <drizzled/plugin/client.h>
41
 
#include <drizzled/plugin/event_observer.h>
42
 
#include <drizzled/plugin/storage_engine.h>
43
 
#include <drizzled/probes.h>
44
 
#include <drizzled/session.h>
45
 
#include <drizzled/sql_base.h>
46
 
#include <drizzled/sql_parse.h>
47
 
#include <drizzled/transaction_services.h>
 
30
#include "drizzled/my_hash.h"
 
31
#include "drizzled/error.h"
 
32
#include "drizzled/gettext.h"
 
33
#include "drizzled/probes.h"
 
34
#include "drizzled/sql_parse.h"
 
35
#include "drizzled/optimizer/cost_vector.h"
 
36
#include "drizzled/session.h"
 
37
#include "drizzled/sql_base.h"
 
38
#include "drizzled/transaction_services.h"
 
39
#include "drizzled/lock.h"
 
40
#include "drizzled/item/int.h"
 
41
#include "drizzled/item/empty_string.h"
 
42
#include "drizzled/field/timestamp.h"
 
43
#include "drizzled/message/table.pb.h"
 
44
#include "drizzled/plugin/client.h"
 
45
#include "drizzled/internal/my_sys.h"
 
46
#include "drizzled/plugin/event_observer.h"
48
47
 
49
48
using namespace std;
50
49
 
55
54
** General Cursor functions
56
55
****************************************************************************/
57
56
Cursor::Cursor(plugin::StorageEngine &engine_arg,
58
 
               Table &arg)
59
 
  : table(arg),
60
 
    engine(engine_arg),
61
 
    estimation_rows_to_insert(0),
 
57
               TableShare &share_arg)
 
58
  : table_share(&share_arg), table(0),
 
59
    estimation_rows_to_insert(0), engine(&engine_arg),
62
60
    ref(0),
63
61
    key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
64
62
    ref_length(sizeof(internal::my_off_t)),
74
72
}
75
73
 
76
74
 
77
 
/*
78
 
 * @note this only used in
79
 
 * optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
80
 
 * of the writing of this comment. -Brian
81
 
 */
82
75
Cursor *Cursor::clone(memory::Root *mem_root)
83
76
{
84
 
  Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
 
77
  Cursor *new_handler= table->getMutableShare()->db_type()->getCursor(*table->getMutableShare(), mem_root);
85
78
 
86
79
  /*
87
80
    Allocate Cursor->ref here because otherwise ha_open will allocate it
90
83
  */
91
84
  if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
92
85
    return NULL;
93
 
 
94
 
  identifier::Table identifier(getTable()->getShare()->getSchemaName(),
95
 
                             getTable()->getShare()->getTableName(),
96
 
                             getTable()->getShare()->getType());
97
 
 
98
 
  if (new_handler && !new_handler->ha_open(identifier,
99
 
                                           getTable()->getDBStat(),
 
86
  if (new_handler && !new_handler->ha_open(table,
 
87
                                           table->getMutableShare()->getNormalizedPath(),
 
88
                                           table->getDBStat(),
100
89
                                           HA_OPEN_IGNORE_IF_LOCKED))
101
90
    return new_handler;
102
91
  return NULL;
112
101
  /* works only with key prefixes */
113
102
  assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
114
103
 
115
 
  const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
116
 
  const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
 
104
  const KeyPartInfo *key_part_found= table->getShare()->getKeyInfo(key_position).key_part;
 
105
  const KeyPartInfo *end_key_part_found= key_part_found + table->getShare()->getKeyInfo(key_position).key_parts;
117
106
  uint32_t length= 0;
118
107
 
119
108
  while (key_part_found < end_key_part_found && keypart_map_arg)
176
165
  return end_bulk_insert();
177
166
}
178
167
 
 
168
void Cursor::change_table_ptr(Table *table_arg, TableShare *share)
 
169
{
 
170
  table= table_arg;
 
171
  table_share= share;
 
172
}
 
173
 
179
174
const key_map *Cursor::keys_to_use_for_scanning()
180
175
{
181
176
  return &key_map_empty;
183
178
 
184
179
bool Cursor::has_transactions()
185
180
{
186
 
  return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
 
181
  return (table->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
187
182
}
188
183
 
189
 
void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
 
184
void Cursor::ha_statistic_increment(ulong system_status_var::*offset) const
190
185
{
191
 
  (getTable()->in_use->status_var.*offset)++;
 
186
  status_var_increment(table->in_use->status_var.*offset);
192
187
}
193
188
 
194
189
void **Cursor::ha_data(Session *session) const
195
190
{
196
 
  return session->getEngineData(getEngine());
197
 
}
 
191
  return session->getEngineData(engine);
 
192
}
 
193
 
 
194
Session *Cursor::ha_session(void) const
 
195
{
 
196
  assert(!table || !table->in_use || table->in_use == current_session);
 
197
  return (table && table->in_use) ? table->in_use : current_session;
 
198
}
 
199
 
198
200
 
199
201
bool Cursor::is_fatal_error(int error, uint32_t flags)
200
202
{
209
211
 
210
212
ha_rows Cursor::records() { return stats.records; }
211
213
uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
212
 
uint64_t Cursor::rowSize() { return getTable()->getRecordLength() + getTable()->sizeFields(); }
213
 
 
214
 
int Cursor::doOpen(const identifier::Table &identifier, int mode, uint32_t test_if_locked)
215
 
{
216
 
  return open(identifier.getPath().c_str(), mode, test_if_locked);
217
 
}
 
214
uint64_t Cursor::rowSize() { return table->getRecordLength() + table->sizeFields(); }
218
215
 
219
216
/**
220
217
  Open database-Cursor.
222
219
  Try O_RDONLY if cannot open as O_RDWR
223
220
  Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set
224
221
*/
225
 
int Cursor::ha_open(const identifier::Table &identifier,
226
 
                    int mode,
227
 
                    int test_if_locked)
 
222
int Cursor::ha_open(Table *table_arg, const char *name, int mode,
 
223
                     int test_if_locked)
228
224
{
229
225
  int error;
230
226
 
231
 
  if ((error= doOpen(identifier, mode, test_if_locked)))
 
227
  table= table_arg;
 
228
  assert(table->getShare() == table_share);
 
229
 
 
230
  if ((error=open(name, mode, test_if_locked)))
232
231
  {
233
232
    if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
234
 
        (getTable()->db_stat & HA_TRY_READ_ONLY))
 
233
        (table->db_stat & HA_TRY_READ_ONLY))
235
234
    {
236
 
      getTable()->db_stat|=HA_READ_ONLY;
237
 
      error= doOpen(identifier, O_RDONLY,test_if_locked);
 
235
      table->db_stat|=HA_READ_ONLY;
 
236
      error=open(name,O_RDONLY,test_if_locked);
238
237
    }
239
238
  }
240
239
  if (error)
243
242
  }
244
243
  else
245
244
  {
246
 
    if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
247
 
      getTable()->db_stat|=HA_READ_ONLY;
 
245
    if (table->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
 
246
      table->db_stat|=HA_READ_ONLY;
248
247
    (void) extra(HA_EXTRA_NO_READCHECK);        // Not needed in SQL
249
248
 
250
249
    /* ref is already allocated for us if we're called from Cursor::clone() */
251
 
    if (!ref && !(ref= (unsigned char*) getTable()->alloc_root(ALIGN_SIZE(ref_length)*2)))
 
250
    if (!ref && !(ref= (unsigned char*) table->alloc_root(ALIGN_SIZE(ref_length)*2)))
252
251
    {
253
252
      close();
254
253
      error=HA_ERR_OUT_OF_MEM;
267
266
*/
268
267
int Cursor::read_first_row(unsigned char * buf, uint32_t primary_key)
269
268
{
270
 
  int error;
 
269
  register int error;
271
270
 
272
271
  ha_statistic_increment(&system_status_var::ha_read_first_count);
273
272
 
274
273
  /*
275
274
    If there is very few deleted rows in the table, find the first row by
276
275
    scanning the table.
277
 
    @todo remove the test for HA_READ_ORDER
 
276
    TODO remove the test for HA_READ_ORDER
278
277
  */
279
278
  if (stats.deleted < 10 || primary_key >= MAX_KEY ||
280
 
      !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
 
279
      !(table->index_flags(primary_key) & HA_READ_ORDER))
281
280
  {
282
 
    error= startTableScan(1);
283
 
    if (error == 0)
284
 
    {
285
 
      while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
286
 
      (void) endTableScan();
287
 
    }
 
281
    (void) startTableScan(1);
 
282
    while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
 
283
    (void) endTableScan();
288
284
  }
289
285
  else
290
286
  {
291
287
    /* Find the first row through the primary key */
292
 
    error= startIndexScan(primary_key, 0);
293
 
    if (error == 0)
294
 
    {
295
 
      error=index_first(buf);
296
 
      (void) endIndexScan();
297
 
    }
 
288
    (void) startIndexScan(primary_key, 0);
 
289
    error=index_first(buf);
 
290
    (void) endIndexScan();
298
291
  }
299
292
  return error;
300
293
}
311
304
  @verbatim 1,5,15,25,35,... @endverbatim
312
305
*/
313
306
inline uint64_t
314
 
compute_next_insert_id(uint64_t nr, drizzle_system_variables *variables)
 
307
compute_next_insert_id(uint64_t nr,struct system_variables *variables)
315
308
{
316
309
  if (variables->auto_increment_increment == 1)
317
310
    return (nr+1); // optimization of the formula below
331
324
    Session::next_insert_id to be greater than the explicit value.
332
325
  */
333
326
  if ((next_insert_id > 0) && (nr >= next_insert_id))
334
 
    set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
 
327
    set_next_insert_id(compute_next_insert_id(nr, &table->in_use->variables));
335
328
}
336
329
 
337
330
 
351
344
    The number X if it exists, "nr" otherwise.
352
345
*/
353
346
inline uint64_t
354
 
prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
 
347
prev_insert_id(uint64_t nr, struct system_variables *variables)
355
348
{
356
349
  if (unlikely(nr < variables->auto_increment_offset))
357
350
  {
448
441
{
449
442
  uint64_t nr, nb_reserved_values;
450
443
  bool append= false;
451
 
  Session *session= getTable()->in_use;
452
 
  drizzle_system_variables *variables= &session->variables;
 
444
  Session *session= table->in_use;
 
445
  struct system_variables *variables= &session->variables;
453
446
 
454
447
  /*
455
448
    next_insert_id is a "cursor" into the reserved interval, it may go greater
461
454
     for an auto increment column, not a magic value like NULL is.
462
455
     same as sql_mode=NO_AUTO_VALUE_ON_ZERO */
463
456
 
464
 
  if ((nr= getTable()->next_number_field->val_int()) != 0
465
 
      || getTable()->auto_increment_field_not_null)
 
457
  if ((nr= table->next_number_field->val_int()) != 0
 
458
      || table->auto_increment_field_not_null)
466
459
  {
467
460
    /*
468
461
      Update next_insert_id if we had already generated a value in this
540
533
      nr= compute_next_insert_id(nr-1, variables);
541
534
    }
542
535
 
543
 
    if (getTable()->getShare()->next_number_keypart == 0)
 
536
    if (table->getShare()->next_number_keypart == 0)
544
537
    {
545
538
      /* We must defer the appending until "nr" has been possibly truncated */
546
539
      append= true;
547
540
    }
548
541
  }
549
542
 
550
 
  if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
 
543
  if (unlikely(table->next_number_field->store((int64_t) nr, true)))
551
544
  {
552
545
    /*
553
546
      first test if the query was aborted due to strict mode constraints
554
547
    */
555
 
    if (session->getKilled() == Session::KILL_BAD_DATA)
 
548
    if (session->killed == Session::KILL_BAD_DATA)
556
549
      return HA_ERR_AUTOINC_ERANGE;
557
550
 
558
551
    /*
563
556
      bother shifting the right bound (anyway any other value from this
564
557
      interval will cause a duplicate key).
565
558
    */
566
 
    nr= prev_insert_id(getTable()->next_number_field->val_int(), variables);
567
 
    if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
568
 
      nr= getTable()->next_number_field->val_int();
 
559
    nr= prev_insert_id(table->next_number_field->val_int(), variables);
 
560
    if (unlikely(table->next_number_field->store((int64_t) nr, true)))
 
561
      nr= table->next_number_field->val_int();
569
562
  }
570
563
  if (append)
571
564
  {
619
612
      this statement used forced auto_increment values if there were some,
620
613
      wipe them away for other statements.
621
614
    */
622
 
    getTable()->in_use->auto_inc_intervals_forced.empty();
 
615
    table->in_use->auto_inc_intervals_forced.empty();
623
616
  }
624
617
}
625
618
 
658
651
void
659
652
Cursor::setTransactionReadWrite()
660
653
{
661
 
  ResourceContext *resource_context;
662
 
 
663
 
  /*
664
 
   * If the cursor has not context for execution then there should be no
665
 
   * possible resource to gain (and if there is... then there is a bug such
666
 
   * that in_use should have been set.
667
 
 */
668
 
  if (not getTable()->in_use)
669
 
    return;
670
 
 
671
 
  resource_context= getTable()->in_use->getResourceContext(getEngine());
 
654
  ResourceContext *resource_context= ha_session()->getResourceContext(engine);
672
655
  /*
673
656
    When a storage engine method is called, the transaction must
674
657
    have been started, unless it's a DDL call, for which the
709
692
     * @todo Make TransactionServices generic to AfterTriggerServices
710
693
     * or similar...
711
694
     */
712
 
    Session *const session= getTable()->in_use;
 
695
    Session *const session= table->in_use;
713
696
    TransactionServices &transaction_services= TransactionServices::singleton();
714
 
    transaction_services.truncateTable(*session, *getTable());
 
697
    transaction_services.truncateTable(session, table);
715
698
  }
716
699
 
717
700
  return result;
810
793
  int error;
811
794
  if (!(error=index_next(buf)))
812
795
  {
813
 
    ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
 
796
    ptrdiff_t ptrdiff= buf - table->record[0];
814
797
    unsigned char *save_record_0= NULL;
815
798
    KeyInfo *key_info= NULL;
816
799
    KeyPartInfo *key_part;
817
800
    KeyPartInfo *key_part_end= NULL;
818
801
 
819
802
    /*
820
 
      key_cmp_if_same() compares table->getInsertRecord() against 'key'.
821
 
      In parts it uses table->getInsertRecord() directly, in parts it uses
822
 
      field objects with their local pointers into table->getInsertRecord().
823
 
      If 'buf' is distinct from table->getInsertRecord(), we need to move
824
 
      all record references. This is table->getInsertRecord() itself and
 
803
      key_cmp_if_same() compares table->record[0] against 'key'.
 
804
      In parts it uses table->record[0] directly, in parts it uses
 
805
      field objects with their local pointers into table->record[0].
 
806
      If 'buf' is distinct from table->record[0], we need to move
 
807
      all record references. This is table->record[0] itself and
825
808
      the field pointers of the fields used in this key.
826
809
    */
827
810
    if (ptrdiff)
828
811
    {
829
 
      save_record_0= getTable()->getInsertRecord();
830
 
      getTable()->record[0]= buf;
831
 
      key_info= getTable()->key_info + active_index;
 
812
      save_record_0= table->record[0];
 
813
      table->record[0]= buf;
 
814
      key_info= table->key_info + active_index;
832
815
      key_part= key_info->key_part;
833
816
      key_part_end= key_part + key_info->key_parts;
834
817
      for (; key_part < key_part_end; key_part++)
838
821
      }
839
822
    }
840
823
 
841
 
    if (key_cmp_if_same(getTable(), key, active_index, keylen))
 
824
    if (key_cmp_if_same(table, key, active_index, keylen))
842
825
    {
843
 
      getTable()->status=STATUS_NOT_FOUND;
 
826
      table->status=STATUS_NOT_FOUND;
844
827
      error=HA_ERR_END_OF_FILE;
845
828
    }
846
829
 
847
830
    /* Move back if necessary. */
848
831
    if (ptrdiff)
849
832
    {
850
 
      getTable()->record[0]= save_record_0;
 
833
      table->record[0]= save_record_0;
851
834
      for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
852
835
        key_part->field->move_field_offset(-ptrdiff);
853
836
    }
884
867
double Cursor::index_only_read_time(uint32_t keynr, double key_records)
885
868
{
886
869
  uint32_t keys_per_block= (stats.block_size/2/
887
 
                        (getTable()->key_info[keynr].key_length + ref_length) + 1);
 
870
                        (table->key_info[keynr].key_length + ref_length) + 1);
888
871
  return ((double) (key_records + keys_per_block-1) /
889
872
          (double) keys_per_block);
890
873
}
913
896
 
914
897
  @note
915
898
    This method (or an overriding one in a derived class) must check for
916
 
    session->getKilled() and return HA_POS_ERROR if it is not zero. This is required
 
899
    session->killed and return HA_POS_ERROR if it is not zero. This is required
917
900
    for a user to be able to interrupt the calculation by killing the
918
901
    connection/query.
919
902
 
935
918
  range_seq_t seq_it;
936
919
  ha_rows rows, total_rows= 0;
937
920
  uint32_t n_ranges=0;
 
921
  Session *session= current_session;
938
922
 
939
923
  /* Default MRR implementation doesn't need buffer */
940
924
  *bufsz= 0;
942
926
  seq_it= seq->init(seq_init_param, n_ranges, *flags);
943
927
  while (!seq->next(seq_it, &range))
944
928
  {
 
929
    if (unlikely(session->killed != 0))
 
930
      return HA_POS_ERROR;
 
931
 
945
932
    n_ranges++;
946
933
    key_range *min_endp, *max_endp;
947
934
    {
1166
1153
  @param sorted         Set to 1 if result should be sorted per key
1167
1154
 
1168
1155
  @note
1169
 
    Record is read into table->getInsertRecord()
 
1156
    Record is read into table->record[0]
1170
1157
 
1171
1158
  @retval
1172
1159
    0                   Found row
1191
1178
    key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
1192
1179
                                  (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
1193
1180
  }
1194
 
  range_key_part= getTable()->key_info[active_index].key_part;
 
1181
  range_key_part= table->key_info[active_index].key_part;
1195
1182
 
1196
1183
  if (!start_key)                       // Read first record
1197
 
    result= index_first(getTable()->getInsertRecord());
 
1184
    result= index_first(table->record[0]);
1198
1185
  else
1199
 
    result= index_read_map(getTable()->getInsertRecord(),
 
1186
    result= index_read_map(table->record[0],
1200
1187
                           start_key->key,
1201
1188
                           start_key->keypart_map,
1202
1189
                           start_key->flag);
1213
1200
  Read next row between two endpoints.
1214
1201
 
1215
1202
  @note
1216
 
    Record is read into table->getInsertRecord()
 
1203
    Record is read into table->record[0]
1217
1204
 
1218
1205
  @retval
1219
1206
    0                   Found row
1229
1216
  if (eq_range)
1230
1217
  {
1231
1218
    /* We trust that index_next_same always gives a row in range */
1232
 
    return(index_next_same(getTable()->getInsertRecord(),
 
1219
    return(index_next_same(table->record[0],
1233
1220
                                end_range->key,
1234
1221
                                end_range->length));
1235
1222
  }
1236
 
  result= index_next(getTable()->getInsertRecord());
 
1223
  result= index_next(table->record[0]);
1237
1224
  if (result)
1238
1225
    return result;
1239
1226
  return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
1295
1282
  TransactionServices &transaction_services= TransactionServices::singleton();
1296
1283
  Session *const session= table->in_use;
1297
1284
 
1298
 
  if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
 
1285
  if (table->getShare()->tmp_table || not transaction_services.shouldConstructMessages())
1299
1286
    return false;
1300
1287
 
1301
1288
  bool result= false;
1302
1289
 
1303
 
  switch (session->getLex()->sql_command)
 
1290
  switch (session->lex->sql_command)
1304
1291
  {
1305
1292
  case SQLCOM_CREATE_TABLE:
1306
1293
    /*
1312
1299
     * CREATE TABLE will commit the transaction containing
1313
1300
     * it).
1314
1301
     */
1315
 
    result= transaction_services.insertRecord(*session, *table);
 
1302
    result= transaction_services.insertRecord(session, table);
1316
1303
    break;
1317
1304
  case SQLCOM_REPLACE:
1318
1305
  case SQLCOM_REPLACE_SELECT:
1323
1310
     * called.  If it fails, then a call to deleteRecord()
1324
1311
     * is called, followed by a repeat of the original
1325
1312
     * call to insertRecord().  So, log_row_for_replication
1326
 
     * could be called multiple times for a REPLACE
 
1313
     * could be called either once or twice for a REPLACE
1327
1314
     * statement.  The below looks at the values of before_record
1328
1315
     * and after_record to determine which call to this
1329
1316
     * function is for the delete or the insert, since NULL
1336
1323
     */
1337
1324
    if (after_record == NULL)
1338
1325
    {
1339
 
      /*
1340
 
       * The storage engine is passed the record in table->record[1]
1341
 
       * as the row to delete (this is the conflicting row), so
1342
 
       * we need to notify TransactionService to use that row.
1343
 
       */
1344
 
      transaction_services.deleteRecord(*session, *table, true);
 
1326
      transaction_services.deleteRecord(session, table);
1345
1327
      /* 
1346
1328
       * We set the "current" statement message to NULL.  This triggers
1347
1329
       * the replication services component to generate a new statement
1348
1330
       * message for the inserted record which will come next.
1349
1331
       */
1350
 
      transaction_services.finalizeStatementMessage(*session->getStatementMessage(), *session);
 
1332
      transaction_services.finalizeStatementMessage(*session->getStatementMessage(), session);
1351
1333
    }
1352
1334
    else
1353
1335
    {
1354
1336
      if (before_record == NULL)
1355
 
        result= transaction_services.insertRecord(*session, *table);
 
1337
        result= transaction_services.insertRecord(session, table);
1356
1338
      else
1357
 
        transaction_services.updateRecord(*session, *table, before_record, after_record);
 
1339
        transaction_services.updateRecord(session, table, before_record, after_record);
1358
1340
    }
1359
1341
    break;
1360
1342
  case SQLCOM_INSERT:
1361
1343
  case SQLCOM_INSERT_SELECT:
1362
 
  case SQLCOM_LOAD:
1363
1344
    /*
1364
1345
     * The else block below represents an 
1365
1346
     * INSERT ... ON DUPLICATE KEY UPDATE that
1367
1348
     * an update.
1368
1349
     */
1369
1350
    if (before_record == NULL)
1370
 
      result= transaction_services.insertRecord(*session, *table);
 
1351
      result= transaction_services.insertRecord(session, table);
1371
1352
    else
1372
 
      transaction_services.updateRecord(*session, *table, before_record, after_record);
 
1353
      transaction_services.updateRecord(session, table, before_record, after_record);
1373
1354
    break;
1374
1355
 
1375
1356
  case SQLCOM_UPDATE:
1376
 
    transaction_services.updateRecord(*session, *table, before_record, after_record);
 
1357
    transaction_services.updateRecord(session, table, before_record, after_record);
1377
1358
    break;
1378
1359
 
1379
1360
  case SQLCOM_DELETE:
1380
 
    transaction_services.deleteRecord(*session, *table);
 
1361
    transaction_services.deleteRecord(session, table);
1381
1362
    break;
1382
1363
  default:
1383
1364
    break;
1401
1382
  {
1402
1383
    if (lock_type == F_RDLCK)
1403
1384
    {
1404
 
      DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
1405
 
                                  getTable()->getShare()->getTableName());
 
1385
      DRIZZLE_CURSOR_RDLOCK_START(table_share->getSchemaName(),
 
1386
                                  table_share->getTableName());
1406
1387
    }
1407
1388
    else if (lock_type == F_WRLCK)
1408
1389
    {
1409
 
      DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
1410
 
                                  getTable()->getShare()->getTableName());
 
1390
      DRIZZLE_CURSOR_WRLOCK_START(table_share->getSchemaName(),
 
1391
                                  table_share->getTableName());
1411
1392
    }
1412
1393
    else if (lock_type == F_UNLCK)
1413
1394
    {
1414
 
      DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
1415
 
                                  getTable()->getShare()->getTableName());
 
1395
      DRIZZLE_CURSOR_UNLOCK_START(table_share->getSchemaName(),
 
1396
                                  table_share->getTableName());
1416
1397
    }
1417
1398
  }
1418
1399
 
1451
1432
int Cursor::ha_reset()
1452
1433
{
1453
1434
  /* Check that we have called all proper deallocation functions */
1454
 
  assert(! getTable()->getShare()->all_set.none());
1455
 
  assert(getTable()->key_read == 0);
 
1435
  assert((unsigned char*) table->def_read_set.getBitmap() +
 
1436
              table->getShare()->column_bitmap_size ==
 
1437
              (unsigned char*) table->def_write_set.getBitmap());
 
1438
  assert(table->getShare()->all_set.isSetAll());
 
1439
  assert(table->key_read == 0);
1456
1440
  /* ensure that ha_index_end / endTableScan has been called */
1457
1441
  assert(inited == NONE);
1458
1442
  /* Free cache used by filesort */
1459
 
  getTable()->free_io_cache();
 
1443
  table->free_io_cache();
1460
1444
  /* reset the bitmaps to point to defaults */
1461
 
  getTable()->default_column_bitmaps();
 
1445
  table->default_column_bitmaps();
1462
1446
  return(reset());
1463
1447
}
1464
1448
 
1473
1457
   * @TODO Technically, the below two lines can be take even further out of the
1474
1458
   * Cursor interface and into the fill_record() method.
1475
1459
   */
1476
 
  if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1477
 
  {
1478
 
    getTable()->timestamp_field->set_time();
1479
 
  }
 
1460
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
1461
    table->timestamp_field->set_time();
1480
1462
 
1481
 
  DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
 
1463
  DRIZZLE_INSERT_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1482
1464
  setTransactionReadWrite();
1483
1465
  
1484
 
  if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
 
1466
  if (unlikely(plugin::EventObserver::beforeInsertRecord(*(table->in_use), *table_share, buf)))
1485
1467
  {
1486
1468
    error= ER_EVENT_OBSERVER_PLUGIN;
1487
1469
  }
1488
1470
  else
1489
1471
  {
1490
1472
    error= doInsertRecord(buf);
1491
 
    if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error))) 
 
1473
    if (unlikely(plugin::EventObserver::afterInsertRecord(*(table->in_use), *table_share, buf, error))) 
1492
1474
    {
1493
1475
      error= ER_EVENT_OBSERVER_PLUGIN;
1494
1476
    }
1495
1477
  }
1496
 
 
1497
 
  ha_statistic_increment(&system_status_var::ha_write_count);
1498
 
 
 
1478
 
1499
1479
  DRIZZLE_INSERT_ROW_DONE(error);
1500
1480
 
1501
1481
  if (unlikely(error))
1503
1483
    return error;
1504
1484
  }
1505
1485
 
1506
 
  if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
 
1486
  if (unlikely(log_row_for_replication(table, NULL, buf)))
1507
1487
    return HA_ERR_RBR_LOGGING_FAILED;
1508
1488
 
1509
1489
  return 0;
1515
1495
  int error;
1516
1496
 
1517
1497
  /*
1518
 
    Some storage engines require that the new record is in getInsertRecord()
1519
 
    (and the old record is in getUpdateRecord()).
 
1498
    Some storage engines require that the new record is in record[0]
 
1499
    (and the old record is in record[1]).
1520
1500
   */
1521
 
  assert(new_data == getTable()->getInsertRecord());
 
1501
  assert(new_data == table->record[0]);
1522
1502
 
1523
 
  DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
 
1503
  DRIZZLE_UPDATE_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1524
1504
  setTransactionReadWrite();
1525
 
  if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
 
1505
  if (unlikely(plugin::EventObserver::beforeUpdateRecord(*(table->in_use), *table_share, old_data, new_data)))
1526
1506
  {
1527
1507
    error= ER_EVENT_OBSERVER_PLUGIN;
1528
1508
  }
1529
1509
  else
1530
1510
  {
1531
 
    if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1532
 
    {
1533
 
      getTable()->timestamp_field->set_time();
1534
 
    }
1535
 
 
1536
1511
    error= doUpdateRecord(old_data, new_data);
1537
 
    if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
 
1512
    if (unlikely(plugin::EventObserver::afterUpdateRecord(*(table->in_use), *table_share, old_data, new_data, error)))
1538
1513
    {
1539
1514
      error= ER_EVENT_OBSERVER_PLUGIN;
1540
1515
    }
1541
1516
  }
1542
1517
 
1543
 
  ha_statistic_increment(&system_status_var::ha_update_count);
1544
 
 
1545
1518
  DRIZZLE_UPDATE_ROW_DONE(error);
1546
1519
 
1547
1520
  if (unlikely(error))
1549
1522
    return error;
1550
1523
  }
1551
1524
 
1552
 
  if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
 
1525
  if (unlikely(log_row_for_replication(table, old_data, new_data)))
1553
1526
    return HA_ERR_RBR_LOGGING_FAILED;
1554
1527
 
1555
1528
  return 0;
1556
1529
}
1557
 
TableShare *Cursor::getShare()
1558
 
{
1559
 
  return getTable()->getMutableShare();
1560
 
}
1561
1530
 
1562
1531
int Cursor::deleteRecord(const unsigned char *buf)
1563
1532
{
1564
1533
  int error;
1565
1534
 
1566
 
  DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
 
1535
  DRIZZLE_DELETE_ROW_START(table_share->getSchemaName(), table_share->getTableName());
1567
1536
  setTransactionReadWrite();
1568
 
  if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
 
1537
  if (unlikely(plugin::EventObserver::beforeDeleteRecord(*(table->in_use), *table_share, buf)))
1569
1538
  {
1570
1539
    error= ER_EVENT_OBSERVER_PLUGIN;
1571
1540
  }
1572
1541
  else
1573
1542
  {
1574
1543
    error= doDeleteRecord(buf);
1575
 
    if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
 
1544
    if (unlikely(plugin::EventObserver::afterDeleteRecord(*(table->in_use), *table_share, buf, error)))
1576
1545
    {
1577
1546
      error= ER_EVENT_OBSERVER_PLUGIN;
1578
1547
    }
1579
1548
  }
1580
1549
 
1581
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
1582
 
 
1583
1550
  DRIZZLE_DELETE_ROW_DONE(error);
1584
1551
 
1585
1552
  if (unlikely(error))
1586
1553
    return error;
1587
1554
 
1588
 
  if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
 
1555
  if (unlikely(log_row_for_replication(table, buf, NULL)))
1589
1556
    return HA_ERR_RBR_LOGGING_FAILED;
1590
1557
 
1591
1558
  return 0;