32
#include "drizzled/sql_sort.h"
33
#include "drizzled/error.h"
34
#include "drizzled/probes.h"
35
#include "drizzled/session.h"
36
#include "drizzled/table.h"
37
#include "drizzled/table_list.h"
38
#include "drizzled/optimizer/range.h"
39
#include "drizzled/records.h"
40
#include "drizzled/internal/iocache.h"
41
#include "drizzled/internal/my_sys.h"
42
#include "plugin/myisam/myisam.h"
43
#include "drizzled/plugin/transactional_storage_engine.h"
50
/* functions defined in this file */
24
#include <drizzled/server_includes.h>
26
#include <drizzled/drizzled_error_messages.h>
28
/* functions defined in this file */
52
30
static char **make_char_array(char **old_pos, register uint32_t fields,
55
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
59
static ha_rows find_all_keys(Session *session,
61
optimizer::SqlSelect *select,
62
unsigned char * *sort_keys,
63
internal::IO_CACHE *buffer_file,
64
internal::IO_CACHE *tempfile,
65
internal::IO_CACHE *indexfile);
67
static int write_keys(SORTPARAM *param,
68
unsigned char * *sort_keys,
70
internal::IO_CACHE *buffer_file,
71
internal::IO_CACHE *tempfile);
73
static void make_sortkey(SORTPARAM *param,
75
unsigned char *ref_pos);
31
uint32_t length, myf my_flag);
32
static unsigned char *read_buffpek_from_file(IO_CACHE *buffer_file, uint32_t count,
34
static ha_rows find_all_keys(SORTPARAM *param,SQL_SELECT *select,
35
unsigned char * *sort_keys, IO_CACHE *buffer_file,
36
IO_CACHE *tempfile,IO_CACHE *indexfile);
37
static int write_keys(SORTPARAM *param,unsigned char * *sort_keys,
38
uint32_t count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
39
static void make_sortkey(SORTPARAM *param,unsigned char *to, unsigned char *ref_pos);
76
40
static void register_used_fields(SORTPARAM *param);
77
static int merge_index(SORTPARAM *param,
78
unsigned char *sort_buffer,
81
internal::IO_CACHE *tempfile,
82
internal::IO_CACHE *outfile);
83
static bool save_index(SORTPARAM *param,
84
unsigned char **sort_keys,
41
static int merge_index(SORTPARAM *param,unsigned char *sort_buffer,
43
uint32_t maxbuffer,IO_CACHE *tempfile,
45
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
86
46
filesort_info_st *table_sort);
87
47
static uint32_t suffix_length(uint32_t string_length);
88
static uint32_t sortlength(Session *session,
91
bool *multi_byte_charset);
92
static sort_addon_field_st *get_addon_fields(Session *session,
96
static void unpack_addon_fields(sort_addon_field_st *addon_field,
48
static uint32_t sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
49
bool *multi_byte_charset);
50
static SORT_ADDON_FIELD *get_addon_fields(Session *session, Field **ptabfield,
51
uint32_t sortlength, uint32_t *plength);
52
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
97
53
unsigned char *buff);
131
87
examined_rows will be set to number of examined rows
134
ha_rows filesort(Session *session, Table *table, SortField *sortorder, uint32_t s_length,
135
optimizer::SqlSelect *select, ha_rows max_rows,
90
ha_rows filesort(Session *session, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
91
SQL_SELECT *select, ha_rows max_rows,
136
92
bool sort_positions, ha_rows *examined_rows)
139
95
uint32_t memavl, min_sort_memory;
140
96
uint32_t maxbuffer;
142
98
ha_rows records= HA_POS_ERROR;
143
99
unsigned char **sort_keys= 0;
144
internal::IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
100
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
146
102
bool multi_byte_charset;
149
105
TableList *tab= table->pos_in_table_list;
150
106
Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
152
DRIZZLE_FILESORT_START(table->getShare()->getSchemaName(), table->getShare()->getTableName());
108
DRIZZLE_FILESORT_START();
155
111
Release InnoDB's adaptive hash index latch (if holding) before
158
plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
114
ha_release_temporary_latches(session);
161
Don't use table->sort in filesort as it is also used by
162
QuickIndexMergeSelect. Work with a copy and put it back at the end
117
Don't use table->sort in filesort as it is also used by
118
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
163
119
when index_merge select has finished with it.
165
121
memcpy(&table_sort, &table->sort, sizeof(filesort_info_st));
166
122
table->sort.io_cache= NULL;
168
124
outfile= table_sort.io_cache;
169
125
my_b_clear(&tempfile);
170
126
my_b_clear(&buffpek_pointers);
173
129
memset(¶m, 0, sizeof(param));
174
130
param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
175
param.ref_length= table->cursor->ref_length;
131
param.ref_length= table->file->ref_length;
176
132
param.addon_field= 0;
177
133
param.addon_length= 0;
178
if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
134
if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) && !sort_positions)
181
Get the descriptors of all fields whose values are appended
137
Get the descriptors of all fields whose values are appended
182
138
to sorted fields and get its total length in param.spack_length.
184
param.addon_field= get_addon_fields(session, table->getFields(),
140
param.addon_field= get_addon_fields(session, table->field,
185
141
param.sort_length,
186
142
¶m.addon_length);
211
168
if (select && select->quick)
213
session->status_var.filesort_range_count++;
170
status_var_increment(session->status_var.filesort_range_count);
217
session->status_var.filesort_scan_count++;
174
status_var_increment(session->status_var.filesort_scan_count);
219
176
#ifdef CAN_TRUST_RANGE
220
177
if (select && select->quick && select->quick->records > 0L)
222
records= min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
223
table->cursor->stats.records)+EXTRA_RECORDS;
179
records=cmin((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
180
table->file->stats.records)+EXTRA_RECORDS;
224
181
selected_records_file=0;
229
records= table->cursor->estimate_rows_upper_bound();
186
records= table->file->estimate_rows_upper_bound();
231
If number of records is not known, use as much of sort buffer
188
If number of records is not known, use as much of sort buffer
234
191
if (records == HA_POS_ERROR)
235
192
records--; // we use 'records+1' below.
239
196
if (multi_byte_charset &&
240
!(param.tmp_buffer= (char*) malloc(param.sort_length)))
197
!(param.tmp_buffer= (char*) my_malloc(param.sort_length,MYF(MY_WME))))
243
200
memavl= session->variables.sortbuff_size;
244
min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
201
min_sort_memory= cmax((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
245
202
while (memavl >= min_sort_memory)
247
204
uint32_t old_memavl;
248
205
uint32_t keys= memavl/(param.rec_length+sizeof(char*));
249
param.keys= (uint32_t) min(records+1, (ha_rows)keys);
206
param.keys=(uint32_t) cmin(records+1, keys);
250
207
if ((table_sort.sort_keys=
251
208
(unsigned char **) make_char_array((char **) table_sort.sort_keys,
252
param.keys, param.rec_length)))
209
param.keys, param.rec_length, MYF(0))))
255
if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
212
if ((memavl=memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
256
213
memavl= min_sort_memory;
258
215
sort_keys= table_sort.sort_keys;
261
218
my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
264
if (open_cached_file(&buffpek_pointers,drizzle_tmpdir.c_str(),TEMP_PREFIX,
221
if (open_cached_file(&buffpek_pointers,mysql_tmpdir,TEMP_PREFIX,
265
222
DISK_BUFFER_SIZE, MYF(MY_WME)))
268
225
param.keys--; /* TODO: check why we do this */
269
226
param.sort_form= table;
270
227
param.end=(param.local_sortorder=sortorder)+s_length;
271
if ((records=find_all_keys(session, ¶m,select,sort_keys, &buffpek_pointers,
228
if ((records=find_all_keys(¶m,select,sort_keys, &buffpek_pointers,
272
229
&tempfile, selected_records_file)) ==
291
248
(unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer,
292
249
table_sort.buffpek)))
294
buffpek= (buffpek_st *) table_sort.buffpek;
251
buffpek= (BUFFPEK *) table_sort.buffpek;
295
252
table_sort.buffpek_len= maxbuffer;
296
253
close_cached_file(&buffpek_pointers);
297
254
/* Open cached file if it isn't open */
298
255
if (! my_b_inited(outfile) &&
299
open_cached_file(outfile,drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER,
256
open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
302
if (reinit_io_cache(outfile,internal::WRITE_CACHE,0L,0,0))
259
if (reinit_io_cache(outfile,WRITE_CACHE,0L,0,0))
342
301
if (flush_io_cache(outfile))
345
internal::my_off_t save_pos=outfile->pos_in_file;
304
my_off_t save_pos=outfile->pos_in_file;
346
305
/* For following reads */
347
if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
306
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
349
308
outfile->end_of_file=save_pos;
354
312
my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
355
313
MYF(ME_ERROR+ME_WAITTANG));
359
session->status_var.filesort_rows+= (uint32_t) records;
315
statistic_add(session->status_var.filesort_rows,
316
(uint32_t) records, &LOCK_status);
361
317
*examined_rows= param.examined_rows;
362
318
memcpy(&table->sort, &table_sort, sizeof(filesort_info_st));
363
DRIZZLE_FILESORT_DONE(error, records);
364
return (error ? HA_POS_ERROR : records);
319
DRIZZLE_FILESORT_END();
320
return(error ? HA_POS_ERROR : records);
368
void Table::filesort_free_buffers(bool full)
324
void filesort_free_buffers(Table *table, bool full)
370
if (sort.record_pointers)
326
if (table->sort.record_pointers)
372
free((unsigned char*) sort.record_pointers);
373
sort.record_pointers=0;
328
free((unsigned char*) table->sort.record_pointers);
329
table->sort.record_pointers=0;
333
if (table->sort.sort_keys )
379
if ((unsigned char*) sort.sort_keys)
380
free((unsigned char*) sort.sort_keys);
335
if ((unsigned char*) table->sort.sort_keys)
336
free((unsigned char*) table->sort.sort_keys);
337
table->sort.sort_keys= 0;
339
if (table->sort.buffpek)
385
if ((unsigned char*) sort.buffpek)
386
free((unsigned char*) sort.buffpek);
341
if ((unsigned char*) table->sort.buffpek)
342
free((unsigned char*) table->sort.buffpek);
343
table->sort.buffpek= 0;
344
table->sort.buffpek_len= 0;
347
if (table->sort.addon_buf)
393
free((char *) sort.addon_buf);
394
free((char *) sort.addon_field);
349
free((char *) table->sort.addon_buf);
350
free((char *) table->sort.addon_field);
351
table->sort.addon_buf=0;
352
table->sort.addon_field=0;
400
356
/** Make an array of string pointers. */
402
358
static char **make_char_array(char **old_pos, register uint32_t fields,
359
uint32_t length, myf my_flag)
405
361
register char **pos;
409
(old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
365
(old_pos= (char**) my_malloc((uint32_t) fields*(length+sizeof(char*)),
411
368
pos=old_pos; char_pos=((char*) (pos+fields)) -length;
412
369
while (fields--) *(pos++) = (char_pos+= length);
419
376
/** Read 'count' number of buffer pointers into memory. */
421
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
378
static unsigned char *read_buffpek_from_file(IO_CACHE *buffpek_pointers, uint32_t count,
422
379
unsigned char *buf)
424
uint32_t length= sizeof(buffpek_st)*count;
381
uint32_t length= sizeof(BUFFPEK)*count;
425
382
unsigned char *tmp= buf;
426
if (count > UINT_MAX/sizeof(buffpek_st))
427
return 0; /* sizeof(buffpek_st)*count will overflow */
383
if (count > UINT_MAX/sizeof(BUFFPEK))
384
return 0; /* sizeof(BUFFPEK)*count will overflow */
429
tmp= (unsigned char *)malloc(length);
386
tmp= (unsigned char *)my_malloc(length, MYF(MY_WME));
432
if (reinit_io_cache(buffpek_pointers,internal::READ_CACHE,0L,0,0) ||
389
if (reinit_io_cache(buffpek_pointers,READ_CACHE,0L,0,0) ||
433
390
my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
435
392
free((char*) tmp);
477
434
HA_POS_ERROR on error.
480
static ha_rows find_all_keys(Session *session,
482
optimizer::SqlSelect *select,
437
static ha_rows find_all_keys(SORTPARAM *param, SQL_SELECT *select,
483
438
unsigned char **sort_keys,
484
internal::IO_CACHE *buffpek_pointers,
485
internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
439
IO_CACHE *buffpek_pointers,
440
IO_CACHE *tempfile, IO_CACHE *indexfile)
487
442
int error,flag,quick_select;
488
443
uint32_t idx,indexpos,ref_length;
489
444
unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
490
internal::my_off_t record;
491
446
Table *sort_form;
447
Session *session= current_session;
492
448
volatile Session::killed_state *killed= &session->killed;
494
MyBitmap *save_read_set, *save_write_set;
450
MY_BITMAP *save_read_set, *save_write_set;
497
453
error=quick_select=0;
498
454
sort_form=param->sort_form;
499
file= sort_form->cursor;
455
file=sort_form->file;
500
456
ref_length=param->ref_length;
501
457
ref_pos= ref_buff;
502
458
quick_select=select && select->quick;
504
flag= ((!indexfile && ! file->isOrdered())
460
flag= ((!indexfile && file->ha_table_flags() & HA_REC_NOT_IN_SEQ)
505
461
|| quick_select);
506
462
if (indexfile || flag)
507
463
ref_pos= &file->ref[0];
509
465
if (! indexfile && ! quick_select)
511
467
next_pos=(unsigned char*) 0; /* Find records in sequence */
512
file->startTableScan(1);
468
file->ha_rnd_init(1);
513
469
file->extra_opt(HA_EXTRA_CACHE,
514
session->variables.read_buff_size);
470
current_session->variables.read_buff_size);
517
ReadRecord read_record_info;
473
READ_RECORD read_record_info;
518
474
if (quick_select)
520
476
if (select->quick->reset())
521
477
return(HA_POS_ERROR);
523
read_record_info.init_read_record(session, select->quick->head, select, 1, 1);
478
init_read_record(&read_record_info, current_session, select->quick->head,
526
482
/* Remember original bitmaps */
527
483
save_read_set= sort_form->read_set;
528
484
save_write_set= sort_form->write_set;
529
485
/* Set up temporary column read map for columns used by sort */
530
sort_form->tmp_set.clearAll();
486
bitmap_clear_all(&sort_form->tmp_set);
531
487
/* Temporary set for register_used_fields and register_field_in_read_map */
532
488
sort_form->read_set= &sort_form->tmp_set;
533
489
register_used_fields(param);
545
501
error= HA_ERR_END_OF_FILE;
548
file->position(sort_form->getInsertRecord());
504
file->position(sort_form->record[0]);
550
506
else /* Not quick-select */
554
if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length))
510
if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length)) /* purecov: deadcode */
556
error= errno ? errno : -1; /* Abort */
512
error= my_errno ? my_errno : -1; /* Abort */
559
error=file->rnd_pos(sort_form->getInsertRecord(),next_pos);
515
error=file->rnd_pos(sort_form->record[0],next_pos);
563
error=file->rnd_next(sort_form->getInsertRecord());
519
error=file->rnd_next(sort_form->record[0]);
521
update_virtual_fields_marked_for_write(sort_form);
567
internal::my_store_ptr(ref_pos,ref_length,record); // Position to row
568
record+= sort_form->getShare()->db_record_offset;
525
my_store_ptr(ref_pos,ref_length,record); // Position to row
526
record+= sort_form->s->db_record_offset;
571
file->position(sort_form->getInsertRecord());
529
file->position(sort_form->record[0]);
573
531
if (error && error != HA_ERR_RECORD_DELETED)
608
566
index_merge quick select uses table->sort when retrieving rows, so free
609
567
resources it has allocated.
611
read_record_info.end_read_record();
569
end_read_record(&read_record_info);
615
573
(void) file->extra(HA_EXTRA_NO_CACHE); /* End cacheing of records */
617
file->endTableScan();
620
578
if (session->is_error())
621
579
return(HA_POS_ERROR);
623
581
/* Signal we should use original column read and write maps */
624
582
sort_form->column_bitmaps_set(save_read_set, save_write_set);
626
584
if (error != HA_ERR_END_OF_FILE)
628
sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
629
return(HA_POS_ERROR);
586
file->print_error(error,MYF(ME_ERROR | ME_WAITTANG)); /* purecov: inspected */
587
return(HA_POS_ERROR); /* purecov: inspected */
631
589
if (indexpos && idx &&
632
590
write_keys(param,sort_keys,idx,buffpek_pointers,tempfile))
633
return(HA_POS_ERROR);
591
return(HA_POS_ERROR); /* purecov: inspected */
634
592
return(my_b_inited(tempfile) ?
635
593
(ha_rows) (my_b_tell(tempfile)/param->rec_length) :
642
600
Sort the buffer and write:
643
601
-# the sorted sequence to tempfile
644
-# a buffpek_st describing the sorted sequence position to buffpek_pointers
602
-# a BUFFPEK describing the sorted sequence position to buffpek_pointers
646
604
(was, in Swedish: "Skriver en buffert med nycklar till filen", i.e. "Writes a buffer of keys to the file")
648
606
@param param Sort parameters
649
607
@param sort_keys Array of pointers to keys to sort
650
608
@param count Number of elements in sort_keys array
651
@param buffpek_pointers One 'buffpek_st' struct will be written into this file.
652
The buffpek_st::{file_pos, count} will indicate where
609
@param buffpek_pointers One 'BUFFPEK' struct will be written into this file.
610
The BUFFPEK::{file_pos, count} will indicate where
653
611
the sorted data was stored.
654
612
@param tempfile The sorted sequence will be written into this file.
663
621
write_keys(SORTPARAM *param, register unsigned char **sort_keys, uint32_t count,
664
internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
622
IO_CACHE *buffpek_pointers, IO_CACHE *tempfile)
666
624
size_t sort_length, rec_length;
667
625
unsigned char **end;
670
628
sort_length= param->sort_length;
671
629
rec_length= param->rec_length;
672
internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
630
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
673
631
if (!my_b_inited(tempfile) &&
674
open_cached_file(tempfile, drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE,
632
open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
634
goto err; /* purecov: inspected */
677
635
/* check we won't have more buffpeks than we can possibly keep in memory */
678
if (my_b_tell(buffpek_pointers) + sizeof(buffpek_st) > (uint64_t)UINT_MAX)
636
if (my_b_tell(buffpek_pointers) + sizeof(BUFFPEK) > (uint64_t)UINT_MAX)
680
638
buffpek.file_pos= my_b_tell(tempfile);
681
639
if ((ha_rows) count > param->max_rows)
682
count=(uint32_t) param->max_rows;
640
count=(uint32_t) param->max_rows; /* purecov: inspected */
683
641
buffpek.count=(ha_rows) count;
684
642
for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
685
643
if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
992
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
953
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
993
954
filesort_info_st *table_sort)
995
956
uint32_t offset,res_length;
996
957
unsigned char *to;
998
internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, param->sort_length);
959
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, param->sort_length);
999
960
res_length= param->res_length;
1000
961
offset= param->rec_length-res_length;
1001
962
if ((ha_rows) count > param->max_rows)
1002
963
count=(uint32_t) param->max_rows;
1003
if (!(to= table_sort->record_pointers=
1004
(unsigned char*) malloc(res_length*count)))
964
if (!(to= table_sort->record_pointers=
965
(unsigned char*) my_malloc(res_length*count, MYF(MY_WME))))
966
return(1); /* purecov: inspected */
1006
967
for (unsigned char **end= sort_keys+count ; sort_keys != end ; sort_keys++)
1008
969
memcpy(to, *sort_keys+offset, res_length);
1015
976
/** Merge buffers to make < MERGEBUFF2 buffers. */
1017
978
int merge_many_buff(SORTPARAM *param, unsigned char *sort_buffer,
1018
buffpek_st *buffpek, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
979
BUFFPEK *buffpek, uint32_t *maxbuffer, IO_CACHE *t_file)
1020
981
register uint32_t i;
1021
internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
1022
buffpek_st *lastbuff;
982
IO_CACHE t_file2,*from_file,*to_file,*temp;
1024
985
if (*maxbuffer < MERGEBUFF2)
986
return(0); /* purecov: inspected */
1026
987
if (flush_io_cache(t_file) ||
1027
open_cached_file(&t_file2,drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE,
988
open_cached_file(&t_file2,mysql_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
990
return(1); /* purecov: inspected */
1031
992
from_file= t_file ; to_file= &t_file2;
1032
993
while (*maxbuffer >= MERGEBUFF2)
1034
if (reinit_io_cache(from_file,internal::READ_CACHE,0L,0,0))
995
if (reinit_io_cache(from_file,READ_CACHE,0L,0,0))
1036
if (reinit_io_cache(to_file,internal::WRITE_CACHE,0L,0,0))
997
if (reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
1038
999
lastbuff=buffpek;
1039
1000
for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
1071
1032
(uint32_t)-1 if something goes wrong
1074
uint32_t read_to_buffer(internal::IO_CACHE *fromfile, buffpek_st *buffpek,
1075
uint32_t rec_length)
1035
uint32_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
1036
uint32_t rec_length)
1077
1038
register uint32_t count;
1078
1039
uint32_t length;
1080
if ((count= (uint32_t) min((ha_rows) buffpek->max_keys,buffpek->count)))
1041
if ((count=(uint32_t) cmin((ha_rows) buffpek->max_keys,buffpek->count)))
1082
1043
if (pread(fromfile->file,(unsigned char*) buffpek->base, (length= rec_length*count),buffpek->file_pos) == 0)
1083
return((uint32_t) -1);
1085
buffpek->key= buffpek->base;
1044
return((uint32_t) -1); /* purecov: inspected */
1045
buffpek->key=buffpek->base;
1086
1046
buffpek->file_pos+= length; /* New filepos */
1087
buffpek->count-= count;
1047
buffpek->count-= count;
1088
1048
buffpek->mem_count= count;
1090
1050
return (count*rec_length);
1091
1051
} /* read_to_buffer */
1094
class compare_functor
1055
Put all room used by freed buffer to use in adjacent buffer.
1057
Note that we can't simply distribute memory evenly between all buffers,
1058
because new areas must not overlap with old ones.
1060
@param[in] queue list of non-empty buffers, without freed buffer
1061
@param[in] reuse empty buffer
1062
@param[in] key_length key length
1065
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint32_t key_length)
1096
qsort2_cmp key_compare;
1097
void *key_compare_arg;
1099
compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
1100
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
1101
inline bool operator()(const buffpek_st *i, const buffpek_st *j) const
1067
unsigned char *reuse_end= reuse->base + reuse->max_keys * key_length;
1068
for (uint32_t i= 0; i < queue->elements; ++i)
1103
int val= key_compare(key_compare_arg,
1070
BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
1071
if (bp->base + bp->max_keys * key_length == reuse->base)
1073
bp->max_keys+= reuse->max_keys;
1076
else if (bp->base == reuse_end)
1078
bp->base= reuse->base;
1079
bp->max_keys+= reuse->max_keys;
1111
1088
Merge buffers to one buffer.
1113
1090
@param param Sort parameter
1114
@param from_file File with source data (buffpek_sts point to this file)
1091
@param from_file File with source data (BUFFPEKs point to this file)
1115
1092
@param to_file File to write the sorted result data.
1116
1093
@param sort_buffer Buffer for data to store up to MERGEBUFF2 sort keys.
1117
@param lastbuff OUT Store here buffpek_st describing data written to to_file
1118
@param Fb First element in source buffpek_sts array
1119
@param Tb Last element in source buffpek_sts array
1094
@param lastbuff OUT Store here BUFFPEK describing data written to to_file
1095
@param Fb First element in source BUFFPEKs array
1096
@param Tb Last element in source BUFFPEKs array
1173
cmp= internal::get_ptr_compare(sort_length);
1151
cmp= get_ptr_compare(sort_length);
1174
1152
first_cmp_arg= (void*) &sort_length;
1176
priority_queue<buffpek_st *, vector<buffpek_st *>, compare_functor >
1177
queue(compare_functor(cmp, first_cmp_arg));
1154
if (init_queue(&queue, (uint32_t) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
1155
(queue_compare) cmp, first_cmp_arg))
1156
return(1); /* purecov: inspected */
1178
1157
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1180
1159
buffpek->base= strpos;
1197
1176
This is safe as we know that there is always more than one element
1198
1177
in each block to merge (This is guaranteed by the Unique:: algorithm
1200
buffpek= queue.top();
1179
buffpek= (BUFFPEK*) queue_top(&queue);
1201
1180
memcpy(param->unique_buff, buffpek->key, rec_length);
1202
1181
if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1183
error=1; goto err; /* purecov: inspected */
1206
1185
buffpek->key+= rec_length;
1207
1186
buffpek->mem_count--;
1208
1187
if (!--max_rows)
1189
error= 0; /* purecov: inspected */
1190
goto end; /* purecov: inspected */
1213
/* Top element has been used */
1215
queue.push(buffpek);
1192
queue_replaced(&queue); // Top element has been used
1218
1195
cmp= 0; // Not unique
1220
while (queue.size() > 1)
1197
while (queue.elements > 1)
1201
error= 1; goto err; /* purecov: inspected */
1228
buffpek= queue.top();
1205
buffpek= (BUFFPEK*) queue_top(&queue);
1229
1206
if (cmp) // Remove duplicates
1231
1208
if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
1333
1310
/* Do a merge to output-file (save only positions) */
1335
1312
static int merge_index(SORTPARAM *param, unsigned char *sort_buffer,
1336
buffpek_st *buffpek, uint32_t maxbuffer,
1337
internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
1313
BUFFPEK *buffpek, uint32_t maxbuffer,
1314
IO_CACHE *tempfile, IO_CACHE *outfile)
1339
1316
if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
1340
1317
buffpek+maxbuffer,1))
1318
return(1); /* purecov: inspected */
1343
1320
} /* merge_index */