~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Monty Taylor
  • Date: 2008-07-01 14:33:36 UTC
  • mto: (28.1.12 backport_patch)
  • mto: This revision was merged to the branch mainline in revision 34.
  • Revision ID: monty@inaugust.com-20080701143336-8uihm7dhpu92rt0q
Somehow missed moving password.c. Duh.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
 
50
#define MAP_TO_USE_RAID
50
51
#include "mysys_priv.h"
51
 
#include <mystrings/m_string.h>
 
52
#include <m_string.h>
52
53
#ifdef HAVE_AIOWAIT
53
54
#include "mysys_err.h"
54
 
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
57
 
#include <mysys/iocache.h>
58
57
#include <errno.h>
59
 
#include <drizzled/util/test.h>
 
58
 
 
59
#ifdef THREAD
60
60
#define lock_append_buffer(info) \
61
61
 pthread_mutex_lock(&(info)->append_buffer_lock)
62
62
#define unlock_append_buffer(info) \
63
63
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
64
#else
 
65
#define lock_append_buffer(info)
 
66
#define unlock_append_buffer(info)
 
67
#endif
64
68
 
65
69
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
66
70
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
114
118
    break;
115
119
  default:
116
120
    info->read_function =
 
121
#ifdef THREAD
117
122
                          info->share ? _my_b_read_r :
 
123
#endif
118
124
                                        _my_b_read;
119
125
    info->write_function = _my_b_write;
120
126
  }
148
154
 
149
155
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
150
156
                  enum cache_type type, my_off_t seek_offset,
151
 
                  bool use_async_io, myf cache_myflags)
 
157
                  pbool use_async_io, myf cache_myflags)
152
158
{
153
159
  size_t min_cache;
154
160
  my_off_t pos;
155
161
  my_off_t end_of_file= ~(my_off_t) 0;
 
162
  DBUG_ENTER("init_io_cache");
 
163
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
164
                      (ulong) info, (int) type, (ulong) seek_offset));
156
165
 
157
166
  info->file= file;
158
167
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
178
187
        the beginning of whatever this file is, then somebody made a bad
179
188
        assumption.
180
189
      */
181
 
      assert(seek_offset == 0);
 
190
      DBUG_ASSERT(seek_offset == 0);
182
191
    }
183
192
    else
184
193
      info->seek_not_done= test(seek_offset != pos);
185
194
  }
186
195
 
187
196
  info->disk_writes= 0;
 
197
#ifdef THREAD
188
198
  info->share=0;
 
199
#endif
189
200
 
190
201
  if (!cachesize && !(cachesize= my_default_record_cache_size))
191
 
    return(1);                          /* No cache requested */
 
202
    DBUG_RETURN(1);                             /* No cache requested */
