~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/uniques.cc

  • Committer: Stewart Smith
  • Date: 2008-07-13 06:56:15 UTC
  • mto: (210.1.1 drizzle)
  • mto: This revision was merged to the branch mainline in revision 211.
  • Revision ID: stewart@flamingspork.com-20080713065615-vzok75kgnnviokl9
Move MD5() into a UDF

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
  deletes in disk order.
31
31
*/
32
32
 
33
 
#include "config.h"
34
 
 
35
 
#include <math.h>
36
 
 
37
 
#include <queue>
38
 
 
39
 
#include "drizzled/sql_sort.h"
40
 
#include "drizzled/session.h"
41
 
#include "drizzled/sql_list.h"
42
 
#include "drizzled/internal/iocache.h"
43
 
 
44
 
#if defined(CMATH_NAMESPACE)
45
 
using namespace CMATH_NAMESPACE;
46
 
#endif
47
 
 
48
 
using namespace std;
49
 
 
50
 
namespace drizzled
51
 
{
52
 
 
53
 
int unique_write_to_file(unsigned char* key, uint32_t,
 
33
#include "mysql_priv.h"
 
34
#include "sql_sort.h"
 
35
 
 
36
 
 
37
int unique_write_to_file(uchar* key,
 
38
                         element_count count __attribute__((__unused__)),
54
39
                         Unique *unique)
55
40
{
56
41
  /*
59
44
    when tree implementation chooses to store pointer to key in TREE_ELEMENT
60
45
    (instead of storing the element itself there)
61
46
  */
62
 
  return my_b_write(unique->file, key, unique->size) ? 1 : 0;
 
47
  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
63
48
}
64
49
 
65
 
int unique_write_to_ptrs(unsigned char* key,
66
 
                         uint32_t, Unique *unique)
 
50
int unique_write_to_ptrs(uchar* key,
 
51
                         element_count count __attribute__((__unused__)),
 
52
                         Unique *unique)
67
53
{
68
54
  memcpy(unique->record_pointers, key, unique->size);
69
55
  unique->record_pointers+=unique->size;
71
57
}
72
58
 
73
59
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
74
 
               uint32_t size_arg, size_t max_in_memory_size_arg)
75
 
  : max_in_memory_size(max_in_memory_size_arg),
76
 
    file(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
77
 
    size(size_arg),
78
 
    elements(0)
 
60
               uint size_arg, ulonglong max_in_memory_size_arg)
 
61
  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
79
62
{
80
 
  my_b_clear(file);
81
 
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
 
63
  my_b_clear(&file);
 
64
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
82
65
            NULL, comp_func_fixed_arg);
83
66
  /* If the following fail's the next add will also fail */
84
67
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
87
70
  */
88
71
  max_elements= (ulong) (max_in_memory_size /
89
72
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
90
 
  open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
91
 
                   MYF(MY_WME));
 
73
  VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
 
74
                   MYF(MY_WME)));
92
75
}
93
76
 
94
77
 
143
126
      total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
144
127
*/
145
128
 
146
 
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
147
 
                                     uint32_t *first, uint32_t *last)
 
129
static double get_merge_buffers_cost(uint *buff_elems __attribute__((__unused__)),
 
130
                                     uint elem_size,
 
131
                                     uint *first, uint *last)
148
132
{
149
 
  uint32_t total_buf_elems= 0;
150
 
  for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
 
133
  uint total_buf_elems= 0;
 
134
  for (uint *pbuf= first; pbuf <= last; pbuf++)
151
135
    total_buf_elems+= *pbuf;
152
136
  *last= total_buf_elems;
153
137
 
186
170
    Cost of merge in disk seeks.
187
171
*/
188
172
 
189
 
static double get_merge_many_buffs_cost(uint32_t *buffer,
190
 
                                        uint32_t maxbuffer, uint32_t max_n_elems,
191
 
                                        uint32_t last_n_elems, int elem_size)
 
173
static double get_merge_many_buffs_cost(uint *buffer,
 
174
                                        uint maxbuffer, uint max_n_elems,
 
175
                                        uint last_n_elems, int elem_size)
192
176
{
193
177
  register int i;
194
178
  double total_cost= 0.0;
195
 
  uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
 
179
  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
196
180
 
197
181
  /*
198
182
    Set initial state: first maxbuffer sequences contain max_n_elems elements
210
194
  {
211
195
    while (maxbuffer >= MERGEBUFF2)
212
196
    {
213
 
      uint32_t lastbuff= 0;
 
197
      uint lastbuff= 0;
214
198
      for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
215
199
      {
216
200
        total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
279
263
      these will be random seeks.
280
264
*/
281
265
 
282
 
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
283
 
                            size_t max_in_memory_size)
 
266
double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
 
267
                            ulonglong max_in_memory_size)
