~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Olaf van der Spek
  • Date: 2011-07-04 19:11:47 UTC
  • mto: This revision was merged to the branch mainline in revision 2367.
  • Revision ID: olafvdspek@gmail.com-20110704191147-s99ojek811zi1fzj
RemoveĀ unusedĀ Name_resolution_context::error_reporter

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
 
50
#include <config.h>
51
51
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
55
 
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
58
 
static void my_aiowait(my_aio_result *result);
59
 
#endif
60
 
#include "drizzled/internal/iocache.h"
 
52
#include <drizzled/definitions.h>
 
53
#include <drizzled/error_t.h>
 
54
#include <drizzled/error.h>
 
55
#include <drizzled/internal/my_sys.h>
 
56
#include <drizzled/internal/m_string.h>
 
57
#include <drizzled/drizzled.h>
 
58
#include <drizzled/internal/iocache.h>
61
59
#include <errno.h>
62
60
#include <drizzled/util/test.h>
63
61
#include <stdlib.h>
70
68
namespace internal
71
69
{
72
70
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
 
71
static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count);
 
72
static int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count);
75
73
 
76
74
/**
77
75
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
 
76
 *   Lock appends for the io_cache_st if required (need_append_buffer_lock)   
79
77
 */
80
78
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
 
79
static void lock_append_buffer(io_cache_st *, int )
82
80
{
83
81
}
84
82
 
85
83
/**
86
84
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
 
85
 *   Unlock appends for the io_cache_st if required (need_append_buffer_lock)   
88
86
 */
89
87
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
 
88
static void unlock_append_buffer(io_cache_st *, int )
91
89
{
92
90
}
93
91
 
114
112
 
115
113
/**
116
114
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
 
115
 *   Setup internal pointers inside io_cache_st
118
116
 * 
119
117
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
 
118
 *   This is called on automatically on init or reinit of io_cache_st
 
119
 *   It must be called externally if one moves or copies an io_cache_st object.
122
120
 * 
123
121
 * @param info Cache handler to setup
124
122
 */
125
 
void st_io_cache::setup_io_cache()
 
123
void io_cache_st::setup_io_cache()
126
124
{
127
125
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
126
  if (type == WRITE_CACHE)
138
136
}
139
137
 
140
138
 
141
 
void st_io_cache::init_functions()
 
139
void io_cache_st::init_functions()
142
140
{
143
141
  switch (type) {
144
142
  case READ_NET:
160
158
 
161
159
/**
162
160
 * @brief
163
 
 *   Initialize an st_io_cache object
 
161
 *   Initialize an io_cache_st object
164
162
 *
165
163
 * @param file File that should be associated with the handler
166
164
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
175
173
 * @retval 0 success
176
174
 * @retval # error
177
175
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
 
176
int io_cache_st::init_io_cache(int file_arg, size_t cachesize,
179
177
                               enum cache_type type_arg, my_off_t seek_offset,
180
178
                               bool use_async_io, myf cache_myflags)
181
179
{
239
237
  {
240
238
    /* Retry allocating memory in smaller blocks until we get one */
241
239
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
242
 
    for (;;)
 
240
    if (cachesize < min_cache)
 
241
      cachesize = min_cache;
 
242
    size_t buffer_block= cachesize;
 
243
    if (type_arg == READ_CACHE && not global_read_buffer.add(buffer_block))
243
244
    {
244
 
      size_t buffer_block;
245
 
      if (cachesize < min_cache)
246
 
        cachesize = min_cache;
247
 
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
255
 
           (unsigned char*) malloc(buffer_block)) != 0)
256
 
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
259
 
        break;                                  /* Enough memory found */
260
 
      }
261
 
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
267
 
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
 
      cachesize= (cachesize*3/4 & ~(min_cache-1));
 
245
      my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
 
246
      return 2;
271
247
    }
 
248
 
 
249
    buffer= (unsigned char*) malloc(buffer_block);
 
250
    write_buffer=buffer;
 
251
    alloced_buffer= true;
272
252
  }
273
253
 
274
254
  read_length=buffer_length=cachesize;
286
266
  error= 0;
287
267
  type= type_arg;
288
268
  init_functions();
289
 
#ifdef HAVE_AIOWAIT
290
 
  if (use_async_io && ! my_disable_async_io)
291
 
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
294
 
  }
295
 
  inited= aio_result.pending= 0;
