~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Brian Aker
  • Date: 2008-07-13 21:20:24 UTC
  • Revision ID: brian@tangent.org-20080713212024-o6263c1vha7yxdeu
More bool removal. More cow bell!

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
 
50
#define MAP_TO_USE_RAID
50
51
#include "mysys_priv.h"
51
 
#include <mystrings/m_string.h>
 
52
#include <m_string.h>
52
53
#ifdef HAVE_AIOWAIT
53
54
#include "mysys_err.h"
54
 
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
57
 
#include <mysys/iocache.h>
58
57
#include <errno.h>
59
 
#include <drizzled/util/test.h>
 
58
 
60
59
#define lock_append_buffer(info) \
61
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
62
61
#define unlock_append_buffer(info) \
148
147
 
149
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
150
149
                  enum cache_type type, my_off_t seek_offset,
151
 
                  bool use_async_io, myf cache_myflags)
 
150
                  pbool use_async_io, myf cache_myflags)
152
151
{
153
152
  size_t min_cache;
154
153
  my_off_t pos;
155
154
  my_off_t end_of_file= ~(my_off_t) 0;
 
155
  DBUG_ENTER("init_io_cache");
 
156
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
157
                      (ulong) info, (int) type, (ulong) seek_offset));
156
158
 
157
159
  info->file= file;
158
160
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
178
180
        the beginning of whatever this file is, then somebody made a bad
179
181
        assumption.
180
182
      */
181
 
      assert(seek_offset == 0);
 
183
      DBUG_ASSERT(seek_offset == 0);
182
184
    }
183
185
    else
184
186
      info->seek_not_done= test(seek_offset != pos);
188
190
  info->share=0;
189
191
 
190
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
191
 
    return(1);                          /* No cache requested */
 
193
    DBUG_RETURN(1);                             /* No cache requested */
192
194
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
193
195
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
194
196
  {                                             /* Assume file isn't growing */
222
224
      if (type == SEQ_READ_APPEND)
223
225
        buffer_block *= 2;
224
226
      if ((info->buffer=
225
 
           (unsigned char*) my_malloc(buffer_block,
 
227
           (uchar*) my_malloc(buffer_block,
226
228
                             MYF((cache_myflags & ~ MY_WME) |
227
229
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
228
230
      {
233
235
        break;                                  /* Enough memory found */
234
236
      }
235
237
      if (cachesize == min_cache)
236
 
        return(2);                              /* Can't alloc cache */
 
238
        DBUG_RETURN(2);                         /* Can't alloc cache */
237
239
      /* Try with less memory */
238
240
      cachesize= (cachesize*3/4 & ~(min_cache-1));
239
241
    }
240
242
  }
241
243
 
 
244
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
242
245
  info->read_length=info->buffer_length=cachesize;
243
246
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
244
247
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
252
255
  else
253
256
  {
254
257
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
255
 
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
258
    bzero((char*) &info->append_buffer_lock, sizeof(info));
256
259
  }
257
260
#endif
258
261
 
270
273
#ifdef HAVE_AIOWAIT
271
274
  if (use_async_io && ! my_disable_async_io)
272
275
  {
 
276
    DBUG_PRINT("info",("Using async io"));
273
277
    info->read_length/=2;
274
278
    info->read_function=_my_b_async_read;
275
279
  }
276
280
  info->inited=info->aio_result.pending=0;
277
281
#endif
278
 
  return(0);
 
282
  DBUG_RETURN(0);
279
283
}                                               /* init_io_cache */
280
284
 
281
285
        /* Wait until current request is ready */
292
296
      {
293
297
        if (errno == EINTR)
294
298
          continue;
 
299
        DBUG_PRINT("error",("No aio request, error: %d",errno));
295
300
        result->pending=0;                      /* Assume everythings is ok */
296
301
        break;
297
302
      }
314
319
 
315
320
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
316
321
                        my_off_t seek_offset,
317
 
                        bool use_async_io __attribute__((unused)),
318
 
                        bool clear_cache)
 
322
                        pbool use_async_io __attribute__((unused)),
 
323
                        pbool clear_cache)
319
324
{
 
325
  DBUG_ENTER("reinit_io_cache");
 
326
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
327
                      (ulong) info, type, (ulong) seek_offset,
 
328
                      (int) clear_cache));
 
