~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/filesort.cc

  • Committer: Monty Taylor
  • Date: 2009-01-09 07:02:24 UTC
  • mto: (779.1.2 devel)
  • mto: This revision was merged to the branch mainline in revision 784.
  • Revision ID: mordred@inaugust.com-20090109070224-prwl5p52mfql3zfw
Split out readline.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
#include <drizzled/table.h>
30
30
#include <drizzled/table_list.h>
31
31
 
32
 
#include <queue>
33
 
 
34
 
using namespace std;
35
 
 
36
32
/* functions defined in this file */
37
33
 
38
34
static char **make_char_array(char **old_pos, register uint32_t fields,
687
683
{
688
684
  register Field *field;
689
685
  register SORT_FIELD *sort_field;
690
 
  size_t length;
 
686
  register uint32_t length;
691
687
 
692
688
  for (sort_field=param->local_sortorder ;
693
689
       sort_field != param->end ;
1057
1053
} /* read_to_buffer */
1058
1054
 
1059
1055
 
1060
 
class compare_functor
 
1056
/**
 
1057
  Put all room used by freed buffer to use in adjacent buffer.
 
1058
 
 
1059
  Note, that we can't simply distribute memory evenly between all buffers,
 
1060
  because new areas must not overlap with old ones.
 
1061
 
 
1062
  @param[in] queue      list of non-empty buffers, without freed buffer
 
1063
  @param[in] reuse      empty buffer
 
1064
  @param[in] key_length key length
 
1065
*/
 
1066
 
 
1067
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint32_t key_length)
1061
1068
{
1062
 
  qsort2_cmp key_compare;
1063
 
  void *key_compare_arg;
1064
 
  public:
1065
 
  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
1066
 
    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
1067
 
  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
 
1069
  unsigned char *reuse_end= reuse->base + reuse->max_keys * key_length;
 
1070
  for (uint32_t i= 0; i < queue->elements; ++i)
1068
1071
  {
1069
 
    int val= key_compare(key_compare_arg,
1070
 
                      &i->key, &j->key);
1071
 
    return (val >= 0);
 
1072
    BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
 
1073
    if (bp->base + bp->max_keys * key_length == reuse->base)
 
1074
    {
 
1075
      bp->max_keys+= reuse->max_keys;
 
1076
      return;
 
1077
    }
 
1078
    else if (bp->base == reuse_end)
 
1079
    {
 
1080
      bp->base= reuse->base;
 
1081
      bp->max_keys+= reuse->max_keys;
 
1082
      return;
 
1083
    }
1072
1084
  }
1073
 
};
 
1085
  assert(0);
 
1086
}
1074
1087
 
1075
1088
 
1076
1089
/**
1104
1117
  my_off_t to_start_filepos;
1105
1118
  unsigned char *strpos;
1106
1119
  BUFFPEK *buffpek;
 
1120
  QUEUE queue;
1107
1121
  qsort2_cmp cmp;
1108
1122
  void *first_cmp_arg;
1109
1123
  volatile Session::killed_state *killed= &current_session->killed;
1139
1153
    cmp= get_ptr_compare(sort_length);
1140
1154
    first_cmp_arg= (void*) &sort_length;
1141
1155
  }
1142
 
  priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor > 
1143
 
    queue(compare_functor(cmp, first_cmp_arg));
 
1156
  if (init_queue(&queue, (uint32_t) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
 
1157
                 (queue_compare) cmp, first_cmp_arg))
 
1158
    return(1);                                /* purecov: inspected */
1144
1159
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1145
1160
  {
1146
1161
    buffpek->base= strpos;
1150
1165
    if (error == -1)
1151
1166
      goto err;                                 /* purecov: inspected */
1152
1167
    buffpek->max_keys= buffpek->mem_count;      // If less data in buffers than expected
1153
 
    queue.push(buffpek);
 
1168
    queue_insert(&queue, (unsigned char*) buffpek);
1154
1169
  }
1155
1170
 
1156
1171
  if (param->unique_buff)
1163
1178
       This is safe as we know that there is always more than one element
1164
1179
       in each block to merge (This is guaranteed by the Unique:: algorithm
1165
1180
    */
1166
 
    buffpek= queue.top();
 
1181
    buffpek= (BUFFPEK*) queue_top(&queue);
1167
1182
    memcpy(param->unique_buff, buffpek->key, rec_length);
1168
1183
    if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1169
1184
    {
1176
1191
      error= 0;                                       /* purecov: inspected */
1177
1192
      goto end;                                       /* purecov: inspected */
1178
1193
    }
1179
 
    /* Top element has been used */
1180
 
    queue.pop();
1181
 
    queue.push(buffpek);
 
1194
    queue_replaced(&queue);                        // Top element has been used
1182
1195
  }
1183
1196
  else
1184
1197
    cmp= 0;                                        // Not unique
1185
1198
 
1186
 
  while (queue.size() > 1)
 
1199
  while (queue.elements > 1)
1187
1200
  {
1188
1201
    if (*killed)
1189
1202
    {
1191
1204
    }
1192
1205
    for (;;)
1193
1206
    {
1194
 
      buffpek= queue.top();
 
1207
      buffpek= (BUFFPEK*) queue_top(&queue);
1195
1208
      if (cmp)                                        // Remove duplicates
1196
1209
      {
1197
1210
        if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
1226
1239
        if (!(error= (int) read_to_buffer(from_file,buffpek,
1227
1240
                                          rec_length)))
1228
1241
        {
1229
 
          queue.pop();
 
1242
          queue_remove(&queue,0);
 
1243
          reuse_freed_buff(&queue, buffpek, rec_length);
1230
1244
          break;                        /* One buffer have been removed */
1231
1245
        }
1232
1246
        else if (error == -1)
1233
1247
          goto err;                        /* purecov: inspected */
1234
1248
      }
1235
 
      /* Top element has been replaced */
1236
 
      queue.pop();
1237
 
      queue.push(buffpek);
 
1249
      queue_replaced(&queue);              /* Top element has been replaced */
1238
1250
    }
1239
1251
  }
1240
 
  buffpek= queue.top();
 
1252
  buffpek= (BUFFPEK*) queue_top(&queue);
1241
1253
  buffpek->base= sort_buffer;
1242
1254
  buffpek->max_keys= param->keys;
1243
1255
 
1292
1304
  lastbuff->count= cmin(org_max_rows-max_rows, param->max_rows);
1293
1305
  lastbuff->file_pos= to_start_filepos;
1294
1306
err:
 
1307
  delete_queue(&queue);
1295
1308
  return(error);
1296
1309
} /* merge_buffers */
1297
1310
 
1375
1388
      switch (sortorder->result_type) {
1376
1389
      case STRING_RESULT:
1377
1390
        sortorder->length=sortorder->item->max_length;
1378
 
        set_if_smaller(sortorder->length,
1379
 
                       session->variables.max_sort_length);
 
1391
        set_if_smaller(sortorder->length, session->variables.max_sort_length);
1380
1392
        if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1381
1393
        {
1382
1394
          sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1411
1423
      if (sortorder->item->maybe_null)
1412
1424
        length++;                               // Place for NULL marker
1413
1425
    }
1414
 
    set_if_smaller(sortorder->length,
1415
 
                   (size_t)session->variables.max_sort_length);
 
1426
    set_if_smaller(sortorder->length, session->variables.max_sort_length);
1416
1427
    length+=sortorder->length;
1417
1428
  }
1418
1429
  sortorder->field= (Field*) 0;                 // end marker