~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/uniques.cc

  • Committer: Brian Aker
  • Date: 2008-10-06 06:47:29 UTC
  • Revision ID: brian@tangent.org-20081006064729-2i9mhjkzyvow9xsm
RemoveĀ uint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
  deletes in disk order.
31
31
*/
32
32
 
33
 
#include "config.h"
34
 
 
35
 
#include <math.h>
36
 
 
37
 
#include <queue>
38
 
 
39
 
#include "drizzled/sql_sort.h"
40
 
#include "drizzled/session.h"
41
 
#include "drizzled/sql_list.h"
42
 
#include "drizzled/internal/iocache.h"
43
 
 
44
 
#if defined(CMATH_NAMESPACE)
45
 
using namespace CMATH_NAMESPACE;
46
 
#endif
47
 
 
48
 
using namespace std;
49
 
 
50
 
namespace drizzled
51
 
{
52
 
 
53
 
int unique_write_to_file(unsigned char* key, uint32_t,
 
33
#include <drizzled/server_includes.h>
 
34
#include "sql_sort.h"
 
35
 
 
36
 
 
37
int unique_write_to_file(unsigned char* key,
 
38
                         element_count count __attribute__((unused)),
54
39
                         Unique *unique)
55
40
{
56
41
  /*
59
44
    when tree implementation chooses to store pointer to key in TREE_ELEMENT
60
45
    (instead of storing the element itself there)
61
46
  */
62
 
  return my_b_write(unique->file, key, unique->size) ? 1 : 0;
 
47
  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
63
48
}
64
49
 
65
50
int unique_write_to_ptrs(unsigned char* key,
66
 
                         uint32_t, Unique *unique)
 
51
                         element_count count __attribute__((unused)),
 
52
                         Unique *unique)
67
53
{
68
54
  memcpy(unique->record_pointers, key, unique->size);
69
55
  unique->record_pointers+=unique->size;
71
57
}
72
58
 
73
59
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
74
 
               uint32_t size_arg, size_t max_in_memory_size_arg)
75
 
  : max_in_memory_size(max_in_memory_size_arg),
76
 
    file(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
77
 
    size(size_arg),
78
 
    elements(0)
 
60
               uint32_t size_arg, uint64_t max_in_memory_size_arg)
 
61
  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
79
62
{
80
 
  my_b_clear(file);
81
 
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
 
63
  my_b_clear(&file);
 
64
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
82
65
            NULL, comp_func_fixed_arg);
83
66
  /* If the following fail's the next add will also fail */
84
67
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
87
70
  */
88
71
  max_elements= (ulong) (max_in_memory_size /
89
72
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
90
 
  open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
 
73
  open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
91
74
                   MYF(MY_WME));
92
75
}
93
76
 
143
126
      total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
144
127
*/
145
128
 
146
 
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
 
129
static double get_merge_buffers_cost(uint32_t *buff_elems __attribute__((unused)),
 
130
                                     uint32_t elem_size,
147
131
                                     uint32_t *first, uint32_t *last)
148
132
{
149
133
  uint32_t total_buf_elems= 0;
280
264
*/
281
265
 
282
266
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
283
 
                            size_t max_in_memory_size)
 
267
                            uint64_t max_in_memory_size)
284
268
{
285
269
  ulong max_elements_in_tree;
286
270
  ulong last_tree_elems;
331
315
 
332
316
Unique::~Unique()
333
317
{
334
 
  close_cached_file(file);
 
318
  close_cached_file(&file);
335
319
  delete_tree(&tree);
336
320
  delete_dynamic(&file_ptrs);
337
321
}
343
327
  BUFFPEK file_ptr;
344
328
  elements+= tree.elements_in_tree;
345
329
  file_ptr.count=tree.elements_in_tree;
346
 
  file_ptr.file_pos=my_b_tell(file);
 
330
  file_ptr.file_pos=my_b_tell(&file);
347
331
 
348
332
  if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
349
333
                (void*) this, left_root_right) ||
372
356
  if (elements)
373
357
  {
374
358
    reset_dynamic(&file_ptrs);
375
 
    reinit_io_cache(file, internal::WRITE_CACHE, 0L, 0, 1);
 
359
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
376
360
  }
377
361
  elements= 0;
378
362
}
399
383
}
400
384
#endif
401
385
 
402
 
/*
403
 
 The comparison function object, passed to a priority_queue in merge_walk()
404
 
 as its sort function parameter.
405
 
*/
406
386
 
