~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/filesort.cc

  • Committer: Brian Aker
  • Date: 2010-09-15 20:24:31 UTC
  • mto: (1766.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 1767.
  • Revision ID: brian@tangent.org-20100915202431-wbrrl4vg6rzjvdiu
Adding opt for optional schedulers and making them --plugin-add only.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/**
28
28
 
29
29
#include <queue>
30
30
#include <algorithm>
31
 
#include <iostream>
32
31
 
33
 
#include "drizzled/drizzled.h"
34
32
#include "drizzled/sql_sort.h"
35
 
#include "drizzled/filesort.h"
36
33
#include "drizzled/error.h"
37
34
#include "drizzled/probes.h"
38
35
#include "drizzled/session.h"
44
41
#include "drizzled/internal/my_sys.h"
45
42
#include "plugin/myisam/myisam.h"
46
43
#include "drizzled/plugin/transactional_storage_engine.h"
47
 
#include "drizzled/atomics.h"
48
 
#include "drizzled/global_buffer.h"
49
 
 
50
44
 
51
45
using namespace std;
52
46
 
53
47
namespace drizzled
54
48
{
55
49
 
56
 
/* Defines used by filesort and uniques */
57
 
#define MERGEBUFF               7
58
 
#define MERGEBUFF2              15
59
 
 
60
 
class BufferCompareContext
61
 
{
62
 
public:
63
 
  qsort_cmp2 key_compare;
64
 
  void *key_compare_arg;
65
 
 
66
 
  BufferCompareContext() :
67
 
    key_compare(0),
68
 
    key_compare_arg(0)
69
 
  { }
70
 
 
71
 
};
72
 
 
73
 
class SortParam {
74
 
public:
75
 
  uint32_t rec_length;          /* Length of sorted records */
76
 
  uint32_t sort_length;                 /* Length of sorted columns */
77
 
  uint32_t ref_length;                  /* Length of record ref. */
78
 
  uint32_t addon_length;        /* Length of added packed fields */
79
 
  uint32_t res_length;          /* Length of records in final sorted file/buffer */
80
 
  uint32_t keys;                                /* Max keys / buffer */
81
 
  ha_rows max_rows,examined_rows;
82
 
  Table *sort_form;                     /* For quicker make_sortkey */
83
 
  SortField *local_sortorder;
84
 
  SortField *end;
85
 
  sort_addon_field *addon_field; /* Descriptors for companion fields */
86
 
  unsigned char *unique_buff;
87
 
  bool not_killable;
88
 
  char *tmp_buffer;
89
 
  /* The fields below are used only by Unique class */
90
 
  qsort2_cmp compare;
91
 
  BufferCompareContext cmp_context;
92
 
 
93
 
  SortParam() :
94
 
    rec_length(0),
95
 
    sort_length(0),
96
 
    ref_length(0),
97
 
    addon_length(0),
98
 
    res_length(0),
99
 
    keys(0),
100
 
    max_rows(0),
101
 
    examined_rows(0),
102
 
    sort_form(0),
103
 
    local_sortorder(0),
104
 
    end(0),
105
 
    addon_field(0),
106
 
    unique_buff(0),
107
 
    not_killable(0),
108
 
    tmp_buffer(0),
109
 
    compare(0)
110
 
  {
111
 
  }
112
 
 
113
 
  ~SortParam()
114
 
  {
115
 
    if (tmp_buffer)
116
 
      free(tmp_buffer);
117
 
  }
118
 
 
119
 
  int write_keys(unsigned char * *sort_keys,
120
 
                 uint32_t count,
121
 
                 internal::IO_CACHE *buffer_file,
122
 
                 internal::IO_CACHE *tempfile);
123
 
 
124
 
  void make_sortkey(unsigned char *to,
125
 
                    unsigned char *ref_pos);
126
 
  void register_used_fields();
127
 
  bool save_index(unsigned char **sort_keys,
128
 
                  uint32_t count,
129
 
                  filesort_info *table_sort);
130
 
 
131
 
};
132
 
 
133
50
/* functions defined in this file */
134
51
 
135
52
static char **make_char_array(char **old_pos, register uint32_t fields,
139
56
                                             uint32_t count,
140
57
                                             unsigned char *buf);
141
58
 
 
59
static ha_rows find_all_keys(Session *session,
 
60
                             SORTPARAM *param,
 
61
                             optimizer::SqlSelect *select,
 
62
                             unsigned char * *sort_keys, 
 
63
                             internal::IO_CACHE *buffer_file,
 
64
                             internal::IO_CACHE *tempfile,
 
65
                             internal::IO_CACHE *indexfile);
 
66
 
 
67
static int write_keys(SORTPARAM *param,
 
68
                      unsigned char * *sort_keys,
 
69
                      uint32_t count,
 
70
                      internal::IO_CACHE *buffer_file,
 
71
                      internal::IO_CACHE *tempfile);
 
72
 
 
73
static void make_sortkey(SORTPARAM *param,
 
74
                         unsigned char *to,
 
75
                         unsigned char *ref_pos);
 
76
static void register_used_fields(SORTPARAM *param);
 
77
static int merge_index(SORTPARAM *param,
 
78
                       unsigned char *sort_buffer,
 
79
                       buffpek_st *buffpek,
 
80
                       uint32_t maxbuffer,
 
81
                       internal::IO_CACHE *tempfile,
 
82
                       internal::IO_CACHE *outfile);
 
83
static bool save_index(SORTPARAM *param,
 
84
                       unsigned char **sort_keys,
 
85
                       uint32_t count,
 
86
                       filesort_info_st *table_sort);
142
87
static uint32_t suffix_length(uint32_t string_length);
143
 
static void unpack_addon_fields(sort_addon_field *addon_field,
 
88
static uint32_t sortlength(Session *session,
 
89
                           SortField *sortorder,
 
90
                           uint32_t s_length,
 
91
                           bool *multi_byte_charset);
 
92
static sort_addon_field_st *get_addon_fields(Session *session,
 
93
                                             Field **ptabfield,
 
94
                                             uint32_t sortlength,
 
95
                                             uint32_t *plength);
 
96
static void unpack_addon_fields(sort_addon_field_st *addon_field,
144
97
                                unsigned char *buff);
145
 
 
146
 
FileSort::FileSort(Session &arg) :
147
 
  _session(arg)
148
 
149
 
}
150
 
 
151
98
/**
152
99
  Sort a table.
153
100
  Creates a set of pointers that can be used to read the rows
160
107
  The result set is stored in table->io_cache or
161
108
  table->record_pointers.
162
109
 
 
110
  @param session           Current thread
163
111
  @param table          Table to sort
164
112
  @param sortorder      How to sort the table
165
113
  @param s_length       Number of elements in sortorder
183
131
    examined_rows       will be set to number of examined rows
184
132
*/
185
133
 
186
 
ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
187
 
                      optimizer::SqlSelect *select, ha_rows max_rows,
188
 
                      bool sort_positions, ha_rows &examined_rows)
 
134
ha_rows filesort(Session *session, Table *table, SortField *sortorder, uint32_t s_length,
 
135
                 optimizer::SqlSelect *select, ha_rows max_rows,
 
136
                 bool sort_positions, ha_rows *examined_rows)
