477
466
info->file is a pipe or socket or FIFO. We never should have tried
478
467
to seek on that. See Bugs#25807 and #22828 for more info.
480
assert(errno != ESPIPE);
469
assert(my_errno != ESPIPE);
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
475
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
476
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
477
{ /* Fill first intern buffer */
489
478
size_t read_length;
490
if (info->end_of_file <= pos_in_file_local)
479
if (info->end_of_file <= pos_in_file)
491
480
{ /* End of file */
492
481
info->error= (int) left_length;
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
484
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
485
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
498
488
info->error= (read_length == (size_t) -1 ? -1 :
499
489
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
509
499
max_length= info->read_length-diff_length;
510
500
if (info->type != READ_FIFO &&
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
501
max_length > (info->end_of_file - pos_in_file))
502
max_length= (size_t) (info->end_of_file - pos_in_file);
517
info->error= static_cast<int>(left_length); /* We only got this many char */
507
info->error= left_length; /* We only got this many char */
520
length_local=0; /* Didn't read any chars */
510
length=0; /* Didn't read any chars */
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
512
else if ((length= my_read(info->file,info->buffer, max_length,
523
513
info->myflags)) < Count ||
524
length_local == (size_t) -1)
514
length == (size_t) -1)
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
516
if (length != (size_t) -1)
517
memcpy(Buffer, info->buffer, length);
518
info->pos_in_file= pos_in_file;
519
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
520
info->read_pos=info->read_end=info->buffer;
533
523
info->read_pos=info->buffer+Count;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
524
info->read_end=info->buffer+length;
525
info->pos_in_file=pos_in_file;
536
526
memcpy(Buffer, info->buffer, Count);
532
Prepare IO_CACHE for shared use.
535
init_io_cache_share()
536
read_cache A read cache. This will be copied for
537
every thread after setup.
539
write_cache If non-NULL a write cache that is to be
540
synchronized with the read caches.
541
num_threads Number of threads sharing the cache
542
including the write thread if any.
546
The shared cache is used so: One IO_CACHE is initialized with
547
init_io_cache(). This includes the allocation of a buffer. Then a
548
share is allocated and init_io_cache_share() is called with the io
549
cache and the share. Then the io cache is copied for each thread. So
550
every thread has its own copy of IO_CACHE. But the allocated buffer
551
is shared because cache->buffer is the same for all caches.
553
One thread reads data from the file into the buffer. All threads
554
read from the buffer, but every thread maintains its own set of
555
pointers into the buffer. When all threads have used up the buffer
556
contents, one of the threads reads the next block of data into the
557
buffer. To accomplish this, each thread enters the cache lock before
558
accessing the buffer. They wait in lock_io_cache() until all threads
559
joined the lock. The last thread entering the lock is in charge of
560
reading from file to buffer. It wakes all threads when done.
562
Synchronizing a write cache to the read caches works so: Whenever
563
the write buffer needs a flush, the write thread enters the lock and
564
waits for all other threads to enter the lock too. They do this when
565
they have used up the read buffer. When all threads are in the lock,
566
the write thread copies the write buffer to the read buffer and
569
share->running_threads is the number of threads not being in the
570
cache lock. When entering lock_io_cache() the number is decreased.
571
When the thread that fills the buffer enters unlock_io_cache() the
572
number is reset to the number of threads. The condition
573
running_threads == 0 means that all threads are in the lock. Bumping
574
up the number to the full count is non-intuitive. But increasing the
575
number by one for each thread that leaves the lock could lead to a
576
solo run of one thread. The last thread to join a lock reads from
577
file to buffer, wakes the other threads, processes the data in the
578
cache and enters the lock again. If no other thread left the lock
579
meanwhile, it would think it's the last one again and read the next
582
The share has copies of 'error', 'buffer', 'read_end', and
583
'pos_in_file' from the thread that filled the buffer. We may not be
584
able to access this information directly from its cache because the
585
thread may be removed from the share before the variables could be
586
copied by all other threads. Or, if a write buffer is synchronized,
587
it would change its 'pos_in_file' after waking the other threads,
588
possibly before they could copy its value.
590
However, the 'buffer' variable in the share is for a synchronized
591
write cache. It needs to know where to put the data. Otherwise it
592
would need access to the read cache of one of the threads that is
593
not yet removed from the share.
599
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
600
IO_CACHE *write_cache, uint num_threads)
602
assert(num_threads > 1);
603
assert(read_cache->type == READ_CACHE);
604
assert(!write_cache || (write_cache->type == WRITE_CACHE));
606
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
607
pthread_cond_init(&cshare->cond, 0);
608
pthread_cond_init(&cshare->cond_writer, 0);
610
cshare->running_threads= num_threads;
611
cshare->total_threads= num_threads;
612
cshare->error= 0; /* Initialize. */
613
cshare->buffer= read_cache->buffer;
614
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
615
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
616
cshare->source_cache= write_cache; /* Can be NULL. */
618
read_cache->share= cshare;
619
read_cache->read_function= _my_b_read_r;
620
read_cache->current_pos= NULL;
621
read_cache->current_end= NULL;
624
write_cache->share= cshare;
631
Remove a thread from shared access to IO_CACHE.
635
cache The IO_CACHE to be removed from the share.
639
Every thread must do that on exit for not to deadlock other threads.
641
The last thread destroys the pthread resources.
643
A writer flushes its cache first.
649
void remove_io_thread(IO_CACHE *cache)
651
IO_CACHE_SHARE *cshare= cache->share;
654
/* If the writer goes, it needs to flush the write cache. */
655
if (cache == cshare->source_cache)
656
flush_io_cache(cache);
658
pthread_mutex_lock(&cshare->mutex);
660
/* Remove from share. */
661
total= --cshare->total_threads;
663
/* Detach from share. */
666
/* If the writer goes, let the readers know. */
667
if (cache == cshare->source_cache)
669
cshare->source_cache= NULL;
672
/* If all threads are waiting for me to join the lock, wake them. */
673
if (!--cshare->running_threads)
675
pthread_cond_signal(&cshare->cond_writer);
676
pthread_cond_broadcast(&cshare->cond);
679
pthread_mutex_unlock(&cshare->mutex);
683
pthread_cond_destroy (&cshare->cond_writer);
684
pthread_cond_destroy (&cshare->cond);
685
pthread_mutex_destroy(&cshare->mutex);
693
Lock IO cache and wait for all other threads to join.
697
cache The cache of the thread entering the lock.
698
pos File position of the block to read.
699
Unused for the write thread.
703
Wait for all threads to finish with the current buffer. We want
704
all threads to proceed in concert. The last thread to join
705
lock_io_cache() will read the block from file and all threads start
706
to use it. Then they will join again for reading the next block.
708
The waiting threads detect a fresh buffer by comparing
709
cshare->pos_in_file with the position they want to process next.
710
Since the first block may start at position 0, we take
711
cshare->read_end as an additional condition. This variable is
712
initialized to NULL and will be set after a block of data is written
716
1 OK, lock in place, go ahead and read.
717
0 OK, unlocked, another thread did the read.
720
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
722
IO_CACHE_SHARE *cshare= cache->share;
724
/* Enter the lock. */
725
pthread_mutex_lock(&cshare->mutex);
726
cshare->running_threads--;
728
if (cshare->source_cache)
730
/* A write cache is synchronized to the read caches. */
732
if (cache == cshare->source_cache)
734
/* The writer waits until all readers are here. */
735
while (cshare->running_threads)
737
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
739
/* Stay locked. Leave the lock later by unlock_io_cache(). */
743
/* The last thread wakes the writer. */
744
if (!cshare->running_threads)
746
pthread_cond_signal(&cshare->cond_writer);
750
Readers wait until the data is copied from the writer. Another
751
reason to stop waiting is the removal of the write thread. If this
752
happens, we leave the lock with old data in the buffer.
754
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
755
cshare->source_cache)
757
pthread_cond_wait(&cshare->cond, &cshare->mutex);
761
If the writer was removed from the share while this thread was
762
asleep, we need to simulate an EOF condition. The writer cannot
763
reset the share variables as they might still be in use by readers
764
of the last block. When we awake here then because the last
765
joining thread signalled us. If the writer is not the last, it
766
will not signal. So it is safe to clear the buffer here.
768
if (!cshare->read_end || (cshare->pos_in_file < pos))
770
cshare->read_end= cshare->buffer; /* Empty buffer. */
771
cshare->error= 0; /* EOF is not an error. */
777
There are read caches only. The last thread arriving in
778
lock_io_cache() continues with a locked cache and reads the block.
780
if (!cshare->running_threads)
782
/* Stay locked. Leave the lock later by unlock_io_cache(). */
787
All other threads wait until the requested block is read by the
788
last thread arriving. Another reason to stop waiting is the
789
removal of a thread. If this leads to all threads being in the
790
lock, we have to continue also. The first of the awaken threads
791
will then do the read.
793
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
794
cshare->running_threads)
796
pthread_cond_wait(&cshare->cond, &cshare->mutex);
799
/* If the block is not yet read, continue with a locked cache and read. */
800
if (!cshare->read_end || (cshare->pos_in_file < pos))
802
/* Stay locked. Leave the lock later by unlock_io_cache(). */
806
/* Another thread did read the block already. */
810
Leave the lock. Do not call unlock_io_cache() later. The thread that
811
filled the buffer did this and marked all threads as running.
813
pthread_mutex_unlock(&cshare->mutex);
823
cache The cache of the thread leaving the lock.
826
This is called by the thread that filled the buffer. It marks all
827
threads as running and awakes them. This must not be done by any
830
Do not signal cond_writer. Either there is no writer or the writer
831
is the only one who can call this function.
833
The reason for resetting running_threads to total_threads before
834
waking all other threads is that it could be possible that this
835
thread is so fast with processing the buffer that it enters the lock
836
before even one other thread has left it. If every awoken thread
837
would increase running_threads by one, this thread could think that
838
he is again the last to join and would not wait for the other
839
threads to process the data.
845
static void unlock_io_cache(IO_CACHE *cache)
847
IO_CACHE_SHARE *cshare= cache->share;
849
cshare->running_threads= cshare->total_threads;
850
pthread_cond_broadcast(&cshare->cond);
851
pthread_mutex_unlock(&cshare->mutex);
857
Read from IO_CACHE when it is shared between several threads.
861
cache IO_CACHE pointer
862
Buffer Buffer to retrieve count bytes from file
863
Count Number of bytes to read into Buffer
866
This function is only called from the my_b_read() macro when there
867
isn't enough characters in the buffer to satisfy the request.
871
It works as follows: when a thread tries to read from a file (that
872
is, after using all the data from the (shared) buffer), it just
873
hangs on lock_io_cache(), waiting for other threads. When the very
874
last thread attempts a read, lock_io_cache() returns 1, the thread
875
does actual IO and unlock_io_cache(), which signals all the waiting
876
threads that data is in the buffer.
880
When changing this function, be careful with handling file offsets
881
(end-of_file, pos_in_file). Do not cast them to possibly smaller
882
types than my_off_t unless you can be sure that their value fits.
883
Same applies to differences of file offsets. (Bug #11527)
885
When changing this function, check _my_b_read(). It might need the
889
0 we succeeded in reading all data
890
1 Error: can't read requested characters
893
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
895
my_off_t pos_in_file;
896
size_t length, diff_length, left_length;
897
IO_CACHE_SHARE *cshare= cache->share;
899
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
901
assert(Count >= left_length); /* User is not using my_b_read() */
902
memcpy(Buffer, cache->read_pos, left_length);
903
Buffer+= left_length;
910
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
911
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
912
length=IO_ROUND_UP(Count+diff_length)-diff_length;
913
length= ((length <= cache->read_length) ?
914
length + IO_ROUND_DN(cache->read_length - length) :
915
length - IO_ROUND_UP(length - cache->read_length));
916
if (cache->type != READ_FIFO &&
917
(length > (cache->end_of_file - pos_in_file)))
918
length= (size_t) (cache->end_of_file - pos_in_file);
921
cache->error= (int) left_length;
924
if (lock_io_cache(cache, pos_in_file))
926
/* With a synchronized write/read cache we won't come here... */
927
assert(!cshare->source_cache);
929
... unless the writer has gone before this thread entered the
930
lock. Simulate EOF in this case. It can be distinguished by
938
Whenever a function which operates on IO_CACHE flushes/writes
939
some part of the IO_CACHE to disk it will set the property
940
"seek_not_done" to indicate this to other functions operating
943
if (cache->seek_not_done)
945
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
949
unlock_io_cache(cache);
953
len= my_read(cache->file, cache->buffer, length, cache->myflags);
955
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
956
cache->error= (len == length ? 0 : (int) len);
957
cache->pos_in_file= pos_in_file;
959
/* Copy important values to the share. */
960
cshare->error= cache->error;
961
cshare->read_end= cache->read_end;
962
cshare->pos_in_file= pos_in_file;
964
/* Mark all threads as running and wake them. */
965
unlock_io_cache(cache);
970
With a synchronized write/read cache readers always come here.
971
Copy important values from the share.
973
cache->error= cshare->error;
974
cache->read_end= cshare->read_end;
975
cache->pos_in_file= cshare->pos_in_file;
977
len= ((cache->error == -1) ? (size_t) -1 :
978
(size_t) (cache->read_end - cache->buffer));
980
cache->read_pos= cache->buffer;
981
cache->seek_not_done= 0;
982
if (len == 0 || len == (size_t) -1)
984
cache->error= (int) left_length;
987
cnt= (len > Count) ? Count : len;
988
memcpy(Buffer, cache->read_pos, cnt);
992
cache->read_pos+= cnt;
999
Copy data from write cache to read cache.
1002
copy_to_read_buffer()
1003
write_cache The write cache.
1004
write_buffer The source of data, mostly the cache buffer.
1005
write_length The number of bytes to copy.
1008
The write thread will wait for all read threads to join the cache
1009
lock. Then it copies the data over and wakes the read threads.
1015
static void copy_to_read_buffer(IO_CACHE *write_cache,
1016
const uchar *write_buffer, size_t write_length)
1018
IO_CACHE_SHARE *cshare= write_cache->share;
1020
assert(cshare->source_cache == write_cache);
1022
write_length is usually less or equal to buffer_length.
1023
It can be bigger if _my_b_write() is called with a big length.
1025
while (write_length)
1027
size_t copy_length= min(write_length, write_cache->buffer_length);
1028
int __attribute__((unused)) rc;
1030
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1031
/* The writing thread does always have the lock when it awakes. */
1034
memcpy(cshare->buffer, write_buffer, copy_length);
1037
cshare->read_end= cshare->buffer + copy_length;
1038
cshare->pos_in_file= write_cache->pos_in_file;
1040
/* Mark all threads as running and wake them. */
1041
unlock_io_cache(write_cache);
1043
write_buffer+= copy_length;
1044
write_length-= copy_length;
1050
Do sequential read from the SEQ_READ_APPEND cache.
1052
We do this in three stages:
1053
- first read from info->buffer
1054
- then if there are still data to read, try the file descriptor
1055
- afterwards, if there are still data to read, try append buffer
1062
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1064
size_t length, diff_length, left_length, save_count, max_length;
1065
my_off_t pos_in_file;
1068
/* first, read the regular buffer */
1069
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1071
assert(Count > left_length); /* User is not using my_b_read() */
1072
memcpy(Buffer,info->read_pos, left_length);
1073
Buffer+=left_length;
1076
lock_append_buffer(info);
1078
/* pos_in_file always point on where info->buffer was read */
1079
if ((pos_in_file=info->pos_in_file +
1080
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1081
goto read_append_buffer;
1084
With read-append cache we must always do a seek before we read,
1085
because the write could have moved the file pointer astray
1087
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1090
unlock_append_buffer(info);
1093
info->seek_not_done=0;
1095
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1097
/* now the second stage begins - read from file descriptor */
1098
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1100
/* Fill first intern buffer */
1103
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1104
if ((read_length= my_read(info->file,Buffer, length,
1105
info->myflags)) == (size_t) -1)
1108
unlock_append_buffer(info);
1112
Buffer+=read_length;
1113
pos_in_file+=read_length;
1115
if (read_length != length)
1118
We only got part of data; Read the rest of the data from the
1121
goto read_append_buffer;
1123
left_length+=length;
1127
max_length= info->read_length-diff_length;
1128
if (max_length > (info->end_of_file - pos_in_file))
1129
max_length= (size_t) (info->end_of_file - pos_in_file);
1133
goto read_append_buffer;
1134
length=0; /* Didn't read any more chars */
1138
length= my_read(info->file,info->buffer, max_length, info->myflags);
1139
if (length == (size_t) -1)
1142
unlock_append_buffer(info);
1147
memcpy(Buffer, info->buffer, length);
1152
added the line below to make
1153
assert(pos_in_file==info->end_of_file) pass.
1154
otherwise this does not appear to be needed
1156
pos_in_file += length;
1157
goto read_append_buffer;
1160
unlock_append_buffer(info);
1161
info->read_pos=info->buffer+Count;
1162
info->read_end=info->buffer+length;
1163
info->pos_in_file=pos_in_file;
1164
memcpy(Buffer,info->buffer,(size_t) Count);
1170
Read data from the current write buffer.
1171
Count should never be == 0 here (The code will work even if count is 0)
1175
/* First copy the data to Count */
1176
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1178
size_t transfer_len;
1180
assert(info->append_read_pos <= info->write_pos);
1182
TODO: figure out if the assert below is needed or correct.
1184
assert(pos_in_file == info->end_of_file);
1185
copy_len=min(Count, len_in_buff);
1186
memcpy(Buffer, info->append_read_pos, copy_len);
1187
info->append_read_pos += copy_len;
1190
info->error = save_count - Count;
1192
/* Fill read buffer with data from write buffer */
1193
memcpy(info->buffer, info->append_read_pos,
1194
(size_t) (transfer_len=len_in_buff - copy_len));
1195
info->read_pos= info->buffer;
1196
info->read_end= info->buffer+transfer_len;
1197
info->append_read_pos=info->write_pos;
1198
info->pos_in_file=pos_in_file+copy_len;
1199
info->end_of_file+=len_in_buff;
1201
unlock_append_buffer(info);
1202
return Count ? 1 : 0;
541
1206
#ifdef HAVE_AIOWAIT
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
1209
Read from the IO_CACHE into a buffer and feed asynchronously
1210
from disk when needed.
1214
info IO_CACHE pointer
1215
Buffer Buffer to retrieve count bytes from file
1216
Count Number of bytes to read into Buffer
1219
-1 An error has occurred; my_errno is set.
1221
1 An error has occurred; IO_CACHE to error state.
1224
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1226
size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1227
size_t max_length;
559
1228
my_off_t next_pos_in_file;
560
unsigned char *read_buffer;
562
1231
memcpy(Buffer,info->read_pos,
563
1232
(left_length= (size_t) (info->read_end-info->read_pos)));
761
1431
if (Count >= IO_SIZE)
762
1432
{ /* Fill first intern buffer */
763
length_local=Count & (size_t) ~(IO_SIZE-1);
1433
length=Count & (size_t) ~(IO_SIZE-1);
764
1434
if (info->seek_not_done)
767
Whenever a function which operates on st_io_cache flushes/writes
768
some part of the st_io_cache to disk it will set the property
1437
Whenever a function which operates on IO_CACHE flushes/writes
1438
some part of the IO_CACHE to disk it will set the property
769
1439
"seek_not_done" to indicate this to other functions operating
772
if (lseek(info->file,info->pos_in_file,SEEK_SET))
1442
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
774
1444
info->error= -1;
777
1447
info->seek_not_done=0;
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data goes through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
1449
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1450
return info->error= -1;
1453
In case of a shared I/O cache with a writer we normally do direct
1454
write cache to read cache copy. Simulate this here by direct
1455
caller buffer to read cache copy. Do it after the write so that
1456
the cache readers actions on the flushed part can go in parallel
1457
with the write of the extra stuff. copy_to_read_buffer()
1458
synchronizes writer and readers so that after this call the
1459
readers can act on the extra stuff while the writer can go ahead
1460
and prepare the next output. copy_to_read_buffer() relies on
1464
copy_to_read_buffer(info, Buffer, length);
1468
info->pos_in_file+=length;
1470
memcpy(info->write_pos,Buffer,(size_t) Count);
1471
info->write_pos+=Count;
1477
Append a block to the write buffer.
1478
This is done with the buffer locked to ensure that we don't read from
1479
the write buffer before we are ready with it.
1482
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1484
size_t rest_length,length;
1487
Assert that we cannot come here with a shared cache. If we do one
1488
day, we might need to add a call to copy_to_read_buffer().
1490
assert(!info->share);
1492
lock_append_buffer(info);
1493
rest_length= (size_t) (info->write_end - info->write_pos);
1494
if (Count <= rest_length)
1496
memcpy(info->write_pos, Buffer, rest_length);
1497
Buffer+=rest_length;
1499
info->write_pos+=rest_length;
1500
if (my_b_flush_io_cache(info,0))
1502
unlock_append_buffer(info);
1505
if (Count >= IO_SIZE)
1506
{ /* Fill first intern buffer */
1507
length=Count & (size_t) ~(IO_SIZE-1);
1508
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1510
unlock_append_buffer(info);
1511
return info->error= -1;
1515
info->end_of_file+=length;
1519
memcpy(info->write_pos,Buffer,(size_t) Count);
1520
info->write_pos+=Count;
1521
unlock_append_buffer(info);
1526
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1529
Sasha: We are not writing this with the ? operator to avoid hitting
1530
a possible compiler bug. At least gcc 2.95 cannot deal with
1531
several layers of ternary operators that evaluated comma(,) operator
1532
expressions inside - I do have a test case if somebody wants it
1534
if (info->type == SEQ_READ_APPEND)
1535
return my_b_append(info, Buffer, Count);
1536
return my_b_write(info, Buffer, Count);
1541
Write a block to disk where part of the data may be inside the record
1542
buffer. As all write calls to the data goes through the cache,
1543
we will never get a seek over the end of the buffer
1546
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1553
Assert that we cannot come here with a shared cache. If we do one
1554
day, we might need to add a call to copy_to_read_buffer().
1556
assert(!info->share);
803
1558
if (pos < info->pos_in_file)
805
1560
/* Of no overlap, write everything without buffering */
806
1561
if (pos + Count <= info->pos_in_file)
807
1562
return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1563
/* Write the part of the block that is before buffer */
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1564
length= (uint) (info->pos_in_file - pos);
1565
if (pwrite(info->file, Buffer, length, pos) == 0)
811
1566
info->error= error= -1;
812
Buffer+=length_local;
814
Count-= length_local;
1571
info->seek_not_done=1;
817
1575
/* Check if we want to write inside the used part of the buffer.*/
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1576
length= (size_t) (info->write_end - info->buffer);
1577
if (pos < info->pos_in_file + length)
821
1579
size_t offset= (size_t) (pos - info->pos_in_file);
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length_local of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1583
memcpy(info->buffer+offset, Buffer, length);
1586
/* Fix length of buffer if the new data was larger */
1587
if (info->buffer+length > info->write_pos)
1588
info->write_pos=info->buffer+length;
909
unlock_append_buffer(info, need_append_buffer_lock);
1684
UNLOCK_APPEND_BUFFER;
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1689
Free an IO_CACHE object
1693
info IO_CACHE Handle to free
1696
It's currently safe to call this if one has called init_io_cache()
1697
on the 'info' object, even if init_io_cache() failed.
1698
This function is also safe to call twice with the same handle.
1705
int end_io_cache(IO_CACHE *info)
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1708
IO_CACHE_CALLBACK pre_close;
1711
Every thread must call remove_io_thread(). The last one destroys
1714
assert(!info->share || !info->share->total_threads);
1716
if ((pre_close=info->pre_close))
1721
if (info->alloced_buffer)
1723
info->alloced_buffer=0;
1724
if (info->file != -1) /* File doesn't exist */
1725
error= my_b_flush_io_cache(info,1);
1726
my_free((uchar*) info->buffer,MYF(MY_WME));
1727
info->buffer=info->read_pos=(uchar*) 0;
1729
if (info->type == SEQ_READ_APPEND)
1731
/* Destroy allocated mutex */
1732
info->type= TYPE_NOT_SET;
1733
pthread_mutex_destroy(&info->append_buffer_lock);
948
1736
} /* end_io_cache */
950
} /* namespace internal */
951
} /* namespace drizzled */
1739
/**********************************************************************
1740
Testing of MF_IOCACHE
1741
**********************************************************************/
1747
void die(const char* fmt, ...)
1750
va_start(va_args,fmt);
1751
fprintf(stderr,"Error:");
1752
vfprintf(stderr, fmt,va_args);
1753
fprintf(stderr,", errno=%d\n", errno);
1757
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1760
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1761
die("Could not open %s", fname);
1762
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1763
die("failed in init_io_cache()");
1767
void close_file(IO_CACHE* info)
1770
my_close(info->file, MYF(MY_WME));
1773
int main(int argc, char** argv)
1775
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1777
const char* fname="/tmp/iocache.test";
1778
int cache_size=16384;
1780
int max_block,total_bytes=0;
1781
int i,num_loops=100,error=0;
1783
char* block, *block_end;
1785
max_block = cache_size*3;
1786
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1787
die("Not enough memory to allocate test block");
1788
block_end = block + max_block;
1789
for (p = block,i=0; p < block_end;i++)
1793
if (my_stat(fname,&status, MYF(0)) &&
1794
my_delete(fname,MYF(MY_WME)))
1796
die("Delete of %s failed, aborting", fname);
1798
open_file(fname,&sra_cache, cache_size);
1799
for (i = 0; i < num_loops; i++)
1802
int block_size = abs(rand() % max_block);
1803
int4store(buf, block_size);
1804
if (my_b_append(&sra_cache,buf,4) ||
1805
my_b_append(&sra_cache, block, block_size))
1806
die("write failed");
1807
total_bytes += 4+block_size;
1809
close_file(&sra_cache);
1810
my_free(block,MYF(MY_WME));
1811
if (!my_stat(fname,&status,MYF(MY_WME)))
1812
die("%s failed to stat, but I had just closed it,\
1813
wonder how that happened");
1814
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1815
llstr(status.st_size,llstr_buf),
1817
my_delete(fname, MYF(MY_WME));
1818
/* check correctness of tests */
1819
if (total_bytes != status.st_size)
1821
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1822
supposedly written\n");