296
 
#endif
297
269
  return 0;
298
270
}                                               /* init_io_cache */
299
271
 
300
 
        /* Wait until current request is ready */
301
 
 
302
 
#ifdef HAVE_AIOWAIT
303
 
static void my_aiowait(my_aio_result *result)
304
 
{
305
 
  if (result->pending)
306
 
  {
307
 
    struct aio_result_t *tmp;
308
 
    for (;;)
309
 
    {
310
 
      if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
311
 
      {
312
 
        if (errno == EINTR)
313
 
          continue;
314
 
        result->pending=0;                      /* Assume everythings is ok */
315
 
        break;
316
 
      }
317
 
      ((my_aio_result*) tmp)->pending=0;
318
 
      if ((my_aio_result*) tmp == result)
319
 
        break;
320
 
    }
321
 
  }
322
 
}
323
 
#endif
324
 
 
325
272
/**
326
273
 * @brief 
327
274
 *   Reset the cache
332
279
 *   If we are doing a reinit of a cache where we have the start of the file
333
280
 *   in the cache, we are reusing this memory without flushing it to disk.
334
281
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
 
282
bool io_cache_st::reinit_io_cache(enum cache_type type_arg,
336
283
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
 
284
                                  bool,
338
285
                                  bool clear_cache)
339
286
{
340
287
  /* One can't do reinit with the following types */
372
319
      write_pos=pos;
373
320
    else
374
321
      read_pos= pos;
375
 
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
377
 
#endif
378
322
  }
379
323
  else
380
324
  {
406
350
  error=0;
407
351
  init_functions();
408
352
 
409
 
#ifdef HAVE_AIOWAIT
410
 
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
413
 
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
416
 
  }
417
 
  inited= 0;
418
 
#else
419
 
  (void)use_async_io;
420
 
#endif
421
353
  return 0;
422
354
} /* reinit_io_cache */
423
355
 
435
367
 *   types than my_off_t unless you can be sure that their value fits.
436
368
 *   Same applies to differences of file offsets.
437
369
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
 
370
 * @param info io_cache_st pointer @param Buffer Buffer to retrieve count bytes
439
371
 * from file @param Count Number of bytes to read into Buffer
440
372
 * 
441
373
 * @retval 0 We succeeded in reading all data
442
374
 * @retval 1 Error: can't read requested characters
443
375
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
376
static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count)
445
377
{
446
378
  size_t length_local,diff_length,left_length, max_length;
447
379
  my_off_t pos_in_file_local;
458
390
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
391
 
460
392
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
393
    Whenever a function which operates on io_cache_st flushes/writes
 
394
    some part of the io_cache_st to disk it will set the property
463
395
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
396
    on the io_cache_st.
465
397
  */
466
398
  if (info->seek_not_done)
467
399
  {
479
411
      */
480
412
      assert(errno != ESPIPE);
481
413
      info->error= -1;
482
 
      return(1);
 
414
      return 1;
483
415
    }
484
416
  }
485
417
 
490
422
    if (info->end_of_file <= pos_in_file_local)
491
423
    {                                   /* End of file */
492
424
      info->error= (int) left_length;
493
 
      return(1);
 
425
      return 1;
494
426
    }
495
427
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
428
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
497
429
    {
498
430
      info->error= (read_length == (size_t) -1 ? -1 :
499
431
                    (int) (read_length+left_length));
500
 
      return(1);
 
432
      return 1;
501
433
    }
502
434
    Count-= length_local;
503
435
    Buffer+= length_local;
515
447
    if (Count)
516
448
    {
517
449
      info->error= static_cast<int>(left_length);       /* We only got this many char */
518
 
      return(1);
 
450
      return 1;
519
451
    }
520
452
     length_local=0;                            /* Didn't read any chars */
521
453
  }
528
460
    info->pos_in_file= pos_in_file_local;
529
461
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
530
462
    info->read_pos=info->read_end=info->buffer;
531
 
    return(1);
 
463
    return 1;
532
464
  }
533
465
  info->read_pos=info->buffer+Count;
534
466
  info->read_end=info->buffer+ length_local;
535
467
  info->pos_in_file=pos_in_file_local;
536
468
  memcpy(Buffer, info->buffer, Count);
537
 
  return(0);
 
469
  return 0;
538
470
}
539
471
 
540
 
 
541
 
