~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Andrew Hutchings
  • Date: 2011-02-01 10:23:22 UTC
  • mto: (2136.1.1 build)
  • mto: This revision was merged to the branch mainline in revision 2137.
  • Revision ID: andrew@linuxjedi.co.uk-20110201102322-oxztcyrjzg3c7yta
Fix counters cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include <config.h>
 
50
#include "config.h"
51
51
 
52
 
#include <drizzled/definitions.h>
53
 
#include <drizzled/error_t.h>
54
 
#include <drizzled/error.h>
55
 
#include <drizzled/internal/my_sys.h>
56
 
#include <drizzled/internal/m_string.h>
57
 
#include <drizzled/drizzled.h>
58
 
#include <drizzled/internal/iocache.h>
 
52
#include "drizzled/internal/my_sys.h"
 
53
#include "drizzled/internal/m_string.h"
 
54
#include "drizzled/drizzled.h"
 
55
#ifdef HAVE_AIOWAIT
 
56
#include "drizzled/error.h"
 
57
#include "drizzled/internal/aio_result.h"
 
58
static void my_aiowait(my_aio_result *result);
 
59
#endif
 
60
#include "drizzled/internal/iocache.h"
59
61
#include <errno.h>
60
62
#include <drizzled/util/test.h>
61
63
#include <stdlib.h>
68
70
namespace internal
69
71
{
70
72
 
71
 
static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count);
72
 
static int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count);
 
73
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
 
74
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
73
75
 
74
76
/**
75
77
 * @brief
76
 
 *   Lock appends for the io_cache_st if required (need_append_buffer_lock)   
 
78
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
77
79
 */
78
80
inline
79
 
static void lock_append_buffer(io_cache_st *, int )
 
81
static void lock_append_buffer(register st_io_cache *, int )
80
82
{
81
83
}
82
84
 
83
85
/**
84
86
 * @brief
85
 
 *   Unlock appends for the io_cache_st if required (need_append_buffer_lock)   
 
87
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
86
88
 */
87
89
inline
88
 
static void unlock_append_buffer(io_cache_st *, int )
 
90
static void unlock_append_buffer(register st_io_cache *, int )
89
91
{
90
92
}
91
93
 
112
114
 
113
115
/**
114
116
 * @brief 
115
 
 *   Setup internal pointers inside io_cache_st
 
117
 *   Setup internal pointers inside st_io_cache
116
118
 * 
117
119
 * @details
118
 
 *   This is called on automatically on init or reinit of io_cache_st
119
 
 *   It must be called externally if one moves or copies an io_cache_st object.
 
120
 *   This is called on automatically on init or reinit of st_io_cache
 
121
 *   It must be called externally if one moves or copies an st_io_cache object.
120
122
 * 
121
123
 * @param info Cache handler to setup
122
124
 */
123
 
void io_cache_st::setup_io_cache()
 
125
void st_io_cache::setup_io_cache()
124
126
{
125
127
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
126
128
  if (type == WRITE_CACHE)
136
138
}
137
139
 
138
140
 
139
 
void io_cache_st::init_functions()
 
