 #include <drizzled/server_includes.h>
 #include <drizzled/sql_sort.h>
 #include <drizzled/session.h>

 #if defined(CMATH_NAMESPACE)
 using namespace CMATH_NAMESPACE;

-int unique_write_to_file(uchar* key,
-                         element_count count __attribute__((unused)),
+int unique_write_to_file(unsigned char* key, element_count,
                          Unique *unique)
 {
   return my_b_write(&unique->file, key, unique->size) ? 1 : 0;

-int unique_write_to_ptrs(uchar* key,
-                         element_count count __attribute__((unused)),
+int unique_write_to_ptrs(unsigned char* key,
+                         element_count, Unique *unique)
 {
   memcpy(unique->record_pointers, key, unique->size);
   unique->record_pointers+=unique->size;

 Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
-               uint size_arg, uint64_t max_in_memory_size_arg)
+               uint32_t size_arg, size_t max_in_memory_size_arg)
   :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
 {

   max_elements= (ulong) (max_in_memory_size /
                          ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
-  VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
+  open_cached_file(&file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,

     total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
 */

-static double get_merge_buffers_cost(uint *buff_elems __attribute__((unused)),
-                                     uint *first, uint *last)
+static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
+                                     uint32_t *first, uint32_t *last)
 {
-  uint total_buf_elems= 0;
-  for (uint *pbuf= first; pbuf <= last; pbuf++)
+  uint32_t total_buf_elems= 0;
+  for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
     total_buf_elems+= *pbuf;
   *last= total_buf_elems;
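
As a rough reading aid for the comparison term in the comment above, here is a sketch with made-up numbers; it is not code from the patch, and the real value of TIME_FOR_COMPARE_ROWID is not shown here:

#include <cmath>
#include <cstdio>

int main()
{
  /* Illustrative values only. */
  const double TIME_FOR_COMPARE_ROWID= 10.0;
  double total_buf_elems= 100000;   /* keys across all merged sequences */
  double n_buffers= 8;              /* sequences merged in one pass */

  double cpu_cost= total_buf_elems * std::log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
  printf("%.1f\n", cpu_cost);       /* 100000 * 3 / 10 = 30000.0 */
  return 0;
}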

     Cost of merge in disk seeks.
 */

-static double get_merge_many_buffs_cost(uint *buffer,
-                                        uint maxbuffer, uint max_n_elems,
-                                        uint last_n_elems, int elem_size)
+static double get_merge_many_buffs_cost(uint32_t *buffer,
+                                        uint32_t maxbuffer, uint32_t max_n_elems,
+                                        uint32_t last_n_elems, int elem_size)
 {
   double total_cost= 0.0;
-  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
+  uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */

   /*
     Set initial state: first maxbuffer sequences contain max_n_elems elements

     these will be random seeks.
 */

-double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
-                            uint64_t max_in_memory_size)
+double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
+                            size_t max_in_memory_size)
 {
   ulong max_elements_in_tree;
   ulong last_tree_elems;

   if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
                 (void*) this, left_root_right) ||
-      insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
+      insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
     return 1;
   delete_tree(&tree);

-static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
+static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
 {
   BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
   return ctx->key_compare(ctx->key_compare_arg,
-                          *((uchar **) key_ptr1), *((uchar **)key_ptr2));
+                          *((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));

+/*
+  The comparison function object, passed to a priority_queue in merge_walk()
+  as its sort function parameter.
+*/
+class buffpek_compare_functor
+{
+  qsort_cmp2 key_compare;
+  void *key_compare_arg;
+  public:
+  buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
+    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
+  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
+  {
+    return key_compare(key_compare_arg,
+                       i->key, j->key);
+  }
+};
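
A minimal standalone sketch (not part of the patch) of the pattern this functor enables: a k-way merge of sorted runs driven by std::priority_queue with a comparison functor. Source, runs, and source_compare_functor are hypothetical names; the real code orders BUFFPEK pointers by their current key through qsort_cmp2.

#include <cstdio>
#include <queue>
#include <vector>

struct Source                        /* hypothetical stand-in for BUFFPEK */
{
  const int *pos;                    /* next key of this sorted run */
  const int *end;
};

struct source_compare_functor
{
  /* priority_queue keeps the "largest" element on top, so returning true
     when i's key is greater than j's makes the smallest key surface first. */
  bool operator()(const Source *i, const Source *j) const
  {
    return *i->pos > *j->pos;
  }
};

int main()
{
  std::vector<int> a= {1, 4, 7}, b= {2, 5, 8}, c= {3, 6, 9};
  Source runs[]= {{a.data(), a.data() + a.size()},
                  {b.data(), b.data() + b.size()},
                  {c.data(), c.data() + c.size()}};

  std::priority_queue<Source *, std::vector<Source *>, source_compare_functor>
    queue;
  for (Source &s : runs)
    queue.push(&s);

  while (!queue.empty())
  {
    Source *top= queue.top();
    queue.pop();
    printf("%d ", *top->pos);        /* emit the smallest remaining key */
    if (++top->pos != top->end)
      queue.push(top);               /* run not exhausted: re-insert it */
  }
  printf("\n");                      /* prints 1 2 3 4 5 6 7 8 9 */
  return 0;
}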

-static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
-                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
+static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
+                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
                        tree_walk_action walk_action, void *walk_action_arg,
                        qsort_cmp2 compare, void *compare_arg,
                        IO_CACHE *file)
 {
-  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
   if (end <= begin ||
-      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
-      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
-                 buffpek_compare, &compare_context))
+      merge_buffer_size < (ulong) (key_length * (end - begin + 1)))
     return 1;
+  priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
+    queue(buffpek_compare_functor(compare, compare_arg));

   /* we need space for one key when a piece of merge buffer is re-read */
   merge_buffer_size-= key_length;
-  uchar *save_key_buff= merge_buffer + merge_buffer_size;
-  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
+  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
+  uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
                                         key_length);
   /* if piece_size is aligned reuse_freed_buffer will always hit */
-  uint piece_size= max_key_count_per_piece * key_length;
-  uint bytes_read; /* to hold return value of read_to_buffer */
+  uint32_t piece_size= max_key_count_per_piece * key_length;
+  uint32_t bytes_read; /* to hold return value of read_to_buffer */

     top->base= merge_buffer + (top - begin) * piece_size;
     top->max_keys= max_key_count_per_piece;
     bytes_read= read_to_buffer(file, top, key_length);
-    if (bytes_read == (uint) (-1))
+    if (bytes_read == (uint32_t) (-1))
       goto end;
     assert(bytes_read);
-    queue_insert(&queue, (uchar *) top);
+    queue.push(top);
   }
-  top= (BUFFPEK *) queue_top(&queue);
-  while (queue.elements > 1)
+  top= queue.top();
+  while (queue.size() > 1)

       Every iteration one element is removed from the queue, and one is

     top->key+= key_length;
     if (--top->mem_count)
-      queue_replaced(&queue);

     else /* next piece should be read */
     {
       /* save old_key not to overwrite it in read_to_buffer */
       memcpy(save_key_buff, old_key, key_length);
       old_key= save_key_buff;
       bytes_read= read_to_buffer(file, top, key_length);
-      if (bytes_read == (uint) (-1))
+      if (bytes_read == (uint32_t) (-1))
         goto end;
-      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
-        queue_replaced(&queue); /* in read_to_buffer */
+      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
+      {                        /* in read_to_buffer */

         /*
-          Tree for old 'top' element is empty: remove it from the queue and
-          give all its memory to the nearest tree.
+          Tree for old 'top' element is empty: remove it from the queue.
         */
-        queue_remove(&queue, 0);
-        reuse_freed_buff(&queue, top, key_length);

-    top= (BUFFPEK *) queue_top(&queue);
+    top= queue.top();
     /* new top has been obtained; if old top is unique, apply the action */
     if (compare(compare_arg, old_key, top->key))

     while (--top->mem_count);
     bytes_read= read_to_buffer(file, top, key_length);
-    if (bytes_read == (uint) (-1))
+    if (bytes_read == (uint32_t) (-1))
       goto end;
   while (bytes_read);

-  delete_queue(&queue);

 bool Unique::walk(tree_walk_action action, void *walk_action_arg)
 {
+  unsigned char *merge_buffer;

   if (elements == 0) /* the whole tree is in memory */
     return tree_walk(&tree, action, walk_action_arg, left_root_right);

   if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
     return 1;
-  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
+  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
     return 1;
   res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
                   (BUFFPEK *) file_ptrs.buffer,
                   (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
                   action, walk_action_arg,
                   tree.compare, tree.custom_arg, &file);
-  my_free((char*) merge_buffer, MYF(0));
+  free((char*) merge_buffer);
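
For context only, a hedged sketch of what a walk callback might look like on the caller's side, assuming the usual tree_walk_action shape int (*)(void *key, element_count count, void *arg); every name below is hypothetical and not part of the patch:

#include <stdint.h>

typedef uint32_t element_count;      /* assumption: as typedef'd for TREE walks */

struct walk_stats
{
  uint64_t distinct_keys;
};

/* Called once per distinct key; returning non-zero aborts the walk. */
static int count_distinct_action(void *, element_count, void *arg)
{
  walk_stats *stats= static_cast<walk_stats *>(arg);
  stats->distinct_keys++;
  return 0;
}

/*
  Usage sketch:
    walk_stats stats= {0};
    if (unique.walk(count_distinct_action, &stats))
      ;  // handle the error
*/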

   if (my_b_tell(&file) == 0)
   {
     /* Whole tree is in memory; Don't use disk if you don't need to */
-    if ((record_pointers=table->sort.record_pointers= (uchar*)
-         my_malloc(size * tree.elements_in_tree, MYF(0))))
+    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
+         malloc(size * tree.elements_in_tree)))
     {
       (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
                        this, left_root_right);

   IO_CACHE *outfile=table->sort.io_cache;
   BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
-  uint maxbuffer= file_ptrs.elements - 1;
+  uint32_t maxbuffer= file_ptrs.elements - 1;
+  unsigned char *sort_buffer;
   my_off_t save_pos;

   /* Open cached file if it isn't open */
-  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
+  outfile=table->sort.io_cache= new IO_CACHE;
+  memset(outfile, 0, sizeof(IO_CACHE));

-  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
+  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
     return 1;
-  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
+  reinit_io_cache(outfile, WRITE_CACHE, 0L, 0, 0);

   memset(&sort_param, 0, sizeof(sort_param));
   sort_param.max_rows= elements;
   sort_param.sort_form=table;
   sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
     size;
-  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
+  sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
   sort_param.not_killable=1;

-  if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
-                                       sort_param.sort_length,
-                                       MYF(0))))
+  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
+                                            sort_param.sort_length)))
     return 1;
   sort_param.unique_buff= sort_buffer+(sort_param.keys*
                                        sort_param.sort_length);