~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2000-2006 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Creates a index for a database by reading keys, sorting them and outputing
18
  them in sorted order through SORT_INFO functions.
19
*/
20
21
#include "fulltext.h"
22
#if defined(MSDOS) || defined(__WIN__)
23
#include <fcntl.h>
24
#else
25
#include <stddef.h>
26
#endif
27
#include <queues.h>
28
29
/* static variables */
30
31
#undef MIN_SORT_MEMORY
32
#undef MYF_RW
33
#undef DISK_BUFFER_SIZE
34
35
#define MERGEBUFF 15
36
#define MERGEBUFF2 31
37
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
38
#define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
39
#define DISK_BUFFER_SIZE (IO_SIZE*16)
40
41
42
/*
43
 Pointers of functions for store and read keys from temp file
44
*/
45
46
extern void print_error _VARARGS((const char *fmt,...));
47
48
/* Functions defined in this file */
49
50
static ha_rows  find_all_keys(MI_SORT_PARAM *info,uint keys,
51
                                    uchar **sort_keys,
52
                                    DYNAMIC_ARRAY *buffpek,int *maxbuffer,
53
                                    IO_CACHE *tempfile,
54
                                    IO_CACHE *tempfile_for_exceptions);
55
static int  write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
56
                             uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
57
static int  write_key(MI_SORT_PARAM *info, uchar *key,
58
			    IO_CACHE *tempfile);
59
static int  write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
60
                              uint count);
61
static int  merge_many_buff(MI_SORT_PARAM *info,uint keys,
62
                                  uchar * *sort_keys,
63
                                  BUFFPEK *buffpek,int *maxbuffer,
64
                                  IO_CACHE *t_file);
65
static uint  read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
66
                                  uint sort_length);
67
static int  merge_buffers(MI_SORT_PARAM *info,uint keys,
68
                                IO_CACHE *from_file, IO_CACHE *to_file,
69
                                uchar * *sort_keys, BUFFPEK *lastbuff,
70
                                BUFFPEK *Fb, BUFFPEK *Tb);
71
static int  merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
72
                              IO_CACHE *);
73
static int flush_ft_buf(MI_SORT_PARAM *info);
74
75
static int  write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
76
                                    uint count, BUFFPEK *buffpek,
77
                                    IO_CACHE *tempfile);
78
static uint  read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
79
                                         uint sort_length);
80
static int  write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
81
                                  uchar *key, uint sort_length, uint count);
82
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
83
					 IO_CACHE *to_file,
84
					 uchar* key, uint sort_length,
85
					 uint count);
86
static inline int
87
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
88
89
/*
90
  Creates a index of sorted keys
91
92
  SYNOPSIS
93
    _create_index_by_sort()
94
    info		Sort parameters
95
    no_messages		Set to 1 if no output
96
    sortbuff_size	Size if sortbuffer to allocate
97
98
  RESULT
99
    0	ok
100
   <> 0 Error
101
*/
102
103
int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
104
			  ulong sortbuff_size)
105
{
106
  int error,maxbuffer,skr;
107
  uint memavl,old_memavl,keys,sort_length;
108
  DYNAMIC_ARRAY buffpek;
109
  ha_rows records;
110
  uchar **sort_keys;
111
  IO_CACHE tempfile, tempfile_for_exceptions;
112
  DBUG_ENTER("_create_index_by_sort");
113
  DBUG_PRINT("enter",("sort_length: %d", info->key_length));
114
115
  if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
116
  {
117
    info->write_keys=write_keys_varlen;
118
    info->read_to_buffer=read_to_buffer_varlen;
119
    info->write_key= write_merge_key_varlen;
120
  }
121
  else
122
  {
123
    info->write_keys=write_keys;
124
    info->read_to_buffer=read_to_buffer;
125
    info->write_key=write_merge_key;
126
  }
127
128
  my_b_clear(&tempfile);
129
  my_b_clear(&tempfile_for_exceptions);
130
  bzero((char*) &buffpek,sizeof(buffpek));
131
  sort_keys= (uchar **) NULL; error= 1;
132
  maxbuffer=1;
133
134
  memavl=max(sortbuff_size,MIN_SORT_MEMORY);
135
  records=	info->sort_info->max_records;
136
  sort_length=	info->key_length;
137
138
  while (memavl >= MIN_SORT_MEMORY)
139
  {
140
    if ((records < UINT_MAX32) && 
141
       ((my_off_t) (records + 1) * 
142
        (sort_length + sizeof(char*)) <= (my_off_t) memavl))
143
      keys= (uint)records+1;
144
    else
145
      do
146
      {
147
	skr=maxbuffer;
148
	if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
149
	    (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
150
             (sort_length+sizeof(char*))) <= 1 ||
151
            keys < (uint) maxbuffer)
152
	{
153
	  mi_check_print_error(info->sort_info->param,
154
			       "myisam_sort_buffer_size is too small");
155
	  goto err;
156
	}
157
      }
158
      while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
159
160
    if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
161
				       HA_FT_MAXBYTELEN, MYF(0))))
