1
/* Copyright (C) 2000-2006 MySQL AB
1
/* Copyright (C) 2000-2006 MySQL AB
3
3
This program is free software; you can redistribute it and/or modify
4
4
it under the terms of the GNU General Public License as published by
24
24
#include <drizzled/server_includes.h>
26
#include <drizzled/drizzled_error_messages.h>
28
/* functions defined in this file */
25
#include <drizzled/sql_sort.h>
26
#include <drizzled/error.h>
27
#include <drizzled/probes.h>
28
#include <drizzled/session.h>
29
#include <drizzled/table.h>
30
#include <drizzled/table_list.h>
36
/* functions defined in this file */
30
38
static char **make_char_array(char **old_pos, register uint32_t fields,
31
uint32_t length, myf my_flag);
32
40
static unsigned char *read_buffpek_from_file(IO_CACHE *buffer_file, uint32_t count,
33
41
unsigned char *buf);
34
42
static ha_rows find_all_keys(SORTPARAM *param,SQL_SELECT *select,
43
51
uint32_t maxbuffer,IO_CACHE *tempfile,
44
52
IO_CACHE *outfile);
45
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
53
static bool save_index(SORTPARAM *param,unsigned char **sort_keys, uint32_t count,
46
54
filesort_info_st *table_sort);
47
55
static uint32_t suffix_length(uint32_t string_length);
48
static uint32_t sortlength(THD *thd, SORT_FIELD *sortorder, uint32_t s_length,
56
static uint32_t sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
49
57
bool *multi_byte_charset);
50
static SORT_ADDON_FIELD *get_addon_fields(THD *thd, Field **ptabfield,
58
static SORT_ADDON_FIELD *get_addon_fields(Session *session, Field **ptabfield,
51
59
uint32_t sortlength, uint32_t *plength);
52
60
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
53
61
unsigned char *buff);
63
71
The result set is stored in table->io_cache or
64
72
table->record_pointers.
66
@param thd Current thread
74
@param session Current thread
67
75
@param table Table to sort
68
76
@param sortorder How to sort the table
69
77
@param s_length Number of elements in sortorder
87
95
examined_rows will be set to number of examined rows
90
ha_rows filesort(THD *thd, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
98
ha_rows filesort(Session *session, Table *table, SORT_FIELD *sortorder, uint32_t s_length,
91
99
SQL_SELECT *select, ha_rows max_rows,
92
100
bool sort_positions, ha_rows *examined_rows)
98
106
ha_rows records= HA_POS_ERROR;
99
107
unsigned char **sort_keys= 0;
100
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
108
IO_CACHE tempfile, buffpek_pointers, *selected_records_file, *outfile;
102
110
bool multi_byte_charset;
111
119
Release InnoDB's adaptive hash index latch (if holding) before
114
ha_release_temporary_latches(thd);
122
ha_release_temporary_latches(session);
117
Don't use table->sort in filesort as it is also used by
118
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
125
Don't use table->sort in filesort as it is also used by
126
QUICK_INDEX_MERGE_SELECT. Work with a copy and put it back at the end
119
127
when index_merge select has finished with it.
121
129
memcpy(&table_sort, &table->sort, sizeof(filesort_info_st));
122
130
table->sort.io_cache= NULL;
124
132
outfile= table_sort.io_cache;
125
133
my_b_clear(&tempfile);
126
134
my_b_clear(&buffpek_pointers);
129
137
memset(&param, 0, sizeof(param));
130
param.sort_length= sortlength(thd, sortorder, s_length, &multi_byte_charset);
138
param.sort_length= sortlength(session, sortorder, s_length, &multi_byte_charset);
131
139
param.ref_length= table->file->ref_length;
132
140
param.addon_field= 0;
133
141
param.addon_length= 0;
134
142
if (!(table->file->ha_table_flags() & HA_FAST_KEY_READ) && !sort_positions)
137
Get the descriptors of all fields whose values are appended
145
Get the descriptors of all fields whose values are appended
138
146
to sorted fields and get its total length in param.spack_length.
140
param.addon_field= get_addon_fields(thd, table->field,
148
param.addon_field= get_addon_fields(session, table->field,
141
149
param.sort_length,
142
150
&param.addon_length);
149
157
if (param.addon_field)
151
159
param.res_length= param.addon_length;
152
if (!(table_sort.addon_buf= (unsigned char *) my_malloc(param.addon_length,
160
if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
158
165
param.res_length= param.ref_length;
160
The reference to the record is considered
167
The reference to the record is considered
161
168
as an additional sorted field
163
170
param.sort_length+= param.ref_length;
168
175
if (select && select->quick)
170
status_var_increment(thd->status_var.filesort_range_count);
177
status_var_increment(session->status_var.filesort_range_count);
174
status_var_increment(thd->status_var.filesort_scan_count);
181
status_var_increment(session->status_var.filesort_scan_count);
176
183
#ifdef CAN_TRUST_RANGE
177
184
if (select && select->quick && select->quick->records > 0L)
186
193
records= table->file->estimate_rows_upper_bound();
188
If number of records is not known, use as much of sort buffer
195
If number of records is not known, use as much of sort buffer
191
198
if (records == HA_POS_ERROR)
192
199
records--; // we use 'records+1' below.
196
203
if (multi_byte_charset &&
197
!(param.tmp_buffer= (char*) my_malloc(param.sort_length,MYF(MY_WME))))
204
!(param.tmp_buffer= (char*) malloc(param.sort_length)))
200
memavl= thd->variables.sortbuff_size;
207
memavl= session->variables.sortbuff_size;
201
208
min_sort_memory= cmax((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
202
209
while (memavl >= min_sort_memory)
206
213
param.keys=(uint32_t) cmin(records+1, keys);
207
214
if ((table_sort.sort_keys=
208
215
(unsigned char **) make_char_array((char **) table_sort.sort_keys,
209
param.keys, param.rec_length, MYF(0))))
216
param.keys, param.rec_length)))
211
218
old_memavl=memavl;
212
219
if ((memavl=memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
218
225
my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
221
if (open_cached_file(&buffpek_pointers,mysql_tmpdir,TEMP_PREFIX,
228
if (open_cached_file(&buffpek_pointers,drizzle_tmpdir,TEMP_PREFIX,
222
229
DISK_BUFFER_SIZE, MYF(MY_WME)))
253
260
close_cached_file(&buffpek_pointers);
254
261
/* Open cached file if it isn't open */
255
262
if (! my_b_inited(outfile) &&
256
open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
263
open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
259
266
if (reinit_io_cache(outfile,WRITE_CACHE,0L,0,0))
312
319
my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
313
320
MYF(ME_ERROR+ME_WAITTANG));
315
statistic_add(thd->status_var.filesort_rows,
322
statistic_add(session->status_var.filesort_rows,
316
323
(uint32_t) records, &LOCK_status);
317
324
*examined_rows= param.examined_rows;
318
325
memcpy(&table->sort, &table_sort, sizeof(filesort_info_st));
356
363
/** Make an array of string pointers. */
358
365
static char **make_char_array(char **old_pos, register uint32_t fields,
359
uint32_t length, myf my_flag)
361
368
register char **pos;
365
(old_pos= (char**) my_malloc((uint32_t) fields*(length+sizeof(char*)),
372
(old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
368
374
pos=old_pos; char_pos=((char*) (pos+fields)) -length;
369
375
while (fields--) *(pos++) = (char_pos+= length);
383
389
if (count > UINT_MAX/sizeof(BUFFPEK))
384
390
return 0; /* sizeof(BUFFPEK)*count will overflow */
386
tmp= (unsigned char *)my_malloc(length, MYF(MY_WME));
392
tmp= (unsigned char *)malloc(length);
389
395
if (reinit_io_cache(buffpek_pointers,READ_CACHE,0L,0,0) ||
444
450
unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
446
452
Table *sort_form;
447
THD *thd= current_thd;
448
volatile THD::killed_state *killed= &thd->killed;
453
Session *session= current_session;
454
volatile Session::killed_state *killed= &session->killed;
450
456
MY_BITMAP *save_read_set, *save_write_set;
467
473
next_pos=(unsigned char*) 0; /* Find records in sequence */
468
474
file->ha_rnd_init(1);
469
475
file->extra_opt(HA_EXTRA_CACHE,
470
current_thd->variables.read_buff_size);
476
current_session->variables.read_buff_size);
473
479
READ_RECORD read_record_info;
476
482
if (select->quick->reset())
477
483
return(HA_POS_ERROR);
478
init_read_record(&read_record_info, current_thd, select->quick->head,
484
init_read_record(&read_record_info, current_session, select->quick->head,
626
635
rec_length= param->rec_length;
627
636
my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
628
637
if (!my_b_inited(tempfile) &&
629
open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
638
open_cached_file(tempfile, drizzle_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
631
640
goto err; /* purecov: inspected */
632
641
/* check we won't have more buffpeks than we can possibly keep in memory */
802
810
to[0]= (unsigned char) (value >> 56);
804
812
to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
806
to[3]= (unsigned char) value;
807
to[2]= (unsigned char) (value >> 8);
808
to[1]= (unsigned char) (value >> 16);
809
if (item->unsigned_flag) /* Fix sign */
810
to[0]= (unsigned char) (value >> 24);
812
to[0]= (unsigned char) (value >> 24) ^ 128; /* Reverse signbit */
816
815
case DECIMAL_RESULT:
960
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
959
static bool save_index(SORTPARAM *param, unsigned char **sort_keys, uint32_t count,
961
960
filesort_info_st *table_sort)
963
962
uint32_t offset,res_length;
968
967
offset= param->rec_length-res_length;
969
968
if ((ha_rows) count > param->max_rows)
970
969
count=(uint32_t) param->max_rows;
971
if (!(to= table_sort->record_pointers=
972
(unsigned char*) my_malloc(res_length*count, MYF(MY_WME))))
970
if (!(to= table_sort->record_pointers=
971
(unsigned char*) malloc(res_length*count)))
973
972
return(1); /* purecov: inspected */
974
973
for (unsigned char **end= sort_keys+count ; sort_keys != end ; sort_keys++)
992
991
if (*maxbuffer < MERGEBUFF2)
993
992
return(0); /* purecov: inspected */
994
993
if (flush_io_cache(t_file) ||
995
open_cached_file(&t_file2,mysql_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
994
open_cached_file(&t_file2,drizzle_tmpdir,TEMP_PREFIX,DISK_BUFFER_SIZE,
997
996
return(1); /* purecov: inspected */
1058
1057
} /* read_to_buffer */
1062
Put all room used by freed buffer to use in adjacent buffer.
1064
Note, that we can't simply distribute memory evenly between all buffers,
1065
because new areas must not overlap with old ones.
1067
@param[in] queue list of non-empty buffers, without freed buffer
1068
@param[in] reuse empty buffer
1069
@param[in] key_length key length
1072
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint32_t key_length)
1060
class compare_functor
1074
unsigned char *reuse_end= reuse->base + reuse->max_keys * key_length;
1075
for (uint32_t i= 0; i < queue->elements; ++i)
1062
qsort2_cmp key_compare;
1063
void *key_compare_arg;
1065
compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
1066
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
1067
inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
1077
BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
1078
if (bp->base + bp->max_keys * key_length == reuse->base)
1080
bp->max_keys+= reuse->max_keys;
1083
else if (bp->base == reuse_end)
1085
bp->base= reuse->base;
1086
bp->max_keys+= reuse->max_keys;
1069
int val= key_compare(key_compare_arg,
1122
1104
my_off_t to_start_filepos;
1123
1105
unsigned char *strpos;
1124
1106
BUFFPEK *buffpek;
1126
1107
qsort2_cmp cmp;
1127
1108
void *first_cmp_arg;
1128
volatile THD::killed_state *killed= &current_thd->killed;
1129
THD::killed_state not_killable;
1109
volatile Session::killed_state *killed= &current_session->killed;
1110
Session::killed_state not_killable;
1131
status_var_increment(current_thd->status_var.filesort_merge_passes);
1112
status_var_increment(current_session->status_var.filesort_merge_passes);
1132
1113
if (param->not_killable)
1134
1115
killed= &not_killable;
1135
not_killable= THD::NOT_KILLED;
1116
not_killable= Session::NOT_KILLED;
1158
1139
cmp= get_ptr_compare(sort_length);
1159
1140
first_cmp_arg= (void*) &sort_length;
1161
if (init_queue(&queue, (uint32_t) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
1162
(queue_compare) cmp, first_cmp_arg))
1163
return(1); /* purecov: inspected */
1142
priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor >
1143
queue(compare_functor(cmp, first_cmp_arg));
1164
1144
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
1166
1146
buffpek->base= strpos;
1170
1150
if (error == -1)
1171
1151
goto err; /* purecov: inspected */
1172
1152
buffpek->max_keys= buffpek->mem_count; // If less data in buffers than expected
1173
queue_insert(&queue, (unsigned char*) buffpek);
1153
queue.push(buffpek);
1176
1156
if (param->unique_buff)
1179
1159
Called by Unique::get()
1180
1160
Copy the first argument to param->unique_buff for unique removal.
1181
1161
Store it also in 'to_file'.
1183
1163
This is safe as we know that there is always more than one element
1184
1164
in each block to merge (This is guaranteed by the Unique:: algorithm
1186
buffpek= (BUFFPEK*) queue_top(&queue);
1166
buffpek= queue.top();
1187
1167
memcpy(param->unique_buff, buffpek->key, rec_length);
1188
1168
if (my_b_write(to_file, (unsigned char*) buffpek->key, rec_length))
1196
1176
error= 0; /* purecov: inspected */
1197
1177
goto end; /* purecov: inspected */
1199
queue_replaced(&queue); // Top element has been used
1179
/* Top element has been used */
1181
queue.push(buffpek);
1202
1184
cmp= 0; // Not unique
1204
while (queue.elements > 1)
1186
while (queue.size() > 1)
1244
1226
if (!(error= (int) read_to_buffer(from_file,buffpek,
1247
queue_remove(&queue,0);
1248
reuse_freed_buff(&queue, buffpek, rec_length);
1249
1230
break; /* One buffer has been removed */
1251
1232
else if (error == -1)
1252
1233
goto err; /* purecov: inspected */
1254
queue_replaced(&queue); /* Top element has been replaced */
1235
/* Top element has been replaced */
1237
queue.push(buffpek);
1257
buffpek= (BUFFPEK*) queue_top(&queue);
1240
buffpek= queue.top();
1258
1241
buffpek->base= sort_buffer;
1259
1242
buffpek->max_keys= param->keys;
1344
1326
Calculate length of sort key.
1346
@param thd Thread handler
1328
@param session Thread handler
1347
1329
@param sortorder Order of items to sort
1348
1330
@param s_length Number of items to sort
1349
1331
@param[out] multi_byte_charset Set to 1 if we are using multi-byte charset
1361
1343
static uint32_t
1362
sortlength(THD *thd, SORT_FIELD *sortorder, uint32_t s_length,
1344
sortlength(Session *session, SORT_FIELD *sortorder, uint32_t s_length,
1363
1345
bool *multi_byte_charset)
1365
1347
register uint32_t length;
1393
1375
switch (sortorder->result_type) {
1394
1376
case STRING_RESULT:
1395
1377
sortorder->length=sortorder->item->max_length;
1396
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
1378
set_if_smaller(sortorder->length,
1379
session->variables.max_sort_length);
1397
1380
if (use_strnxfrm((cs=sortorder->item->collation.collation)))
1399
1382
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
1400
1383
sortorder->need_strxnfrm= 1;
1401
1384
*multi_byte_charset= 1;
1410
1393
case INT_RESULT:
1411
#if SIZEOF_LONG_LONG > 4
1412
1394
sortorder->length=8; // Size of intern int64_t
1414
sortorder->length=4;
1417
1396
case DECIMAL_RESULT:
1418
1397
sortorder->length=
1419
my_decimal_get_binary_size(sortorder->item->max_length -
1398
my_decimal_get_binary_size(sortorder->item->max_length -
1420
1399
(sortorder->item->decimals ? 1 : 0),
1421
1400
sortorder->item->decimals);
1432
1411
if (sortorder->item->maybe_null)
1433
1412
length++; // Place for NULL marker
1435
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
1414
set_if_smaller(sortorder->length,
1415
(size_t)session->variables.max_sort_length);
1436
1416
length+=sortorder->length;
1438
1418
sortorder->field= (Field*) 0; // end marker
1447
1427
The function first finds out what fields are used in the result set.
1448
1428
Then it calculates the length of the buffer to store the values of
1449
these fields together with the value of sort values.
1429
these fields together with the value of sort values.
1450
1430
If the calculated length is not greater than max_length_for_sort_data
1451
1431
the function allocates memory for an array of descriptors containing
1452
1432
layouts for the values of the non-sorted fields in the buffer and
1455
@param thd Current thread
1435
@param session Current thread
1456
1436
@param ptabfield Array of references to the table fields
1457
1437
@param sortlength Total length of sorted fields
1458
1438
@param[out] plength Total length of appended fields
1470
1450
static SORT_ADDON_FIELD *
1471
get_addon_fields(THD *thd, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
1451
get_addon_fields(Session *session, Field **ptabfield, uint32_t sortlength, uint32_t *plength)
1473
1453
Field **pfield;
1484
1464
Note for future refinement:
1485
1465
This is too strong a condition.
1486
1466
Actually we need only the fields referred in the
1487
result set. And for some of them it makes sense to use
1467
result set. And for some of them it makes sense to use
1488
1468
the values directly from sorted fields.
1499
1479
if (field->maybe_null())
1505
1485
length+= (null_fields+7)/8;
1507
if (length+sortlength > thd->variables.max_length_for_sort_data ||
1508
!(addonf= (SORT_ADDON_FIELD *) my_malloc(sizeof(SORT_ADDON_FIELD)*
1509
(fields+1), MYF(MY_WME))))
1487
if (length+sortlength > session->variables.max_length_for_sort_data ||
1488
!(addonf= (SORT_ADDON_FIELD *) malloc(sizeof(SORT_ADDON_FIELD)*
1512
1492
*plength= length;