~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/filesort.cc

  • Committer: Brian Aker
  • Date: 2010-12-10 03:50:07 UTC
  • mto: (1992.4.2 system-tables)
  • mto: This revision was merged to the branch mainline in revision 2001.
  • Revision ID: brian@tangent.org-20101210035007-w8ld6aze3ub0cu2s
Additional cerr output bits for a few classes (Item, Field,...)

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
  Sorts a database
22
22
*/
23
23
 
24
 
#include <config.h>
 
24
#include "config.h"
25
25
 
26
26
#include <float.h>
27
27
#include <limits.h>
28
28
 
29
29
#include <queue>
30
30
#include <algorithm>
31
 
#include <iostream>
32
 
 
33
 
#include <drizzled/drizzled.h>
34
 
#include <drizzled/sql_sort.h>
35
 
#include <drizzled/filesort.h>
36
 
#include <drizzled/error.h>
37
 
#include <drizzled/probes.h>
38
 
#include <drizzled/session.h>
39
 
#include <drizzled/table.h>
40
 
#include <drizzled/table_list.h>
41
 
#include <drizzled/optimizer/range.h>
42
 
#include <drizzled/records.h>
43
 
#include <drizzled/internal/iocache.h>
44
 
#include <drizzled/internal/my_sys.h>
45
 
#include <plugin/myisam/myisam.h>
46
 
#include <drizzled/plugin/transactional_storage_engine.h>
47
 
#include <drizzled/atomics.h>
48
 
#include <drizzled/global_buffer.h>
49
 
 
50
 
#include <drizzled/sort_field.h>
 
31
 
 
32
#include "drizzled/drizzled.h"
 
33
#include "drizzled/sql_sort.h"
 
34
#include "drizzled/filesort.h"
 
35
#include "drizzled/error.h"
 
36
#include "drizzled/probes.h"
 
37
#include "drizzled/session.h"
 
38
#include "drizzled/table.h"
 
39
#include "drizzled/table_list.h"
 
40
#include "drizzled/optimizer/range.h"
 
41
#include "drizzled/records.h"
 
42
#include "drizzled/internal/iocache.h"
 
43
#include "drizzled/internal/my_sys.h"
 
44
#include "plugin/myisam/myisam.h"
 
45
#include "drizzled/plugin/transactional_storage_engine.h"
 
46
#include "drizzled/atomics.h"
 
47
#include "drizzled/global_buffer.h"
51
48
 
52
49
 