329
 
320
330
  /* One can't do reinit with the following types */
321
 
  assert(type != READ_NET && info->type != READ_NET &&
 
331
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
322
332
              type != WRITE_NET && info->type != WRITE_NET &&
323
333
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
324
334
 
328
338
      seek_offset <= my_b_tell(info))
329
339
  {
330
340
    /* Reuse current buffer without flushing it to disk */
331
 
    unsigned char *pos;
 
341
    uchar *pos;
332
342
    if (info->type == WRITE_CACHE && type == READ_CACHE)
333
343
    {
334
344
      info->read_end=info->write_pos;
367
377
      info->end_of_file=my_b_tell(info);
368
378
    /* flush cache if we want to reuse it */
369
379
    if (!clear_cache && my_b_flush_io_cache(info,1))
370
 
      return(1);
 
380
      DBUG_RETURN(1);
371
381
    info->pos_in_file=seek_offset;
372
382
    /* Better to do always do a seek */
373
383
    info->seek_not_done=1;
389
399
 
390
400
#ifdef HAVE_AIOWAIT
391
401
  if (use_async_io && ! my_disable_async_io &&
392
 
      ((uint32_t) info->buffer_length <
393
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
402
      ((ulong) info->buffer_length <
 
403
       (ulong) (info->end_of_file - seek_offset)))
394
404
  {
395
405
    info->read_length=info->buffer_length/2;
396
406
    info->read_function=_my_b_async_read;
397
407
  }
398
408
  info->inited=0;
399
409
#endif
400
 
  return(0);
 
410
  DBUG_RETURN(0);
401
411
} /* reinit_io_cache */
402
412
 
403
413
 
430
440
    1      Error: can't read requested characters
431
441
*/
432
442
 
433
 
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
443
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
434
444
{
435
445
  size_t length,diff_length,left_length, max_length;
436
446
  my_off_t pos_in_file;
 
447
  DBUG_ENTER("_my_b_read");
437
448
 
438
449
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
439
450
  {
440
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
451
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
441
452
    memcpy(Buffer,info->read_pos, left_length);
442
453
    Buffer+=left_length;
443
454
    Count-=left_length;
467
478
        info->file is a pipe or socket or FIFO.  We never should have tried
468
479
        to seek on that.  See Bugs#25807 and #22828 for more info.
469
480
      */
470
 
      assert(my_errno != ESPIPE);
 
481
      DBUG_ASSERT(my_errno != ESPIPE);
471
482
      info->error= -1;
472
 
      return(1);
 
483
      DBUG_RETURN(1);
473
484
    }
474
485
  }
475
486
 
480
491
    if (info->end_of_file <= pos_in_file)
481
492
    {                                   /* End of file */
482
493
      info->error= (int) left_length;
483
 
      return(1);
 
494
      DBUG_RETURN(1);
484
495
    }
485
496
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
486
497
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
488
499
    {
489
500
      info->error= (read_length == (size_t) -1 ? -1 :
490
501
                    (int) (read_length+left_length));
491
 
      return(1);
 
502
      DBUG_RETURN(1);
492
503
    }
493
504
    Count-=length;
494
505
    Buffer+=length;
506
517
    if (Count)
507
518
    {
508
519
      info->error= left_length;         /* We only got this many char */
509
 
      return(1);
 
520
      DBUG_RETURN(1);
510
521
    }
511
522
    length=0;                           /* Didn't read any chars */
512
523
  }
519
530
    info->pos_in_file= pos_in_file;
520
531
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
521
532
    info->read_pos=info->read_end=info->buffer;
522
 
    return(1);
 
533
    DBUG_RETURN(1);
523
534
  }
524
535
  info->read_pos=info->buffer+Count;
525
536
  info->read_end=info->buffer+length;
