~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

Merged Drizzle's Trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
70
70
{
71
71
 
72
72
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
73
 
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
75
73
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
76
74
 
77
75
/**
78
76
 * @brief
79
 
 *   Lock appends for the IO_CACHE   
80
 
 */
81
 
inline
82
 
static void lock_append_buffer(register IO_CACHE *info)
83
 
{
84
 
  pthread_mutex_lock(&(info)->append_buffer_lock);
85
 
}
86
 
 
87
 
/**
88
 
 * @brief
89
 
 *   Lock appends for the IO_CACHE   
90
 
 */
91
 
inline
92
 
static void unlock_append_buffer(register IO_CACHE *info)
93
 
{
94
 
  pthread_mutex_unlock(&(info)->append_buffer_lock);
95
 
}
96
 
 
97
 
/**
98
 
 * @brief
99
77
 *   Lock appends for the IO_CACHE if required (need_append_buffer_lock)   
100
78
 */
101
79
inline
102
 
static void lock_append_buffer(register IO_CACHE *info, int need_append_buffer_lock)
 
80
static void lock_append_buffer(register IO_CACHE *, int )
103
81
{
104
 
  if (need_append_buffer_lock) lock_append_buffer(info);
105
82
}
106
83
 
107
84
/**
109
86
 *   Unlock appends for the IO_CACHE if required (need_append_buffer_lock)   
110
87
 */
111
88
inline
112
 
static void unlock_append_buffer(register IO_CACHE *info, int need_append_buffer_lock)
 
89
static void unlock_append_buffer(register IO_CACHE *, int )
113
90
{
114
 
  if (need_append_buffer_lock) unlock_append_buffer(info);
115
91
}
116
92
 
117
93
/**
175
151
      as myisamchk
176
152
    */
177
153
    break;
178
 
  case SEQ_READ_APPEND:
179
 
    info->read_function = _my_b_seq_read;
180
 
    info->write_function = 0;                   /* Force a core if used */
181
 
    break;
182
154
  default:
183
 
    info->read_function =
184
 
                          info->share ? _my_b_read_r :
185
 
                                        _my_b_read;
 
155
    info->read_function = _my_b_read;
186
156
    info->write_function = _my_b_write;
187
157
  }
188
158
 
245
215
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
246
216
  }
247
217
 
248
 
  info->share=0;
249
 
 
250
218
  if (!cachesize && !(cachesize= my_default_record_cache_size))
251
219
    return(1);                          /* No cache requested */
252
220
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
253
 
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
 
221
  if (type == READ_CACHE)
