~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/uniques.cc

  • Committer: Brian Aker
  • Date: 2010-01-22 00:53:13 UTC
  • Revision ID: brian@gaz-20100122005313-jmizcbcdi1lt4tcx
Revert db patch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
  deletes in disk order.
31
31
*/
32
32
 
33
 
#include <drizzled/server_includes.h>
34
 
#include "sql_sort.h"
35
 
 
36
 
 
37
 
int unique_write_to_file(uchar* key,
38
 
                         element_count count __attribute__((unused)),
 
33
#include "config.h"
 
34
 
 
35
#include <math.h>
 
36
 
 
37
#include <queue>
 
38
 
 
39
#include "drizzled/sql_sort.h"
 
40
#include "drizzled/session.h"
 
41
#include "drizzled/sql_list.h"
 
42
#include "drizzled/internal/iocache.h"
 
43
 
 
44
#if defined(CMATH_NAMESPACE)
 
45
using namespace CMATH_NAMESPACE;
 
46
#endif
 
47
 
 
48
using namespace drizzled;
 
49
using namespace std;
 
50
 
 
51
 
 
52
int unique_write_to_file(unsigned char* key, uint32_t,
39
53
                         Unique *unique)
40
54
{
41
55
  /*
44
58
    when tree implementation chooses to store pointer to key in TREE_ELEMENT
45
59
    (instead of storing the element itself there)
46
60
  */
47
 
  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
 
61
  return my_b_write(unique->file, key, unique->size) ? 1 : 0;
48
62
}
49
63
 
50
 
int unique_write_to_ptrs(uchar* key,
51
 
                         element_count count __attribute__((unused)),
52
 
                         Unique *unique)
 
64
int unique_write_to_ptrs(unsigned char* key,
 
65
                         uint32_t, Unique *unique)
53
66
{
54
67
  memcpy(unique->record_pointers, key, unique->size);
55
68
  unique->record_pointers+=unique->size;
57
70
}
58
71
 
59
72
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
60
 
               uint size_arg, uint64_t max_in_memory_size_arg)
61
 
  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
 
73
               uint32_t size_arg, size_t max_in_memory_size_arg)
 
74
  : max_in_memory_size(max_in_memory_size_arg),
 
75
    file(static_cast<IO_CACHE *>(memory::sql_calloc(sizeof(IO_CACHE)))),
 
76
    size(size_arg),
 
77
    elements(0)
62
78
{
63
 
  my_b_clear(&file);
64
 
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
 
79
  my_b_clear(file);
 
80
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
65
81
            NULL, comp_func_fixed_arg);
66
82
  /* If the following fail's the next add will also fail */
67
83
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
70
86
  */
71
87
  max_elements= (ulong) (max_in_memory_size /
72
88
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
73
 
  VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
74
 
                   MYF(MY_WME)));
 
89
  open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
 
90
                   MYF(MY_WME));
75
91
}
76
92
 
77
93
 
126
142
      total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
127
143
*/
128
144
 
129
 
static double get_merge_buffers_cost(uint *buff_elems __attribute__((unused)),
130
 
                                     uint elem_size,
131
 
                                     uint *first, uint *last)
 
145
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
 
146
                                     uint32_t *first, uint32_t *last)
132
147
{
133
 
  uint total_buf_elems= 0;
134
 
  for (uint *pbuf= first; pbuf <= last; pbuf++)
 
148
  uint32_t total_buf_elems= 0;
 
149
  for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
135
150
    total_buf_elems+= *pbuf;
136
151
  *last= total_buf_elems;
137
152
 
170
185
    Cost of merge in disk seeks.
171
186
*/
172
187
 
173
 
static double get_merge_many_buffs_cost(uint *buffer,
174
 
                                        uint maxbuffer, uint max_n_elems,
175
 
                                        uint last_n_elems, int elem_size)
 
188
static double get_merge_many_buffs_cost(uint32_t *buffer,
 
189
                                        uint32_t maxbuffer, uint32_t max_n_elems,
 
190
                                        uint32_t last_n_elems, int elem_size)
176
191
{
177
192
  register int i;
178
193
  double total_cost= 0.0;
179
 
  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
 
194
  uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
180
195
 
181
196
  /*
182
197
    Set initial state: first maxbuffer sequences contain max_n_elems elements
194
209
  {
195
210
    while (maxbuffer >= MERGEBUFF2)
196
211
    {
197
 
      uint lastbuff= 0;
 
212
      uint32_t lastbuff= 0;
198
213
      for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
199
214
      {
200
215
        total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
263
278
      these will be random seeks.
264
279
*/
265
280
 
266
 
double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
267
 
                            uint64_t max_in_memory_size)
 
281
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
 
282
                            size_t max_in_memory_size)