189
137
{
190
 
  int error= 1;
191
 
  uint32_t memavl= 0, min_sort_memory;
 
138
  int error;
 
139
  uint32_t memavl, min_sort_memory;
192
140
  uint32_t maxbuffer;
193
 
  size_t allocated_sort_memory= 0;
194
 
  buffpek *buffpek_inst= 0;
 
141
  buffpek_st *buffpek;
195
142
  ha_rows records= HA_POS_ERROR;
196
143
  unsigned char **sort_keys= 0;
197
 
  internal::IO_CACHE tempfile;
198
 
  internal::IO_CACHE buffpek_pointers;
199
 
  internal::IO_CACHE *selected_records_file;
200
 
  internal::IO_CACHE *outfile;
201
 
  SortParam param;
 
144
  internal::IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
 
145
  SORTPARAM param;
202
146
  bool multi_byte_charset;
203
147
 
204
 
  /*
205
 
    Don't use table->sort in filesort as it is also used by
206
 
    QuickIndexMergeSelect. Work with a copy and put it back at the end
207
 
    when index_merge select has finished with it.
208
 
  */
209
 
  filesort_info table_sort(table->sort);
210
 
  table->sort.io_cache= NULL;
211
 
 
 
148
  filesort_info_st table_sort;
212
149
  TableList *tab= table->pos_in_table_list;
213
150
  Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
214
151
 
218
155
   Release InnoDB's adaptive hash index latch (if holding) before
219
156
   running a sort.
220
157
  */
221
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
 
158
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
222
159
 
 
160
  /*
 
161
    Don't use table->sort in filesort as it is also used by
 
162
    QuickIndexMergeSelect. Work with a copy and put it back at the end
 
163
    when index_merge select has finished with it.
 
164
  */
 
165
  memcpy(&table_sort, &table->sort, sizeof(filesort_info_st));
 
166
  table->sort.io_cache= NULL;
223
167
 
224
168
  outfile= table_sort.io_cache;
225
 
  assert(tempfile.buffer == 0);
226
 
  assert(buffpek_pointers.buffer == 0);
227
 
 
228
 
  param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
 
169
  my_b_clear(&tempfile);
 
170
  my_b_clear(&buffpek_pointers);
 
171
  buffpek=0;
 
172
  error= 1;
 
173
  memset(&param, 0, sizeof(param));
 
174
  param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
229
175
  param.ref_length= table->cursor->ref_length;
230
 
 
 
176
  param.addon_field= 0;
 
177
  param.addon_length= 0;
231
178
  if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
232
179
  {
233
180
    /*
234
181
      Get the descriptors of all fields whose values are appended
235
182
      to sorted fields and get its total length in param.spack_length.
236
183
    */
237
 
    param.addon_field= get_addon_fields(table->getFields(),
 
184
    param.addon_field= get_addon_fields(session, table->getFields(),
238
185
                                        param.sort_length,
239
186
                                        &param.addon_length);
240
187
  }
247
194
  {
248
195
    param.res_length= param.addon_length;
249
196
    if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
250
 
    {
251
197
      goto err;
252
 
    }
253
198
  }
254
199
  else
255
200
  {
265
210
 
266
211
  if (select && select->quick)
267
212
  {
268
 
    getSession().status_var.filesort_range_count++;
 
213
    session->status_var.filesort_range_count++;
269
214
  }
270
215
  else
271
216
  {
272
 
    getSession().status_var.filesort_scan_count++;
 
217
    session->status_var.filesort_scan_count++;
273
218
  }
274
219
#ifdef CAN_TRUST_RANGE
275
220
  if (select && select->quick && select->quick->records > 0L)
291
236
    selected_records_file= 0;
292
237
  }
293
238
 
294
 
  if (multi_byte_charset && !(param.tmp_buffer= (char*) malloc(param.sort_length)))
295
 
  {
 
239
  if (multi_byte_charset &&
 
240
      !(param.tmp_buffer= (char*) malloc(param.sort_length)))
296
241
    goto err;
297
 
  }
298
242
 
299
 
  memavl= getSession().variables.sortbuff_size;
 
243
  memavl= session->variables.sortbuff_size;
300
244
  min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
301
245
  while (memavl >= min_sort_memory)
302
246
  {
303
247
    uint32_t old_memavl;
304
248
    uint32_t keys= memavl/(param.rec_length+sizeof(char*));
305
249
    param.keys= (uint32_t) min(records+1, (ha_rows)keys);
306
 
 
307
 
    allocated_sort_memory= param.keys * param.rec_length;
308
 
    if (not global_sort_buffer.add(allocated_sort_memory))
309
 
    {
310
 
      my_error(ER_OUT_OF_GLOBAL_SORTMEMORY, MYF(ME_ERROR+ME_WAITTANG));
311
 
      goto err;
312
 
    }
313
 
 
314
250
    if ((table_sort.sort_keys=
315
251
         (unsigned char **) make_char_array((char **) table_sort.sort_keys,
316
252
                                            param.keys, param.rec_length)))
317
253
      break;
318
 
 
319
 
    global_sort_buffer.sub(allocated_sort_memory);
320
254
    old_memavl= memavl;
321
255
    if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
322
256
      memavl= min_sort_memory;
327
261
    my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
328
262
    goto err;
329
263
  }
330
 
 
331
 
  if (buffpek_pointers.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
332
 
  {
 
264
  if (open_cached_file(&buffpek_pointers,drizzle_tmpdir.c_str(),TEMP_PREFIX,
 
265
                       DISK_BUFFER_SIZE, MYF(MY_WME)))
333
266
    goto err;
334
 
  }
335
267
 
336
268
  param.keys--;                         /* TODO: check why we do this */
337
269
  param.sort_form= table;
338
270
  param.end=(param.local_sortorder=sortorder)+s_length;
339
 
  if ((records= find_all_keys(&param,select,sort_keys, &buffpek_pointers,
340
 
                              &tempfile, selected_records_file)) == HA_POS_ERROR)
341
 
  {
 
271
  if ((records=find_all_keys(session, &param,select,sort_keys, &buffpek_pointers,
 
272
                             &tempfile, selected_records_file)) ==
 
273
      HA_POS_ERROR)
342
274
    goto err;
343
 
  }
344
 
  maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
 
275
  maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek));
345
276
 
346
277
  if (maxbuffer == 0)                   // The whole set is in memory
347
278
  {
348
 
    if (param.save_index(sort_keys,(uint32_t) records, &table_sort))
349
 
    {
 
279
    if (save_index(&param,sort_keys,(uint32_t) records, &table_sort))
350
280
      goto err;
351
 
    }
352
281
  }
353
282
  else
354
283
  {
356
285
    {
357
286
      if (table_sort.buffpek)
358
287
        free(table_sort.buffpek);
359
 
      table_sort.buffpek = 0;
 
288
      table_sort.buffpek= 0;
360
289
    }
361
290
    if (!(table_sort.buffpek=
362
 
          (unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer, table_sort.buffpek)))
363
 
    {
 
291
          (unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer,
 
292
                                 table_sort.buffpek)))
364
293
      goto err;
365
 
    }
366
 
    buffpek_inst= (buffpek *) table_sort.buffpek;
 
294
    buffpek= (buffpek_st *) table_sort.buffpek;
367
295
    table_sort.buffpek_len= maxbuffer;
368
 
    buffpek_pointers.close_cached_file();
 
296
    close_cached_file(&buffpek_pointers);
369
297
        /* Open cached file if it isn't open */
370
 
    if (! my_b_inited(outfile) && outfile->open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
371
 
    {
372
 
      goto err;
373
 
    }
374
 
 
375
 
    if (outfile->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
376
 
    {
377
 
      goto err;
378
 
    }
 
298
    if (! my_b_inited(outfile) &&
 
299
        open_cached_file(outfile,drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER,
 
300
                          MYF(MY_WME)))
 
301
      goto err;
 
302
    if (reinit_io_cache(outfile,internal::WRITE_CACHE,0L,0,0))
 
303
      goto err;
379
304
 
380
305
    /*
381
306
      Use also the space previously used by string pointers in sort_buffer
382
307
      for temporary key storage.
383
308
    */
384
 
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
385
 
 
 
309
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) /
 
310
                param.rec_length-1);
386
311
    maxbuffer--;                                // Offset from 0
