~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/records.cc

  • Committer: Patrick Crews
  • Date: 2011-01-29 14:17:35 UTC
  • mto: (2126.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 2127.
  • Revision ID: gleebix@gmail.com-20110129141735-3y2658vt5ur0a33o
Fixes to make test-dbqp

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
/**
17
17
  @file
27
27
#include "drizzled/optimizer/range.h"
28
28
#include "drizzled/internal/my_sys.h"
29
29
#include "drizzled/internal/iocache.h"
 
30
#include "drizzled/drizzled.h"
30
31
 
31
32
namespace drizzled
32
33
{
33
34
 
34
 
int rr_sequential(READ_RECORD *info);
35
 
static int rr_quick(READ_RECORD *info);
36
 
static int rr_from_tempfile(READ_RECORD *info);
37
 
static int rr_unpack_from_tempfile(READ_RECORD *info);
38
 
static int rr_unpack_from_buffer(READ_RECORD *info);
39
 
static int rr_from_pointers(READ_RECORD *info);
40
 
static int rr_from_cache(READ_RECORD *info);
41
 
static int init_rr_cache(Session *session, READ_RECORD *info);
 
35
static int rr_sequential(ReadRecord *info);
 
36
static int rr_quick(ReadRecord *info);
 
37
static int rr_from_tempfile(ReadRecord *info);
 
38
static int rr_unpack_from_tempfile(ReadRecord *info);
 
39
static int rr_unpack_from_buffer(ReadRecord *info);
 
40
static int rr_from_pointers(ReadRecord *info);
 
41
static int rr_from_cache(ReadRecord *info);
42
42
static int rr_cmp(unsigned char *a,unsigned char *b);
43
 
static int rr_index_first(READ_RECORD *info);
44
 
static int rr_index(READ_RECORD *info);
45
 
 
46
 
void init_read_record_idx(READ_RECORD *info, 
47
 
                          Session *, 
48
 
                          Table *table,
49
 
                          bool print_error, 
50
 
                          uint32_t idx)
51
 
{
52
 
  table->emptyRecord();
53
 
  memset(info, 0, sizeof(*info));
54
 
  info->table= table;
55
 
  info->cursor=  table->cursor;
56
 
  info->record= table->record[0];
57
 
  info->print_error= print_error;
 
43
static int rr_index_first(ReadRecord *info);
 
44
static int rr_index(ReadRecord *info);
 
45
 
 
46
void ReadRecord::init_reard_record_sequential()
 
47
{
 
48
  read_record= rr_sequential;
 
49
}
 
50
 
 
51
int ReadRecord::init_read_record_idx(Session *,
 
52
                                     Table *table_arg,
 
53
                                     bool print_error_arg,
 
54
                                     uint32_t idx)
 
55
{
 
56
  table_arg->emptyRecord();
 
57
  table= table_arg;
 
58
  cursor=  table->cursor;
 
59
  record= table->getInsertRecord();
 
60
  print_error= print_error_arg;
58
61
 
59
62
  table->status=0;                      /* And it's always found */
60
 
  if (!table->cursor->inited)
61
 
    table->cursor->ha_index_init(idx, 1);
 
63
  if (not table->cursor->inited)
 
64
  {
 
65
    int error= table->cursor->startIndexScan(idx, 1);
 
66
    if (error != 0)
 
67
      return error;
 
68
  }
62
69
  /* read_record will be changed to rr_index in rr_index_first */
63
 
  info->read_record= rr_index_first;
 
70
  read_record= rr_index_first;
 
71
 
 
72
  return 0;
64
73
}
65
74
 
66
75
 
67
 
void init_read_record(READ_RECORD *info,
68
 
                      Session *session, 
69
 
                      Table *table,
70
 
                      optimizer::SqlSelect *select,
71
 
                      int use_record_cache, 
72
 
                      bool print_error)
 
76
int ReadRecord::init_read_record(Session *session_arg,
 
77
                                 Table *table_arg,
 
78
                                 optimizer::SqlSelect *select_arg,
 
79
                                 int use_record_cache,
 
80
                                 bool print_error_arg)