192
203
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
193
204
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
194
205
  {                                             /* Assume file isn't growing */
222
233
      if (type == SEQ_READ_APPEND)
223
234
        buffer_block *= 2;
224
235
      if ((info->buffer=
225
 
           (unsigned char*) my_malloc(buffer_block,
 
236
           (uchar*) my_malloc(buffer_block,
226
237
                             MYF((cache_myflags & ~ MY_WME) |
227
238
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
228
239
      {
233
244
        break;                                  /* Enough memory found */
234
245
      }
235
246
      if (cachesize == min_cache)
236
 
        return(2);                              /* Can't alloc cache */
 
247
        DBUG_RETURN(2);                         /* Can't alloc cache */
237
248
      /* Try with less memory */
238
249
      cachesize= (cachesize*3/4 & ~(min_cache-1));
239
250
    }
240
251
  }
241
252
 
 
253
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
242
254
  info->read_length=info->buffer_length=cachesize;
243
255
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
244
256
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
246
258
  {
247
259
    info->append_read_pos = info->write_pos = info->write_buffer;
248
260
    info->write_end = info->write_buffer + info->buffer_length;
 
261
#ifdef THREAD
249
262
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
263
#endif
250
264
  }
251
 
#if defined(SAFE_MUTEX)
 
265
#if defined(SAFE_MUTEX) && defined(THREAD)
252
266
  else
253
267
  {
254
268
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
255
 
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
269
    bzero((char*) &info->append_buffer_lock, sizeof(info));
256
270
  }
257
271
#endif
258
272
 
270
284
#ifdef HAVE_AIOWAIT
271
285
  if (use_async_io && ! my_disable_async_io)
272
286
  {
 
287
    DBUG_PRINT("info",("Using async io"));
273
288
    info->read_length/=2;
274
289
    info->read_function=_my_b_async_read;
275
290
  }
276
291
  info->inited=info->aio_result.pending=0;
277
292
#endif
278
 
  return(0);
 
293
  DBUG_RETURN(0);
279
294
}                                               /* init_io_cache */
280
295
 
281
296
        /* Wait until current request is ready */
292
307
      {
293
308
        if (errno == EINTR)
294
309
          continue;
 
310
        DBUG_PRINT("error",("No aio request, error: %d",errno));
295
311
        result->pending=0;                      /* Assume everythings is ok */
296
312
        break;
297
313
      }
312
328
  in the cache, we are reusing this memory without flushing it to disk.
313
329
*/
314
330
 
315
 
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
331
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
316
332
                        my_off_t seek_offset,
317
 
                        bool use_async_io __attribute__((unused)),
318
 
                        bool clear_cache)
 
333
                        pbool use_async_io __attribute__((unused)),
 
334
                        pbool clear_cache)
319
335
{
 
336
  DBUG_ENTER("reinit_io_cache");
 
337
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
338
                      (ulong) info, type, (ulong) seek_offset,
 
339
                      (int) clear_cache));
 
340
 
320
341
  /* One can't do reinit with the following types */
321
 
  assert(type != READ_NET && info->type != READ_NET &&
 
342
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
322
343
              type != WRITE_NET && info->type != WRITE_NET &&
323
344
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
324
345
 
328
349
      seek_offset <= my_b_tell(info))
329
350
  {
330
351
    /* Reuse current buffer without flushing it to disk */
331
 
    unsigned char *pos;
 
352
    uchar *pos;
332
353
    if (info->type == WRITE_CACHE && type == READ_CACHE)
333
354
    {
334
355
      info->read_end=info->write_pos;
367
388
      info->end_of_file=my_b_tell(info);
368
389
    /* flush cache if we want to reuse it */
369
390
    if (!clear_cache && my_b_flush_io_cache(info,1))
370
 
      return(1);
 
391
      DBUG_RETURN(1);
371
392
    info->pos_in_file=seek_offset;
372
393
    /* Better to do always do a seek */
373
394
    info->seek_not_done=1;
389
410
 
390
411
#ifdef HAVE_AIOWAIT
391
412
  if (use_async_io && ! my_disable_async_io &&
392
 
      ((uint32_t) info->buffer_length <
393
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
413
      ((ulong) info->buffer_length <
 
414
       (ulong) (info->end_of_file - seek_offset)))
394
415
  {
395
416
    info->read_length=info->buffer_length/2;
396
417
    info->read_function=_my_b_async_read;
397
418
  }
398
419
  info->inited=0;
399
420
#endif
400
 
  return(0);
 
421
  DBUG_RETURN(0);
401
422
} /* reinit_io_cache */
402
423
 
403
424
 
430
451
    1      Error: can't read requested characters
431
452
*/
432
453
 
433
 
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
454
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
434
455
{
435
456
  size_t length,diff_length,left_length, max_length;
436
457
  my_off_t pos_in_file;
 
458
  DBUG_ENTER("_my_b_read");
437
459
 
438
460
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
439
461
  {
440
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
462
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
441
463
    memcpy(Buffer,info->read_pos, left_length);
442
464
    Buffer+=left_length;
443
465
    Count-=left_length;
467
489
        info->file is a pipe or socket or FIFO.  We never should have tried
468
490
        to seek on that.  See Bugs#25807 and #22828 for more info.
469
491
      */
470
 
      assert(my_errno != ESPIPE);
 
492
      DBUG_ASSERT(my_errno != ESPIPE);
471
493
      info->error= -1;
472
 
      return(1);
 
494
      DBUG_RETURN(1);
473
495
    }
474
496
  }
475
497
 
480
502
    if (info->end_of_file <= pos_in_file)
481
503
    {                                   /* End of file */
482
504
      info->error= (int) left_length;
483
 
      return(1);
 
505
      DBUG_RETURN(1);
484
506
    }
485
507
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
486
508
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
488
510
    {
489
511
      info->error= (read_length == (size_t) -1 ? -1 :
490
512
                    (int) (read_length+left_length));
491
 
      return(1);
 
513
      DBUG_RETURN(1);
492
514
    }
493
515
    Count-=length;
494
516
    Buffer+=length;
506
528
    if (Count)
507
529
    {
508
530
      info->error= left_length;         /* We only got this many char */
509
 
      return(1);
 
531
      DBUG_RETURN(1);
510
532
    }
511
533
    length=0;                           /* Didn't read any chars */
512
534
  }
519
541
    info->pos_in_file= pos_in_file;
520
542
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
521
543
    info->read_pos=info->read_end=info->buffer;
522
 
    return(1);
 
544
    DBUG_RETURN(1);
523
545
  }
