21
21
The basic idea is as follows:
23
23
Store first all strings in a binary tree, ignoring duplicates.
24
When the tree uses more memory than 'max_heap_table_size',
25
write the tree (in sorted order) out to disk and start with a new tree.
26
When all data has been generated, merge the trees (removing any found duplicates).
25
29
The unique entries will be returned in sort order, to ensure that we do the
26
30
deletes in disk order.
33
#include <drizzled/server_includes.h>
34
#include <drizzled/sql_sort.h>
35
#include <drizzled/session.h>
35
#include "drizzled/sql_sort.h"
36
#include "drizzled/session.h"
37
#include "drizzled/sql_list.h"
38
#include "drizzled/internal/iocache.h"
40
38
#if defined(CMATH_NAMESPACE)
41
39
using namespace CMATH_NAMESPACE;
44
42
using namespace std;
45
int unique_write_to_file(unsigned char* key, element_count,
49
Use unique->size (size of element stored in the tree) and not
50
unique->tree.size_of_element. The latter is different from unique->size
51
when tree implementation chooses to store pointer to key in TREE_ELEMENT
52
(instead of storing the element itself there)
54
return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
49
57
int unique_write_to_ptrs(unsigned char* key,
50
uint32_t, Unique *unique)
58
element_count, Unique *unique)
52
60
memcpy(unique->record_pointers, key, unique->size);
53
61
unique->record_pointers+=unique->size;
57
65
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
58
66
uint32_t size_arg, size_t max_in_memory_size_arg)
59
: max_in_memory_size(max_in_memory_size_arg),
67
:max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
63
// Second element is max size for memory use in unique sort
64
init_tree(&tree, 0, 0, size, comp_func, false,
70
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
65
71
NULL, comp_func_fixed_arg);
72
/* If the following fails, the next add will also fail */
73
my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
75
If you change the following, change it in get_max_elements function, too.
77
max_elements= (ulong) (max_in_memory_size /
78
ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
79
open_cached_file(&file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
106
Calculate cost of merge_buffers function call for given sequence of
107
input stream lengths and store the number of rows in result stream in *last.
110
get_merge_buffers_cost()
111
buff_elems Array of #s of elements in buffers
112
elem_size Size of element stored in buffer
113
first Pointer to first merged element size
114
last Pointer to last merged element size
117
Cost of merge_buffers operation in disk seeks.
120
It is assumed that no rows are eliminated during merge.
121
The cost is calculated as
123
cost(read_and_write) + cost(merge_comparisons).
125
All bytes in the sequences are read and written back during merge, so the cost
126
of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write)
128
For comparisons cost calculations we assume that all merged sequences have
129
the same length, so each of total_buf_elems elements will be added to a sort
130
heap with (n_buffers-1) elements. This gives the comparison cost:
132
total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
135
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
136
uint32_t *first, uint32_t *last)
138
uint32_t total_buf_elems= 0;
139
for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
140
total_buf_elems+= *pbuf;
141
*last= total_buf_elems;
143
int n_buffers= last - first + 1;
145
/* Using log2(n)=log(n)/log(2) formula */
146
return 2*((double)total_buf_elems*elem_size) / IO_SIZE +
147
total_buf_elems*log((double) n_buffers) / (TIME_FOR_COMPARE_ROWID * M_LN2);
152
Calculate cost of merging buffers into one in Unique::get, i.e. calculate
153
how long (in terms of disk seeks) the two calls
154
merge_many_buffs(...);
159
get_merge_many_buffs_cost()
160
buffer buffer space for temporary data, at least
161
Unique::get_cost_calc_buff_size bytes
162
maxbuffer # of full buffers
163
max_n_elems # of elements in first maxbuffer buffers
164
last_n_elems # of elements in last buffer
165
elem_size size of buffer element
168
maxbuffer+1 buffers are merged, where first maxbuffer buffers contain
169
max_n_elems elements each and last buffer contains last_n_elems elements.
171
The current implementation does a dumb simulation of merge_many_buffs
175
Cost of merge in disk seeks.
178
static double get_merge_many_buffs_cost(uint32_t *buffer,
179
uint32_t maxbuffer, uint32_t max_n_elems,
180
uint32_t last_n_elems, int elem_size)
183
double total_cost= 0.0;
184
uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
187
Set initial state: first maxbuffer sequences contain max_n_elems elements
188
each, last sequence contains last_n_elems elements.
190
for (i = 0; i < (int)maxbuffer; i++)
191
buff_elems[i]= max_n_elems;
192
buff_elems[maxbuffer]= last_n_elems;
195
Do it exactly as merge_many_buff function does, calling
196
get_merge_buffers_cost to get cost of merge_buffers.
198
if (maxbuffer >= MERGEBUFF2)
200
while (maxbuffer >= MERGEBUFF2)
202
uint32_t lastbuff= 0;
203
for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
205
total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
207
buff_elems + i + MERGEBUFF-1);
210
total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
212
buff_elems + maxbuffer);
217
/* Simulate final merge_buff call. */
218
total_cost += get_merge_buffers_cost(buff_elems, elem_size,
219
buff_elems, buff_elems + maxbuffer);
91
225
Calculate cost of using Unique for processing nkeys elements of size
92
226
key_size using max_in_memory_size memory.
137
268
these will be random seeks.
140
double Unique::get_use_cost(uint32_t *, uint32_t nkeys, uint32_t key_size,
141
size_t max_in_memory_size_arg)
271
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
272
size_t max_in_memory_size)
143
274
ulong max_elements_in_tree;
144
275
ulong last_tree_elems;
276
int n_full_trees; /* number of trees in unique - 1 */
147
max_elements_in_tree= ((ulong) max_in_memory_size_arg /
279
max_elements_in_tree= ((ulong) max_in_memory_size /
148
280
ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size));
282
n_full_trees= nkeys / max_elements_in_tree;
150
283
last_tree_elems= nkeys % max_elements_in_tree;
152
285
/* Calculate cost of creating trees */
153
286
result= 2*log2_n_fact(last_tree_elems + 1.0);
288
result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
154
289
result /= TIME_FOR_COMPARE_ROWID;
296
There is more than one tree and merging is necessary.
297
First, add cost of writing all trees to disk, assuming that all disk
298
writes are sequential.
300
result += DISK_SEEK_BASE_COST * n_full_trees *
301
ceil(((double) key_size)*max_elements_in_tree / IO_SIZE);
302
result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE);
305
double merge_cost= get_merge_many_buffs_cost(buffer, n_full_trees,
306
max_elements_in_tree,
307
last_tree_elems, key_size);
308
if (merge_cost < 0.0)
311
result += merge_cost;
313
Add cost of reading the resulting sequence, assuming there were no
316
result += ceil((double)key_size*nkeys/IO_SIZE);
159
321
Unique::~Unique()
323
close_cached_file(&file);
325
delete_dynamic(&file_ptrs);
329
/* Write tree to disk; clear tree */
333
elements+= tree.elements_in_tree;
334
file_ptr.count=tree.elements_in_tree;
335
file_ptr.file_pos=my_b_tell(&file);
337
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
338
(void*) this, left_root_right) ||
339
insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
347
Clear the tree and the file.
167
348
You must call reset() if you want to reuse Unique after walk().
173
354
reset_tree(&tree);
174
assert(elements == 0);
356
If elements != 0, some trees were stored in the file (see how
357
flush() works). Note that we cannot count on my_b_tell(&file) == 0
358
here, because it can return 0 right after walk(), and walk() does not
359
reset any Unique member.
363
reset_dynamic(&file_ptrs);
364
reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
370
The comparison function, passed to queue_init() in merge_walk() and in
371
merge_buffers() when the latter is called from Unique::get() must
372
use comparison function of Unique::tree, but compare members of struct
380
static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
382
BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
383
return ctx->key_compare(ctx->key_compare_arg,
384
*((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
392
The comparison function object, passed to a priority_queue in merge_walk()
393
as its sort function parameter.
396
class buffpek_compare_functor
398
qsort_cmp2 key_compare;
399
void *key_compare_arg;
401
buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
402
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
403
inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
405
return key_compare(key_compare_arg,
413
Function is very similar to merge_buffers, but instead of writing sorted
414
unique keys to the output file, it invokes walk_action for each key.
415
This saves I/O if you need to pass through all unique keys only once.
419
All params are 'IN' (but see comment for begin, end):
420
merge_buffer buffer to perform cached piece-by-piece loading
421
of trees; initially the buffer is empty
422
merge_buffer_size size of merge_buffer. Must be aligned with
424
key_length size of tree element; key_length * (end - begin)
425
must be less or equal than merge_buffer_size.
426
begin pointer to BUFFPEK struct for the first tree.
427
end pointer to BUFFPEK struct for the last tree;
428
end > begin and [begin, end) form a consecutive
429
range. BUFFPEKs structs in that range are used and
430
overwritten in merge_walk().
431
walk_action element visitor. Action is called for each unique
433
walk_action_arg argument to walk action. Passed to it on each call.
434
compare elements comparison function
435
compare_arg comparison function argument
436
file file with all trees dumped. Trees in the file
437
must contain sorted unique values. Cache must be
438
initialized in read mode.
444
static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
445
uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
446
tree_walk_action walk_action, void *walk_action_arg,
447
qsort_cmp2 compare, void *compare_arg,
451
merge_buffer_size < (ulong) (key_length * (end - begin + 1)))
453
priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
454
queue(buffpek_compare_functor(compare, compare_arg));
455
/* we need space for one key when a piece of merge buffer is re-read */
456
merge_buffer_size-= key_length;
457
unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
458
uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
460
/* if piece_size is aligned reuse_freed_buffer will always hit */
461
uint32_t piece_size= max_key_count_per_piece * key_length;
462
uint32_t bytes_read; /* to hold return value of read_to_buffer */
466
Invariant: queue must contain top element from each tree, until a tree
467
is not completely walked through.
468
Here we're forcing the invariant, inserting one element from each tree
471
for (top= begin; top != end; ++top)
473
top->base= merge_buffer + (top - begin) * piece_size;
474
top->max_keys= max_key_count_per_piece;
475
bytes_read= read_to_buffer(file, top, key_length);
476
if (bytes_read == (uint32_t) (-1))
482
while (queue.size() > 1)
485
Every iteration one element is removed from the queue, and one is
486
inserted by the rules of the invariant. If two adjacent elements on
487
the top of the queue are not equal, biggest one is unique, because all
488
elements in each tree are unique. Action is applied only to unique
491
void *old_key= top->key;
493
read next key from the cache or from the file and push it to the
494
queue; this gives new top.
496
top->key+= key_length;
497
if (--top->mem_count)
502
else /* next piece should be read */
504
/* save old_key not to overwrite it in read_to_buffer */
505
memcpy(save_key_buff, old_key, key_length);
506
old_key= save_key_buff;
507
bytes_read= read_to_buffer(file, top, key_length);
508
if (bytes_read == (uint32_t) (-1))
510
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
511
{ /* in read_to_buffer */
518
Tree for old 'top' element is empty: remove it from the queue.
524
/* new top has been obtained; if old top is unique, apply the action */
525
if (compare(compare_arg, old_key, top->key))
527
if (walk_action(old_key, 1, walk_action_arg))
532
Applying walk_action to the tail of the last tree: this is safe because
533
either we had only one tree in the beginning, or we work with the
534
last tree in the queue.
540
if (walk_action(top->key, 1, walk_action_arg))
542
top->key+= key_length;
544
while (--top->mem_count);
545
bytes_read= read_to_buffer(file, top, key_length);
546
if (bytes_read == (uint32_t) (-1))
198
576
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
200
return tree_walk(&tree, action, walk_action_arg, left_root_right);
579
unsigned char *merge_buffer;
581
if (elements == 0) /* the whole tree is in memory */
582
return tree_walk(&tree, action, walk_action_arg, left_root_right);
584
/* flush current tree to the file to have some memory for merge buffer */
587
if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
589
if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
591
res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
592
(BUFFPEK *) file_ptrs.buffer,
593
(BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
594
action, walk_action_arg,
595
tree.compare, tree.custom_arg, &file);
596
free((char*) merge_buffer);
208
605
bool Unique::get(Table *table)
210
table->sort.found_records= elements+tree.elements_in_tree;
607
SORTPARAM sort_param;
608
table->sort.found_records=elements+tree.elements_in_tree;
212
if ((record_pointers=table->sort.record_pointers= (unsigned char*)
213
malloc(size * tree.elements_in_tree)))
610
if (my_b_tell(&file) == 0)
215
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
216
this, left_root_right);
612
/* Whole tree is in memory; Don't use disk if you don't need to */
613
if ((record_pointers=table->sort.record_pointers= (unsigned char*)
614
malloc(size * tree.elements_in_tree)))
616
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
617
this, left_root_right);
219
/* Not enough memory */
621
/* Not enough memory; Save the result to file && free memory used by tree */
625
IO_CACHE *outfile=table->sort.io_cache;
626
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
627
uint32_t maxbuffer= file_ptrs.elements - 1;
628
unsigned char *sort_buffer;
632
/* Open cached file if it isn't open */
633
outfile=table->sort.io_cache= new IO_CACHE;
634
memset(outfile, 0, sizeof(IO_CACHE));
636
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
638
reinit_io_cache(outfile, WRITE_CACHE, 0L, 0, 0);
640
memset(&sort_param, 0, sizeof(sort_param));
641
sort_param.max_rows= elements;
642
sort_param.sort_form=table;
643
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
645
sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
646
sort_param.not_killable=1;
648
if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
649
sort_param.sort_length)))
651
sort_param.unique_buff= sort_buffer+(sort_param.keys*
652
sort_param.sort_length);
654
sort_param.compare= (qsort2_cmp) buffpek_compare;
655
sort_param.cmp_context.key_compare= tree.compare;
656
sort_param.cmp_context.key_compare_arg= tree.custom_arg;
658
/* Merge the buffers to one file, removing duplicates */
659
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
661
if (flush_io_cache(&file) ||
662
reinit_io_cache(&file,READ_CACHE,0L,0,0))
664
if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
665
file_ptr, file_ptr+maxbuffer,0))
671
if (flush_io_cache(outfile))
674
/* Setup io_cache for reading */
675
save_pos=outfile->pos_in_file;
676
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
678
outfile->end_of_file=save_pos;
223
} /* namespace drizzled */