284
268
{
285
269
  ulong max_elements_in_tree;
286
270
  ulong last_tree_elems;
331
315
 
332
316
Unique::~Unique()
333
317
{
334
 
  close_cached_file(file);
 
318
  close_cached_file(&file);
335
319
  delete_tree(&tree);
336
320
  delete_dynamic(&file_ptrs);
337
321
}
343
327
  BUFFPEK file_ptr;
344
328
  elements+= tree.elements_in_tree;
345
329
  file_ptr.count=tree.elements_in_tree;
346
 
  file_ptr.file_pos=my_b_tell(file);
 
330
  file_ptr.file_pos=my_b_tell(&file);
347
331
 
348
332
  if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
349
333
                (void*) this, left_root_right) ||
350
 
      insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
 
334
      insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
351
335
    return 1;
352
336
  delete_tree(&tree);
353
337
  return 0;
372
356
  if (elements)
373
357
  {
374
358
    reset_dynamic(&file_ptrs);
375
 
    reinit_io_cache(file, internal::WRITE_CACHE, 0L, 0, 1);
 
359
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
376
360
  }
377
361
  elements= 0;
378
362
}
384
368
  BUFFPEK.
385
369
*/
386
370
 
387
 
#ifdef __cplusplus
388
 
extern "C" {
389
 
#endif
 
371
C_MODE_START
390
372
 
391
 
static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
 
373
static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
392
374
{
393
375
  BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
394
376
  return ctx->key_compare(ctx->key_compare_arg,
395
 
                          *((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
396
 
}
397
 
 
398
 
#ifdef __cplusplus
399
 
}
400
 
#endif
401
 
 
402
 
/*
403
 
 The comparison function object, passed to a priority_queue in merge_walk()
404
 
 as its sort function parameter.
405
 
*/
406
 
 
407
 
class buffpek_compare_functor
408
 
{
409
 
  qsort_cmp2 key_compare;
410
 
  void *key_compare_arg;
411
 
  public:
412
 
  buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
413
 
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
414
 
  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
415
 
  {
416
 
    return key_compare(key_compare_arg,
417
 
                    i->key, j->key);
418
 
  }
419
 
};
 
377
                          *((uchar **) key_ptr1), *((uchar **)key_ptr2));
 
378
}
 
379
 
 
380
C_MODE_END
 
381
 
420
382
 
421
383
/*
422
384
  DESCRIPTION
452
414
    <> 0  error
453
415
*/
454
416
 
455
 
static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
456
 
                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
 
417
static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
 
418
                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
457
419
                       tree_walk_action walk_action, void *walk_action_arg,
458
420
                       qsort_cmp2 compare, void *compare_arg,
459
 
                       internal::IO_CACHE *file)
 
421
                       IO_CACHE *file)
460
422
{
 
423
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
 
424
  QUEUE queue;
461
425
  if (end <= begin ||
462
 
      merge_buffer_size < (ulong) (key_length * (end - begin + 1))) 
 
426
      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
 
427
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
 
428
                 buffpek_compare, &compare_context))
463
429
    return 1;
464
 
  priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
465
 
    queue(buffpek_compare_functor(compare, compare_arg));
466
430
  /* we need space for one key when a piece of merge buffer is re-read */
467
431
  merge_buffer_size-= key_length;
468
 
  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
469
 
  uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
 
432
  uchar *save_key_buff= merge_buffer + merge_buffer_size;
 
433
  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
470
434
                                        key_length);
471
435
  /* if piece_size is aligned reuse_freed_buffer will always hit */
472
 
  uint32_t piece_size= max_key_count_per_piece * key_length;
473
 
  uint32_t bytes_read;               /* to hold return value of read_to_buffer */
 