407
 
class buffpek_compare_functor
408
 
{
409
 
  qsort_cmp2 key_compare;
410
 
  void *key_compare_arg;
411
 
  public:
412
 
  buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
413
 
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
414
 
  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
415
 
  {
416
 
    return key_compare(key_compare_arg,
417
 
                    i->key, j->key);
418
 
  }
419
 
};
420
387
 
421
388
/*
422
389
  DESCRIPTION
456
423
                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
457
424
                       tree_walk_action walk_action, void *walk_action_arg,
458
425
                       qsort_cmp2 compare, void *compare_arg,
459
 
                       internal::IO_CACHE *file)
 
426
                       IO_CACHE *file)
460
427
{
 
428
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
 
429
  QUEUE queue;
461
430
  if (end <= begin ||
462
 
      merge_buffer_size < (ulong) (key_length * (end - begin + 1))) 
 
431
      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
 
432
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
 
433
                 buffpek_compare, &compare_context))
463
434
    return 1;
464
 
  priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
465
 
    queue(buffpek_compare_functor(compare, compare_arg));
466
435
  /* we need space for one key when a piece of merge buffer is re-read */
467
436
  merge_buffer_size-= key_length;
468
437
  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
469
 
  uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
 
438
  uint32_t max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
470
439
                                        key_length);
471
440
  /* if piece_size is aligned reuse_freed_buffer will always hit */
472
441
  uint32_t piece_size= max_key_count_per_piece * key_length;
484
453
    top->base= merge_buffer + (top - begin) * piece_size;
485
454
    top->max_keys= max_key_count_per_piece;
486
455
    bytes_read= read_to_buffer(file, top, key_length);
487
 
    if (bytes_read == (uint32_t) (-1))
 
456
    if (bytes_read == (uint) (-1))
488
457
      goto end;
489
458
    assert(bytes_read);
490
 
    queue.push(top);
 
459
    queue_insert(&queue, (unsigned char *) top);
491
460
  }
492
 
  top= queue.top();
493
 
  while (queue.size() > 1)
 
461
  top= (BUFFPEK *) queue_top(&queue);
 
462
  while (queue.elements > 1)
494
463
  {
495
464
    /*
496
465
      Every iteration one element is removed from the queue, and one is
506
475
    */
507
476
    top->key+= key_length;
508
477
    if (--top->mem_count)
509
 
    {
510
 
      queue.pop();
511
 
      queue.push(top);
512
 
    }
 
478
      queue_replaced(&queue);
513
479
    else /* next piece should be read */
514
480
    {
515
481
      /* save old_key not to overwrite it in read_to_buffer */
516
482
      memcpy(save_key_buff, old_key, key_length);
517
483
      old_key= save_key_buff;
518
484
      bytes_read= read_to_buffer(file, top, key_length);
519
 
      if (bytes_read == (uint32_t) (-1))
 
485
      if (bytes_read == (uint) (-1))
520
486
        goto end;
521
 
      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
522
 
      {                        /* in read_to_buffer */
523
 
        queue.pop();
524
 
        queue.push(top);
525
 
      }
 
487
      else if (bytes_read > 0)      /* top->key, top->mem_count are reset */
 
488
        queue_replaced(&queue);     /* in read_to_buffer */
526
489
      else
527
490
      {
528
491
        /*
529
 
          Tree for old 'top' element is empty: remove it from the queue. 
 
492
          Tree for old 'top' element is empty: remove it from the queue and
 
493
          give all its memory to the nearest tree.
530
494
        */
531
 
        queue.pop();
 
495
        queue_remove(&queue, 0);
 
496
        reuse_freed_buff(&queue, top, key_length);
532
497
      }
533
498
    }
534
 
    top= queue.top();
 
499
    top= (BUFFPEK *) queue_top(&queue);
535
500
    /* new top has been obtained; if old top is unique, apply the action */
536
501
    if (compare(compare_arg, old_key, top->key))
537
502
    {
554
519
    }
555
520
    while (--top->mem_count);
556
521
    bytes_read= read_to_buffer(file, top, key_length);
557
 
    if (bytes_read == (uint32_t) (-1))
 
522
    if (bytes_read == (uint) (-1))
558
523
      goto end;
559
524
  }
560
525
  while (bytes_read);
561
526
  res= 0;
562
527
end:
 
528
  delete_queue(&queue);
