~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Lee Bieber
  • Date: 2011-03-28 18:11:45 UTC
  • mfrom: (2254.1.2 build)
  • Revision ID: kalebral@gmail.com-20110328181145-tfsb6s5ozhuvhfoq
Merge Patrick - Tweaked dbqp so that the existing slave tests work.
Merge Stewart - remove over 1000 lines of mysys

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
 
50
50
#include <config.h>
51
51
 
 
52
#include <drizzled/definitions.h>
 
53
#include <drizzled/error_t.h>
 
54
#include <drizzled/error.h>
52
55
#include <drizzled/internal/my_sys.h>
53
56
#include <drizzled/internal/m_string.h>
54
57
#include <drizzled/drizzled.h>
55
 
#ifdef HAVE_AIOWAIT
56
 
#include <drizzled/error.h>
57
 
#include <drizzled/internal/aio_result.h>
58
 
static void my_aiowait(my_aio_result *result);
59
 
#endif
60
58
#include <drizzled/internal/iocache.h>
61
59
#include <errno.h>
62
60
#include <drizzled/util/test.h>
286
284
  error= 0;
287
285
  type= type_arg;
288
286
  init_functions();
289
 
#ifdef HAVE_AIOWAIT
290
 
  if (use_async_io && ! my_disable_async_io)
291
 
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
294
 
  }
295
 
  inited= aio_result.pending= 0;
296
 
#endif
297
287
  return 0;
298
288
}                                               /* init_io_cache */
299
289
 
300
 
        /* Wait until current request is ready */
301
 
 
302
 
#ifdef HAVE_AIOWAIT
303
 
static void my_aiowait(my_aio_result *result)
304
 
{
305
 
  if (result->pending)
306
 
  {
307
 
    struct aio_result_t *tmp;
308
 
    for (;;)
309
 
    {
310
 
      if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
311
 
      {
312
 
        if (errno == EINTR)
313
 
          continue;
314
 
        result->pending=0;                      /* Assume everything is ok */
315
 
        break;
316
 
      }
317
 
      ((my_aio_result*) tmp)->pending=0;
318
 
      if ((my_aio_result*) tmp == result)
319
 
        break;
320
 
    }
321
 
  }
322
 
}
323
 
#endif
324
 
 
325
290
/**
326
291
 * @brief 
327
292
 *   Reset the cache
334
299
 */
335
300
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
301
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
 
302
                                  bool,
338
303
                                  bool clear_cache)
339
304
{
340
305
  /* One can't do reinit with the following types */
372
337
      write_pos=pos;
373
338
    else
374
339
      read_pos= pos;
375
 
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
377
 
#endif
378
340
  }
379
341
  else
380
342
  {
406
368
  error=0;
407
369
  init_functions();
408
370
 
409
 
#ifdef HAVE_AIOWAIT
410
 
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
413
 
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
416
 
  }
417
 
  inited= 0;
418
 
#else
419
 
  (void)use_async_io;
420
 
#endif
421
371
  return 0;
422
372
} /* reinit_io_cache */
423
373
 
537
487
  return(0);
538
488
}
539
489
 
540
 
 
541
 
#ifdef HAVE_AIOWAIT
542
 
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; the st_io_cache is set to the error state.
554
 
 */
555
 
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
556
 
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
558
 
  size_t max_length;
559
 
  my_off_t next_pos_in_file;
560
 
  unsigned char *read_buffer;
561
 
 
562
 
  memcpy(Buffer,info->read_pos,
563
 
         (left_length= (size_t) (info->read_end-info->read_pos)));
564
 
  Buffer+=left_length;
565
 
  org_Count=Count;
566
 
  Count-=left_length;
567
 
 
568
 
  if (info->inited)
569
 
  {                                             /* wait for read block */
570
 
    info->inited=0;                             /* No more block to read */
571
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
572
 
    if (info->aio_result.result.aio_errno)
573
 
    {
574
 
      if (info->myflags & MY_WME)
575
 
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
 
                 my_filename(info->file),
577
 
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
579
 
      info->error= -1;
580
 
      return(1);
581
 
    }
582
 
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
 
        read_length == (size_t) -1)
584
 
    {
585
 
      errno=0;                          /* For testing */
586
 
      info->error= (read_length == (size_t) -1 ? -1 :
587
 
                    (int) (read_length+left_length));
588
 
      return(1);
589
 
    }
590
 
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
591
 
 
592
 
    if (info->request_pos != info->buffer)
593
 
      info->request_pos=info->buffer;
594
 
    else
595
 
      info->request_pos=info->buffer+info->read_length;
596
 
    info->read_pos=info->request_pos;
