~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/myisam/sort.cc

Merged Drizzle's Trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
302
302
} /* find_all_keys */
303
303
 
304
304
 
305
 
/* Search after all keys and place them in a temp. file */
306
 
 
307
 
pthread_handler_t thr_find_all_keys(void *arg)
308
 
{
309
 
  MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
310
 
  int error;
311
 
  uint32_t memavl,old_memavl,keys,sort_length;
312
 
  uint32_t idx;
313
 
  size_t maxbuffer;
314
 
  unsigned char **sort_keys=0;
315
 
 
316
 
  error=1;
317
 
 
318
 
  if (internal::my_thread_init())
319
 
    goto err;
320
 
 
321
 
  {
322
 
    if (sort_param->sort_info->got_error)
323
 
      goto err;
324
 
 
325
 
    if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
326
 
    {
327
 
      sort_param->write_keys=     write_keys_varlen;
328
 
      sort_param->read_to_buffer= read_to_buffer_varlen;
329
 
      sort_param->write_key=      write_merge_key_varlen;
330
 
    }
331
 
    else
332
 
    {
333
 
      sort_param->write_keys=     write_keys;
334
 
      sort_param->read_to_buffer= read_to_buffer;
335
 
      sort_param->write_key=      write_merge_key;
336
 
    }
337
 
 
338
 
    my_b_clear(&sort_param->tempfile);
339
 
    my_b_clear(&sort_param->tempfile_for_exceptions);
340
 
    memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
341
 
    memset(&sort_param->unique, 0,  sizeof(sort_param->unique));
342
 
    sort_keys= (unsigned char **) NULL;
343
 
 
344
 
    memavl=       max(sort_param->sortbuff_size, (uint32_t)MIN_SORT_MEMORY);
345
 
    idx=          (uint)sort_param->sort_info->max_records;
346
 
    sort_length=  sort_param->key_length;
347
 
    maxbuffer=    1;
348
 
 
349
 
    while (memavl >= MIN_SORT_MEMORY)
350
 
    {
351
 
      if ((internal::my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
352
 
          (internal::my_off_t) memavl)
353
 
        keys= idx+1;
354
 
      else
355
 
      {
356
 
        uint32_t skr;
357
 
        do
358
 
        {
359
 
          skr= maxbuffer;
360
 
          if (memavl < sizeof(BUFFPEK)*maxbuffer ||
361
 
              (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
362
 
               (sort_length+sizeof(char*))) <= 1 ||
363
 
              keys < maxbuffer)
364
 
          {
365
 
            mi_check_print_error(sort_param->sort_info->param,
366
 
                                 "myisam_sort_buffer_size is too small");
367
 
            goto err;
368
 
          }
369
 
        }
370
 
        while ((maxbuffer= (idx/(keys-1)+1)) != skr);
371
 
      }
372
 
      if ((sort_keys= (unsigned char**)
373
 
           malloc(keys*(sort_length+sizeof(char*)))))
374
 
      {
375
 
        if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
376
 
                                  maxbuffer, maxbuffer/2))
377
 
        {
378
 
          free((unsigned char*) sort_keys);
379
 
          sort_keys= (unsigned char **) NULL; /* for err: label */
380
 
        }
381
 
        else
382
 
          break;
383
 
      }
384
 
      old_memavl= memavl;
385
 
      if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
386
 
          old_memavl > MIN_SORT_MEMORY)
387
 
        memavl= MIN_SORT_MEMORY;
388
 
    }
389
 
    if (memavl < MIN_SORT_MEMORY)
390
 
    {
391
 
      mi_check_print_error(sort_param->sort_info->param,
392
 
                           "MyISAM sort buffer too small");
393
 
      goto err;
394
 
    }
395
 
 
396
 
    if (sort_param->sort_info->param->testflag & T_VERBOSE)
397
 
      printf("Key %d - Allocating buffer for %d keys\n",
398
 
             sort_param->key + 1, keys);
399
 
    sort_param->sort_keys= sort_keys;
400
 
 
401
 
    idx= error= 0;
402
 
    sort_keys[0]= (unsigned char*) (sort_keys+keys);
403
 
 
404
 
    while (!(error= sort_param->sort_info->got_error) &&
405
 
           !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
406
 
    {
407
 
      if (sort_param->real_key_length > sort_param->key_length)
408
 
      {
409
 
        if (write_key(sort_param, sort_keys[idx],
410
 
                      &sort_param->tempfile_for_exceptions))
411
 
          goto err;
412
 
        continue;
413
 
      }
414
 
 
415
 
      if (++idx == keys)
416
 
      {
417
 
        if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
418
 
                                   (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
419
 
                                   &sort_param->tempfile))
420
 
          goto err;
421
 
        sort_keys[0]= (unsigned char*) (sort_keys+keys);
422
 
        memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
423
 
        idx= 1;
424
 
      }
425
 
      sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
426
 
    }
427
 
    if (error > 0)
428
 
      goto err;
429
 
    if (sort_param->buffpek.elements)
430
 
    {
431
 
      if (sort_param->write_keys(sort_param, sort_keys, idx,
432
 
                                 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
433
 
                                 &sort_param->tempfile))
434
 
        goto err;
435
 
      sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
436
 
    }
437
 
    else
438
 
      sort_param->keys= idx;
439
 
 
440
 
    sort_param->sort_keys_length= keys;
441
 
    goto ok;
442
 
 
443
 
err:
444
 
    sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
445
 
    if (sort_keys)
446
 
      free((unsigned char*) sort_keys);
447
 
    sort_param->sort_keys= 0;
448
 
    delete_dynamic(& sort_param->buffpek);
449
 
    close_cached_file(&sort_param->tempfile);
450
 
    close_cached_file(&sort_param->tempfile_for_exceptions);
451
 
 
452
 
ok:
453
 
    sort_param->wordroot.free_root(MYF(0));
454
 
    /*
455
 
      Detach from the share if the writer is involved. Avoid others to
456
 
      be blocked. This includes a flush of the write buffer. This will
457
 
      also indicate EOF to the readers.
458
 
    */
459
 
    if (sort_param->sort_info->info->rec_cache.share)
460
 
      remove_io_thread(&sort_param->sort_info->info->rec_cache);
461
 
 
462
 
    /* Readers detach from the share if any. Avoid others to be blocked. */
463
 
    if (sort_param->read_cache.share)
464
 
      remove_io_thread(&sort_param->read_cache);
465
 
 
466
 
    pthread_mutex_lock(&sort_param->sort_info->mutex);
467
 
    if (!--sort_param->sort_info->threads_running)
468
 
      pthread_cond_signal(&sort_param->sort_info->cond);
469
 
    pthread_mutex_unlock(&sort_param->sort_info->mutex);
470
 
  }
471
 
  internal::my_thread_end();
472
 
  return NULL;
473
 
}
474
 
 
475
 
 
476
305
int thr_write_keys(MI_SORT_PARAM *sort_param)
477
306
{
478
307
  SORT_INFO *sort_info= sort_param->sort_info;