53
50
using namespace std;
55
52
namespace drizzled
56
53
{
57
54
 
 
55
 
58
56
/* Defines used by filesort and uniques */
59
57
#define MERGEBUFF               7
60
58
#define MERGEBUFF2              15
134
132
 
135
133
/* functions defined in this file */
136
134
 
137
 
static char **make_char_array(char **old_pos, uint32_t fields,
 
135
static char **make_char_array(char **old_pos, register uint32_t fields,
138
136
                              uint32_t length);
139
137
 
140
138
static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
383
381
      Use also the space previously used by string pointers in sort_buffer
384
382
      for temporary key storage.
385
383
    */
386
 
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
387
 
 
 
384
    param.keys=((param.keys*(param.rec_length+sizeof(char*))) /
 
385
                param.rec_length-1);
388
386
    maxbuffer--;                                // Offset from 0
389
387
    if (merge_many_buff(&param,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
390
388
    {
420
418
 
421
419
  tempfile.close_cached_file();
422
420
  buffpek_pointers.close_cached_file();
423
 
 
424
421
  if (my_b_inited(outfile))
425
422
  {
426
423
    if (flush_io_cache(outfile))
428
425
      error=1;
429
426
    }
430
427
    {
431
 
      internal::my_off_t save_pos= outfile->pos_in_file;
 
428
      internal::my_off_t save_pos=outfile->pos_in_file;
432
429
      /* For following reads */
433
430
      if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
434
431
      {
437
434
      outfile->end_of_file=save_pos;
438
435
    }
439
436
  }
440
 
 
441
437
  if (error)
442
438
  {
443
439
    my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
456
452
 
457
453
/** Make an array of string pointers. */
458
454
 
459
 
static char **make_char_array(char **old_pos, uint32_t fields,
 
455
static char **make_char_array(char **old_pos, register uint32_t fields,
460
456
                              uint32_t length)
461
457
{
462
 
  char **pos;
 
458
  register char **pos;
463
459
  char *char_pos;
464
460
 
465
461
  if (old_pos ||
566
562
  if (! indexfile && ! quick_select)
567
563
  {
568
564
    next_pos=(unsigned char*) 0;                        /* Find records in sequence */
569
 
    if (file->startTableScan(1))
570
 
      return(HA_POS_ERROR);
 
565
    file->startTableScan(1);
571
566
    file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
572
567
  }
573
568
 
577
572
    if (select->quick->reset())
578
573
      return(HA_POS_ERROR);
579
574
 
580
 
    if (read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1))
581
 
      return(HA_POS_ERROR);
 
575
    read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1);
582
576
  }
583
577
 
584
578
  /* Remember original bitmaps */
689
683
    sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
690
684
    return(HA_POS_ERROR);
691
685
  }
692
 
 
693
 
  if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
694
 
  {
 
686
  if (indexpos && idx &&
 
687
      param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
695
688
    return(HA_POS_ERROR);
696
 
  }
697
 
 
698
689
  return(my_b_inited(tempfile) ?
699
690
              (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
700
691
              idx);
723
714
    1 Error
724
715
*/
725
716
 
726
 
int SortParam::write_keys(unsigned char **sort_keys, uint32_t count,
 
717
int SortParam::write_keys(register unsigned char **sort_keys, uint32_t count,
727
718
                          internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
728
719
{
729
720
  buffpek buffpek;
788
779
 
789
780
/** Make a sort-key from record. */
790
781
 
791
 
void SortParam::make_sortkey(unsigned char *to, unsigned char *ref_pos)
 
782
void SortParam::make_sortkey(register unsigned char *to, unsigned char *ref_pos)
792
783
{
793
784
  Field *field;
794
785
  SortField *sort_field;
821
812
    {                                           // Item
822
813
      Item *item=sort_field->item;
823
814
      maybe_null= item->maybe_null;
824
 
 
825
815
      switch (sort_field->result_type) {
826
816
      case STRING_RESULT:
827
 
        {
828
 
          const CHARSET_INFO * const cs=item->collation.collation;
829
 
          char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
830
 
          int diff;
831
 
          uint32_t sort_field_length;
 
817
      {
 
818
        const CHARSET_INFO * const cs=item->collation.collation;
 
819
        char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
 
820
        int diff;
 
821
        uint32_t sort_field_length;
832
822
 
 
823
        if (maybe_null)
 
824
          *to++=1;
 
825
        /* Allow item->str() to use some extra bytes for the terminating null. */
 
826
        String tmp((char*) to,sort_field->length+4,cs);
 
827
        String *res= item->str_result(&tmp);
 
828
        if (!res)
 
829
        {
833
830
          if (maybe_null)
834
 
            *to++=1;
835
 
          /* Allow item->str() to use some extra bytes for the terminating null. */
836
 
          String tmp((char*) to,sort_field->length+4,cs);
837
 
          String *res= item->str_result(&tmp);
838
 
          if (!res)
839
 
          {
840
 
            if (maybe_null)
841
 
              memset(to-1, 0, sort_field->length+1);
842
 
            else
843
 
            {
844
 
              /*
845
 
                This should only happen during extreme conditions if we run out
846
 
                of memory or have an item marked not null when it can be null.
847
 
                This code is here mainly to avoid a hard crash in this case.
848
 
              */
849
 
              assert(0);
850
 
              memset(to, 0, sort_field->length);        // Avoid crash
851
 
            }
852
 
            break;
853
 
          }
854
 
          length= res->length();
855
 
          sort_field_length= sort_field->length - sort_field->suffix_length;
856
 
          diff=(int) (sort_field_length - length);
857
 
          if (diff < 0)
858
 
          {
859
 
            diff=0;
860
 
            length= sort_field_length;
861
 
          }
862
 
          if (sort_field->suffix_length)
863
 
          {
864
 
            /* Store length last in result_string */
865
 
            store_length(to + sort_field_length, length,
866
 
                         sort_field->suffix_length);
867
 
          }
868
 
          if (sort_field->need_strxnfrm)
869
 
          {
870
 
            char *from=(char*) res->ptr();
871
 
            uint32_t tmp_length;
872
 
            if ((unsigned char*) from == to)
873
 
            {
874
 
              set_if_smaller(length,sort_field->length);
875
 
              memcpy(tmp_buffer,from,length);
876
 
              from= tmp_buffer;
877
 
            }
878
 
            tmp_length= my_strnxfrm(cs,to,sort_field->length,
879
 
                                    (unsigned char*) from, length);
880
 
            assert(tmp_length == sort_field->length);
881
 
          }
 
831
            memset(to-1, 0, sort_field->length+1);
882
832
          else
883
833
          {
884
 
            my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
885
 
            cs->cset->fill(cs, (char *)to+length,diff,fill_char);
 
834
            /*
 
835
              This should only happen during extreme conditions if we run out
 
836
              of memory or have an item marked not null when it can be null.
 
837
              This code is here mainly to avoid a hard crash in this case.
 
838
            */
 
839
            assert(0);
 
840
            memset(to, 0, sort_field->length);  // Avoid crash
886
841
          }
887
842
          break;
888
843
        }
 
844
        length= res->length();
 
845
        sort_field_length= sort_field->length - sort_field->suffix_length;
 
846
        diff=(int) (sort_field_length - length);
 
847
        if (diff < 0)
 
848
        {
 
849
          diff=0;
 
850
          length= sort_field_length;
 
851
        }
 
852
        if (sort_field->suffix_length)
 
853
        {
 
854
          /* Store length last in result_string */
 
855
          store_length(to + sort_field_length, length,
 
856
                       sort_field->suffix_length);
 
857
        }
 
858
        if (sort_field->need_strxnfrm)
 
859
        {
 
860
          char *from=(char*) res->ptr();
 
861
          uint32_t tmp_length;
 
862
          if ((unsigned char*) from == to)
 
863
          {
 
864
            set_if_smaller(length,sort_field->length);
 
865
            memcpy(tmp_buffer,from,length);
 
866
            from= tmp_buffer;
 
867
          }
 
868
          tmp_length= my_strnxfrm(cs,to,sort_field->length,
 
869
                                  (unsigned char*) from, length);
 
870
          assert(tmp_length == sort_field->length);
 
871
        }
 
872
        else
 
873
        {
 
874
          my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
 
875
          cs->cset->fill(cs, (char *)to+length,diff,fill_char);
 
876
        }
 
877
        break;
 
878
      }
889
879
      case INT_RESULT:
890
 
        {
 
880
        {
891
881
          int64_t value= item->val_int_result();
892
882
          if (maybe_null)
893
883
          {
894
 
            *to++=1;
 
884
            *to++=1;
895
885
            if (item->null_value)
896
886
            {
897
887
              if (maybe_null)
903
893
              break;
904
894
            }
905
895
          }
906
 
          to[7]= (unsigned char) value;
907
 
          to[6]= (unsigned char) (value >> 8);
908
 
          to[5]= (unsigned char) (value >> 16);
909
 
          to[4]= (unsigned char) (value >> 24);
910
 
          to[3]= (unsigned char) (value >> 32);
911
 
          to[2]= (unsigned char) (value >> 40);
912
 
          to[1]= (unsigned char) (value >> 48);
 
896
          to[7]= (unsigned char) value;
 
897
          to[6]= (unsigned char) (value >> 8);
 
898
          to[5]= (unsigned char) (value >> 16);
 
899
          to[4]= (unsigned char) (value >> 24);
 
900
          to[3]= (unsigned char) (value >> 32);
 
901
          to[2]= (unsigned char) (value >> 40);
 
902
          to[1]= (unsigned char) (value >> 48);
913
903
          if (item->unsigned_flag)                    /* Fix sign */
914
904
            to[0]= (unsigned char) (value >> 56);
915
905
          else
916
906
            to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
917
 
          break;
918
 
        }
 
907
          break;
 
908
        }
919
909
      case DECIMAL_RESULT:
920
910
        {
921
 
          type::Decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
 
911
          my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
922
912
          if (maybe_null)
923
913
          {
924
914
            if (item->null_value)
929
919
            }
930
920
            *to++=1;
931
921
          }
932
 
          dec_val->val_binary(E_DEC_FATAL_ERROR, to,
933
 
                              item->max_length - (item->decimals ? 1:0),
934
 
                              item->decimals);
935
 
          break;
 
922
          my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
 
923
                            item->max_length - (item->decimals ? 1:0),
 
924
                            item->decimals);
 
925
         break;
936
926
        }
937
927
      case REAL_RESULT:
938
 
        {
 
928
        {
939
929
          double value= item->val_result();
940
 
          if (maybe_null)
 
930
          if (maybe_null)
941
931
          {
942
932
            if (item->null_value)
943
933
            {
945
935
              to++;
946
936
              break;
947
937
            }
948
 
            *to++=1;
 
938
            *to++=1;
949
939
          }
950
 
          change_double_for_sort(value,(unsigned char*) to);
951
 
          break;
952
 
        }
 
940
          change_double_for_sort(value,(unsigned char*) to);
 
941
          break;
 
942
        }
953
943
      case ROW_RESULT:
954
944
      default:
955
 
        // This case should never be chosen
956
 
        assert(0);
957
 
        break;
 
945
        // This case should never be chosen
 
946
        assert(0);
 
947
        break;
958
948
      }
959
949
    }
960
 
 
961
950
    if (sort_field->reverse)
962
951
    {                                                   /* Revers key */
963
952
      if (maybe_null)
970
959
      }
971
960
    }
972
961
    else
973
 
    {
974
962
      to+= sort_field->length;
975
 
    }
976
963
  }
977
964
 
978
965
  if (addon_field)
1021
1008
 
1022
1009
 
1023
1010
/*
1024
 
  Register fields used by sorting in the sorted table's read set
 
1011
  Register fields used by sorting in the sorted table's read set
1025
1012
*/
1026
1013
 
1027
1014
void SortParam::register_used_fields()
1037
1024
    if ((field= sort_field->field))
1038
1025
    {
1039
1026
      if (field->getTable() == table)
1040
 
        table->setReadSet(field->position());
 
1027
        table->setReadSet(field->field_index);
1041
1028
    }
1042
1029
    else
1043
1030
    {                                           // Item
1051
1038
    sort_addon_field *addonf= addon_field;
1052
1039
    Field *field;
1053
1040
    for ( ; (field= addonf->field) ; addonf++)
1054
 
      table->setReadSet(field->position());
 
1041
      table->setReadSet(field->field_index);
1055
1042
  }
1056
1043
  else
1057
1044
  {
1061
1048
}
1062
1049
 
1063
1050
 
1064
 
bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
 
1051
bool SortParam::save_index(unsigned char **sort_keys, uint32_t count,
 
1052
                           filesort_info *table_sort)
1065
1053
{
1066
1054
  uint32_t offset;
1067
1055
  unsigned char *to;
1068
1056
 
1069
1057
  internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
1070
1058
  offset= rec_length - res_length;
1071
 
 
1072
1059
  if ((ha_rows) count > max_rows)
1073
1060
    count=(uint32_t) max_rows;
1074
 
 
1075
 
  if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
 
1061
  if (!(to= table_sort->record_pointers=
 
1062
        (unsigned char*) malloc(res_length*count)))
1076
1063
    return true;
1077
1064
 
1078
1065
  for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
1080
1067
    memcpy(to, *sort_keys+offset, res_length);
1081
1068
    to+= res_length;
1082
1069
  }
1083
 
 
1084
1070
  return false;
1085
1071
}
1086
1072
 
1104
1090
  from_file= t_file ; to_file= &t_file2;
1105
1091
  while (*maxbuffer >= MERGEBUFF2)
1106
1092
  {
1107
 
    uint32_t i;
 
1093
    register uint32_t i;
1108
1094
 
1109
1095
    if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
1110
1096
    {
1164
1150
 
1165
1151
uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
1166
1152
{
1167
 
  uint32_t count;
 
1153
  register uint32_t count;
1168
1154
  uint32_t length;
1169
1155
 
1170
1156
  if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
1185
1171
{
1186
1172
  qsort2_cmp key_compare;
1187
1173
  void *key_compare_arg;
1188
 
 
1189
1174
  public:
1190
 
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
1191
 
    key_compare(in_key_compare),
1192
 
    key_compare_arg(in_compare_arg)
1193
 
  { }
1194
 
  
 
1175
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
 
1176
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
1195
1177
  inline bool operator()(const buffpek *i, const buffpek *j) const
1196
1178
  {
1197
 
    int val= key_compare(key_compare_arg, &i->key, &j->key);
1198
 
 
 
1179
    int val= key_compare(key_compare_arg,
 
1180
                      &i->key, &j->key);
1199
1181
    return (val >= 0);
1200
1182
  }
1201
1183
};
1405
1387
    }
1406
1388
    else
1407
1389
    {
1408
 
      unsigned char *end;
 
1390
      register unsigned char *end;
1409
1391
      strpos= buffpek_inst->key+offset;
1410
1392
      for (end= strpos+buffpek_inst->mem_count*rec_length ;
1411
1393
           strpos != end ;
1476
1458
 
1477
1459
uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
1478
1460
{
1479
 
  uint32_t length;
 
1461
  register uint32_t length;
1480
1462
  const CHARSET_INFO *cs;
1481
1463
  *multi_byte_charset= 0;
1482
1464
 
1504
1486
      sortorder->result_type= sortorder->item->result_type();
1505
1487
      if (sortorder->item->result_as_int64_t())
1506
1488
        sortorder->result_type= INT_RESULT;
1507
 
 
1508
1489
      switch (sortorder->result_type) {
1509
1490
      case STRING_RESULT:
1510
 
        sortorder->length=sortorder->item->max_length;
 
1491
        sortorder->length=sortorder->item->max_length;
1511
1492
        set_if_smaller(sortorder->length,
1512
1493
                       getSession().variables.max_sort_length);
1513
 
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1514
 
        {
 
1494
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
 
1495
        {
1515
1496
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1516
 
          sortorder->need_strxnfrm= 1;
1517
 
          *multi_byte_charset= 1;
1518
 
        }
 
1497
          sortorder->need_strxnfrm= 1;
 
1498
          *multi_byte_charset= 1;
 
1499
        }
1519
1500
        else if (cs == &my_charset_bin)
1520
1501
        {
1521
1502
          /* Store length last to be able to sort blob/varbinary */
1522
1503
          sortorder->suffix_length= suffix_length(sortorder->length);
1523
1504
          sortorder->length+= sortorder->suffix_length;
1524
1505
        }
1525
 
        break;
 
1506
        break;
1526
1507
      case INT_RESULT:
1527
 
        sortorder->length=8;                    // Size of intern int64_t
1528
 
        break;
 
1508
        sortorder->length=8;                    // Size of intern int64_t
 
1509
        break;
1529
1510
      case DECIMAL_RESULT:
1530
1511
        sortorder->length=
1531
 
          class_decimal_get_binary_size(sortorder->item->max_length -
 
1512
          my_decimal_get_binary_size(sortorder->item->max_length -
1532
1513
                                     (sortorder->item->decimals ? 1 : 0),
1533
1514
                                     sortorder->item->decimals);
1534
1515
        break;
1535
1516
      case REAL_RESULT:
1536
 
        sortorder->length=sizeof(double);
1537
 
        break;
 
1517
        sortorder->length=sizeof(double);
 
1518
        break;
1538
1519
      case ROW_RESULT:
1539
 
        // This case should never be chosen
1540
 
        assert(0);
1541
 
        break;
 
1520
      default:
 
1521
        // This case should never be chosen
 
1522
        assert(0);
 
1523
        break;
1542
1524
      }
1543
1525
      if (sortorder->item->maybe_null)
1544
 
        length++;                               // Place for NULL marker
 
1526
        length++;                               // Place for NULL marker
1545
1527
    }
1546
1528
    set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
1547
1529
    length+=sortorder->length;