   deletes in disk order.
@@ ... @@
-#include "mysql_priv.h"
+#include "drizzled/sql_sort.h"
+#include "drizzled/session.h"
+#include "drizzled/sql_list.h"
+#include "drizzled/internal/iocache.h"
+
+#if defined(CMATH_NAMESPACE)
+using namespace CMATH_NAMESPACE;
@@ ... @@
-int unique_write_to_file(uchar* key,
-                         element_count count __attribute__((__unused__)),
+int unique_write_to_file(unsigned char* key, uint32_t,
@@ ... @@
   when tree implementation chooses to store pointer to key in TREE_ELEMENT
   (instead of storing the element itself there)
@@ ... @@
-  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
+  return my_b_write(unique->file, key, unique->size) ? 1 : 0;
@@ ... @@
-int unique_write_to_ptrs(uchar* key,
-                         element_count count __attribute__((__unused__)),
+int unique_write_to_ptrs(unsigned char* key,
+                         uint32_t, Unique *unique)
@@ ... @@
   memcpy(unique->record_pointers, key, unique->size);
   unique->record_pointers+=unique->size;
@@ ... @@
 Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
-               uint size_arg, ulonglong max_in_memory_size_arg)
-  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
+               uint32_t size_arg, size_t max_in_memory_size_arg)
+  : max_in_memory_size(max_in_memory_size_arg),
+    file(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
@@ ... @@
-  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
+  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
             NULL, comp_func_fixed_arg);
   /* If the following fails the next add will also fail */
   my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
@@ ... @@
   max_elements= (ulong) (max_in_memory_size /
                          ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
-  VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
+  open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
@@ ... @@
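For orientation, the sizing arithmetic above caps the in-memory tree at roughly the memory budget divided by the per-key footprint (aligned TREE_ELEMENT overhead plus key size); once that many keys have been added, Unique must flush to disk. A minimal sketch with assumed numbers (the node overhead and key size below are illustrative, not Drizzle's actual values):

  /* Illustrative constants only; ALIGN_SIZE(sizeof(TREE_ELEMENT)) is not reproduced here. */
  #include <cstdio>

  int main()
  {
    const unsigned long max_in_memory_size= 32UL * 1024 * 1024; /* assumed 32 MB budget */
    const unsigned long node_overhead= 24;   /* assumed per-node tree overhead */
    const unsigned long key_size= 8;         /* e.g. an 8-byte row id */

    unsigned long max_elements= max_in_memory_size / (node_overhead + key_size);
    printf("~%lu keys fit before the tree must be flushed\n", max_elements);
    return 0;
  }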
     total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
@@ ... @@
-static double get_merge_buffers_cost(uint *buff_elems __attribute__((__unused__)),
-                                     uint elem_size,
-                                     uint *first, uint *last)
+static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
+                                     uint32_t *first, uint32_t *last)
@@ ... @@
-  uint total_buf_elems= 0;
-  for (uint *pbuf= first; pbuf <= last; pbuf++)
+  uint32_t total_buf_elems= 0;
+  for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
     total_buf_elems+= *pbuf;
   *last= total_buf_elems;
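The context line kept above is the core of this helper: merging n_buffers sorted sequences pushes every key through a heap of n_buffers entries, so the CPU cost is proportional to total_buf_elems * log2(n_buffers), scaled by a compare-cost divisor. A self-contained sketch of that estimate (TIME_FOR_COMPARE_ROWID is a server constant defined elsewhere; the value used here is an assumption):

  #include <cmath>
  #include <cstdint>

  /* Assumed stand-in for TIME_FOR_COMPARE_ROWID. */
  static const double COMPARE_ROWID_COST= 10.0;

  static double merge_buffers_cost_sketch(const uint32_t *first,
                                          const uint32_t *last)
  {
    uint32_t total_buf_elems= 0;
    for (const uint32_t *pbuf= first; pbuf <= last; pbuf++)
      total_buf_elems+= *pbuf;               /* total keys across the merged sequences */
    const double n_buffers= (double) (last - first + 1);
    /* every key is sifted through a heap of n_buffers entries */
    return total_buf_elems * std::log2(n_buffers) / COMPARE_ROWID_COST;
  }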
     Cost of merge in disk seeks.
@@ ... @@
-static double get_merge_many_buffs_cost(uint *buffer,
-                                        uint maxbuffer, uint max_n_elems,
-                                        uint last_n_elems, int elem_size)
+static double get_merge_many_buffs_cost(uint32_t *buffer,
+                                        uint32_t maxbuffer, uint32_t max_n_elems,
+                                        uint32_t last_n_elems, int elem_size)
@@ ... @@
   double total_cost= 0.0;
-  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
+  uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
@@ ... @@
     Set initial state: first maxbuffer sequences contain max_n_elems elements
     these will be random seeks.
@@ ... @@
-double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
-                            ulonglong max_in_memory_size)
+double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
+                            size_t max_in_memory_size)
@@ ... @@
   ulong max_elements_in_tree;
   ulong last_tree_elems;
   BUFFPEK file_ptr;
   elements+= tree.elements_in_tree;
   file_ptr.count=tree.elements_in_tree;
-  file_ptr.file_pos=my_b_tell(&file);
+  file_ptr.file_pos=my_b_tell(file);

   if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
                 (void*) this, left_root_right) ||
-      insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
+      insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
     return 1;
   delete_tree(&tree);
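This flush path is the standard external-sort step: walk the in-memory tree in key order, append the keys to the temporary file as one sorted run, and remember the run's start offset and key count for the later merge. A toy version of that bookkeeping over stdio (run_descriptor stands in for BUFFPEK; names and containers are illustrative):

  #include <cstdio>
  #include <vector>
  #include <algorithm>

  struct run_descriptor { long file_pos; unsigned long count; };

  static bool flush_run_sketch(std::vector<int> &keys, FILE *tmp,
                               std::vector<run_descriptor> &runs)
  {
    std::sort(keys.begin(), keys.end());      /* the tree walk yields sorted order */
    run_descriptor ptr;
    ptr.file_pos= ftell(tmp);                 /* like my_b_tell(file) */
    ptr.count= keys.size();
    if (fwrite(keys.data(), sizeof(int), keys.size(), tmp) != keys.size())
      return true;                            /* report failure, as flush() does */
    runs.push_back(ptr);                      /* like insert_dynamic(&file_ptrs, ...) */
    keys.clear();                             /* like delete_tree(&tree) */
    return false;
  }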
   reset_dynamic(&file_ptrs);
-  reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
+  reinit_io_cache(file, internal::WRITE_CACHE, 0L, 0, 1);
-static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
+static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
@@ ... @@
   BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
   return ctx->key_compare(ctx->key_compare_arg,
-                          *((uchar **) key_ptr1), *((uchar **)key_ptr2));
+                          *((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
@@ ... @@
+/*
+  The comparison function object, passed to a priority_queue in merge_walk()
+  as its sort function parameter.
+*/
+class buffpek_compare_functor
+{
+  qsort_cmp2 key_compare;
+  void *key_compare_arg;
+  public:
+  buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
+    : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
+  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
+  {
+    return key_compare(key_compare_arg,
@@ ... @@
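The functor above carries the same comparison state the old BUFFPEK_COMPARE_CONTEXT held, but in a form std::priority_queue can use directly: a small function object whose operator() returns true when its first argument should sit lower in the heap, so comparing with "> 0" turns the default max-heap into a min-heap on the keys. A standalone sketch of the pattern (piece and the string keys are illustrative, not the server's BUFFPEK):

  #include <queue>
  #include <vector>
  #include <cstring>
  #include <cassert>

  struct piece { const char *key; };

  class piece_compare_functor
  {
  public:
    bool operator()(const piece *i, const piece *j) const
    {
      /* "i greater than j" as the ordering predicate => smallest key on top */
      return std::strcmp(i->key, j->key) > 0;
    }
  };

  int main()
  {
    std::priority_queue<piece *, std::vector<piece *>, piece_compare_functor> queue;
    piece a= { "banana" };
    piece b= { "apple" };
    queue.push(&a);
    queue.push(&b);
    assert(queue.top() == &b);   /* the lexically smallest key is served first */
    return 0;
  }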
-static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
-                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
+static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
+                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
                        tree_walk_action walk_action, void *walk_action_arg,
                        qsort_cmp2 compare, void *compare_arg,
+                       internal::IO_CACHE *file)
@@ ... @@
-  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
@@ ... @@
   if (end <= begin ||
-      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
-      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
-                 buffpek_compare, &compare_context))
+      merge_buffer_size < (ulong) (key_length * (end - begin + 1)))
@@ ... @@
+  priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
+    queue(buffpek_compare_functor(compare, compare_arg));
   /* we need space for one key when a piece of merge buffer is re-read */
   merge_buffer_size-= key_length;
-  uchar *save_key_buff= merge_buffer + merge_buffer_size;
-  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
+  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
+  uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
@@ ... @@
   /* if piece_size is aligned reuse_freed_buffer will always hit */
-  uint piece_size= max_key_count_per_piece * key_length;
-  uint bytes_read; /* to hold return value of read_to_buffer */
+  uint32_t piece_size= max_key_count_per_piece * key_length;
+  uint32_t bytes_read; /* to hold return value of read_to_buffer */
@@ ... @@
     top->base= merge_buffer + (top - begin) * piece_size;
     top->max_keys= max_key_count_per_piece;
     bytes_read= read_to_buffer(file, top, key_length);
-    if (bytes_read == (uint) (-1))
+    if (bytes_read == (uint32_t) (-1))
@@ ... @@
     assert(bytes_read);
-    queue_insert(&queue, (uchar *) top);
@@ ... @@
-  top= (BUFFPEK *) queue_top(&queue);
-  while (queue.elements > 1)
+  while (queue.size() > 1)
@@ ... @@
     Every iteration one element is removed from the queue, and one is
@@ ... @@
     top->key+= key_length;
     if (--top->mem_count)
-      queue_replaced(&queue);
     else /* next piece should be read */
@@ ... @@
       /* save old_key not to overwrite it in read_to_buffer */
       memcpy(save_key_buff, old_key, key_length);
       old_key= save_key_buff;
       bytes_read= read_to_buffer(file, top, key_length);
-      if (bytes_read == (uint) (-1))
+      if (bytes_read == (uint32_t) (-1))
@@ ... @@
-      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
-        queue_replaced(&queue); /* in read_to_buffer */
+      else if (bytes_read > 0) /* top->key, top->mem_count are reset */
+      {                        /* in read_to_buffer */
@@ ... @@
-          Tree for old 'top' element is empty: remove it from the queue and
-          give all its memory to the nearest tree.
+          Tree for old 'top' element is empty: remove it from the queue.
@@ ... @@
-        queue_remove(&queue, 0);
-        reuse_freed_buff(&queue, top, key_length);
@@ ... @@
-    top= (BUFFPEK *) queue_top(&queue);
     /* new top has been obtained; if old top is unique, apply the action */
     if (compare(compare_arg, old_key, top->key))
@@ ... @@
     while (--top->mem_count);
     bytes_read= read_to_buffer(file, top, key_length);
-    if (bytes_read == (uint) (-1))
+    if (bytes_read == (uint32_t) (-1))
@@ ... @@
   while (bytes_read);
@@ ... @@
-  delete_queue(&queue);
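Stripped of IO_CACHE handling and buffer reuse, merge_walk() is a k-way merge with duplicate suppression: the queue always exposes the run with the smallest current key, the visitor action fires only when that key differs from the previous one, and an exhausted run is either refilled from disk or dropped. A toy equivalent over in-memory runs (containers and names are illustrative; the real code re-reads pieces of each run from the temporary file):

  #include <queue>
  #include <vector>
  #include <cstdio>

  struct run { const int *pos, *end; };

  struct run_greater
  {
    bool operator()(const run *a, const run *b) const { return *a->pos > *b->pos; }
  };

  static void merge_walk_sketch(std::vector<run> &runs)
  {
    std::priority_queue<run *, std::vector<run *>, run_greater> queue;
    for (std::vector<run>::iterator it= runs.begin(); it != runs.end(); ++it)
      if (it->pos != it->end)
        queue.push(&*it);

    int old_key= 0;
    bool have_old= false;
    while (!queue.empty())
    {
      run *top= queue.top();
      queue.pop();
      if (!have_old || old_key != *top->pos)
        printf("%d\n", *top->pos);           /* the "walk action": once per unique key */
      old_key= *top->pos;
      have_old= true;
      if (++top->pos != top->end)            /* more keys left in this run? */
        queue.push(top);                     /* re-insert with its new head key */
    }
  }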
   All params are 'IN':
-    action  function-visitor, typed in include/my_tree.h
+    action  function-visitor, typed in include/tree.h
             function is called for each unique element
     arg     argument for visitor, which is passed to it on each call
@@ ... @@
 bool Unique::walk(tree_walk_action action, void *walk_action_arg)
@@ ... @@
+  unsigned char *merge_buffer;
@@ ... @@
   if (elements == 0)                       /* the whole tree is in memory */
     return tree_walk(&tree, action, walk_action_arg, left_root_right);
@@ ... @@
   /* flush current tree to the file to have some memory for merge buffer */
@@ ... @@
-  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
+  if (flush_io_cache(file) || reinit_io_cache(file, internal::READ_CACHE, 0L, 0, 0))
@@ ... @@
-  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
+  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
@@ ... @@
   res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
                   (BUFFPEK *) file_ptrs.buffer,
                   (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
                   action, walk_action_arg,
-                  tree.compare, tree.custom_arg, &file);
-  my_free((char*) merge_buffer, MYF(0));
+                  tree.compare, tree.custom_arg, file);
+  free((char*) merge_buffer);
-  Modify the TABLE element so that when one calls init_records()
+  Modify the Table element so that when one calls init_records()
   the rows will be read in priority order.
@@ ... @@
-bool Unique::get(TABLE *table)
+bool Unique::get(Table *table)
@@ ... @@
   SORTPARAM sort_param;
   table->sort.found_records=elements+tree.elements_in_tree;
@@ ... @@
-  if (my_b_tell(&file) == 0)
+  if (my_b_tell(file) == 0)
@@ ... @@
     /* Whole tree is in memory;  Don't use disk if you don't need to */
-    if ((record_pointers=table->sort.record_pointers= (uchar*)
-         my_malloc(size * tree.elements_in_tree, MYF(0))))
+    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
+         malloc(size * tree.elements_in_tree)))
@@ ... @@
       (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
                        this, left_root_right);
@@ ... @@
-  IO_CACHE *outfile=table->sort.io_cache;
+  internal::IO_CACHE *outfile=table->sort.io_cache;
   BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
-  uint maxbuffer= file_ptrs.elements - 1;
+  uint32_t maxbuffer= file_ptrs.elements - 1;
+  unsigned char *sort_buffer;
+  internal::my_off_t save_pos;
@@ ... @@
   /* Open cached file if it isn't open */
-    outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
+    outfile=table->sort.io_cache= new internal::IO_CACHE;
+    memset(outfile, 0, sizeof(internal::IO_CACHE));
@@ ... @@
-  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
+  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
@@ ... @@
-  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
+  reinit_io_cache(outfile, internal::WRITE_CACHE, 0L, 0, 0);
@@ ... @@
-  bzero((char*) &sort_param,sizeof(sort_param));
+  memset(&sort_param, 0, sizeof(sort_param));
   sort_param.max_rows= elements;
   sort_param.sort_form=table;
   sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
@@ ... @@
-  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
+  sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
   sort_param.not_killable=1;
@@ ... @@
-  if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
-                                       sort_param.sort_length,
+  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
+                                            sort_param.sort_length)))
@@ ... @@
   sort_param.unique_buff= sort_buffer+(sort_param.keys*
                                        sort_param.sort_length);
@@ ... @@
   sort_param.cmp_context.key_compare_arg= tree.custom_arg;
@@ ... @@
   /* Merge the buffers to one file, removing duplicates */
-  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
+  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
@@ ... @@
-  if (flush_io_cache(&file) ||
-      reinit_io_cache(&file,READ_CACHE,0L,0,0))
+  if (flush_io_cache(file) ||
+      reinit_io_cache(file,internal::READ_CACHE,0L,0,0))
@@ ... @@
-  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
+  if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
                     file_ptr, file_ptr+maxbuffer,0))
@@ ... @@
   if (flush_io_cache(outfile))
@@ ... @@
   /* Setup io_cache for reading */
   save_pos=outfile->pos_in_file;
-  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
+  if (reinit_io_cache(outfile,internal::READ_CACHE,0L,0,0))
@@ ... @@
   outfile->end_of_file=save_pos;
@@ ... @@
+} /* namespace drizzled */
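The tail of get() relies on a common temporary-file idiom: finish writing the merged result, remember the current position, re-initialise the same cache for reading, and clamp end_of_file to that position so readers stop exactly at the merged data. A rough stdio analogue of that sequence (not the IO_CACHE API):

  #include <cstdio>

  int main()
  {
    FILE *outfile= tmpfile();                /* stand-in for the cached temp file */
    if (!outfile)
      return 1;
    fputs("merged, duplicate-free keys\n", outfile);
    if (fflush(outfile))                     /* like flush_io_cache(outfile) */
      return 1;
    long save_pos= ftell(outfile);           /* like save_pos= outfile->pos_in_file */
    rewind(outfile);                         /* like reinit_io_cache(..., READ_CACHE, ...) */
    /* a reader should now consume exactly save_pos bytes, cf. end_of_file= save_pos */
    (void) save_pos;
    fclose(outfile);
    return 0;
  }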