268
283
{
269
284
  ulong max_elements_in_tree;
270
285
  ulong last_tree_elems;
315
330
 
316
331
Unique::~Unique()
317
332
{
318
 
  close_cached_file(&file);
 
333
  close_cached_file(file);
319
334
  delete_tree(&tree);
320
335
  delete_dynamic(&file_ptrs);
321
336
}
327
342
  BUFFPEK file_ptr;
328
343
  elements+= tree.elements_in_tree;
329
344
  file_ptr.count=tree.elements_in_tree;
330
 
  file_ptr.file_pos=my_b_tell(&file);
 
345
  file_ptr.file_pos=my_b_tell(file);
331
346
 
332
347
  if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
333
348
                (void*) this, left_root_right) ||
334
 
      insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
 
349
      insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
335
350
    return 1;
336
351
  delete_tree(&tree);
337
352
  return 0;
356
371
  if (elements)
357
372
  {
358
373
    reset_dynamic(&file_ptrs);
359
 
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
 
374
    reinit_io_cache(file, WRITE_CACHE, 0L, 0, 1);
360
375
  }
361
376
  elements= 0;
362
377
}
368
383
  BUFFPEK.
369
384
*/
370
385
 
371
 
C_MODE_START
 
386
#ifdef __cplusplus
 
387
extern "C" {
 
388
#endif
372
389
 
373
 
static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
 
390
static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
374
391
{
375
392
  BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
376
393
  return ctx->key_compare(ctx->key_compare_arg,
377
 
                          *((uchar **) key_ptr1), *((uchar **)key_ptr2));
378
 
}
379
 
 
380
 
C_MODE_END
381
 
 
 
394
                          *((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
 
395
}
 
396
 
 
397
#ifdef __cplusplus
 
398
}
 
399
#endif
 
400
 
 
401
/*
 
402
 The comparison function object, passed to a priority_queue in merge_walk()
 
403
 as its sort function parameter.
 
404
*/
 
405
 
 
406
class buffpek_compare_functor
 
407
{
 
408
  qsort_cmp2 key_compare;
 
409
  void *key_compare_arg;
 
410
  public:
 
411
  buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
 
412
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
 
413
  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
 
414
  {
 
415
    return key_compare(key_compare_arg,
 
416
                    i->key, j->key);
 
417
  }
 
418
};
382
419
 
383
420
/*
384
421
  DESCRIPTION
414
451
    <> 0  error
415
452
*/
416
453
 
417
 
static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
418
 
                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
 
454
static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
 
455
                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
419
456
                       tree_walk_action walk_action, void *walk_action_arg,
420
457
                       qsort_cmp2 compare, void *compare_arg,
421
458
                       IO_CACHE *file)
422
459
{
423
 
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
424
 
  QUEUE queue;
425
460
  if (end <= begin ||
426
 
      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
427
 
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
428
 
                 buffpek_compare, &compare_context))
 
461
      merge_buffer_size < (ulong) (key_length * (end - begin + 1))) 
429
462
    return 1;
 
463
  priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
 
464
    queue(buffpek_compare_functor(compare, compare_arg));
430
465
  /* we need space for one key when a piece of merge buffer is re-read */
431
466
  merge_buffer_size-= key_length;