162
    {
163
      if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
164
			     maxbuffer/2))
165
      {
166
	my_free((uchar*) sort_keys,MYF(0));
167
        sort_keys= 0;
168
      }
169
      else
170
	break;
171
    }
172
    old_memavl=memavl;
173
    if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
174
      memavl=MIN_SORT_MEMORY;
175
  }
176
  if (memavl < MIN_SORT_MEMORY)
177
  {
178
    mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
179
    goto err; /* purecov: tested */
180
  }
181
  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
182
183
  if (!no_messages)
184
    printf("  - Searching for keys, allocating buffer for %d keys\n",keys);
185
186
  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
187
                                  &tempfile,&tempfile_for_exceptions))
188
      == HA_POS_ERROR)
189
    goto err; /* purecov: tested */
190
  if (maxbuffer == 0)
191
  {
192
    if (!no_messages)
193
      printf("  - Dumping %lu keys\n", (ulong) records);
194
    if (write_index(info,sort_keys, (uint) records))
195
      goto err; /* purecov: inspected */
196
  }
197
  else
198
  {
199
    keys=(keys*(sort_length+sizeof(char*)))/sort_length;
200
    if (maxbuffer >= MERGEBUFF2)
201
    {
202
      if (!no_messages)
203
	printf("  - Merging %lu keys\n", (ulong) records); /* purecov: tested */
204
      if (merge_many_buff(info,keys,sort_keys,
205
                  dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
206
	goto err;				/* purecov: inspected */
207
    }
208
    if (flush_io_cache(&tempfile) ||
209
	reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
210
      goto err;					/* purecov: inspected */
211
    if (!no_messages)
212
      printf("  - Last merge and dumping keys\n"); /* purecov: tested */
213
    if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
214
                    maxbuffer,&tempfile))
215
      goto err;					/* purecov: inspected */
216
  }
217
218
  if (flush_ft_buf(info) || flush_pending_blocks(info))
219
    goto err;
220
221
  if (my_b_inited(&tempfile_for_exceptions))
222
  {
223
    MI_INFO *idx=info->sort_info->info;
224
    uint     keyno=info->key;
225
    uint     key_length, ref_length=idx->s->rec_reflength;
226
227
    if (!no_messages)
228
      printf("  - Adding exceptions\n"); /* purecov: tested */
229
    if (flush_io_cache(&tempfile_for_exceptions) ||
230
	reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
231
      goto err;
232
233
    while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
234
		      sizeof(key_length))
235
        && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
236
		      (uint) key_length))
237
    {
238
	if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
239
	  goto err;
240
    }
241
  }
242
243
  error =0;
244
245
err:
246
  if (sort_keys)
247
    my_free((uchar*) sort_keys,MYF(0));
248
  delete_dynamic(&buffpek);
249
  close_cached_file(&tempfile);
250
  close_cached_file(&tempfile_for_exceptions);
251
252
  DBUG_RETURN(error ? -1 : 0);
253
} /* _create_index_by_sort */
254
255
256
/* Search after all keys and place them in a temp. file */
257
258
static ha_rows  find_all_keys(MI_SORT_PARAM *info, uint keys,
259
				    uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
260
				    int *maxbuffer, IO_CACHE *tempfile,
261
				    IO_CACHE *tempfile_for_exceptions)
