21
21
The basic idea is as follows:
23
23
Store first all strings in a binary tree, ignoring duplicates.
24
When the tree uses more memory than 'max_heap_table_size',
25
write the tree (in sorted order) out to disk and start with a new tree.
26
When all data has been generated, merge the trees (removing any found
25
29
The unique entries will be returned in sort order, to ensure that we do the
26
30
deletes in disk order.
35
#include <drizzled/sql_sort.h>
36
#include <drizzled/session.h>
37
#include <drizzled/sql_list.h>
38
#include <drizzled/internal/iocache.h>
39
#include <drizzled/unique.h>
41
#if defined(CMATH_NAMESPACE)
42
using namespace CMATH_NAMESPACE;
33
#include "mysql_priv.h"
37
int unique_write_to_file(uchar* key,
38
element_count count __attribute__((__unused__)),
42
Use unique->size (size of element stored in the tree) and not
43
unique->tree.size_of_element. The latter is different from unique->size
44
when tree implementation chooses to store pointer to key in TREE_ELEMENT
45
(instead of storing the element itself there)
47
return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
50
int unique_write_to_ptrs(unsigned char* key,
51
uint32_t, Unique *unique)
50
int unique_write_to_ptrs(uchar* key,
51
element_count count __attribute__((__unused__)),
53
54
memcpy(unique->record_pointers, key, unique->size);
54
55
unique->record_pointers+=unique->size;
58
59
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
59
uint32_t size_arg, size_t max_in_memory_size_arg)
60
: max_in_memory_size(max_in_memory_size_arg),
60
uint size_arg, uint64_t max_in_memory_size_arg)
61
:max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
64
// Second element is max size for memory use in unique sort
65
init_tree(&tree, 0, 0, size, comp_func, false,
64
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
66
65
NULL, comp_func_fixed_arg);
66
/* If the following fails, the next add will also fail */
67
my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
69
If you change the following, change it in get_max_elements function, too.
71
max_elements= (ulong) (max_in_memory_size /
72
ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
73
VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
100
Calculate cost of merge_buffers function call for given sequence of
101
input stream lengths and store the number of rows in result stream in *last.
104
get_merge_buffers_cost()
105
buff_elems Array of #s of elements in buffers
106
elem_size Size of element stored in buffer
107
first Pointer to first merged element size
108
last Pointer to last merged element size
111
Cost of merge_buffers operation in disk seeks.
114
It is assumed that no rows are eliminated during merge.
115
The cost is calculated as
117
cost(read_and_write) + cost(merge_comparisons).
119
All bytes in the sequences are read and written back during merge so cost
120
of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write)
122
For comparisons cost calculations we assume that all merged sequences have
123
the same length, so each of total_buf_elems elements will be added to a sort
124
heap with (n_buffers-1) elements. This gives the comparison cost:
126
total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
129
static double get_merge_buffers_cost(uint *buff_elems __attribute__((__unused__)),
131
uint *first, uint *last)
133
uint total_buf_elems= 0;
134
for (uint *pbuf= first; pbuf <= last; pbuf++)
135
total_buf_elems+= *pbuf;
136
*last= total_buf_elems;
138
int n_buffers= last - first + 1;
140
/* Using log2(n)=log(n)/log(2) formula */
141
return 2*((double)total_buf_elems*elem_size) / IO_SIZE +
142
total_buf_elems*log((double) n_buffers) / (TIME_FOR_COMPARE_ROWID * M_LN2);
147
Calculate cost of merging buffers into one in Unique::get, i.e. calculate
148
how long (in terms of disk seeks) the two calls
149
merge_many_buffs(...);
154
get_merge_many_buffs_cost()
155
buffer buffer space for temporary data, at least
156
Unique::get_cost_calc_buff_size bytes
157
maxbuffer # of full buffers
158
max_n_elems # of elements in first maxbuffer buffers
159
last_n_elems # of elements in last buffer
160
elem_size size of buffer element
163
maxbuffer+1 buffers are merged, where first maxbuffer buffers contain
164
max_n_elems elements each and last buffer contains last_n_elems elements.
166
The current implementation does a dumb simulation of merge_many_buffs
170
Cost of merge in disk seeks.
173
static double get_merge_many_buffs_cost(uint *buffer,
174
uint maxbuffer, uint max_n_elems,
175
uint last_n_elems, int elem_size)
178
double total_cost= 0.0;
179
uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
182
Set initial state: first maxbuffer sequences contain max_n_elems elements
183
each, last sequence contains last_n_elems elements.
185
for (i = 0; i < (int)maxbuffer; i++)
186
buff_elems[i]= max_n_elems;
187
buff_elems[maxbuffer]= last_n_elems;
190
Do it exactly as merge_many_buff function does, calling
191
get_merge_buffers_cost to get cost of merge_buffers.
193
if (maxbuffer >= MERGEBUFF2)
195
while (maxbuffer >= MERGEBUFF2)
198
for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
200
total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
202
buff_elems + i + MERGEBUFF-1);
205
total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
207
buff_elems + maxbuffer);
212
/* Simulate final merge_buff call. */
213
total_cost += get_merge_buffers_cost(buff_elems, elem_size,
214
buff_elems, buff_elems + maxbuffer);
92
220
Calculate cost of using Unique for processing nkeys elements of size
93
221
key_size using max_in_memory_size memory.
138
263
these will be random seeks.
141
double Unique::get_use_cost(uint32_t *, uint32_t nkeys, uint32_t key_size,
142
size_t max_in_memory_size_arg)
266
double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
267
uint64_t max_in_memory_size)
144
269
ulong max_elements_in_tree;
145
270
ulong last_tree_elems;
271
int n_full_trees; /* number of trees in unique - 1 */
148
max_elements_in_tree= ((ulong) max_in_memory_size_arg /
274
max_elements_in_tree= ((ulong) max_in_memory_size /
149
275
ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size));
277
n_full_trees= nkeys / max_elements_in_tree;
151
278
last_tree_elems= nkeys % max_elements_in_tree;
153
280
/* Calculate cost of creating trees */
154
281
result= 2*log2_n_fact(last_tree_elems + 1.0);
283
result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
155
284
result /= TIME_FOR_COMPARE_ROWID;
291
There is more than one tree and merging is necessary.
292
First, add cost of writing all trees to disk, assuming that all disk
293
writes are sequential.
295
result += DISK_SEEK_BASE_COST * n_full_trees *
296
ceil(((double) key_size)*max_elements_in_tree / IO_SIZE);
297
result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE);
300
double merge_cost= get_merge_many_buffs_cost(buffer, n_full_trees,
301
max_elements_in_tree,
302
last_tree_elems, key_size);
303
if (merge_cost < 0.0)
306
result += merge_cost;
308
Add cost of reading the resulting sequence, assuming there were no
311
result += ceil((double)key_size*nkeys/IO_SIZE);
160
316
Unique::~Unique()
318
close_cached_file(&file);
320
delete_dynamic(&file_ptrs);
324
/* Write tree to disk; clear tree */
328
elements+= tree.elements_in_tree;
329
file_ptr.count=tree.elements_in_tree;
330
file_ptr.file_pos=my_b_tell(&file);
332
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
333
(void*) this, left_root_right) ||
334
insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
342
Clear the tree and the file.
168
343
You must call reset() if you want to reuse Unique after walk().
174
349
reset_tree(&tree);
175
assert(elements == 0);
351
If elements != 0, some trees were stored in the file (see how
352
flush() works). Note, that we can not count on my_b_tell(&file) == 0
353
here, because it can return 0 right after walk(), and walk() does not
354
reset any Unique member.
358
reset_dynamic(&file_ptrs);
359
reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
365
The comparison function, passed to queue_init() in merge_walk() and in
366
merge_buffers() when the latter is called from Unique::get() must
367
use comparison function of Unique::tree, but compare members of struct
373
static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
375
BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
376
return ctx->key_compare(ctx->key_compare_arg,
377
*((uchar **) key_ptr1), *((uchar **)key_ptr2));
386
Function is very similar to merge_buffers, but instead of writing sorted
387
unique keys to the output file, it invokes walk_action for each key.
388
This saves I/O if you need to pass through all unique keys only once.
392
All params are 'IN' (but see comment for begin, end):
393
merge_buffer buffer to perform cached piece-by-piece loading
394
of trees; initially the buffer is empty
395
merge_buffer_size size of merge_buffer. Must be aligned with
397
key_length size of tree element; key_length * (end - begin)
398
must be less or equal than merge_buffer_size.
399
begin pointer to BUFFPEK struct for the first tree.
400
end pointer to BUFFPEK struct for the last tree;
401
end > begin and [begin, end) form a consecutive
402
range. BUFFPEKs structs in that range are used and
403
overwritten in merge_walk().
404
walk_action element visitor. Action is called for each unique
406
walk_action_arg argument to walk action. Passed to it on each call.
407
compare elements comparison function
408
compare_arg comparison function argument
409
file file with all trees dumped. Trees in the file
410
must contain sorted unique values. Cache must be
411
initialized in read mode.
417
static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
418
uint key_length, BUFFPEK *begin, BUFFPEK *end,
419
tree_walk_action walk_action, void *walk_action_arg,
420
qsort_cmp2 compare, void *compare_arg,
423
BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
426
merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
427
init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
428
buffpek_compare, &compare_context))
430
/* we need space for one key when a piece of merge buffer is re-read */
431
merge_buffer_size-= key_length;
432
uchar *save_key_buff= merge_buffer + merge_buffer_size;
433
uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
435
/* if piece_size is aligned reuse_freed_buffer will always hit */
436
uint piece_size= max_key_count_per_piece * key_length;
437
uint bytes_read; /* to hold return value of read_to_buffer */
441
Invariant: queue must contain top element from each tree, until a tree
442
is not completely walked through.
443
Here we're forcing the invariant, inserting one element from each tree
446
for (top= begin; top != end; ++top)
448
top->base= merge_buffer + (top - begin) * piece_size;
449
top->max_keys= max_key_count_per_piece;
450
bytes_read= read_to_buffer(file, top, key_length);
451
if (bytes_read == (uint) (-1))
454
queue_insert(&queue, (uchar *) top);
456
top= (BUFFPEK *) queue_top(&queue);
457
while (queue.elements > 1)
460
Every iteration one element is removed from the queue, and one is
461
inserted by the rules of the invariant. If two adjacent elements on
462
the top of the queue are not equal, biggest one is unique, because all
463
elements in each tree are unique. Action is applied only to unique
466
void *old_key= top->key;
468
read next key from the cache or from the file and push it to the
469
queue; this gives new top.
471
top->key+= key_length;
472
if (--top->mem_count)
473
queue_replaced(&queue);
474
else /* next piece should be read */
476
/* save old_key not to overwrite it in read_to_buffer */
477
memcpy(save_key_buff, old_key, key_length);
478
old_key= save_key_buff;
479
bytes_read= read_to_buffer(file, top, key_length);
480
if (bytes_read == (uint) (-1))
482
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
483
queue_replaced(&queue); /* in read_to_buffer */
487
Tree for old 'top' element is empty: remove it from the queue and
488
give all its memory to the nearest tree.
490
queue_remove(&queue, 0);
491
reuse_freed_buff(&queue, top, key_length);
494
top= (BUFFPEK *) queue_top(&queue);
495
/* new top has been obtained; if old top is unique, apply the action */
496
if (compare(compare_arg, old_key, top->key))
498
if (walk_action(old_key, 1, walk_action_arg))
503
Applying walk_action to the tail of the last tree: this is safe because
504
either we had only one tree in the beginning, or we work with the
505
last tree in the queue.
511
if (walk_action(top->key, 1, walk_action_arg))
513
top->key+= key_length;
515
while (--top->mem_count);
516
bytes_read= read_to_buffer(file, top, key_length);
517
if (bytes_read == (uint) (-1))
523
delete_queue(&queue);
199
548
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
201
return tree_walk(&tree, action, walk_action_arg, left_root_right);
553
if (elements == 0) /* the whole tree is in memory */
554
return tree_walk(&tree, action, walk_action_arg, left_root_right);
556
/* flush current tree to the file to have some memory for merge buffer */
559
if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
561
if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
563
res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
564
(BUFFPEK *) file_ptrs.buffer,
565
(BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
566
action, walk_action_arg,
567
tree.compare, tree.custom_arg, &file);
568
my_free((char*) merge_buffer, MYF(0));
205
Modify the Table element so that when one calls init_records()
573
Modify the TABLE element so that when one calls init_records()
206
574
the rows will be read in priority order.
209
bool Unique::get(Table *table)
577
bool Unique::get(TABLE *table)
211
table->sort.found_records= elements+tree.elements_in_tree;
579
SORTPARAM sort_param;
580
table->sort.found_records=elements+tree.elements_in_tree;
213
if ((record_pointers=table->sort.record_pointers= (unsigned char*)
214
malloc(size * tree.elements_in_tree)))
582
if (my_b_tell(&file) == 0)
216
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
217
this, left_root_right);
584
/* Whole tree is in memory; Don't use disk if you don't need to */
585
if ((record_pointers=table->sort.record_pointers= (uchar*)
586
my_malloc(size * tree.elements_in_tree, MYF(0))))
588
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
589
this, left_root_right);
220
/* Not enough memory */
593
/* Not enough memory; Save the result to file && free memory used by tree */
597
IO_CACHE *outfile=table->sort.io_cache;
598
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
599
uint maxbuffer= file_ptrs.elements - 1;
604
/* Open cached file if it isn't open */
605
outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
608
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
610
reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
612
bzero((char*) &sort_param,sizeof(sort_param));
613
sort_param.max_rows= elements;
614
sort_param.sort_form=table;
615
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
617
sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
618
sort_param.not_killable=1;
620
if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
621
sort_param.sort_length,
624
sort_param.unique_buff= sort_buffer+(sort_param.keys*
625
sort_param.sort_length);
627
sort_param.compare= (qsort2_cmp) buffpek_compare;
628
sort_param.cmp_context.key_compare= tree.compare;
629
sort_param.cmp_context.key_compare_arg= tree.custom_arg;
631
/* Merge the buffers to one file, removing duplicates */
632
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
634
if (flush_io_cache(&file) ||
635
reinit_io_cache(&file,READ_CACHE,0L,0,0))
637
if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
638
file_ptr, file_ptr+maxbuffer,0))
643
if (flush_io_cache(outfile))
646
/* Setup io_cache for reading */
647
save_pos=outfile->pos_in_file;
648
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
650
outfile->end_of_file=save_pos;
224
} /* namespace drizzled */