30
30
deletes in disk order.
33
#include <drizzled/server_includes.h>
37
int unique_write_to_file(uchar* key,
38
element_count count __attribute__((unused)),
39
#include "drizzled/sql_sort.h"
40
#include "drizzled/session.h"
41
#include "drizzled/sql_list.h"
42
#include "drizzled/internal/iocache.h"
44
#if defined(CMATH_NAMESPACE)
45
using namespace CMATH_NAMESPACE;
48
using namespace drizzled;
52
int unique_write_to_file(unsigned char* key, uint32_t,
44
58
when tree implementation chooses to store pointer to key in TREE_ELEMENT
45
59
(instead of storing the element itself there)
47
return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
61
return my_b_write(unique->file, key, unique->size) ? 1 : 0;
50
int unique_write_to_ptrs(uchar* key,
51
element_count count __attribute__((unused)),
64
int unique_write_to_ptrs(unsigned char* key,
65
uint32_t, Unique *unique)
54
67
memcpy(unique->record_pointers, key, unique->size);
55
68
unique->record_pointers+=unique->size;
59
72
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
60
uint size_arg, uint64_t max_in_memory_size_arg)
61
:max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
73
uint32_t size_arg, size_t max_in_memory_size_arg)
74
: max_in_memory_size(max_in_memory_size_arg),
75
file(static_cast<IO_CACHE *>(memory::sql_calloc(sizeof(IO_CACHE)))),
64
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
80
init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, false,
65
81
NULL, comp_func_fixed_arg);
66
82
/* If the following fails, the next add will also fail */
67
83
my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
71
87
max_elements= (ulong) (max_in_memory_size /
72
88
ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
73
VOID(open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
89
open_cached_file(file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
126
142
total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
129
static double get_merge_buffers_cost(uint *buff_elems __attribute__((unused)),
131
uint *first, uint *last)
145
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
146
uint32_t *first, uint32_t *last)
133
uint total_buf_elems= 0;
134
for (uint *pbuf= first; pbuf <= last; pbuf++)
148
uint32_t total_buf_elems= 0;
149
for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
135
150
total_buf_elems+= *pbuf;
136
151
*last= total_buf_elems;
170
185
Cost of merge in disk seeks.
173
static double get_merge_many_buffs_cost(uint *buffer,
174
uint maxbuffer, uint max_n_elems,
175
uint last_n_elems, int elem_size)
188
static double get_merge_many_buffs_cost(uint32_t *buffer,
189
uint32_t maxbuffer, uint32_t max_n_elems,
190
uint32_t last_n_elems, int elem_size)
178
193
double total_cost= 0.0;
179
uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
194
uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
182
197
Set initial state: first maxbuffer sequences contain max_n_elems elements
263
278
these will be random seeks.
266
double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
267
uint64_t max_in_memory_size)
281
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
282
size_t max_in_memory_size)
269
284
ulong max_elements_in_tree;
270
285
ulong last_tree_elems;
327
342
BUFFPEK file_ptr;
328
343
elements+= tree.elements_in_tree;
329
344
file_ptr.count=tree.elements_in_tree;
330
file_ptr.file_pos=my_b_tell(&file);
345
file_ptr.file_pos=my_b_tell(file);
332
347
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
333
348
(void*) this, left_root_right) ||
334
insert_dynamic(&file_ptrs, (uchar*) &file_ptr))
349
insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
336
351
delete_tree(&tree);
358
373
reset_dynamic(&file_ptrs);
359
reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
374
reinit_io_cache(file, WRITE_CACHE, 0L, 0, 1);
373
static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
390
static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
375
392
BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
376
393
return ctx->key_compare(ctx->key_compare_arg,
377
*((uchar **) key_ptr1), *((uchar **)key_ptr2));
394
*((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
402
The comparison function object, passed to a priority_queue in merge_walk()
403
as its sort function parameter.
406
class buffpek_compare_functor
408
qsort_cmp2 key_compare;
409
void *key_compare_arg;
411
buffpek_compare_functor(qsort_cmp2 in_key_compare, void *in_compare_arg)
412
: key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
413
inline bool operator()(const BUFFPEK *i, const BUFFPEK *j)
415
return key_compare(key_compare_arg,
417
static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
418
uint key_length, BUFFPEK *begin, BUFFPEK *end,
454
static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
455
uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
419
456
tree_walk_action walk_action, void *walk_action_arg,
420
457
qsort_cmp2 compare, void *compare_arg,
423
BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
425
460
if (end <= begin ||
426
merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
427
init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
428
buffpek_compare, &compare_context))
461
merge_buffer_size < (ulong) (key_length * (end - begin + 1)))
463
priority_queue<BUFFPEK *, vector<BUFFPEK *>, buffpek_compare_functor >
464
queue(buffpek_compare_functor(compare, compare_arg));
430
465
/* we need space for one key when a piece of merge buffer is re-read */
431
466
merge_buffer_size-= key_length;
432
uchar *save_key_buff= merge_buffer + merge_buffer_size;
433
uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
467
unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
468
uint32_t max_key_count_per_piece= (uint32_t) (merge_buffer_size/(end-begin) /
435
470
/* if piece_size is aligned reuse_freed_buffer will always hit */
436
uint piece_size= max_key_count_per_piece * key_length;
437
uint bytes_read; /* to hold return value of read_to_buffer */
471
uint32_t piece_size= max_key_count_per_piece * key_length;
472
uint32_t bytes_read; /* to hold return value of read_to_buffer */
448
483
top->base= merge_buffer + (top - begin) * piece_size;
449
484
top->max_keys= max_key_count_per_piece;
450
485
bytes_read= read_to_buffer(file, top, key_length);
451
if (bytes_read == (uint) (-1))
486
if (bytes_read == (uint32_t) (-1))
453
488
assert(bytes_read);
454
queue_insert(&queue, (uchar *) top);
456
top= (BUFFPEK *) queue_top(&queue);
457
while (queue.elements > 1)
492
while (queue.size() > 1)
460
495
Every iteration one element is removed from the queue, and one is
471
506
top->key+= key_length;
472
507
if (--top->mem_count)
473
queue_replaced(&queue);
474
512
else /* next piece should be read */
476
514
/* save old_key not to overwrite it in read_to_buffer */
477
515
memcpy(save_key_buff, old_key, key_length);
478
516
old_key= save_key_buff;
479
517
bytes_read= read_to_buffer(file, top, key_length);
480
if (bytes_read == (uint) (-1))
518
if (bytes_read == (uint32_t) (-1))
482
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
483
queue_replaced(&queue); /* in read_to_buffer */
520
else if (bytes_read > 0) /* top->key, top->mem_count are reset */
521
{ /* in read_to_buffer */
487
Tree for old 'top' element is empty: remove it from the queue and
488
give all its memory to the nearest tree.
528
Tree for old 'top' element is empty: remove it from the queue.
490
queue_remove(&queue, 0);
491
reuse_freed_buff(&queue, top, key_length);
494
top= (BUFFPEK *) queue_top(&queue);
495
534
/* new top has been obtained; if old top is unique, apply the action */
496
535
if (compare(compare_arg, old_key, top->key))
515
554
while (--top->mem_count);
516
555
bytes_read= read_to_buffer(file, top, key_length);
517
if (bytes_read == (uint) (-1))
556
if (bytes_read == (uint32_t) (-1))
520
559
while (bytes_read);
523
delete_queue(&queue);
548
586
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
589
unsigned char *merge_buffer;
553
591
if (elements == 0) /* the whole tree is in memory */
554
592
return tree_walk(&tree, action, walk_action_arg, left_root_right);
556
594
/* flush current tree to the file to have some memory for merge buffer */
559
if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
597
if (flush_io_cache(file) || reinit_io_cache(file, READ_CACHE, 0L, 0, 0))
561
if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
599
if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
563
601
res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
564
602
(BUFFPEK *) file_ptrs.buffer,
565
603
(BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
566
604
action, walk_action_arg,
567
tree.compare, tree.custom_arg, &file);
568
my_free((char*) merge_buffer, MYF(0));
605
tree.compare, tree.custom_arg, file);
606
free((char*) merge_buffer);
573
Modify the TABLE element so that when one calls init_records()
611
Modify the Table element so that when one calls init_records()
574
612
the rows will be read in priority order.
577
bool Unique::get(TABLE *table)
615
bool Unique::get(Table *table)
579
617
SORTPARAM sort_param;
580
618
table->sort.found_records=elements+tree.elements_in_tree;
582
if (my_b_tell(&file) == 0)
620
if (my_b_tell(file) == 0)
584
622
/* Whole tree is in memory; don't use disk if you don't need to */
585
if ((record_pointers=table->sort.record_pointers= (uchar*)
586
my_malloc(size * tree.elements_in_tree, MYF(0))))
623
if ((record_pointers=table->sort.record_pointers= (unsigned char*)
624
malloc(size * tree.elements_in_tree)))
588
626
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
589
627
this, left_root_right);
597
635
IO_CACHE *outfile=table->sort.io_cache;
598
636
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
599
uint maxbuffer= file_ptrs.elements - 1;
637
uint32_t maxbuffer= file_ptrs.elements - 1;
638
unsigned char *sort_buffer;
601
639
my_off_t save_pos;
604
642
/* Open cached file if it isn't open */
605
outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
643
outfile=table->sort.io_cache= new IO_CACHE;
644
memset(outfile, 0, sizeof(IO_CACHE));
608
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
646
if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
610
reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
648
reinit_io_cache(outfile, WRITE_CACHE, 0L, 0, 0);
612
650
memset(&sort_param, 0, sizeof(sort_param));
613
651
sort_param.max_rows= elements;
614
652
sort_param.sort_form=table;
615
653
sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
617
sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
655
sort_param.keys= (uint32_t) (max_in_memory_size / sort_param.sort_length);
618
656
sort_param.not_killable=1;
620
if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
621
sort_param.sort_length,
658
if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
659
sort_param.sort_length)))
624
661
sort_param.unique_buff= sort_buffer+(sort_param.keys*
625
662
sort_param.sort_length);
629
666
sort_param.cmp_context.key_compare_arg= tree.custom_arg;
631
668
/* Merge the buffers to one file, removing duplicates */
632
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
634
if (flush_io_cache(&file) ||
635
reinit_io_cache(&file,READ_CACHE,0L,0,0))
637
if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
669
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,file))
671
if (flush_io_cache(file) ||
672
reinit_io_cache(file,READ_CACHE,0L,0,0))
674
if (merge_buffers(&sort_param, file, outfile, sort_buffer, file_ptr,
638
675
file_ptr, file_ptr+maxbuffer,0))
643
681
if (flush_io_cache(outfile))