~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/uniques.cc

  • Committer: Monty Taylor
  • Date: 2009-01-29 19:04:39 UTC
  • mto: (779.3.29 devel)
  • mto: This revision was merged to the branch mainline in revision 823.
  • Revision ID: mordred@inaugust.com-20090129190439-vfr95s6gaudjacm7
Add timegm which is missing on Solaris.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Function to handle quick removal of duplicates
21
21
  The basic idea is as follows:
22
22
 
23
23
  Store first all strings in a binary tree, ignoring duplicates.
 
24
  When the tree uses more memory than 'max_heap_table_size',
 
25
  write the tree (in sorted order) out to disk and start with a new tree.
 
26
  When all data has been generated, merge the trees (removing any found
 
27
  duplicates).
24
28
 
25
29
  The unique entries will be returned in sort order, to ensure that we do the
26
30
  deletes in disk order.
27
31
*/
28
32
 
29
 
#include "config.h"
30
 
 
31
 
#include <math.h>
32
 
 
33
 
#include <queue>
34
 
 
35
 
#include "drizzled/sql_sort.h"
36
 
#include "drizzled/session.h"
37
 
#include "drizzled/sql_list.h"
38
 
#include "drizzled/internal/iocache.h"
 
33
#include <drizzled/server_includes.h>
 
34
#include <drizzled/sql_sort.h>
 
35
#include <drizzled/session.h>
 
36
#include CMATH_H
39
37
 
40
38
#if defined(CMATH_NAMESPACE)
41
39
using namespace CMATH_NAMESPACE;
42
40
#endif
43
41
 
44
 
using namespace std;
45
42
 
46
 
namespace drizzled
 
43
int unique_write_to_file(unsigned char* key, element_count,
 
44
                         Unique *unique)
47
45
{
 
46
  /*
 
47
    Use unique->size (size of element stored in the tree) and not
 
48
    unique->tree.size_of_element. The latter is different from unique->size
 
49
    when tree implementation chooses to store pointer to key in TREE_ELEMENT
 
50
    (instead of storing the element itself there)
 
51
  */
 
52
  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
 
53
}
48
54
 
49
55
int unique_write_to_ptrs(unsigned char* key,
50
 
                         uint32_t, Unique *unique)
 
56
                         element_count, Unique *unique)
51
57
{
52
58
  memcpy(unique->record_pointers, key, unique->size);
53
59
  unique->record_pointers+=unique->size;
56
62
 
57
63
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
58
64
               uint32_t size_arg, size_t max_in_memory_size_arg)
59
 
  : max_in_memory_size(max_in_memory_size_arg),
60
 
    size(size_arg),
61
 
    elements(0)
 
65
  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