432
 
  uchar *save_key_buff= merge_buffer + merge_buffer_size;
433
 
  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
 
467
  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
 
468
  uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
434
469
                                        key_length);
435
470
  /* if piece_size is aligned reuse_freed_buffer will always hit */
436
 
  uint piece_size= max_key_count_per_piece * key_length;
437
 
  uint bytes_read;               /* to hold return value of read_to_buffer */
 
471
  uint32_t piece_size= max_key_count_per_piece * key_length;
 
472
  uint32_t bytes_read;               /* to hold return value of read_to_buffer */
438
473
  BUFFPEK *top;
439
474
  int res= 1;
440
475
  /*
448
483
    top->base= merge_buffer + (top - begin) * piece_size;
449
484
    top->max_keys= max_key_count_per_piece;
450
485
    bytes_read= read_to_buffer(file, top, key_length);
451
 
    if (bytes_read == (uint) (-1))
 
486
    if (bytes_read == (uint32_t) (-1))
452
487
      goto end;
453
488
    assert(bytes_read);
454
 
    queue_insert(&queue, (uchar *) top);
 
489
    queue.push(top);
455
490
  }
456
 
  top= (BUFFPEK *) queue_top(&queue);
457
 
  while (queue.elements > 1)
 
491
  top= queue.top();
 
492
  while (queue.size() > 1)
458
493
  {
459
494
    /*
460
495
      Every iteration one element is removed from the queue, and one is
470
505
    */
471
506
    top->key+= key_length;
472
507
    if (--top->mem_count)
473
 
      queue_replaced(&queue);
 
508
    {
 
509
      queue.pop();
 
510
      queue.push(top);
 
511
    }
474
512
    else /* next piece should be read */
475
513
    {
476
514
      /* save old_key not to overwrite it in read_to_buffer */
477
515
      memcpy(save_key_buff, old_key, key_length);
478
516
      old_key= save_key_buff;
479
517
      bytes_read= read_to_buffer(file, top, key_length);
480
 
      if (bytes_read == (uint) (-1))
 
518
      if (bytes_read == (uint32_t) (-1))
481
519
        goto end;
482
 
      else if (bytes_read > 0)      /* top->key, top->mem_count are reset */
483
 
        queue_replaced(&queue);     /* in read_to_buffer */
 
520
      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
 
521
      {                        /* in read_to_buffer */
 
522
        queue.pop();
 
523
        queue.push(top);
 
524
      }
484
525
      else
485
526
      {
486
527
        /*
487
 
          Tree for old 'top' element is empty: remove it from the queue and
488
 
          give all its memory to the nearest tree.
 
528
          Tree for old 'top' element is empty: remove it from the queue. 
489
529
        */
490
 
        queue_remove(&queue, 0);
491
 
        reuse_freed_buff(&queue, top, key_length);
 
530
        queue.pop();
492
531
      }
493
532
    }
494
 
    top= (BUFFPEK *) queue_top(&queue);
 
533
    top= queue.top();
495
534
    /* new top has been obtained; if old top is unique, apply the action */
496
535
    if (compare(compare_arg, old_key, top->key))
497
536
    {
514
553
    }
515
554
    while (--top->mem_count);
516
555
    bytes_read= read_to_buffer(file, top, key_length);
517
 
    if (bytes_read == (uint) (-1))
 
556
    if (bytes_read == (uint32_t) (-1))
518
557
      goto end;
519
558
  }
520
559
  while (bytes_read);
521
560
  res= 0;
522
561
end:
523
 
  delete_queue(&queue);
524
562
  return res;
525
563
}
526
564
 