254
222
  {                                             /* Assume file isn't growing */
255
223
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
256
224
    {
279
247
      if (cachesize < min_cache)
280
248
        cachesize = min_cache;
281
249
      buffer_block= cachesize;
282
 
      if (type == SEQ_READ_APPEND)
283
 
        buffer_block *= 2;
284
250
      if ((info->buffer=
285
251
           (unsigned char*) malloc(buffer_block)) != 0)
286
252
      {
287
253
        info->write_buffer=info->buffer;
288
 
        if (type == SEQ_READ_APPEND)
289
 
          info->write_buffer = info->buffer + cachesize;
290
 
        info->alloced_buffer=1;
 
254
        info->alloced_buffer= true;
291
255
        break;                                  /* Enough memory found */
292
256
      }
293
257
      if (cachesize == min_cache)
300
264
  info->read_length=info->buffer_length=cachesize;
301
265
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
302
266
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
303
 
  if (type == SEQ_READ_APPEND)
304
 
  {
305
 
    info->append_read_pos = info->write_pos = info->write_buffer;
306
 
    info->write_end = info->write_buffer + info->buffer_length;
307
 
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
308
 
  }
309
267
 
310
268
  if (type == WRITE_CACHE)
311
269
    info->write_end=
372
330
{
373
331
  /* One can't do reinit with the following types */
374
332
  assert(type != READ_NET && info->type != READ_NET &&
375
 
              type != WRITE_NET && info->type != WRITE_NET &&
376
 
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
 
333
              type != WRITE_NET && info->type != WRITE_NET);
377
334
 
378
335
  /* If the whole file is in memory, avoid flushing to disk */
379
336
  if (! clear_cache &&
469
426
 *   types than my_off_t unless you can be sure that their value fits.
470
427
 *   Same applies to differences of file offsets.
471
428
 *
472
 
 * When changing this function, check _my_b_read_r(). It might need the
473
 
 * same change.
474
 
 * 
475
 
 * @param info IO_CACHE pointer
476
 
 * @param Buffer Buffer to retrieve count bytes from file
477
 
 * @param Count Number of bytes to read into Buffer
 
429
 * @param info IO_CACHE pointer @param Buffer Buffer to retrieve count bytes
 
430
 * from file @param Count Number of bytes to read into Buffer
478
431
 * 
479
432
 * @retval 0 We succeeded in reading all data
480
433
 * @retval 1 Error: can't read requested characters
576
529
  return(0);
577
530
}
578
531
 
579
 
/**
580
 
 * @brief
581
 
 *   Prepare IO_CACHE for shared use.
582
 
 *
583
 
 * @detail
584
 
 *   The shared cache is used so: One IO_CACHE is initialized with
585
 
 *   init_io_cache(). This includes the allocation of a buffer. Then a
586
 
 *   share is allocated and init_io_cache_share() is called with the io
587
 
 *   cache and the share. Then the io cache is copied for each thread. So
588
 
 *   every thread has its own copy of IO_CACHE. But the allocated buffer
589
 
 *   is shared because cache->buffer is the same for all caches.
590
 
 *
591
 
 *   One thread reads data from the file into the buffer. All threads
592
 
 *   read from the buffer, but every thread maintains its own set of
593
 
 *   pointers into the buffer. When all threads have used up the buffer
594
 
 *   contents, one of the threads reads the next block of data into the
595
 
 *   buffer. To accomplish this, each thread enters the cache lock before
596
 
 *   accessing the buffer. They wait in lock_io_cache() until all threads
597
 
 *   joined the lock. The last thread entering the lock is in charge of
598
 
 *   reading from file to buffer. It wakes all threads when done.
599
 
 *
600
 
 *   Synchronizing a write cache to the read caches works so: Whenever
601
 
 *   the write buffer needs a flush, the write thread enters the lock and
602
 
 *   waits for all other threads to enter the lock too. They do this when
603
 
 *   they have used up the read buffer. When all threads are in the lock,
604
 
 *   the write thread copies the write buffer to the read buffer and
605
 
 *   wakes all threads.
606
 
 *
607
 
 *   share->running_threads is the number of threads not being in the
608
 
 *   cache lock. When entering lock_io_cache() the number is decreased.
609
 
 *   When the thread that fills the buffer enters unlock_io_cache() the
610
 
 *   number is reset to the number of threads. The condition
611
 
 *   running_threads == 0 means that all threads are in the lock. Bumping
612
 
 *   up the number to the full count is non-intuitive. But increasing the
613
 
 *   number by one for each thread that leaves the lock could lead to a
614
 
 *   solo run of one thread. The last thread to join a lock reads from
615
 
 *   file to buffer, wakes the other threads, processes the data in the
616
 
 *   cache and enters the lock again. If no other thread left the lock
617
 
 *   meanwhile, it would think it's the last one again and read the next
618
 
 *   block...
619
 
 *
620
 
 *   The share has copies of 'error', 'buffer', 'read_end', and
621
 
 *   'pos_in_file' from the thread that filled the buffer. We may not be
622
 
 *   able to access this information directly from its cache because the
623
 
 *   thread may be removed from the share before the variables could be
624
 
 *   copied by all other threads. Or, if a write buffer is synchronized,
625
 
 *   it would change its 'pos_in_file' after waking the other threads,
626
 
 *   possibly before they could copy its value.
627
 
 *
628
 
 *   However, the 'buffer' variable in the share is for a synchronized
629
 
 *   write cache. It needs to know where to put the data. Otherwise it
630
 
 *   would need access to the read cache of one of the threads that is
631
 
 *   not yet removed from the share.
632
 
 *
633
 
 * @param read_cache A read cache. This will be copied for every thread after setup.
634
 
 * @param cshare The share.
635
 
 * @param write_cache If non-NULL a write cache that is to be synchronized with the read caches.
636
 
 * @param num_threads Number of threads sharing the cache including the write thread if any.
637
 
 */
638
 
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
639
 
                         IO_CACHE *write_cache, uint32_t num_threads)
640
 
{
641
 
  assert(num_threads > 1);
642
 
  assert(read_cache->type == READ_CACHE);
643
 
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
644
 
 
645
 
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
646
 
  pthread_cond_init(&cshare->cond, 0);
647
 
  pthread_cond_init(&cshare->cond_writer, 0);
648
 
 
649
 
  cshare->running_threads= num_threads;
650
 
  cshare->total_threads=   num_threads;
651
 
  cshare->error=           0;    /* Initialize. */
652
 
  cshare->buffer=          read_cache->buffer;
653
 
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
654
 
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
655
 
  cshare->source_cache=    write_cache; /* Can be NULL. */
656
 
 
657
 
  read_cache->share=         cshare;
658
 
  read_cache->read_function= _my_b_read_r;
659
 
  read_cache->current_pos=   NULL;
660
 
  read_cache->current_end=   NULL;
661
 
 
662
 
  if (write_cache)
663
 
    write_cache->share= cshare;
664
 
 
665
 
  return;
666
 
}
667
 
 
668
 
/**
669
 
 * @brief
670
 
 *   Remove a thread from shared access to IO_CACHE.
671
 
 * @detail
672
 
 *   Every thread must do that on exit for not to deadlock other threads.
673
 
 *   The last thread destroys the pthread resources.
674
 
 *   A writer flushes its cache first.
675
 
 *
676
 
 * @param cache The IO_CACHE to be removed from the share.
677
 
 */
678
 
void remove_io_thread(IO_CACHE *cache)
679
 
{
680
 
  IO_CACHE_SHARE *cshare= cache->share;
681
 
  uint32_t total;
682
 
 
683
 
  /* If the writer goes, it needs to flush the write cache. */
684
 
  if (cache == cshare->source_cache)
685
 
    flush_io_cache(cache);
686
 
 
687
 
  pthread_mutex_lock(&cshare->mutex);
688
 
 
689
 
  /* Remove from share. */
690
 
  total= --cshare->total_threads;
691
 
 
692
 
  /* Detach from share. */
693
 
  cache->share= NULL;
694
 
 
695
 
  /* If the writer goes, let the readers know. */
696
 
  if (cache == cshare->source_cache)
697
 
  {
698
 
    cshare->source_cache= NULL;
699
 
  }
700
 
 
701
 
  /* If all threads are waiting for me to join the lock, wake them. */
702
 
  if (!--cshare->running_threads)
703
 
  {
704
 
    pthread_cond_signal(&cshare->cond_writer);
705
 
    pthread_cond_broadcast(&cshare->cond);
706
 
  }
707
 
 
708
 
  pthread_mutex_unlock(&cshare->mutex);
709
 
 
710
 
  if (!total)
711
 
  {
712
 
    pthread_cond_destroy (&cshare->cond_writer);
713
 
    pthread_cond_destroy (&cshare->cond);
714
 
    pthread_mutex_destroy(&cshare->mutex);
715
 
  }
716
 
 
717
 
  return;
718
 
}
719
 
 
720
 
/**
721
 
 * @brief
722
 
 *   Lock IO cache and wait for all other threads to join.
723
 
 *
724
 
 * @detail
725
 
 *   Wait for all threads to finish with the current buffer. We want
726
 
 *   all threads to proceed in concert. The last thread to join
727
 
 *   lock_io_cache() will read the block from file and all threads start
728
 
 *   to use it. Then they will join again for reading the next block.
729
 
 *
730
 
 *   The waiting threads detect a fresh buffer by comparing
731
 
 *   cshare->pos_in_file with the position they want to process next.
732
 
 *   Since the first block may start at position 0, we take
733
 
 *   cshare->read_end as an additional condition. This variable is
734
 
 *   initialized to NULL and will be set after a block of data is written
735
 
 *   to the buffer.
736
 
 *
737
 
 * @param cache The cache of the thread entering the lock.
738
 
 * @param pos File position of the block to read. Unused for the write thread.
739
 
 *
740
 
 * @retval 1 OK, lock in place, go ahead and read.
741
 
 * @retval 0 OK, unlocked, another thread did the read.
742
 
 */
743
 
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
744
 
{
745
 
  IO_CACHE_SHARE *cshare= cache->share;
746
 
 
747
 
  /* Enter the lock. */
748
 
  pthread_mutex_lock(&cshare->mutex);
749
 
  cshare->running_threads--;
750
 
 
751
 
  if (cshare->source_cache)
752
 
  {
753
 
    /* A write cache is synchronized to the read caches. */
754
 
 
755
 
    if (cache == cshare->source_cache)
756
 
    {
757
 
      /* The writer waits until all readers are here. */
758
 
      while (cshare->running_threads)
759
 
      {
760
 
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
761
 
      }
762
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
763
 
      return(1);
764
 
    }
765
 
 
766
 
    /* The last thread wakes the writer. */
767
 
    if (!cshare->running_threads)
768
 
    {
769
 
      pthread_cond_signal(&cshare->cond_writer);
770
 
    }
771
 
 
772
 
    /*
773
 
      Readers wait until the data is copied from the writer. Another
774
 
      reason to stop waiting is the removal of the write thread. If this
775
 
      happens, we leave the lock with old data in the buffer.
776
 
    */
777
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
778
 
           cshare->source_cache)
779
 
    {
780
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
781
 
    }
782
 
 
783
 
    /*
784
 
      If the writer was removed from the share while this thread was
785
 
      asleep, we need to simulate an EOF condition. The writer cannot
786
 
      reset the share variables as they might still be in use by readers
787
 
      of the last block. When we awake here then because the last
788
 
      joining thread signalled us. If the writer is not the last, it
789
 
      will not signal. So it is safe to clear the buffer here.
790
 
    */
791
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
792
 
    {
793
 
      cshare->read_end= cshare->buffer; /* Empty buffer. */
794
 
      cshare->error= 0; /* EOF is not an error. */
795
 
    }
796
 
  }
797
 
  else
798
 
  {
799
 
    /*
800
 
      There are read caches only. The last thread arriving in
801
 
      lock_io_cache() continues with a locked cache and reads the block.
802
 
    */
803
 
    if (!cshare->running_threads)
804
 
    {
805
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
806
 
      return(1);
807
 
    }
808
 
 
809
 
    /*
810
 
      All other threads wait until the requested block is read by the
811
 
      last thread arriving. Another reason to stop waiting is the
812
 
      removal of a thread. If this leads to all threads being in the
813
 
      lock, we have to continue also. The first of the awaken threads
814
 
      will then do the read.
815
 
    */
816
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
817
 
           cshare->running_threads)
818
 
    {
819
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
820
 
    }
821
 
 
822
 
    /* If the block is not yet read, continue with a locked cache and read. */
823
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
824
 
    {
825
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
826
 
      return(1);
827
 
    }
828
 
 
829
 
    /* Another thread did read the block already. */
830
 
  }
831
 
 
832
 
  /*
833
 
    Leave the lock. Do not call unlock_io_cache() later. The thread that
834
 
    filled the buffer did this and marked all threads as running.
835
 
  */
836
 
  pthread_mutex_unlock(&cshare->mutex);
837
 
  return(0);
838
 
}
839
 
 
840
 