526
537
  info->pos_in_file=pos_in_file;
527
538
  memcpy(Buffer, info->buffer, Count);
528
 
  return(0);
 
539
  DBUG_RETURN(0);
529
540
}
530
541
 
531
542
 
598
609
*/
599
610
 
600
611
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
601
 
                         IO_CACHE *write_cache, uint32_t num_threads)
 
612
                         IO_CACHE *write_cache, uint num_threads)
602
613
{
603
 
  assert(num_threads > 1);
604
 
  assert(read_cache->type == READ_CACHE);
605
 
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
614
  DBUG_ENTER("init_io_cache_share");
 
615
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
616
                                "write_cache: 0x%lx  threads: %u",
 
617
                                (long) read_cache, (long) cshare,
 
618
                                (long) write_cache, num_threads));
 
619
 
 
620
  DBUG_ASSERT(num_threads > 1);
 
621
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
622
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
606
623
 
607
624
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
608
625
  pthread_cond_init(&cshare->cond, 0);
624
641
  if (write_cache)
625
642
    write_cache->share= cshare;
626
643
 
627
 
  return;
 
644
  DBUG_VOID_RETURN;
628
645
}
629
646
 
630
647
 
650
667
void remove_io_thread(IO_CACHE *cache)
651
668
{
652
669
  IO_CACHE_SHARE *cshare= cache->share;
653
 
  uint32_t total;
 
670
  uint total;
 
671
  DBUG_ENTER("remove_io_thread");
654
672
 
655
673
  /* If the writer goes, it needs to flush the write cache. */
656
674
  if (cache == cshare->source_cache)
657
675
    flush_io_cache(cache);
658
676
 
659
677
  pthread_mutex_lock(&cshare->mutex);
 
678
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
679
                                (cache == cshare->source_cache) ?
 
680
                                "writer" : "reader", (long) cache));
660
681
 
661
682
  /* Remove from share. */
662
683
  total= --cshare->total_threads;
 
684
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
663
685
 
664
686
  /* Detach from share. */
665
687
  cache->share= NULL;
667
689
  /* If the writer goes, let the readers know. */
668
690
  if (cache == cshare->source_cache)
669
691
  {
 
692
    DBUG_PRINT("io_cache_share", ("writer leaves"));
670
693
    cshare->source_cache= NULL;
671
694
  }
672
695
 
673
696
  /* If all threads are waiting for me to join the lock, wake them. */
674
697
  if (!--cshare->running_threads)
675
698
  {
 
699
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
676
700
    pthread_cond_signal(&cshare->cond_writer);
677
701
    pthread_cond_broadcast(&cshare->cond);
678
702
  }
681
705
 
682
706
  if (!total)
683
707
  {
 
708
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
684
709
    pthread_cond_destroy (&cshare->cond_writer);
685
710
    pthread_cond_destroy (&cshare->cond);
686
711
    pthread_mutex_destroy(&cshare->mutex);
687
712
  }
688
713
 
689
 
  return;
 
714
  DBUG_VOID_RETURN;
690
715
}
691
716
 
692
717
 
721
746
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
722
747
{
723
748
  IO_CACHE_SHARE *cshare= cache->share;
 
749
  DBUG_ENTER("lock_io_cache");
724
750
 
725
751
  /* Enter the lock. */
726
752
  pthread_mutex_lock(&cshare->mutex);
727
753
  cshare->running_threads--;
 
754
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
755
                                (cache == cshare->source_cache) ?
 
756
                                "writer" : "reader", (long) cache, (ulong) pos,
 
757
                                cshare->running_threads));
728
758
 
729
759
  if (cshare->source_cache)
730
760
  {
735
765
      /* The writer waits until all readers are here. */
736
766
      while (cshare->running_threads)
737
767
      {
 
768
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
738
769
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
739
770
      }
 
771
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
772
 
740
773
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
741
 
      return(1);
 
774
      DBUG_RETURN(1);
742
775
    }
743
776
 
744
777
    /* The last thread wakes the writer. */