62
66
{
63
 
  // Second element is max size for memory use in unique sort
64
 
  init_tree(&tree, 0, 0, size, comp_func, false,
 
67
  my_b_clear(&file);
 
68
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
65
69
            NULL, comp_func_fixed_arg);
 
70
  /* If the following fail's the next add will also fail */
 
71
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
 
72
  /*
 
73
    If you change the following, change it in get_max_elements function, too.
 
74
  */
 
75
  max_elements= (ulong) (max_in_memory_size /
 
76
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
 
77
  open_cached_file(&file, drizzle_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
 
78
                   MYF(MY_WME));
66
79
}
67
80
 
68
81
 
88
101
 
89
102
 
90
103
/*
 
104
  Calculate cost of merge_buffers function call for given sequence of
 
105
  input stream lengths and store the number of rows in result stream in *last.
 
106
 
 
107
  SYNOPSIS
 
108
    get_merge_buffers_cost()
 
109
      buff_elems  Array of #s of elements in buffers
 
110
      elem_size   Size of element stored in buffer
 
111
      first       Pointer to first merged element size
 
112
      last        Pointer to last merged element size
 
113
 
 
114
  RETURN
 
115
    Cost of merge_buffers operation in disk seeks.
 
116
 
 
117
  NOTES
 
118
    It is assumed that no rows are eliminated during merge.
 
119
    The cost is calculated as
 
120
 
 
121
      cost(read_and_write) + cost(merge_comparisons).
 
122
 
 
123
    All bytes in the sequences is read and written back during merge so cost
 
124
    of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write)
 
125
 
 
126
    For comparisons cost calculations we assume that all merged sequences have
 
127
    the same length, so each of total_buf_size elements will be added to a sort
 
128
    heap with (n_buffers-1) elements. This gives the comparison cost:
 
129
 
 
130
      total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID;
 
131
*/
 
132
 
 
133
static double get_merge_buffers_cost(uint32_t *, uint32_t elem_size,
 
134
                                     uint32_t *first, uint32_t *last)
 
135
{
 
136
  uint32_t total_buf_elems= 0;
 
137
  for (uint32_t *pbuf= first; pbuf <= last; pbuf++)
 
138
    total_buf_elems+= *pbuf;
 
139
  *last= total_buf_elems;
 
140
 
 
141
  int n_buffers= last - first + 1;
 
142
 
 
143
  /* Using log2(n)=log(n)/log(2) formula */
 
144
  return 2*((double)total_buf_elems*elem_size) / IO_SIZE +
 
145
     total_buf_elems*log((double) n_buffers) / (TIME_FOR_COMPARE_ROWID * M_LN2);
 
146
}
 
147
 
 
148
 
 
149
/*
 
150
  Calculate cost of merging buffers into one in Unique::get, i.e. calculate
 
151
  how long (in terms of disk seeks) the two calls
 
152
    merge_many_buffs(...);
 
153
    merge_buffers(...);
 
154
  will take.
 
155
 
 
156
  SYNOPSIS
 
157
    get_merge_many_buffs_cost()
 
158
      buffer        buffer space for temporary data, at least
 
159
                    Unique::get_cost_calc_buff_size bytes
 
160
      maxbuffer     # of full buffers
 
161
      max_n_elems   # of elements in first maxbuffer buffers
 
162
      last_n_elems  # of elements in last buffer
 
163
      elem_size     size of buffer element
 
164
 
 
165
  NOTES
 
166
    maxbuffer+1 buffers are merged, where first maxbuffer buffers contain
 
167
    max_n_elems elements each and last buffer contains last_n_elems elements.
 
168
 
 
169
    The current implementation does a dumb simulation of merge_many_buffs
 
170
    function actions.
 
171
 
 
172
  RETURN
 
173
    Cost of merge in disk seeks.
 
174
*/
 
175
 
 
176
static double get_merge_many_buffs_cost(uint32_t *buffer,
 
177
                                        uint32_t maxbuffer, uint32_t max_n_elems,
 
178
                                        uint32_t last_n_elems, int elem_size)
 
179
{
 
180
  register int i;
 
181
  double total_cost= 0.0;
 
182
  uint32_t *buff_elems= buffer; /* #s of elements in each of merged sequences */
 
183
 
 
184
  /*
 
185
    Set initial state: first maxbuffer sequences contain max_n_elems elements
 
186
    each, last sequence contains last_n_elems elements.
 
187
  */
 
188
  for (i = 0; i < (int)maxbuffer; i++)
 
189
    buff_elems[i]= max_n_elems;
 
190
  buff_elems[maxbuffer]= last_n_elems;
 
191
 
 
192
  /*
 
193
    Do it exactly as merge_many_buff function does, calling
 
194
    get_merge_buffers_cost to get cost of merge_buffers.
 
195
  */
 
196
  if (maxbuffer >= MERGEBUFF2)
 
197
  {
 
198
    while (maxbuffer >= MERGEBUFF2)
 
199
    {
 
200
      uint32_t lastbuff= 0;
 
201
      for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
 
202
      {
 
203
        total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
 
204
                                           buff_elems + i,
 
205
                                           buff_elems + i + MERGEBUFF-1);
 
206
        lastbuff++;
 
207
      }
 
208
      total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
 
209
                                         buff_elems + i,
 
210
                                         buff_elems + maxbuffer);
 
211
      maxbuffer= lastbuff;
 
212
    }
 
213
  }
 
214
 
 
215
  /* Simulate final merge_buff call. */
 
216
  total_cost += get_merge_buffers_cost(buff_elems, elem_size,
 
217
                                       buff_elems, buff_elems + maxbuffer);
 
218
  return total_cost;
 
219
}
 