/**
841
 
 * @brief
842
 
 *   Unlock IO cache.
843
 
 * 
844
 
 * @detail
845
 
 *   This is called by the thread that filled the buffer. It marks all
846
 
 *   threads as running and awakes them. This must not be done by any
847
 
 *   other thread.
848
 
 *
849
 
 *   Do not signal cond_writer. Either there is no writer or the writer
850
 
 *   is the only one who can call this function.
851
 
 *
852
 
 *   The reason for resetting running_threads to total_threads before
853
 
 *   waking all other threads is that it could be possible that this
854
 
 *   thread is so fast with processing the buffer that it enters the lock
855
 
 *   before even one other thread has left it. If every awoken thread
856
 
 *   would increase running_threads by one, this thread could think that
857
 
 *   he is again the last to join and would not wait for the other
858
 
 *   threads to process the data.
859
 
 *
860
 
 * @param cache The cache of the thread leaving the lock.
861
 
 */
862
 
static void unlock_io_cache(IO_CACHE *cache)
863
 
{
864
 
  IO_CACHE_SHARE *cshare= cache->share;
865
 
 
866
 
  cshare->running_threads= cshare->total_threads;
867
 
  pthread_cond_broadcast(&cshare->cond);
868
 
  pthread_mutex_unlock(&cshare->mutex);
869
 
  return;
870
 
}
871
 
 
872
 