73
81
{
74
82
  internal::IO_CACHE *tempfile;
 
83
  int error= 0;
75
84
 
76
 
  memset(info, 0, sizeof(*info));
77
 
  info->session=session;
78
 
  info->table=table;
79
 
  info->cursor= table->cursor;
80
 
  info->forms= &info->table;            /* Only one table */
 
85
  session= session_arg;
 
86
  table= table_arg;
 
87
  cursor= table->cursor;
 
88
  forms= &table;                /* Only one table */
81
89
 
82
90
  if (table->sort.addon_field)
83
91
  {
84
 
    info->rec_buf= table->sort.addon_buf;
85
 
    info->ref_length= table->sort.addon_length;
 
92
    rec_buf= table->sort.addon_buf;
 
93
    ref_length= table->sort.addon_length;
86
94
  }
87
95
  else
88
96
  {
89
97
    table->emptyRecord();
90
 
    info->record= table->record[0];
91
 
    info->ref_length= table->cursor->ref_length;
 
98
    record= table->getInsertRecord();
 
99
    ref_length= table->cursor->ref_length;
92
100
  }
93
 
  info->select=select;
94
 
  info->print_error= print_error;
95
 
  info->ignore_not_found_rows= 0;
 
101
  select= select_arg;
 
102
  print_error= print_error_arg;
 
103
  ignore_not_found_rows= 0;
96
104
  table->status=0;                      /* And it's always found */
97
105
 
98
106
  if (select && my_b_inited(select->file))
 
107
  {
99
108
    tempfile= select->file;
 
109
  }
100
110
  else
 
111
  {
101
112
    tempfile= table->sort.io_cache;
 
113
  }
 
114
 
102
115
  if (tempfile && my_b_inited(tempfile)) // Test if ref-records was used
103
116
  {
104
 
    info->read_record= (table->sort.addon_field ?
105
 
                        rr_unpack_from_tempfile : rr_from_tempfile);
106
 
    info->io_cache=tempfile;
107
 
    reinit_io_cache(info->io_cache,internal::READ_CACHE,0L,0,0);
108
 
    info->ref_pos=table->cursor->ref;
 
117
    read_record= (table->sort.addon_field ?
 
118
                  rr_unpack_from_tempfile : rr_from_tempfile);
 
119
 
 
120
    io_cache=tempfile;
 
121
    io_cache->reinit_io_cache(internal::READ_CACHE,0L,0,0);
 
122
    ref_pos=table->cursor->ref;
109
123
    if (!table->cursor->inited)
110
 
      table->cursor->ha_rnd_init(0);
 
124
    {
 
125
      error= table->cursor->startTableScan(0);
 
126
      if (error != 0)
 
127
        return error;
 
128
    }
111
129
 
112
130
    /*
113
131
      table->sort.addon_field is checked because if we use addon fields,
119
137
        !(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) &&
120
138
        (table->db_stat & HA_READ_ONLY ||
121
139
        table->reginfo.lock_type <= TL_READ_NO_INSERT) &&
122
 
        (uint64_t) table->s->reclength* (table->cursor->stats.records+
 
140
        (uint64_t) table->getShare()->getRecordLength() * (table->cursor->stats.records+
123
141
                                                table->cursor->stats.deleted) >
124
142
        (uint64_t) MIN_FILE_LENGTH_TO_USE_ROW_CACHE &&
125
 
        info->io_cache->end_of_file/info->ref_length * table->s->reclength >
 
143
        io_cache->end_of_file/ref_length * table->getShare()->getRecordLength() >
126
144
        (internal::my_off_t) MIN_ROWS_TO_USE_TABLE_CACHE &&
127
 
        !table->s->blob_fields &&
128
 
        info->ref_length <= MAX_REFLENGTH)
 
145
        !table->getShare()->blob_fields &&
 
146
        ref_length <= MAX_REFLENGTH)
129
147
    {
130
 
      if (! init_rr_cache(session, info))
 
148
      if (init_rr_cache())
131
149
      {
132
 
        info->read_record=rr_from_cache;
 
150
        read_record= rr_from_cache;
133
151
      }
134
152
    }
135
153
  }
136
154
  else if (select && select->quick)
137
155
  {
138
 
    info->read_record=rr_quick;
 
156
    read_record= rr_quick;
139
157
  }
140
158
  else if (table->sort.record_pointers)
141
159
  {
142
 
    table->cursor->ha_rnd_init(0);
143
 
    info->cache_pos=table->sort.record_pointers;
144
 
    info->cache_end=info->cache_pos+
145
 
                    table->sort.found_records*info->ref_length;
146
 
    info->read_record= (table->sort.addon_field ?
147
 
                        rr_unpack_from_buffer : rr_from_pointers);
 
160
    error= table->cursor->startTableScan(0);
 
161
    if (error != 0)
 
162
      return error;
 
163
 
 
164
    cache_pos=table->sort.record_pointers;
 
165
    cache_end= cache_pos+ table->sort.found_records * ref_length;
 
166
    read_record= (table->sort.addon_field ?  rr_unpack_from_buffer : rr_from_pointers);
148
167
  }
149
168
  else
150
169
  {
151
 
    info->read_record= rr_sequential;
152
 
    table->cursor->ha_rnd_init(1);
 
170
    read_record= rr_sequential;
 
171
    error= table->cursor->startTableScan(1);
 
172
    if (error != 0)
 
173
      return error;
 
174
 
153
175
    /* We can use record cache if we don't update dynamic length tables */
154
176
    if (!table->no_cache &&
155
177
        (use_record_cache > 0 ||
156
178
        (int) table->reginfo.lock_type <= (int) TL_READ_WITH_SHARED_LOCKS ||
157
 
        !(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)))
 
179
        !(table->getShare()->db_options_in_use & HA_OPTION_PACK_RECORD)))
 