387
 
    if (merge_many_buff(&param,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
388
 
    {
389
 
      goto err;
390
 
    }
391
 
 
392
 
    if (flush_io_cache(&tempfile) || tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
393
 
    {
394
 
      goto err;
395
 
    }
396
 
 
397
 
    if (merge_index(&param,(unsigned char*) sort_keys,buffpek_inst,maxbuffer,&tempfile, outfile))
398
 
    {
399
 
      goto err;
400
 
    }
 
312
    if (merge_many_buff(&param,(unsigned char*) sort_keys,buffpek,&maxbuffer,
 
313
                        &tempfile))
 
314
      goto err;
 
315
    if (flush_io_cache(&tempfile) ||
 
316
        reinit_io_cache(&tempfile,internal::READ_CACHE,0L,0,0))
 
317
      goto err;
 
318
    if (merge_index(&param,(unsigned char*) sort_keys,buffpek,maxbuffer,&tempfile,
 
319
                    outfile))
 
320
      goto err;
401
321
  }
402
 
 
403
322
  if (records > param.max_rows)
404
 
  {
405
 
    records= param.max_rows;
406
 
  }
 
323
    records=param.max_rows;
407
324
  error =0;
408
325
 
409
326
 err:
410
 
  if (not subselect || not subselect->is_uncacheable())
 
327
  if (param.tmp_buffer)
 
328
    if (param.tmp_buffer)
 
329
      free(param.tmp_buffer);
 
330
  if (!subselect || !subselect->is_uncacheable())
411
331
  {
412
332
    free(sort_keys);
413
333
    table_sort.sort_keys= 0;
414
 
    free(buffpek_inst);
 
334
    free(buffpek);
415
335
    table_sort.buffpek= 0;
416
336
    table_sort.buffpek_len= 0;
417
337
  }
418
 
 
419
 
  tempfile.close_cached_file();
420
 
  buffpek_pointers.close_cached_file();
421
 
 
 
338
  close_cached_file(&tempfile);
 
339
  close_cached_file(&buffpek_pointers);
422
340
  if (my_b_inited(outfile))
423
341
  {
424
342
    if (flush_io_cache(outfile))
425
 
    {
426
343
      error=1;
427
 
    }
428
344
    {
429
 
      internal::my_off_t save_pos= outfile->pos_in_file;
 
345
      internal::my_off_t save_pos=outfile->pos_in_file;
430
346
      /* For following reads */
431
 
      if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
432
 
      {
 
347
      if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
433
348
        error=1;
434
 
      }
435
349
      outfile->end_of_file=save_pos;
436
350
    }
437
351
  }
438
 
 
439
352
  if (error)
440
353
  {
441
354
    my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
443
356
  }
444
357
  else
445
358
  {
446
 
    getSession().status_var.filesort_rows+= (uint32_t) records;
 
359
    session->status_var.filesort_rows+= (uint32_t) records;
447
360
  }
448
 
  examined_rows= param.examined_rows;
449
 
  global_sort_buffer.sub(allocated_sort_memory);
450
 
  table->sort= table_sort;
 
361
  *examined_rows= param.examined_rows;
 
362
  memcpy(&table->sort, &table_sort, sizeof(filesort_info_st));
451
363
  DRIZZLE_FILESORT_DONE(error, records);
452
364
  return (error ? HA_POS_ERROR : records);
453
365
} /* filesort */
454
366
 
 
367
 
 
368
void Table::filesort_free_buffers(bool full)
 
369
{
 
370
  if (sort.record_pointers)
 
371
  {
 
372
    free((unsigned char*) sort.record_pointers);
 
373
    sort.record_pointers=0;
 
374
  }
 
375
  if (full)
 
376
  {
 
377
    if (sort.sort_keys )
 
378
    {
 
379
      if ((unsigned char*) sort.sort_keys)
 
380
        free((unsigned char*) sort.sort_keys);
 
381
      sort.sort_keys= 0;
 
382
    }
 
383
    if (sort.buffpek)
 
384
    {
 
385
      if ((unsigned char*) sort.buffpek)
 
386
        free((unsigned char*) sort.buffpek);
 
387
      sort.buffpek= 0;
 
388
      sort.buffpek_len= 0;
 
389
    }
 
390
  }
 
391
  if (sort.addon_buf)
 
392
  {
 
393
    free((char *) sort.addon_buf);
 
394
    free((char *) sort.addon_field);
 
395
    sort.addon_buf=0;
 
396
    sort.addon_field=0;
 
397
  }
 
398
}
 
399
 
455
400
/** Make a array of string pointers. */
456
401
 
457
402
static char **make_char_array(char **old_pos, register uint32_t fields,
476
421
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
477
422
                                     unsigned char *buf)
478
423
{
479
 
  uint32_t length= sizeof(buffpek)*count;
 
424
  uint32_t length= sizeof(buffpek_st)*count;
480
425
  unsigned char *tmp= buf;
481
 
  if (count > UINT_MAX/sizeof(buffpek))
482
 
    return 0; /* sizeof(buffpek)*count will overflow */
 
426
  if (count > UINT_MAX/sizeof(buffpek_st))
 
427
    return 0; /* sizeof(buffpek_st)*count will overflow */
483
428
  if (!tmp)
484
429
    tmp= (unsigned char *)malloc(length);
485
430
  if (tmp)
486
431
  {
487
 
    if (buffpek_pointers->reinit_io_cache(internal::READ_CACHE,0L,0,0) ||
 
432
    if (reinit_io_cache(buffpek_pointers,internal::READ_CACHE,0L,0,0) ||
488
433
        my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
489
434
    {
490
435
      free((char*) tmp);
502
447
  @param param             Sorting parameter
503
448
  @param select            Use this to get source data
504
449
  @param sort_keys         Array of pointers to sort key + addon buffers.
505
 
  @param buffpek_pointers  File to write buffpeks describing sorted segments
 
450
  @param buffpek_pointers  File to write buffpek_sts describing sorted segments
506
451
                           in tempfile.
507
452
  @param tempfile          File to write sorted sequences of sortkeys to.
508
453
  @param indexfile         If !NULL, use it for source data (contains rowids)
516
461
       {
517
462
         sort sort_keys buffer;
518
463
         dump sorted sequence to 'tempfile';
519
 
         dump buffpek describing sequence location into 'buffpek_pointers';
 
464
         dump buffpek_st describing sequence location into 'buffpek_pointers';
520
465
       }
521
466
       put sort key into 'sort_keys';
522
467
     }
532
477
    HA_POS_ERROR on error.
533
478
*/
534
479
 
535
 
ha_rows FileSort::find_all_keys(SortParam *param, 
536
 
                                optimizer::SqlSelect *select,
537
 
                                unsigned char **sort_keys,
538
 
                                internal::IO_CACHE *buffpek_pointers,
539
 
                                internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
 
480
static ha_rows find_all_keys(Session *session,
 
481
                             SORTPARAM *param, 
 
482
                             optimizer::SqlSelect *select,
 
483
                             unsigned char **sort_keys,
 
484
                             internal::IO_CACHE *buffpek_pointers,
 
485
                             internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
540
486
{
541
487
  int error,flag,quick_select;
542
488
  uint32_t idx,indexpos,ref_length;
543
489
  unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
544
490
  internal::my_off_t record;
545
491
  Table *sort_form;
546
 
  volatile Session::killed_state_t *killed= getSession().getKilledPtr();
 
492
  volatile Session::killed_state *killed= &session->killed;
547
493
  Cursor *file;
548
 
  boost::dynamic_bitset<> *save_read_set= NULL;
549
 
  boost::dynamic_bitset<> *save_write_set= NULL;
 
494
  MyBitmap *save_read_set, *save_write_set;
550
495
 
551
496
  idx=indexpos=0;
552
497
  error=quick_select=0;
565
510
  {
566
511
    next_pos=(unsigned char*) 0;                        /* Find records in sequence */
567
512
    file->startTableScan(1);
568
 
    file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
 
513
    file->extra_opt(HA_EXTRA_CACHE,
 
514
                    session->variables.read_buff_size);
569
515
  }
570
516
 
571
517
  ReadRecord read_record_info;
574
520
    if (select->quick->reset())
575
521
      return(HA_POS_ERROR);
576
522
 
577
 
    read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1);
 
523
    read_record_info.init_read_record(session, select->quick->head, select, 1, 1);
578
524
  }
579
525
 
580
526
  /* Remember original bitmaps */
581
527
  save_read_set=  sort_form->read_set;
582
528
  save_write_set= sort_form->write_set;
583
529
  /* Set up temporary column read map for columns used by sort */
584
 
  sort_form->tmp_set.reset();
 
530
  sort_form->tmp_set.clearAll();
585
531
  /* Temporary set for register_used_fields and register_field_in_read_map */
586
532
  sort_form->read_set= &sort_form->tmp_set;
587
 
  param->register_used_fields();
 
533
  register_used_fields(param);
588
534
  if (select && select->cond)
589
535
    select->cond->walk(&Item::register_field_in_read_map, 1,
590
536
                       (unsigned char*) sort_form);
591
 
  sort_form->column_bitmaps_set(sort_form->tmp_set, sort_form->tmp_set);
 
537
  sort_form->column_bitmaps_set(&sort_form->tmp_set, &sort_form->tmp_set);
592
538
 
593
539
  for (;;)
594
540
  {
643
589
    {
644
590
      if (idx == param->keys)
645
591
      {
646
 
        if (param->write_keys(sort_keys, idx, buffpek_pointers, tempfile))
 
592
        if (write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
647
593
          return(HA_POS_ERROR);
648
594
        idx=0;
649
595
        indexpos++;
650
596
      }
651
 
      param->make_sortkey(sort_keys[idx++], ref_pos);
 
597
      make_sortkey(param,sort_keys[idx++],ref_pos);
652
598
    }
653
599
    else
654
 
    {
655
600
      file->unlock_row();
656
 
    }
657
 
 
658
601
    /* It does not make sense to read more keys in case of a fatal error */
659
 
    if (getSession().is_error())
 
602
    if (session->is_error())
660
603
      break;
661
604
  }
662
605
  if (quick_select)
674
617
      file->endTableScan();
675
618
  }
676
619
 
677
 
  if (getSession().is_error())
 
620
  if (session->is_error())
678
621
    return(HA_POS_ERROR);
679
622
 
680
623
  /* Signal we should use orignal column read and write maps */
681
 
  sort_form->column_bitmaps_set(*save_read_set, *save_write_set);
 
624
  sort_form->column_bitmaps_set(save_read_set, save_write_set);
682
625
 
683
626
  if (error != HA_ERR_END_OF_FILE)
684
627
  {
685
628
    sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
686
629
    return(HA_POS_ERROR);
687
630
  }
688
 
 
689
 
  if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
690
 
  {
 
631
  if (indexpos && idx &&
 
632
      write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
691
633
    return(HA_POS_ERROR);
692
 
  }
693
 
 
694
634
  return(my_b_inited(tempfile) ?
695
635
              (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
696
636
              idx);
701
641
  @details
702
642
  Sort the buffer and write:
703
643
  -# the sorted sequence to tempfile
704
 
  -# a buffpek describing the sorted sequence position to buffpek_pointers
 
644
  -# a buffpek_st describing the sorted sequence position to buffpek_pointers
705
645
 
706
646
    (was: Skriver en buffert med nycklar till filen)
707
647
 
708
648
  @param param             Sort parameters
709
649
  @param sort_keys         Array of pointers to keys to sort
710
650
  @param count             Number of elements in sort_keys array
711
 
  @param buffpek_pointers  One 'buffpek' struct will be written into this file.
712
 
                           The buffpek::{file_pos, count} will indicate where
 
651
  @param buffpek_pointers  One 'buffpek_st' struct will be written into this file.
 
652
                           The buffpek_st::{file_pos, count} will indicate where
713
653
                           the sorted data was stored.
714
654
  @param tempfile          The sorted sequence will be written into this file.
715
655
 
719
659
    1 Error
720
660
*/
721
661
 
722
 
int SortParam::write_keys(register unsigned char **sort_keys, uint32_t count,
723
 
                          internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
 
662
static int
 
663
write_keys(SORTPARAM *param, register unsigned char **sort_keys, uint32_t count,
 
664
           internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
724
665
{
725
 
  buffpek buffpek;
 
666
  size_t sort_length, rec_length;
 
667
  unsigned char **end;
 
668
  buffpek_st buffpek;
726
669
 
 
670
  sort_length= param->sort_length;
 
671
  rec_length= param->rec_length;
727
672
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
728
673
  if (!my_b_inited(tempfile) &&
729
 
      tempfile->open_cached_file(drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
730
 
  {
731
 
    return 1;
732
 
  }
 
674
      open_cached_file(tempfile, drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE,
 
675
                       MYF(MY_WME)))
 
676
    goto err;
733
677
  /* check we won't have more buffpeks than we can possibly keep in memory */
734
 
  if (my_b_tell(buffpek_pointers) + sizeof(buffpek) > (uint64_t)UINT_MAX)
735
 
  {
736
 
    return 1;
737
 
  }
738
 
 
 
678
  if (my_b_tell(buffpek_pointers) + sizeof(buffpek_st) > (uint64_t)UINT_MAX)
 
679
    goto err;
739
680
  buffpek.file_pos= my_b_tell(tempfile);
740
 
  if ((ha_rows) count > max_rows)
741
 
    count=(uint32_t) max_rows;
742
 
 
 
681
  if ((ha_rows) count > param->max_rows)
 
682
    count=(uint32_t) param->max_rows;
743
683
  buffpek.count=(ha_rows) count;
744
 
 
745
 
  for (unsigned char **ptr= sort_keys + count ; sort_keys != ptr ; sort_keys++)
746
 
  {
 
684
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
747
685
    if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
748
 
    {
749
 
      return 1;
750
 
    }
751
 
  }
752
 
 
 
686
      goto err;
753
687
  if (my_b_write(buffpek_pointers, (unsigned char*) &buffpek, sizeof(buffpek)))
754
 
  {
755
 
    return 1;
756
 
  }
 
688
    goto err;
 
689
  return(0);
757
690
 
758
 
  return 0;
 
691
err:
 
692
  return(1);
759
693
} /* write_keys */
760
694
 
761
695
 
784
718
 
785
719
/** Make a sort-key from record. */
786
720
 
787
 
void SortParam::make_sortkey(register unsigned char *to, unsigned char *ref_pos)
 
721
static void make_sortkey(register SORTPARAM *param,
 
722
                         register unsigned char *to, unsigned char *ref_pos)
788
723
{
789
724
  Field *field;
790
725
  SortField *sort_field;
791
726
  size_t length;
792
727
 
793
 
  for (sort_field= local_sortorder ;
794
 
       sort_field != end ;
 
728
  for (sort_field=param->local_sortorder ;
 
729
       sort_field != param->end ;
795
730
       sort_field++)
796
731
  {
797
732
    bool maybe_null=0;
817
752
    {                                           // Item
818
753
      Item *item=sort_field->item;
819
754
      maybe_null= item->maybe_null;
820
 
 
821
755
      switch (sort_field->result_type) {
822
756
      case STRING_RESULT:
823
 
        {
824
 
          const CHARSET_INFO * const cs=item->collation.collation;
825
 
          char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
826
 
          int diff;
827
 
          uint32_t sort_field_length;
 
757
      {
 
758
        const CHARSET_INFO * const cs=item->collation.collation;
 
759
        char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
 
760
        int diff;
 
761
        uint32_t sort_field_length;
828
762
 
 
763
        if (maybe_null)
 
764
          *to++=1;
 
765
        /* All item->str() to use some extra byte for end null.. */
 
766
        String tmp((char*) to,sort_field->length+4,cs);
 
767
        String *res= item->str_result(&tmp);
 
768
        if (!res)
 
769
        {
829
770
          if (maybe_null)
830
 
            *to++=1;
831
 
          /* All item->str() to use some extra byte for end null.. */
832
 
          String tmp((char*) to,sort_field->length+4,cs);
833
 
          String *res= item->str_result(&tmp);
834
 
          if (!res)
835
 
          {
836
 
            if (maybe_null)
837
 
              memset(to-1, 0, sort_field->length+1);
838
 
            else
839
 
            {
840
 
              /*
841
 
                This should only happen during extreme conditions if we run out
842
 
                of memory or have an item marked not null when it can be null.
843
 
                This code is here mainly to avoid a hard crash in this case.
844
 
              */
845
 
              assert(0);
846
 
              memset(to, 0, sort_field->length);        // Avoid crash
847
 
            }
848
 
            break;
849
 
          }
850
 
          length= res->length();
851
 
          sort_field_length= sort_field->length - sort_field->suffix_length;
852
 
          diff=(int) (sort_field_length - length);
853
 
          if (diff < 0)
854
 
          {
855
 
            diff=0;
856
 
            length= sort_field_length;
857
 
          }
858
 
          if (sort_field->suffix_length)
859
 
          {
860
 
            /* Store length last in result_string */
861
 
            store_length(to + sort_field_length, length,
862
 
                         sort_field->suffix_length);
863
 
          }
864
 
          if (sort_field->need_strxnfrm)
865
 
          {
866
 
            char *from=(char*) res->ptr();
867
 
            uint32_t tmp_length;
868
 
            if ((unsigned char*) from == to)
869
 
            {
870
 
              set_if_smaller(length,sort_field->length);
871
 
              memcpy(tmp_buffer,from,length);
872
 
              from= tmp_buffer;
873
 
            }
874
 
            tmp_length= my_strnxfrm(cs,to,sort_field->length,
875
 
                                    (unsigned char*) from, length);
876
 
            assert(tmp_length == sort_field->length);
877
 
          }
 
771
            memset(to-1, 0, sort_field->length+1);
878
772
          else
879
773
          {
880
 
            my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
881
 
            cs->cset->fill(cs, (char *)to+length,diff,fill_char);
 
774
            /*
 
775
              This should only happen during extreme conditions if we run out
 
776
              of memory or have an item marked not null when it can be null.
 
777
              This code is here mainly to avoid a hard crash in this case.
 
778
            */
 
779
            assert(0);
 
780
            memset(to, 0, sort_field->length);  // Avoid crash
882
781
          }
883
782
          break;
884
783
        }
 
784
        length= res->length();
 
785
        sort_field_length= sort_field->length - sort_field->suffix_length;
 
786
        diff=(int) (sort_field_length - length);
 
787
        if (diff < 0)
 
788
        {
 
789
          diff=0;
 
790
          length= sort_field_length;
 
791
        }
 
792
        if (sort_field->suffix_length)
 
793
        {
 
794
          /* Store length last in result_string */
 
795
          store_length(to + sort_field_length, length,
 
796
                       sort_field->suffix_length);
 
797
        }
 
798
        if (sort_field->need_strxnfrm)
 
799
        {
 
800
          char *from=(char*) res->ptr();
 
801
          uint32_t tmp_length;
 
802
          if ((unsigned char*) from == to)
 
803
          {
 
804
            set_if_smaller(length,sort_field->length);
 
805
            memcpy(param->tmp_buffer,from,length);
 
806
            from=param->tmp_buffer;
 
807
          }
 
808
          tmp_length= my_strnxfrm(cs,to,sort_field->length,
 
809
                                  (unsigned char*) from, length);
 
810
          assert(tmp_length == sort_field->length);
 
811
        }
 
812
        else
 
813
        {
 
814
          my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
 
815
          cs->cset->fill(cs, (char *)to+length,diff,fill_char);
 
816
        }
 
817
        break;
 
818
      }
885
819
      case INT_RESULT:
886
 
        {
 
820
        {
887
821
          int64_t value= item->val_int_result();
888
822
          if (maybe_null)
889
823
          {
890
 
            *to++=1;
 
824
            *to++=1;
891
825
            if (item->null_value)
892
826
            {
893
827
              if (maybe_null)
899
833
              break;
900
834
            }
901
835
          }
902
 
          to[7]= (unsigned char) value;
903
 
          to[6]= (unsigned char) (value >> 8);
904
 
          to[5]= (unsigned char) (value >> 16);
905
 
          to[4]= (unsigned char) (value >> 24);
906
 
          to[3]= (unsigned char) (value >> 32);
907
 
          to[2]= (unsigned char) (value >> 40);
908
 
          to[1]= (unsigned char) (value >> 48);
 
836
          to[7]= (unsigned char) value;
 
837
          to[6]= (unsigned char) (value >> 8);
 
838
          to[5]= (unsigned char) (value >> 16);
 
839
          to[4]= (unsigned char) (value >> 24);
 
840
          to[3]= (unsigned char) (value >> 32);
 
841
          to[2]= (unsigned char) (value >> 40);
 
842
          to[1]= (unsigned char) (value >> 48);
909
843
          if (item->unsigned_flag)                    /* Fix sign */
910
844
            to[0]= (unsigned char) (value >> 56);
911
845
          else
912
846
            to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
913
 
          break;
914
 
        }
 
847
          break;
 
848
        }
915
849
      case DECIMAL_RESULT:
916
850
        {
917
851
          my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
928
862
          my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
929
863
                            item->max_length - (item->decimals ? 1:0),
930
864
                            item->decimals);
931
 
          break;
 
865
         break;
932
866
        }
933
867
      case REAL_RESULT:
934
 
        {
 
868
        {
935
869
          double value= item->val_result();
936
 
          if (maybe_null)
 
870
          if (maybe_null)
937
871
          {
938
872
            if (item->null_value)
939
873
            {
941
875
              to++;
942
876
              break;
943
877
            }
944
 
            *to++=1;
 
878
            *to++=1;
945
879
          }
946
 
          change_double_for_sort(value,(unsigned char*) to);
947
 
          break;
948
 
        }
 
880
          change_double_for_sort(value,(unsigned char*) to);
 
881
          break;
 
882
        }
949
883
      case ROW_RESULT:
950
884
      default:
951
 
        // This case should never be choosen
952
 
        assert(0);
953
 
        break;
 
885
        // This case should never be choosen
 
886
        assert(0);
 
887
        break;
954
888
      }
955
889
    }
956
 
 
957
890
    if (sort_field->reverse)
958
891
    {                                                   /* Revers key */
959
892
      if (maybe_null)
966
899
      }
967
900
    }
968
901
    else
969
 
    {
970
902
      to+= sort_field->length;
971
 
    }
972
903
  }
973
904
 
974
 
  if (addon_field)
 
905
  if (param->addon_field)
975
906
  {
976
907
    /*
977
908
      Save field values appended to sorted fields.
979
910
      In this implementation we use fixed layout for field values -
980
911
      the same for all records.
981
912
    */
982
 
    sort_addon_field *addonf= addon_field;
 
913
    sort_addon_field_st *addonf= param->addon_field;
983
914
    unsigned char *nulls= to;
984
915
    assert(addonf != 0);
985
916
    memset(nulls, 0, addonf->offset);
989
920
      if (addonf->null_bit && field->is_null())
990
921
      {
991
922
        nulls[addonf->null_offset]|= addonf->null_bit;
992
 
#ifdef HAVE_VALGRIND
 
923
#ifdef HAVE_purify
993
924
        memset(to, 0, addonf->length);
994
925
#endif
995
926
      }
996
927
      else
997
928
      {
998
 
#ifdef HAVE_VALGRIND
 
929
#ifdef HAVE_purify
999
930
        unsigned char *end= field->pack(to, field->ptr);
1000
 
        uint32_t local_length= (uint32_t) ((to + addonf->length) - end);
1001
 
        assert((int) local_length >= 0);
1002
 
        if (local_length)
1003
 
          memset(end, 0, local_length);
 
931
        uint32_t length= (uint32_t) ((to + addonf->length) - end);
 
932
        assert((int) length >= 0);
 
933
        if (length)
 
934
          memset(end, 0, length);
1004
935
#else
1005
936
        (void) field->pack(to, field->ptr);
1006
937
#endif
1011
942
  else
1012
943
  {
1013
944
    /* Save filepos last */
1014
 
    memcpy(to, ref_pos, (size_t) ref_length);
 
945
    memcpy(to, ref_pos, (size_t) param->ref_length);
1015
946
  }
 
947
  return;
1016
948
}
1017
949
 
1018
950
 
1020
952
  Register fields used by sorting in the sorted table's read set
1021
953
*/
1022
954
 
1023
 
void SortParam::register_used_fields()
 
955
static void register_used_fields(SORTPARAM *param)
1024
956
{
1025
957
  SortField *sort_field;
1026
 
  Table *table= sort_form;
 
958
  Table *table=param->sort_form;
1027
959
 
1028
 
  for (sort_field= local_sortorder ;
1029
 
       sort_field != end ;
 
960
  for (sort_field= param->local_sortorder ;
 
961
       sort_field != param->end ;
1030
962
       sort_field++)
1031
963
  {
1032
964
    Field *field;
1033
965
    if ((field= sort_field->field))
1034
966
    {
1035
967
      if (field->getTable() == table)
1036
 
        table->setReadSet(field->position());
 
968
        table->setReadSet(field->field_index);
1037
969
    }
1038
970
    else
1039
971
    {                                           // Item
1042
974
    }
1043
975
  }
1044
976
 
1045
 
  if (addon_field)
 
977
  if (param->addon_field)
1046
978
  {
1047
 
    sort_addon_field *addonf= addon_field;
 
979
    sort_addon_field_st *addonf= param->addon_field;
1048
980
    Field *field;
1049
981
    for ( ; (field= addonf->field) ; addonf++)
1050
 
      table->setReadSet(field->position());
 
982
      table->setReadSet(field->field_index);
1051
983
  }
1052
984
  else
1053
985
  {
1057
989
}
1058
990
 
1059
991
 
1060
 
bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
 
992
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
 
993
                       filesort_info_st *table_sort)
1061
994
{
1062
 
  uint32_t offset;
 
995
  uint32_t offset,res_length;
1063
996
  unsigned char *to;
1064
997
 
1065
 
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
1066
 
  offset= rec_length - res_length;
1067
 
 
1068
 
  if ((ha_rows) count > max_rows)
1069
 
    count=(uint32_t) max_rows;
1070
 
 
1071
 
  if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
1072
 
    return true;
1073
 
 
1074
 
  for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
 
998
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, param->sort_length);
 
999
  res_length= param->res_length;
 
1000
  offset= param->rec_length-res_length;
 
1001
  if ((ha_rows) count > param->max_rows)
 
1002
    count=(uint32_t) param->max_rows;
 
1003
  if (!(to= table_sort->record_pointers=
 
1004
        (unsigned char*) malloc(res_length*count)))
 
1005
    return(1);
 
1006
  for (unsigned char **end= sort_keys+count ; sort_keys != end ; sort_keys++)
1075
1007
  {
1076
1008
    memcpy(to, *sort_keys+offset, res_length);
1077
1009
    to+= res_length;
1078
1010
  }
1079
 
 
1080
 
  return false;
 
1011
  return(0);
1081
1012
}
1082
1013
 
1083
1014
 
1084
1015
/** Merge buffers to make < MERGEBUFF2 buffers. */
1085
1016
 
1086
 
int FileSort::merge_many_buff(SortParam *param, unsigned char *sort_buffer,
1087
 
                              buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
 
1017
int merge_many_buff(SORTPARAM *param, unsigned char *sort_buffer,
 
1018
                    buffpek_st *buffpek, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
1088
1019
{
 
1020
  register uint32_t i;
1089
1021
  internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
1090
 
  buffpek *lastbuff;
 
1022
  buffpek_st *lastbuff;
1091
1023
 
1092
1024
  if (*maxbuffer < MERGEBUFF2)
1093
 
    return 0;
 
1025
    return(0);
1094
1026
  if (flush_io_cache(t_file) ||
1095
 
      t_file2.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE, MYF(MY_WME)))
1096
 
  {
1097
 
    return 1;
1098
 
  }
 
1027
      open_cached_file(&t_file2,drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE,
 
1028
                        MYF(MY_WME)))
 
1029
    return(1);
1099
1030
 
1100
1031
  from_file= t_file ; to_file= &t_file2;
1101
1032
  while (*maxbuffer >= MERGEBUFF2)
1102
1033
  {
1103
 
    register uint32_t i;
1104
 
 
1105
 
    if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
1106
 
    {
1107
 
      break;
1108
 
    }
1109
 
 
1110
 
    if (to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
1111
 
    {
1112
 
      break;
1113
 
    }
1114
 
 
1115
 
    lastbuff=buffpek_inst;
 
1034
    if (reinit_io_cache(from_file,internal::READ_CACHE,0L,0,0))
 
1035
      goto cleanup;
 
1036
    if (reinit_io_cache(to_file,internal::WRITE_CACHE,0L,0,0))
 
1037
      goto cleanup;
 
1038
    lastbuff=buffpek;
1116
1039
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
1117
1040
    {
1118
1041
      if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1119
 
                        buffpek_inst+i,buffpek_inst+i+MERGEBUFF-1,0))
1120
 
      {
1121
 
        goto cleanup;
1122
 
      }
 
1042
                        buffpek+i,buffpek+i+MERGEBUFF-1,0))
 
1043
      goto cleanup;
1123
1044
    }
1124
 
 
1125
1045
    if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1126
 
                      buffpek_inst+i,buffpek_inst+ *maxbuffer,0))
1127
 
    {
 
1046
                      buffpek+i,buffpek+ *maxbuffer,0))
1128
1047
      break;
1129
 
    }
1130
 
 
1131
1048
    if (flush_io_cache(to_file))
1132
 
    {
1133
1049
      break;
1134
 
    }
1135
 
 
1136
1050
    temp=from_file; from_file=to_file; to_file=temp;
1137
 
    from_file->setup_io_cache();
1138
 
    to_file->setup_io_cache();
1139
 
    *maxbuffer= (uint32_t) (lastbuff-buffpek_inst)-1;
 
1051
    setup_io_cache(from_file);
 
1052
    setup_io_cache(to_file);
 
1053
    *maxbuffer= (uint32_t) (lastbuff-buffpek)-1;
1140
1054
  }
1141
 
 
1142
1055
cleanup:
1143
 
  to_file->close_cached_file();                 // This holds old result
 
1056
  close_cached_file(to_file);                   // This holds old result
1144
1057
  if (to_file == t_file)
1145
1058
  {
1146
1059
    *t_file=t_file2;                            // Copy result file
1147
 
    t_file->setup_io_cache();
 
1060
    setup_io_cache(t_file);
1148
1061
  }
1149
1062
 
1150
1063
  return(*maxbuffer >= MERGEBUFF2);     /* Return 1 if interrupted */
1158
1071
    (uint32_t)-1 if something goes wrong
1159
1072
*/
1160
1073
 
1161
 
uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
 
1074
uint32_t read_to_buffer(internal::IO_CACHE *fromfile, buffpek_st *buffpek,
 
1075
                        uint32_t rec_length)
1162
1076
{
1163
1077
  register uint32_t count;
1164
1078
  uint32_t length;
1165
1079
 
1166
 
  if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
 
1080
  if ((count= (uint32_t) min((ha_rows) buffpek->max_keys,buffpek->count)))
1167
1081
  {
1168
 
    if (pread(fromfile->file,(unsigned char*) buffpek_inst->base, (length= rec_length*count),buffpek_inst->file_pos) == 0)
 
1082
    if (pread(fromfile->file,(unsigned char*) buffpek->base, (length= rec_length*count),buffpek->file_pos) == 0)
1169
1083
      return((uint32_t) -1);
1170
1084
 
1171
 
    buffpek_inst->key= buffpek_inst->base;
1172
 
    buffpek_inst->file_pos+= length;                    /* New filepos */
1173
 
    buffpek_inst->count-= count;
1174
 
    buffpek_inst->mem_count= count;
 
1085
    buffpek->key= buffpek->base;
 
1086
    buffpek->file_pos+= length;                 /* New filepos */
 
1087
    buffpek->count-= count;
 
1088
    buffpek->mem_count= count;
1175
1089
  }
1176
1090
  return (count*rec_length);
1177
1091
} /* read_to_buffer */
1181
1095
{
1182
1096
  qsort2_cmp key_compare;
1183
1097
  void *key_compare_arg;
1184
 
 
1185
1098
  public:
1186
 
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
1187
 
    key_compare(in_key_compare),
1188
 
    key_compare_arg(in_compare_arg)
1189
 
  { }
1190
 
  
1191
 
  inline bool operator()(const buffpek *i, const buffpek *j) const
 
1099
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
 
1100
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
 
1101
  inline bool operator()(const buffpek_st *i, const buffpek_st *j) const
1192
1102
  {
1193
 
    int val= key_compare(key_compare_arg, &i->key, &j->key);
1194
 
 
 
1103
    int val= key_compare(key_compare_arg,
 
1104
                      &i->key, &j->key);
1195
1105
    return (val >= 0);
1196
1106
  }
1197
1107
};
1201
1111
  Merge buffers to one buffer.
1202
1112
 
1203
1113
  @param param        Sort parameter
1204
 
  @param from_file    File with source data (buffpeks point to this file)
 
1114
  @param from_file    File with source data (buffpek_sts point to this file)
1205
1115
  @param to_file      File to write the sorted result data.
1206
1116
  @param sort_buffer  Buffer for data to store up to MERGEBUFF2 sort keys.
1207
 
  @param lastbuff     OUT Store here buffpek describing data written to to_file
1208
 
  @param Fb           First element in source buffpeks array
1209
 
  @param Tb           Last element in source buffpeks array
 
1117
  @param lastbuff     OUT Store here buffpek_st describing data written to to_file
 
1118
  @param Fb           First element in source buffpek_sts array
 
1119
  @param Tb           Last element in source buffpek_sts array
1210
1120
  @param flag
1211
1121
 
1212
1122
  @retval
1215
1125
    other  error
1216
1126
*/
1217
1127
 
1218
 
int FileSort::merge_buffers(SortParam *param, internal::IO_CACHE *from_file,
1219
 
                            internal::IO_CACHE *to_file, unsigned char *sort_buffer,
1220
 
                            buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
1221
 
                            int flag)
 
1128
int merge_buffers(SORTPARAM *param, internal::IO_CACHE *from_file,
 
1129
                  internal::IO_CACHE *to_file, unsigned char *sort_buffer,
 
1130
                  buffpek_st *lastbuff, buffpek_st *Fb, buffpek_st *Tb,
 
1131
                  int flag)
1222
1132
{
1223
1133
  int error;
1224
1134
  uint32_t rec_length,res_length,offset;
1227
1137
  ha_rows max_rows,org_max_rows;
1228
1138
  internal::my_off_t to_start_filepos;
1229
1139
  unsigned char *strpos;
1230
 
  buffpek *buffpek_inst;
 
1140
  buffpek_st *buffpek;
1231
1141
  qsort2_cmp cmp;
1232
1142
  void *first_cmp_arg;
1233
 
  volatile Session::killed_state_t *killed= getSession().getKilledPtr();
1234
 
  Session::killed_state_t not_killable;
 
1143
  volatile Session::killed_state *killed= &current_session->killed;
 
1144
  Session::killed_state not_killable;
1235
1145
 
1236
 
  getSession().status_var.filesort_merge_passes++;
 
1146
  current_session->status_var.filesort_merge_passes++;
1237
1147
  if (param->not_killable)
1238
1148
  {
1239
1149
    killed= &not_killable;
1263
1173
    cmp= internal::get_ptr_compare(sort_length);
1264
1174
    first_cmp_arg= (void*) &sort_length;
1265
1175
  }
1266
 
  priority_queue<buffpek *, vector<buffpek *>, compare_functor >
 
1176
  priority_queue<buffpek_st *, vector<buffpek_st *>, compare_functor > 
1267
1177
    queue(compare_functor(cmp, first_cmp_arg));
1268
 
  for (buffpek_inst= Fb ; buffpek_inst <= Tb ; buffpek_inst++)
 
1178
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1269
1179
  {
1270
 
    buffpek_inst->base= strpos;
1271
 
    buffpek_inst->max_keys= maxcount;
1272
 
    strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek_inst,
 
1180
    buffpek->base= strpos;
 
1181
    buffpek->max_keys= maxcount;
 
1182
    strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek,
1273
1183
                                                                         rec_length));
1274
1184
    if (error == -1)
1275
 
      return -1;
1276
 
 
1277
 
    buffpek_inst->max_keys= buffpek_inst->mem_count;    // If less data in buffers than expected
1278
 
    queue.push(buffpek_inst);
 
1185
      goto err;
 
1186
    buffpek->max_keys= buffpek->mem_count;      // If less data in buffers than expected
 
1187
    queue.push(buffpek);
1279
1188
  }
1280
1189
 
1281
1190
  if (param->unique_buff)
1288
1197
       This is safe as we know that there is always more than one element
1289
1198
       in each block to merge (This is guaranteed by the Unique:: algorithm
1290
1199
    */
1291
 
    buffpek_inst= queue.top();
1292
 
    memcpy(param->unique_buff, buffpek_inst->key, rec_length);
1293
 
    if (my_b_write(to_file, (unsigned char*) buffpek_inst->key, rec_length))
 
1200
    buffpek= queue.top();
 
1201
    memcpy(param->unique_buff, buffpek->key, rec_length);
 
1202
    if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1294
1203
    {
1295
 
      return 1;
 
1204
      error=1; goto err;
1296
1205
    }
1297
 
    buffpek_inst->key+= rec_length;
1298
 
    buffpek_inst->mem_count--;
 
1206
    buffpek->key+= rec_length;
 
1207
    buffpek->mem_count--;
1299
1208
    if (!--max_rows)
1300
1209
    {
1301
1210
      error= 0;
1303
1212
    }
1304
1213
    /* Top element has been used */
1305
1214
    queue.pop();
1306
 
    queue.push(buffpek_inst);
 
1215
    queue.push(buffpek);
1307
1216
  }
1308
1217
  else
1309
 
  {
1310
1218
    cmp= 0;                                        // Not unique
1311
 
  }
1312
1219
 
1313
1220
  while (queue.size() > 1)
1314
1221
  {
1315
1222
    if (*killed)
1316
1223
    {
1317
 
      return 1;
 
1224
      error= 1; goto err;
1318
1225
    }
1319
1226
    for (;;)
1320
1227
    {
1321
 
      buffpek_inst= queue.top();
 
1228
      buffpek= queue.top();
1322
1229
      if (cmp)                                        // Remove duplicates
1323
1230
      {
1324
1231
        if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
1325
 
                    (unsigned char**) &buffpek_inst->key))
 
1232
                    (unsigned char**) &buffpek->key))
1326
1233
              goto skip_duplicate;
1327
 
            memcpy(param->unique_buff, buffpek_inst->key, rec_length);
 
1234
            memcpy(param->unique_buff, buffpek->key, rec_length);
1328
1235
      }
1329
1236
      if (flag == 0)
1330
1237
      {
1331
 
        if (my_b_write(to_file,(unsigned char*) buffpek_inst->key, rec_length))
 
1238
        if (my_b_write(to_file,(unsigned char*) buffpek->key, rec_length))
1332
1239
        {
1333
 
          return 1;
 
1240
          error=1; goto err;
1334
1241
        }
1335
1242
      }
1336
1243
      else
1337
1244
      {
1338
 
        if (my_b_write(to_file, (unsigned char*) buffpek_inst->key+offset, res_length))
 
1245
        if (my_b_write(to_file, (unsigned char*) buffpek->key+offset, res_length))
1339
1246
        {
1340
 
          return 1;
 
1247
          error=1; goto err;
1341
1248
        }
1342
1249
      }
1343
1250
      if (!--max_rows)
1347
1254
      }
1348
1255
 
1349
1256
    skip_duplicate:
1350
 
      buffpek_inst->key+= rec_length;
1351
 
      if (! --buffpek_inst->mem_count)
 
1257
      buffpek->key+= rec_length;
 
1258
      if (! --buffpek->mem_count)
1352
1259
      {
1353
 
        if (!(error= (int) read_to_buffer(from_file,buffpek_inst,
 
1260
        if (!(error= (int) read_to_buffer(from_file,buffpek,
1354
1261
                                          rec_length)))
1355
1262
        {
1356
1263
          queue.pop();
1357
1264
          break;                        /* One buffer have been removed */
1358
1265
        }
1359
1266
        else if (error == -1)
1360
 
        {
1361
 
          return -1;
1362
 
        }
 
1267
          goto err;
1363
1268
      }
1364
1269
      /* Top element has been replaced */
1365
1270
      queue.pop();
1366
 
      queue.push(buffpek_inst);
 
1271
      queue.push(buffpek);
1367
1272
    }
1368
1273
  }
1369
 
  buffpek_inst= queue.top();
1370
 
  buffpek_inst->base= sort_buffer;
1371
 
  buffpek_inst->max_keys= param->keys;
 
1274
  buffpek= queue.top();
 
1275
  buffpek->base= sort_buffer;
 
1276
  buffpek->max_keys= param->keys;
1372
1277
 
1373
1278
  /*
1374
1279
    As we know all entries in the buffer are unique, we only have to
1376
1281
  */
1377
1282
  if (cmp)
1378
1283
  {
1379
 
    if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek_inst->key))
 
1284
    if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek->key))
1380
1285
    {
1381
 
      buffpek_inst->key+= rec_length;         // Remove duplicate
1382
 
      --buffpek_inst->mem_count;
 
1286
      buffpek->key+= rec_length;         // Remove duplicate
 
1287
      --buffpek->mem_count;
1383
1288
    }
1384
1289
  }