524
546
  info->read_pos=info->buffer+Count;
525
547
  info->read_end=info->buffer+length;
526
548
  info->pos_in_file=pos_in_file;
527
549
  memcpy(Buffer, info->buffer, Count);
528
 
  return(0);
 
550
  DBUG_RETURN(0);
529
551
}
530
552
 
531
553
 
 
554
#ifdef THREAD
532
555
/*
533
556
  Prepare IO_CACHE for shared use.
534
557
 
598
621
*/
599
622
 
600
623
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
601
 
                         IO_CACHE *write_cache, uint32_t num_threads)
 
624
                         IO_CACHE *write_cache, uint num_threads)
602
625
{
603
 
  assert(num_threads > 1);
604
 
  assert(read_cache->type == READ_CACHE);
605
 
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
626
  DBUG_ENTER("init_io_cache_share");
 
627
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
628
                                "write_cache: 0x%lx  threads: %u",
 
629
                                (long) read_cache, (long) cshare,
 
630
                                (long) write_cache, num_threads));
 
631
 
 
632
  DBUG_ASSERT(num_threads > 1);
 
633
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
634
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
606
635
 
607
636
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
608
637
  pthread_cond_init(&cshare->cond, 0);
624
653
  if (write_cache)
625
654
    write_cache->share= cshare;
626
655
 
627
 
  return;
 
656
  DBUG_VOID_RETURN;
628
657
}
629
658
 
630
659
 
650
679
void remove_io_thread(IO_CACHE *cache)
651
680
{
652
681
  IO_CACHE_SHARE *cshare= cache->share;
653
 
  uint32_t total;
 
682
  uint total;
 
683
  DBUG_ENTER("remove_io_thread");
654
684
 
655
685
  /* If the writer goes, it needs to flush the write cache. */
656
686
  if (cache == cshare->source_cache)
657
687
    flush_io_cache(cache);
658
688
 
659
689
  pthread_mutex_lock(&cshare->mutex);
 
690
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
691
                                (cache == cshare->source_cache) ?
 
692
                                "writer" : "reader", (long) cache));
660
693
 
661
694
  /* Remove from share. */
662
695
  total= --cshare->total_threads;
 
696
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
663
697
 
664
698
  /* Detach from share. */
665
699
  cache->share= NULL;
667
701
  /* If the writer goes, let the readers know. */
668
702
  if (cache == cshare->source_cache)
669
703
  {
 
704
    DBUG_PRINT("io_cache_share", ("writer leaves"));
670
705
    cshare->source_cache= NULL;
671
706
  }
672
707
 
673
708
  /* If all threads are waiting for me to join the lock, wake them. */
674
709
  if (!--cshare->running_threads)
675
710
  {
 
711
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
676
712
    pthread_cond_signal(&cshare->cond_writer);
677
713
    pthread_cond_broadcast(&cshare->cond);
678
714
  }