141
void st_io_cache::init_functions()
140
142
{
141
143
  switch (type) {
142
144
  case READ_NET:
158
160
 
159
161
/**
160
162
 * @brief
161
 
 *   Initialize an io_cache_st object
 
163
 *   Initialize an st_io_cache object
162
164
 *
163
165
 * @param file File that should be associated with the handler
164
166
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
173
175
 * @retval 0 success
174
176
 * @retval # error
175
177
 */
176
 
int io_cache_st::init_io_cache(int file_arg, size_t cachesize,
 
178
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
177
179
                               enum cache_type type_arg, my_off_t seek_offset,
178
180
                               bool use_async_io, myf cache_myflags)
179
181
{
284
286
  error= 0;
285
287
  type= type_arg;
286
288
  init_functions();
 
289
#ifdef HAVE_AIOWAIT
 
290
  if (use_async_io && ! my_disable_async_io)
 
291
  {
 
292
    read_length/=2;
 
293
    read_function=_my_b_async_read;
 
294
  }
 
295
  inited= aio_result.pending= 0;
 
296
#endif
287
297
  return 0;
288
298
}                                               /* init_io_cache */
289
299
 
 
300
        /* Wait until current request is ready */
 
301
 
 
302
#ifdef HAVE_AIOWAIT
 
303
static void my_aiowait(my_aio_result *result)
 
304
{
 
305
  if (result->pending)
 
306
  {
 
307
    struct aio_result_t *tmp;
 
308
    for (;;)
 
309
    {
 
310
      if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
 
311
      {
 
312
        if (errno == EINTR)
 
313
          continue;
 
314
        result->pending=0;                      /* Assume everythings is ok */
 
315
        break;
 
316
      }
 
317
      ((my_aio_result*) tmp)->pending=0;
 
318
      if ((my_aio_result*) tmp == result)
 
319
        break;
 
320
    }
 
321
  }
 
322
}
 
323
#endif
 
324
 
290
325
/**
291
326
 * @brief 
292
327
 *   Reset the cache
297
332
 *   If we are doing a reinit of a cache where we have the start of the file
298
333
 *   in the cache, we are reusing this memory without flushing it to disk.
299
334
 */
300
 
bool io_cache_st::reinit_io_cache(enum cache_type type_arg,
 
335
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
301
336
                                  my_off_t seek_offset,
302
 
                                  bool,
 
337
                                  bool use_async_io,
303
338
                                  bool clear_cache)
304
339
{
305
340
  /* One can't do reinit with the following types */
337
372
      write_pos=pos;
338
373
    else
339
374
      read_pos= pos;
 
375
#ifdef HAVE_AIOWAIT
 
376
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
377
#endif
340
378
  }
341
379
  else
342
380
  {
368
406
  error=0;
369
407
  init_functions();
370
408
 
 
409
#ifdef HAVE_AIOWAIT
 
410
  if (use_async_io && ! my_disable_async_io &&
 
411
      ((uint32_t) buffer_length <
 
412
       (uint32_t) (end_of_file - seek_offset)))
 
413
  {
 
414
    read_length=buffer_length/2;
 
415
    read_function=_my_b_async_read;
 
416
  }
 
417
  inited= 0;
 
418
#else
 
419
  (void)use_async_io;
 
420
#endif
371
421
  return 0;
372
422
} /* reinit_io_cache */
373
423
 
385
435
 *   types than my_off_t unless you can be sure that their value fits.
386
436
 *   Same applies to differences of file offsets.
387
437
 *
388
 
 * @param info io_cache_st pointer @param Buffer Buffer to retrieve count bytes
 
438
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
389
439
 * from file @param Count Number of bytes to read into Buffer
390
440
 * 
391
441
 * @retval 0 We succeeded in reading all data
392
442
 * @retval 1 Error: can't read requested characters
393
443
 */
394
 
static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count)
 
444
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
395
445
{
396
446
  size_t length_local,diff_length,left_length, max_length;
397
447
  my_off_t pos_in_file_local;
408
458
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
409
459
 
410
460
  /*
411
 
    Whenever a function which operates on io_cache_st flushes/writes
412
 
    some part of the io_cache_st to disk it will set the property
 
461
    Whenever a function which operates on st_io_cache flushes/writes
 
462
    some part of the st_io_cache to disk it will set the property
413
463
    "seek_not_done" to indicate this to other functions operating
414
 
    on the io_cache_st.
 
464
    on the st_io_cache.
415
465
  */
416
466
  if (info->seek_not_done)
417
467
  {
487
537
  return(0);
488
538
}
489
539
 
 
540
 
 
541
#ifdef HAVE_AIOWAIT
 
542
 
 
543
/**
 
544
 * @brief
 
545
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
 
546
 *
 
547
 * @param info st_io_cache pointer
 
548
 * @param Buffer Buffer to retrieve count bytes from file
 
549
 * @param Count Number of bytes to read into Buffer
 
550
 * 
 
551
 * @retval -1 An error has occurred; errno is set.
 
552
 * @retval 0 Success
 
553
 * @retval 1 An error has occurred; st_io_cache to error state.
 
554
 */
 
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
556
{
 
557
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
558
  size_t max_length;
 
559
  my_off_t next_pos_in_file;
 
560
  unsigned char *read_buffer;
 
561
 
 
562
  memcpy(Buffer,info->read_pos,
 
563
         (left_length= (size_t) (info->read_end-info->read_pos)));
 
564
  Buffer+=left_length;
 
565
  org_Count=Count;
 
566
  Count-=left_length;
 
567
 
 
568
  if (info->inited)
 
569
  {                                             /* wait for read block */
 
570
    info->inited=0;                             /* No more block to read */
 
571
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
572
    if (info->aio_result.result.aio_errno)
 
573
    {
 
574
      if (info->myflags & MY_WME)
 
575
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
 
576
                 my_filename(info->file),
 
577
                 info->aio_result.result.aio_errno);
 
578
      errno=info->aio_result.result.aio_errno;
 
579
      info->error= -1;
 
580
      return(1);
 
581
    }
 
582
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
 
583
        read_length == (size_t) -1)
 
584
    {
 
585
      errno=0;                          /* For testing */
 
586
      info->error= (read_length == (size_t) -1 ? -1 :
 
587
                    (int) (read_length+left_length));
 
588
      return(1);
 
589
    }
 
590
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
 
591
 
 
592
    if (info->request_pos != info->buffer)
 
593
      info->request_pos=info->buffer;
 
594
    else
 
595
      info->request_pos=info->buffer+info->read_length;
 
596
    info->read_pos=info->request_pos;
 
597
    next_pos_in_file=info->aio_read_pos+read_length;
 
598
 
 
599
        /* Check if pos_in_file is changed
 
600
           (_ni_read_cache may have skipped some bytes) */
 
601
 
 
602
    if (info->aio_read_pos < info->pos_in_file)
 
603
    {                                           /* Fix if skipped bytes */
 
604
      if (info->aio_read_pos + read_length < info->pos_in_file)
 
605
      {
 
606
        read_length=0;                          /* Skip block */
 
607
        next_pos_in_file=info->pos_in_file;
 
608
      }
 
609
      else
 
610
      {
 
611
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
 
612
        info->pos_in_file=info->aio_read_pos; /* Whe are here */
 
613
        info->read_pos=info->request_pos+offset;
 
614
        read_length-=offset;                    /* Bytes left from read_pos */
 
615
      }
 
616
    }
 
617
        /* Copy found bytes to buffer */
 
618
    length_local=min(Count,read_length);
 
619
    memcpy(Buffer,info->read_pos,(size_t) length_local);
 
620
    Buffer+=length_local;
 
621
    Count-=length_local;
 
622
    left_length+=length_local;
 
623
    info->read_end=info->rc_pos+read_length;
 
624
    info->read_pos+=length_local;
 
625
  }
 
626
  else
 
627
    next_pos_in_file=(info->pos_in_file+ (size_t)
 
628
                      (info->read_end - info->request_pos));
 
629
 
 
630
        /* If reading large blocks, or first read or read with skip */
 
631
  if (Count)
 
632
  {
 
633
    if (next_pos_in_file == info->end_of_file)
 
634
    {
 
635
      info->error=(int) (read_length+left_length);
 
636
      return 1;
 
637
    }
 
638
 
 
639
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
640
    {
 
641
      info->error= -1;
 
642
      return (1);
 
643
    }
 
644
 
 
645
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
 
646
    if (Count < read_length)
 
647
    {                                   /* Small block, read to cache */
 
648
      if ((read_length=my_read(info->file,info->request_pos,
 
649
                               read_length, info->myflags)) == (size_t) -1)
 
650
        return info->error= -1;
 
651
      use_length=min(Count,read_length);
 
652
      memcpy(Buffer,info->request_pos,(size_t) use_length);
 
653
      info->read_pos=info->request_pos+Count;
 
654
      info->read_end=info->request_pos+read_length;
 
655
      info->pos_in_file=next_pos_in_file;       /* Start of block in cache */
 
656
      next_pos_in_file+=read_length;
 
657
 
 
658
      if (Count != use_length)
 
659
      {                                 /* Didn't find hole block */
 
660
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
 
661
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
 
662
                   my_filename(info->file),errno);
 
663
        info->error=(int) (read_length+left_length);
 
664
        return 1;
 
665
      }
 
666
    }
 
667
    else
 
668
    {                                           /* Big block, don't cache it */
 
669
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
 
670
          != Count)
 
671
      {
 
672
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
 
673
        return 1;
 
674
      }
 
675
      info->read_pos=info->read_end=info->request_pos;
 
676
      info->pos_in_file=(next_pos_in_file+=Count);
 
677
    }
 