/**
873
 
 * @brief
874
 
 *   Read from IO_CACHE when it is shared between several threads.
875
 
 * @detail
876
 
 *   This function is only called from the my_b_read() macro when there
877
 
 *   aren't enough characters in the buffer to satisfy the request.
878
 
 *
879
 
 * IMPLEMENTATION
880
 
 *   It works as follows: when a thread tries to read from a file (that
881
 
 *   is, after using all the data from the (shared) buffer), it just
882
 
 *   hangs on lock_io_cache(), waiting for other threads. When the very
883
 
 *   last thread attempts a read, lock_io_cache() returns 1, the thread
884
 
 *   does actual IO and unlock_io_cache(), which signals all the waiting
885
 
 *   threads that data is in the buffer.
886
 
 *
887
 
 * WARNING
888
 
 *   When changing this function, be careful with handling file offsets
889
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
890
 
 *   types than my_off_t unless you can be sure that their value fits.
891
 
 *   Same applies to differences of file offsets. (Bug #11527)
892
 
 *
893
 
 *   When changing this function, check _my_b_read(). It might need thesame change.
894
 
 * 
895
 
 * @param cache IO_CACHE pointer
896
 
 * @param Buffer Buffer to retrieve count bytes from file
897
 
 * @param Count Number of bytes to read into Buffer
898
 
 *
899
 
 * @retval 0 We succeeded in reading all data
900
 
 * @retval 1 Error: can't read requested characters
901
 
 */
