~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/myisam/sort.cc

  • Committer: Brian Aker
  • Date: 2009-03-22 21:27:04 UTC
  • mfrom: (960.2.22 mordred)
  • Revision ID: brian@tangent.org-20090322212704-ysn4mkkjg2u9kv22
Merge Monty

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
#include "myisamdef.h"
22
22
#include <stddef.h>
23
 
#include <mysys/queues.h>
 
23
#include <queue>
24
24
 
25
25
/* static variables */
26
26
 
34
34
#define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
35
35
#define DISK_BUFFER_SIZE (IO_SIZE*16)
36
36
 
 
37
using namespace std;
 
38
 
37
39
 
38
40
/*
39
41
 Pointers of functions for store and read keys from temp file
43
45
 
44
46
/* Functions defined in this file */
45
47
 
46
 
static ha_rows  find_all_keys(MI_SORT_PARAM *info,uint32_t keys,
 
48
extern "C" 
 
49
{
 
50
ha_rows  find_all_keys(MI_SORT_PARAM *info,uint32_t keys,
47
51
                              unsigned char **sort_keys,
48
52
                              DYNAMIC_ARRAY *buffpek,
49
53
                              size_t *maxbuffer,
50
54
                              IO_CACHE *tempfile,
51
55
                              IO_CACHE *tempfile_for_exceptions);
52
 
static int  write_keys(MI_SORT_PARAM *info,unsigned char **sort_keys,
 
56
int  write_keys(MI_SORT_PARAM *info,unsigned char **sort_keys,
53
57
                             uint32_t count, BUFFPEK *buffpek,IO_CACHE *tempfile);
54
 
static int  write_key(MI_SORT_PARAM *info, unsigned char *key,
 
58
int  write_key(MI_SORT_PARAM *info, unsigned char *key,
55
59
                            IO_CACHE *tempfile);
56
 
static int  write_index(MI_SORT_PARAM *info,unsigned char * *sort_keys,
 
60
int  write_index(MI_SORT_PARAM *info,unsigned char * *sort_keys,
57
61
                              uint32_t count);
58
 
static int  merge_many_buff(MI_SORT_PARAM *info,uint32_t keys,
 
62
int  merge_many_buff(MI_SORT_PARAM *info,uint32_t keys,
59
63
                            unsigned char * *sort_keys,
60
64
                            BUFFPEK *buffpek,size_t *maxbuffer,
61
65
                            IO_CACHE *t_file);
62
 
static uint32_t  read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
 
66
uint32_t  read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
63
67
                                  uint32_t sort_length);
64
 
static int  merge_buffers(MI_SORT_PARAM *info,uint32_t keys,
 
68
int  merge_buffers(MI_SORT_PARAM *info,uint32_t keys,
65
69
                                IO_CACHE *from_file, IO_CACHE *to_file,
66
70
                                unsigned char * *sort_keys, BUFFPEK *lastbuff,
67
71
                                BUFFPEK *Fb, BUFFPEK *Tb);
68
 
static int  merge_index(MI_SORT_PARAM *,uint,unsigned char **,BUFFPEK *, int,
 
72
int  merge_index(MI_SORT_PARAM *,uint,unsigned char **,BUFFPEK *, int,
69
73
                              IO_CACHE *);
70
 
 
71
 
static int  write_keys_varlen(MI_SORT_PARAM *info,unsigned char **sort_keys,
72
 
                                    uint32_t count, BUFFPEK *buffpek,
73
 
                                    IO_CACHE *tempfile);
74
 
static uint32_t  read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
75
 
                                         uint32_t sort_length);
76
 
static int  write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
77
 
                                  unsigned char *key, uint32_t sort_length, uint32_t count);
78
 
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
79
 
                                         IO_CACHE *to_file,
80
 
                                         unsigned char* key, uint32_t sort_length,
81
 
                                         uint32_t count);
82
 
static inline int
 
74
int  write_keys_varlen(MI_SORT_PARAM *info,unsigned char **sort_keys,
 
75
                       uint32_t count, BUFFPEK *buffpek,
 
76
                       IO_CACHE *tempfile);
 
77
uint32_t  read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
 
78
                                uint32_t sort_length);
 
79
int  write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
 
80
                     unsigned char *key, uint32_t sort_length, uint32_t count);
 
81
int  write_merge_key_varlen(MI_SORT_PARAM *info,
 
82
                            IO_CACHE *to_file,
 
83
                            unsigned char* key, uint32_t sort_length,
 
84
                            uint32_t count);
 
85
}
 
86
inline int
83
87
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, unsigned char *bufs);
84
88
 