678
  }
 
679
 
 
680
  /* Read next block with asyncronic io */
 
681
  diff_length=(next_pos_in_file & (IO_SIZE-1));
 
682
  max_length= info->read_length - diff_length;
 
683
  if (max_length > info->end_of_file - next_pos_in_file)
 
684
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
 
685
 
 
686
  if (info->request_pos != info->buffer)
 
687
    read_buffer=info->buffer;
 
688
  else
 
689
    read_buffer=info->buffer+info->read_length;
 
690
  info->aio_read_pos=next_pos_in_file;
 
691
  if (max_length)
 
692
  {
 
693
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
694
    if (aioread(info->file,read_buffer, max_length,
 
695
                (my_off_t) next_pos_in_file,SEEK_SET,
 
696
                &info->aio_result.result))
 
697
    {                                           /* Skip async io */
 
698
      errno=errno;
 
699
      if (info->request_pos != info->buffer)
 
700
      {
 
701
        memmove(info->buffer, info->request_pos,
 
702
                (size_t) (info->read_end - info->read_pos));
 
703
        info->request_pos=info->buffer;
 
704
        info->read_pos-=info->read_length;
 
705
        info->read_end-=info->read_length;
 
706
      }
 
707
      info->read_length=info->buffer_length;    /* Use hole buffer */
 
708
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
 
709
    }
 
710
    else
 
711
      info->inited=info->aio_result.pending=1;
 
712
  }
 
