477
467
info->file is a pipe or socket or FIFO. We never should have tried
478
468
to seek on that. See Bugs#25807 and #22828 for more info.
480
assert(errno != ESPIPE);
470
assert(my_errno != ESPIPE);
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
476
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
477
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
478
{ /* Fill first intern buffer */
489
479
size_t read_length;
490
if (info->end_of_file <= pos_in_file_local)
480
if (info->end_of_file <= pos_in_file)
491
481
{ /* End of file */
492
482
info->error= (int) left_length;
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
485
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
486
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
498
489
info->error= (read_length == (size_t) -1 ? -1 :
499
490
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
509
500
max_length= info->read_length-diff_length;
510
501
if (info->type != READ_FIFO &&
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
502
max_length > (info->end_of_file - pos_in_file))
503
max_length= (size_t) (info->end_of_file - pos_in_file);
517
info->error= static_cast<int>(left_length); /* We only got this many char */
508
info->error= left_length; /* We only got this many char */
520
length_local=0; /* Didn't read any chars */
511
length=0; /* Didn't read any chars */
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
513
else if ((length= my_read(info->file,info->buffer, max_length,
523
514
info->myflags)) < Count ||
524
length_local == (size_t) -1)
515
length == (size_t) -1)
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
517
if (length != (size_t) -1)
518
memcpy(Buffer, info->buffer, length);
519
info->pos_in_file= pos_in_file;
520
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
521
info->read_pos=info->read_end=info->buffer;
533
524
info->read_pos=info->buffer+Count;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
525
info->read_end=info->buffer+length;
526
info->pos_in_file=pos_in_file;
536
527
memcpy(Buffer, info->buffer, Count);
533
Prepare IO_CACHE for shared use.
536
init_io_cache_share()
537
read_cache A read cache. This will be copied for
538
every thread after setup.
540
write_cache If non-NULL a write cache that is to be
541
synchronized with the read caches.
542
num_threads Number of threads sharing the cache
543
including the write thread if any.
547
The shared cache is used so: One IO_CACHE is initialized with
548
init_io_cache(). This includes the allocation of a buffer. Then a
549
share is allocated and init_io_cache_share() is called with the io
550
cache and the share. Then the io cache is copied for each thread. So
551
every thread has its own copy of IO_CACHE. But the allocated buffer
552
is shared because cache->buffer is the same for all caches.
554
One thread reads data from the file into the buffer. All threads
555
read from the buffer, but every thread maintains its own set of
556
pointers into the buffer. When all threads have used up the buffer
557
contents, one of the threads reads the next block of data into the
558
buffer. To accomplish this, each thread enters the cache lock before
559
accessing the buffer. They wait in lock_io_cache() until all threads
560
joined the lock. The last thread entering the lock is in charge of
561
reading from file to buffer. It wakes all threads when done.
563
Synchronizing a write cache to the read caches works so: Whenever
564
the write buffer needs a flush, the write thread enters the lock and
565
waits for all other threads to enter the lock too. They do this when
566
they have used up the read buffer. When all threads are in the lock,
567
the write thread copies the write buffer to the read buffer and
570
share->running_threads is the number of threads not being in the
571
cache lock. When entering lock_io_cache() the number is decreased.
572
When the thread that fills the buffer enters unlock_io_cache() the
573
number is reset to the number of threads. The condition
574
running_threads == 0 means that all threads are in the lock. Bumping
575
up the number to the full count is non-intuitive. But increasing the
576
number by one for each thread that leaves the lock could lead to a
577
solo run of one thread. The last thread to join a lock reads from
578
file to buffer, wakes the other threads, processes the data in the
579
cache and enters the lock again. If no other thread left the lock
580
meanwhile, it would think it's the last one again and read the next
583
The share has copies of 'error', 'buffer', 'read_end', and
584
'pos_in_file' from the thread that filled the buffer. We may not be
585
able to access this information directly from its cache because the
586
thread may be removed from the share before the variables could be
587
copied by all other threads. Or, if a write buffer is synchronized,
588
it would change its 'pos_in_file' after waking the other threads,
589
possibly before they could copy its value.
591
However, the 'buffer' variable in the share is for a synchronized
592
write cache. It needs to know where to put the data. Otherwise it
593
would need access to the read cache of one of the threads that is
594
not yet removed from the share.
600
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
601
IO_CACHE *write_cache, uint32_t num_threads)
603
assert(num_threads > 1);
604
assert(read_cache->type == READ_CACHE);
605
assert(!write_cache || (write_cache->type == WRITE_CACHE));
607
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
608
pthread_cond_init(&cshare->cond, 0);
609
pthread_cond_init(&cshare->cond_writer, 0);
611
cshare->running_threads= num_threads;
612
cshare->total_threads= num_threads;
613
cshare->error= 0; /* Initialize. */
614
cshare->buffer= read_cache->buffer;
615
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
616
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
617
cshare->source_cache= write_cache; /* Can be NULL. */
619
read_cache->share= cshare;
620
read_cache->read_function= _my_b_read_r;
621
read_cache->current_pos= NULL;
622
read_cache->current_end= NULL;
625
write_cache->share= cshare;
632
Remove a thread from shared access to IO_CACHE.
636
cache The IO_CACHE to be removed from the share.
640
Every thread must do that on exit for not to deadlock other threads.
642
The last thread destroys the pthread resources.
644
A writer flushes its cache first.
650
void remove_io_thread(IO_CACHE *cache)
652
IO_CACHE_SHARE *cshare= cache->share;
655
/* If the writer goes, it needs to flush the write cache. */
656
if (cache == cshare->source_cache)
657
flush_io_cache(cache);
659
pthread_mutex_lock(&cshare->mutex);
661
/* Remove from share. */
662
total= --cshare->total_threads;
664
/* Detach from share. */
667
/* If the writer goes, let the readers know. */
668
if (cache == cshare->source_cache)
670
cshare->source_cache= NULL;
673
/* If all threads are waiting for me to join the lock, wake them. */
674
if (!--cshare->running_threads)
676
pthread_cond_signal(&cshare->cond_writer);
677
pthread_cond_broadcast(&cshare->cond);
680
pthread_mutex_unlock(&cshare->mutex);
684
pthread_cond_destroy (&cshare->cond_writer);
685
pthread_cond_destroy (&cshare->cond);
686
pthread_mutex_destroy(&cshare->mutex);
694
Lock IO cache and wait for all other threads to join.
698
cache The cache of the thread entering the lock.
699
pos File position of the block to read.
700
Unused for the write thread.
704
Wait for all threads to finish with the current buffer. We want
705
all threads to proceed in concert. The last thread to join
706
lock_io_cache() will read the block from file and all threads start
707
to use it. Then they will join again for reading the next block.
709
The waiting threads detect a fresh buffer by comparing
710
cshare->pos_in_file with the position they want to process next.
711
Since the first block may start at position 0, we take
712
cshare->read_end as an additional condition. This variable is
713
initialized to NULL and will be set after a block of data is written
717
1 OK, lock in place, go ahead and read.
718
0 OK, unlocked, another thread did the read.
721
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
723
IO_CACHE_SHARE *cshare= cache->share;
725
/* Enter the lock. */
726
pthread_mutex_lock(&cshare->mutex);
727
cshare->running_threads--;
729
if (cshare->source_cache)
731
/* A write cache is synchronized to the read caches. */
733
if (cache == cshare->source_cache)
735
/* The writer waits until all readers are here. */
736
while (cshare->running_threads)
738
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
740
/* Stay locked. Leave the lock later by unlock_io_cache(). */
744
/* The last thread wakes the writer. */
745
if (!cshare->running_threads)
747
pthread_cond_signal(&cshare->cond_writer);
751
Readers wait until the data is copied from the writer. Another
752
reason to stop waiting is the removal of the write thread. If this
753
happens, we leave the lock with old data in the buffer.
755
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
756
cshare->source_cache)
758
pthread_cond_wait(&cshare->cond, &cshare->mutex);
762
If the writer was removed from the share while this thread was
763
asleep, we need to simulate an EOF condition. The writer cannot
764
reset the share variables as they might still be in use by readers
765
of the last block. When we awake here then because the last
766
joining thread signalled us. If the writer is not the last, it
767
will not signal. So it is safe to clear the buffer here.
769
if (!cshare->read_end || (cshare->pos_in_file < pos))
771
cshare->read_end= cshare->buffer; /* Empty buffer. */
772
cshare->error= 0; /* EOF is not an error. */
778
There are read caches only. The last thread arriving in
779
lock_io_cache() continues with a locked cache and reads the block.
781
if (!cshare->running_threads)
783
/* Stay locked. Leave the lock later by unlock_io_cache(). */
788
All other threads wait until the requested block is read by the
789
last thread arriving. Another reason to stop waiting is the
790
removal of a thread. If this leads to all threads being in the
791
lock, we have to continue also. The first of the awaken threads
792
will then do the read.
794
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
795
cshare->running_threads)
797
pthread_cond_wait(&cshare->cond, &cshare->mutex);
800
/* If the block is not yet read, continue with a locked cache and read. */
801
if (!cshare->read_end || (cshare->pos_in_file < pos))
803
/* Stay locked. Leave the lock later by unlock_io_cache(). */
807
/* Another thread did read the block already. */
811
Leave the lock. Do not call unlock_io_cache() later. The thread that
812
filled the buffer did this and marked all threads as running.
814
pthread_mutex_unlock(&cshare->mutex);
824
cache The cache of the thread leaving the lock.
827
This is called by the thread that filled the buffer. It marks all
828
threads as running and awakes them. This must not be done by any
831
Do not signal cond_writer. Either there is no writer or the writer
832
is the only one who can call this function.
834
The reason for resetting running_threads to total_threads before
835
waking all other threads is that it could be possible that this
836
thread is so fast with processing the buffer that it enters the lock
837
before even one other thread has left it. If every awoken thread
838
would increase running_threads by one, this thread could think that
839
he is again the last to join and would not wait for the other
840
threads to process the data.
846
static void unlock_io_cache(IO_CACHE *cache)
848
IO_CACHE_SHARE *cshare= cache->share;
850
cshare->running_threads= cshare->total_threads;
851
pthread_cond_broadcast(&cshare->cond);
852
pthread_mutex_unlock(&cshare->mutex);
858
Read from IO_CACHE when it is shared between several threads.
862
cache IO_CACHE pointer
863
Buffer Buffer to retrieve count bytes from file
864
Count Number of bytes to read into Buffer
867
This function is only called from the my_b_read() macro when there
868
isn't enough characters in the buffer to satisfy the request.
872
It works as follows: when a thread tries to read from a file (that
873
is, after using all the data from the (shared) buffer), it just
874
hangs on lock_io_cache(), waiting for other threads. When the very
875
last thread attempts a read, lock_io_cache() returns 1, the thread
876
does actual IO and unlock_io_cache(), which signals all the waiting
877
threads that data is in the buffer.
881
When changing this function, be careful with handling file offsets
882
(end-of_file, pos_in_file). Do not cast them to possibly smaller
883
types than my_off_t unless you can be sure that their value fits.
884
Same applies to differences of file offsets. (Bug #11527)
886
When changing this function, check _my_b_read(). It might need the
890
0 we succeeded in reading all data
891
1 Error: can't read requested characters
894
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
896
my_off_t pos_in_file;
897
size_t length, diff_length, left_length;
898
IO_CACHE_SHARE *cshare= cache->share;
900
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
902
assert(Count >= left_length); /* User is not using my_b_read() */
903
memcpy(Buffer, cache->read_pos, left_length);
904
Buffer+= left_length;
911
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
912
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
913
length=IO_ROUND_UP(Count+diff_length)-diff_length;
914
length= ((length <= cache->read_length) ?
915
length + IO_ROUND_DN(cache->read_length - length) :
916
length - IO_ROUND_UP(length - cache->read_length));
917
if (cache->type != READ_FIFO &&
918
(length > (cache->end_of_file - pos_in_file)))
919
length= (size_t) (cache->end_of_file - pos_in_file);
922
cache->error= (int) left_length;
925
if (lock_io_cache(cache, pos_in_file))
927
/* With a synchronized write/read cache we won't come here... */
928
assert(!cshare->source_cache);
930
... unless the writer has gone before this thread entered the
931
lock. Simulate EOF in this case. It can be distinguished by
939
Whenever a function which operates on IO_CACHE flushes/writes
940
some part of the IO_CACHE to disk it will set the property
941
"seek_not_done" to indicate this to other functions operating
944
if (cache->seek_not_done)
946
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
950
unlock_io_cache(cache);
954
len= my_read(cache->file, cache->buffer, length, cache->myflags);
956
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
957
cache->error= (len == length ? 0 : (int) len);
958
cache->pos_in_file= pos_in_file;
960
/* Copy important values to the share. */
961
cshare->error= cache->error;
962
cshare->read_end= cache->read_end;
963
cshare->pos_in_file= pos_in_file;
965
/* Mark all threads as running and wake them. */
966
unlock_io_cache(cache);
971
With a synchronized write/read cache readers always come here.
972
Copy important values from the share.
974
cache->error= cshare->error;
975
cache->read_end= cshare->read_end;
976
cache->pos_in_file= cshare->pos_in_file;
978
len= ((cache->error == -1) ? (size_t) -1 :
979
(size_t) (cache->read_end - cache->buffer));
981
cache->read_pos= cache->buffer;
982
cache->seek_not_done= 0;
983
if (len == 0 || len == (size_t) -1)
985
cache->error= (int) left_length;
988
cnt= (len > Count) ? Count : len;
989
memcpy(Buffer, cache->read_pos, cnt);
993
cache->read_pos+= cnt;
1000
Copy data from write cache to read cache.
1003
copy_to_read_buffer()
1004
write_cache The write cache.
1005
write_buffer The source of data, mostly the cache buffer.
1006
write_length The number of bytes to copy.
1009
The write thread will wait for all read threads to join the cache
1010
lock. Then it copies the data over and wakes the read threads.
1016
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
const unsigned char *write_buffer, size_t write_length)
1019
IO_CACHE_SHARE *cshare= write_cache->share;
1021
assert(cshare->source_cache == write_cache);
1023
write_length is usually less or equal to buffer_length.
1024
It can be bigger if _my_b_write() is called with a big length.
1026
while (write_length)
1028
size_t copy_length= cmin(write_length, write_cache->buffer_length);
1029
int __attribute__((unused)) rc;
1031
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
/* The writing thread does always have the lock when it awakes. */
1035
memcpy(cshare->buffer, write_buffer, copy_length);
1038
cshare->read_end= cshare->buffer + copy_length;
1039
cshare->pos_in_file= write_cache->pos_in_file;
1041
/* Mark all threads as running and wake them. */
1042
unlock_io_cache(write_cache);
1044
write_buffer+= copy_length;
1045
write_length-= copy_length;
1051
Do sequential read from the SEQ_READ_APPEND cache.
1053
We do this in three stages:
1054
- first read from info->buffer
1055
- then if there are still data to read, try the file descriptor
1056
- afterwards, if there are still data to read, try append buffer
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1065
size_t length, diff_length, left_length, save_count, max_length;
1066
my_off_t pos_in_file;
1069
/* first, read the regular buffer */
1070
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1072
assert(Count > left_length); /* User is not using my_b_read() */
1073
memcpy(Buffer,info->read_pos, left_length);
1074
Buffer+=left_length;
1077
lock_append_buffer(info);
1079
/* pos_in_file always point on where info->buffer was read */
1080
if ((pos_in_file=info->pos_in_file +
1081
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1082
goto read_append_buffer;
1085
With read-append cache we must always do a seek before we read,
1086
because the write could have moved the file pointer astray
1088
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1091
unlock_append_buffer(info);
1094
info->seek_not_done=0;
1096
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1098
/* now the second stage begins - read from file descriptor */
1099
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1101
/* Fill first intern buffer */
1104
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1105
if ((read_length= my_read(info->file,Buffer, length,
1106
info->myflags)) == (size_t) -1)
1109
unlock_append_buffer(info);
1113
Buffer+=read_length;
1114
pos_in_file+=read_length;
1116
if (read_length != length)
1119
We only got part of data; Read the rest of the data from the
1122
goto read_append_buffer;
1124
left_length+=length;
1128
max_length= info->read_length-diff_length;
1129
if (max_length > (info->end_of_file - pos_in_file))
1130
max_length= (size_t) (info->end_of_file - pos_in_file);
1134
goto read_append_buffer;
1135
length=0; /* Didn't read any more chars */
1139
length= my_read(info->file,info->buffer, max_length, info->myflags);
1140
if (length == (size_t) -1)
1143
unlock_append_buffer(info);
1148
memcpy(Buffer, info->buffer, length);
1153
added the line below to make
1154
assert(pos_in_file==info->end_of_file) pass.
1155
otherwise this does not appear to be needed
1157
pos_in_file += length;
1158
goto read_append_buffer;
1161
unlock_append_buffer(info);
1162
info->read_pos=info->buffer+Count;
1163
info->read_end=info->buffer+length;
1164
info->pos_in_file=pos_in_file;
1165
memcpy(Buffer,info->buffer,(size_t) Count);
1171
Read data from the current write buffer.
1172
Count should never be == 0 here (The code will work even if count is 0)
1176
/* First copy the data to Count */
1177
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1179
size_t transfer_len;
1181
assert(info->append_read_pos <= info->write_pos);
1183
TODO: figure out if the assert below is needed or correct.
1185
assert(pos_in_file == info->end_of_file);
1186
copy_len=cmin(Count, len_in_buff);
1187
memcpy(Buffer, info->append_read_pos, copy_len);
1188
info->append_read_pos += copy_len;
1191
info->error = save_count - Count;
1193
/* Fill read buffer with data from write buffer */
1194
memcpy(info->buffer, info->append_read_pos,
1195
(size_t) (transfer_len=len_in_buff - copy_len));
1196
info->read_pos= info->buffer;
1197
info->read_end= info->buffer+transfer_len;
1198
info->append_read_pos=info->write_pos;
1199
info->pos_in_file=pos_in_file+copy_len;
1200
info->end_of_file+=len_in_buff;
1202
unlock_append_buffer(info);
1203
return Count ? 1 : 0;
541
1207
#ifdef HAVE_AIOWAIT
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
1210
Read from the IO_CACHE into a buffer and feed asynchronously
1211
from disk when needed.
1215
info IO_CACHE pointer
1216
Buffer Buffer to retrieve count bytes from file
1217
Count Number of bytes to read into Buffer
1220
-1 An error has occurred; my_errno is set.
1222
1 An error has occurred; IO_CACHE to error state.
1225
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1227
size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1228
size_t max_length;
559
1229
my_off_t next_pos_in_file;
560
1230
unsigned char *read_buffer;
761
1432
if (Count >= IO_SIZE)
762
1433
{ /* Fill first intern buffer */
763
length_local=Count & (size_t) ~(IO_SIZE-1);
1434
length=Count & (size_t) ~(IO_SIZE-1);
764
1435
if (info->seek_not_done)
767
Whenever a function which operates on st_io_cache flushes/writes
768
some part of the st_io_cache to disk it will set the property
1438
Whenever a function which operates on IO_CACHE flushes/writes
1439
some part of the IO_CACHE to disk it will set the property
769
1440
"seek_not_done" to indicate this to other functions operating
772
if (lseek(info->file,info->pos_in_file,SEEK_SET))
1443
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
774
1445
info->error= -1;
777
1448
info->seek_not_done=0;
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data goes through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
1450
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1451
return info->error= -1;
1454
In case of a shared I/O cache with a writer we normally do direct
1455
write cache to read cache copy. Simulate this here by direct
1456
caller buffer to read cache copy. Do it after the write so that
1457
the cache readers actions on the flushed part can go in parallel
1458
with the write of the extra stuff. copy_to_read_buffer()
1459
synchronizes writer and readers so that after this call the
1460
readers can act on the extra stuff while the writer can go ahead
1461
and prepare the next output. copy_to_read_buffer() relies on
1465
copy_to_read_buffer(info, Buffer, length);
1469
info->pos_in_file+=length;
1471
memcpy(info->write_pos,Buffer,(size_t) Count);
1472
info->write_pos+=Count;
1478
Append a block to the write buffer.
1479
This is done with the buffer locked to ensure that we don't read from
1480
the write buffer before we are ready with it.
1483
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1485
size_t rest_length,length;
1488
Assert that we cannot come here with a shared cache. If we do one
1489
day, we might need to add a call to copy_to_read_buffer().
1491
assert(!info->share);
1493
lock_append_buffer(info);
1494
rest_length= (size_t) (info->write_end - info->write_pos);
1495
if (Count <= rest_length)
1497
memcpy(info->write_pos, Buffer, rest_length);
1498
Buffer+=rest_length;
1500
info->write_pos+=rest_length;
1501
if (my_b_flush_io_cache(info,0))
1503
unlock_append_buffer(info);
1506
if (Count >= IO_SIZE)
1507
{ /* Fill first intern buffer */
1508
length=Count & (size_t) ~(IO_SIZE-1);
1509
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1511
unlock_append_buffer(info);
1512
return info->error= -1;
1516
info->end_of_file+=length;
1520
memcpy(info->write_pos,Buffer,(size_t) Count);
1521
info->write_pos+=Count;
1522
unlock_append_buffer(info);
1527
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1530
Sasha: We are not writing this with the ? operator to avoid hitting
1531
a possible compiler bug. At least gcc 2.95 cannot deal with
1532
several layers of ternary operators that evaluated comma(,) operator
1533
expressions inside - I do have a test case if somebody wants it
1535
if (info->type == SEQ_READ_APPEND)
1536
return my_b_append(info, Buffer, Count);
1537
return my_b_write(info, Buffer, Count);
1542
Write a block to disk where part of the data may be inside the record
1543
buffer. As all write calls to the data goes through the cache,
1544
we will never get a seek over the end of the buffer
1547
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1554
Assert that we cannot come here with a shared cache. If we do one
1555
day, we might need to add a call to copy_to_read_buffer().
1557
assert(!info->share);
803
1559
if (pos < info->pos_in_file)
805
1561
/* Of no overlap, write everything without buffering */
806
1562
if (pos + Count <= info->pos_in_file)
807
1563
return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1564
/* Write the part of the block that is before buffer */
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1565
length= (uint) (info->pos_in_file - pos);
1566
if (pwrite(info->file, Buffer, length, pos) == 0)
811
1567
info->error= error= -1;
812
Buffer+=length_local;
814
Count-= length_local;
1572
info->seek_not_done=1;
817
1576
/* Check if we want to write inside the used part of the buffer.*/
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1577
length= (size_t) (info->write_end - info->buffer);
1578
if (pos < info->pos_in_file + length)
821
1580
size_t offset= (size_t) (pos - info->pos_in_file);
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length_local of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1584
memcpy(info->buffer+offset, Buffer, length);
1587
/* Fix length of buffer if the new data was larger */
1588
if (info->buffer+length > info->write_pos)
1589
info->write_pos=info->buffer+length;
909
unlock_append_buffer(info, need_append_buffer_lock);
1685
UNLOCK_APPEND_BUFFER;
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1690
Free an IO_CACHE object
1694
info IO_CACHE Handle to free
1697
It's currently safe to call this if one has called init_io_cache()
1698
on the 'info' object, even if init_io_cache() failed.
1699
This function is also safe to call twice with the same handle.
1706
int end_io_cache(IO_CACHE *info)
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1709
IO_CACHE_CALLBACK pre_close;
1712
Every thread must call remove_io_thread(). The last one destroys
1715
assert(!info->share || !info->share->total_threads);
1717
if ((pre_close=info->pre_close))
1722
if (info->alloced_buffer)
1724
info->alloced_buffer=0;
1725
if (info->file != -1) /* File doesn't exist */
1726
error= my_b_flush_io_cache(info,1);
1727
free((unsigned char*) info->buffer);
1728
info->buffer=info->read_pos=(unsigned char*) 0;
1730
if (info->type == SEQ_READ_APPEND)
1732
/* Destroy allocated mutex */
1733
info->type= TYPE_NOT_SET;
1734
pthread_mutex_destroy(&info->append_buffer_lock);
948
1737
} /* end_io_cache */
950
} /* namespace internal */
951
} /* namespace drizzled */
1740
/**********************************************************************
1741
Testing of MF_IOCACHE
1742
**********************************************************************/
1748
void die(const char* fmt, ...)
1751
va_start(va_args,fmt);
1752
fprintf(stderr,"Error:");
1753
vfprintf(stderr, fmt,va_args);
1754
fprintf(stderr,", errno=%d\n", errno);
1758
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1761
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1762
die("Could not open %s", fname);
1763
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1764
die("failed in init_io_cache()");
1768
void close_file(IO_CACHE* info)
1771
my_close(info->file, MYF(MY_WME));
1774
int main(int argc, char** argv)
1776
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1778
const char* fname="/tmp/iocache.test";
1779
int cache_size=16384;
1781
int max_block,total_bytes=0;
1782
int i,num_loops=100,error=0;
1784
char* block, *block_end;
1786
max_block = cache_size*3;
1787
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1788
die("Not enough memory to allocate test block");
1789
block_end = block + max_block;
1790
for (p = block,i=0; p < block_end;i++)
1794
if (my_stat(fname,&status, MYF(0)) &&
1795
my_delete(fname,MYF(MY_WME)))
1797
die("Delete of %s failed, aborting", fname);
1799
open_file(fname,&sra_cache, cache_size);
1800
for (i = 0; i < num_loops; i++)
1803
int block_size = abs(rand() % max_block);
1804
int4store(buf, block_size);
1805
if (my_b_append(&sra_cache,buf,4) ||
1806
my_b_append(&sra_cache, block, block_size))
1807
die("write failed");
1808
total_bytes += 4+block_size;
1810
close_file(&sra_cache);
1812
if (!my_stat(fname,&status,MYF(MY_WME)))
1813
die("%s failed to stat, but I had just closed it,\
1814
wonder how that happened");
1815
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1816
llstr(status.st_size,llstr_buf),
1818
my_delete(fname, MYF(MY_WME));
1819
/* check correctness of tests */
1820
if (total_bytes != status.st_size)
1822
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1823
supposedly written\n");