220
 
 
221
 
 
222
/*
91
223
  Calculate cost of using Unique for processing nkeys elements of size
92
224
  key_size using max_in_memory_size memory.
93
225
 
122
254
 
123
255
      Approximate value of log2(N!) is calculated by log2_n_fact function.
124
256
 
125
 
       
126
 
      (The Next two are historical, we do all unique operations in memory or fail)
127
 
 
128
257
    2. Cost of merging.
129
258
      If only one tree is created by Unique no merging will be necessary.
130
259
      Otherwise, we model execution of merge_many_buff function and count
137
266
      these will be random seeks.
138
267
*/
139
268
 
140
 
double Unique::get_use_cost(uint32_t *, uint32_t nkeys, uint32_t key_size,
141
 
                            size_t max_in_memory_size_arg)
 
269
double Unique::get_use_cost(uint32_t *buffer, uint32_t nkeys, uint32_t key_size,
 
270
                            size_t max_in_memory_size)
142
271
{
143
272
  ulong max_elements_in_tree;
144
273
  ulong last_tree_elems;
 
274
  int   n_full_trees; /* number of trees in unique - 1 */
145
275
  double result;
146
276
 
147
 
  max_elements_in_tree= ((ulong) max_in_memory_size_arg /
 
277
  max_elements_in_tree= ((ulong) max_in_memory_size /
148
278
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size));
149
279
 
 
280
  n_full_trees=    nkeys / max_elements_in_tree;
150
281
  last_tree_elems= nkeys % max_elements_in_tree;
151
282
 
152
283
  /* Calculate cost of creating trees */
153
284
  result= 2*log2_n_fact(last_tree_elems + 1.0);
 
285
  if (n_full_trees)
 
286
    result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
154
287
  result /= TIME_FOR_COMPARE_ROWID;
155
288
 
 
289
 
 
290
  if (!n_full_trees)
 
291
    return result;
 
292
 
 
293
  /*
 
294
    There is more then one tree and merging is necessary.
 
295
    First, add cost of writing all trees to disk, assuming that all disk
 
296
    writes are sequential.
 
297
  */
 
298
  result += DISK_SEEK_BASE_COST * n_full_trees *
 
299
              ceil(((double) key_size)*max_elements_in_tree / IO_SIZE);
 
300
  result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE);
 
301
 
 
302
  /* Cost of merge */
 
303
  double merge_cost= get_merge_many_buffs_cost(buffer, n_full_trees,
 
304
                                               max_elements_in_tree,
 
305
                                               last_tree_elems, key_size);
 
306
  if (merge_cost < 0.0)
 
307
    return merge_cost;
 
308
 
 
309
  result += merge_cost;
 
310
  /*
 
311
    Add cost of reading the resulting sequence, assuming there were no
 
312
    duplicate elements.
 
313
  */
 
314
  result += ceil((double)key_size*nkeys/IO_SIZE);
 
315
 
156
316
  return result;
157
317
}
158
318
 
159
319
Unique::~Unique()
160
320
{
161
 
  delete_tree(&tree);
 
321
  close_cached_file(&file);
 
322
  delete_tree(&tree);
 
323
  delete_dynamic(&file_ptrs);
 
324
}
 
325
 
 
326
 
 
327
    /* Write tree to disk; clear tree */
 
328
bool Unique::flush()
 
329
{
 
330
  BUFFPEK file_ptr;
 
331
  elements+= tree.elements_in_tree;
 
332
  file_ptr.count=tree.elements_in_tree;
 
333
  file_ptr.file_pos=my_b_tell(&file);
 
334
 
 
335
  if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
 
336
                (void*) this, left_root_right) ||
 
337
      insert_dynamic(&file_ptrs, (unsigned char*) &file_ptr))
 
338
    return 1;
 
339
  delete_tree(&tree);
 
340
  return 0;
162
341
}
163
342
 
164
343
 
165
344
/*
166
 
  Clear the tree.
 
345
  Clear the tree and the file.
167
346
  You must call reset() if you want to reuse Unique after walk().
168
347
*/
169
348
 