713
  return 0;                                     /* Block read, async in use */
 
714
} /* _my_b_async_read */
 
715
#endif
 
716
 
 
717
 
490
718
/**
491
719
 * @brief
492
720
 *   Read one byte when buffer is empty
493
721
 */
494
 
int _my_b_get(io_cache_st *info)
 
722
int _my_b_get(st_io_cache *info)
495
723
{
496
724
  unsigned char buff;
497
725
  IO_CACHE_CALLBACK pre_read,post_read;
498
 
 
499
726
  if ((pre_read = info->pre_read))
500
727
    (*pre_read)(info);
501
 
 
502
728
  if ((*(info)->read_function)(info,&buff,1))
503
729
    return my_b_EOF;
504
 
 
505
730
  if ((post_read = info->post_read))
506
731
    (*post_read)(info);
507
 
 
508
732
  return (int) (unsigned char) buff;
509
733
}
510
734
 
511
735
/**
512
736
 * @brief
513
 
 *   Write a byte buffer to io_cache_st and flush to disk if io_cache_st is full.
 
737
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
514
738
 *
515
739
 * @retval -1 On error; errno contains error code.
516
740
 * @retval 0 On success
517
741
 * @retval 1 On error on write
518
742
 */
519
 
int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count)
 
743
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
520
744
{
521
745
  size_t rest_length,length_local;
522
746
 
540
764
    if (info->seek_not_done)
541
765
    {
542
766
      /*
543
 
        Whenever a function which operates on io_cache_st flushes/writes
544
 
        some part of the io_cache_st to disk it will set the property
 
767
        Whenever a function which operates on st_io_cache flushes/writes
 
768
        some part of the st_io_cache to disk it will set the property
545
769
        "seek_not_done" to indicate this to other functions operating
546
 
        on the io_cache_st.
 
770
        on the st_io_cache.
547
771
      */
548
772
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
549
773
      {
570
794
 *   As all write calls to the data goes through the cache,
571
795
 *   we will never get a seek over the end of the buffer.
572
796
 */
573
 
int my_block_write(io_cache_st *info, const unsigned char *Buffer, size_t Count,
 
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
574
798
                   my_off_t pos)
575
799
{
576
800
  size_t length_local;
617
841
 * @brief
618
842
 *   Flush write cache 
619
843
 */
620
 
int my_b_flush_io_cache(io_cache_st *info, int need_append_buffer_lock)
 
844
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
621
845
{
622
846
  size_t length_local;
623
847
  bool append_cache= false;
675
899
      return(info->error);
676
900
    }
677
901
  }
 
902
#ifdef HAVE_AIOWAIT
 
903
  else if (info->type != READ_NET)
 
904
  {
 
905
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
906
    info->inited=0;
 
907
  }
 
908
#endif
678
909
  unlock_append_buffer(info, need_append_buffer_lock);
679
910
  return(0);
680
911
}
681
912
 
682
913
/**
683
914
 * @brief
684
 
 *   Free an io_cache_st object
 
915
 *   Free an st_io_cache object
685
916
 * 
686
917
 * @detail
687
918
 *   It's currently safe to call this if one has called init_io_cache()
688
919
 *   on the 'info' object, even if init_io_cache() failed.
689
920
 *   This function is also safe to call twice with the same handle.
690
921
 * 
691
 
 * @param info io_cache_st Handle to free
 
922
 * @param info st_io_cache Handle to free
692
923
 * 
693
924
 * @retval 0 ok
694
925
 * @retval # Error
695
926
 */
696
 
int io_cache_st::end_io_cache()
 
927
int st_io_cache::end_io_cache()
697
928
{
698
929
  int _error=0;
699
930