902
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
903
 
{
904
 
  my_off_t pos_in_file;
905
 
  size_t length, diff_length, left_length;
906
 
  IO_CACHE_SHARE *cshare= cache->share;
907
 
 
908
 
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
909
 
  {
910
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
911
 
    memcpy(Buffer, cache->read_pos, left_length);
912
 
    Buffer+= left_length;
913
 
    Count-= left_length;
914
 
  }
915
 
  while (Count)
916
 
  {
917
 
    size_t cnt, len;
918
 
 
919
 
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
920
 
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
921
 
    length=io_round_up(Count+diff_length)-diff_length;
922
 
    length= ((length <= cache->read_length) ?
923
 
             length + io_round_dn(cache->read_length - length) :
924
 
             length - io_round_up(length - cache->read_length));
925
 
    if (cache->type != READ_FIFO &&
926
 
        (length > (cache->end_of_file - pos_in_file)))
927
 
      length= (size_t) (cache->end_of_file - pos_in_file);
928
 
    if (length == 0)
929
 
    {
930
 
      cache->error= (int) left_length;
931
 
      return(1);
932
 
    }
933
 
    if (lock_io_cache(cache, pos_in_file))
934
 
    {
935
 
      /* With a synchronized write/read cache we won't come here... */
936
 
      assert(!cshare->source_cache);
937
 
      /*
938
 
        ... unless the writer has gone before this thread entered the
939
 
        lock. Simulate EOF in this case. It can be distinguished by
940
 
        cache->file.
941
 
      */
942
 
      if (cache->file < 0)
943
 
        len= 0;
944
 
      else
945
 
      {
946
 
        /*
947
 
          Whenever a function which operates on IO_CACHE flushes/writes
948
 
          some part of the IO_CACHE to disk it will set the property
949
 
          "seek_not_done" to indicate this to other functions operating
950
 
          on the IO_CACHE.
951
 
        */
952
 
        if (cache->seek_not_done)
953
 
        {
954
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
955
 
          {
956
 
            cache->error= -1;
957
 
            unlock_io_cache(cache);
958
 
            return(1);
959
 
          }
960
 
        }
961
 
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
962
 
      }
963
 
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
964
 
      cache->error=       (len == length ? 0 : (int) len);
965
 
      cache->pos_in_file= pos_in_file;
966
 
 
967
 
      /* Copy important values to the share. */
968
 
      cshare->error=       cache->error;
969
 
      cshare->read_end=    cache->read_end;
970
 
      cshare->pos_in_file= pos_in_file;
971
 
 
972
 
      /* Mark all threads as running and wake them. */
973
 
      unlock_io_cache(cache);
974
 
    }
975
 
    else
976
 
    {
977
 
      /*
978
 
        With a synchronized write/read cache readers always come here.
979
 
        Copy important values from the share.
980
 
      */
981
 
      cache->error=       cshare->error;
982
 
      cache->read_end=    cshare->read_end;
983
 
      cache->pos_in_file= cshare->pos_in_file;
984
 
 
985
 
      len= ((cache->error == -1) ? (size_t) -1 :
986
 
            (size_t) (cache->read_end - cache->buffer));
987
 
    }
988
 
    cache->read_pos=      cache->buffer;
989
 
    cache->seek_not_done= 0;
990
 
    if (len == 0 || len == (size_t) -1)
991
 
    {
992
 
      cache->error= (int) left_length;
993
 
      return(1);
994
 
    }
995
 
    cnt= (len > Count) ? Count : len;
996
 
    memcpy(Buffer, cache->read_pos, cnt);
997
 
    Count -= cnt;
998
 
    Buffer+= cnt;
999
 
    left_length+= cnt;
1000
 
    cache->read_pos+= cnt;
1001
 
  }
1002
 
  return(0);
1003
 
}
1004
 
 
1005
 
/**
1006
 
 * @brief
1007
 
 *   Copy data from write cache to read cache.
1008
 
 * @detail
1009
 
 *   The write thread will wait for all read threads to join the cache lock.
1010
 
 *   Then it copies the data over and wakes the read threads.
1011
 
 *
1012
 
 * @param write_cache The write cache.
1013
 
 * @param write_buffer The source of data, mostly the cache buffer.
1014
 
 * @param write_length The number of bytes to copy.
1015
 
 */
1016
 
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
 
                                const unsigned char *write_buffer, size_t write_length)
1018
 