262
{
263
  int error;
264
  uint idx;
265
  DBUG_ENTER("find_all_keys");
266
267
  idx=error=0;
268
  sort_keys[0]=(uchar*) (sort_keys+keys);
269
270
  while (!(error=(*info->key_read)(info,sort_keys[idx])))
271
  {
272
    if (info->real_key_length > info->key_length)
273
    {
274
      if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
275
        DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
276
      continue;
277
    }
278
279
    if (++idx == keys)
280
    {
281
      if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
282
		     tempfile))
283
      DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
284
285
      sort_keys[0]=(uchar*) (sort_keys+keys);
286
      memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
287
      idx=1;
288
    }
289
    sort_keys[idx]=sort_keys[idx-1]+info->key_length;
290
  }
291
  if (error > 0)
292
    DBUG_RETURN(HA_POS_ERROR);		/* Aborted by get_key */ /* purecov: inspected */
293
  if (buffpek->elements)
294
  {
295
    if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
296
		   tempfile))
297
      DBUG_RETURN(HA_POS_ERROR);		/* purecov: inspected */
298
    *maxbuffer=buffpek->elements-1;
299
  }
300
  else
301
    *maxbuffer=0;
302
303
  DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
304
} /* find_all_keys */
305
306
307
#ifdef THREAD
308
/* Search after all keys and place them in a temp. file */
309
310
pthread_handler_t thr_find_all_keys(void *arg)
311
{
312
  MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
313
  int error;
314
  uint memavl,old_memavl,keys,sort_length;
315
  uint idx, maxbuffer;
316
  uchar **sort_keys=0;
317
318
  error=1;
319
320
  if (my_thread_init())
321
    goto err;
322
323
  { /* Add extra block since DBUG_ENTER declare variables */
324
    DBUG_ENTER("thr_find_all_keys");
325
    DBUG_PRINT("enter", ("master: %d", sort_param->master));
326
    if (sort_param->sort_info->got_error)
327
      goto err;
328
329
    if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
330
    {
331
      sort_param->write_keys=     write_keys_varlen;
332
      sort_param->read_to_buffer= read_to_buffer_varlen;
333
      sort_param->write_key=      write_merge_key_varlen;
334
    }
335
    else
336
    {
337
      sort_param->write_keys=     write_keys;
338
      sort_param->read_to_buffer= read_to_buffer;
339
      sort_param->write_key=      write_merge_key;
340
    }
341
342
    my_b_clear(&sort_param->tempfile);
343
    my_b_clear(&sort_param->tempfile_for_exceptions);
344
    bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
345
    bzero((char*) &sort_param->unique,  sizeof(sort_param->unique));
346
    sort_keys= (uchar **) NULL;
347
348
    memavl=       max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
349
    idx=          (uint)sort_param->sort_info->max_records;
350
    sort_length=  sort_param->key_length;
351
    maxbuffer=    1;
352
353
    while (memavl >= MIN_SORT_MEMORY)
354
    {
355
      if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
356
          (my_off_t) memavl)
357
        keys= idx+1;
358
      else
359
      {
360
        uint skr;
361
        do
362
        {
363
          skr= maxbuffer;
364
          if (memavl < sizeof(BUFFPEK)*maxbuffer ||
365
              (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
366
               (sort_length+sizeof(char*))) <= 1 ||
367
              keys < (uint) maxbuffer)
368
          {
369
            mi_check_print_error(sort_param->sort_info->param,
370
                                 "myisam_sort_buffer_size is too small");
371
            goto err;
372
          }
373
        }
374
        while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
375
      }
376
      if ((sort_keys= (uchar**)
377
           my_malloc(keys*(sort_length+sizeof(char*))+
378
                     ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
379
                      HA_FT_MAXBYTELEN : 0), MYF(0))))
380
      {
381
        if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
382
                                  maxbuffer, maxbuffer/2))
383
        {
384
          my_free((uchar*) sort_keys,MYF(0));
385
          sort_keys= (uchar **) NULL; /* for err: label */
386
        }
387
        else
388
          break;
389
      }
390
      old_memavl= memavl;