681
717
 
682
718
  if (!total)
683
719
  {
 
720
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
684
721
    pthread_cond_destroy (&cshare->cond_writer);
685
722
    pthread_cond_destroy (&cshare->cond);
686
723
    pthread_mutex_destroy(&cshare->mutex);
687
724
  }
688
725
 
689
 
  return;
 
726
  DBUG_VOID_RETURN;
690
727
}
691
728
 
692
729
 
721
758
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
722
759
{
723
760
  IO_CACHE_SHARE *cshare= cache->share;
 
761
  DBUG_ENTER("lock_io_cache");
724
762
 
725
763
  /* Enter the lock. */
726
764
  pthread_mutex_lock(&cshare->mutex);
727
765
  cshare->running_threads--;
 
766
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
767
                                (cache == cshare->source_cache) ?
 
768
                                "writer" : "reader", (long) cache, (ulong) pos,
 
769
                                cshare->running_threads));
728
770
 
729
771
  if (cshare->source_cache)
730
772
  {
735
777
      /* The writer waits until all readers are here. */
736
778
      while (cshare->running_threads)
737
779
      {
 
780
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
738
781
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
739
782
      }
 
783
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
784
 
740
785
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
741
 
      return(1);
 
786
      DBUG_RETURN(1);
742
787
    }
743
788
 
744
789
    /* The last thread wakes the writer. */
745
790
    if (!cshare->running_threads)
746
791
    {
 
792
      DBUG_PRINT("io_cache_share", ("waking writer"));
747
793
      pthread_cond_signal(&cshare->cond_writer);
748
794
    }
749
795
 
755
801
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
756
802
           cshare->source_cache)
757
803
    {
 
804
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
758
805
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
759
806
    }
760
807
 
768
815
    */
769
816
    if (!cshare->read_end || (cshare->pos_in_file < pos))
770
817
    {
 
818
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
771
819
      cshare->read_end= cshare->buffer; /* Empty buffer. */
772
820
      cshare->error= 0; /* EOF is not an error. */
773
821
    }
780
828
    */
781
829
    if (!cshare->running_threads)
782
830
    {
 
831
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
783
832
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
784
 
      return(1);
 
833
      DBUG_RETURN(1);
785
834
    }
786
835
 
787
836
    /*
794
843
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
795
844
           cshare->running_threads)
796
845
    {
 
846
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
797
847
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
798
848
    }
799
849
 
800
850
    /* If the block is not yet read, continue with a locked cache and read. */
801
851
    if (!cshare->read_end || (cshare->pos_in_file < pos))
802
852
    {
 
853
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
803
854
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
804
 
      return(1);
 
855
      DBUG_RETURN(1);
805
856
    }
806
857
 
807
858
    /* Another thread did read the block already. */
808
859
  }
 
860
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
861
                                (uint) (cshare->read_end ? (size_t)
 
862
                                        (cshare->read_end - cshare->buffer) :
 
863
                                        0)));
809
864
 
810
865
  /*
811
866
    Leave the lock. Do not call unlock_io_cache() later. The thread that
812
867
    filled the buffer did this and marked all threads as running.
813
868
  */
814
869
  pthread_mutex_unlock(&cshare->mutex);
815
 
  return(0);
 
870
  DBUG_RETURN(0);
816
871
}
817
872
 
818
873
 
846
901
static void unlock_io_cache(IO_CACHE *cache)
847
902
{
848
903
  IO_CACHE_SHARE *cshare= cache->share;
 
904
  DBUG_ENTER("unlock_io_cache");
 
905
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
906
                                (cache == cshare->source_cache) ?
 
907
                                "writer" : "reader",
 
908
                                (long) cache, (ulong) cshare->pos_in_file,
 
909
                                cshare->total_threads));
849
910
 
850
911
  cshare->running_threads= cshare->total_threads;
851
912
  pthread_cond_broadcast(&cshare->cond);
852
913
  pthread_mutex_unlock(&cshare->mutex);
853
 
  return;
 
914
  DBUG_VOID_RETURN;