1385
1290
 
1386
1291
  do
1387
1292
  {
1388
 
    if ((ha_rows) buffpek_inst->mem_count > max_rows)
 
1293
    if ((ha_rows) buffpek->mem_count > max_rows)
1389
1294
    {                                        /* Don't write too many records */
1390
 
      buffpek_inst->mem_count= (uint32_t) max_rows;
1391
 
      buffpek_inst->count= 0;                        /* Don't read more */
 
1295
      buffpek->mem_count= (uint32_t) max_rows;
 
1296
      buffpek->count= 0;                        /* Don't read more */
1392
1297
    }
1393
 
    max_rows-= buffpek_inst->mem_count;
 
1298
    max_rows-= buffpek->mem_count;
1394
1299
    if (flag == 0)
1395
1300
    {
1396
 
      if (my_b_write(to_file,(unsigned char*) buffpek_inst->key,
1397
 
                     (rec_length*buffpek_inst->mem_count)))
 
1301
      if (my_b_write(to_file,(unsigned char*) buffpek->key,
 
1302
                     (rec_length*buffpek->mem_count)))
1398
1303
      {
1399
 
        return 1;
 
1304
        error= 1; goto err;
1400
1305
      }
1401
1306
    }
1402
1307
    else
1403
1308
    {
1404
1309
      register unsigned char *end;
1405
 
      strpos= buffpek_inst->key+offset;
1406
 
      for (end= strpos+buffpek_inst->mem_count*rec_length ;
 
1310
      strpos= buffpek->key+offset;
 
1311
      for (end= strpos+buffpek->mem_count*rec_length ;
1407
1312
           strpos != end ;
1408
1313
           strpos+= rec_length)
1409
1314
      {
1410
1315
        if (my_b_write(to_file, (unsigned char *) strpos, res_length))
1411
1316
        {
1412
 
          return 1;
 
1317
          error=1; goto err;
1413
1318
        }
1414
1319
      }
1415
1320
    }
1416
1321
  }
1417
 
 
1418
 
  while ((error=(int) read_to_buffer(from_file,buffpek_inst, rec_length))
 
1322
  while ((error=(int) read_to_buffer(from_file,buffpek, rec_length))
1419
1323
         != -1 && error != 0);
1420
1324
 
1421
1325
end:
1422
1326
  lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
1423
1327
  lastbuff->file_pos= to_start_filepos;
1424
 
 
1425
 
  return error;
 
1328
err:
 
1329
  return(error);
1426
1330
} /* merge_buffers */
1427
1331
 
1428
1332
 
1429
1333
        /* Do a merge to output-file (save only positions) */
1430
1334
 
1431
 
int FileSort::merge_index(SortParam *param, unsigned char *sort_buffer,
1432
 
                          buffpek *buffpek_inst, uint32_t maxbuffer,
1433
 
                          internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
 
1335
static int merge_index(SORTPARAM *param, unsigned char *sort_buffer,
 
1336
                       buffpek_st *buffpek, uint32_t maxbuffer,
 
1337
                       internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
1434
1338
{
1435
 
  if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
1436
 
                    buffpek_inst+maxbuffer,1))
1437
 
    return 1;
1438
 
 
1439
 
  return 0;
 
1339
  if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
 
1340
                    buffpek+maxbuffer,1))
 
