~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/filesort.cc

  • Committer: Lee Bieber
  • Date: 2010-11-05 20:22:41 UTC
  • mfrom: (1907.1.2 build)
  • Revision ID: kalebral@gmail.com-20101105202241-1fm31t0y0fvdwcd3
Merge Brian - Adding FileSort class, merge in catalog tree
Merge Joe - fix bug 670971: InnoDB does not complete shutdown with transaction log enabled

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
 
32
32
#include "drizzled/drizzled.h"
33
33
#include "drizzled/sql_sort.h"
 
34
#include "drizzled/filesort.h"
34
35
#include "drizzled/error.h"
35
36
#include "drizzled/probes.h"
36
37
#include "drizzled/session.h"
45
46
#include "drizzled/atomics.h"
46
47
#include "drizzled/global_buffer.h"
47
48
 
 
49
 
48
50
using namespace std;
49
51
 
50
52
namespace drizzled
59
61
                                             uint32_t count,
60
62
                                             unsigned char *buf);
61
63
 
62
 
static ha_rows find_all_keys(Session *session,
63
 
                             SORTPARAM *param,
64
 
                             optimizer::SqlSelect *select,
65
 
                             unsigned char * *sort_keys, 
66
 
                             internal::IO_CACHE *buffer_file,
67
 
                             internal::IO_CACHE *tempfile,
68
 
                             internal::IO_CACHE *indexfile);
69
 
 
70
64
static int write_keys(SORTPARAM *param,
71
65
                      unsigned char * *sort_keys,
72
66
                      uint32_t count,
77
71
                         unsigned char *to,
78
72
                         unsigned char *ref_pos);
79
73
static void register_used_fields(SORTPARAM *param);
80
 
static int merge_index(SORTPARAM *param,
81
 
                       unsigned char *sort_buffer,
82
 
                       buffpek *buffpek,
83
 
                       uint32_t maxbuffer,
84
 
                       internal::IO_CACHE *tempfile,
85
 
                       internal::IO_CACHE *outfile);
86
74
static bool save_index(SORTPARAM *param,
87
75
                       unsigned char **sort_keys,
88
76
                       uint32_t count,
89
77
                       filesort_info *table_sort);
90
78
static uint32_t suffix_length(uint32_t string_length);
91
 
static uint32_t sortlength(Session *session,
92
 
                           SortField *sortorder,
93
 
                           uint32_t s_length,
94
 
                           bool *multi_byte_charset);
95
 
static sort_addon_field *get_addon_fields(Session *session,
96
 
                                             Field **ptabfield,
97
 
                                             uint32_t sortlength,
98
 
                                             uint32_t *plength);
99
79
static void unpack_addon_fields(sort_addon_field *addon_field,
100
80
                                unsigned char *buff);
 
81
 
 
82
FileSort::FileSort(Session &arg) :
 
83
  _session(arg)
 
84
 
85
}
 
86
 
101
87
/**
102
88
  Sort a table.
103
89
  Creates a set of pointers that can be used to read the rows
110
96
  The result set is stored in table->io_cache or
111
97
  table->record_pointers.
112
98
 
113
 
  @param session           Current thread
114
99
  @param table          Table to sort
115
100
  @param sortorder      How to sort the table
116
101
  @param s_length       Number of elements in sortorder
134
119
    examined_rows       will be set to number of examined rows
135
120
*/
136
121
 
137
 
ha_rows filesort(Session *session, Table *table, SortField *sortorder, uint32_t s_length,
138
 
                 optimizer::SqlSelect *select, ha_rows max_rows,
139
 
                 bool sort_positions, ha_rows *examined_rows)
 
122
ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
 
123
                      optimizer::SqlSelect *select, ha_rows max_rows,
 
124
                      bool sort_positions, ha_rows *examined_rows)
140
125
{
141
126
  int error;
142
127
  uint32_t memavl= 0, min_sort_memory;
159
144
   Release InnoDB's adaptive hash index latch (if holding) before
160
145
   running a sort.
161
146
  */
162
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
147
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
163
148
 
164
149
  /*
165
150
    Don't use table->sort in filesort as it is also used by
175
160
  buffpek_inst=0;
176
161
  error= 1;
177
162
  memset(&param, 0, sizeof(param));
178
 
  param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
 
163
  param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
179
164
  param.ref_length= table->cursor->ref_length;
180
165
  param.addon_field= 0;
181
166
  param.addon_length= 0;
185
170
      Get the descriptors of all fields whose values are appended
186
171
      to sorted fields and get its total length in param.spack_length.
187
172
    */
188
 
    param.addon_field= get_addon_fields(session, table->getFields(),
 
173
    param.addon_field= get_addon_fields(table->getFields(),
189
174
                                        param.sort_length,
190
175
                                        &param.addon_length);
191
176
  }
214
199
 
215
200
  if (select && select->quick)
216
201
  {
217
 
    session->status_var.filesort_range_count++;
 
202
    getSession().status_var.filesort_range_count++;
218
203
  }
219
204
  else
220
205
  {
221
 
    session->status_var.filesort_scan_count++;
 
206
    getSession().status_var.filesort_scan_count++;
222
207
  }
223
208
#ifdef CAN_TRUST_RANGE
224
209
  if (select && select->quick && select->quick->records > 0L)
244
229
      !(param.tmp_buffer= (char*) malloc(param.sort_length)))
