1
/* Copyright (C) 2000-2006 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Creates a index for a database by reading keys, sorting them and outputing
18
them in sorted order through SORT_INFO functions.
22
#if defined(MSDOS) || defined(__WIN__)
29
/* static variables */
31
#undef MIN_SORT_MEMORY
33
#undef DISK_BUFFER_SIZE
37
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
38
#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
39
#define DISK_BUFFER_SIZE (IO_SIZE*16)
43
Pointers of functions for store and read keys from temp file
46
extern void print_error _VARARGS((const char *fmt,...));
48
/* Functions defined in this file */
50
static ha_rows find_all_keys(MI_SORT_PARAM *info,uint keys,
52
DYNAMIC_ARRAY *buffpek,int *maxbuffer,
54
IO_CACHE *tempfile_for_exceptions);
55
static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
56
uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
57
static int write_key(MI_SORT_PARAM *info, uchar *key,
59
static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
61
static int merge_many_buff(MI_SORT_PARAM *info,uint keys,
63
BUFFPEK *buffpek,int *maxbuffer,
65
static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
67
static int merge_buffers(MI_SORT_PARAM *info,uint keys,
68
IO_CACHE *from_file, IO_CACHE *to_file,
69
uchar * *sort_keys, BUFFPEK *lastbuff,
70
BUFFPEK *Fb, BUFFPEK *Tb);
71
static int merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
73
static int flush_ft_buf(MI_SORT_PARAM *info);
75
static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
76
uint count, BUFFPEK *buffpek,
78
static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
80
static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
81
uchar *key, uint sort_length, uint count);
82
static int write_merge_key_varlen(MI_SORT_PARAM *info,
84
uchar* key, uint sort_length,
87
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
90
Creates a index of sorted keys
93
_create_index_by_sort()
95
no_messages Set to 1 if no output
96
sortbuff_size Size if sortbuffer to allocate
103
int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
106
int error,maxbuffer,skr;
107
uint memavl,old_memavl,keys,sort_length;
108
DYNAMIC_ARRAY buffpek;
111
IO_CACHE tempfile, tempfile_for_exceptions;
112
DBUG_ENTER("_create_index_by_sort");
113
DBUG_PRINT("enter",("sort_length: %d", info->key_length));
115
if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
117
info->write_keys=write_keys_varlen;
118
info->read_to_buffer=read_to_buffer_varlen;
119
info->write_key= write_merge_key_varlen;
123
info->write_keys=write_keys;
124
info->read_to_buffer=read_to_buffer;
125
info->write_key=write_merge_key;
128
my_b_clear(&tempfile);
129
my_b_clear(&tempfile_for_exceptions);
130
bzero((char*) &buffpek,sizeof(buffpek));
131
sort_keys= (uchar **) NULL; error= 1;
134
memavl=max(sortbuff_size,MIN_SORT_MEMORY);
135
records= info->sort_info->max_records;
136
sort_length= info->key_length;
138
while (memavl >= MIN_SORT_MEMORY)
140
if ((records < UINT_MAX32) &&
141
((my_off_t) (records + 1) *
142
(sort_length + sizeof(char*)) <= (my_off_t) memavl))
143
keys= (uint)records+1;
148
if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
149
(keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
150
(sort_length+sizeof(char*))) <= 1 ||
151
keys < (uint) maxbuffer)
153
mi_check_print_error(info->sort_info->param,
154
"myisam_sort_buffer_size is too small");
158
while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
160
if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
161
HA_FT_MAXBYTELEN, MYF(0))))
163
if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
166
my_free((uchar*) sort_keys,MYF(0));
173
if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
174
memavl=MIN_SORT_MEMORY;
176
if (memavl < MIN_SORT_MEMORY)
178
mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
179
goto err; /* purecov: tested */
181
(*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
184
printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
186
if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
187
&tempfile,&tempfile_for_exceptions))
189
goto err; /* purecov: tested */
193
printf(" - Dumping %lu keys\n", (ulong) records);
194
if (write_index(info,sort_keys, (uint) records))
195
goto err; /* purecov: inspected */
199
keys=(keys*(sort_length+sizeof(char*)))/sort_length;
200
if (maxbuffer >= MERGEBUFF2)
203
printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
204
if (merge_many_buff(info,keys,sort_keys,
205
dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
206
goto err; /* purecov: inspected */
208
if (flush_io_cache(&tempfile) ||
209
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
210
goto err; /* purecov: inspected */
212
printf(" - Last merge and dumping keys\n"); /* purecov: tested */
213
if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
214
maxbuffer,&tempfile))
215
goto err; /* purecov: inspected */
218
if (flush_ft_buf(info) || flush_pending_blocks(info))
221
if (my_b_inited(&tempfile_for_exceptions))
223
MI_INFO *idx=info->sort_info->info;
224
uint keyno=info->key;
225
uint key_length, ref_length=idx->s->rec_reflength;
228
printf(" - Adding exceptions\n"); /* purecov: tested */
229
if (flush_io_cache(&tempfile_for_exceptions) ||
230
reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
233
while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
235
&& !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
238
if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
247
my_free((uchar*) sort_keys,MYF(0));
248
delete_dynamic(&buffpek);
249
close_cached_file(&tempfile);
250
close_cached_file(&tempfile_for_exceptions);
252
DBUG_RETURN(error ? -1 : 0);
253
} /* _create_index_by_sort */
256
/* Search after all keys and place them in a temp. file */
258
static ha_rows find_all_keys(MI_SORT_PARAM *info, uint keys,
259
uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
260
int *maxbuffer, IO_CACHE *tempfile,
261
IO_CACHE *tempfile_for_exceptions)
265
DBUG_ENTER("find_all_keys");
268
sort_keys[0]=(uchar*) (sort_keys+keys);
270
while (!(error=(*info->key_read)(info,sort_keys[idx])))
272
if (info->real_key_length > info->key_length)
274
if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
275
DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
281
if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
283
DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
285
sort_keys[0]=(uchar*) (sort_keys+keys);
286
memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
289
sort_keys[idx]=sort_keys[idx-1]+info->key_length;
292
DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
293
if (buffpek->elements)
295
if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
297
DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
298
*maxbuffer=buffpek->elements-1;
303
DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
304
} /* find_all_keys */
308
/* Search after all keys and place them in a temp. file */
310
pthread_handler_t thr_find_all_keys(void *arg)
312
MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
314
uint memavl,old_memavl,keys,sort_length;
320
if (my_thread_init())
323
{ /* Add extra block since DBUG_ENTER declare variables */
324
DBUG_ENTER("thr_find_all_keys");
325
DBUG_PRINT("enter", ("master: %d", sort_param->master));
326
if (sort_param->sort_info->got_error)
329
if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
331
sort_param->write_keys= write_keys_varlen;
332
sort_param->read_to_buffer= read_to_buffer_varlen;
333
sort_param->write_key= write_merge_key_varlen;
337
sort_param->write_keys= write_keys;
338
sort_param->read_to_buffer= read_to_buffer;
339
sort_param->write_key= write_merge_key;
342
my_b_clear(&sort_param->tempfile);
343
my_b_clear(&sort_param->tempfile_for_exceptions);
344
bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
345
bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
346
sort_keys= (uchar **) NULL;
348
memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
349
idx= (uint)sort_param->sort_info->max_records;
350
sort_length= sort_param->key_length;
353
while (memavl >= MIN_SORT_MEMORY)
355
if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
364
if (memavl < sizeof(BUFFPEK)*maxbuffer ||
365
(keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
366
(sort_length+sizeof(char*))) <= 1 ||
367
keys < (uint) maxbuffer)
369
mi_check_print_error(sort_param->sort_info->param,
370
"myisam_sort_buffer_size is too small");
374
while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
376
if ((sort_keys= (uchar**)
377
my_malloc(keys*(sort_length+sizeof(char*))+
378
((sort_param->keyinfo->flag & HA_FULLTEXT) ?
379
HA_FT_MAXBYTELEN : 0), MYF(0))))
381
if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
382
maxbuffer, maxbuffer/2))
384
my_free((uchar*) sort_keys,MYF(0));
385
sort_keys= (uchar **) NULL; /* for err: label */
391
if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
392
old_memavl > MIN_SORT_MEMORY)
393
memavl= MIN_SORT_MEMORY;
395
if (memavl < MIN_SORT_MEMORY)
397
mi_check_print_error(sort_param->sort_info->param,
398
"MyISAM sort buffer too small");
399
goto err; /* purecov: tested */
402
if (sort_param->sort_info->param->testflag & T_VERBOSE)
403
printf("Key %d - Allocating buffer for %d keys\n",
404
sort_param->key + 1, keys);
405
sort_param->sort_keys= sort_keys;
408
sort_keys[0]= (uchar*) (sort_keys+keys);
410
DBUG_PRINT("info", ("reading keys"));
411
while (!(error= sort_param->sort_info->got_error) &&
412
!(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
414
if (sort_param->real_key_length > sort_param->key_length)
416
if (write_key(sort_param, sort_keys[idx],
417
&sort_param->tempfile_for_exceptions))
424
if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
425
(BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
426
&sort_param->tempfile))
428
sort_keys[0]= (uchar*) (sort_keys+keys);
429
memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
432
sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
436
if (sort_param->buffpek.elements)
438
if (sort_param->write_keys(sort_param, sort_keys, idx,
439
(BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
440
&sort_param->tempfile))
442
sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
445
sort_param->keys= idx;
447
sort_param->sort_keys_length= keys;
451
DBUG_PRINT("error", ("got some error"));
452
sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
454
my_free((uchar*) sort_keys,MYF(0));
455
sort_param->sort_keys= 0;
456
delete_dynamic(& sort_param->buffpek);
457
close_cached_file(&sort_param->tempfile);
458
close_cached_file(&sort_param->tempfile_for_exceptions);
461
free_root(&sort_param->wordroot, MYF(0));
463
Detach from the share if the writer is involved. Avoid others to
464
be blocked. This includes a flush of the write buffer. This will
465
also indicate EOF to the readers.
467
if (sort_param->sort_info->info->rec_cache.share)
468
remove_io_thread(&sort_param->sort_info->info->rec_cache);
470
/* Readers detach from the share if any. Avoid others to be blocked. */
471
if (sort_param->read_cache.share)
472
remove_io_thread(&sort_param->read_cache);
474
pthread_mutex_lock(&sort_param->sort_info->mutex);
475
if (!--sort_param->sort_info->threads_running)
476
pthread_cond_signal(&sort_param->sort_info->cond);
477
pthread_mutex_unlock(&sort_param->sort_info->mutex);
478
DBUG_PRINT("exit", ("======== ending thread ========"));
485
int thr_write_keys(MI_SORT_PARAM *sort_param)
487
SORT_INFO *sort_info= sort_param->sort_info;
488
MI_CHECK *param= sort_info->param;
489
ulong length= 0, keys;
490
ulong *rec_per_key_part=param->rec_per_key_part;
491
int got_error=sort_info->got_error;
493
MI_INFO *info=sort_info->info;
494
MYISAM_SHARE *share=info->s;
495
MI_SORT_PARAM *sinfo;
497
DBUG_ENTER("thr_write_keys");
499
for (i= 0, sinfo= sort_param ;
500
i < sort_info->total_keys ;
501
i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
503
if (!sinfo->sort_keys)
506
my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
507
MYF(MY_ALLOW_ZERO_PTR));
512
mi_set_key_active(share->state.key_map, sinfo->key);
513
if (!sinfo->buffpek.elements)
515
if (param->testflag & T_VERBOSE)
517
printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
520
if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
521
flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
524
if (!got_error && param->testflag & T_STATISTICS)
525
update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
526
param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
527
sinfo->notnull: NULL,
528
(ulonglong) info->state->records);
530
my_free((uchar*) sinfo->sort_keys,MYF(0));
531
my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
532
MYF(MY_ALLOW_ZERO_PTR));
536
for (i= 0, sinfo= sort_param ;
537
i < sort_info->total_keys ;
539
delete_dynamic(&sinfo->buffpek),
540
close_cached_file(&sinfo->tempfile),
541
close_cached_file(&sinfo->tempfile_for_exceptions),
546
if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
548
sinfo->write_keys=write_keys_varlen;
549
sinfo->read_to_buffer=read_to_buffer_varlen;
550
sinfo->write_key=write_merge_key_varlen;
554
sinfo->write_keys=write_keys;
555
sinfo->read_to_buffer=read_to_buffer;
556
sinfo->write_key=write_merge_key;
558
if (sinfo->buffpek.elements)
560
uint maxbuffer=sinfo->buffpek.elements-1;
563
length=param->sort_buffer_length;
564
while (length >= MIN_SORT_MEMORY)
566
if ((mergebuf= my_malloc(length, MYF(0))))
576
keys=length/sinfo->key_length;
577
if (maxbuffer >= MERGEBUFF2)
579
if (param->testflag & T_VERBOSE)
580
printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
581
if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
582
dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
583
(int*) &maxbuffer, &sinfo->tempfile))
589
if (flush_io_cache(&sinfo->tempfile) ||
590
reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
595
if (param->testflag & T_VERBOSE)
596
printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
597
if (merge_index(sinfo, keys, (uchar **)mergebuf,
598
dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
599
maxbuffer,&sinfo->tempfile) ||
600
flush_ft_buf(sinfo) ||
601
flush_pending_blocks(sinfo))
607
if (my_b_inited(&sinfo->tempfile_for_exceptions))
611
if (param->testflag & T_VERBOSE)
612
printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
614
if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
615
reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
622
!my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
625
uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
626
if (key_length > sizeof(ft_buf) ||
627
my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
629
_mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
630
key_length - info->s->rec_reflength))
635
my_free((uchar*) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
636
DBUG_RETURN(got_error);
640
/* Write all keys in memory to file for later merge */
642
static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
643
uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
646
uint sort_length=info->key_length;
647
DBUG_ENTER("write_keys");
649
my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
651
if (!my_b_inited(tempfile) &&
652
open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
653
DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
654
DBUG_RETURN(1); /* purecov: inspected */
656
buffpek->file_pos=my_b_tell(tempfile);
657
buffpek->count=count;
659
for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
661
if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
662
DBUG_RETURN(1); /* purecov: inspected */
669
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
672
uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
674
/* The following is safe as this is a local file */
675
if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
677
if ((err= my_b_write(to_file,bufs, (uint) len)))
683
static int write_keys_varlen(MI_SORT_PARAM *info,
684
register uchar **sort_keys,
685
uint count, BUFFPEK *buffpek,
690
DBUG_ENTER("write_keys_varlen");
692
my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
694
if (!my_b_inited(tempfile) &&
695
open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
696
DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
697
DBUG_RETURN(1); /* purecov: inspected */
699
buffpek->file_pos=my_b_tell(tempfile);
700
buffpek->count=count;
701
for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
703
if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
707
} /* write_keys_varlen */
710
static int write_key(MI_SORT_PARAM *info, uchar *key,
713
uint key_length=info->real_key_length;
714
DBUG_ENTER("write_key");
716
if (!my_b_inited(tempfile) &&
717
open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
718
DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
721
if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
722
my_b_write(tempfile,(uchar*)key,(uint) key_length))
730
static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
733
DBUG_ENTER("write_index");
735
my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
736
(qsort2_cmp) info->key_cmp,info);
739
if ((*info->key_write)(info,*sort_keys++))
740
DBUG_RETURN(-1); /* purecov: inspected */
746
/* Merge buffers to make < MERGEBUFF2 buffers */
748
static int merge_many_buff(MI_SORT_PARAM *info, uint keys,
749
uchar **sort_keys, BUFFPEK *buffpek,
750
int *maxbuffer, IO_CACHE *t_file)
753
IO_CACHE t_file2, *from_file, *to_file, *temp;
755
DBUG_ENTER("merge_many_buff");
757
if (*maxbuffer < MERGEBUFF2)
758
DBUG_RETURN(0); /* purecov: inspected */
759
if (flush_io_cache(t_file) ||
760
open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
761
DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
762
DBUG_RETURN(1); /* purecov: inspected */
764
from_file= t_file ; to_file= &t_file2;
765
while (*maxbuffer >= MERGEBUFF2)
767
reinit_io_cache(from_file,READ_CACHE,0L,0,0);
768
reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
770
for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
772
if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
773
buffpek+i,buffpek+i+MERGEBUFF-1))
776
if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
777
buffpek+i,buffpek+ *maxbuffer))
778
break; /* purecov: inspected */
779
if (flush_io_cache(to_file))
780
break; /* purecov: inspected */
781
temp=from_file; from_file=to_file; to_file=temp;
782
*maxbuffer= (int) (lastbuff-buffpek)-1;
785
close_cached_file(to_file); /* This holds old result */
786
if (to_file == t_file)
787
*t_file=t_file2; /* Copy result file */
789
DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
790
} /* merge_many_buff */
798
fromfile File to read from
799
buffpek Where to read from
800
sort_length max length to read
802
> 0 Ammount of bytes read
806
static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
812
if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
814
if (my_pread(fromfile->file,(uchar*) buffpek->base,
815
(length= sort_length*count),buffpek->file_pos,MYF_RW))
816
return((uint) -1); /* purecov: inspected */
817
buffpek->key=buffpek->base;
818
buffpek->file_pos+= length; /* New filepos */
819
buffpek->count-= count;
820
buffpek->mem_count= count;
822
return (count*sort_length);
823
} /* read_to_buffer */
825
static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
829
uint16 length_of_key = 0;
833
if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
835
buffp = buffpek->base;
837
for (idx=1;idx<=count;idx++)
839
if (my_pread(fromfile->file,(uchar*)&length_of_key,sizeof(length_of_key),
840
buffpek->file_pos,MYF_RW))
842
buffpek->file_pos+=sizeof(length_of_key);
843
if (my_pread(fromfile->file,(uchar*) buffp,length_of_key,
844
buffpek->file_pos,MYF_RW))
846
buffpek->file_pos+=length_of_key;
847
buffp = buffp + sort_length;
849
buffpek->key=buffpek->base;
850
buffpek->count-= count;
851
buffpek->mem_count= count;
853
return (count*sort_length);
854
} /* read_to_buffer_varlen */
857
static int write_merge_key_varlen(MI_SORT_PARAM *info,
858
IO_CACHE *to_file, uchar* key,
859
uint sort_length, uint count)
864
for (idx=1;idx<=count;idx++)
867
if ((err= my_var_write(info, to_file, bufs)))
869
bufs=bufs+sort_length;
875
static int write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
876
IO_CACHE *to_file, uchar *key,
877
uint sort_length, uint count)
879
return my_b_write(to_file, key, (size_t) sort_length*count);
883
Merge buffers to one buffer
884
If to_file == 0 then use info->key_write
888
merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
889
IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
890
BUFFPEK *Fb, BUFFPEK *Tb)
893
uint sort_length,maxcount;
895
my_off_t to_start_filepos= 0;
897
BUFFPEK *buffpek,**refpek;
899
volatile int *killed= killed_ptr(info->sort_info->param);
900
DBUG_ENTER("merge_buffers");
903
maxcount=keys/((uint) (Tb-Fb) +1);
904
DBUG_ASSERT(maxcount > 0);
906
to_start_filepos=my_b_tell(to_file);
907
strpos=(uchar*) sort_keys;
908
sort_length=info->key_length;
910
if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
911
(int (*)(void*, uchar *,uchar*)) info->key_cmp,
913
DBUG_RETURN(1); /* purecov: inspected */
915
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
917
count+= buffpek->count;
918
buffpek->base= strpos;
919
buffpek->max_keys=maxcount;
920
strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
923
goto err; /* purecov: inspected */
924
queue_insert(&queue,(uchar*) buffpek);
927
while (queue.elements > 1)
935
buffpek=(BUFFPEK*) queue_top(&queue);
938
if (info->write_key(info,to_file,(uchar*) buffpek->key,
939
(uint) sort_length,1))
941
error=1; goto err; /* purecov: inspected */
946
if ((*info->key_write)(info,(void*) buffpek->key))
948
error=1; goto err; /* purecov: inspected */
951
buffpek->key+=sort_length;
952
if (! --buffpek->mem_count)
954
if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
956
uchar *base=buffpek->base;
957
uint max_keys=buffpek->max_keys;
959
VOID(queue_remove(&queue,0));
961
/* Put room used by buffer to use in other buffer */
962
for (refpek= (BUFFPEK**) &queue_top(&queue);
963
refpek <= (BUFFPEK**) &queue_end(&queue);
967
if (buffpek->base+buffpek->max_keys*sort_length == base)
969
buffpek->max_keys+=max_keys;
972
else if (base+max_keys*sort_length == buffpek->base)
975
buffpek->max_keys+=max_keys;
979
break; /* One buffer have been removed */
982
else if (error == -1)
983
goto err; /* purecov: inspected */
984
queue_replaced(&queue); /* Top element has been replaced */
987
buffpek=(BUFFPEK*) queue_top(&queue);
988
buffpek->base=(uchar *) sort_keys;
989
buffpek->max_keys=keys;
994
if (info->write_key(info,to_file,(uchar*) buffpek->key,
995
sort_length,buffpek->mem_count))
997
error=1; goto err; /* purecov: inspected */
1002
register uchar *end;
1003
strpos= buffpek->key;
1004
for (end=strpos+buffpek->mem_count*sort_length;
1006
strpos+=sort_length)
1008
if ((*info->key_write)(info,(void*) strpos))
1010
error=1; goto err; /* purecov: inspected */
1015
while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
1018
lastbuff->count=count;
1020
lastbuff->file_pos=to_start_filepos;
1022
delete_queue(&queue);
1024
} /* merge_buffers */
1027
/* Do a merge to output-file (save only positions) */
1030
merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
1031
BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1033
DBUG_ENTER("merge_index");
1034
if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1036
DBUG_RETURN(1); /* purecov: inspected */
1041
flush_ft_buf(MI_SORT_PARAM *info)
1044
if (info->sort_info->ft_buf)
1046
err=sort_ft_buf_flush(info);
1047
my_free((uchar*)info->sort_info->ft_buf, MYF(0));
1048
info->sort_info->ft_buf=0;