854
915
}
855
916
 
856
917
 
891
952
    1      Error: can't read requested characters
892
953
*/
893
954
 
894
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
955
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
895
956
{
896
957
  my_off_t pos_in_file;
897
958
  size_t length, diff_length, left_length;
898
959
  IO_CACHE_SHARE *cshare= cache->share;
 
960
  DBUG_ENTER("_my_b_read_r");
899
961
 
900
962
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
901
963
  {
902
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
964
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
903
965
    memcpy(Buffer, cache->read_pos, left_length);
904
966
    Buffer+= left_length;
905
967
    Count-= left_length;
920
982
    if (length == 0)
921
983
    {
922
984
      cache->error= (int) left_length;
923
 
      return(1);
 
985
      DBUG_RETURN(1);
924
986
    }
925
987
    if (lock_io_cache(cache, pos_in_file))
926
988
    {
927
989
      /* With a synchronized write/read cache we won't come here... */
928
 
      assert(!cshare->source_cache);
 
990
      DBUG_ASSERT(!cshare->source_cache);
929
991
      /*
930
992
        ... unless the writer has gone before this thread entered the
931
993
        lock. Simulate EOF in this case. It can be distinguished by
948
1010
          {
949
1011
            cache->error= -1;
950
1012
            unlock_io_cache(cache);
951
 
            return(1);
 
1013
            DBUG_RETURN(1);
952
1014
          }
953
1015
        }
954
1016
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
955
1017
      }
 
1018
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1019
 
956
1020
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
957
1021
      cache->error=       (len == length ? 0 : (int) len);
958
1022
      cache->pos_in_file= pos_in_file;
982
1046
    cache->seek_not_done= 0;
983
1047
    if (len == 0 || len == (size_t) -1)
984
1048
    {
 
1049
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1050
                                    (ulong) len, (ulong) left_length));
985
1051
      cache->error= (int) left_length;
986
 
      return(1);
 
1052
      DBUG_RETURN(1);
987
1053
    }
988
1054
    cnt= (len > Count) ? Count : len;
989
1055
    memcpy(Buffer, cache->read_pos, cnt);
992
1058
    left_length+= cnt;
993
1059
    cache->read_pos+= cnt;
994
1060
  }
995
 
  return(0);
 
1061
  DBUG_RETURN(0);
996
1062
}
997
1063
 
998
1064
 
1014
1080
*/
1015
1081
 
1016
1082
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
 
                                const unsigned char *write_buffer, size_t write_length)
 
1083
                                const uchar *write_buffer, size_t write_length)
1018
1084
{
1019
1085
  IO_CACHE_SHARE *cshare= write_cache->share;
1020
1086
 
1021
 
  assert(cshare->source_cache == write_cache);
 
1087
  DBUG_ASSERT(cshare->source_cache == write_cache);
1022
1088
  /*
1023
1089
    write_length is usually less or equal to buffer_length.
1024
1090
    It can be bigger if _my_b_write() is called with a big length.
1025
1091
  */
1026
1092
  while (write_length)
1027
1093
  {
1028
 
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1094
    size_t copy_length= min(write_length, write_cache->buffer_length);
1029
1095
    int  __attribute__((unused)) rc;
1030
1096
 
1031
1097
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
1098
    /* The writing thread does always have the lock when it awakes. */
1033
 
    assert(rc);
 
1099
    DBUG_ASSERT(rc);
1034
1100
 
1035
1101
    memcpy(cshare->buffer, write_buffer, copy_length);
1036
1102
 
1045
1111
    write_length-= copy_length;
1046
1112
  }
1047
1113
}
 
1114
#endif /*THREAD*/
1048
1115
 
1049
1116
 
1050
1117
/*
1060
1127
    1  Failed to read
1061
1128
*/
1062
1129
 
1063
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1130
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1064
1131
{
1065
1132
  size_t length, diff_length, left_length, save_count, max_length;
1066
1133
  my_off_t pos_in_file;
1069
1136
  /* first, read the regular buffer */
1070
1137
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1071
1138
  {
1072
 
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1139
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
1073
1140
    memcpy(Buffer,info->read_pos, left_length);
1074
1141
    Buffer+=left_length;
1075
1142
    Count-=left_length;
1151
1218
 
1152
1219
      /*
1153
1220
         added the line below to make
1154
 
         assert(pos_in_file==info->end_of_file) pass.
 
1221
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1155
1222
         otherwise this does not appear to be needed
1156
1223
      */
1157
1224
      pos_in_file += length;
1178
1245
    size_t copy_len;
1179
1246
    size_t transfer_len;
1180
1247
 
1181
 
    assert(info->append_read_pos <= info->write_pos);
 
1248
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1182
1249
    /*
1183
1250
      TODO: figure out if the assert below is needed or correct.
1184
1251
    */
1185
 
    assert(pos_in_file == info->end_of_file);
1186
 
    copy_len=cmin(Count, len_in_buff);
 
1252
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1253
    copy_len=min(Count, len_in_buff);
1187
1254
    memcpy(Buffer, info->append_read_pos, copy_len);
1188
1255
    info->append_read_pos += copy_len;
1189
1256
    Count -= copy_len;
1222
1289
     1          An error has occurred; IO_CACHE to error state.
1223
1290
*/
1224
1291
 
1225
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1292
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1226
1293
{
1227
1294
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1228
1295
  size_t max_length;
1229
1296
  my_off_t next_pos_in_file;
1230
 
  unsigned char *read_buffer;
 
1297
  uchar *read_buffer;
1231
1298
 
1232
1299
  memcpy(Buffer,info->read_pos,
1233
1300
         (left_length= (size_t) (info->read_end-info->read_pos)));
1284
1351
        read_length-=offset;                    /* Bytes left from read_pos */
1285
1352
      }
1286
1353
    }
 
1354
#ifndef DBUG_OFF
 
1355
    if (info->aio_read_pos > info->pos_in_file)
 
1356
    {
 
1357
      my_errno=EINVAL;
 
1358
      return(info->read_length= (size_t) -1);
 
1359
    }
 
1360
#endif
1287
1361
        /* Copy found bytes to buffer */
1288
 
    length=cmin(Count,read_length);
 
1362
    length=min(Count,read_length);
1289
1363
    memcpy(Buffer,info->read_pos,(size_t) length);
1290
1364
    Buffer+=length;
1291
1365
    Count-=length;
1319
1393
      if ((read_length=my_read(info->file,info->request_pos,
1320
1394
                               read_length, info->myflags)) == (size_t) -1)
1321
1395
        return info->error= -1;
1322
 
      use_length=cmin(Count,read_length);
 
1396
      use_length=min(Count,read_length);
1323
1397
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1324
1398
      info->read_pos=info->request_pos+Count;
1325
1399
      info->read_end=info->request_pos+read_length;
1362
1436
  if (max_length)
1363
1437
  {
1364
1438
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1439
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1440
                          (ulong) next_pos_in_file, (ulong) max_length));
1365
1441
    if (aioread(info->file,read_buffer, max_length,
1366
1442
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
1367
1443
                &info->aio_result.result))
1368
1444
    {                                           /* Skip async io */
1369
1445
      my_errno=errno;
 
1446
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1447
                          errno, info->aio_result.result.aio_errno));