{
1019
 
  IO_CACHE_SHARE *cshare= write_cache->share;
1020
 
 
1021
 
  assert(cshare->source_cache == write_cache);
1022
 
  /*
1023
 
    write_length is usually less or equal to buffer_length.
1024
 
    It can be bigger if _my_b_write() is called with a big length.
1025
 
  */
1026
 
  while (write_length)
1027
 
  {
1028
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
1029
 
    int  rc;
1030
 
 
1031
 
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
 
    /* The writing thread does always have the lock when it awakes. */
1033
 
    assert(rc);
1034
 
 
1035
 
    memcpy(cshare->buffer, write_buffer, copy_length);
1036
 
 
1037
 
    cshare->error=       0;
1038
 
    cshare->read_end=    cshare->buffer + copy_length;
1039
 
    cshare->pos_in_file= write_cache->pos_in_file;
1040
 
 
1041
 
    /* Mark all threads as running and wake them. */
1042
 
    unlock_io_cache(write_cache);
1043
 
 
1044
 
    write_buffer+= copy_length;
1045
 
    write_length-= copy_length;
1046
 
  }
1047
 
}
1048
 
 
1049
 
/**
1050
 
 * @brief
1051
 
 *   Do sequential read from the SEQ_READ_APPEND cache.
1052
 
 *
1053
 
 * @detail
1054
 
 *   We do this in three stages:
1055
 
 *      - first read from info->buffer
1056
 
 *      - then if there are still data to read, try the file descriptor
1057
 
 *      - afterwards, if there are still data to read, try append buffer
1058
 
 *
1059
 
 * @retval 0 Success
1060
 
 * @retval 1 Failed to read
1061
 
 */
1062
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1063
 