1341
    return(1);
 
1342
  return(0);
1440
1343
} /* merge_index */
1441
1344
 
1442
1345
 
1456
1359
/**
1457
1360
  Calculate length of sort key.
1458
1361
 
 
1362
  @param session                          Thread Cursor
1459
1363
  @param sortorder                Order of items to sort
1460
1364
  @param s_length                 Number of items to sort
1461
1365
  @param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
1470
1374
    Total length of sort buffer in bytes
1471
1375
*/
1472
1376
 
1473
 
uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
 
1377
static uint32_t
 
1378
sortlength(Session *session, SortField *sortorder, uint32_t s_length,
 
1379
           bool *multi_byte_charset)
1474
1380
{
1475
1381
  register uint32_t length;
1476
1382
  const CHARSET_INFO *cs;
1500
1406
      sortorder->result_type= sortorder->item->result_type();
1501
1407
      if (sortorder->item->result_as_int64_t())
1502
1408
        sortorder->result_type= INT_RESULT;
1503
 
 
1504
1409
      switch (sortorder->result_type) {
1505
1410
      case STRING_RESULT:
1506
 
        sortorder->length=sortorder->item->max_length;
 
1411
        sortorder->length=sortorder->item->max_length;
1507
1412
        set_if_smaller(sortorder->length,
1508
 
                       getSession().variables.max_sort_length);
1509
 
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1510
 
        {
 
1413
                       session->variables.max_sort_length);
 
1414
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
 
1415
        {
1511
1416
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1512
 
          sortorder->need_strxnfrm= 1;
1513
 
          *multi_byte_charset= 1;
1514
 
        }
 
1417
          sortorder->need_strxnfrm= 1;
 
1418
          *multi_byte_charset= 1;
 
1419
        }
1515
1420
        else if (cs == &my_charset_bin)
1516
1421
        {
1517
1422
          /* Store length last to be able to sort blob/varbinary */
1518
1423
          sortorder->suffix_length= suffix_length(sortorder->length);
1519
1424
          sortorder->length+= sortorder->suffix_length;
1520
1425
        }
1521
 
        break;
 
1426
        break;
1522
1427
      case INT_RESULT:
1523
 
        sortorder->length=8;                    // Size of intern int64_t
1524
 
        break;
 
1428
        sortorder->length=8;                    // Size of intern int64_t
 
1429
        break;
1525
1430
      case DECIMAL_RESULT:
1526
1431
        sortorder->length=
1527
1432
          my_decimal_get_binary_size(sortorder->item->max_length -
1529
1434
                                     sortorder->item->decimals);
1530
1435
        break;
1531
1436
      case REAL_RESULT:
1532
 
        sortorder->length=sizeof(double);
1533
 
        break;
 
1437
        sortorder->length=sizeof(double);
 
1438
        break;
1534
1439
      case ROW_RESULT:
1535
 
        // This case should never be choosen
1536
 
        assert(0);
1537
 
        break;
 
1440
      default:
 
1441
        // This case should never be choosen
 
1442
        assert(0);
 
1443
        break;
1538
1444
      }