391
      if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
392
          old_memavl > MIN_SORT_MEMORY)
393
        memavl= MIN_SORT_MEMORY;
394
    }
395
    if (memavl < MIN_SORT_MEMORY)
396
    {
397
      mi_check_print_error(sort_param->sort_info->param,
398
                           "MyISAM sort buffer too small");
399
      goto err; /* purecov: tested */
400
    }
401
402
    if (sort_param->sort_info->param->testflag & T_VERBOSE)
403
      printf("Key %d - Allocating buffer for %d keys\n",
404
             sort_param->key + 1, keys);
405
    sort_param->sort_keys= sort_keys;
406
407
    idx= error= 0;
408
    sort_keys[0]= (uchar*) (sort_keys+keys);
409
410
    DBUG_PRINT("info", ("reading keys"));
411
    while (!(error= sort_param->sort_info->got_error) &&
412
           !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
413
    {
414
      if (sort_param->real_key_length > sort_param->key_length)
415
      {
416
        if (write_key(sort_param, sort_keys[idx],
417
                      &sort_param->tempfile_for_exceptions))
418
          goto err;
419
        continue;
420
      }
421
422
      if (++idx == keys)
423
      {
424
        if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
425
                                   (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
426
                                   &sort_param->tempfile))
427
          goto err;
428
        sort_keys[0]= (uchar*) (sort_keys+keys);
429
        memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
430
        idx= 1;
431
      }
432
      sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
433
    }
434
    if (error > 0)
435
      goto err;
436
    if (sort_param->buffpek.elements)
437
    {
438
      if (sort_param->write_keys(sort_param, sort_keys, idx,
439
                                 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
440
                                 &sort_param->tempfile))
441
        goto err;
442
      sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
443
    }
444
    else
445
      sort_param->keys= idx;
446
447
    sort_param->sort_keys_length= keys;
448
    goto ok;
449
450
err:
451
    DBUG_PRINT("error", ("got some error"));
452
    sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
453
    if (sort_keys)
454
      my_free((uchar*) sort_keys,MYF(0));
455
    sort_param->sort_keys= 0;
456
    delete_dynamic(& sort_param->buffpek);
457
    close_cached_file(&sort_param->tempfile);
458
    close_cached_file(&sort_param->tempfile_for_exceptions);
459
460
ok:
461
    free_root(&sort_param->wordroot, MYF(0));
462
    /*
463
      Detach from the share if the writer is involved. Avoid others to
464
      be blocked. This includes a flush of the write buffer. This will
465
      also indicate EOF to the readers.
466
    */
467
    if (sort_param->sort_info->info->rec_cache.share)
468
      remove_io_thread(&sort_param->sort_info->info->rec_cache);
469
470
    /* Readers detach from the share if any. Avoid others to be blocked. */
471
    if (sort_param->read_cache.share)
472
      remove_io_thread(&sort_param->read_cache);
473
474
    pthread_mutex_lock(&sort_param->sort_info->mutex);
475
    if (!--sort_param->sort_info->threads_running)
476
      pthread_cond_signal(&sort_param->sort_info->cond);
477
    pthread_mutex_unlock(&sort_param->sort_info->mutex);
478
    DBUG_PRINT("exit", ("======== ending thread ========"));
479
  }
480
  my_thread_end();
481
  return NULL;