597
 
    next_pos_in_file=info->aio_read_pos+read_length;
598
 
 
599
 
        /* Check if pos_in_file is changed
600
 
           (_ni_read_cache may have skipped some bytes) */
601
 
 
602
 
    if (info->aio_read_pos < info->pos_in_file)
603
 
    {                                           /* Fix if skipped bytes */
604
 
      if (info->aio_read_pos + read_length < info->pos_in_file)
605
 
      {
606
 
        read_length=0;                          /* Skip block */
607
 
        next_pos_in_file=info->pos_in_file;
608
 
      }
609
 
      else
610
 
      {
611
 
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
612
 
        info->pos_in_file=info->aio_read_pos; /* We are here */
613
 
        info->read_pos=info->request_pos+offset;
614
 
        read_length-=offset;                    /* Bytes left from read_pos */
615
 
      }
616
 
    }
617
 
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
623
 
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
625
 
  }
626
 
  else
627
 
    next_pos_in_file=(info->pos_in_file+ (size_t)
628
 
                      (info->read_end - info->request_pos));
629
 
 
630
 
        /* If reading large blocks, or first read or read with skip */
631
 
  if (Count)
632
 
  {
633
 
    if (next_pos_in_file == info->end_of_file)
634
 
    {
635
 
      info->error=(int) (read_length+left_length);
636
 
      return 1;
637
 
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
640
 
    {
641
 
      info->error= -1;
642
 
      return (1);
643
 
    }
644
 
 
645
 
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
646
 
    if (Count < read_length)
647
 
    {                                   /* Small block, read to cache */
648
 
      if ((read_length=my_read(info->file,info->request_pos,
649
 
                               read_length, info->myflags)) == (size_t) -1)
650
 
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
652
 
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
 
      info->read_pos=info->request_pos+Count;
654
 
      info->read_end=info->request_pos+read_length;
655
 
      info->pos_in_file=next_pos_in_file;       /* Start of block in cache */
656
 
      next_pos_in_file+=read_length;
657
 
 
658
 
      if (Count != use_length)
659
 
      {                                 /* Didn't find whole block */
660
 
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
 
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
663
 
        info->error=(int) (read_length+left_length);
664
 
        return 1;
665
 
      }
666
 
    }
667
 
    else
668
 
    {                                           /* Big block, don't cache it */
669
 
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
670
 
          != Count)
671
 
      {
672
 
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
673
 
        return 1;
674
 
      }
675
 
      info->read_pos=info->read_end=info->request_pos;
676
 
      info->pos_in_file=(next_pos_in_file+=Count);
677
 
    }
678
 
  }
679
 
 
680
 
  /* Read next block with asynchronous I/O */
681
 
  diff_length=(next_pos_in_file & (IO_SIZE-1));
682
 
  max_length= info->read_length - diff_length;
683
 
  if (max_length > info->end_of_file - next_pos_in_file)
684
 
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
685
 
 
686
 
  if (info->request_pos != info->buffer)
687
 
    read_buffer=info->buffer;
688
 
  else
689
 
    read_buffer=info->buffer+info->read_length;
690
 
  info->aio_read_pos=next_pos_in_file;
691
 
  if (max_length)
692
 
  {
693
 
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
694
 
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
696
 
                &info->aio_result.result))
697
 
    {                                           /* Skip async io */
698
 
      errno=errno;
699
 
      if (info->request_pos != info->buffer)
700
 
      {
701
 
        memmove(info->buffer, info->request_pos,
702
 
                (size_t) (info->read_end - info->read_pos));
703
 
        info->request_pos=info->buffer;
704
 
        info->read_pos-=info->read_length;
705
 
        info->read_end-=info->read_length;
706
 
      }
707
 
      info->read_length=info->buffer_length;    /* Use whole buffer */
708
 
      info->read_function=_my_b_read;           /* Use normal IO_READ next */
709
 
    }
710
 
    else
711
 
      info->inited=info->aio_result.pending=1;
712
 
  }
713
 
  return 0;                                     /* Block read, async in use */
714
 
} /* _my_b_async_read */
715
 
#endif
716
 
 
717
 
 
718
490
/**
719
491
 * @brief
720
492
 *   Read one byte when buffer is empty
899
671
      return(info->error);
900
672
    }
901
673
  }
902
 
#ifdef HAVE_AIOWAIT
903
 
  else if (info->type != READ_NET)
904
 
  {
905
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
906
 
    info->inited=0;
907
 
  }
908
 
#endif
909
674
  unlock_append_buffer(info, need_append_buffer_lock);
910
675
  return(0);
911
676
}