745
778
    if (!cshare->running_threads)
746
779
    {
 
780
      DBUG_PRINT("io_cache_share", ("waking writer"));
747
781
      pthread_cond_signal(&cshare->cond_writer);
748
782
    }
749
783
 
755
789
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
756
790
           cshare->source_cache)
757
791
    {
 
792
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
758
793
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
759
794
    }
760
795
 
768
803
    */
769
804
    if (!cshare->read_end || (cshare->pos_in_file < pos))
770
805
    {
 
806
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
771
807
      cshare->read_end= cshare->buffer; /* Empty buffer. */
772
808
      cshare->error= 0; /* EOF is not an error. */
773
809
    }
780
816
    */
781
817
    if (!cshare->running_threads)
782
818
    {
 
819
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
783
820
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
784
 
      return(1);
 
821
      DBUG_RETURN(1);
785
822
    }
786
823
 
787
824
    /*
794
831
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
795
832
           cshare->running_threads)
796
833
    {
 
834
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
797
835
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
798
836
    }
799
837
 
800
838
    /* If the block is not yet read, continue with a locked cache and read. */
801
839
    if (!cshare->read_end || (cshare->pos_in_file < pos))
802
840
    {
 
841
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
803
842
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
804
 
      return(1);
 
843
      DBUG_RETURN(1);
805
844
    }
806
845
 
807
846
    /* Another thread did read the block already. */
808
847
  }
 
848
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
849
                                (uint) (cshare->read_end ? (size_t)
 
850
                                        (cshare->read_end - cshare->buffer) :
 
851
                                        0)));
809
852
 
810
853
  /*
811
854
    Leave the lock. Do not call unlock_io_cache() later. The thread that
812
855
    filled the buffer did this and marked all threads as running.
813
856
  */
814
857
  pthread_mutex_unlock(&cshare->mutex);
815
 
  return(0);
 
858
  DBUG_RETURN(0);
816
859
}
817
860
 
818
861
 
846
889
static void unlock_io_cache(IO_CACHE *cache)
847
890
{
848
891
  IO_CACHE_SHARE *cshare= cache->share;
 
892
  DBUG_ENTER("unlock_io_cache");
 
893
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
894
                                (cache == cshare->source_cache) ?
 
895
                                "writer" : "reader",
 
896
                                (long) cache, (ulong) cshare->pos_in_file,
 
897
                                cshare->total_threads));
849
898
 
850
899
  cshare->running_threads= cshare->total_threads;
851
900
  pthread_cond_broadcast(&cshare->cond);
852
901
  pthread_mutex_unlock(&cshare->mutex);
853
 
  return;
 
902
  DBUG_VOID_RETURN;
854
903
}
855
904
 
856
905
 
891
940
    1      Error: can't read requested characters
892
941
*/
893
942
 