482
}
483
484
485
int thr_write_keys(MI_SORT_PARAM *sort_param)
486
{
487
  SORT_INFO *sort_info= sort_param->sort_info;
488
  MI_CHECK *param= sort_info->param;
489
  ulong length= 0, keys;
490
  ulong *rec_per_key_part=param->rec_per_key_part;
491
  int got_error=sort_info->got_error;
492
  uint i;
493
  MI_INFO *info=sort_info->info;
494
  MYISAM_SHARE *share=info->s;
495
  MI_SORT_PARAM *sinfo;
496
  uchar *mergebuf=0;
497
  DBUG_ENTER("thr_write_keys");
498
499
  for (i= 0, sinfo= sort_param ;
500
       i < sort_info->total_keys ;
501
       i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
502
  {
503
    if (!sinfo->sort_keys)
504
    {
505
      got_error=1;
506
      my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
507
              MYF(MY_ALLOW_ZERO_PTR));
508
      continue;
509
    }
510
    if (!got_error)
511
    {
512
      mi_set_key_active(share->state.key_map, sinfo->key);
513
      if (!sinfo->buffpek.elements)
514
      {
515
        if (param->testflag & T_VERBOSE)
516
        {
517
          printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
518
          fflush(stdout);
519
        }
520
        if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
521
            flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
522
          got_error=1;
523
      }
524
      if (!got_error && param->testflag & T_STATISTICS)
525
        update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
526
                         param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
527
                         sinfo->notnull: NULL,
528
                         (ulonglong) info->state->records);
529
    }
530
    my_free((uchar*) sinfo->sort_keys,MYF(0));
531
    my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
532
	    MYF(MY_ALLOW_ZERO_PTR));
533
    sinfo->sort_keys=0;
534
  }
535
536
  for (i= 0, sinfo= sort_param ;
537
       i < sort_info->total_keys ;
538
       i++,
539
	 delete_dynamic(&sinfo->buffpek),
540
	 close_cached_file(&sinfo->tempfile),
541
	 close_cached_file(&sinfo->tempfile_for_exceptions),
542
	 sinfo++)
543
  {
544
    if (got_error)
545
      continue;
546
    if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
547
    {
548
      sinfo->write_keys=write_keys_varlen;
549
      sinfo->read_to_buffer=read_to_buffer_varlen;
550
      sinfo->write_key=write_merge_key_varlen;
551
    }
552
    else
553
    {
554
      sinfo->write_keys=write_keys;
555
      sinfo->read_to_buffer=read_to_buffer;
556
      sinfo->write_key=write_merge_key;
557
    }
558
    if (sinfo->buffpek.elements)
559
    {
560
      uint maxbuffer=sinfo->buffpek.elements-1;
561
      if (!mergebuf)
562
      {
563
        length=param->sort_buffer_length;
564
        while (length >= MIN_SORT_MEMORY)
565
        {
566
          if ((mergebuf= my_malloc(length, MYF(0))))
567
              break;
568
          length=length*3/4;
569
        }
570
        if (!mergebuf)
571
        {
572
          got_error=1;
573
          continue;
574
        }
575
      }
576
      keys=length/sinfo->key_length;
577
      if (maxbuffer >= MERGEBUFF2)
578
      {
579
        if (param->testflag & T_VERBOSE)
580
          printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
581
        if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
582
			    dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
583
			    (int*) &maxbuffer, &sinfo->tempfile))
584
        {
585
          got_error=1;
586
          continue;
587
        }
588
      }
589
      if (flush_io_cache(&sinfo->tempfile) ||
590
          reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
591
      {
592
        got_error=1;
593
        continue;
594
      }
595
      if (param->testflag & T_VERBOSE)
596
        printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
597
      if (merge_index(sinfo, keys, (uchar **)mergebuf,
598
                      dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
599
                      maxbuffer,&sinfo->tempfile) ||
600
          flush_ft_buf(sinfo) ||
601
	  flush_pending_blocks(sinfo))
602
      {
603
        got_error=1;
604
        continue;
605
      }
606
    }
607
    if (my_b_inited(&sinfo->tempfile_for_exceptions))
608
    {
609
      uint key_length;
610
611
      if (param->testflag & T_VERBOSE)
612
        printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
613
614
      if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
615
          reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
616
      {
617
        got_error=1;
618
        continue;
619
      }
620
621
      while (!got_error &&
622
	     !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
623
			sizeof(key_length)))
624
      {
625
        uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
626
        if (key_length > sizeof(ft_buf) ||
627
            my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
628
                      (uint)key_length) ||
629
            _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
630
                         key_length - info->s->rec_reflength))
631
          got_error=1;
632
      }
633
    }
634
  }
635
  my_free((uchar*) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
636
  DBUG_RETURN(got_error);
637
}
638
#endif /* THREAD */
639
640
        /* Write all keys in memory to file for later merge */
641
642
static int  write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
643
                             uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
