1
/* Copyright (C) 2000-2006 MySQL AB
1
/* Copyright (C) 2000-2006 MySQL AB
3
3
This program is free software; you can redistribute it and/or modify
4
4
it under the terms of the GNU General Public License as published by
24
24
#include <drizzled/server_includes.h>
26
#include <drizzled/drizzled_error_messages.h>
28
/* functions defined in this file */
25
#include <drizzled/sql_sort.h>
26
#include <drizzled/error.h>
27
#include <drizzled/probes.h>
28
#include <drizzled/session.h>
29
#include <drizzled/table.h>
30
#include <drizzled/table_list.h>
37
/* functions defined in this file */
30
39
static char **make_char_array(char **old_pos, register uint32_t fields,
31
uint32_t length, myf my_flag);
32
41
static unsigned char *read_buffpek_from_file(IO_CACHE *buffer_file, uint32_t count,
33
42
unsigned char *buf);
34
43
static ha_rows find_all_keys(SORTPARAM *param,SQL_SELECT *select,
43
52
uint32_t maxbuffer,IO_CACHE *tempfile,
44
53
IO_CACHE *outfile);
45
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
54
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
46
55
filesort_info_st *table_sort);
47
56
static uint32_t suffix_length(uint32_t string_length);
48
static uint32_t sortlength(THD *thd, SORT_FIELD *sortorder, uint32_t s_length,
57
static uint32_t sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
49
58
bool *multi_byte_charset);
50
static SORT_ADDON_FIELD *get_addon_fields(THD *thd, Field **ptabfield,
59
static SORT_ADDON_FIELD *get_addon_fields(Session *session, Field **ptabfield,
51
60
uint32_t sortlength, uint32_t *plength);
52
61
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
53
62
unsigned char *buff);
63
72
The result set is stored in table->io_cache or
64
73
table->record_pointers.
66
@param thd Current thread
75
@param session Current thread
67
76
@param table Table to sort
68
77
@param sortorder How to sort the table
69
78
@param s_length Number of elements in sortorder
87
96
examined_rows will be set to number of examined rows
90
ha_rows filesort(THD *thd, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
99
ha_rows filesort(Session *session, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
91
100
SQL_SELECT *select, ha_rows max_rows,
92
101
bool sort_positions, ha_rows *examined_rows)
98
107
ha_rows records= HA_POS_ERROR;
99
108
unsigned char **sort_keys= 0;
100
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
109
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
102
111
bool multi_byte_charset;
111
120
Release InnoDB's adaptive hash index latch (if holding) before
114
ha_release_temporary_latches(thd);
123
ha_release_temporary_latches(session);
117
Don't use table->sort in filesort as it is also used by
118
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
126
Don't use table->sort in filesort as it is also used by
127
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
119
128
when index_merge select has finished with it.
121
130
memcpy(&table_sort, &table->sort, sizeof(filesort_info_st));
122
131
table->sort.io_cache= NULL;
124
133
outfile= table_sort.io_cache;
125
134
my_b_clear(&tempfile);
126
135
my_b_clear(&buffpek_pointers);
129
138
memset(&param, 0, sizeof(param));
130
param.sort_length= sortlength(thd, sortorder, s_length, &multi_byte_charset);
139
param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
131
140
param.ref_length= table->file->ref_length;
132
141
param.addon_field= 0;
133
142
param.addon_length= 0;
134
143
if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) && !sort_positions)
137
Get the descriptors of all fields whose values are appended
146
Get the descriptors of all fields whose values are appended
138
147
to sorted fields and get their total length in param.addon_length.
140
param.addon_field= get_addon_fields(thd, table->field,
149
param.addon_field= get_addon_fields(session, table->field,
141
150
param.sort_length,
142
151
&param.addon_length);
149
158
if (param.addon_field)
151
160
param.res_length= param.addon_length;
152
if (!(table_sort.addon_buf= (unsigned char *) my_malloc(param.addon_length,
161
if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
158
166
param.res_length= param.ref_length;
160
The reference to the record is considered
168
The reference to the record is considered
161
169
as an additional sorted field
163
171
param.sort_length+= param.ref_length;
168
176
if (select && select->quick)
170
status_var_increment(thd->status_var.filesort_range_count);
178
status_var_increment(session->status_var.filesort_range_count);
174
status_var_increment(thd->status_var.filesort_scan_count);
182
status_var_increment(session->status_var.filesort_scan_count);
176
184
#ifdef CAN_TRUST_RANGE
177
185
if (select && select->quick && select->quick->records > 0L)
179
records=cmin((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
180
table->file->stats.records)+EXTRA_RECORDS;
187
records= min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
188
table->file->stats.records)+EXTRA_RECORDS;
181
189
selected_records_file=0;
186
194
records= table->file->estimate_rows_upper_bound();
188
If number of records is not known, use as much of sort buffer
196
If number of records is not known, use as much of sort buffer
191
199
if (records == HA_POS_ERROR)
192
200
records--; // we use 'records+1' below.
196
204
if (multi_byte_charset &&
197
!(param.tmp_buffer= (char*) my_malloc(param.sort_length,MYF(MY_WME))))
205
!(param.tmp_buffer= (char*) malloc(param.sort_length)))
200
memavl= thd->variables.sortbuff_size;
201
min_sort_memory= cmax((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
208
memavl= session->variables.sortbuff_size;
209
min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
202
210
while (memavl >= min_sort_memory)
204
212
uint32_t old_memavl;
205
213
uint32_t keys= memavl/(param.rec_length+sizeof(char*));
206
param.keys=(uint32_t) cmin(records+1, keys);
214
param.keys= (uint32_t) min(records+1, (ha_rows)keys);
207
215
if ((table_sort.sort_keys=
208
216
(unsigned char **) make_char_array((char **) table_sort.sort_keys,
209
param.keys, param.rec_length, MYF(0))))
217
param.keys, param.rec_length)))
212
if ((memavl=memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
220
if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
213
221
memavl= min_sort_memory;
215
223
sort_keys= table_sort.sort_keys;
218
226
my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
221
if (open_cached_file(&buffpek_pointers,mysql_tmpdir,TEMP_PREFIX,
229
if (open_cached_file(&buffpek_pointers,drizzle_tmpdir,TEMP_PREFIX,
222
230
DISK_BUFFER_SIZE, MYF(MY_WME)))
253
261
close_cached_file(&buffpek_pointers);
254
262
/* Open cached file if it isn't open */
255
263
if (! my_b_inited(outfile) &&
256
open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
264
open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
259
267
if (reinit_io_cache(outfile,WRITE_CACHE,0L,0,0))
312
320
my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
313
321
MYF(ME_ERROR+ME_WAITTANG));
315
statistic_add(thd->status_var.filesort_rows,
323
statistic_add(session->status_var.filesort_rows,
316
324
(uint32_t) records, &LOCK_status);
317
325
*examined_rows= param.examined_rows;
318
326
memcpy(&table->sort, &table_sort, sizeof(filesort_info_st));
356
364
/** Make an array of string pointers. */
358
366
static char **make_char_array(char **old_pos, register uint32_t fields,
359
uint32_t length, myf my_flag)
361
369
register char **pos;
365
(old_pos= (char**) my_malloc((uint32_t) fields*(length+sizeof(char*)),
373
(old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
368
375
pos=old_pos; char_pos=((char*) (pos+fields)) -length;
369
376
while (fields--) *(pos++) = (char_pos+= length);
383
390
if (count > UINT_MAX/sizeof(BUFFPEK))
384
391
return 0; /* sizeof(BUFFPEK)*count will overflow */
386
tmp= (unsigned char *)my_malloc(length, MYF(MY_WME));
393
tmp= (unsigned char *)malloc(length);
389
396
if (reinit_io_cache(buffpek_pointers,READ_CACHE,0L,0,0) ||
444
451
unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
446
453
Table *sort_form;
447
THD *thd= current_thd;
448
volatile THD::killed_state *killed= &thd->killed;
454
Session *session= current_session;
455
volatile Session::killed_state *killed= &session->killed;
450
457
MY_BITMAP *save_read_set, *save_write_set;
467
474
next_pos=(unsigned char*) 0; /* Find records in sequence */
468
475
file->ha_rnd_init(1);
469
476
file->extra_opt(HA_EXTRA_CACHE,
470
current_thd->variables.read_buff_size);
477
current_session->variables.read_buff_size);
473
480
READ_RECORD read_record_info;
476
483
if (select->quick->reset())
477
484
return(HA_POS_ERROR);
478
init_read_record(&read_record_info, current_thd, select->quick->head,
485
init_read_record(&read_record_info, current_session, select->quick->head,
626
634
rec_length= param->rec_length;
627
635
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
628
636
if (!my_b_inited(tempfile) &&
629
open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
637
open_cached_file(tempfile, drizzle_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
631
639
goto err; /* purecov: inspected */
632
640
/* check we won't have more buffpeks than we can possibly keep in memory */
802
809
to[0]= (unsigned char) (value >> 56);
804
811
to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
806
to[3]= (unsigned char) value;
807
to[2]= (unsigned char) (value >> 8);
808
to[1]= (unsigned char) (value >> 16);
809
if (item->unsigned_flag) /* Fix sign */
810
to[0]= (unsigned char) (value >> 24);
812
to[0]= (unsigned char) (value >> 24) ^ 128; /* Reverse signbit */
816
814
case DECIMAL_RESULT:
924
922
register SORT_FIELD *sort_field;
925
923
Table *table=param->sort_form;
926
MY_BITMAP *bitmap= table->read_set;
928
925
for (sort_field= param->local_sortorder ;
929
926
sort_field != param->end ;
933
930
if ((field= sort_field->field))
935
932
if (field->table == table)
936
bitmap_set_bit(bitmap, field->field_index);
933
table->setReadSet(field->field_index);
947
944
SORT_ADDON_FIELD *addonf= param->addon_field;
949
946
for ( ; (field= addonf->field) ; addonf++)
950
bitmap_set_bit(bitmap, field->field_index);
947
table->setReadSet(field->field_index);
960
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
957
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
961
958
filesort_info_st *table_sort)
963
960
uint32_t offset,res_length;
968
965
offset= param->rec_length-res_length;
969
966
if ((ha_rows) count > param->max_rows)
970
967
count=(uint32_t) param->max_rows;
971
if (!(to= table_sort->record_pointers=
972
(unsigned char*) my_malloc(res_length*count, MYF(MY_WME))))
968
if (!(to= table_sort->record_pointers=
969
(unsigned char*) malloc(res_length*count)))
973
970
return(1); /* purecov: inspected */
974
971
for (unsigned char **end= sort_keys+count ; sort_keys != end ; sort_keys++)
992
989
if (*maxbuffer < MERGEBUFF2)
993
990
return(0); /* purecov: inspected */
994
991
if (flush_io_cache(t_file) ||
995
open_cached_file(&t_file2,mysql_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
992
open_cached_file(&t_file2,drizzle_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
997
994
return(1); /* purecov: inspected */
1042
1039
uint32_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
1043
uint32_t rec_length)
1040
uint32_t rec_length)
1045
1042
register uint32_t count;
1046
1043
uint32_t length;
1048
if ((count=(uint32_t) cmin((ha_rows) buffpek->max_keys,buffpek->count)))
1045
if ((count= (uint32_t) min((ha_rows) buffpek->max_keys,buffpek->count)))
1050
1047
if (pread(fromfile->file,(unsigned char*) buffpek->base, (length= rec_length*count),buffpek->file_pos) == 0)
1051
1048
return((uint32_t) -1); /* purecov: inspected */
1052
buffpek->key=buffpek->base;
1050
buffpek->key= buffpek->base;
1053
1051
buffpek->file_pos+= length; /* New filepos */
1054
buffpek->count-= count;
1052
buffpek->count-= count;
1055
1053
buffpek->mem_count= count;
1057
1055
return (count*rec_length);
1058
1056
} /* read_to_buffer */
1062
Put all room used by freed buffer to use in adjacent buffer.
1064
Note, that we can't simply distribute memory evenly between all buffers,
1065
because new areas must not overlap with old ones.
1067
@param[in] queue list of non-empty buffers, without freed buffer
1068
@param[in] reuse empty buffer
1069
@param[in] key_length key length
1072
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint32_t key_length)
1059
class compare_functor
1074
unsigned char *reuse_end= reuse->base + reuse->max_keys * key_length;
1075
for (uint32_t i= 0; i < queue->elements; ++i)
1061
qsort2_cmp key_compare;
1062
void *key_compare_arg;
1064
compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
1065
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
1066
inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
1077
BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
1078
if (bp->base + bp->max_keys * key_length == reuse->base)
1080
bp->max_keys+= reuse->max_keys;
1083
else if (bp->base == reuse_end)
1085
bp->base= reuse->base;
1086
bp->max_keys+= reuse->max_keys;
1068
int val= key_compare(key_compare_arg,
1122
1103
my_off_t to_start_filepos;
1123
1104
unsigned char *strpos;
1124
1105
BUFFPEK *buffpek;
1126
1106
qsort2_cmp cmp;
1127
1107
void *first_cmp_arg;
1128
volatile THD::killed_state *killed= &current_thd->killed;
1129
THD::killed_state not_killable;
1108
volatile Session::killed_state *killed= &current_session->killed;
1109
Session::killed_state not_killable;
1131
status_var_increment(current_thd->status_var.filesort_merge_passes);
1111
status_var_increment(current_session->status_var.filesort_merge_passes);
1132
1112
if (param->not_killable)
1134
1114
killed= &not_killable;
1135
not_killable= THD::NOT_KILLED;
1115
not_killable= Session::NOT_KILLED;
1158
1138
cmp= get_ptr_compare(sort_length);
1159
1139
first_cmp_arg= (void*) &sort_length;
1161
if (init_queue(&queue, (uint32_t) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
1162
(queue_compare) cmp, first_cmp_arg))
1163
return(1); /* purecov: inspected */
1141
priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor >
1142
queue(compare_functor(cmp, first_cmp_arg));
1164
1143
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1166
1145
buffpek->base= strpos;
1170
1149
if (error == -1)
1171
1150
goto err; /* purecov: inspected */
1172
1151
buffpek->max_keys= buffpek->mem_count; // If less data in buffers than expected
1173
queue_insert(&queue, (unsigned char*) buffpek);
1152
queue.push(buffpek);
1176
1155
if (param->unique_buff)
1179
1158
Called by Unique::get()
1180
1159
Copy the first argument to param->unique_buff for unique removal.
1181
1160
Store it also in 'to_file'.
1183
1162
This is safe as we know that there is always more than one element
1184
1163
in each block to merge (This is guaranteed by the Unique:: algorithm
1186
buffpek= (BUFFPEK*) queue_top(&queue);
1165
buffpek= queue.top();
1187
1166
memcpy(param->unique_buff, buffpek->key, rec_length);
1188
1167
if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1196
1175
error= 0; /* purecov: inspected */
1197
1176
goto end; /* purecov: inspected */
1199
queue_replaced(&queue); // Top element has been used
1178
/* Top element has been used */
1180
queue.push(buffpek);
1202
1183
cmp= 0; // Not unique
1204
while (queue.elements > 1)
1185
while (queue.size() > 1)
1244
1225
if (!(error= (int) read_to_buffer(from_file,buffpek,
1247
queue_remove(&queue,0);
1248
reuse_freed_buff(&queue, buffpek, rec_length);
1249
1229
break; /* One buffer has been removed */
1251
1231
else if (error == -1)
1252
1232
goto err; /* purecov: inspected */
1254
queue_replaced(&queue); /* Top element has been replaced */
1234
/* Top element has been replaced */
1236
queue.push(buffpek);
1257
buffpek= (BUFFPEK*) queue_top(&queue);
1239
buffpek= queue.top();
1258
1240
buffpek->base= sort_buffer;
1259
1241
buffpek->max_keys= param->keys;
1344
1325
Calculate length of sort key.
1346
@param thd Thread handler
1327
@param session Thread handler
1347
1328
@param sortorder Order of items to sort
1348
1329
@param s_length Number of items to sort
1349
1330
@param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
1361
1342
static uint32_t
1362
sortlength(THD *thd, SORT_FIELD *sortorder, uint32_t s_length,
1343
sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
1363
1344
bool *multi_byte_charset)
1365
1346
register uint32_t length;
1393
1374
switch (sortorder->result_type) {
1394
1375
case STRING_RESULT:
1395
1376
sortorder->length=sortorder->item->max_length;
1396
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
1377
set_if_smaller(sortorder->length,
1378
session->variables.max_sort_length);
1397
1379
if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1399
1381
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1400
1382
sortorder->need_strxnfrm= 1;
1401
1383
*multi_byte_charset= 1;
1410
1392
case INT_RESULT:
1411
#if SIZEOF_LONG_LONG > 4
1412
1393
sortorder->length=8; // Size of intern int64_t
1414
sortorder->length=4;
1417
1395
case DECIMAL_RESULT:
1418
1396
sortorder->length=
1419
my_decimal_get_binary_size(sortorder->item->max_length -
1397
my_decimal_get_binary_size(sortorder->item->max_length -
1420
1398
(sortorder->item->decimals ? 1 : 0),
1421
1399
sortorder->item->decimals);
1432
1410
if (sortorder->item->maybe_null)
1433
1411
length++; // Place for NULL marker
1435
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
1413
set_if_smaller(sortorder->length,
1414
(size_t)session->variables.max_sort_length);
1436
1415
length+=sortorder->length;
1438
1417
sortorder->field= (Field*) 0; // end marker
1447
1426
The function first finds out what fields are used in the result set.
1448
1427
Then it calculates the length of the buffer to store the values of
1449
these fields together with the value of sort values.
1428
these fields together with the value of sort values.
1450
1429
If the calculated length is not greater than max_length_for_sort_data
1451
1430
the function allocates memory for an array of descriptors containing
1452
1431
layouts for the values of the non-sorted fields in the buffer and
1455
@param thd Current thread
1434
@param session Current thread
1456
1435
@param ptabfield Array of references to the table fields
1457
1436
@param sortlength Total length of sorted fields
1458
1437
@param[out] plength Total length of appended fields
1470
1449
static SORT_ADDON_FIELD *
1471
get_addon_fields(THD *thd, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
1450
get_addon_fields(Session *session, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
1473
1452
Field **pfield;
1484
1462
Note for future refinement:
1485
1463
This is too strong a condition.
1486
1464
Actually we need only the fields referred to in the
1487
result set. And for some of them it makes sense to use
1465
result set. And for some of them it makes sense to use
1488
1466
the values directly from sorted fields.
1492
1470
for (pfield= ptabfield; (field= *pfield) ; pfield++)
1494
if (!bitmap_is_set(read_set, field->field_index))
1472
if (!(field->isReadSet()))
1496
1474
if (field->flags & BLOB_FLAG)
1499
1477
if (field->maybe_null())
1505
1483
length+= (null_fields+7)/8;
1507
if (length+sortlength > thd->variables.max_length_for_sort_data ||
1508
!(addonf= (SORT_ADDON_FIELD *) my_malloc(sizeof(SORT_ADDON_FIELD)*
1509
(fields+1), MYF(MY_WME))))
1485
if (length+sortlength > session->variables.max_length_for_sort_data ||
1486
!(addonf= (SORT_ADDON_FIELD *) malloc(sizeof(SORT_ADDON_FIELD)*
1512
1490
*plength= length;