~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/records.cc

  • Committer: Monty Taylor
  • Date: 2010-03-02 19:10:25 UTC
  • mto: (1317.1.8)
  • mto: This revision was merged to the branch mainline in revision 1322.
  • Revision ID: mordred@inaugust.com-20100302191025-zoxjz4xwkoa6160h
Prevent unauthorized users from changing schema.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/**
17
17
  @file
20
20
  Functions for easy reading of records, possible through a cache
21
21
*/
22
22
#include "config.h"
23
 
 
24
 
#include "drizzled/drizzled.h"
25
23
#include "drizzled/error.h"
 
24
#include "drizzled/table.h"
 
25
#include "drizzled/session.h"
 
26
#include "drizzled/records.h"
 
27
#include "drizzled/optimizer/range.h"
 
28
#include "drizzled/internal/my_sys.h"
26
29
#include "drizzled/internal/iocache.h"
27
 
#include "drizzled/internal/my_sys.h"
28
 
#include "drizzled/optimizer/range.h"
29
 
#include "drizzled/plugin/storage_engine.h"
30
 
#include "drizzled/records.h"
31
 
#include "drizzled/session.h"
32
 
#include "drizzled/table.h"
33
30
 
34
31
namespace drizzled
35
32
{
36
33
 
37
 
static int rr_sequential(ReadRecord *info);
38
 
static int rr_quick(ReadRecord *info);
39
 
static int rr_from_tempfile(ReadRecord *info);
40
 
static int rr_unpack_from_tempfile(ReadRecord *info);
41
 
static int rr_unpack_from_buffer(ReadRecord *info);
42
 
static int rr_from_pointers(ReadRecord *info);
43
 
static int rr_from_cache(ReadRecord *info);
 
34
int rr_sequential(READ_RECORD *info);
 
35
static int rr_quick(READ_RECORD *info);
 
36
static int rr_from_tempfile(READ_RECORD *info);
 
37
static int rr_unpack_from_tempfile(READ_RECORD *info);
 
38
static int rr_unpack_from_buffer(READ_RECORD *info);
 
39
static int rr_from_pointers(READ_RECORD *info);
 
40
static int rr_from_cache(READ_RECORD *info);
 
41
static int init_rr_cache(Session *session, READ_RECORD *info);
44
42
static int rr_cmp(unsigned char *a,unsigned char *b);
45
 
static int rr_index_first(ReadRecord *info);
46
 
static int rr_index(ReadRecord *info);
47
 
 
48
 
void ReadRecord::init_reard_record_sequential()
49
 
{
50
 
  read_record= rr_sequential;
51
 
}
52
 
 
53
 
int ReadRecord::init_read_record_idx(Session *,
54
 
                                     Table *table_arg,
55
 
                                     bool print_error_arg,
56
 
                                     uint32_t idx)
57
 
{
58
 
  table_arg->emptyRecord();
59
 
  table= table_arg;
60
 
  cursor=  table->cursor;
61
 
  record= table->getInsertRecord();
62
 
  print_error= print_error_arg;
 
43
static int rr_index_first(READ_RECORD *info);
 
44
static int rr_index(READ_RECORD *info);
 
45
 
 
46
void init_read_record_idx(READ_RECORD *info, 
 
47
                          Session *, 
 
48
                          Table *table,
 
49
                          bool print_error, 
 
50
                          uint32_t idx)
 
51
{
 
52
  table->emptyRecord();
 
53
  memset(info, 0, sizeof(*info));
 
54
  info->table= table;
 
55
  info->cursor=  table->cursor;
 
56
  info->record= table->record[0];
 
57
  info->print_error= print_error;
63
58
 
64
59
  table->status=0;                      /* And it's always found */
65
 
  if (not table->cursor->inited)
66
 
  {
67
 
    int error= table->cursor->startIndexScan(idx, 1);
68
 
    if (error != 0)
69
 
      return error;
70
 
  }
 
60
  if (!table->cursor->inited)
 
61
    table->cursor->ha_index_init(idx, 1);
71
62
  /* read_record will be changed to rr_index in rr_index_first */
72
 
  read_record= rr_index_first;
73
 
 
74
 
  return 0;
 
63
  info->read_record= rr_index_first;
75
64
}
76
65
 
77
66
 
78
 
int ReadRecord::init_read_record(Session *session_arg,
79
 
                                 Table *table_arg,
80
 
                                 optimizer::SqlSelect *select_arg,
81
 
                                 int use_record_cache,
82
 
                                 bool print_error_arg)
 
67
void init_read_record(READ_RECORD *info,
 
68
                      Session *session, 
 
69
                      Table *table,
 
70
                      optimizer::SqlSelect *select,
 
71
                      int use_record_cache, 
 
72
                      bool print_error)
83
73
{
84
74
  internal::IO_CACHE *tempfile;
85
 
  int error= 0;
86
75
 
87
 
  session= session_arg;
88
 
  table= table_arg;
89
 
  cursor= table->cursor;
90
 
  forms= &table;                /* Only one table */
 
76
  memset(info, 0, sizeof(*info));
 
77
  info->session=session;
 
78
  info->table=table;
 
79
  info->cursor= table->cursor;
 
80
  info->forms= &info->table;            /* Only one table */
91
81
 
92
82
  if (table->sort.addon_field)
93
83
  {
94
 
    rec_buf= table->sort.addon_buf;
95
 
    ref_length= table->sort.addon_length;
 
84
    info->rec_buf= table->sort.addon_buf;
 
85
    info->ref_length= table->sort.addon_length;
96
86
  }
97
87
  else
98
88
  {
99
89
    table->emptyRecord();
100
 
    record= table->getInsertRecord();
101
 
    ref_length= table->cursor->ref_length;
 
90
    info->record= table->record[0];
 
91
    info->ref_length= table->cursor->ref_length;
102
92
  }
103
 
  select= select_arg;
104
 
  print_error= print_error_arg;
105
 
  ignore_not_found_rows= 0;
 
93
  info->select=select;
 
94
  info->print_error= print_error;
 
95
  info->ignore_not_found_rows= 0;
106
96
  table->status=0;                      /* And it's always found */
107
97
 
108
98
  if (select && my_b_inited(select->file))
109
 
  {
110
99
    tempfile= select->file;
111
 
  }
112
100
  else
113
 
  {
114
101
    tempfile= table->sort.io_cache;
115
 
  }
116
 
 
117
102
  if (tempfile && my_b_inited(tempfile)) // Test if ref-records was used
118
103
  {
119
 
    read_record= (table->sort.addon_field ?
120
 
                  rr_unpack_from_tempfile : rr_from_tempfile);
121
 
 
122
 
    io_cache=tempfile;
123
 
    io_cache->reinit_io_cache(internal::READ_CACHE,0L,0,0);
124
 
    ref_pos=table->cursor->ref;
 
104
    info->read_record= (table->sort.addon_field ?
 
105
                        rr_unpack_from_tempfile : rr_from_tempfile);
 
106
    info->io_cache=tempfile;
 
107
    reinit_io_cache(info->io_cache,internal::READ_CACHE,0L,0,0);
 
108
    info->ref_pos=table->cursor->ref;
125
109
    if (!table->cursor->inited)
126
 
    {
127
 
      error= table->cursor->startTableScan(0);
128
 
      if (error != 0)
129
 
        return error;
130
 
    }
 
110
      table->cursor->ha_rnd_init(0);
131
111
 
132
112
    /*
133
113
      table->sort.addon_field is checked because if we use addon fields,
139
119
        !(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) &&
140
120
        (table->db_stat & HA_READ_ONLY ||
141
121
        table->reginfo.lock_type <= TL_READ_NO_INSERT) &&
142
 
        (uint64_t) table->getShare()->getRecordLength() * (table->cursor->stats.records+
 
122
        (uint64_t) table->s->reclength* (table->cursor->stats.records+
143
123
                                                table->cursor->stats.deleted) >
144
124
        (uint64_t) MIN_FILE_LENGTH_TO_USE_ROW_CACHE &&
145
 
        io_cache->end_of_file/ref_length * table->getShare()->getRecordLength() >
 
125
        info->io_cache->end_of_file/info->ref_length * table->s->reclength >
146
126
        (internal::my_off_t) MIN_ROWS_TO_USE_TABLE_CACHE &&
147
 
        !table->getShare()->blob_fields &&
148
 
        ref_length <= MAX_REFLENGTH)
 
127
        !table->s->blob_fields &&
 
128
        info->ref_length <= MAX_REFLENGTH)
149
129
    {
150
 
      if (init_rr_cache())
 
130
      if (! init_rr_cache(session, info))
151
131
      {
152
 
        read_record= rr_from_cache;
 
132
        info->read_record=rr_from_cache;
153
133
      }
154
134
    }
155
135
  }
156
136
  else if (select && select->quick)
157
137
  {
158
 
    read_record= rr_quick;
 
138
    info->read_record=rr_quick;
159
139
  }
160
140
  else if (table->sort.record_pointers)
161
141
  {
162
 
    error= table->cursor->startTableScan(0);
163
 
    if (error != 0)
164
 
      return error;
165
 
 
166
 
    cache_pos=table->sort.record_pointers;
167
 
    cache_end= cache_pos+ table->sort.found_records * ref_length;
168
 
    read_record= (table->sort.addon_field ?  rr_unpack_from_buffer : rr_from_pointers);
 
142
    table->cursor->ha_rnd_init(0);
 
143
    info->cache_pos=table->sort.record_pointers;
 
144
    info->cache_end=info->cache_pos+
 
145
                    table->sort.found_records*info->ref_length;
 
146
    info->read_record= (table->sort.addon_field ?
 
147
                        rr_unpack_from_buffer : rr_from_pointers);
169
148
  }
170
149
  else
171
150
  {
172
 
    read_record= rr_sequential;
173
 
    error= table->cursor->startTableScan(1);
174
 
    if (error != 0)
175
 
      return error;
176
 
 
 
151
    info->read_record= rr_sequential;
 
152
    table->cursor->ha_rnd_init(1);
177
153
    /* We can use record cache if we don't update dynamic length tables */
178
154
    if (!table->no_cache &&
179
155
        (use_record_cache > 0 ||
180
156
        (int) table->reginfo.lock_type <= (int) TL_READ_WITH_SHARED_LOCKS ||
181
 
        !(table->getShare()->db_options_in_use & HA_OPTION_PACK_RECORD)))
182
 
    {
 
157
        !(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)))
183
158
      table->cursor->extra_opt(HA_EXTRA_CACHE, session->variables.read_buff_size);
184
 
    }
185
159
  }
186
160
 
187
 
  return 0;
 
161
  return;
188
162
} /* init_read_record */
189
163
 