644
{
645
  uchar **end;
646
  uint sort_length=info->key_length;
647
  DBUG_ENTER("write_keys");
648
649
  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
650
            info);
651
  if (!my_b_inited(tempfile) &&
652
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
653
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
654
    DBUG_RETURN(1); /* purecov: inspected */
655
656
  buffpek->file_pos=my_b_tell(tempfile);
657
  buffpek->count=count;
658
659
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
660
  {
661
    if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
662
      DBUG_RETURN(1); /* purecov: inspected */
663
  }
664
  DBUG_RETURN(0);
665
} /* write_keys */
666
667
668
static inline int
669
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
670
{
671
  int err;
672
  uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
673
674
  /* The following is safe as this is a local file */
675
  if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
676
    return (err);
677
  if ((err= my_b_write(to_file,bufs, (uint) len)))
678
    return (err);
679
  return (0);
680
}
681
682
683
static int  write_keys_varlen(MI_SORT_PARAM *info,
684
				    register uchar **sort_keys,
685
                                    uint count, BUFFPEK *buffpek,
686
				    IO_CACHE *tempfile)
687
{
688
  uchar **end;
689
  int err;
690
  DBUG_ENTER("write_keys_varlen");
691
692
  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
693
            info);
694
  if (!my_b_inited(tempfile) &&
695
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
696
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
697
    DBUG_RETURN(1); /* purecov: inspected */
698
699
  buffpek->file_pos=my_b_tell(tempfile);
700
  buffpek->count=count;
701
  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
702
  {
703
    if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
704
      DBUG_RETURN(err);
705
  }
706
  DBUG_RETURN(0);
707
} /* write_keys_varlen */
708
709
710
static int  write_key(MI_SORT_PARAM *info, uchar *key,
711
			    IO_CACHE *tempfile)
712
{
713
  uint key_length=info->real_key_length;
714
  DBUG_ENTER("write_key");
715
716
  if (!my_b_inited(tempfile) &&
717
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
718
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
719
    DBUG_RETURN(1);
720
721
  if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
722
      my_b_write(tempfile,(uchar*)key,(uint) key_length))
723
    DBUG_RETURN(1);
724
  DBUG_RETURN(0);
725
} /* write_key */
726
727
728
/* Write index */
729
730
static int  write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
731
                              register uint count)
732
{
733
  DBUG_ENTER("write_index");
734
735
  my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
736
           (qsort2_cmp) info->key_cmp,info);
737
  while (count--)
738
  {
739
    if ((*info->key_write)(info,*sort_keys++))
740
      DBUG_RETURN(-1); /* purecov: inspected */
741
  }
742
  DBUG_RETURN(0);
743
} /* write_index */
744
745
746
        /* Merge buffers to make < MERGEBUFF2 buffers */
747
748
static int  merge_many_buff(MI_SORT_PARAM *info, uint keys,
749
                                  uchar **sort_keys, BUFFPEK *buffpek,
750
                                  int *maxbuffer, IO_CACHE *t_file)
751
{
752
  register int i;
753
  IO_CACHE t_file2, *from_file, *to_file, *temp;
754
  BUFFPEK *lastbuff;
755
  DBUG_ENTER("merge_many_buff");
756
757
  if (*maxbuffer < MERGEBUFF2)
758
    DBUG_RETURN(0);                             /* purecov: inspected */
759
  if (flush_io_cache(t_file) ||
760
      open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
761
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
762
    DBUG_RETURN(1);                             /* purecov: inspected */
763
764
  from_file= t_file ; to_file= &t_file2;
765
  while (*maxbuffer >= MERGEBUFF2)
766
  {
767
    reinit_io_cache(from_file,READ_CACHE,0L,0,0);
768
    reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
769
    lastbuff=buffpek;
770
    for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
771
    {
772
      if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
773
                        buffpek+i,buffpek+i+MERGEBUFF-1))
774
        goto cleanup;
775
    }
776
    if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
777
                      buffpek+i,buffpek+ *maxbuffer))
778
      break; /* purecov: inspected */
779
    if (flush_io_cache(to_file))
780
      break;                                    /* purecov: inspected */
781
    temp=from_file; from_file=to_file; to_file=temp;