180
    {
158
181
      table->cursor->extra_opt(HA_EXTRA_CACHE, session->variables.read_buff_size);
 
182
    }
159
183
  }
160
184
 
161
 
  return;
 
185
  return 0;
162
186
} /* init_read_record */
163
187
 
164
188
 
165
 
void end_read_record(READ_RECORD *info)
 
189
void ReadRecord::end_read_record()
166
190
{                   /* free cache if used */
167
 
  if (info->cache)
 
191
  if (cache)
168
192
  {
169
 
    free((char*) info->cache);
170
 
    info->cache=0;
 
193
    global_read_rnd_buffer.sub(session->variables.read_rnd_buff_size);
 
194
    free((char*) cache);
 
195
    cache= NULL;
171
196
  }
172
 
  if (info->table)
 
197
  if (table)
173
198
  {
174
 
    info->table->filesort_free_buffers();
175
 
    (void) info->cursor->extra(HA_EXTRA_NO_CACHE);
176
 
    if (info->read_record != rr_quick) // otherwise quick_range does it
177
 
      (void) info->cursor->ha_index_or_rnd_end();
178
 
    info->table=0;
 
199
    table->filesort_free_buffers();
 
200
    (void) cursor->extra(HA_EXTRA_NO_CACHE);
 
201
    if (read_record != rr_quick) // otherwise quick_range does it
 
202
      (void) cursor->ha_index_or_rnd_end();
 
203
 
 
204
    table= NULL;
179
205
  }
180
206
}
181
207
 
182
 
static int rr_handle_error(READ_RECORD *info, int error)
 
208
static int rr_handle_error(ReadRecord *info, int error)
183
209
{
184
210
  if (error == HA_ERR_END_OF_FILE)
185
211
    error= -1;
194
220
}
195
221
 
196
222
/** Read a record from head-database. */
197
 
static int rr_quick(READ_RECORD *info)
 