190
164
 
191
 
void ReadRecord::end_read_record()
 
165
void end_read_record(READ_RECORD *info)
192
166
{                   /* free cache if used */
193
 
  if (cache)
 
167
  if (info->cache)
194
168
  {
195
 
    global_read_rnd_buffer.sub(session->variables.read_rnd_buff_size);
196
 
    free((char*) cache);
197
 
    cache= NULL;
 
169
    free((char*) info->cache);
 
170
    info->cache=0;
198
171
  }
199
 
  if (table)
 
172
  if (info->table)
200
173
  {
201
 
    table->filesort_free_buffers();
202
 
    (void) cursor->extra(HA_EXTRA_NO_CACHE);
203
 
    if (read_record != rr_quick) // otherwise quick_range does it
204
 
      (void) cursor->ha_index_or_rnd_end();
205
 
 
206
 
    table= NULL;
 
174
    info->table->filesort_free_buffers();
 
175
    (void) info->cursor->extra(HA_EXTRA_NO_CACHE);
 
176
    if (info->read_record != rr_quick) // otherwise quick_range does it
 
177
      (void) info->cursor->ha_index_or_rnd_end();
 
178
    info->table=0;
207
179
  }
208
180
}
209
181
 
210
 
static int rr_handle_error(ReadRecord *info, int error)
 