171
350
Unique::reset()
172
351
{
173
352
  reset_tree(&tree);
174
 
  assert(elements == 0);
 
353
  /*
 
354
    If elements != 0, some trees were stored in the file (see how
 
355
    flush() works). Note, that we can not count on my_b_tell(&file) == 0
 
356
    here, because it can return 0 right after walk(), and walk() does not
 
357
    reset any Unique member.
 
358
  */
 
359
  if (elements)
 
360
  {
 
361
    reset_dynamic(&file_ptrs);
 
362
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
 
363
  }
 
364
  elements= 0;
 
365
}
 
366
 
 
367
/*
 
368
  The comparison function, passed to queue_init() in merge_walk() and in
 
369
  merge_buffers() when the latter is called from Uniques::get() must
 
370
  use comparison function of Uniques::tree, but compare members of struct
 
371
  BUFFPEK.
 
372
*/
 
373
 
 
374
#ifdef __cplusplus
 
375
extern "C" {
 
376
#endif
 
377
 
 
378
static int buffpek_compare(void *arg, unsigned char *key_ptr1, unsigned char *key_ptr2)
 
379
{
 
380
  BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg;
 
381
  return ctx->key_compare(ctx->key_compare_arg,
 
382
                          *((unsigned char **) key_ptr1), *((unsigned char **)key_ptr2));
 
383
}
 
384
 
 
385
#ifdef __cplusplus
 
386
}
 
387
#endif
 
388
 
 
389
 
 
390
 
 
391
/*
 
392
  DESCRIPTION
 
393
 
 
394
    Function is very similar to merge_buffers, but instead of writing sorted
 
395
    unique keys to the output file, it invokes walk_action for each key.
 
396
    This saves I/O if you need to pass through all unique keys only once.
 
397
 
 
398
  SYNOPSIS
 
399
    merge_walk()
 
400
  All params are 'IN' (but see comment for begin, end):
 
401
    merge_buffer       buffer to perform cached piece-by-piece loading
 
402
                       of trees; initially the buffer is empty
 
403
    merge_buffer_size  size of merge_buffer. Must be aligned with
 
404
                       key_length
 
405
    key_length         size of tree element; key_length * (end - begin)
 
406
                       must be less or equal than merge_buffer_size.
 
407
    begin              pointer to BUFFPEK struct for the first tree.
 
408
    end                pointer to BUFFPEK struct for the last tree;
 
409
                       end > begin and [begin, end) form a consecutive
 
410
                       range. BUFFPEKs structs in that range are used and
 
411
                       overwritten in merge_walk().
 
412
    walk_action        element visitor. Action is called for each unique
 
413
                       key.
 
414
    walk_action_arg    argument to walk action. Passed to it on each call.
 
415
    compare            elements comparison function
 
416
    compare_arg        comparison function argument
 
417
    file               file with all trees dumped. Trees in the file
 
418
                       must contain sorted unique values. Cache must be
 
419
                       initialized in read mode.
 
420
  RETURN VALUE
 
421
    0     ok
 
422
    <> 0  error
 
423
*/
 
424
 
 
425
static bool merge_walk(unsigned char *merge_buffer, ulong merge_buffer_size,
 
426
                       uint32_t key_length, BUFFPEK *begin, BUFFPEK *end,
 
427
                       tree_walk_action walk_action, void *walk_action_arg,
 
428
                       qsort_cmp2 compare, void *compare_arg,
 
429
                       IO_CACHE *file)
 
430
{
 
431
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
 
432
  QUEUE queue;
 
433
  if (end <= begin ||
 
434
      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
 
435
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
 
436
                 buffpek_compare, &compare_context))
 
437
    return 1;
 
438
  /* we need space for one key when a piece of merge buffer is re-read */
 
439
  merge_buffer_size-= key_length;
 
440
  unsigned char *save_key_buff= merge_buffer + merge_buffer_size;
 
441
  uint32_t max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
 
442
                                        key_length);
 
443
  /* if piece_size is aligned reuse_freed_buffer will always hit */
 
444
  uint32_t piece_size= max_key_count_per_piece * key_length;
 
445
  uint32_t bytes_read;               /* to hold return value of read_to_buffer */
 
446
  BUFFPEK *top;
 
447
  int res= 1;
 
