520
530
info->pos_in_file= pos_in_file;
521
531
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
522
532
info->read_pos=info->read_end=info->buffer;
525
535
info->read_pos=info->buffer+Count;
526
536
info->read_end=info->buffer+length;
527
537
info->pos_in_file=pos_in_file;
528
538
memcpy(Buffer, info->buffer, Count);
544
Prepare IO_CACHE for shared use.
547
init_io_cache_share()
548
read_cache A read cache. This will be copied for
549
every thread after setup.
551
write_cache If non-NULL a write cache that is to be
552
synchronized with the read caches.
553
num_threads Number of threads sharing the cache
554
including the write thread if any.
558
The shared cache is used as follows: One IO_CACHE is initialized with
559
init_io_cache(). This includes the allocation of a buffer. Then a
560
share is allocated and init_io_cache_share() is called with the io
561
cache and the share. Then the io cache is copied for each thread. So
562
every thread has its own copy of IO_CACHE. But the allocated buffer
563
is shared because cache->buffer is the same for all caches.
565
One thread reads data from the file into the buffer. All threads
566
read from the buffer, but every thread maintains its own set of
567
pointers into the buffer. When all threads have used up the buffer
568
contents, one of the threads reads the next block of data into the
569
buffer. To accomplish this, each thread enters the cache lock before
570
accessing the buffer. They wait in lock_io_cache() until all threads
571
joined the lock. The last thread entering the lock is in charge of
572
reading from file to buffer. It wakes all threads when done.
574
Synchronizing a write cache to the read caches works as follows: Whenever
575
the write buffer needs a flush, the write thread enters the lock and
576
waits for all other threads to enter the lock too. They do this when
577
they have used up the read buffer. When all threads are in the lock,
578
the write thread copies the write buffer to the read buffer and
581
share->running_threads is the number of threads not being in the
582
cache lock. When entering lock_io_cache() the number is decreased.
583
When the thread that fills the buffer enters unlock_io_cache() the
584
number is reset to the number of threads. The condition
585
running_threads == 0 means that all threads are in the lock. Bumping
586
up the number to the full count is non-intuitive. But increasing the
587
number by one for each thread that leaves the lock could lead to a
588
solo run of one thread. The last thread to join a lock reads from
589
file to buffer, wakes the other threads, processes the data in the
590
cache and enters the lock again. If no other thread left the lock
591
meanwhile, it would think it's the last one again and read the next
594
The share has copies of 'error', 'buffer', 'read_end', and
595
'pos_in_file' from the thread that filled the buffer. We may not be
596
able to access this information directly from its cache because the
597
thread may be removed from the share before the variables could be
598
copied by all other threads. Or, if a write buffer is synchronized,
599
it would change its 'pos_in_file' after waking the other threads,
600
possibly before they could copy its value.
602
However, the 'buffer' variable in the share is for a synchronized
603
write cache. It needs to know where to put the data. Otherwise it
604
would need access to the read cache of one of the threads that is
605
not yet removed from the share.
611
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
612
IO_CACHE *write_cache, uint num_threads)
614
DBUG_ENTER("init_io_cache_share");
615
DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
616
"write_cache: 0x%lx threads: %u",
617
(long) read_cache, (long) cshare,
618
(long) write_cache, num_threads));
620
DBUG_ASSERT(num_threads > 1);
621
DBUG_ASSERT(read_cache->type == READ_CACHE);
622
DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
624
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
625
pthread_cond_init(&cshare->cond, 0);
626
pthread_cond_init(&cshare->cond_writer, 0);
628
cshare->running_threads= num_threads;
629
cshare->total_threads= num_threads;
630
cshare->error= 0; /* Initialize. */
631
cshare->buffer= read_cache->buffer;
632
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
633
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
634
cshare->source_cache= write_cache; /* Can be NULL. */
636
read_cache->share= cshare;
637
read_cache->read_function= _my_b_read_r;
638
read_cache->current_pos= NULL;
639
read_cache->current_end= NULL;
642
write_cache->share= cshare;
649
Remove a thread from shared access to IO_CACHE.
653
cache The IO_CACHE to be removed from the share.
657
Every thread must do this on exit so as not to deadlock the other threads.
659
The last thread destroys the pthread resources.
661
A writer flushes its cache first.
667
void remove_io_thread(IO_CACHE *cache)
669
IO_CACHE_SHARE *cshare= cache->share;
671
DBUG_ENTER("remove_io_thread");
673
/* If the writer goes, it needs to flush the write cache. */
674
if (cache == cshare->source_cache)
675
flush_io_cache(cache);
677
pthread_mutex_lock(&cshare->mutex);
678
DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
679
(cache == cshare->source_cache) ?
680
"writer" : "reader", (long) cache));
682
/* Remove from share. */
683
total= --cshare->total_threads;
684
DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
686
/* Detach from share. */
689
/* If the writer goes, let the readers know. */
690
if (cache == cshare->source_cache)
692
DBUG_PRINT("io_cache_share", ("writer leaves"));
693
cshare->source_cache= NULL;
696
/* If all threads are waiting for me to join the lock, wake them. */
697
if (!--cshare->running_threads)
699
DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
700
pthread_cond_signal(&cshare->cond_writer);
701
pthread_cond_broadcast(&cshare->cond);
704
pthread_mutex_unlock(&cshare->mutex);
708
DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
709
pthread_cond_destroy (&cshare->cond_writer);
710
pthread_cond_destroy (&cshare->cond);
711
pthread_mutex_destroy(&cshare->mutex);
719
Lock IO cache and wait for all other threads to join.
723
cache The cache of the thread entering the lock.
724
pos File position of the block to read.
725
Unused for the write thread.
729
Wait for all threads to finish with the current buffer. We want
730
all threads to proceed in concert. The last thread to join
731
lock_io_cache() will read the block from file and all threads start
732
to use it. Then they will join again for reading the next block.
734
The waiting threads detect a fresh buffer by comparing
735
cshare->pos_in_file with the position they want to process next.
736
Since the first block may start at position 0, we take
737
cshare->read_end as an additional condition. This variable is
738
initialized to NULL and will be set after a block of data is written
742
1 OK, lock in place, go ahead and read.
743
0 OK, unlocked, another thread did the read.
746
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
748
IO_CACHE_SHARE *cshare= cache->share;
749
DBUG_ENTER("lock_io_cache");
751
/* Enter the lock. */
752
pthread_mutex_lock(&cshare->mutex);
753
cshare->running_threads--;
754
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
755
(cache == cshare->source_cache) ?
756
"writer" : "reader", (long) cache, (ulong) pos,
757
cshare->running_threads));
759
if (cshare->source_cache)
761
/* A write cache is synchronized to the read caches. */
763
if (cache == cshare->source_cache)
765
/* The writer waits until all readers are here. */
766
while (cshare->running_threads)
768
DBUG_PRINT("io_cache_share", ("writer waits in lock"));
769
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
771
DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
773
/* Stay locked. Leave the lock later by unlock_io_cache(). */
777
/* The last thread wakes the writer. */
778
if (!cshare->running_threads)
780
DBUG_PRINT("io_cache_share", ("waking writer"));
781
pthread_cond_signal(&cshare->cond_writer);
785
Readers wait until the data is copied from the writer. Another
786
reason to stop waiting is the removal of the write thread. If this
787
happens, we leave the lock with old data in the buffer.
789
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
790
cshare->source_cache)
792
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
793
pthread_cond_wait(&cshare->cond, &cshare->mutex);
797
If the writer was removed from the share while this thread was
798
asleep, we need to simulate an EOF condition. The writer cannot
799
reset the share variables as they might still be in use by readers
800
of the last block. When we awake here, it is because the last
801
joining thread signalled us. If the writer is not the last, it
802
will not signal. So it is safe to clear the buffer here.
804
if (!cshare->read_end || (cshare->pos_in_file < pos))
806
DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
807
cshare->read_end= cshare->buffer; /* Empty buffer. */
808
cshare->error= 0; /* EOF is not an error. */
814
There are read caches only. The last thread arriving in
815
lock_io_cache() continues with a locked cache and reads the block.
817
if (!cshare->running_threads)
819
DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
820
/* Stay locked. Leave the lock later by unlock_io_cache(). */
825
All other threads wait until the requested block is read by the
826
last thread arriving. Another reason to stop waiting is the
827
removal of a thread. If this leads to all threads being in the
828
lock, we have to continue as well. The first of the awakened threads
829
will then do the read.
831
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
832
cshare->running_threads)
834
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
835
pthread_cond_wait(&cshare->cond, &cshare->mutex);
838
/* If the block is not yet read, continue with a locked cache and read. */
839
if (!cshare->read_end || (cshare->pos_in_file < pos))
841
DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
842
/* Stay locked. Leave the lock later by unlock_io_cache(). */
846
/* Another thread did read the block already. */
848
DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
849
(uint) (cshare->read_end ? (size_t)
850
(cshare->read_end - cshare->buffer) :
854
Leave the lock. Do not call unlock_io_cache() later. The thread that
855
filled the buffer did this and marked all threads as running.
857
pthread_mutex_unlock(&cshare->mutex);
867
cache The cache of the thread leaving the lock.
870
This is called by the thread that filled the buffer. It marks all
871
threads as running and awakes them. This must not be done by any
874
Do not signal cond_writer. Either there is no writer or the writer
875
is the only one who can call this function.
877
The reason for resetting running_threads to total_threads before
878
waking all other threads is that it could be possible that this
879
thread is so fast with processing the buffer that it enters the lock
880
before even one other thread has left it. If every awoken thread
881
would increase running_threads by one, this thread could think that
882
it is again the last to join and would not wait for the other
883
threads to process the data.
889
static void unlock_io_cache(IO_CACHE *cache)
891
IO_CACHE_SHARE *cshare= cache->share;
892
DBUG_ENTER("unlock_io_cache");
893
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
894
(cache == cshare->source_cache) ?
896
(long) cache, (ulong) cshare->pos_in_file,
897
cshare->total_threads));
899
cshare->running_threads= cshare->total_threads;
900
pthread_cond_broadcast(&cshare->cond);
901
pthread_mutex_unlock(&cshare->mutex);
907
Read from IO_CACHE when it is shared between several threads.
911
cache IO_CACHE pointer
912
Buffer Buffer to retrieve count bytes from file
913
Count Number of bytes to read into Buffer
916
This function is only called from the my_b_read() macro when there
917
aren't enough characters in the buffer to satisfy the request.
921
It works as follows: when a thread tries to read from a file (that
922
is, after using all the data from the (shared) buffer), it just
923
hangs on lock_io_cache(), waiting for other threads. When the very
924
last thread attempts a read, lock_io_cache() returns 1, the thread
925
does actual IO and unlock_io_cache(), which signals all the waiting
926
threads that data is in the buffer.
930
When changing this function, be careful with handling file offsets
931
(end_of_file, pos_in_file). Do not cast them to possibly smaller
932
types than my_off_t unless you can be sure that their value fits.
933
Same applies to differences of file offsets. (Bug #11527)
935
When changing this function, check _my_b_read(). It might need the
939
0 we succeeded in reading all data
940
1 Error: can't read requested characters
943
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
945
my_off_t pos_in_file;
946
size_t length, diff_length, left_length;
947
IO_CACHE_SHARE *cshare= cache->share;
948
DBUG_ENTER("_my_b_read_r");
950
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
952
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
953
memcpy(Buffer, cache->read_pos, left_length);
954
Buffer+= left_length;
961
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
962
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
963
length=IO_ROUND_UP(Count+diff_length)-diff_length;
964
length= ((length <= cache->read_length) ?
965
length + IO_ROUND_DN(cache->read_length - length) :
966
length - IO_ROUND_UP(length - cache->read_length));
967
if (cache->type != READ_FIFO &&
968
(length > (cache->end_of_file - pos_in_file)))
969
length= (size_t) (cache->end_of_file - pos_in_file);
972
cache->error= (int) left_length;
975
if (lock_io_cache(cache, pos_in_file))
977
/* With a synchronized write/read cache we won't come here... */
978
DBUG_ASSERT(!cshare->source_cache);
980
... unless the writer has gone before this thread entered the
981
lock. Simulate EOF in this case. It can be distinguished by
989
Whenever a function which operates on IO_CACHE flushes/writes
990
some part of the IO_CACHE to disk it will set the property
991
"seek_not_done" to indicate this to other functions operating
994
if (cache->seek_not_done)
996
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1000
unlock_io_cache(cache);
1004
len= my_read(cache->file, cache->buffer, length, cache->myflags);
1006
DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1008
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1009
cache->error= (len == length ? 0 : (int) len);
1010
cache->pos_in_file= pos_in_file;
1012
/* Copy important values to the share. */
1013
cshare->error= cache->error;
1014
cshare->read_end= cache->read_end;
1015
cshare->pos_in_file= pos_in_file;
1017
/* Mark all threads as running and wake them. */
1018
unlock_io_cache(cache);
1023
With a synchronized write/read cache readers always come here.
1024
Copy important values from the share.
1026
cache->error= cshare->error;
1027
cache->read_end= cshare->read_end;
1028
cache->pos_in_file= cshare->pos_in_file;
1030
len= ((cache->error == -1) ? (size_t) -1 :
1031
(size_t) (cache->read_end - cache->buffer));
1033
cache->read_pos= cache->buffer;
1034
cache->seek_not_done= 0;
1035
if (len == 0 || len == (size_t) -1)
1037
DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1038
(ulong) len, (ulong) left_length));
1039
cache->error= (int) left_length;
1042
cnt= (len > Count) ? Count : len;
1043
memcpy(Buffer, cache->read_pos, cnt);
1047
cache->read_pos+= cnt;
1054
Copy data from write cache to read cache.
1057
copy_to_read_buffer()
1058
write_cache The write cache.
1059
write_buffer The source of data, mostly the cache buffer.
1060
write_length The number of bytes to copy.
1063
The write thread will wait for all read threads to join the cache
1064
lock. Then it copies the data over and wakes the read threads.
1070
static void copy_to_read_buffer(IO_CACHE *write_cache,
1071
const uchar *write_buffer, size_t write_length)
1073
IO_CACHE_SHARE *cshare= write_cache->share;
1075
DBUG_ASSERT(cshare->source_cache == write_cache);
1077
write_length is usually less than or equal to buffer_length.
1078
It can be bigger if _my_b_write() is called with a big length.
1080
while (write_length)
1082
size_t copy_length= min(write_length, write_cache->buffer_length);
1083
int __attribute__((unused)) rc;
1085
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1086
/* The writing thread does always have the lock when it awakes. */
1089
memcpy(cshare->buffer, write_buffer, copy_length);
1092
cshare->read_end= cshare->buffer + copy_length;
1093
cshare->pos_in_file= write_cache->pos_in_file;
1095
/* Mark all threads as running and wake them. */
1096
unlock_io_cache(write_cache);
1098
write_buffer+= copy_length;
1099
write_length-= copy_length;
1105
Do sequential read from the SEQ_READ_APPEND cache.
1107
We do this in three stages:
1108
- first read from info->buffer
1109
- then if there are still data to read, try the file descriptor
1110
- afterwards, if there are still data to read, try append buffer
1117
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1119
size_t length, diff_length, left_length, save_count, max_length;
1120
my_off_t pos_in_file;
1123
/* first, read the regular buffer */
1124
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1126
DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1127
memcpy(Buffer,info->read_pos, left_length);
1128
Buffer+=left_length;
1131
lock_append_buffer(info);
1133
/* pos_in_file always points to where info->buffer was read */
1134
if ((pos_in_file=info->pos_in_file +
1135
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1136
goto read_append_buffer;
1139
With read-append cache we must always do a seek before we read,
1140
because the write could have moved the file pointer astray
1142
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1145
unlock_append_buffer(info);
1148
info->seek_not_done=0;
1150
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1152
/* now the second stage begins - read from file descriptor */
1153
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1155
/* Fill first intern buffer */
1158
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1159
if ((read_length= my_read(info->file,Buffer, length,
1160
info->myflags)) == (size_t) -1)
1163
unlock_append_buffer(info);
1167
Buffer+=read_length;
1168
pos_in_file+=read_length;
1170
if (read_length != length)
1173
We only got part of data; Read the rest of the data from the
1176
goto read_append_buffer;
1178
left_length+=length;
1182
max_length= info->read_length-diff_length;
1183
if (max_length > (info->end_of_file - pos_in_file))
1184
max_length= (size_t) (info->end_of_file - pos_in_file);
1188
goto read_append_buffer;
1189
length=0; /* Didn't read any more chars */
1193
length= my_read(info->file,info->buffer, max_length, info->myflags);
1194
if (length == (size_t) -1)
1197
unlock_append_buffer(info);
1202
memcpy(Buffer, info->buffer, length);
1207
added the line below to make
1208
DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1209
otherwise this does not appear to be needed
1211
pos_in_file += length;
1212
goto read_append_buffer;
1215
unlock_append_buffer(info);
1216
info->read_pos=info->buffer+Count;
1217
info->read_end=info->buffer+length;
1218
info->pos_in_file=pos_in_file;
1219
memcpy(Buffer,info->buffer,(size_t) Count);
1225
Read data from the current write buffer.
1226
Count should never be == 0 here (The code will work even if count is 0)
1230
/* First copy the data to Count */
1231
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1233
size_t transfer_len;
1235
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1237
TODO: figure out if the assert below is needed or correct.
1239
DBUG_ASSERT(pos_in_file == info->end_of_file);
1240
copy_len=min(Count, len_in_buff);
1241
memcpy(Buffer, info->append_read_pos, copy_len);
1242
info->append_read_pos += copy_len;
1245
info->error = save_count - Count;
1247
/* Fill read buffer with data from write buffer */
1248
memcpy(info->buffer, info->append_read_pos,
1249
(size_t) (transfer_len=len_in_buff - copy_len));
1250
info->read_pos= info->buffer;
1251
info->read_end= info->buffer+transfer_len;
1252
info->append_read_pos=info->write_pos;
1253
info->pos_in_file=pos_in_file+copy_len;
1254
info->end_of_file+=len_in_buff;
1256
unlock_append_buffer(info);
1257
return Count ? 1 : 0;
533
1261
#ifdef HAVE_AIOWAIT
537
* Read from the IO_CACHE into a buffer and feed asynchronously from disk when needed.
539
* @param info IO_CACHE pointer
540
* @param Buffer Buffer to retrieve count bytes from file
541
* @param Count Number of bytes to read into Buffer
543
* @retval -1 An error has occurred; errno is set.
545
* @retval 1 An error has occurred; IO_CACHE to error state.
547
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1264
Read from the IO_CACHE into a buffer and feed asynchronously
1265
from disk when needed.
1269
info IO_CACHE pointer
1270
Buffer Buffer to retrieve count bytes from file
1271
Count Number of bytes to read into Buffer
1274
-1 An error has occurred; my_errno is set.
1276
1 An error has occurred; IO_CACHE to error state.
1279
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
549
1281
size_t length,read_length,diff_length,left_length,use_length,org_Count;
550
1282
size_t max_length;
551
1283
my_off_t next_pos_in_file;
552
unsigned char *read_buffer;
554
1286
memcpy(Buffer,info->read_pos,
555
1287
(left_length= (size_t) (info->read_end-info->read_pos)));