782
    *maxbuffer= (int) (lastbuff-buffpek)-1;
783
  }
784
cleanup:
785
  close_cached_file(to_file);                   /* This holds old result */
786
  if (to_file == t_file)
787
    *t_file=t_file2;                            /* Copy result file */
788
789
  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
790
} /* merge_many_buff */
791
792
793
/*
794
   Read data to buffer
795
796
  SYNOPSIS
797
    read_to_buffer()
798
    fromfile		File to read from
799
    buffpek		Where to read from
800
    sort_length		max length to read
801
  RESULT
802
    > 0	Ammount of bytes read
803
    -1	Error
804
*/
805
806
static uint  read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
807
                                  uint sort_length)
808
{
809
  register uint count;
810
  uint length;
811
812
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
813
  {
814
    if (my_pread(fromfile->file,(uchar*) buffpek->base,
815
                 (length= sort_length*count),buffpek->file_pos,MYF_RW))
816
      return((uint) -1);                        /* purecov: inspected */
817
    buffpek->key=buffpek->base;
818
    buffpek->file_pos+= length;                 /* New filepos */
819
    buffpek->count-=    count;
820
    buffpek->mem_count= count;
821
  }
822
  return (count*sort_length);
823
} /* read_to_buffer */
824
825
static uint  read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
826
                                         uint sort_length)
827
{
828
  register uint count;
829
  uint16 length_of_key = 0;
830
  uint idx;
831
  uchar *buffp;
832
833
  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
834
  {
835
    buffp = buffpek->base;
836
837
    for (idx=1;idx<=count;idx++)
838
    {
839
      if (my_pread(fromfile->file,(uchar*)&length_of_key,sizeof(length_of_key),
840
                   buffpek->file_pos,MYF_RW))
841
        return((uint) -1);
842
      buffpek->file_pos+=sizeof(length_of_key);
843
      if (my_pread(fromfile->file,(uchar*) buffp,length_of_key,
844
                   buffpek->file_pos,MYF_RW))
845
        return((uint) -1);
846
      buffpek->file_pos+=length_of_key;
847
      buffp = buffp + sort_length;
848
    }
849
    buffpek->key=buffpek->base;
850
    buffpek->count-=    count;
851
    buffpek->mem_count= count;
852
  }
853
  return (count*sort_length);
854
} /* read_to_buffer_varlen */
855
856
857
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
858
					 IO_CACHE *to_file, uchar* key,
859
                                         uint sort_length, uint count)
860
{
861
  uint idx;
862
  uchar *bufs = key;
863
864
  for (idx=1;idx<=count;idx++)
865
  {
866
    int err;
867
    if ((err= my_var_write(info, to_file, bufs)))
868
      return (err);
869
    bufs=bufs+sort_length;
870
  }
871
  return(0);
872
}
873
874
875
static int  write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
876
				  IO_CACHE *to_file, uchar *key,
877
				  uint sort_length, uint count)
878
{
879
  return my_b_write(to_file, key, (size_t) sort_length*count);
880
}
881
882
/*
883
  Merge buffers to one buffer
884
  If to_file == 0 then use info->key_write
885
*/
886
887
static int
888
merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
889
              IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
890
              BUFFPEK *Fb, BUFFPEK *Tb)
891
{
892
  int error;
893
  uint sort_length,maxcount;
894
  ha_rows count;
895
  my_off_t to_start_filepos= 0;
896
  uchar *strpos;
897
  BUFFPEK *buffpek,**refpek;
898
  QUEUE queue;
899
  volatile int *killed= killed_ptr(info->sort_info->param);
900
  DBUG_ENTER("merge_buffers");
901
902
  count=error=0;
903
  maxcount=keys/((uint) (Tb-Fb) +1);
904
  DBUG_ASSERT(maxcount > 0);
905
  if (to_file)
906
    to_start_filepos=my_b_tell(to_file);
907
  strpos=(uchar*) sort_keys;
908
  sort_length=info->key_length;
909
910
  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
911
                 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
912
                 (void*) info))
913
    DBUG_RETURN(1); /* purecov: inspected */
