30
30
deletes in disk order.
39
#include "drizzled/sql_sort.h"
40
#include "drizzled/session.h"
41
#include "drizzled/sql_list.h"
42
#include "drizzled/internal/iocache.h"
44
#if defined(CMATH_NAMESPACE)
45
using namespace CMATH_NAMESPACE;
53
int unique_write_to_file(unsigned char* key, uint32_t,
33
#include <drizzled/server_includes.h>
37
int unique_write_to_file(unsigned char* key,
38
element_count count __attribute__((unused)),
59
44
when tree implementation chooses to store pointer to key in TREE_ELEMENT
60
45
(instead of storing the element itself there)
62
return my_b_write(unique->file, key, unique->size) ? 1 : 0;
47
return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
65
50
int unique_write_to_ptrs(unsigned char* key,
66
uint32_t, Unique *unique)
51
element_count count __attribute__((unused)),
68
54
memcpy(unique->record_pointers, key, unique->size);
69
55
unique->record_pointers+=unique->size;
73
59
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
74
uint32_t size_arg, size_t max_in_memory_size_arg)
75
: max_in_memory_size(max_in_memory_size_arg),
76
file(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
60
uint32_t size_arg, uint64_t max_in_memory_size_arg)
61
:max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
81
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
64
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
82
65
NULL, comp_func_fixed_arg);
83
66
/* If the following fail's the next add will also fail */
84
67
my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
88
71
max_elements= (ulong) (max_in_memory_size /
89
72
ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
90
open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
73
open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
143
126
total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
146
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
129
static double get_merge_buffers_cost(uint32_t *buff_elems __attribute__((unused)),
147
131
uint32_t *first, uint32_t *last)
149
133
uint32_t total_buf_elems= 0;
282
266
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
283
size_t max_in_memory_size)
267
uint64_t max_in_memory_size)
285
269
ulong max_elements_in_tree;
286
270
ulong last_tree_elems;
343
327
BUFFPEK file_ptr;
344
328
elements+= tree.elements_in_tree;
345
329
file_ptr.count=tree.elements_in_tree;
346
file_ptr.file_pos=my_b_tell(file);
330
file_ptr.file_pos=my_b_tell(&file);
348
332
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
349
333
(void*) this, left_root_right) ||
374
358
reset_dynamic(&file_ptrs);
375
reinit_io_cache(file, internal::WRITE_CACHE, 0L, 0, 1);
359
reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
403
The comparison function object, passed to a priority_queue in merge_walk()
404
as its sort function parameter.
407
class buffpek_compare_functor
409
qsort_cmp2 key_compare;
410
void *key_compare_arg;
412
buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
413
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
414
inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
416
return key_compare(key_compare_arg,
456
423
uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
457
424
tree_walk_action walk_action, void *walk_action_arg,
458
425
qsort_cmp2 compare, void *compare_arg,
459
internal::IO_CACHE *file)
428
BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
461
430
if (end <= begin ||
462
merge_buffer_size < (ulong) (key_length * (end - begin + 1)))
431
merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
432
init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
433
buffpek_compare, &compare_context))
464
priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
465
queue(buffpek_compare_functor(compare, compare_arg));
466
435
/* we need space for one key when a piece of merge buffer is re-read */
467
436
merge_buffer_size-= key_length;
468
437
unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
469
uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
438
uint32_t max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
471
440
/* if piece_size is aligned reuse_freed_buffer will always hit */
472
441
uint32_t piece_size= max_key_count_per_piece * key_length;
484
453
top->base= merge_buffer + (top - begin) * piece_size;
485
454
top->max_keys= max_key_count_per_piece;
486
455
bytes_read= read_to_buffer(file, top, key_length);
487
if (bytes_read == (uint32_t) (-1))
456
if (bytes_read == (uint) (-1))
489
458
assert(bytes_read);
459
queue_insert(&queue, (unsigned char *) top);
493
while (queue.size() > 1)
461
top= (BUFFPEK *) queue_top(&queue);
462
while (queue.elements > 1)
496
465
Every iteration one element is removed from the queue, and one is
507
476
top->key+= key_length;
508
477
if (--top->mem_count)
478
queue_replaced(&queue);
513
479
else /* next piece should be read */
515
481
/* save old_key not to overwrite it in read_to_buffer */
516
482
memcpy(save_key_buff, old_key, key_length);
517
483
old_key= save_key_buff;
518
484
bytes_read= read_to_buffer(file, top, key_length);
519
if (bytes_read == (uint32_t) (-1))
485
if (bytes_read == (uint) (-1))
521
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
522
{ /* in read_to_buffer */
487
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
488
queue_replaced(&queue); /* in read_to_buffer */
529
Tree for old 'top' element is empty: remove it from the queue.
492
Tree for old 'top' element is empty: remove it from the queue and
493
give all its memory to the nearest tree.
495
queue_remove(&queue, 0);
496
reuse_freed_buff(&queue, top, key_length);
499
top= (BUFFPEK *) queue_top(&queue);
535
500
/* new top has been obtained; if old top is unique, apply the action */
536
501
if (compare(compare_arg, old_key, top->key))
555
520
while (--top->mem_count);
556
521
bytes_read= read_to_buffer(file, top, key_length);
557
if (bytes_read == (uint32_t) (-1))
522
if (bytes_read == (uint) (-1))
560
525
while (bytes_read);
528
delete_queue(&queue);
578
544
All params are 'IN':
579
action function-visitor, typed in include/tree.h
545
action function-visitor, typed in include/my_tree.h
580
546
function is called for each unique element
581
547
arg argument for visitor, which is passed to it on each call
595
561
/* flush current tree to the file to have some memory for merge buffer */
598
if (flush_io_cache(file) || reinit_io_cache(file, internal::READ_CACHE, 0L, 0, 0))
564
if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
600
if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
566
if (!(merge_buffer= (unsigned char *) my_malloc((ulong) max_in_memory_size, MYF(0))))
602
568
res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
603
569
(BUFFPEK *) file_ptrs.buffer,
604
570
(BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
605
571
action, walk_action_arg,
606
tree.compare, tree.custom_arg, file);
572
tree.compare, tree.custom_arg, &file);
607
573
free((char*) merge_buffer);
618
584
SORTPARAM sort_param;
619
585
table->sort.found_records=elements+tree.elements_in_tree;
621
if (my_b_tell(file) == 0)
587
if (my_b_tell(&file) == 0)
623
589
/* Whole tree is in memory; Don't use disk if you don't need to */
624
590
if ((record_pointers=table->sort.record_pointers= (unsigned char*)
625
malloc(size * tree.elements_in_tree)))
591
my_malloc(size * tree.elements_in_tree, MYF(0))))
627
593
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
628
594
this, left_root_right);
636
internal::IO_CACHE *outfile=table->sort.io_cache;
602
IO_CACHE *outfile=table->sort.io_cache;
637
603
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
638
604
uint32_t maxbuffer= file_ptrs.elements - 1;
639
605
unsigned char *sort_buffer;
640
internal::my_off_t save_pos;
643
609
/* Open cached file if it isn't open */
644
outfile=table->sort.io_cache= new internal::IO_CACHE;
645
memset(outfile, 0, sizeof(internal::IO_CACHE));
610
outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
647
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
613
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
649
reinit_io_cache(outfile, internal::WRITE_CACHE, 0L, 0, 0);
615
reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
651
617
memset(&sort_param, 0, sizeof(sort_param));
652
618
sort_param.max_rows= elements;
653
619
sort_param.sort_form=table;
654
620
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
656
sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
622
sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
657
623
sort_param.not_killable=1;
659
if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
660
sort_param.sort_length)))
625
if (!(sort_buffer=(unsigned char*) my_malloc((sort_param.keys+1) *
626
sort_param.sort_length,
662
629
sort_param.unique_buff= sort_buffer+(sort_param.keys*
663
630
sort_param.sort_length);
667
634
sort_param.cmp_context.key_compare_arg= tree.custom_arg;
669
636
/* Merge the buffers to one file, removing duplicates */
670
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
672
if (flush_io_cache(file) ||
673
reinit_io_cache(file,internal::READ_CACHE,0L,0,0))
675
if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
637
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
639
if (flush_io_cache(&file) ||
640
reinit_io_cache(&file,READ_CACHE,0L,0,0))
642
if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
676
643
file_ptr, file_ptr+maxbuffer,0))
685
652
/* Setup io_cache for reading */
686
653
save_pos=outfile->pos_in_file;
687
if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
654
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
689
656
outfile->end_of_file=save_pos;
693
} /* namespace drizzled */