223
static int rr_quick(ReadRecord *info)
198
224
{
199
225
  int tmp;
200
226
  while ((tmp= info->select->quick->get_next()))
201
227
  {
202
 
    if (info->session->killed)
 
228
    if (info->session->getKilled())
203
229
    {
204
230
      my_error(ER_SERVER_SHUTDOWN, MYF(0));
205
231
      return 1;
226
252
  @retval
227
253
    1   Error
228
254
*/
229
 
static int rr_index_first(READ_RECORD *info)
 
255
static int rr_index_first(ReadRecord *info)
230
256
{
231
257
  int tmp= info->cursor->index_first(info->record);
232
258
  info->read_record= rr_index;
250
276
  @retval
251
277
    1   Error
252
278
*/
253
 
static int rr_index(READ_RECORD *info)
 
279
static int rr_index(ReadRecord *info)
254
280
{
255
281
  int tmp= info->cursor->index_next(info->record);
256
282
  if (tmp)
258
284
  return tmp;
259
285
}
260
286
 
261
 
int rr_sequential(READ_RECORD *info)
 
287
int rr_sequential(ReadRecord *info)
262
288
{
263
289
  int tmp;
264
290
  while ((tmp= info->cursor->rnd_next(info->record)))
265
291
  {
266
 
    if (info->session->killed)
 
292
    if (info->session->getKilled())
267
293
    {
268
294
      info->session->send_kill_message();
269
295
      return 1;
283
309
  return tmp;
284
310
}
285
311
 
286
 
static int rr_from_tempfile(READ_RECORD *info)
 
312
static int rr_from_tempfile(ReadRecord *info)
287
313
{
288
314
  int tmp;
289
315
  for (;;)
317
343
  @retval
318
344
    -1   There is no record to be read anymore.
319
345
*/
320
 
static int rr_unpack_from_tempfile(READ_RECORD *info)
 
346
static int rr_unpack_from_tempfile(ReadRecord *info)
321
347
{
322
348
  if (my_b_read(info->io_cache, info->rec_buf, info->ref_length))
323
349
    return -1;
327
353
  return 0;
328
354
}
329
355
 
330
 
static int rr_from_pointers(READ_RECORD *info)
 
356
static int rr_from_pointers(ReadRecord *info)
331
357
{
332
358
  int tmp;
333
359
  unsigned char *cache_pos;
334
360
 
 
361
 
335
362
  for (;;)
336
363
  {
337
364
    if (info->cache_pos == info->cache_end)
367
394
  @retval
368
395
    -1   There is no record to be read anymore.
369
396
*/
370
 
static int rr_unpack_from_buffer(READ_RECORD *info)
 
397
static int rr_unpack_from_buffer(ReadRecord *info)
371
398
{
372
399
  if (info->cache_pos == info->cache_end)
373
400
    return -1;                      /* End of buffer */
379
406
}
380
407
 
381
408
/* cacheing of records from a database */
382
 
static int init_rr_cache(Session *session, READ_RECORD *info)
 
409
bool ReadRecord::init_rr_cache()
383
410
{
384
 
  uint32_t rec_cache_size;
385
 
 
386
 
  info->struct_length= 3+MAX_REFLENGTH;
387
 
  info->reclength= ALIGN_SIZE(info->table->s->reclength+1);
388
 
  if (info->reclength < info->struct_length)
389
 
    info->reclength= ALIGN_SIZE(info->struct_length);
390
 
 
391
 
  info->error_offset= info->table->s->reclength;
392
 
  info->cache_records= (session->variables.read_rnd_buff_size /
393
 
                        (info->reclength+info->struct_length));
394
 
  rec_cache_size= info->cache_records*info->reclength;
395
 
  info->rec_cache_size= info->cache_records*info->ref_length;
 
411
  uint32_t local_rec_cache_size;
 
412
 
 
413
  struct_length= 3 + MAX_REFLENGTH;
 
414
  reclength= ALIGN_SIZE(table->getShare()->getRecordLength() + 1);
 
415
  if (reclength < struct_length)
 
416
    reclength= ALIGN_SIZE(struct_length);
 
417
 
 
418
  error_offset= table->getShare()->getRecordLength();
 
419
  cache_records= (session->variables.read_rnd_buff_size /
 
420
                        (reclength + struct_length));
 
421
  local_rec_cache_size= cache_records * reclength;
 
422
  rec_cache_size= cache_records * ref_length;
 
423
 
 
424
  if (not global_read_rnd_buffer.add(session->variables.read_rnd_buff_size))
 
425
  {
 
426
    my_error(ER_OUT_OF_GLOBAL_READRNDMEMORY, MYF(ME_ERROR+ME_WAITTANG));
 
427
    return false;
 
428
  }
396
429
 
397
430
  // We have to allocate one more byte to use uint3korr (see comments for it)
398
 
  if (info->cache_records <= 2 ||
399
 
      !(info->cache=(unsigned char*) malloc(rec_cache_size+info->cache_records*
400
 
                                            info->struct_length+1)))
401
 
    return(1);
402
 
#ifdef HAVE_purify
 
431
  if (cache_records <= 2 ||
 
432
      !(cache=(unsigned char*) malloc(local_rec_cache_size + cache_records * struct_length + 1)))
 
433
  {
 
434
    return false;
 
435
  }
 
436
#ifdef HAVE_VALGRIND
403
437
  // Avoid warnings in qsort
404
 
  memset(info->cache, 0,
405
 
         rec_cache_size+info->cache_records* info->struct_length+1);
 
438
  memset(cache, 0, local_rec_cache_size + cache_records * struct_length + 1);
406
439
#endif
407
 
  info->read_positions=info->cache+rec_cache_size;
408
 
  info->cache_pos=info->cache_end=info->cache;
409
 
  return(0);
 
440
  read_positions= cache + local_rec_cache_size;
 
441
  cache_pos= cache_end= cache;
 
442
 
 
443
  return true;
410
444
} /* init_rr_cache */
411
445
 
412
 
static int rr_from_cache(READ_RECORD *info)
 
446
static int rr_from_cache(ReadRecord *info)
413
447
{
414
 
  register uint32_t i;
415
448
  uint32_t length;
416
449
  internal::my_off_t rest_of_file;
417
450
  int16_t error;
431
464
      else
432
465
      {
433
466
        error=0;
434
 
        memcpy(info->record,info->cache_pos, (size_t) info->table->s->reclength);
 
467
        memcpy(info->record,info->cache_pos, (size_t) info->table->getShare()->getRecordLength());
435
468
      }
436
 
      info->cache_pos+=info->reclength;
 
469
      info->cache_pos+= info->reclength;
437
470
      return ((int) error);
438
471
    }
439
472
    length=info->rec_cache_size;
440
 
    rest_of_file=info->io_cache->end_of_file - my_b_tell(info->io_cache);
 
473
    rest_of_file= info->io_cache->end_of_file - my_b_tell(info->io_cache);
441
474
    if ((internal::my_off_t) length > rest_of_file)
 
475
    {
442
476
      length= (uint32_t) rest_of_file;
443
 
    if (!length || my_b_read(info->io_cache,info->cache,length))
 
477
    }
 
478
 
 
479
    if (!length || my_b_read(info->io_cache, info->getCache(), length))
444
480
    {
445
481
      return -1;                        /* End of cursor */
446
482
    }
447
483
 
448
484
    length/=info->ref_length;
449
 
    position=info->cache;
 
485
    position=info->getCache();
450
486
    ref_position=info->read_positions;
451
 
    for (i=0 ; i < length ; i++,position+=info->ref_length)
 
487
    for (uint32_t i= 0 ; i < length ; i++,position+=info->ref_length)
452
488
    {
453
489
      memcpy(ref_position,position,(size_t) info->ref_length);
454
490
      ref_position+=MAX_REFLENGTH;
459
495
                       (qsort_cmp) rr_cmp);
460
496
 
461
497
    position=info->read_positions;
462
 
    for (i=0 ; i < length ; i++)
 
498
    for (uint32_t i= 0 ; i < length ; i++)
463
499
    {
464
 
      memcpy(info->ref_pos,position,(size_t) info->ref_length);
 
500
      memcpy(info->ref_pos, position, (size_t)info->ref_length);
465
501
      position+=MAX_REFLENGTH;
466
502
      record=uint3korr(position);
467
503
      position+=3;
468
 
      record_pos=info->cache+record*info->reclength;
 
504
      record_pos= info->getCache() + record * info->reclength;
469
505
      if ((error=(int16_t) info->cursor->rnd_pos(record_pos,info->ref_pos)))
470
506
      {
471
507
        record_pos[info->error_offset]=1;
474
510
      else
475
511
        record_pos[info->error_offset]=0;
476
512
    }
477
 
    info->cache_end=(info->cache_pos=info->cache)+length*info->reclength;
 
513
    info->cache_end= (info->cache_pos= info->getCache())+length*info->reclength;
478
514
  }
479
515
} /* rr_from_cache */
480
516