#ifdef HAVE_AIOWAIT
542
 
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
556
 
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
558
 
  size_t max_length;
559
 
  my_off_t next_pos_in_file;
560
 
  unsigned char *read_buffer;
561
 
 
562
 
  memcpy(Buffer,info->read_pos,
563
 
         (left_length= (size_t) (info->read_end-info->read_pos)));
564
 
  Buffer+=left_length;
565
 
  org_Count=Count;
566
 
  Count-=left_length;
567
 
 
568
 
  if (info->inited)
569
 
  {                                             /* wait for read block */
570
 
    info->inited=0;                             /* No more block to read */
571
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
572
 
    if (info->aio_result.result.aio_errno)
573
 
    {
574
 
      if (info->myflags & MY_WME)
575
 
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
 
                 my_filename(info->file),
577
 
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
579
 
      info->error= -1;
580
 
      return(1);
581
 
    }
582
 
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
 
        read_length == (size_t) -1)
584
 
    {
585
 
      errno=0;                          /* For testing */
586
 
      info->error= (read_length == (size_t) -1 ? -1 :
587
 
                    (int) (read_length+left_length));
588
 
      return(1);
589
 
    }
590
 
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
591
 
 
592
 
    if (info->request_pos != info->buffer)
593
 
      info->request_pos=info->buffer;
594
 
    else
595
 
      info->request_pos=info->buffer+info->read_length;
596
 
    info->read_pos=info->request_pos;
597
 
    next_pos_in_file=info->aio_read_pos+read_length;
598
 
 
599
 
        /* Check if pos_in_file is changed
600
 
           (_ni_read_cache may have skipped some bytes) */
601
 
 
602
 
    if (info->aio_read_pos < info->pos_in_file)
603
 
    {                                           /* Fix if skipped bytes */
604
 
      if (info->aio_read_pos + read_length < info->pos_in_file)
605
 
      {
606
 
        read_length=0;                          /* Skip block */
607
 
        next_pos_in_file=info->pos_in_file;
608
 
      }
609
 
      else
610
 
      {
611
 
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
612
 
        info->pos_in_file=info->aio_read_pos; /* Whe are here */
613
 
        info->read_pos=info->request_pos+offset;
614
 
        read_length-=offset;                    /* Bytes left from read_pos */
615
 
      }
616
 
    }
617
 
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
623
 
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
625
 
  }
626
 
  else
627
 
    next_pos_in_file=(info->pos_in_file+ (size_t)
628
 
                      (info->read_end - info->request_pos));
629
 
 
630
 
        /* If reading large blocks, or first read or read with skip */
631
 
  if (Count)
632
 
  {
633
 
    if (next_pos_in_file == info->end_of_file)
634
 
    {
635
 
      info->error=(int) (read_length+left_length);
636
 
      return 1;
637
 
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
640
 
    {
641
 
      info->error= -1;
642
 
      return (1);
643
 
    }
644
 
 
645
 
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
646
 
    if (Count < read_length)
647
 
    {                                   /* Small block, read to cache */
648
 
      if ((read_length=my_read(info->file,info->request_pos,
649
 
                               read_length, info->myflags)) == (size_t) -1)
650
 
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
652
 
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
 
      info->read_pos=info->request_pos+Count;
654
 
      info->read_end=info->request_pos+read_length;
655
 
      info->pos_in_file=next_pos_in_file;       /* Start of block in cache */
656
 
      next_pos_in_file+=read_length;
657
 
 
658
 
      if (Count != use_length)
659
 
      {                                 /* Didn't find hole block */
660
 
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
 
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
663
 
        info->error=(int) (read_length+left_length);
664
 
        return 1;
665
 
      }
666
 
    }
667
 
    else
668
 
    {                                           /* Big block, don't cache it */
669
 
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
670
 
          != Count)
671
 
      {
672
 
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
673
 
        return 1;
674
 
      }
675
 
      info->read_pos=info->read_end=info->request_pos;
676
 
      info->pos_in_file=(next_pos_in_file+=Count);
677
 
    }
678
 
  }
679
 
 
680
 
  /* Read next block with asyncronic io */
681
 
  diff_length=(next_pos_in_file & (IO_SIZE-1));
682
 
  max_length= info->read_length - diff_length;
683
 
  if (max_length > info->end_of_file - next_pos_in_file)