{
1064
 
  size_t length, diff_length, left_length, save_count, max_length;
1065
 
  my_off_t pos_in_file;
1066
 
  save_count=Count;
1067
 
 
1068
 
  /* first, read the regular buffer */
1069
 
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1070
 
  {
1071
 
    assert(Count > left_length);        /* User is not using my_b_read() */
1072
 
    memcpy(Buffer,info->read_pos, left_length);
1073
 
    Buffer+=left_length;
1074
 
    Count-=left_length;
1075
 
  }
1076
 
  lock_append_buffer(info);
1077
 
 
1078
 
  /* pos_in_file always point on where info->buffer was read */
1079
 
  if ((pos_in_file=info->pos_in_file +
1080
 
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1081
 
    goto read_append_buffer;
1082
 
 
1083
 
  /*
1084
 
    With read-append cache we must always do a seek before we read,
1085
 
    because the write could have moved the file pointer astray
1086
 
  */
1087
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1088
 
  {
1089
 
   info->error= -1;
1090
 
   unlock_append_buffer(info);
1091
 
   return (1);
1092
 
  }
1093
 
  info->seek_not_done=0;
1094
 
 
1095
 
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1096
 
 
1097
 
  /* now the second stage begins - read from file descriptor */
1098
 
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1099
 
  {
1100
 
    /* Fill first intern buffer */
1101
 
    size_t read_length;
1102
 
 
1103
 
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1104
 
    if ((read_length= my_read(info->file,Buffer, length,
1105
 
                              info->myflags)) == (size_t) -1)
1106
 
    {
1107
 
      info->error= -1;
1108
 
      unlock_append_buffer(info);
1109
 
      return 1;
1110
 
    }
1111
 
    Count-=read_length;
1112
 
    Buffer+=read_length;
1113
 
    pos_in_file+=read_length;
1114
 
 
1115
 
    if (read_length != length)
1116
 
    {
1117
 
      /*
1118
 
        We only got part of data;  Read the rest of the data from the
1119
 
        write buffer
1120
 
      */
1121
 
      goto read_append_buffer;
1122
 
    }
1123
 
    left_length+=length;
1124
 
    diff_length=0;
1125
 
  }
1126
 
 
1127
 
  max_length= info->read_length-diff_length;
1128
 
  if (max_length > (info->end_of_file - pos_in_file))
1129
 
    max_length= (size_t) (info->end_of_file - pos_in_file);
1130
 
  if (!max_length)
1131
 
  {
1132
 
    if (Count)
1133
 
      goto read_append_buffer;
1134
 
    length=0;                           /* Didn't read any more chars */
1135
 
  }
1136
 
  else
1137
 
  {
1138
 
    length= my_read(info->file,info->buffer, max_length, info->myflags);
1139
 
    if (length == (size_t) -1)
1140
 
    {
1141
 
      info->error= -1;
1142
 
      unlock_append_buffer(info);
1143
 
      return 1;
1144
 
    }
1145
 
    if (length < Count)
1146
 
    {
1147
 
      memcpy(Buffer, info->buffer, length);
1148
 
      Count -= length;
1149
 
      Buffer += length;
1150
 
 
1151
 
      /*
1152
 
         added the line below to make
1153
 
         assert(pos_in_file==info->end_of_file) pass.
1154
 
         otherwise this does not appear to be needed
1155
 
      */
1156
 
      pos_in_file += length;
1157
 
      goto read_append_buffer;
1158
 
    }
1159
 
  }
1160
 
  unlock_append_buffer(info);
1161
 
  info->read_pos=info->buffer+Count;
1162
 
  info->read_end=info->buffer+length;
1163
 
  info->pos_in_file=pos_in_file;
1164
 
  memcpy(Buffer,info->buffer,(size_t) Count);
1165
 
  return 0;
1166
 
 
1167
 
read_append_buffer:
1168
 
 
1169
 
  /*
1170
 
     Read data from the current write buffer.
1171
 
     Count should never be == 0 here (The code will work even if count is 0)
1172
 
  */
1173
 
 
1174
 
  {
1175
 
    /* First copy the data to Count */
1176
 
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1177
 
    size_t copy_len;
1178
 
    size_t transfer_len;
1179
 
 
1180
 
    assert(info->append_read_pos <= info->write_pos);
1181
 
    /*
1182
 
      TODO: figure out if the assert below is needed or correct.
1183
 
    */
1184
 
    assert(pos_in_file == info->end_of_file);
1185
 
    copy_len=min(Count, len_in_buff);
1186
 
    memcpy(Buffer, info->append_read_pos, copy_len);
1187
 
    info->append_read_pos += copy_len;
1188
 
    Count -= copy_len;
1189
 
    if (Count)
1190
 
      info->error = static_cast<int>(save_count - Count);
1191
 
 
1192
 
    /* Fill read buffer with data from write buffer */
1193
 
    memcpy(info->buffer, info->append_read_pos,
1194
 
           (size_t) (transfer_len=len_in_buff - copy_len));
1195
 
    info->read_pos= info->buffer;
1196
 
    info->read_end= info->buffer+transfer_len;
1197
 
    info->append_read_pos=info->write_pos;
1198
 
    info->pos_in_file=pos_in_file+copy_len;
1199
 
    info->end_of_file+=len_in_buff;
1200
 
  }
1201
 
  unlock_append_buffer(info);
1202
 
  return Count ? 1 : 0;
1203
 
}
1204
 
 
1205
532
 
1206
533
#ifdef HAVE_AIOWAIT
1207
534
 
1444
771
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1445
772
      return info->error= -1;
1446
773
 
1447
 
    /*
1448
 
      In case of a shared I/O cache with a writer we normally do direct
1449
 
      write cache to read cache copy. Simulate this here by direct
1450
 
      caller buffer to read cache copy. Do it after the write so that
1451
 
      the cache readers actions on the flushed part can go in parallel
1452
 
      with the write of the extra stuff. copy_to_read_buffer()
1453
 
      synchronizes writer and readers so that after this call the
1454
 
      readers can act on the extra stuff while the writer can go ahead
1455
 
      and prepare the next output. copy_to_read_buffer() relies on
1456
 
      info->pos_in_file.
1457
 
    */
1458
 
    if (info->share)
1459
 
      copy_to_read_buffer(info, Buffer, length);
1460
 
 
1461
774
    Count-=length;
1462
775
    Buffer+=length;
1463
776
    info->pos_in_file+=length;
1479
792
  size_t length;
1480
793
  int error=0;
1481
794
 
1482
 
  /*
1483
 
    Assert that we cannot come here with a shared cache. If we do one
1484
 
    day, we might need to add a call to copy_to_read_buffer().
1485
 
  */
1486
 
  assert(!info->share);
1487
 
 
1488
795
  if (pos < info->pos_in_file)
1489
796
  {
1490
797
    /* Of no overlap, write everything without buffering */
1529
836
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
1530
837
{
1531
838
  size_t length;
1532
 
  bool append_cache;
 
839
  bool append_cache= false;
1533
840
  my_off_t pos_in_file;
1534
841
 
1535
 
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1536
 
    need_append_buffer_lock=0;
1537
 
 
1538
842
  if (info->type == WRITE_CACHE || append_cache)
1539
843
  {
1540
844
    if (info->file == -1)
1546
850
 
1547
851
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
1548
852
    {
1549
 
      /*
1550
 
        In case of a shared I/O cache with a writer we do direct write
1551
 
        cache to read cache copy. Do it before the write here so that
1552
 
        the readers can work in parallel with the write.
1553
 
        copy_to_read_buffer() relies on info->pos_in_file.
1554
 
      */
1555
 
      if (info->share)
1556
 
        copy_to_read_buffer(info, info->write_buffer, length);
1557
 
 
1558
853
      pos_in_file=info->pos_in_file;
1559
854
      /*
1560
855
        If we have append cache, we always open the file with
1626
921
  int error=0;
1627
922
  IO_CACHE_CALLBACK pre_close;
1628
923
 
1629
 
  /*
1630
 
    Every thread must call remove_io_thread(). The last one destroys
1631
 
    the share elements.
1632
 
  */
1633
 
  assert(!info->share || !info->share->total_threads);
1634
 
 
1635
924
  if ((pre_close=info->pre_close))
1636
925
  {
1637
926
    (*pre_close)(info);
1645
934
    free((unsigned char*) info->buffer);
1646
935
    info->buffer=info->read_pos=(unsigned char*) 0;
1647
936
  }
1648
 
  if (info->type == SEQ_READ_APPEND)
1649
 
  {
1650
 
    /* Destroy allocated mutex */
1651
 
    info->type= TYPE_NOT_SET;
1652
 
    pthread_mutex_destroy(&info->append_buffer_lock);
1653
 
  }
 
937
 
1654
938
  return(error);
1655
939
} /* end_io_cache */
1656
940
 
1657
941
} /* namespace internal */
1658
942
} /* namespace drizzled */
1659
 
 
1660
 
/**********************************************************************
1661
 
 Testing of MF_IOCACHE
1662
 
**********************************************************************/
1663
 
 
1664
 
#ifdef MAIN
1665
 
 
1666
 
void die(const char* fmt, ...)
1667
 
{
1668
 
  va_list va_args;
1669
 
  va_start(va_args,fmt);
1670
 
  fprintf(stderr,"Error:");
1671
 
  vfprintf(stderr, fmt,va_args);
1672
 
  fprintf(stderr,", errno=%d\n", errno);
1673
 
  exit(1);
1674
 
}
1675
 
 
1676
 
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1677
 
{
1678
 
  int fd;
1679
 
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1680
 
    die("Could not open %s", fname);
1681
 
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1682
 
    die("failed in init_io_cache()");
1683
 
  return fd;
1684
 
}
1685
 
 
1686
 
void close_file(IO_CACHE* info)
1687
 
{
1688
 
  end_io_cache(info);
1689
 
  my_close(info->file, MYF(MY_WME));
1690
 
}
1691
 
 
1692
 
int main(int argc, char** argv)
1693
 
{
1694
 
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1695
 
  MY_STAT status;
1696
 
  const char* fname="/tmp/iocache.test";
1697
 
  int cache_size=16384;
1698
 
  char llstr_buf[22];
1699
 
  int max_block,total_bytes=0;
1700
 
  int i,num_loops=100,error=0;
1701
 
  char *p;
1702
 
  char* block, *block_end;
1703
 
  MY_INIT(argv[0]);
1704
 
  max_block = cache_size*3;
1705
 
  if (!(block=(char*)malloc(max_block)))
1706
 
    die("Not enough memory to allocate test block");
1707
 
  block_end = block + max_block;
1708
 
  for (p = block,i=0; p < block_end;i++)
1709
 
  {
1710
 
    *p++ = (char)i;
1711
 
  }
1712
 
  if (my_stat(fname,&status, MYF(0)) &&
1713
 
      my_delete(fname,MYF(MY_WME)))
1714
 
    {
1715
 
      die("Delete of %s failed, aborting", fname);
1716
 
    }
1717
 
  open_file(fname,&sra_cache, cache_size);
1718
 
  for (i = 0; i < num_loops; i++)
1719
 
  {
1720
 
    char buf[4];
1721
 
    int block_size = abs(rand() % max_block);
1722
 
    int4store(buf, block_size);
1723
 
    if (my_b_append(&sra_cache,buf,4) ||
1724
 
        my_b_append(&sra_cache, block, block_size))
1725
 
      die("write failed");
1726
 
    total_bytes += 4+block_size;
1727
 
  }
1728
 
  close_file(&sra_cache);
1729
 
  free(block);
1730
 
  if (!my_stat(fname,&status,MYF(MY_WME)))
1731
 
    die("%s failed to stat, but I had just closed it,\
1732
 
 wonder how that happened");
1733
 
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1734
 
         llstr(status.st_size,llstr_buf),
1735
 
         total_bytes);
1736
 
  my_delete(fname, MYF(MY_WME));
1737
 
  /* check correctness of tests */
1738
 
  if (total_bytes != status.st_size)
1739
 
  {
1740
 
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
1741
 
supposedly written\n");
1742
 
    error=1;
1743
 
  }
1744
 
  exit(error);
1745
 
  return 0;
1746
 
}
1747
 
#endif