563
529
  return res;
564
530
}
565
531
 
576
542
  SYNOPSIS
577
543
    Unique:walk()
578
544
  All params are 'IN':
579
 
    action  function-visitor, typed in include/tree.h
 
545
    action  function-visitor, typed in include/my_tree.h
580
546
            function is called for each unique element
581
547
    arg     argument for visitor, which is passed to it on each call
582
548
  RETURN VALUE
595
561
  /* flush current tree to the file to have some memory for merge buffer */
596
562
  if (flush())
597
563
    return 1;
598
 
  if (flush_io_cache(file) || reinit_io_cache(file, internal::READ_CACHE, 0L, 0, 0))
 
564
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
599
565
    return 1;
600
 
  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
 
566
  if (!(merge_buffer= (unsigned char *) my_malloc((ulong) max_in_memory_size, MYF(0))))
601
567
    return 1;
602
568
  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
603
569
                  (BUFFPEK *) file_ptrs.buffer,
604
570
                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
605
571
                  action, walk_action_arg,
606
 
                  tree.compare, tree.custom_arg, file);
 
572
                  tree.compare, tree.custom_arg, &file);
607
573
  free((char*) merge_buffer);
608
574
  return res;
609
575
}
618
584
  SORTPARAM sort_param;
619
585
  table->sort.found_records=elements+tree.elements_in_tree;
620
586
 
621
 
  if (my_b_tell(file) == 0)
 
587
  if (my_b_tell(&file) == 0)
622
588
  {
623
589
    /* Whole tree is in memory;  Don't use disk if you don't need to */
624
590
    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
625
 
         malloc(size * tree.elements_in_tree)))
 
591
         my_malloc(size * tree.elements_in_tree, MYF(0))))
626
592
    {
627
593
      (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
628
594
                       this, left_root_right);
633
599
  if (flush())
634
600
    return 1;
635
601
 
636
 
  internal::IO_CACHE *outfile=table->sort.io_cache;
 
602
  IO_CACHE *outfile=table->sort.io_cache;
637
603
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
638
604
  uint32_t maxbuffer= file_ptrs.elements - 1;
639
605
  unsigned char *sort_buffer;
640
 
  internal::my_off_t save_pos;
 
606
  my_off_t save_pos;
641
607
  bool error=1;
642
608
 
643
609
      /* Open cached file if it isn't open */
644
 
  outfile=table->sort.io_cache= new internal::IO_CACHE;
645
 
  memset(outfile, 0, sizeof(internal::IO_CACHE));
 
610
  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
 
611
                                MYF(MY_ZEROFILL));
646
612
 
647
 
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
 
613
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
648
614
    return 1;
649
 
  reinit_io_cache(outfile, internal::WRITE_CACHE, 0L, 0, 0);
 
615
  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
650
616
 
651
617
  memset(&sort_param, 0, sizeof(sort_param));
652
618
  sort_param.max_rows= elements;
653
619
  sort_param.sort_form=table;
654
620
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
655
621
    size;
656
 
  sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
 
622
  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
657
623
  sort_param.not_killable=1;
658
624
 
659
 
  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
660
 
                                            sort_param.sort_length)))
 
625
  if (!(sort_buffer=(unsigned char*) my_malloc((sort_param.keys+1) *
 
626
                                       sort_param.sort_length,
 
627
                                       MYF(0))))
661
628
    return 1;
662
629
  sort_param.unique_buff= sort_buffer+(sort_param.keys*
663
630
                                       sort_param.sort_length);
667
634
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
668
635
 
669
636
  /* Merge the buffers to one file, removing duplicates */
670
 
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
671
 
    goto err;
672
 
  if (flush_io_cache(file) ||
673
 
      reinit_io_cache(file,internal::READ_CACHE,0L,0,0))
674
 
    goto err;
675
 
  if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
 
637
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
 
638
    goto err;
 
639
  if (flush_io_cache(&file) ||
 
640
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
 
641
    goto err;
 
642
  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
676
643
                    file_ptr, file_ptr+maxbuffer,0))
677
644
    goto err;
678
645
  error=0;
684
651
 
685
652
  /* Setup io_cache for reading */
686
653
  save_pos=outfile->pos_in_file;
687
 
  if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
 
654
  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
688
655
    error=1;
689
656
  outfile->end_of_file=save_pos;
690
657
  return error;
691
658
}
692
 
 
693
 
} /* namespace drizzled */