436
  uint piece_size= max_key_count_per_piece * key_length;
 
437
  uint bytes_read;               /* to hold return value of read_to_buffer */
474
438
  BUFFPEK *top;
475
439
  int res= 1;
476
440
  /*
484
448
    top->base= merge_buffer + (top - begin) * piece_size;
485
449
    top->max_keys= max_key_count_per_piece;
486
450
    bytes_read= read_to_buffer(file, top, key_length);
487
 
    if (bytes_read == (uint32_t) (-1))
 
451
    if (bytes_read == (uint) (-1))
488
452
      goto end;
489
453
    assert(bytes_read);
490
 
    queue.push(top);
 
454
    queue_insert(&queue, (uchar *) top);
491
455
  }
492
 
  top= queue.top();
493
 
  while (queue.size() > 1)
 
456
  top= (BUFFPEK *) queue_top(&queue);
 
457
  while (queue.elements > 1)
494
458
  {
495
459
    /*
496
460
      Every iteration one element is removed from the queue, and one is
506
470
    */
507
471
    top->key+= key_length;
508
472
    if (--top->mem_count)
509
 
    {
510
 
      queue.pop();
511
 
      queue.push(top);
512
 
    }
 
473
      queue_replaced(&queue);
513
474
    else /* next piece should be read */
514
475
    {
515
476
      /* save old_key not to overwrite it in read_to_buffer */
516
477
      memcpy(save_key_buff, old_key, key_length);
517
478
      old_key= save_key_buff;
518
479
      bytes_read= read_to_buffer(file, top, key_length);
519
 
      if (bytes_read == (uint32_t) (-1))
 
480
      if (bytes_read == (uint) (-1))
520
481
        goto end;
521
 
      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
522
 
      {                        /* in read_to_buffer */
523
 
        queue.pop();
524
 
        queue.push(top);
525
 
      }
 
482
      else if (bytes_read > 0)      /* top->key, top->mem_count are reset */
 
483
        queue_replaced(&queue);     /* in read_to_buffer */
526
484
      else
527
485
      {
528
486
        /*
529
 
          Tree for old 'top' element is empty: remove it from the queue. 
 
487
          Tree for old 'top' element is empty: remove it from the queue and
 
488
          give all its memory to the nearest tree.
530
489
        */
531
 
        queue.pop();
 
490
        queue_remove(&queue, 0);
 
491
        reuse_freed_buff(&queue, top, key_length);
532
492
      }
533
493
    }
534
 
    top= queue.top();
 
494
    top= (BUFFPEK *) queue_top(&queue);
535
495
    /* new top has been obtained; if old top is unique, apply the action */
536
496
    if (compare(compare_arg, old_key, top->key))
537
497
    {
554
514
    }
555
515
    while (--top->mem_count);
556
516
    bytes_read= read_to_buffer(file, top, key_length);
557
 
    if (bytes_read == (uint32_t) (-1))
 
517
    if (bytes_read == (uint) (-1))
558
518
      goto end;
559
519
  }
560
520
  while (bytes_read);
561
521
  res= 0;
562
522
end:
 
523
  delete_queue(&queue);
563
524
  return res;
564
525
}
565
526
 
576
537
  SYNOPSIS
577
538
    Unique:walk()
578
539
  All params are 'IN':
579
 
    action  function-visitor, typed in include/tree.h
 
540
    action  function-visitor, typed in include/my_tree.h
580
541
            function is called for each unique element
581
542
    arg     argument for visitor, which is passed to it on each call
582
543
  RETURN VALUE
587
548
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
588
549
{
589
550
  int res;
590
 
  unsigned char *merge_buffer;
 
551
  uchar *merge_buffer;
591
552
 
592
553
  if (elements == 0)                       /* the whole tree is in memory */
593
554
    return tree_walk(&tree, action, walk_action_arg, left_root_right);
595
556
  /* flush current tree to the file to have some memory for merge buffer */
596
557
  if (flush())
597
558
    return 1;
598
 
  if (flush_io_cache(file) || reinit_io_cache(file, internal::READ_CACHE, 0L, 0, 0))
 
559
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
599
560
    return 1;
600
 
  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
 
561
  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
601
562
    return 1;
602
563
  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
603
564
                  (BUFFPEK *) file_ptrs.buffer,
604
565
                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
605
566
                  action, walk_action_arg,
606
 
                  tree.compare, tree.custom_arg, file);