245
230
    goto err;
246
231
 
247
 
  memavl= session->variables.sortbuff_size;
 
232
  memavl= getSession().variables.sortbuff_size;
248
233
  min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
249
234
  while (memavl >= min_sort_memory)
250
235
  {
283
268
  param.keys--;                         /* TODO: check why we do this */
284
269
  param.sort_form= table;
285
270
  param.end=(param.local_sortorder=sortorder)+s_length;
286
 
  if ((records=find_all_keys(session, &param,select,sort_keys, &buffpek_pointers,
287
 
                             &tempfile, selected_records_file)) ==
288
 
      HA_POS_ERROR)
 
271
  if ((records= find_all_keys(&param,select,sort_keys, &buffpek_pointers,
 
272
                              &tempfile, selected_records_file)) == HA_POS_ERROR)
 
273
  {
289
274
    goto err;
 
275
  }
290
276
  maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
291
277
 
292
278
  if (maxbuffer == 0)                   // The whole set is in memory
311
297
    close_cached_file(&buffpek_pointers);
312
298
        /* Open cached file if it isn't open */
313
299
    if (! my_b_inited(outfile) &&
314
 
        open_cached_file(outfile,drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER,
315
 
                          MYF(MY_WME)))
 
300
        open_cached_file(outfile,drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
316
301
      goto err;
317
302
    if (reinit_io_cache(outfile,internal::WRITE_CACHE,0L,0,0))
318
303
      goto err;
369
354
  }
370
355
  else
371
356
  {
372
 
    session->status_var.filesort_rows+= (uint32_t) records;
 
357
    getSession().status_var.filesort_rows+= (uint32_t) records;
373
358
  }
374
359
  *examined_rows= param.examined_rows;
375
360
  global_sort_buffer.sub(allocated_sort_memory);
402
387
      sort.buffpek_len= 0;
403
388
    }
404
389
  }
 
390
 
405
391
  if (sort.addon_buf)
406
392
  {
407
393
    free((char *) sort.addon_buf);
491
477
    HA_POS_ERROR on error.
492
478
*/
493
479
 
494
 
static ha_rows find_all_keys(Session *session,
495
 
                             SORTPARAM *param, 
496
 
                             optimizer::SqlSelect *select,
497
 
                             unsigned char **sort_keys,
498
 
                             internal::IO_CACHE *buffpek_pointers,
499
 
                             internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
 
480
ha_rows FileSort::find_all_keys(SORTPARAM *param, 
 
481
                                optimizer::SqlSelect *select,
 
482
                                unsigned char **sort_keys,
 
483
                                internal::IO_CACHE *buffpek_pointers,
 
484
                                internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
500
485
{
501
486
  int error,flag,quick_select;
502
487
  uint32_t idx,indexpos,ref_length;
503
488
  unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
504
489
  internal::my_off_t record;
505
490
  Table *sort_form;
506
 
  volatile Session::killed_state *killed= &session->killed;
 
491
  volatile Session::killed_state *killed= &getSession().killed;
507
492
  Cursor *file;
508
493
  boost::dynamic_bitset<> *save_read_set= NULL;
509
494
  boost::dynamic_bitset<> *save_write_set= NULL;
525
510
  {
526
511
    next_pos=(unsigned char*) 0;                        /* Find records in sequence */
527
512
    file->startTableScan(1);
528
 
    file->extra_opt(HA_EXTRA_CACHE,
529
 
                    session->variables.read_buff_size);
 
513
    file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
530
514
  }
531
515
 
532
516
  ReadRecord read_record_info;
535
519
    if (select->quick->reset())
536
520
      return(HA_POS_ERROR);
537
521
 
538
 
    read_record_info.init_read_record(session, select->quick->head, select, 1, 1);
 
522
    read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1);
539
523
  }
540
524
 
541
525
  /* Remember original bitmaps */
614
598
    else
615
599
      file->unlock_row();
616
600
    /* It does not make sense to read more keys in case of a fatal error */
617
 
    if (session->is_error())
 
601
    if (getSession().is_error())
618
602
      break;
619
603
  }
620
604
  if (quick_select)
632
616
      file->endTableScan();
633
617
  }
634
618
 
635
 
  if (session->is_error())
 
619
  if (getSession().is_error())
636
620
    return(HA_POS_ERROR);
637
621
 
638
622
  /* Signal we should use orignal column read and write maps */
1039
1023
 
1040
1024
/** Merge buffers to make < MERGEBUFF2 buffers. */
1041
1025
 
1042
 
