33
#include "drizzled/drizzled.h"
34
#include "drizzled/sql_sort.h"
35
#include "drizzled/filesort.h"
36
#include "drizzled/error.h"
37
#include "drizzled/probes.h"
38
#include "drizzled/session.h"
39
#include "drizzled/table.h"
40
#include "drizzled/table_list.h"
41
#include "drizzled/optimizer/range.h"
42
#include "drizzled/records.h"
43
#include "drizzled/internal/iocache.h"
44
#include "drizzled/internal/my_sys.h"
45
#include "plugin/myisam/myisam.h"
46
#include "drizzled/plugin/transactional_storage_engine.h"
47
#include "drizzled/atomics.h"
48
#include "drizzled/global_buffer.h"
56
/* Defines used by filesort and uniques */
60
class BufferCompareContext
63
qsort_cmp2 key_compare;
64
void *key_compare_arg;
66
BufferCompareContext() :
75
uint32_t rec_length; /* Length of sorted records */
76
uint32_t sort_length; /* Length of sorted columns */
77
uint32_t ref_length; /* Length of record ref. */
78
uint32_t addon_length; /* Length of added packed fields */
79
uint32_t res_length; /* Length of records in final sorted file/buffer */
80
uint32_t keys; /* Max keys / buffer */
81
ha_rows max_rows,examined_rows;
82
Table *sort_form; /* For quicker make_sortkey */
83
SortField *local_sortorder;
85
sort_addon_field *addon_field; /* Descriptors for companion fields */
86
unsigned char *unique_buff;
89
/* The fields below are used only by Unique class */
91
BufferCompareContext cmp_context;
119
int write_keys(unsigned char * *sort_keys,
121
internal::IO_CACHE *buffer_file,
122
internal::IO_CACHE *tempfile);
124
void make_sortkey(unsigned char *to,
125
unsigned char *ref_pos);
126
void register_used_fields();
127
bool save_index(unsigned char **sort_keys,
129
filesort_info *table_sort);
24
#include <drizzled/server_includes.h>
25
#include <drizzled/sql_sort.h>
26
#include <drizzled/error.h>
27
#include <drizzled/probes.h>
28
#include <drizzled/session.h>
29
#include <drizzled/table.h>
30
#include <drizzled/table_list.h>
133
32
/* functions defined in this file */
135
34
static char **make_char_array(char **old_pos, register uint32_t fields,
138
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
36
static unsigned char *read_buffpek_from_file(IO_CACHE *buffer_file, uint32_t count,
38
static ha_rows find_all_keys(SORTPARAM *param,SQL_SELECT *select,
39
unsigned char * *sort_keys, IO_CACHE *buffer_file,
40
IO_CACHE *tempfile,IO_CACHE *indexfile);
41
static int write_keys(SORTPARAM *param,unsigned char * *sort_keys,
42
uint32_t count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
43
static void make_sortkey(SORTPARAM *param,unsigned char *to, unsigned char *ref_pos);
44
static void register_used_fields(SORTPARAM *param);
45
static int merge_index(SORTPARAM *param,unsigned char *sort_buffer,
47
uint32_t maxbuffer,IO_CACHE *tempfile,
49
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
50
filesort_info_st *table_sort);
142
51
static uint32_t suffix_length(uint32_t string_length);
143
static void unpack_addon_fields(sort_addon_field *addon_field,
52
static uint32_t sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
53
bool *multi_byte_charset);
54
static SORT_ADDON_FIELD *get_addon_fields(Session *session, Field **ptabfield,
55
uint32_t sortlength, uint32_t *plength);
56
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
144
57
unsigned char *buff);
146
FileSort::FileSort(Session &arg) :
153
60
Creates a set of pointers that can be used to read the rows
183
91
examined_rows will be set to number of examined rows
186
ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
187
optimizer::SqlSelect *select, ha_rows max_rows,
188
bool sort_positions, ha_rows &examined_rows)
94
ha_rows filesort(Session *session, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
95
SQL_SELECT *select, ha_rows max_rows,
96
bool sort_positions, ha_rows *examined_rows)
191
uint32_t memavl= 0, min_sort_memory;
99
uint32_t memavl, min_sort_memory;
192
100
uint32_t maxbuffer;
193
size_t allocated_sort_memory= 0;
194
buffpek *buffpek_inst= 0;
195
102
ha_rows records= HA_POS_ERROR;
196
103
unsigned char **sort_keys= 0;
197
internal::IO_CACHE tempfile;
198
internal::IO_CACHE buffpek_pointers;
199
internal::IO_CACHE *selected_records_file;
200
internal::IO_CACHE *outfile;
104
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
202
106
bool multi_byte_charset;
205
Don't use table->sort in filesort as it is also used by
206
QuickIndexMergeSelect. Work with a copy and put it back at the end
207
when index_merge select has finished with it.
209
filesort_info table_sort(table->sort);
210
table->sort.io_cache= NULL;
108
filesort_info_st table_sort;
212
109
TableList *tab= table->pos_in_table_list;
213
110
Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
215
DRIZZLE_FILESORT_START(table->getShare()->getSchemaName(), table->getShare()->getTableName());
112
DRIZZLE_FILESORT_START();
218
115
Release InnoDB's adaptive hash index latch (if holding) before
221
plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
118
ha_release_temporary_latches(session);
121
Don't use table->sort in filesort as it is also used by
122
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
123
when index_merge select has finished with it.
125
memcpy(&table_sort, &table->sort, sizeof(filesort_info_st));
126
table->sort.io_cache= NULL;
224
128
outfile= table_sort.io_cache;
225
assert(tempfile.buffer == 0);
226
assert(buffpek_pointers.buffer == 0);
228
param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
229
param.ref_length= table->cursor->ref_length;
231
if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
129
my_b_clear(&tempfile);
130
my_b_clear(&buffpek_pointers);
133
memset(¶m, 0, sizeof(param));
134
param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
135
param.ref_length= table->file->ref_length;
136
param.addon_field= 0;
137
param.addon_length= 0;
138
if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) && !sort_positions)
234
141
Get the descriptors of all fields whose values are appended
235
142
to sorted fields and get its total length in param.spack_length.
237
param.addon_field= get_addon_fields(table->getFields(),
144
param.addon_field= get_addon_fields(session, table->field,
238
145
param.sort_length,
239
146
¶m.addon_length);
291
196
selected_records_file= 0;
294
if (multi_byte_charset && !(param.tmp_buffer= (char*) malloc(param.sort_length)))
199
if (multi_byte_charset &&
200
!(param.tmp_buffer= (char*) malloc(param.sort_length)))
299
memavl= getSession().variables.sortbuff_size;
300
min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
203
memavl= session->variables.sortbuff_size;
204
min_sort_memory= cmax((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
301
205
while (memavl >= min_sort_memory)
303
207
uint32_t old_memavl;
304
208
uint32_t keys= memavl/(param.rec_length+sizeof(char*));
305
param.keys= (uint32_t) min(records+1, (ha_rows)keys);
307
allocated_sort_memory= param.keys * param.rec_length;
308
if (not global_sort_buffer.add(allocated_sort_memory))
310
my_error(ER_OUT_OF_GLOBAL_SORTMEMORY, MYF(ME_ERROR+ME_WAITTANG));
209
param.keys=(uint32_t) cmin(records+1, keys);
314
210
if ((table_sort.sort_keys=
315
211
(unsigned char **) make_char_array((char **) table_sort.sort_keys,
316
212
param.keys, param.rec_length)))
319
global_sort_buffer.sub(allocated_sort_memory);
321
if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
215
if ((memavl=memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
322
216
memavl= min_sort_memory;
324
218
sort_keys= table_sort.sort_keys;
327
221
my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
331
if (buffpek_pointers.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
224
if (open_cached_file(&buffpek_pointers,drizzle_tmpdir,TEMP_PREFIX,
225
DISK_BUFFER_SIZE, MYF(MY_WME)))
336
228
param.keys--; /* TODO: check why we do this */
337
229
param.sort_form= table;
338
230
param.end=(param.local_sortorder=sortorder)+s_length;
339
if ((records= find_all_keys(¶m,select,sort_keys, &buffpek_pointers,
340
&tempfile, selected_records_file)) == HA_POS_ERROR)
231
if ((records=find_all_keys(¶m,select,sort_keys, &buffpek_pointers,
232
&tempfile, selected_records_file)) ==
344
maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
235
maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek));
346
237
if (maxbuffer == 0) // The whole set is in memory
348
if (param.save_index(sort_keys,(uint32_t) records, &table_sort))
239
if (save_index(¶m,sort_keys,(uint32_t) records, &table_sort))
357
246
if (table_sort.buffpek)
358
247
free(table_sort.buffpek);
359
table_sort.buffpek = 0;
248
table_sort.buffpek= 0;
361
250
if (!(table_sort.buffpek=
362
(unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer, table_sort.buffpek)))
251
(unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer,
252
table_sort.buffpek)))
366
buffpek_inst= (buffpek *) table_sort.buffpek;
254
buffpek= (BUFFPEK *) table_sort.buffpek;
367
255
table_sort.buffpek_len= maxbuffer;
368
buffpek_pointers.close_cached_file();
256
close_cached_file(&buffpek_pointers);
369
257
/* Open cached file if it isn't open */
370
if (! my_b_inited(outfile) && outfile->open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
375
if (outfile->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
258
if (! my_b_inited(outfile) &&
259
open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
262
if (reinit_io_cache(outfile,WRITE_CACHE,0L,0,0))
381
266
Use also the space previously used by string pointers in sort_buffer
382
267
for temporary key storage.
384
param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
269
param.keys=((param.keys*(param.rec_length+sizeof(char*))) /
386
271
maxbuffer--; // Offset from 0
387
if (merge_many_buff(¶m,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
392
if (flush_io_cache(&tempfile) || tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
397
if (merge_index(¶m,(unsigned char*) sort_keys,buffpek_inst,maxbuffer,&tempfile, outfile))
272
if (merge_many_buff(¶m,(unsigned char*) sort_keys,buffpek,&maxbuffer,
275
if (flush_io_cache(&tempfile) ||
276
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
278
if (merge_index(¶m,(unsigned char*) sort_keys,buffpek,maxbuffer,&tempfile,
403
282
if (records > param.max_rows)
405
records= param.max_rows;
283
records=param.max_rows;
410
if (not subselect || not subselect->is_uncacheable())
287
if (param.tmp_buffer)
288
if (param.tmp_buffer)
289
free(param.tmp_buffer);
290
if (!subselect || !subselect->is_uncacheable())
292
if ((unsigned char*) sort_keys)
293
free((unsigned char*) sort_keys);
413
294
table_sort.sort_keys= 0;
295
if ((unsigned char*) buffpek)
296
free((unsigned char*) buffpek);
415
297
table_sort.buffpek= 0;
416
298
table_sort.buffpek_len= 0;
419
tempfile.close_cached_file();
420
buffpek_pointers.close_cached_file();
300
close_cached_file(&tempfile);
301
close_cached_file(&buffpek_pointers);
422
302
if (my_b_inited(outfile))
424
304
if (flush_io_cache(outfile))
429
internal::my_off_t save_pos= outfile->pos_in_file;
307
my_off_t save_pos=outfile->pos_in_file;
430
308
/* For following reads */
431
if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
309
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
435
311
outfile->end_of_file=save_pos;
441
315
my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
442
316
MYF(ME_ERROR+ME_WAITTANG));
446
getSession().status_var.filesort_rows+= (uint32_t) records;
448
examined_rows= param.examined_rows;
449
global_sort_buffer.sub(allocated_sort_memory);
450
table->sort= table_sort;
451
DRIZZLE_FILESORT_DONE(error, records);
452
return (error ? HA_POS_ERROR : records);
318
statistic_add(session->status_var.filesort_rows,
319
(uint32_t) records, &LOCK_status);
320
*examined_rows= param.examined_rows;
321
memcpy(&table->sort, &table_sort, sizeof(filesort_info_st));
322
DRIZZLE_FILESORT_END();
323
return(error ? HA_POS_ERROR : records);
327
void filesort_free_buffers(Table *table, bool full)
329
if (table->sort.record_pointers)
331
free((unsigned char*) table->sort.record_pointers);
332
table->sort.record_pointers=0;
336
if (table->sort.sort_keys )
338
if ((unsigned char*) table->sort.sort_keys)
339
free((unsigned char*) table->sort.sort_keys);
340
table->sort.sort_keys= 0;
342
if (table->sort.buffpek)
344
if ((unsigned char*) table->sort.buffpek)
345
free((unsigned char*) table->sort.buffpek);
346
table->sort.buffpek= 0;
347
table->sort.buffpek_len= 0;
350
if (table->sort.addon_buf)
352
free((char *) table->sort.addon_buf);
353
free((char *) table->sort.addon_field);
354
table->sort.addon_buf=0;
355
table->sort.addon_field=0;
455
359
/** Make a array of string pointers. */
457
361
static char **make_char_array(char **old_pos, register uint32_t fields,
474
378
/** Read 'count' number of buffer pointers into memory. */
476
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
380
static unsigned char *read_buffpek_from_file(IO_CACHE *buffpek_pointers, uint32_t count,
477
381
unsigned char *buf)
479
uint32_t length= sizeof(buffpek)*count;
383
uint32_t length= sizeof(BUFFPEK)*count;
480
384
unsigned char *tmp= buf;
481
if (count > UINT_MAX/sizeof(buffpek))
482
return 0; /* sizeof(buffpek)*count will overflow */
385
if (count > UINT_MAX/sizeof(BUFFPEK))
386
return 0; /* sizeof(BUFFPEK)*count will overflow */
484
388
tmp= (unsigned char *)malloc(length);
487
if (buffpek_pointers->reinit_io_cache(internal::READ_CACHE,0L,0,0) ||
391
if (reinit_io_cache(buffpek_pointers,READ_CACHE,0L,0,0) ||
488
392
my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
490
394
free((char*) tmp);
532
436
HA_POS_ERROR on error.
535
ha_rows FileSort::find_all_keys(SortParam *param,
536
optimizer::SqlSelect *select,
537
unsigned char **sort_keys,
538
internal::IO_CACHE *buffpek_pointers,
539
internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
439
static ha_rows find_all_keys(SORTPARAM *param, SQL_SELECT *select,
440
unsigned char **sort_keys,
441
IO_CACHE *buffpek_pointers,
442
IO_CACHE *tempfile, IO_CACHE *indexfile)
541
444
int error,flag,quick_select;
542
445
uint32_t idx,indexpos,ref_length;
543
446
unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
544
internal::my_off_t record;
545
448
Table *sort_form;
546
volatile Session::killed_state_t *killed= getSession().getKilledPtr();
548
boost::dynamic_bitset<> *save_read_set= NULL;
549
boost::dynamic_bitset<> *save_write_set= NULL;
449
Session *session= current_session;
450
volatile Session::killed_state *killed= &session->killed;
452
MY_BITMAP *save_read_set, *save_write_set;
552
455
error=quick_select=0;
553
456
sort_form=param->sort_form;
554
file= sort_form->cursor;
457
file=sort_form->file;
555
458
ref_length=param->ref_length;
556
459
ref_pos= ref_buff;
557
460
quick_select=select && select->quick;
559
flag= ((!indexfile && ! file->isOrdered())
462
flag= ((!indexfile && file->ha_table_flags() & HA_REC_NOT_IN_SEQ)
560
463
|| quick_select);
561
464
if (indexfile || flag)
562
465
ref_pos= &file->ref[0];
564
467
if (! indexfile && ! quick_select)
566
469
next_pos=(unsigned char*) 0; /* Find records in sequence */
567
if (file->startTableScan(1))
568
return(HA_POS_ERROR);
569
file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
470
file->ha_rnd_init(1);
471
file->extra_opt(HA_EXTRA_CACHE,
472
current_session->variables.read_buff_size);
572
ReadRecord read_record_info;
475
READ_RECORD read_record_info;
573
476
if (quick_select)
575
478
if (select->quick->reset())
576
479
return(HA_POS_ERROR);
578
if (read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1))
579
return(HA_POS_ERROR);
480
init_read_record(&read_record_info, current_session, select->quick->head,
582
484
/* Remember original bitmaps */
583
485
save_read_set= sort_form->read_set;
584
486
save_write_set= sort_form->write_set;
585
487
/* Set up temporary column read map for columns used by sort */
586
sort_form->tmp_set.reset();
488
bitmap_clear_all(&sort_form->tmp_set);
587
489
/* Temporary set for register_used_fields and register_field_in_read_map */
588
490
sort_form->read_set= &sort_form->tmp_set;
589
param->register_used_fields();
491
register_used_fields(param);
590
492
if (select && select->cond)
591
493
select->cond->walk(&Item::register_field_in_read_map, 1,
592
494
(unsigned char*) sort_form);
593
sort_form->column_bitmaps_set(sort_form->tmp_set, sort_form->tmp_set);
495
sort_form->column_bitmaps_set(&sort_form->tmp_set, &sort_form->tmp_set);
601
503
error= HA_ERR_END_OF_FILE;
604
file->position(sort_form->getInsertRecord());
506
file->position(sort_form->record[0]);
606
508
else /* Not quick-select */
610
if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length))
512
if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length)) /* purecov: deadcode */
612
error= errno ? errno : -1; /* Abort */
514
error= my_errno ? my_errno : -1; /* Abort */
615
error=file->rnd_pos(sort_form->getInsertRecord(),next_pos);
517
error=file->rnd_pos(sort_form->record[0],next_pos);
619
error=file->rnd_next(sort_form->getInsertRecord());
521
error=file->rnd_next(sort_form->record[0]);
523
update_virtual_fields_marked_for_write(sort_form);
623
internal::my_store_ptr(ref_pos,ref_length,record); // Position to row
624
record+= sort_form->getShare()->db_record_offset;
527
my_store_ptr(ref_pos,ref_length,record); // Position to row
528
record+= sort_form->s->db_record_offset;
627
file->position(sort_form->getInsertRecord());
531
file->position(sort_form->record[0]);
629
533
if (error && error != HA_ERR_RECORD_DELETED)
667
568
index_merge quick select uses table->sort when retrieving rows, so free
668
569
resoures it has allocated.
670
read_record_info.end_read_record();
571
end_read_record(&read_record_info);
674
575
(void) file->extra(HA_EXTRA_NO_CACHE); /* End cacheing of records */
676
file->endTableScan();
679
if (getSession().is_error())
580
if (session->is_error())
680
581
return(HA_POS_ERROR);
682
583
/* Signal we should use orignal column read and write maps */
683
sort_form->column_bitmaps_set(*save_read_set, *save_write_set);
584
sort_form->column_bitmaps_set(save_read_set, save_write_set);
685
586
if (error != HA_ERR_END_OF_FILE)
687
sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
688
return(HA_POS_ERROR);
691
if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
693
return(HA_POS_ERROR);
588
file->print_error(error,MYF(ME_ERROR | ME_WAITTANG)); /* purecov: inspected */
589
return(HA_POS_ERROR); /* purecov: inspected */
591
if (indexpos && idx &&
592
write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
593
return(HA_POS_ERROR); /* purecov: inspected */
696
594
return(my_b_inited(tempfile) ?
697
595
(ha_rows) (my_b_tell(tempfile)/param->rec_length) :
724
int SortParam::write_keys(register unsigned char **sort_keys, uint32_t count,
725
internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
623
write_keys(SORTPARAM *param, register unsigned char **sort_keys, uint32_t count,
624
IO_CACHE *buffpek_pointers, IO_CACHE *tempfile)
626
size_t sort_length, rec_length;
729
internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
630
sort_length= param->sort_length;
631
rec_length= param->rec_length;
632
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
730
633
if (!my_b_inited(tempfile) &&
731
tempfile->open_cached_file(drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
634
open_cached_file(tempfile, drizzle_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
636
goto err; /* purecov: inspected */
735
637
/* check we won't have more buffpeks than we can possibly keep in memory */
736
if (my_b_tell(buffpek_pointers) + sizeof(buffpek) > (uint64_t)UINT_MAX)
638
if (my_b_tell(buffpek_pointers) + sizeof(BUFFPEK) > (uint64_t)UINT_MAX)
741
640
buffpek.file_pos= my_b_tell(tempfile);
742
if ((ha_rows) count > max_rows)
743
count=(uint32_t) max_rows;
641
if ((ha_rows) count > param->max_rows)
642
count=(uint32_t) param->max_rows; /* purecov: inspected */
745
643
buffpek.count=(ha_rows) count;
747
for (unsigned char **ptr= sort_keys + count ; sort_keys != ptr ; sort_keys++)
644
for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
749
645
if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
755
647
if (my_b_write(buffpek_pointers, (unsigned char*) &buffpek, sizeof(buffpek)))
761
653
} /* write_keys */
820
713
Item *item=sort_field->item;
821
714
maybe_null= item->maybe_null;
823
715
switch (sort_field->result_type) {
824
716
case STRING_RESULT:
826
const CHARSET_INFO * const cs=item->collation.collation;
827
char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
829
uint32_t sort_field_length;
718
const CHARSET_INFO * const cs=item->collation.collation;
719
char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
721
uint32_t sort_field_length;
725
/* All item->str() to use some extra byte for end null.. */
726
String tmp((char*) to,sort_field->length+4,cs);
727
String *res= item->str_result(&tmp);
833
/* All item->str() to use some extra byte for end null.. */
834
String tmp((char*) to,sort_field->length+4,cs);
835
String *res= item->str_result(&tmp);
839
memset(to-1, 0, sort_field->length+1);
843
This should only happen during extreme conditions if we run out
844
of memory or have an item marked not null when it can be null.
845
This code is here mainly to avoid a hard crash in this case.
848
memset(to, 0, sort_field->length); // Avoid crash
852
length= res->length();
853
sort_field_length= sort_field->length - sort_field->suffix_length;
854
diff=(int) (sort_field_length - length);
858
length= sort_field_length;
860
if (sort_field->suffix_length)
862
/* Store length last in result_string */
863
store_length(to + sort_field_length, length,
864
sort_field->suffix_length);
866
if (sort_field->need_strxnfrm)
868
char *from=(char*) res->ptr();
870
if ((unsigned char*) from == to)
872
set_if_smaller(length,sort_field->length);
873
memcpy(tmp_buffer,from,length);
876
tmp_length= my_strnxfrm(cs,to,sort_field->length,
877
(unsigned char*) from, length);
878
assert(tmp_length == sort_field->length);
731
memset(to-1, 0, sort_field->length+1);
882
my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
883
cs->cset->fill(cs, (char *)to+length,diff,fill_char);
734
/* purecov: begin deadcode */
736
This should only happen during extreme conditions if we run out
737
of memory or have an item marked not null when it can be null.
738
This code is here mainly to avoid a hard crash in this case.
741
memset(to, 0, sort_field->length); // Avoid crash
746
length= res->length();
747
sort_field_length= sort_field->length - sort_field->suffix_length;
748
diff=(int) (sort_field_length - length);
752
length= sort_field_length;
754
if (sort_field->suffix_length)
756
/* Store length last in result_string */
757
store_length(to + sort_field_length, length,
758
sort_field->suffix_length);
760
if (sort_field->need_strxnfrm)
762
char *from=(char*) res->ptr();
764
if ((unsigned char*) from == to)
766
set_if_smaller(length,sort_field->length);
767
memcpy(param->tmp_buffer,from,length);
768
from=param->tmp_buffer;
770
tmp_length= my_strnxfrm(cs,to,sort_field->length,
771
(unsigned char*) from, length);
772
assert(tmp_length == sort_field->length);
776
my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
777
cs->cset->fill(cs, (char *)to+length,diff,fill_char);
889
783
int64_t value= item->val_int_result();
786
*to++=1; /* purecov: inspected */
893
787
if (item->null_value)
904
to[7]= (unsigned char) value;
905
to[6]= (unsigned char) (value >> 8);
906
to[5]= (unsigned char) (value >> 16);
907
to[4]= (unsigned char) (value >> 24);
908
to[3]= (unsigned char) (value >> 32);
909
to[2]= (unsigned char) (value >> 40);
910
to[1]= (unsigned char) (value >> 48);
798
to[7]= (unsigned char) value;
799
to[6]= (unsigned char) (value >> 8);
800
to[5]= (unsigned char) (value >> 16);
801
to[4]= (unsigned char) (value >> 24);
802
to[3]= (unsigned char) (value >> 32);
803
to[2]= (unsigned char) (value >> 40);
804
to[1]= (unsigned char) (value >> 48);
911
805
if (item->unsigned_flag) /* Fix sign */
912
806
to[0]= (unsigned char) (value >> 56);
914
808
to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
917
811
case DECIMAL_RESULT:
919
type::Decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
813
my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
922
816
if (item->null_value)
1062
bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
955
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
956
filesort_info_st *table_sort)
958
uint32_t offset,res_length;
1065
959
unsigned char *to;
1067
internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
1068
offset= rec_length - res_length;
1070
if ((ha_rows) count > max_rows)
1071
count=(uint32_t) max_rows;
1073
if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
1076
for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
961
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, param->sort_length);
962
res_length= param->res_length;
963
offset= param->rec_length-res_length;
964
if ((ha_rows) count > param->max_rows)
965
count=(uint32_t) param->max_rows;
966
if (!(to= table_sort->record_pointers=
967
(unsigned char*) malloc(res_length*count)))
968
return(1); /* purecov: inspected */
969
for (unsigned char **end= sort_keys+count ; sort_keys != end ; sort_keys++)
1078
971
memcpy(to, *sort_keys+offset, res_length);
1079
972
to+= res_length;
1086
978
/** Merge buffers to make < MERGEBUFF2 buffers. */
1088
int FileSort::merge_many_buff(SortParam *param, unsigned char *sort_buffer,
1089
buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
980
int merge_many_buff(SORTPARAM *param, unsigned char *sort_buffer,
981
BUFFPEK *buffpek, uint32_t *maxbuffer, IO_CACHE *t_file)
1091
internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
984
IO_CACHE t_file2,*from_file,*to_file,*temp;
1094
987
if (*maxbuffer < MERGEBUFF2)
988
return(0); /* purecov: inspected */
1096
989
if (flush_io_cache(t_file) ||
1097
t_file2.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE, MYF(MY_WME)))
990
open_cached_file(&t_file2,drizzle_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
992
return(1); /* purecov: inspected */
1102
994
from_file= t_file ; to_file= &t_file2;
1103
995
while (*maxbuffer >= MERGEBUFF2)
1105
register uint32_t i;
1107
if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
1112
if (to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
1117
lastbuff=buffpek_inst;
997
if (reinit_io_cache(from_file,READ_CACHE,0L,0,0))
999
if (reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
1118
1002
for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
1120
1004
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1121
buffpek_inst+i,buffpek_inst+i+MERGEBUFF-1,0))
1005
buffpek+i,buffpek+i+MERGEBUFF-1,0))
1127
1008
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
1128
buffpek_inst+i,buffpek_inst+ *maxbuffer,0))
1009
buffpek+i,buffpek+ *maxbuffer,0))
1010
break; /* purecov: inspected */
1133
1011
if (flush_io_cache(to_file))
1012
break; /* purecov: inspected */
1138
1013
temp=from_file; from_file=to_file; to_file=temp;
1139
from_file->setup_io_cache();
1140
to_file->setup_io_cache();
1141
*maxbuffer= (uint32_t) (lastbuff-buffpek_inst)-1;
1014
setup_io_cache(from_file);
1015
setup_io_cache(to_file);
1016
*maxbuffer= (uint32_t) (lastbuff-buffpek)-1;
1145
to_file->close_cached_file(); // This holds old result
1019
close_cached_file(to_file); // This holds old result
1146
1020
if (to_file == t_file)
1148
1022
*t_file=t_file2; // Copy result file
1149
t_file->setup_io_cache();
1023
setup_io_cache(t_file);
1152
1026
return(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
1160
1034
(uint32_t)-1 if something goes wrong
1163
uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
1037
uint32_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
1038
uint32_t rec_length)
1165
1040
register uint32_t count;
1166
1041
uint32_t length;
1168
if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
1043
if ((count=(uint32_t) cmin((ha_rows) buffpek->max_keys,buffpek->count)))
1170
if (pread(fromfile->file,(unsigned char*) buffpek_inst->base, (length= rec_length*count),buffpek_inst->file_pos) == 0)
1171
return((uint32_t) -1);
1173
buffpek_inst->key= buffpek_inst->base;
1174
buffpek_inst->file_pos+= length; /* New filepos */
1175
buffpek_inst->count-= count;
1176
buffpek_inst->mem_count= count;
1045
if (pread(fromfile->file,(unsigned char*) buffpek->base, (length= rec_length*count),buffpek->file_pos) == 0)
1046
return((uint32_t) -1); /* purecov: inspected */
1047
buffpek->key=buffpek->base;
1048
buffpek->file_pos+= length; /* New filepos */
1049
buffpek->count-= count;
1050
buffpek->mem_count= count;
1178
1052
return (count*rec_length);
1179
1053
} /* read_to_buffer */
1182
class compare_functor
1057
Put all room used by freed buffer to use in adjacent buffer.
1059
Note, that we can't simply distribute memory evenly between all buffers,
1060
because new areas must not overlap with old ones.
1062
@param[in] queue list of non-empty buffers, without freed buffer
1063
@param[in] reuse empty buffer
1064
@param[in] key_length key length
1067
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint32_t key_length)
1184
qsort2_cmp key_compare;
1185
void *key_compare_arg;
1188
compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
1189
key_compare(in_key_compare),
1190
key_compare_arg(in_compare_arg)
1193
inline bool operator()(const buffpek *i, const buffpek *j) const
1069
unsigned char *reuse_end= reuse->base + reuse->max_keys * key_length;
1070
for (uint32_t i= 0; i < queue->elements; ++i)
1195
int val= key_compare(key_compare_arg, &i->key, &j->key);
1072
BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
1073
if (bp->base + bp->max_keys * key_length == reuse->base)
1075
bp->max_keys+= reuse->max_keys;
1078
else if (bp->base == reuse_end)
1080
bp->base= reuse->base;
1081
bp->max_keys+= reuse->max_keys;
1203
1090
Merge buffers to one buffer.
1205
1092
@param param Sort parameter
1206
@param from_file File with source data (buffpeks point to this file)
1093
@param from_file File with source data (BUFFPEKs point to this file)
1207
1094
@param to_file File to write the sorted result data.
1208
1095
@param sort_buffer Buffer for data to store up to MERGEBUFF2 sort keys.
1209
@param lastbuff OUT Store here buffpek describing data written to to_file
1210
@param Fb First element in source buffpeks array
1211
@param Tb Last element in source buffpeks array
1096
@param lastbuff OUT Store here BUFFPEK describing data written to to_file
1097
@param Fb First element in source BUFFPEKs array
1098
@param Tb Last element in source BUFFPEKs array
1220
int FileSort::merge_buffers(SortParam *param, internal::IO_CACHE *from_file,
1221
internal::IO_CACHE *to_file, unsigned char *sort_buffer,
1222
buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
1107
int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
1108
IO_CACHE *to_file, unsigned char *sort_buffer,
1109
BUFFPEK *lastbuff, BUFFPEK *Fb, BUFFPEK *Tb,
1226
1113
uint32_t rec_length,res_length,offset;
1227
1114
size_t sort_length;
1228
1115
uint32_t maxcount;
1229
1116
ha_rows max_rows,org_max_rows;
1230
internal::my_off_t to_start_filepos;
1117
my_off_t to_start_filepos;
1231
1118
unsigned char *strpos;
1232
buffpek *buffpek_inst;
1233
1121
qsort2_cmp cmp;
1234
1122
void *first_cmp_arg;
1235
volatile Session::killed_state_t *killed= getSession().getKilledPtr();
1236
Session::killed_state_t not_killable;
1123
volatile Session::killed_state *killed= ¤t_session->killed;
1124
Session::killed_state not_killable;
1238
getSession().status_var.filesort_merge_passes++;
1126
status_var_increment(current_session->status_var.filesort_merge_passes);
1239
1127
if (param->not_killable)
1241
1129
killed= ¬_killable;
1265
cmp= internal::get_ptr_compare(sort_length);
1153
cmp= get_ptr_compare(sort_length);
1266
1154
first_cmp_arg= (void*) &sort_length;
1268
priority_queue<buffpek *, vector<buffpek *>, compare_functor >
1269
queue(compare_functor(cmp, first_cmp_arg));
1270
for (buffpek_inst= Fb ; buffpek_inst <= Tb ; buffpek_inst++)
1156
if (init_queue(&queue, (uint32_t) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
1157
(queue_compare) cmp, first_cmp_arg))
1158
return(1); /* purecov: inspected */
1159
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1272
buffpek_inst->base= strpos;
1273
buffpek_inst->max_keys= maxcount;
1274
strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek_inst,
1161
buffpek->base= strpos;
1162
buffpek->max_keys= maxcount;
1163
strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek,
1276
1165
if (error == -1)
1279
buffpek_inst->max_keys= buffpek_inst->mem_count; // If less data in buffers than expected
1280
queue.push(buffpek_inst);
1166
goto err; /* purecov: inspected */
1167
buffpek->max_keys= buffpek->mem_count; // If less data in buffers than expected
1168
queue_insert(&queue, (unsigned char*) buffpek);
1283
1171
if (param->unique_buff)
1290
1178
This is safe as we know that there is always more than one element
1291
1179
in each block to merge (This is guaranteed by the Unique:: algorithm
1293
buffpek_inst= queue.top();
1294
memcpy(param->unique_buff, buffpek_inst->key, rec_length);
1295
if (my_b_write(to_file, (unsigned char*) buffpek_inst->key, rec_length))
1181
buffpek= (BUFFPEK*) queue_top(&queue);
1182
memcpy(param->unique_buff, buffpek->key, rec_length);
1183
if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1185
error=1; goto err; /* purecov: inspected */
1299
buffpek_inst->key+= rec_length;
1300
buffpek_inst->mem_count--;
1187
buffpek->key+= rec_length;
1188
buffpek->mem_count--;
1301
1189
if (!--max_rows)
1191
error= 0; /* purecov: inspected */
1192
goto end; /* purecov: inspected */
1306
/* Top element has been used */
1308
queue.push(buffpek_inst);
1194
queue_replaced(&queue); // Top element has been used
1312
1197
cmp= 0; // Not unique
1315
while (queue.size() > 1)
1199
while (queue.elements > 1)
1203
error= 1; goto err; /* purecov: inspected */
1323
buffpek_inst= queue.top();
1207
buffpek= (BUFFPEK*) queue_top(&queue);
1324
1208
if (cmp) // Remove duplicates
1326
1210
if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
1327
(unsigned char**) &buffpek_inst->key))
1211
(unsigned char**) &buffpek->key))
1328
1212
goto skip_duplicate;
1329
memcpy(param->unique_buff, buffpek_inst->key, rec_length);
1213
memcpy(param->unique_buff, buffpek->key, rec_length);
1333
if (my_b_write(to_file,(unsigned char*) buffpek_inst->key, rec_length))
1217
if (my_b_write(to_file,(unsigned char*) buffpek->key, rec_length))
1219
error=1; goto err; /* purecov: inspected */
1340
if (my_b_write(to_file, (unsigned char*) buffpek_inst->key+offset, res_length))
1224
if (my_b_write(to_file, (unsigned char*) buffpek->key+offset, res_length))
1226
error=1; goto err; /* purecov: inspected */
1345
1229
if (!--max_rows)
1231
error= 0; /* purecov: inspected */
1232
goto end; /* purecov: inspected */
1351
1235
skip_duplicate:
1352
buffpek_inst->key+= rec_length;
1353
if (! --buffpek_inst->mem_count)
1236
buffpek->key+= rec_length;
1237
if (! --buffpek->mem_count)
1355
if (!(error= (int) read_to_buffer(from_file,buffpek_inst,
1239
if (!(error= (int) read_to_buffer(from_file,buffpek,
1242
queue_remove(&queue,0);
1243
reuse_freed_buff(&queue, buffpek, rec_length);
1359
1244
break; /* One buffer have been removed */
1361
1246
else if (error == -1)
1247
goto err; /* purecov: inspected */
1366
/* Top element has been replaced */
1368
queue.push(buffpek_inst);
1249
queue_replaced(&queue); /* Top element has been replaced */
1371
buffpek_inst= queue.top();
1372
buffpek_inst->base= sort_buffer;
1373
buffpek_inst->max_keys= param->keys;
1252
buffpek= (BUFFPEK*) queue_top(&queue);
1253
buffpek->base= sort_buffer;
1254
buffpek->max_keys= param->keys;
1376
1257
As we know all entries in the buffer are unique, we only have to
1381
if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek_inst->key))
1262
if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek->key))
1383
buffpek_inst->key+= rec_length; // Remove duplicate
1384
--buffpek_inst->mem_count;
1264
buffpek->key+= rec_length; // Remove duplicate
1265
--buffpek->mem_count;
1390
if ((ha_rows) buffpek_inst->mem_count > max_rows)
1271
if ((ha_rows) buffpek->mem_count > max_rows)
1391
1272
{ /* Don't write too many records */
1392
buffpek_inst->mem_count= (uint32_t) max_rows;
1393
buffpek_inst->count= 0; /* Don't read more */
1273
buffpek->mem_count= (uint32_t) max_rows;
1274
buffpek->count= 0; /* Don't read more */
1395
max_rows-= buffpek_inst->mem_count;
1276
max_rows-= buffpek->mem_count;
1398
if (my_b_write(to_file,(unsigned char*) buffpek_inst->key,
1399
(rec_length*buffpek_inst->mem_count)))
1279
if (my_b_write(to_file,(unsigned char*) buffpek->key,
1280
(rec_length*buffpek->mem_count)))
1282
error= 1; goto err; /* purecov: inspected */
1406
1287
register unsigned char *end;
1407
strpos= buffpek_inst->key+offset;
1408
for (end= strpos+buffpek_inst->mem_count*rec_length ;
1288
strpos= buffpek->key+offset;
1289
for (end= strpos+buffpek->mem_count*rec_length ;
1409
1290
strpos != end ;
1410
1291
strpos+= rec_length)
1412
1293
if (my_b_write(to_file, (unsigned char *) strpos, res_length))
1420
while ((error=(int) read_to_buffer(from_file,buffpek_inst, rec_length))
1300
while ((error=(int) read_to_buffer(from_file,buffpek, rec_length))
1421
1301
!= -1 && error != 0);
1424
lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
1304
lastbuff->count= cmin(org_max_rows-max_rows, param->max_rows);
1425
1305
lastbuff->file_pos= to_start_filepos;
1307
delete_queue(&queue);
1428
1309
} /* merge_buffers */
1431
1312
/* Do a merge to output-file (save only positions) */
1433
int FileSort::merge_index(SortParam *param, unsigned char *sort_buffer,
1434
buffpek *buffpek_inst, uint32_t maxbuffer,
1435
internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
1314
static int merge_index(SORTPARAM *param, unsigned char *sort_buffer,
1315
BUFFPEK *buffpek, uint32_t maxbuffer,
1316
IO_CACHE *tempfile, IO_CACHE *outfile)
1437
if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
1438
buffpek_inst+maxbuffer,1))
1318
if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
1319
buffpek+maxbuffer,1))
1320
return(1); /* purecov: inspected */
1442
1322
} /* merge_index */
1502
1385
sortorder->result_type= sortorder->item->result_type();
1503
1386
if (sortorder->item->result_as_int64_t())
1504
1387
sortorder->result_type= INT_RESULT;
1506
1388
switch (sortorder->result_type) {
1507
1389
case STRING_RESULT:
1508
sortorder->length=sortorder->item->max_length;
1509
set_if_smaller(sortorder->length,
1510
getSession().variables.max_sort_length);
1511
if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1390
sortorder->length=sortorder->item->max_length;
1391
set_if_smaller(sortorder->length, session->variables.max_sort_length);
1392
if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1513
1394
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1514
sortorder->need_strxnfrm= 1;
1515
*multi_byte_charset= 1;
1395
sortorder->need_strxnfrm= 1;
1396
*multi_byte_charset= 1;
1517
1398
else if (cs == &my_charset_bin)
1519
1400
/* Store length last to be able to sort blob/varbinary */
1520
1401
sortorder->suffix_length= suffix_length(sortorder->length);
1521
1402
sortorder->length+= sortorder->suffix_length;
1524
1405
case INT_RESULT:
1525
sortorder->length=8; // Size of intern int64_t
1406
sortorder->length=8; // Size of intern int64_t
1527
1408
case DECIMAL_RESULT:
1528
1409
sortorder->length=
1529
class_decimal_get_binary_size(sortorder->item->max_length -
1410
my_decimal_get_binary_size(sortorder->item->max_length -
1530
1411
(sortorder->item->decimals ? 1 : 0),
1531
1412
sortorder->item->decimals);
1533
1414
case REAL_RESULT:
1534
sortorder->length=sizeof(double);
1415
sortorder->length=sizeof(double);
1536
1417
case ROW_RESULT:
1537
// This case should never be choosen
1419
// This case should never be choosen
1541
1423
if (sortorder->item->maybe_null)
1542
length++; // Place for NULL marker
1424
length++; // Place for NULL marker
1544
set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
1426
set_if_smaller(sortorder->length, session->variables.max_sort_length);
1545
1427
length+=sortorder->length;
1547
1429
sortorder->field= (Field*) 0; // end marker