182
static int rr_handle_error(READ_RECORD *info, int error)
211
183
{
212
184
  if (error == HA_ERR_END_OF_FILE)
213
185
    error= -1;
222
194
}
223
195
 
224
196
/** Read a record from head-database. */
225
 
static int rr_quick(ReadRecord *info)
 
197
static int rr_quick(READ_RECORD *info)
226
198
{
227
199
  int tmp;
228
200
  while ((tmp= info->select->quick->get_next()))
229
201
  {
230
 
    if (info->session->getKilled())
 
202
    if (info->session->killed)
231
203
    {
232
204
      my_error(ER_SERVER_SHUTDOWN, MYF(0));
233
205
      return 1;
254
226
  @retval
255
227
    1   Error
256
228
*/
257
 
static int rr_index_first(ReadRecord *info)
 
229
static int rr_index_first(READ_RECORD *info)
258
230
{
259
231
  int tmp= info->cursor->index_first(info->record);
260
232
  info->read_record= rr_index;
278
250
  @retval
279
251
    1   Error
280
252
*/
281
 
static int rr_index(ReadRecord *info)
 
253
static int rr_index(READ_RECORD *info)
282
254
{
283
255
  int tmp= info->cursor->index_next(info->record);
284
256
  if (tmp)
286
258
  return tmp;
287
259
}
288
260
 
289
 
int rr_sequential(ReadRecord *info)
 
261
int rr_sequential(READ_RECORD *info)
290
262
{
291
263
  int tmp;
292
264
  while ((tmp= info->cursor->rnd_next(info->record)))
293
265
  {
294
 
    if (info->session->getKilled())
 
266
    if (info->session->killed)
295
267
    {
296
268
      info->session->send_kill_message();
297
269
      return 1;
311
283
  return tmp;
312
284
}
313
285
 
314
 
static int rr_from_tempfile(ReadRecord *info)
 
286
static int rr_from_tempfile(READ_RECORD *info)
315
287
{
316
288
  int tmp;
317
289
  for (;;)
345
317
  @retval
346
318
    -1   There is no record to be read anymore.
347
319
*/
348
 
static int rr_unpack_from_tempfile(ReadRecord *info)
 
320
static int rr_unpack_from_tempfile(READ_RECORD *info)
349
321
{
350
322
  if (my_b_read(info->io_cache, info->rec_buf, info->ref_length))
351
323
    return -1;
355
327
  return 0;
356
328
}
357
329
 
358
 
static int rr_from_pointers(ReadRecord *info)
 
330
static int rr_from_pointers(READ_RECORD *info)
359
331
{
360
332
  int tmp;
361
333
  unsigned char *cache_pos;
362
334
 
363
 
 
364
335
  for (;;)
365
336
  {
366
337
    if (info->cache_pos == info->cache_end)
396
367
  @retval
397
368
    -1   There is no record to be read anymore.
398
369
*/
399
 
static int rr_unpack_from_buffer(ReadRecord *info)
 
370
static int rr_unpack_from_buffer(READ_RECORD *info)
400
371
{
401
372
  if (info->cache_pos == info->cache_end)
402
373
    return -1;                      /* End of buffer */
408
379
}
409
380
 
410
381
/* cacheing of records from a database */
411
 
bool ReadRecord::init_rr_cache()
 
382
static int init_rr_cache(Session *session, READ_RECORD *info)
412
383
{
413
 
  uint32_t local_rec_cache_size;
414
 
 
415
 
  struct_length= 3 + MAX_REFLENGTH;
416
 
  reclength= ALIGN_SIZE(table->getShare()->getRecordLength() + 1);
417
 
  if (reclength < struct_length)
418
 
    reclength= ALIGN_SIZE(struct_length);
419
 
 
420
 
  error_offset= table->getShare()->getRecordLength();
421
 
  cache_records= (session->variables.read_rnd_buff_size /
422
 
                        (reclength + struct_length));
423
 
  local_rec_cache_size= cache_records * reclength;
424
 
  rec_cache_size= cache_records * ref_length;
425
 
 
426
 
  if (not global_read_rnd_buffer.add(session->variables.read_rnd_buff_size))
427
 
  {
428
 
    my_error(ER_OUT_OF_GLOBAL_READRNDMEMORY, MYF(ME_ERROR+ME_WAITTANG));
429
 
    return false;
430
 
  }
 
384
  uint32_t rec_cache_size;
 
385
 
 
386
  info->struct_length= 3+MAX_REFLENGTH;
 
387
  info->reclength= ALIGN_SIZE(info->table->s->reclength+1);
 
388
  if (info->reclength < info->struct_length)
 
389
    info->reclength= ALIGN_SIZE(info->struct_length);
 
390
 
 
391
  info->error_offset= info->table->s->reclength;
 
392
  info->cache_records= (session->variables.read_rnd_buff_size /
 
393
                        (info->reclength+info->struct_length));
 
394
  rec_cache_size= info->cache_records*info->reclength;
 
395
  info->rec_cache_size= info->cache_records*info->ref_length;
431
396
 
432
397
  // We have to allocate one more byte to use uint3korr (see comments for it)
433
 
  if (cache_records <= 2 ||
434
 
      !(cache=(unsigned char*) malloc(local_rec_cache_size + cache_records * struct_length + 1)))
435
 
  {
436
 
    return false;
437
 
  }
438
 
#ifdef HAVE_VALGRIND
 
398
  if (info->cache_records <= 2 ||
 
399
      !(info->cache=(unsigned char*) malloc(rec_cache_size+info->cache_records*
 
400
                                            info->struct_length+1)))
 
401
    return(1);
 
402
#ifdef HAVE_purify
439
403
  // Avoid warnings in qsort
440
 
  memset(cache, 0, local_rec_cache_size + cache_records * struct_length + 1);
 
404
  memset(info->cache, 0,
 
405
         rec_cache_size+info->cache_records* info->struct_length+1);
441
406
#endif
442
 
  read_positions= cache + local_rec_cache_size;
443
 
  cache_pos= cache_end= cache;
444
 
 
445
 
  return true;
 
407
  info->read_positions=info->cache+rec_cache_size;
 
408
  info->cache_pos=info->cache_end=info->cache;
 
409
  return(0);
446
410
} /* init_rr_cache */
447
411
 
448
 
static int rr_from_cache(ReadRecord *info)
 
412
static int rr_from_cache(READ_RECORD *info)
449
413
{
 
414
  register uint32_t i;
450
415
  uint32_t length;
451
416
  internal::my_off_t rest_of_file;
452
417
  int16_t error;
466
431
      else
467
432
      {
468
433
        error=0;
469
 
        memcpy(info->record,info->cache_pos, (size_t) info->table->getShare()->getRecordLength());
 
434
        memcpy(info->record,info->cache_pos, (size_t) info->table->s->reclength);
470
435
      }
471
 
      info->cache_pos+= info->reclength;
 
436
      info->cache_pos+=info->reclength;
472
437
      return ((int) error);
473
438
    }
474
439
    length=info->rec_cache_size;
475
 
    rest_of_file= info->io_cache->end_of_file - my_b_tell(info->io_cache);
 
440
    rest_of_file=info->io_cache->end_of_file - my_b_tell(info->io_cache);
476
441
    if ((internal::my_off_t) length > rest_of_file)
477
 
    {
478
442
      length= (uint32_t) rest_of_file;
479
 
    }
480
 
 
481
 
    if (!length || my_b_read(info->io_cache, info->getCache(), length))
 
443
    if (!length || my_b_read(info->io_cache,info->cache,length))
482
444
    {
483
445
      return -1;                        /* End of cursor */
484
446
    }
485
447
 
486
448
    length/=info->ref_length;
487
 
    position=info->getCache();
 
449
    position=info->cache;
488
450
    ref_position=info->read_positions;
489
 
    for (uint32_t i= 0 ; i < length ; i++,position+=info->ref_length)
 
451
    for (i=0 ; i < length ; i++,position+=info->ref_length)
490
452
    {
491
453
      memcpy(ref_position,position,(size_t) info->ref_length);
492
454
      ref_position+=MAX_REFLENGTH;
497
459
                       (qsort_cmp) rr_cmp);
498
460
 
499
461
    position=info->read_positions;
500
 
    for (uint32_t i= 0 ; i < length ; i++)
 
462
    for (i=0 ; i < length ; i++)
501
463
    {
502
 
      memcpy(info->ref_pos, position, (size_t)info->ref_length);
 
464
      memcpy(info->ref_pos,position,(size_t) info->ref_length);
503
465
      position+=MAX_REFLENGTH;
504
466
      record=uint3korr(position);
505
467
      position+=3;
506
 
      record_pos= info->getCache() + record * info->reclength;
 
468
      record_pos=info->cache+record*info->reclength;
507
469
      if ((error=(int16_t) info->cursor->rnd_pos(record_pos,info->ref_pos)))
508
470
      {
509
471
        record_pos[info->error_offset]=1;
512
474
      else
513
475
        record_pos[info->error_offset]=0;
514
476
    }
515
 
    info->cache_end= (info->cache_pos= info->getCache())+length*info->reclength;
 
477
    info->cache_end=(info->cache_pos=info->cache)+length*info->reclength;
516
478
  }
517
479
} /* rr_from_cache */
518
480