914
915
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
916
  {
917
    count+= buffpek->count;
918
    buffpek->base= strpos;
919
    buffpek->max_keys=maxcount;
920
    strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
921
                                                      sort_length));
922
    if (error == -1)
923
      goto err; /* purecov: inspected */
924
    queue_insert(&queue,(uchar*) buffpek);
925
  }
926
927
  while (queue.elements > 1)
928
  {
929
    for (;;)
930
    {
931
      if (*killed)
932
      {
933
        error=1; goto err;
934
      }
935
      buffpek=(BUFFPEK*) queue_top(&queue);
936
      if (to_file)
937
      {
938
        if (info->write_key(info,to_file,(uchar*) buffpek->key,
939
                            (uint) sort_length,1))
940
        {
941
          error=1; goto err; /* purecov: inspected */
942
        }
943
      }
944
      else
945
      {
946
        if ((*info->key_write)(info,(void*) buffpek->key))
947
        {
948
          error=1; goto err; /* purecov: inspected */
949
        }
950
      }
951
      buffpek->key+=sort_length;
952
      if (! --buffpek->mem_count)
953
      {
954
        if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
955
        {
956
          uchar *base=buffpek->base;
957
          uint max_keys=buffpek->max_keys;
958
959
          VOID(queue_remove(&queue,0));
960
961
          /* Put room used by buffer to use in other buffer */
962
          for (refpek= (BUFFPEK**) &queue_top(&queue);
963
               refpek <= (BUFFPEK**) &queue_end(&queue);
964
               refpek++)
965
          {
966
            buffpek= *refpek;
967
            if (buffpek->base+buffpek->max_keys*sort_length == base)
968
            {
969
              buffpek->max_keys+=max_keys;
970
              break;
971
            }
972
            else if (base+max_keys*sort_length == buffpek->base)
973
            {
974
              buffpek->base=base;
975
              buffpek->max_keys+=max_keys;
976
              break;
977
            }
978
          }
979
          break;                /* One buffer have been removed */
980
        }
981
      }
982
      else if (error == -1)
983
        goto err;               /* purecov: inspected */
984
      queue_replaced(&queue);   /* Top element has been replaced */
985
    }
986
  }
987
  buffpek=(BUFFPEK*) queue_top(&queue);
988
  buffpek->base=(uchar *) sort_keys;
989
  buffpek->max_keys=keys;
990
  do
991
  {
992
    if (to_file)
993
    {
994
      if (info->write_key(info,to_file,(uchar*) buffpek->key,
995
                         sort_length,buffpek->mem_count))
996
      {
997
        error=1; goto err; /* purecov: inspected */
998
      }
999
    }
1000
    else
1001
    {
1002
      register uchar *end;
1003
      strpos= buffpek->key;
1004
      for (end=strpos+buffpek->mem_count*sort_length;
1005
           strpos != end ;
1006
           strpos+=sort_length)
1007
      {
1008
        if ((*info->key_write)(info,(void*) strpos))
1009
        {
1010
          error=1; goto err; /* purecov: inspected */
1011
        }
1012
      }
1013
    }
1014
  }
1015
  while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
1016
         error != 0);
1017
1018
  lastbuff->count=count;
1019
  if (to_file)
1020
    lastbuff->file_pos=to_start_filepos;
1021
err:
1022
  delete_queue(&queue);
1023
  DBUG_RETURN(error);
1024
} /* merge_buffers */
1025
1026
1027
        /* Do a merge to output-file (save only positions) */
1028
1029
static int
1030
merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
1031
            BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1032
{
1033
  DBUG_ENTER("merge_index");
1034
  if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1035
                    buffpek+maxbuffer))
1036
    DBUG_RETURN(1); /* purecov: inspected */
1037
  DBUG_RETURN(0);
1038
} /* merge_index */
1039
1040
static int
1041
flush_ft_buf(MI_SORT_PARAM *info)
1042
{
1043
  int err=0;
1044
  if (info->sort_info->ft_buf)
1045
  {
1046
    err=sort_ft_buf_flush(info);
1047
    my_free((uchar*)info->sort_info->ft_buf, MYF(0));
1048
    info->sort_info->ft_buf=0;
1049
  }
1050
  return err;
1051
}
1052