int merge_many_buff(SORTPARAM *param, unsigned char *sort_buffer,
1043
 
                    buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
 
1026
int FileSort::merge_many_buff(SORTPARAM *param, unsigned char *sort_buffer,
 
1027
                              buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
1044
1028
{
1045
1029
  internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
1046
1030
  buffpek *lastbuff;
1114
1098
    (uint32_t)-1 if something goes wrong
1115
1099
*/
1116
1100
 
1117
 
uint32_t read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst,
1118
 
                        uint32_t rec_length)
 
1101
uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
1119
1102
{
1120
1103
  register uint32_t count;
1121
1104
  uint32_t length;
1168
1151
    other  error
1169
1152
*/
1170
1153
 
1171
 
int merge_buffers(SORTPARAM *param, internal::IO_CACHE *from_file,
1172
 
                  internal::IO_CACHE *to_file, unsigned char *sort_buffer,
1173
 
                  buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
1174
 
                  int flag)
 
1154
int FileSort::merge_buffers(SORTPARAM *param, internal::IO_CACHE *from_file,
 
1155
                            internal::IO_CACHE *to_file, unsigned char *sort_buffer,
 
1156
                            buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
 
1157
                            int flag)
1175
1158
{
1176
1159
  int error;
1177
1160
  uint32_t rec_length,res_length,offset;
1183
1166
  buffpek *buffpek_inst;
1184
1167
  qsort2_cmp cmp;
1185
1168
  void *first_cmp_arg;
1186
 
  volatile Session::killed_state *killed= &current_session->killed;
 
1169
  volatile Session::killed_state *killed= &getSession().killed;
1187
1170
  Session::killed_state not_killable;
1188
1171
 
1189
 
  current_session->status_var.filesort_merge_passes++;
 
1172
  getSession().status_var.filesort_merge_passes++;
1190
1173
  if (param->not_killable)
1191
1174
  {
1192
1175
    killed= &not_killable;
1381
1364
 
1382
1365
        /* Do a merge to output-file (save only positions) */
1383
1366
 
1384
 
static int merge_index(SORTPARAM *param, unsigned char *sort_buffer,
1385
 
                       buffpek *buffpek_inst, uint32_t maxbuffer,
1386
 
                       internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
 
1367
int FileSort::merge_index(SORTPARAM *param, unsigned char *sort_buffer,
 
1368
                          buffpek *buffpek_inst, uint32_t maxbuffer,
 
1369
                          internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
1387
1370
{
1388
1371
  if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
1389
1372
                    buffpek_inst+maxbuffer,1))
1390
 
    return(1);
1391
 
  return(0);
 
1373
    return 1;
 
1374
 
 
1375
  return 0;
1392
1376
} /* merge_index */
1393
1377
 
1394
1378
 
1408
1392
/**
1409
1393
  Calculate length of sort key.
1410
1394
 
1411
 
  @param session                          Thread Cursor
1412
1395
  @param sortorder                Order of items to sort
1413
1396
  @param s_length                 Number of items to sort
1414
1397
  @param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
1423
1406
    Total length of sort buffer in bytes
1424
1407
*/
1425
1408
 
1426
 
static uint32_t
1427
 
sortlength(Session *session, SortField *sortorder, uint32_t s_length,
1428
 
           bool *multi_byte_charset)
 
1409
uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
1429
1410
{
1430
1411
  register uint32_t length;
1431
1412
  const CHARSET_INFO *cs;
1459
1440
      case STRING_RESULT:
1460
1441
        sortorder->length=sortorder->item->max_length;
1461
1442
        set_if_smaller(sortorder->length,
1462
 
                       session->variables.max_sort_length);
 
1443
                       getSession().variables.max_sort_length);
1463
1444
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1464
1445
        {
1465
1446
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1494
1475
      if (sortorder->item->maybe_null)
1495
1476
        length++;                               // Place for NULL marker
1496
1477
    }
1497
 
    set_if_smaller(sortorder->length,
1498
 
                   (size_t)session->variables.max_sort_length);
 
1478
    set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
1499
1479
    length+=sortorder->length;
1500
1480
  }
1501
1481
  sortorder->field= (Field*) 0;                 // end marker
1515
1495
  layouts for the values of the non-sorted fields in the buffer and
1516
1496
  fills them.
1517
1497
 
1518
 
  @param session                 Current thread
1519
1498
  @param ptabfield           Array of references to the table fields
1520
1499
  @param sortlength          Total length of sorted fields
1521
1500
  @param[out] plength        Total length of appended fields
1530
1509
    NULL   if we do not store field values with sort data.
1531
1510
*/
1532
1511
 
1533
 
static sort_addon_field *
1534
 
get_addon_fields(Session *session, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
 
1512
sort_addon_field *FileSort::get_addon_fields(Field **ptabfield, uint32_t sortlength_arg, uint32_t *plength)
1535
1513
{
1536
1514
  Field **pfield;
1537
1515
  Field *field;
1566
1544
    return 0;
1567
1545
  length+= (null_fields+7)/8;
1568
1546
 
1569
 
  if (length+sortlength > session->variables.max_length_for_sort_data ||
 
1547
  if (length+sortlength_arg > getSession().variables.max_length_for_sort_data ||
1570
1548
      !(addonf= (sort_addon_field *) malloc(sizeof(sort_addon_field)*
1571
1549
                                            (fields+1))))
1572
1550
    return 0;