448
  /*
 
449
    Invariant: queue must contain top element from each tree, until a tree
 
450
    is not completely walked through.
 
451
    Here we're forcing the invariant, inserting one element from each tree
 
452
    to the queue.
 
453
  */
 
454
  for (top= begin; top != end; ++top)
 
455
  {
 
456
    top->base= merge_buffer + (top - begin) * piece_size;
 
457
    top->max_keys= max_key_count_per_piece;
 
458
    bytes_read= read_to_buffer(file, top, key_length);
 
459
    if (bytes_read == (uint) (-1))
 
460
      goto end;
 
461
    assert(bytes_read);
 
462
    queue_insert(&queue, (unsigned char *) top);
 
463
  }
 
464
  top= (BUFFPEK *) queue_top(&queue);
 
465
  while (queue.elements > 1)
 
466
  {
 
467
    /*
 
468
      Every iteration one element is removed from the queue, and one is
 
469
      inserted by the rules of the invariant. If two adjacent elements on
 
470
      the top of the queue are not equal, biggest one is unique, because all
 
471
      elements in each tree are unique. Action is applied only to unique
 
472
      elements.
 
473
    */
 
474
    void *old_key= top->key;
 
475
    /*
 
476
      read next key from the cache or from the file and push it to the
 
477
      queue; this gives new top.
 
478
    */
 
479
    top->key+= key_length;
 
480
    if (--top->mem_count)
 
481
      queue_replaced(&queue);
 
482
    else /* next piece should be read */
 
483
    {
 
484
      /* save old_key not to overwrite it in read_to_buffer */
 
485
      memcpy(save_key_buff, old_key, key_length);
 
486
      old_key= save_key_buff;
 
487
      bytes_read= read_to_buffer(file, top, key_length);
 
488
      if (bytes_read == (uint) (-1))
 
489
        goto end;
 
490
      else if (bytes_read > 0)      /* top->key, top->mem_count are reset */
 
491
        queue_replaced(&queue);     /* in read_to_buffer */
 
492
      else
 
493
      {
 
494
        /*
 
495
          Tree for old 'top' element is empty: remove it from the queue and
 
496
          give all its memory to the nearest tree.
 
497
        */
 
498
        queue_remove(&queue, 0);
 
499
        reuse_freed_buff(&queue, top, key_length);
 
500
      }
 
501
    }
 
502
    top= (BUFFPEK *) queue_top(&queue);
 
503
    /* new top has been obtained; if old top is unique, apply the action */
 
504
    if (compare(compare_arg, old_key, top->key))
 
505
    {
 
506
      if (walk_action(old_key, 1, walk_action_arg))
 
507
        goto end;
 
508
    }
 
509
  }
 
510
  /*
 
511
    Applying walk_action to the tail of the last tree: this is safe because
 
512
    either we had only one tree in the beginning, either we work with the
 
513
    last tree in the queue.
 
514
  */
 
515
  do
 
516
  {
 
517
    do
 
518
    {
 
519
      if (walk_action(top->key, 1, walk_action_arg))
 
520
        goto end;
 
521
      top->key+= key_length;
 
522
    }
 
523
    while (--top->mem_count);
 
524
    bytes_read= read_to_buffer(file, top, key_length);
 
525
    if (bytes_read == (uint) (-1))
 
526
      goto end;
 
527
  }
 
528
  while (bytes_read);
 
529
  res= 0;
 
530
end:
 
531
  delete_queue(&queue);
 
532
  return res;
175
533
}
176
534
 
177
535
 
187
545
  SYNOPSIS
188
546
    Unique:walk()
189
547
  All params are 'IN':
190
 
    action  function-visitor, typed in include/tree.h
 
548
    action  function-visitor, typed in include/my_tree.h
191
549
            function is called for each unique element
192
550
    arg     argument for visitor, which is passed to it on each call
193
551
  RETURN VALUE
197
555
 