1370
1448
      if (info->request_pos != info->buffer)
1371
1449
      {
1372
 
        memcpy(info->buffer, info->request_pos,
1373
 
               (size_t) (info->read_end - info->read_pos));
 
1450
        bmove(info->buffer,info->request_pos,
 
1451
              (size_t) (info->read_end - info->read_pos));
1374
1452
        info->request_pos=info->buffer;
1375
1453
        info->read_pos-=info->read_length;
1376
1454
        info->read_end-=info->read_length;
1390
1468
 
1391
1469
int _my_b_get(IO_CACHE *info)
1392
1470
{
1393
 
  unsigned char buff;
 
1471
  uchar buff;
1394
1472
  IO_CACHE_CALLBACK pre_read,post_read;
1395
1473
  if ((pre_read = info->pre_read))
1396
1474
    (*pre_read)(info);
1398
1476
    return my_b_EOF;
1399
1477
  if ((post_read = info->post_read))
1400
1478
    (*post_read)(info);
1401
 
  return (int) (unsigned char) buff;
 
1479
  return (int) (uchar) buff;
1402
1480
}
1403
1481
 
1404
1482
/* 
1411
1489
   -1 On error; my_errno contains error code.
1412
1490
*/
1413
1491
 
1414
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1492
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1415
1493
{
1416
1494
  size_t rest_length,length;
1417
1495
 
1450
1528
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1451
1529
      return info->error= -1;
1452
1530
 
 
1531
#ifdef THREAD
1453
1532
    /*
1454
1533
      In case of a shared I/O cache with a writer we normally do direct
1455
1534
      write cache to read cache copy. Simulate this here by direct
1463
1542
    */
1464
1543
    if (info->share)
1465
1544
      copy_to_read_buffer(info, Buffer, length);
 
1545
#endif
1466
1546
 
1467
1547
    Count-=length;
1468
1548
    Buffer+=length;
1480
1560
  the write buffer before we are ready with it.
1481
1561
*/
1482
1562
 
1483
 
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1563
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1484
1564
{
1485
1565
  size_t rest_length,length;
1486
1566
 
 
1567
#ifdef THREAD
1487
1568
  /*
1488
1569
    Assert that we cannot come here with a shared cache. If we do one
1489
1570
    day, we might need to add a call to copy_to_read_buffer().
1490
1571
  */
1491
 
  assert(!info->share);
 
1572
  DBUG_ASSERT(!info->share);
 
1573
#endif
1492
1574
 
1493
1575
  lock_append_buffer(info);
1494
1576
  rest_length= (size_t) (info->write_end - info->write_pos);
1524
1606
}
1525
1607
 
1526
1608
 
1527
 
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1609
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1528
1610
{
1529
1611
  /*
1530
1612
    Sasha: We are not writing this with the ? operator to avoid hitting
1544
1626
  we will never get a seek over the end of the buffer
1545
1627
*/
1546
1628
 
1547
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
1629
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1548
1630
                   my_off_t pos)
1549
1631
{
1550
1632
  size_t length;
1551
1633
  int error=0;
1552
1634
 
 
1635
#ifdef THREAD
1553
1636
  /*
1554
1637
    Assert that we cannot come here with a shared cache. If we do one
1555
1638
    day, we might need to add a call to copy_to_read_buffer().
1556
1639
  */
1557
 
  assert(!info->share);
 
1640
  DBUG_ASSERT(!info->share);
 
1641
#endif
1558
1642
 
1559
1643
  if (pos < info->pos_in_file)
1560
1644
  {
1561
1645
    /* Of no overlap, write everything without buffering */
1562
1646
    if (pos + Count <= info->pos_in_file)
1563
 
      return (pwrite(info->file, Buffer, Count, pos) == 0);
 
1647
      return my_pwrite(info->file, Buffer, Count, pos,
 
1648
                       info->myflags | MY_NABP);
1564
1649
    /* Write the part of the block that is before buffer */
1565
1650
    length= (uint) (info->pos_in_file - pos);
1566
 
    if (pwrite(info->file, Buffer, length, pos) == 0)
 
1651
    if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1567
1652
      info->error= error= -1;
1568
1653
    Buffer+=length;
1569
1654
    pos+=  length;
1599
1684
 
1600
1685
        /* Flush write cache */
1601
1686
 
 
1687
#ifdef THREAD
1602
1688
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1603
1689
  lock_append_buffer(info);
1604
1690
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1605
1691
  unlock_append_buffer(info);
 
1692
#else
 
1693
#define LOCK_APPEND_BUFFER
 
1694
#define UNLOCK_APPEND_BUFFER
 
1695
#endif
 
1696
 
1606
1697
 
1607
1698
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
1608
1699
{
1609
1700
  size_t length;
1610
 
  bool append_cache;
 
1701
  my_bool append_cache;
1611
1702
  my_off_t pos_in_file;
 
1703
  DBUG_ENTER("my_b_flush_io_cache");
 
1704
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1612
1705
 
1613
1706
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1614
1707
    need_append_buffer_lock=0;
1618
1711
    if (info->file == -1)
1619
1712
    {
1620
1713
      if (real_open_cached_file(info))
1621
 
        return((info->error= -1));
 
1714
        DBUG_RETURN((info->error= -1));
1622
1715
    }
1623
1716
    LOCK_APPEND_BUFFER;
1624
1717
 
1625
1718
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
1626
1719
    {
 
1720
#ifdef THREAD
1627
1721
      /*
1628
1722
        In case of a shared I/O cache with a writer we do direct write
1629
1723
        cache to read cache copy. Do it before the write here so that
1632
1726
      */
1633
1727
      if (info->share)
1634
1728
        copy_to_read_buffer(info, info->write_buffer, length);
 
1729
#endif
1635
1730
 
1636
1731
      pos_in_file=info->pos_in_file;
1637
1732
      /*
1644
1739
            MY_FILEPOS_ERROR)
1645
1740
        {
1646
1741
          UNLOCK_APPEND_BUFFER;
1647
 
          return((info->error= -1));
 
1742
          DBUG_RETURN((info->error= -1));
1648
1743
        }
1649
1744
        if (!append_cache)
1650
1745
          info->seek_not_done=0;
1666
1761
      else
1667
1762
      {
1668
1763
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1669
 
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
 
1764
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
1670
1765
      }
1671
1766
 
1672
1767
      info->append_read_pos=info->write_pos=info->write_buffer;
1673
1768
      ++info->disk_writes;
1674
1769
      UNLOCK_APPEND_BUFFER;
1675
 
      return(info->error);
 
1770
      DBUG_RETURN(info->error);
1676
1771
    }
1677
1772
  }
1678
1773
#ifdef HAVE_AIOWAIT
1683
1778
  }
1684
1779
#endif
1685
1780
  UNLOCK_APPEND_BUFFER;
1686
 
  return(0);
 
1781
  DBUG_RETURN(0);
1687
1782
}
1688
1783
 
1689
1784
/*
1707
1802
{
1708
1803
  int error=0;
1709
1804
  IO_CACHE_CALLBACK pre_close;
 
1805
  DBUG_ENTER("end_io_cache");
 
1806
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
1710
1807
 
 
1808
#ifdef THREAD
1711
1809
  /*
1712
1810
    Every thread must call remove_io_thread(). The last one destroys
1713
1811
    the share elements.
1714
1812
  */
1715
 
  assert(!info->share || !info->share->total_threads);
 
1813
  DBUG_ASSERT(!info->share || !info->share->total_threads);
 
1814
#endif
1716
1815
 
1717
1816
  if ((pre_close=info->pre_close))
1718
1817
  {
1724
1823
    info->alloced_buffer=0;
1725
1824
    if (info->file != -1)                       /* File doesn't exist */
1726
1825
      error= my_b_flush_io_cache(info,1);
1727
 
    free((unsigned char*) info->buffer);
1728
 
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1826
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1827
    info->buffer=info->read_pos=(uchar*) 0;
1729
1828
  }
1730
1829
  if (info->type == SEQ_READ_APPEND)
1731
1830
  {
1732
1831
    /* Destroy allocated mutex */
1733
1832
    info->type= TYPE_NOT_SET;
 
1833
#ifdef THREAD
1734
1834
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1835
#endif
1735
1836
  }
1736
 
  return(error);
 
1837
  DBUG_RETURN(error);
1737
1838
} /* end_io_cache */
1738
1839
 
1739
1840
 
1808
1909
    total_bytes += 4+block_size;
1809
1910
  }
1810
1911
  close_file(&sra_cache);
1811
 
  free(block);
 
1912
  my_free(block,MYF(MY_WME));
1812
1913
  if (!my_stat(fname,&status,MYF(MY_WME)))
1813
1914
    die("%s failed to stat, but I had just closed it,\
1814
1915
 wonder how that happened");