894
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
943
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
895
944
{
896
945
  my_off_t pos_in_file;
897
946
  size_t length, diff_length, left_length;
898
947
  IO_CACHE_SHARE *cshare= cache->share;
 
948
  DBUG_ENTER("_my_b_read_r");
899
949
 
900
950
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
901
951
  {
902
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
952
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
903
953
    memcpy(Buffer, cache->read_pos, left_length);
904
954
    Buffer+= left_length;
905
955
    Count-= left_length;
920
970
    if (length == 0)
921
971
    {
922
972
      cache->error= (int) left_length;
923
 
      return(1);
 
973
      DBUG_RETURN(1);
924
974
    }
925
975
    if (lock_io_cache(cache, pos_in_file))
926
976
    {
927
977
      /* With a synchronized write/read cache we won't come here... */
928
 
      assert(!cshare->source_cache);
 
978
      DBUG_ASSERT(!cshare->source_cache);
929
979
      /*
930
980
        ... unless the writer has gone before this thread entered the
931
981
        lock. Simulate EOF in this case. It can be distinguished by
948
998
          {
949
999
            cache->error= -1;
950
1000
            unlock_io_cache(cache);
951
 
            return(1);
 
1001
            DBUG_RETURN(1);
952
1002
          }
953
1003
        }
954
1004
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
955
1005
      }
 
1006
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1007
 
956
1008
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
957
1009
      cache->error=       (len == length ? 0 : (int) len);
958
1010
      cache->pos_in_file= pos_in_file;
982
1034
    cache->seek_not_done= 0;
983
1035
    if (len == 0 || len == (size_t) -1)
984
1036
    {
 
1037
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1038
                                    (ulong) len, (ulong) left_length));
985
1039
      cache->error= (int) left_length;
986
 
      return(1);
 
1040
      DBUG_RETURN(1);
987
1041
    }
988
1042
    cnt= (len > Count) ? Count : len;
989
1043
    memcpy(Buffer, cache->read_pos, cnt);
992
1046
    left_length+= cnt;
993
1047
    cache->read_pos+= cnt;
994
1048
  }
995
 
  return(0);
 
1049
  DBUG_RETURN(0);
996
1050
}
997
1051
 
998
1052
 
1014
1068
*/
1015
1069
 
1016
1070
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
 
                                const unsigned char *write_buffer, size_t write_length)
 
1071
                                const uchar *write_buffer, size_t write_length)
1018
1072
{
1019
1073
  IO_CACHE_SHARE *cshare= write_cache->share;
1020
1074
 
1021
 
  assert(cshare->source_cache == write_cache);
 
1075
  DBUG_ASSERT(cshare->source_cache == write_cache);
1022
1076
  /*
1023
1077
    write_length is usually less or equal to buffer_length.
1024
1078
    It can be bigger if _my_b_write() is called with a big length.
1025
1079
  */
1026
1080
  while (write_length)
1027
1081
  {
1028
 
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1082
    size_t copy_length= min(write_length, write_cache->buffer_length);
1029
1083
    int  __attribute__((unused)) rc;
1030
1084
 
1031
1085
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
1086
    /* The writing thread does always have the lock when it awakes. */
1033
 
    assert(rc);
 
1087
    DBUG_ASSERT(rc);
1034
1088
 
1035
1089
    memcpy(cshare->buffer, write_buffer, copy_length);
1036
1090
 
1060
1114
    1  Failed to read
1061
1115
*/
1062
1116
 
1063
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1117
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1064
1118
{
1065
1119
  size_t length, diff_length, left_length, save_count, max_length;
1066
1120
  my_off_t pos_in_file;
1069
1123
  /* first, read the regular buffer */
1070
1124
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1071
1125
  {
1072
 
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1126
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
1073
1127
    memcpy(Buffer,info->read_pos, left_length);
1074
1128
    Buffer+=left_length;
1075
1129
    Count-=left_length;
1151
1205
 
1152
1206
      /*
1153
1207
         added the line below to make
1154
 
         assert(pos_in_file==info->end_of_file) pass.
 
1208
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1155
1209
         otherwise this does not appear to be needed
1156
1210
      */
1157
1211
      pos_in_file += length;
1178
1232
    size_t copy_len;
1179
1233
    size_t transfer_len;
1180
1234
 
1181
 
    assert(info->append_read_pos <= info->write_pos);
 
1235
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1182
1236
    /*
1183
1237
      TODO: figure out if the assert below is needed or correct.
1184
1238
    */
1185
 
    assert(pos_in_file == info->end_of_file);
1186
 
    copy_len=cmin(Count, len_in_buff);
 
1239
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1240
    copy_len=min(Count, len_in_buff);
1187
1241
    memcpy(Buffer, info->append_read_pos, copy_len);
1188
1242
    info->append_read_pos += copy_len;
1189
1243
    Count -= copy_len;
1222
1276
     1          An error has occurred; IO_CACHE to error state.
1223
1277
*/
1224
1278
 
1225
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1279
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1226
1280
{
1227
1281
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1228
1282
  size_t max_length;
1229
1283
  my_off_t next_pos_in_file;
1230
 
  unsigned char *read_buffer;
 
1284
  uchar *read_buffer;
1231
1285
 
1232
1286
  memcpy(Buffer,info->read_pos,
1233
1287
         (left_length= (size_t) (info->read_end-info->read_pos)));
1284
1338
        read_length-=offset;                    /* Bytes left from read_pos */
1285
1339
      }
1286
1340
    }
 
1341
#ifndef DBUG_OFF
 
1342
    if (info->aio_read_pos > info->pos_in_file)
 
1343
    {
 
1344
      my_errno=EINVAL;
 
1345
      return(info->read_length= (size_t) -1);
 
1346
    }
 
1347
#endif
1287
1348
        /* Copy found bytes to buffer */
1288
 
    length=cmin(Count,read_length);
 
1349
    length=min(Count,read_length);
1289
1350
    memcpy(Buffer,info->read_pos,(size_t) length);
1290
1351
    Buffer+=length;
1291
1352
    Count-=length;
1319
1380
      if ((read_length=my_read(info->file,info->request_pos,
1320
1381
                               read_length, info->myflags)) == (size_t) -1)
1321
1382
        return info->error= -1;
1322
 
      use_length=cmin(Count,read_length);
 
1383
      use_length=min(Count,read_length);
1323
1384
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1324
1385
      info->read_pos=info->request_pos+Count;
1325
1386
      info->read_end=info->request_pos+read_length;
1362
1423
  if (max_length)
1363
1424
  {
1364
1425
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1426
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1427
                          (ulong) next_pos_in_file, (ulong) max_length));
1365
1428
    if (aioread(info->file,read_buffer, max_length,
1366
1429
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
1367
1430
                &info->aio_result.result))
1368
1431
    {                                           /* Skip async io */
1369
1432
      my_errno=errno;
 
1433
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1434
                          errno, info->aio_result.result.aio_errno));
1370
1435
      if (info->request_pos != info->buffer)
1371
1436
      {
1372
 
        memcpy(info->buffer, info->request_pos,
1373
 
               (size_t) (info->read_end - info->read_pos));
 
1437
        bmove(info->buffer,info->request_pos,
 
1438
              (size_t) (info->read_end - info->read_pos));
1374
1439
        info->request_pos=info->buffer;
1375
1440
        info->read_pos-=info->read_length;
1376
1441
        info->read_end-=info->read_length;
1390
1455
 
1391
1456
int _my_b_get(IO_CACHE *info)
1392
1457
{
1393
 
  unsigned char buff;
 
1458
  uchar buff;
1394
1459
  IO_CACHE_CALLBACK pre_read,post_read;
1395
1460
  if ((pre_read = info->pre_read))
1396
1461
    (*pre_read)(info);
1398
1463
    return my_b_EOF;
1399
1464
  if ((post_read = info->post_read))
1400
1465
    (*post_read)(info);
1401
 
  return (int) (unsigned char) buff;
 
1466
  return (int) (uchar) buff;
1402
1467
}
1403
1468
 
1404
1469
/* 
1411
1476
   -1 On error; my_errno contains error code.
1412
1477
*/
1413
1478
 
1414
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1479
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1415
1480
{
1416
1481
  size_t rest_length,length;
1417
1482
 
1480
1545
  the write buffer before we are ready with it.
1481
1546
*/
1482
1547
 
1483
 
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1548
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1484
1549
{
1485
1550
  size_t rest_length,length;
1486
1551
 
1488
1553
    Assert that we cannot come here with a shared cache. If we do one
1489
1554
    day, we might need to add a call to copy_to_read_buffer().
1490
1555
  */
1491
 
  assert(!info->share);
 
1556
  DBUG_ASSERT(!info->share);
1492
1557
 
1493
1558
  lock_append_buffer(info);
1494
1559
  rest_length= (size_t) (info->write_end - info->write_pos);
1524
1589
}
1525
1590
 
1526
1591
 
1527
 
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1592
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1528
1593
{
1529
1594
  /*
1530
1595
    Sasha: We are not writing this with the ? operator to avoid hitting
1544
1609
  we will never get a seek over the end of the buffer
1545
1610
*/
1546
1611
 
1547
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
1612
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1548
1613
                   my_off_t pos)
1549
1614
{
1550
1615
  size_t length;
1554
1619
    Assert that we cannot come here with a shared cache. If we do one
1555
1620
    day, we might need to add a call to copy_to_read_buffer().
1556
1621
  */
1557
 
  assert(!info->share);
 
1622
  DBUG_ASSERT(!info->share);
1558
1623
 
1559
1624
  if (pos < info->pos_in_file)
1560
1625
  {
1609
1674
  size_t length;
1610
1675
  bool append_cache;
1611
1676
  my_off_t pos_in_file;
 
1677
  DBUG_ENTER("my_b_flush_io_cache");
 
1678
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1612
1679
 
1613
1680
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1614
1681
    need_append_buffer_lock=0;
1618
1685
    if (info->file == -1)
1619
1686
    {
1620
1687
      if (real_open_cached_file(info))
1621
 
        return((info->error= -1));
 
1688
        DBUG_RETURN((info->error= -1));
1622
1689
    }
1623
1690
    LOCK_APPEND_BUFFER;
1624
1691
 
1644
1711
            MY_FILEPOS_ERROR)
1645
1712
        {
1646
1713
          UNLOCK_APPEND_BUFFER;
1647
 
          return((info->error= -1));
 
1714
          DBUG_RETURN((info->error= -1));
1648
1715
        }
1649
1716
        if (!append_cache)
1650
1717
          info->seek_not_done=0;
1666
1733
      else
1667
1734
      {
1668
1735
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1669
 
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
 
1736
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
1670
1737
      }
1671
1738
 
1672
1739
      info->append_read_pos=info->write_pos=info->write_buffer;
1673
1740
      ++info->disk_writes;
1674
1741
      UNLOCK_APPEND_BUFFER;
1675
 
      return(info->error);
 
1742
      DBUG_RETURN(info->error);
1676
1743
    }
1677
1744
  }
1678
1745
#ifdef HAVE_AIOWAIT
1683
1750
  }
1684
1751
#endif
1685
1752
  UNLOCK_APPEND_BUFFER;
1686
 
  return(0);
 
1753
  DBUG_RETURN(0);
1687
1754
}
1688
1755
 
1689
1756
/*
1707
1774
{
1708
1775
  int error=0;
1709
1776
  IO_CACHE_CALLBACK pre_close;
 
1777
  DBUG_ENTER("end_io_cache");
 
1778
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
1710
1779
 
1711
1780
  /*
1712
1781
    Every thread must call remove_io_thread(). The last one destroys
1713
1782
    the share elements.
1714
1783
  */
1715
 
  assert(!info->share || !info->share->total_threads);
 
1784
  DBUG_ASSERT(!info->share || !info->share->total_threads);
1716
1785
 
1717
1786
  if ((pre_close=info->pre_close))
1718
1787
  {
1724
1793
    info->alloced_buffer=0;
1725
1794
    if (info->file != -1)                       /* File doesn't exist */
1726
1795
      error= my_b_flush_io_cache(info,1);
1727
 
    free((unsigned char*) info->buffer);
1728
 
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1796
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1797
    info->buffer=info->read_pos=(uchar*) 0;
1729
1798
  }
1730
1799
  if (info->type == SEQ_READ_APPEND)
1731
1800
  {
1733
1802
    info->type= TYPE_NOT_SET;
1734
1803
    pthread_mutex_destroy(&info->append_buffer_lock);
1735
1804
  }
1736
 
  return(error);
 
1805
  DBUG_RETURN(error);
1737
1806
} /* end_io_cache */
1738
1807
 
1739
1808
 
1808
1877
    total_bytes += 4+block_size;
1809
1878
  }
1810
1879
  close_file(&sra_cache);
1811
 
  free(block);
 
1880
  my_free(block,MYF(MY_WME));
1812
1881
  if (!my_stat(fname,&status,MYF(MY_WME)))
1813
1882
    die("%s failed to stat, but I had just closed it,\
1814
1883
 wonder how that happened");