/**
 * Prepare an IO_CACHE for shared use.
 *
 * The shared cache is used as follows: One IO_CACHE is initialized with
 * init_io_cache(). This includes the allocation of a buffer. Then a
 * share is allocated and init_io_cache_share() is called with the io
 * cache and the share. Then the io cache is copied for each thread. So
 * every thread has its own copy of IO_CACHE. But the allocated buffer
 * is shared because cache->buffer is the same for all caches.
 *
 * One thread reads data from the file into the buffer. All threads
 * read from the buffer, but every thread maintains its own set of
 * pointers into the buffer. When all threads have used up the buffer
 * contents, one of the threads reads the next block of data into the
 * buffer. To accomplish this, each thread enters the cache lock before
 * accessing the buffer. They wait in lock_io_cache() until all threads
 * have joined the lock. The last thread entering the lock is in charge
 * of reading from file to buffer. It wakes all threads when done.
 *
 * Synchronizing a write cache to the read caches works as follows:
 * Whenever the write buffer needs a flush, the write thread enters the
 * lock and waits for all other threads to enter the lock too. They do
 * this when they have used up the read buffer. When all threads are in
 * the lock, the write thread copies the write buffer to the read
 * buffer and wakes all threads.
 *
 * share->running_threads is the number of threads that are not in the
 * cache lock. When entering lock_io_cache() the number is decremented.
 * When the thread that fills the buffer enters unlock_io_cache() the
 * number is reset to the total number of threads. The condition
 * running_threads == 0 means that all threads are in the lock. Bumping
 * the number back up to the full count is non-intuitive. But increasing
 * the number by one for each thread that leaves the lock could lead to
 * a solo run of one thread: The last thread to join a lock reads from
 * file to buffer, wakes the other threads, processes the data in the
 * cache and enters the lock again. If no other thread left the lock
 * meanwhile, it would think it is the last one again and read the next
 * block before the others had a chance to process the current one.
 *
 * The share has copies of 'error', 'buffer', 'read_end', and
 * 'pos_in_file' from the thread that filled the buffer. We may not be
 * able to access this information directly from that thread's cache
 * because the thread may be removed from the share before the
 * variables could be copied by all other threads. Or, if a write
 * buffer is synchronized, it would change its 'pos_in_file' after
 * waking the other threads, possibly before they could copy its value.
 *
 * However, the 'buffer' variable in the share is there for a
 * synchronized write cache: the write thread needs to know where to
 * put the data. Otherwise it would need access to the read cache of
 * one of the threads that is not yet removed from the share.
 *
 * @param read_cache   A read cache. This will be copied for every
 *                     thread after setup.
 * @param cshare       The share.
 * @param write_cache  If non-NULL, a write cache that is to be
 *                     synchronized with the read caches.
 * @param num_threads  Number of threads sharing the cache, including
 *                     the write thread if any.
 */
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                         IO_CACHE *write_cache, uint32_t num_threads)
{
  assert(num_threads > 1);
  assert(read_cache->type == READ_CACHE);
  assert(!write_cache || (write_cache->type == WRITE_CACHE));

  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&cshare->cond, 0);
  pthread_cond_init(&cshare->cond_writer, 0);

  cshare->running_threads= num_threads;
  cshare->total_threads=   num_threads;
  cshare->error=           0;    /* Initialize. */
  cshare->buffer=          read_cache->buffer;
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
  cshare->source_cache=    write_cache; /* Can be NULL. */

  read_cache->share=         cshare;
  read_cache->read_function= _my_b_read_r;
  read_cache->current_pos=   NULL;
  read_cache->current_end=   NULL;

  if (write_cache)
    write_cache->share= cshare;
}

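/*
  Illustrative setup sketch (not part of this file's API): one thread
  initializes the read cache and the share, then every worker gets its
  own copy of the IO_CACHE structure; all copies point at the same
  buffer through the share. NUM_WORKERS and the exact init_io_cache()
  argument list are assumptions, and error handling is elided.

    IO_CACHE       reader;
    IO_CACHE_SHARE cshare;
    IO_CACHE       worker_caches[NUM_WORKERS];

    init_io_cache(&reader, file, 0, READ_CACHE, 0L, 0, MYF(MY_WME));
    init_io_cache_share(&reader, &cshare, NULL, NUM_WORKERS);

    for (uint32_t i= 0; i < NUM_WORKERS; i++)
      worker_caches[i]= reader;   // struct copy; the buffer stays shared

    // Each worker thread then reads through my_b_read() on its own copy
    // and calls remove_io_thread() before it exits.
*/
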
/**
 * Remove a thread from shared access to an IO_CACHE.
 *
 * Every thread must call this on exit so that it does not deadlock the
 * other threads. The last thread destroys the pthread resources.
 * A writer flushes its cache first.
 *
 * @param cache The IO_CACHE to be removed from the share.
 */
void remove_io_thread(IO_CACHE *cache)
{
  IO_CACHE_SHARE *cshare= cache->share;
  uint32_t total;

  /* If the writer goes, it needs to flush the write cache. */
  if (cache == cshare->source_cache)
    flush_io_cache(cache);

  pthread_mutex_lock(&cshare->mutex);

  /* Remove from share. */
  total= --cshare->total_threads;

  /* Detach from share. */
  cache->share= NULL;

  /* If the writer goes, let the readers know. */
  if (cache == cshare->source_cache)
    cshare->source_cache= NULL;

  /* If all threads are waiting for me to join the lock, wake them. */
  if (!--cshare->running_threads)
  {
    pthread_cond_signal(&cshare->cond_writer);
    pthread_cond_broadcast(&cshare->cond);
  }

  pthread_mutex_unlock(&cshare->mutex);

  /* The last thread destroys the pthread resources. */
  if (!total)
  {
    pthread_cond_destroy (&cshare->cond_writer);
    pthread_cond_destroy (&cshare->cond);
    pthread_mutex_destroy(&cshare->mutex);
  }
}

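/*
  Sketch of a worker thread using its copy of the shared cache. The
  names worker_main() and ROW_SIZE are illustrative, not part of this
  API. The important point, as described above, is the unconditional
  remove_io_thread() before the thread exits.

    void *worker_main(void *arg)
    {
      IO_CACHE *cache= (IO_CACHE *) arg;    // this thread's copy
      unsigned char row[ROW_SIZE];

      while (!my_b_read(cache, row, sizeof(row)))
      {
        // process one row ...
      }
      remove_io_thread(cache);              // never skip this on exit
      return NULL;
    }
*/
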
/**
 * Lock the IO cache and wait for all other threads to join.
 *
 * Wait for all threads to finish with the current buffer. We want
 * all threads to proceed in concert. The last thread to join
 * lock_io_cache() will read the block from file, and all threads start
 * to use it. Then they will join again for reading the next block.
 *
 * The waiting threads detect a fresh buffer by comparing
 * cshare->pos_in_file with the position they want to process next.
 * Since the first block may start at position 0, we take
 * cshare->read_end as an additional condition. This variable is
 * initialized to NULL and is set after a block of data is written to
 * the buffer.
 *
 * @param cache The cache of the thread entering the lock.
 * @param pos   File position of the block to read. Unused for the
 *              write thread.
 *
 * @retval 1 OK, lock in place, go ahead and read.
 * @retval 0 OK, unlocked, another thread did the read.
 */
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *cshare= cache->share;

  /* Enter the lock. */
  pthread_mutex_lock(&cshare->mutex);
  cshare->running_threads--;

  if (cshare->source_cache)
  {
    /* A write cache is synchronized to the read caches. */

    if (cache == cshare->source_cache)
    {
      /* The writer waits until all readers are here. */
      while (cshare->running_threads)
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);

      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      return 1;
    }

    /* The last thread wakes the writer. */
    if (!cshare->running_threads)
      pthread_cond_signal(&cshare->cond_writer);

    /*
      Readers wait until the data is copied from the writer. Another
      reason to stop waiting is the removal of the write thread. If this
      happens, we leave the lock with old data in the buffer.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->source_cache)
      pthread_cond_wait(&cshare->cond, &cshare->mutex);

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. When we awake here, it is because the last
      joining thread signalled us. If the writer is not the last, it
      will not signal. So it is safe to clear the buffer here.
    */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      cshare->read_end= cshare->buffer; /* Empty buffer. */
      cshare->error= 0;                 /* EOF is not an error. */
    }
  }
  else
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (!cshare->running_threads)
    {
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      return 1;
    }

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock, we have to continue as well. The first of the awakened
      threads will then do the read.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->running_threads)
      pthread_cond_wait(&cshare->cond, &cshare->mutex);

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      return 1;
    }

    /* Another thread did read the block already. */
  }

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  pthread_mutex_unlock(&cshare->mutex);
  return 0;
}

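/*
  Condensed sketch of the caller contract (cshare, pos and length refer
  to the caller's variables; the complete logic, including error
  handling and EOF simulation, is in _my_b_read_r() below):

    if (lock_io_cache(cache, pos))
    {
      // We are the filler thread: do the read, publish it, wake everyone.
      size_t len= my_read(cache->file, cache->buffer, length, cache->myflags);
      cshare->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
      cshare->pos_in_file= pos;
      unlock_io_cache(cache);     // resets running_threads, broadcasts
    }
    else
    {
      // Another thread filled the buffer and already released the lock;
      // just pick up the published values from the share.
      cache->read_end=    cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;
    }
*/
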
/**
 * Unlock the IO cache.
 *
 * This is called by the thread that filled the buffer. It marks all
 * threads as running and awakes them. This must not be done by any
 * other thread.
 *
 * Do not signal cond_writer. Either there is no writer, or the writer
 * is the only one who can call this function.
 *
 * The reason for resetting running_threads to total_threads before
 * waking all other threads is that this thread could be so fast with
 * processing the buffer that it enters the lock before even one other
 * thread has left it. If every awoken thread increased running_threads
 * by one, this thread could think that it is again the last to join
 * and would not wait for the other threads to process the data.
 *
 * @param cache The cache of the thread leaving the lock.
 */
static void unlock_io_cache(IO_CACHE *cache)
{
  IO_CACHE_SHARE *cshare= cache->share;

  cshare->running_threads= cshare->total_threads;
  pthread_cond_broadcast(&cshare->cond);
  pthread_mutex_unlock(&cshare->mutex);
}

/**
 * Read from an IO_CACHE when it is shared between several threads.
 *
 * This function is only called from the my_b_read() macro when there
 * aren't enough characters in the buffer to satisfy the request.
 *
 * It works as follows: when a thread tries to read from a file (that
 * is, after using all the data from the (shared) buffer), it just
 * hangs on lock_io_cache(), waiting for other threads. When the very
 * last thread attempts a read, lock_io_cache() returns 1, the thread
 * does the actual IO and calls unlock_io_cache(), which signals all
 * the waiting threads that data is in the buffer.
 *
 * When changing this function, be careful with handling file offsets
 * (end_of_file, pos_in_file). Do not cast them to possibly smaller
 * types than my_off_t unless you can be sure that their value fits.
 * The same applies to differences of file offsets. (Bug #11527)
 *
 * When changing this function, check _my_b_read(). It might need the
 * same change.
 *
 * @param cache  IO_CACHE pointer.
 * @param Buffer Buffer into which Count bytes are read from the file.
 * @param Count  Number of bytes to read into Buffer.
 *
 * @retval 0 We succeeded in reading all data.
 * @retval 1 Error: cannot read the requested characters.
 */
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
{
  my_off_t pos_in_file;
  size_t length, diff_length, left_length;
  IO_CACHE_SHARE *cshare= cache->share;

  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
  {
    assert(Count >= left_length);  /* User is not using my_b_read() */
    memcpy(Buffer, cache->read_pos, left_length);
    Buffer+= left_length;
    Count-= left_length;
  }
  while (Count)
  {
    size_t cnt, len;

    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
    length= io_round_up(Count+diff_length)-diff_length;
    length= ((length <= cache->read_length) ?
             length + io_round_dn(cache->read_length - length) :
             length - io_round_up(length - cache->read_length));
    if (cache->type != READ_FIFO &&
        (length > (cache->end_of_file - pos_in_file)))
      length= (size_t) (cache->end_of_file - pos_in_file);
    if (length == 0)
    {
      cache->error= (int) left_length;
      return 1;
    }
    if (lock_io_cache(cache, pos_in_file))
    {
      /* With a synchronized write/read cache we won't come here... */
      assert(!cshare->source_cache);
      /*
        ... unless the writer has gone before this thread entered the
        lock. Simulate EOF in this case. It can be distinguished by
        cache->file == -1.
      */
      if (cache->file < 0)
        len= 0;
      else
      {
        /*
          Whenever a function which operates on IO_CACHE flushes/writes
          some part of the IO_CACHE to disk it will set the property
          "seek_not_done" to indicate this to other functions operating
          on the IO_CACHE.
        */
        if (cache->seek_not_done)
        {
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
          {
            cache->error= -1;
            unlock_io_cache(cache);
            return 1;
          }
        }
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
      }
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
      cache->error=       (len == length ? 0 : (int) len);
      cache->pos_in_file= pos_in_file;

      /* Copy important values to the share. */
      cshare->error=       cache->error;
      cshare->read_end=    cache->read_end;
      cshare->pos_in_file= pos_in_file;

      /* Mark all threads as running and wake them. */
      unlock_io_cache(cache);
    }
    else
    {
      /*
        With a synchronized write/read cache readers always come here.
        Copy important values from the share.
      */
      cache->error=       cshare->error;
      cache->read_end=    cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;

      len= ((cache->error == -1) ? (size_t) -1 :
            (size_t) (cache->read_end - cache->buffer));
    }
    cache->read_pos=      cache->buffer;
    cache->seek_not_done= 0;
    if (len == 0 || len == (size_t) -1)
    {
      cache->error= (int) left_length;
      return 1;
    }
    cnt= (len > Count) ? Count : len;
    memcpy(Buffer, cache->read_pos, cnt);
    Count-=       cnt;
    Buffer+=      cnt;
    left_length+= cnt;
    cache->read_pos+= cnt;
  }
  return 0;
}

/**
 * Copy data from the write cache to the read cache.
 *
 * The write thread will wait for all read threads to join the cache
 * lock. Then it copies the data over and wakes the read threads.
 *
 * @param write_cache  The write cache.
 * @param write_buffer The source of data, mostly the cache buffer.
 * @param write_length The number of bytes to copy.
 */
static void copy_to_read_buffer(IO_CACHE *write_cache,
                                const unsigned char *write_buffer,
                                size_t write_length)
{
  IO_CACHE_SHARE *cshare= write_cache->share;

  assert(cshare->source_cache == write_cache);
  /*
    write_length is usually less than or equal to buffer_length.
    It can be bigger if _my_b_write() is called with a big length.
  */
  while (write_length)
  {
    size_t copy_length= min(write_length, write_cache->buffer_length);
    int rc;

    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
    /* The writing thread always has the lock when it awakes. */
    assert(rc);

    memcpy(cshare->buffer, write_buffer, copy_length);

    cshare->error=       0;
    cshare->read_end=    cshare->buffer + copy_length;
    cshare->pos_in_file= write_cache->pos_in_file;

    /* Mark all threads as running and wake them. */
    unlock_io_cache(write_cache);

    write_buffer+= copy_length;
    write_length-= copy_length;
  }
}

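/*
  Illustrative sketch of how a writer-side routine is expected to hand
  flushed bytes to the readers; the helper name write_block_shared() is
  hypothetical. In this file the call is made from the write/flush
  routines (e.g. _my_b_write()) when a share with a source cache exists.

    static void write_block_shared(IO_CACHE *write_cache,
                                   const unsigned char *block, size_t len)
    {
      if (write_cache->share)               // readers are attached
        copy_to_read_buffer(write_cache, block, len);
      // ... then write the block to the file as usual ...
    }
*/
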
/**
 * Do a sequential read from the SEQ_READ_APPEND cache.
 *
 * We do this in three stages:
 * - first, read from info->buffer
 * - then, if there is still data to read, try the file descriptor
 * - afterwards, if there is still data to read, try the append buffer
 *
 * @retval 0 Success
 * @retval 1 Failed to read
 */
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
{
  size_t length, diff_length, left_length, save_count, max_length;
  my_off_t pos_in_file;
  save_count= Count;

  /* first, read the regular buffer */
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
  {
    assert(Count > left_length);  /* User is not using my_b_read() */
    memcpy(Buffer,info->read_pos, left_length);
    Buffer+=left_length;
    Count-=left_length;
  }
  lock_append_buffer(info);

  /* pos_in_file always points to where info->buffer was read. */
  if ((pos_in_file=info->pos_in_file +
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
    goto read_append_buffer;

  /*
    With a read-append cache we must always do a seek before we read,
    because the write could have moved the file pointer astray.
  */
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
  {
    info->error= -1;
    unlock_append_buffer(info);
    return 1;
  }
  info->seek_not_done=0;

  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));

  /* now the second stage begins - read from file descriptor */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {
    /* Fill first internal buffer */
    size_t read_length;

    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
    if ((read_length= my_read(info->file,Buffer, length,
                              info->myflags)) == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    Count-=read_length;
    Buffer+=read_length;
    pos_in_file+=read_length;

    if (read_length != length)
    {
      /*
        We only got part of the data; read the rest of the data from the
        write buffer.
      */
      goto read_append_buffer;
    }
    left_length+=length;
    diff_length=0;
  }

  max_length= info->read_length-diff_length;
  if (max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
      goto read_append_buffer;
    length=0;                           /* Didn't read any more chars */
  }
  else
  {
    length= my_read(info->file,info->buffer, max_length, info->myflags);
    if (length == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    if (length < Count)
    {
      memcpy(Buffer, info->buffer, length);
      Count-= length;
      Buffer+= length;

      /*
        The line below was added to make
        assert(pos_in_file==info->end_of_file) pass;
        otherwise it does not appear to be needed.
      */
      pos_in_file += length;
      goto read_append_buffer;
    }
  }
  unlock_append_buffer(info);
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer,info->buffer,(size_t) Count);
  return 0;

read_append_buffer:

  {
    /*
      Read data from the current write buffer.
      Count should never be == 0 here (the code will work even if it is).
    */

    /* First copy the data up to Count. */
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
    size_t copy_len;
    size_t transfer_len;

    assert(info->append_read_pos <= info->write_pos);
    /*
      TODO: figure out if the assert below is needed or correct.
    */
    assert(pos_in_file == info->end_of_file);
    copy_len=min(Count, len_in_buff);
    memcpy(Buffer, info->append_read_pos, copy_len);
    info->append_read_pos += copy_len;
    Count -= copy_len;
    if (Count)
      info->error = static_cast<int>(save_count - Count);

    /* Fill read buffer with data from write buffer */
    memcpy(info->buffer, info->append_read_pos,
           (size_t) (transfer_len=len_in_buff - copy_len));
    info->read_pos= info->buffer;
    info->read_end= info->buffer+transfer_len;
    info->append_read_pos=info->write_pos;
    info->pos_in_file=pos_in_file+copy_len;
    info->end_of_file+=len_in_buff;
  }
  unlock_append_buffer(info);
  return Count ? 1 : 0;
}

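/*
  Usage sketch for a SEQ_READ_APPEND cache (illustrative only; the exact
  init_io_cache() arguments, record layout and error handling are
  assumed/elided): one thread appends records while another consumes the
  same IO_CACHE sequentially, which is what the three-stage read above
  serves.

    IO_CACHE log;
    init_io_cache(&log, file, 0, SEQ_READ_APPEND, 0L, 0, MYF(MY_WME));

    // producer thread:
    my_b_append(&log, (const unsigned char *) rec, rec_len);

    // consumer thread:
    unsigned char rec_buf[REC_LEN];            // REC_LEN: assumed size
    if (!my_b_read(&log, rec_buf, sizeof(rec_buf)))
    {
      // got a complete record
    }
*/
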
#ifdef HAVE_AIOWAIT