684
 
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
685
 
 
686
 
  if (info->request_pos != info->buffer)
687
 
    read_buffer=info->buffer;
688
 
  else
689
 
    read_buffer=info->buffer+info->read_length;
690
 
  info->aio_read_pos=next_pos_in_file;
691
 
  if (max_length)
692
 
  {
693
 
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
694
 
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
696
 
                &info->aio_result.result))
697
 
    {                                           /* Skip async io */
698
 
      errno=errno;
699
 
      if (info->request_pos != info->buffer)
700
 
      {
701
 
        memmove(info->buffer, info->request_pos,
702
 
                (size_t) (info->read_end - info->read_pos));
703
 
        info->request_pos=info->buffer;
704
 
        info->read_pos-=info->read_length;
705
 
        info->read_end-=info->read_length;
706
 
      }
707
 
      info->read_length=info->buffer_length;    /* Use hole buffer */
708
 
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
709
 
    }
710
 
    else
711
 
      info->inited=info->aio_result.pending=1;
712
 
  }
713
 
  return 0;                                     /* Block read, async in use */
714
 
} /* _my_b_async_read */
715
 
#endif
716
 
 
717
 
 
718
472
/**
719
473
 * @brief
720
474
 *   Read one byte when buffer is empty
721
475
 */
722
 
int _my_b_get(st_io_cache *info)
 
476
int _my_b_get(io_cache_st *info)
723
477
{
724
478
  unsigned char buff;
725
479
  IO_CACHE_CALLBACK pre_read,post_read;
 
480
 
726
481
  if ((pre_read = info->pre_read))
727
482
    (*pre_read)(info);
 
483
 
728
484
  if ((*(info)->read_function)(info,&buff,1))
729
485
    return my_b_EOF;
 
486
 
730
487
  if ((post_read = info->post_read))
731
488
    (*post_read)(info);
 
489
 
732
490
  return (int) (unsigned char) buff;
733
491
}
734
492
 
735
493
/**
736
494
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
 
495
 *   Write a byte buffer to io_cache_st and flush to disk if io_cache_st is full.
738
496
 *
739
497
 * @retval -1 On error; errno contains error code.
740
498
 * @retval 0 On success
741
499
 * @retval 1 On error on write
742
500
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
501
int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count)
744
502
{
745
503
  size_t rest_length,length_local;
746
504
 
764
522
    if (info->seek_not_done)
765
523
    {
766
524
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
525
        Whenever a function which operates on io_cache_st flushes/writes
 
526
        some part of the io_cache_st to disk it will set the property
769
527
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
528
        on the io_cache_st.
771
529
      */
772
530
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
531
      {
794
552
 *   As all write calls to the data goes through the cache,
795
553
 *   we will never get a seek over the end of the buffer.
796
554
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
555
int my_block_write(io_cache_st *info, const unsigned char *Buffer, size_t Count,
798
556
                   my_off_t pos)
799
557
{
800
558
  size_t length_local;
841
599
 * @brief
842
600
 *   Flush write cache 
843
601
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
602
int my_b_flush_io_cache(io_cache_st *info, int need_append_buffer_lock)
845
603
{
846
604
  size_t length_local;
847
605
  bool append_cache= false;
899
657
      return(info->error);
900
658
    }
901
659
  }
902
 
#ifdef HAVE_AIOWAIT
903
 
  else if (info->type != READ_NET)
904
 
  {
905
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
906
 
    info->inited=0;
907
 
  }
908
 
#endif
909
660
  unlock_append_buffer(info, need_append_buffer_lock);
910
 
  return(0);
 
661
  return 0;
911
662
}
912
663
 
913
664
/**
914
665
 * @brief
915
 
 *   Free an st_io_cache object
 
666
 *   Free an io_cache_st object
916
667
 * 
917
668
 * @detail
918
669
 *   It's currently safe to call this if one has called init_io_cache()
919
670
 *   on the 'info' object, even if init_io_cache() failed.
920
671
 *   This function is also safe to call twice with the same handle.
921
672
 * 
922
 
 * @param info st_io_cache Handle to free
 
673
 * @param info io_cache_st Handle to free
923
674
 * 
924
675
 * @retval 0 ok
925
676
 * @retval # Error
926
677
 */
927
 
int st_io_cache::end_io_cache()
 
678
int io_cache_st::end_io_cache()
928
679
{
929
680
  int _error=0;
930
681