607
 
  free((char*) merge_buffer);
 
567
                  tree.compare, tree.custom_arg, &file);
 
568
  my_free((char*) merge_buffer, MYF(0));
608
569
  return res;
609
570
}
610
571
 
611
572
/*
612
 
  Modify the Table element so that when one calls init_records()
 
573
  Modify the TABLE element so that when one calls init_records()
613
574
  the rows will be read in priority order.
614
575
*/
615
576
 
616
 
bool Unique::get(Table *table)
 
577
bool Unique::get(TABLE *table)
617
578
{
618
579
  SORTPARAM sort_param;
619
580
  table->sort.found_records=elements+tree.elements_in_tree;
620
581
 
621
 
  if (my_b_tell(file) == 0)
 
582
  if (my_b_tell(&file) == 0)
622
583
  {
623
584
    /* Whole tree is in memory;  Don't use disk if you don't need to */
624
 
    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
625
 
         malloc(size * tree.elements_in_tree)))
 
585
    if ((record_pointers=table->sort.record_pointers= (uchar*)
 
586
         my_malloc(size * tree.elements_in_tree, MYF(0))))
626
587
    {
627
588
      (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
628
589
                       this, left_root_right);
633
594
  if (flush())
634
595
    return 1;
635
596
 
636
 
  internal::IO_CACHE *outfile=table->sort.io_cache;
 
597
  IO_CACHE *outfile=table->sort.io_cache;
637
598
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
638
 
  uint32_t maxbuffer= file_ptrs.elements - 1;
639
 
  unsigned char *sort_buffer;
640
 
  internal::my_off_t save_pos;
 
599
  uint maxbuffer= file_ptrs.elements - 1;
 
600
  uchar *sort_buffer;
 
601
  my_off_t save_pos;
641
602
  bool error=1;
642
603
 
643
604
      /* Open cached file if it isn't open */
644
 
  outfile=table->sort.io_cache= new internal::IO_CACHE;
645
 
  memset(outfile, 0, sizeof(internal::IO_CACHE));
 
605
  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
 
606
                                MYF(MY_ZEROFILL));
646
607
 
647
 
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
 
608
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
648
609
    return 1;
649
 
  reinit_io_cache(outfile, internal::WRITE_CACHE, 0L, 0, 0);
 
610
  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
650
611
 
651
 
  memset(&sort_param, 0, sizeof(sort_param));
 
612
  bzero((char*) &sort_param,sizeof(sort_param));
652
613
  sort_param.max_rows= elements;
653
614
  sort_param.sort_form=table;
654
615
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
655
616
    size;
656
 
  sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
 
617
  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
657
618
  sort_param.not_killable=1;
658
619
 
659
 
  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
660
 
                                            sort_param.sort_length)))
 
620
  if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
 
621
                                       sort_param.sort_length,
 
622
                                       MYF(0))))
661
623
    return 1;
662
624
  sort_param.unique_buff= sort_buffer+(sort_param.keys*
663
625
                                       sort_param.sort_length);
667
629
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
668
630
 
669
631
  /* Merge the buffers to one file, removing duplicates */
670
 
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
671
 
    goto err;
672
 
  if (flush_io_cache(file) ||
673
 
      reinit_io_cache(file,internal::READ_CACHE,0L,0,0))
674
 
    goto err;
675
 
  if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
 
632
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
 
633
    goto err;
 
634
  if (flush_io_cache(&file) ||
 
635
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
 
636
    goto err;
 
637
  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
676
638
                    file_ptr, file_ptr+maxbuffer,0))
677
639
    goto err;
678
640
  error=0;
679
641
err:
680
 
  if (sort_buffer)
681
 
    free(sort_buffer);
 
642
  x_free(sort_buffer);
682
643
  if (flush_io_cache(outfile))
683
644
    error=1;
684
645
 
685
646
  /* Setup io_cache for reading */
686
647
  save_pos=outfile->pos_in_file;
687
 
  if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
 
648
  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
688
649
    error=1;
689
650
  outfile->end_of_file=save_pos;
690
651
  return error;
691
652
}
692
 
 
693
 
} /* namespace drizzled */