1539
1445
      if (sortorder->item->maybe_null)
1540
 
        length++;                               // Place for NULL marker
 
1446
        length++;                               // Place for NULL marker
1541
1447
    }
1542
 
    set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
 
1448
    set_if_smaller(sortorder->length,
 
1449
                   (size_t)session->variables.max_sort_length);
1543
1450
    length+=sortorder->length;
1544
1451
  }
1545
1452
  sortorder->field= (Field*) 0;                 // end marker
1559
1466
  layouts for the values of the non-sorted fields in the buffer and
1560
1467
  fills them.
1561
1468
 
 
1469
  @param session                 Current thread
1562
1470
  @param ptabfield           Array of references to the table fields
1563
1471
  @param sortlength          Total length of sorted fields
1564
1472
  @param[out] plength        Total length of appended fields
1573
1481
    NULL   if we do not store field values with sort data.
1574
1482
*/
1575
1483
 
1576
 
sort_addon_field *FileSort::get_addon_fields(Field **ptabfield, uint32_t sortlength_arg, uint32_t *plength)
 
1484
static sort_addon_field_st *
 
1485
get_addon_fields(Session *session, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
1577
1486
{
1578
1487
  Field **pfield;
1579
1488
  Field *field;
1580
 
  sort_addon_field *addonf;
 
1489
  sort_addon_field_st *addonf;
1581
1490
  uint32_t length= 0;
1582
1491
  uint32_t fields= 0;
1583
1492
  uint32_t null_fields= 0;
1608
1517
    return 0;
1609
1518
  length+= (null_fields+7)/8;
1610
1519
 
1611
 
  if (length+sortlength_arg > getSession().variables.max_length_for_sort_data ||
1612
 
      !(addonf= (sort_addon_field *) malloc(sizeof(sort_addon_field)*
 
1520
  if (length+sortlength > session->variables.max_length_for_sort_data ||
 
1521
      !(addonf= (sort_addon_field_st *) malloc(sizeof(sort_addon_field_st)*
1613
1522
                                            (fields+1))))
1614
1523
    return 0;
1615
1524
 
1659
1568
*/
1660
1569
 
1661
1570
static void
1662
 
unpack_addon_fields(sort_addon_field *addon_field, unsigned char *buff)
 
1571
unpack_addon_fields(struct sort_addon_field_st *addon_field, unsigned char *buff)
1663
1572
{
1664
1573
  Field *field;
1665
 
  sort_addon_field *addonf= addon_field;
 
1574
  sort_addon_field_st *addonf= addon_field;
1666
1575
 
1667
1576
  for ( ; (field= addonf->field) ; addonf++)
1668
1577
  {