85
89
/*
249
253
 
250
254
/* Search after all keys and place them in a temp. file */
251
255
 
252
 
static ha_rows  find_all_keys(MI_SORT_PARAM *info, uint32_t keys,
 
256
ha_rows  find_all_keys(MI_SORT_PARAM *info, uint32_t keys,
253
257
                              unsigned char **sort_keys,
254
258
                              DYNAMIC_ARRAY *buffpek,
255
259
                              size_t *maxbuffer, IO_CACHE *tempfile,
550
554
        length=param->sort_buffer_length;
551
555
        while (length >= MIN_SORT_MEMORY)
552
556
        {
553
 
          if ((mergebuf= malloc(length)))
 
557
          if ((mergebuf= (unsigned char *)malloc(length)))
554
558
              break;
555
559
          length=length*3/4;
556
560
        }
624
628
 
625
629
        /* Write all keys in memory to file for later merge */
626
630
 
627
 
static int  write_keys(MI_SORT_PARAM *info, register unsigned char **sort_keys,
 
631
int  write_keys(MI_SORT_PARAM *info, register unsigned char **sort_keys,
628
632
                             uint32_t count, BUFFPEK *buffpek, IO_CACHE *tempfile)
629
633
{
630
634
  unsigned char **end;
649
653
} /* write_keys */
650
654
 
651
655
 
652
 
static inline int
 
656
inline int
653
657
my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, unsigned char *bufs)
654
658
{
655
659
  int err;
664
668
}
665
669
 
666
670
 
667
 
static int  write_keys_varlen(MI_SORT_PARAM *info,
 
671
int  write_keys_varlen(MI_SORT_PARAM *info,
668
672
                                    register unsigned char **sort_keys,
669
673
                                    uint32_t count, BUFFPEK *buffpek,
670
674
                                    IO_CACHE *tempfile)
690
694
} /* write_keys_varlen */
691
695
 
692
696
 
693
 
static int  write_key(MI_SORT_PARAM *info, unsigned char *key,
 
697
int  write_key(MI_SORT_PARAM *info, unsigned char *key,
694
698
                            IO_CACHE *tempfile)
695
699
{
696
700
  uint32_t key_length=info->real_key_length;
709
713
 
710
714
/* Write index */
711
715
 
712
 
static int  write_index(MI_SORT_PARAM *info, register unsigned char **sort_keys,
 
716
int  write_index(MI_SORT_PARAM *info, register unsigned char **sort_keys,
713
717
                              register uint32_t count)
714
718
{
715
719
  my_qsort2((unsigned char*) sort_keys,(size_t) count,sizeof(unsigned char*),
725
729
 
726
730
        /* Merge buffers to make < MERGEBUFF2 buffers */
727
731
 
728
 
static int  merge_many_buff(MI_SORT_PARAM *info, uint32_t keys,
 
732
int  merge_many_buff(MI_SORT_PARAM *info, uint32_t keys,
729
733
                            unsigned char **sort_keys, BUFFPEK *buffpek,
730
734
                            size_t *maxbuffer, IO_CACHE *t_file)
731
735
{
782
786
    -1  Error
783
787
*/
784
788
 
785
 
static uint32_t  read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
 
789
uint32_t  read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
786
790
                                  uint32_t sort_length)
787
791
{
788
792
  register uint32_t count;
801
805
  return (count*sort_length);
802
806
} /* read_to_buffer */
803
807
 
804
 
static uint32_t  read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
 
808
uint32_t  read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
805
809
                                         uint32_t sort_length)
806
810
{
807
811
  register uint32_t count;
833
837
} /* read_to_buffer_varlen */
834
838
 
835
839
 
836
 
static int  write_merge_key_varlen(MI_SORT_PARAM *info,
 
840
int  write_merge_key_varlen(MI_SORT_PARAM *info,
837
841
                                         IO_CACHE *to_file, unsigned char* key,
838
842
                                         uint32_t sort_length, uint32_t count)
839
843
{
851
855
}
852
856
 
853
857
 
854
 
static int  write_merge_key(MI_SORT_PARAM *info,
 
858
int  write_merge_key(MI_SORT_PARAM *info,
855
859
                                  IO_CACHE *to_file, unsigned char *key,
856
860
                                  uint32_t sort_length, uint32_t count)
857
861
{
860
864
}
861
865
 
862
866
/*
 
867
 * Function object to be used as the comparison function
 
868
 * for the priority queue in the merge_buffers method.
 
869
 */
 
870
class compare_functor
 
871
{
 
872
  qsort2_cmp key_compare;
 
873
  void *key_compare_arg;
 
874
  public:
 
875
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
 
876
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
 
877
  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
 
878
  {
 
879
    int val= key_compare(key_compare_arg,
 
880
                      &i->key, &j->key);
 
881
    return (val >= 0);
 
882
  }
 
883
};
 
884
 
 
885
/*
863
886
  Merge buffers to one buffer
864
887
  If to_file == 0 then use info->key_write
865
888
*/
866
889
 
867
 
static int
 
890
int
868
891
merge_buffers(MI_SORT_PARAM *info, uint32_t keys, IO_CACHE *from_file,
869
892
              IO_CACHE *to_file, unsigned char **sort_keys, BUFFPEK *lastbuff,
870
893
              BUFFPEK *Fb, BUFFPEK *Tb)
874
897
  ha_rows count;
875
898
  my_off_t to_start_filepos= 0;
876
899
  unsigned char *strpos;
877
 
  BUFFPEK *buffpek,**refpek;
878
 
  QUEUE queue;
 
900
  BUFFPEK *buffpek;
 
901
  priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor > 
 
902
    queue(compare_functor((qsort2_cmp) info->key_cmp, static_cast<void *>(info)));
879
903
  volatile int *killed= killed_ptr(info->sort_info->param);
880
904
 
881
905
  count=error=0;
886
910
  strpos=(unsigned char*) sort_keys;
887
911
  sort_length=info->key_length;
888
912
 
889
 
 
890
 
  if (init_queue(&queue,(uint32_t) (Tb-Fb)+1,(uint32_t) offsetof(BUFFPEK,key),
891
 
                 false,
892
 
                 (queue_compare) info->key_cmp,
893
 
                 (void*) info))
894
 
    return(1); /* purecov: inspected */
895
 
 
896
913
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
897
914
  {
898
915
    count+= buffpek->count;
902
919
                                                      sort_length));
903
920
    if (error == -1)
904
921
      goto err; /* purecov: inspected */
905
 
    queue_insert(&queue,(unsigned char*) buffpek);
 
922
    queue.push(buffpek);
906
923
  }
907
924
 
908
 
  while (queue.elements > 1)
 
925
  while (queue.size() > 1)
909
926
  {
910
927
    for (;;)
911
928
    {
913
930
      {
914
931
        error=1; goto err;
915
932
      }
916
 
      buffpek=(BUFFPEK*) queue_top(&queue);
 
933
      buffpek= queue.top();
917
934
      if (to_file)
918
935
      {
919
936
        if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
934
951
      {
935
952
        if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
936
953
        {
937
 
          unsigned char *base=buffpek->base;
938
 
          uint32_t max_keys=buffpek->max_keys;
939
 
 
940
 
          queue_remove(&queue,0);
941
 
 
942
 
          /* Put room used by buffer to use in other buffer */
943
 
          for (refpek= (BUFFPEK**) &queue_top(&queue);
944
 
               refpek <= (BUFFPEK**) &queue_end(&queue);
945
 
               refpek++)
946
 
          {
947
 
            buffpek= *refpek;
948
 
            if (buffpek->base+buffpek->max_keys*sort_length == base)
949
 
            {
950
 
              buffpek->max_keys+=max_keys;
951
 
              break;
952
 
            }
953
 
            else if (base+max_keys*sort_length == buffpek->base)
954
 
            {
955
 
              buffpek->base=base;
956
 
              buffpek->max_keys+=max_keys;
957
 
              break;
958
 
            }
959
 
          }
 
954
          queue.pop();
960
955
          break;                /* One buffer have been removed */
961
956
        }
962
957
      }
963
958
      else if (error == -1)
964
959
        goto err;               /* purecov: inspected */
965
 
      queue_replaced(&queue);   /* Top element has been replaced */
 
960
      /* Top element has been replaced */
 
961
      queue.pop();
 
962
      queue.push(buffpek);
966
963
    }
967
964
  }
968
 
  buffpek=(BUFFPEK*) queue_top(&queue);
 
965
  buffpek= queue.top();
969
966
  buffpek->base=(unsigned char *) sort_keys;
970
967
  buffpek->max_keys=keys;
971
968
  do
1000
997
  if (to_file)
1001
998
    lastbuff->file_pos=to_start_filepos;
1002
999
err:
1003
 
  delete_queue(&queue);
1004
1000
  return(error);
1005
1001
} /* merge_buffers */
1006
1002
 
1007
1003
 
1008
1004
        /* Do a merge to output-file (save only positions) */
1009
1005
 
1010
 
static int
 
1006
int
1011
1007
merge_index(MI_SORT_PARAM *info, uint32_t keys, unsigned char **sort_keys,
1012
1008
            BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1013
1009
{