548
586
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
549
587
{
550
588
  int res;
551
 
  uchar *merge_buffer;
 
589
  unsigned char *merge_buffer;
552
590
 
553
591
  if (elements == 0)                       /* the whole tree is in memory */
554
592
    return tree_walk(&tree, action, walk_action_arg, left_root_right);
556
594
  /* flush current tree to the file to have some memory for merge buffer */
557
595
  if (flush())
558
596
    return 1;
559
 
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
 
597
  if (flush_io_cache(file) || reinit_io_cache(file, READ_CACHE, 0L, 0, 0))
560
598
    return 1;
561
 
  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
 
599
  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
562
600
    return 1;
563
601
  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
564
602
                  (BUFFPEK *) file_ptrs.buffer,
565
603
                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
566
604
                  action, walk_action_arg,
567
 
                  tree.compare, tree.custom_arg, &file);
568
 
  my_free((char*) merge_buffer, MYF(0));
 
605
                  tree.compare, tree.custom_arg, file);
 
606
  free((char*) merge_buffer);
569
607
  return res;
570
608
}
571
609
 
572
610
/*
573
 
  Modify the TABLE element so that when one calls init_records()
 
611
  Modify the Table element so that when one calls init_records()
574
612
  the rows will be read in priority order.
575
613
*/
576
614
 
577
 
bool Unique::get(TABLE *table)
 
615
bool Unique::get(Table *table)
578
616
{
579
617
  SORTPARAM sort_param;
580
618
  table->sort.found_records=elements+tree.elements_in_tree;
581
619
 
582
 
  if (my_b_tell(&file) == 0)
 
620
  if (my_b_tell(file) == 0)
583
621
  {
584
622
    /* Whole tree is in memory;  Don't use disk if you don't need to */
585
 
    if ((record_pointers=table->sort.record_pointers= (uchar*)
586
 
         my_malloc(size * tree.elements_in_tree, MYF(0))))
 
623
    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
 
624
         malloc(size * tree.elements_in_tree)))
587
625
    {
588
626
      (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
589
627
                       this, left_root_right);
596
634
 
597
635
  IO_CACHE *outfile=table->sort.io_cache;
598
636
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
599
 
  uint maxbuffer= file_ptrs.elements - 1;
600
 
  uchar *sort_buffer;
 
637
  uint32_t maxbuffer= file_ptrs.elements - 1;
 
638
  unsigned char *sort_buffer;
601
639
  my_off_t save_pos;
602
640
  bool error=1;
603
641
 
604
642
      /* Open cached file if it isn't open */
605
 
  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
606
 
                                MYF(MY_ZEROFILL));
 
643
  outfile=table->sort.io_cache= new IO_CACHE;
 
644
  memset(outfile, 0, sizeof(IO_CACHE));
607
645
 
608
 
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
 
646
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
609
647
    return 1;
610
 
  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
 
648
  reinit_io_cache(outfile, WRITE_CACHE, 0L, 0, 0);
611
649
 
612
650
  memset(&sort_param, 0, sizeof(sort_param));
613
651
  sort_param.max_rows= elements;
614
652
  sort_param.sort_form=table;
615
653
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
616
654
    size;
617
 
  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
 
655
  sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
618
656
  sort_param.not_killable=1;
619
657
 
620
 
  if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
621
 
                                       sort_param.sort_length,
622
 
                                       MYF(0))))
 
658
  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
 
659
                                            sort_param.sort_length)))
623
660
    return 1;
624
661
  sort_param.unique_buff= sort_buffer+(sort_param.keys*
625
662
                                       sort_param.sort_length);
629
666
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
630
667
 
631
668
  /* Merge the buffers to one file, removing duplicates */
632
 
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
633
 
    goto err;
634
 
  if (flush_io_cache(&file) ||
635
 
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
636
 
    goto err;
637
 
  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
 
669
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
 
670
    goto err;
 
671
  if (flush_io_cache(file) ||
 
672
      reinit_io_cache(file,READ_CACHE,0L,0,0))
 
673
    goto err;
 
674
  if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
638
675
                    file_ptr, file_ptr+maxbuffer,0))
639
676
    goto err;
640
677
  error=0;
641
678
err:
642
 
  x_free(sort_buffer);
 
679
  if (sort_buffer)
 
680
    free(sort_buffer);
643
681
  if (flush_io_cache(outfile))
644
682
    error=1;
645
683