198
556
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
199
557
{
200
 
  return tree_walk(&tree, action, walk_action_arg, left_root_right);
 
558
  int res;
 
559
  unsigned char *merge_buffer;
 
560
 
 
561
  if (elements == 0)                       /* the whole tree is in memory */
 
562
    return tree_walk(&tree, action, walk_action_arg, left_root_right);
 
563
 
 
564
  /* flush current tree to the file to have some memory for merge buffer */
 
565
  if (flush())
 
566
    return 1;
 
567
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
 
568
    return 1;
 
569
  if (!(merge_buffer= (unsigned char *) malloc(max_in_memory_size)))
 
570
    return 1;
 
571
  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
 
572
                  (BUFFPEK *) file_ptrs.buffer,
 
573
                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
 
574
                  action, walk_action_arg,
 
575
                  tree.compare, tree.custom_arg, &file);
 
576
  free((char*) merge_buffer);
 
577
  return res;
201
578
}
202
579
 
203
580
/*
207
584
 
208
585
bool Unique::get(Table *table)
209
586
{
210
 
  table->sort.found_records= elements+tree.elements_in_tree;
 
587
  SORTPARAM sort_param;
 
588
  table->sort.found_records=elements+tree.elements_in_tree;
211
589
 
212
 
  if ((record_pointers=table->sort.record_pointers= (unsigned char*)
213
 
       malloc(size * tree.elements_in_tree)))
 
590
  if (my_b_tell(&file) == 0)
214
591
  {
215
 
    (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
216
 
                     this, left_root_right);
217
 
    return 0;
 
592
    /* Whole tree is in memory;  Don't use disk if you don't need to */
 
593
    if ((record_pointers=table->sort.record_pointers= (unsigned char*)
 
594
         malloc(size * tree.elements_in_tree)))
 
595
    {
 
596
      (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
 
597
                       this, left_root_right);
 
598
      return 0;
 
599
    }
218
600
  }
219
 
  /* Not enough memory */
220
 
  return 1;
 
601
  /* Not enough memory; Save the result to file && free memory used by tree */
 
602
  if (flush())
 
603
    return 1;
 
604
 
 
605
  IO_CACHE *outfile=table->sort.io_cache;
 
606
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
 
607
  uint32_t maxbuffer= file_ptrs.elements - 1;
 
608
  unsigned char *sort_buffer;
 
609
  my_off_t save_pos;
 
610
  bool error=1;
 
611
 
 
612
      /* Open cached file if it isn't open */
 
613
  outfile=table->sort.io_cache= new IO_CACHE;
 
614
  memset(outfile, 0, sizeof(IO_CACHE));
 
615
 
 
616
  if (!outfile || (! my_b_inited(outfile) && open_cached_file(outfile,drizzle_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME))))
 
617
    return 1;
 
618
  reinit_io_cache(outfile, WRITE_CACHE, 0L, 0, 0);
 
619
 
 
620
  memset(&sort_param, 0, sizeof(sort_param));
 
621
  sort_param.max_rows= elements;
 
622
  sort_param.sort_form=table;
 
623
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
 
624
    size;
 
625
  sort_param.keys= (uint) (max_in_memory_size / sort_param.sort_length);
 
626
  sort_param.not_killable=1;
 
627
 
 
628
  if (!(sort_buffer=(unsigned char*) malloc((sort_param.keys+1) *
 
629
                                            sort_param.sort_length)))
 
630
    return 1;
 
631
  sort_param.unique_buff= sort_buffer+(sort_param.keys*
 
632
                                       sort_param.sort_length);
 
633
 
 
634
  sort_param.compare= (qsort2_cmp) buffpek_compare;
 
635
  sort_param.cmp_context.key_compare= tree.compare;
 
636
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
 
637
 
 
638
  /* Merge the buffers to one file, removing duplicates */
 
639
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
 
640
    goto err;
 
641
  if (flush_io_cache(&file) ||
 
642
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
 
643
    goto err;
 
644
  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
 
645
                    file_ptr, file_ptr+maxbuffer,0))
 
646
    goto err;
 
647
  error=0;
 
648
err:
 
649
  if (sort_buffer)
 
650
    free(sort_buffer);
 
651
  if (flush_io_cache(outfile))
 
652
    error=1;
 
653
 
 
654
  /* Setup io_cache for reading */
 
655
  save_pos=outfile->pos_in_file;
 
656
  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
 
657
    error=1;
 
658
  outfile->end_of_file=save_pos;
 
659
  return error;
221
660
}
222
 
 
223
 
} /* namespace drizzled */