477
471
info->file is a pipe or socket or FIFO. We never should have tried
478
472
to seek on that. See Bugs#25807 and #22828 for more info.
480
assert(errno != ESPIPE);
474
assert(my_errno != ESPIPE);
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
480
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
481
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
482
{ /* Fill first intern buffer */
489
483
size_t read_length;
490
if (info->end_of_file <= pos_in_file_local)
484
if (info->end_of_file <= pos_in_file)
491
485
{ /* End of file */
492
486
info->error= (int) left_length;
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
489
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
490
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
498
493
info->error= (read_length == (size_t) -1 ? -1 :
499
494
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
509
504
max_length= info->read_length-diff_length;
510
505
if (info->type != READ_FIFO &&
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
506
max_length > (info->end_of_file - pos_in_file))
507
max_length= (size_t) (info->end_of_file - pos_in_file);
517
info->error= static_cast<int>(left_length); /* We only got this many char */
512
info->error= left_length; /* We only got this many char */
520
length_local=0; /* Didn't read any chars */
515
length=0; /* Didn't read any chars */
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
517
else if ((length= my_read(info->file,info->buffer, max_length,
523
518
info->myflags)) < Count ||
524
length_local == (size_t) -1)
519
length == (size_t) -1)
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
521
if (length != (size_t) -1)
522
memcpy(Buffer, info->buffer, length);
523
info->pos_in_file= pos_in_file;
524
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
525
info->read_pos=info->read_end=info->buffer;
533
528
info->read_pos=info->buffer+Count;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
529
info->read_end=info->buffer+length;
530
info->pos_in_file=pos_in_file;
536
531
memcpy(Buffer, info->buffer, Count);
537
Prepare IO_CACHE for shared use.
540
init_io_cache_share()
541
read_cache A read cache. This will be copied for
542
every thread after setup.
544
write_cache If non-NULL a write cache that is to be
545
synchronized with the read caches.
546
num_threads Number of threads sharing the cache
547
including the write thread if any.
551
The shared cache is used so: One IO_CACHE is initialized with
552
init_io_cache(). This includes the allocation of a buffer. Then a
553
share is allocated and init_io_cache_share() is called with the io
554
cache and the share. Then the io cache is copied for each thread. So
555
every thread has its own copy of IO_CACHE. But the allocated buffer
556
is shared because cache->buffer is the same for all caches.
558
One thread reads data from the file into the buffer. All threads
559
read from the buffer, but every thread maintains its own set of
560
pointers into the buffer. When all threads have used up the buffer
561
contents, one of the threads reads the next block of data into the
562
buffer. To accomplish this, each thread enters the cache lock before
563
accessing the buffer. They wait in lock_io_cache() until all threads
564
joined the lock. The last thread entering the lock is in charge of
565
reading from file to buffer. It wakes all threads when done.
567
Synchronizing a write cache to the read caches works so: Whenever
568
the write buffer needs a flush, the write thread enters the lock and
569
waits for all other threads to enter the lock too. They do this when
570
they have used up the read buffer. When all threads are in the lock,
571
the write thread copies the write buffer to the read buffer and
574
share->running_threads is the number of threads not being in the
575
cache lock. When entering lock_io_cache() the number is decreased.
576
When the thread that fills the buffer enters unlock_io_cache() the
577
number is reset to the number of threads. The condition
578
running_threads == 0 means that all threads are in the lock. Bumping
579
up the number to the full count is non-intuitive. But increasing the
580
number by one for each thread that leaves the lock could lead to a
581
solo run of one thread. The last thread to join a lock reads from
582
file to buffer, wakes the other threads, processes the data in the
583
cache and enters the lock again. If no other thread left the lock
584
meanwhile, it would think it's the last one again and read the next
587
The share has copies of 'error', 'buffer', 'read_end', and
588
'pos_in_file' from the thread that filled the buffer. We may not be
589
able to access this information directly from its cache because the
590
thread may be removed from the share before the variables could be
591
copied by all other threads. Or, if a write buffer is synchronized,
592
it would change its 'pos_in_file' after waking the other threads,
593
possibly before they could copy its value.
595
However, the 'buffer' variable in the share is for a synchronized
596
write cache. It needs to know where to put the data. Otherwise it
597
would need access to the read cache of one of the threads that is
598
not yet removed from the share.
604
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
605
IO_CACHE *write_cache, uint32_t num_threads)
607
assert(num_threads > 1);
608
assert(read_cache->type == READ_CACHE);
609
assert(!write_cache || (write_cache->type == WRITE_CACHE));
611
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
612
pthread_cond_init(&cshare->cond, 0);
613
pthread_cond_init(&cshare->cond_writer, 0);
615
cshare->running_threads= num_threads;
616
cshare->total_threads= num_threads;
617
cshare->error= 0; /* Initialize. */
618
cshare->buffer= read_cache->buffer;
619
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
620
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
621
cshare->source_cache= write_cache; /* Can be NULL. */
623
read_cache->share= cshare;
624
read_cache->read_function= _my_b_read_r;
625
read_cache->current_pos= NULL;
626
read_cache->current_end= NULL;
629
write_cache->share= cshare;
636
Remove a thread from shared access to IO_CACHE.
640
cache The IO_CACHE to be removed from the share.
644
Every thread must do that on exit for not to deadlock other threads.
646
The last thread destroys the pthread resources.
648
A writer flushes its cache first.
654
void remove_io_thread(IO_CACHE *cache)
656
IO_CACHE_SHARE *cshare= cache->share;
659
/* If the writer goes, it needs to flush the write cache. */
660
if (cache == cshare->source_cache)
661
flush_io_cache(cache);
663
pthread_mutex_lock(&cshare->mutex);
665
/* Remove from share. */
666
total= --cshare->total_threads;
668
/* Detach from share. */
671
/* If the writer goes, let the readers know. */
672
if (cache == cshare->source_cache)
674
cshare->source_cache= NULL;
677
/* If all threads are waiting for me to join the lock, wake them. */
678
if (!--cshare->running_threads)
680
pthread_cond_signal(&cshare->cond_writer);
681
pthread_cond_broadcast(&cshare->cond);
684
pthread_mutex_unlock(&cshare->mutex);
688
pthread_cond_destroy (&cshare->cond_writer);
689
pthread_cond_destroy (&cshare->cond);
690
pthread_mutex_destroy(&cshare->mutex);
698
Lock IO cache and wait for all other threads to join.
702
cache The cache of the thread entering the lock.
703
pos File position of the block to read.
704
Unused for the write thread.
708
Wait for all threads to finish with the current buffer. We want
709
all threads to proceed in concert. The last thread to join
710
lock_io_cache() will read the block from file and all threads start
711
to use it. Then they will join again for reading the next block.
713
The waiting threads detect a fresh buffer by comparing
714
cshare->pos_in_file with the position they want to process next.
715
Since the first block may start at position 0, we take
716
cshare->read_end as an additional condition. This variable is
717
initialized to NULL and will be set after a block of data is written
721
1 OK, lock in place, go ahead and read.
722
0 OK, unlocked, another thread did the read.
725
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
727
IO_CACHE_SHARE *cshare= cache->share;
729
/* Enter the lock. */
730
pthread_mutex_lock(&cshare->mutex);
731
cshare->running_threads--;
733
if (cshare->source_cache)
735
/* A write cache is synchronized to the read caches. */
737
if (cache == cshare->source_cache)
739
/* The writer waits until all readers are here. */
740
while (cshare->running_threads)
742
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
744
/* Stay locked. Leave the lock later by unlock_io_cache(). */
748
/* The last thread wakes the writer. */
749
if (!cshare->running_threads)
751
pthread_cond_signal(&cshare->cond_writer);
755
Readers wait until the data is copied from the writer. Another
756
reason to stop waiting is the removal of the write thread. If this
757
happens, we leave the lock with old data in the buffer.
759
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
760
cshare->source_cache)
762
pthread_cond_wait(&cshare->cond, &cshare->mutex);
766
If the writer was removed from the share while this thread was
767
asleep, we need to simulate an EOF condition. The writer cannot
768
reset the share variables as they might still be in use by readers
769
of the last block. When we awake here then because the last
770
joining thread signalled us. If the writer is not the last, it
771
will not signal. So it is safe to clear the buffer here.
773
if (!cshare->read_end || (cshare->pos_in_file < pos))
775
cshare->read_end= cshare->buffer; /* Empty buffer. */
776
cshare->error= 0; /* EOF is not an error. */
782
There are read caches only. The last thread arriving in
783
lock_io_cache() continues with a locked cache and reads the block.
785
if (!cshare->running_threads)
787
/* Stay locked. Leave the lock later by unlock_io_cache(). */
792
All other threads wait until the requested block is read by the
793
last thread arriving. Another reason to stop waiting is the
794
removal of a thread. If this leads to all threads being in the
795
lock, we have to continue also. The first of the awaken threads
796
will then do the read.
798
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
799
cshare->running_threads)
801
pthread_cond_wait(&cshare->cond, &cshare->mutex);
804
/* If the block is not yet read, continue with a locked cache and read. */
805
if (!cshare->read_end || (cshare->pos_in_file < pos))
807
/* Stay locked. Leave the lock later by unlock_io_cache(). */
811
/* Another thread did read the block already. */
815
Leave the lock. Do not call unlock_io_cache() later. The thread that
816
filled the buffer did this and marked all threads as running.
818
pthread_mutex_unlock(&cshare->mutex);
828
cache The cache of the thread leaving the lock.
831
This is called by the thread that filled the buffer. It marks all
832
threads as running and awakes them. This must not be done by any
835
Do not signal cond_writer. Either there is no writer or the writer
836
is the only one who can call this function.
838
The reason for resetting running_threads to total_threads before
839
waking all other threads is that it could be possible that this
840
thread is so fast with processing the buffer that it enters the lock
841
before even one other thread has left it. If every awoken thread
842
would increase running_threads by one, this thread could think that
843
he is again the last to join and would not wait for the other
844
threads to process the data.
850
static void unlock_io_cache(IO_CACHE *cache)
852
IO_CACHE_SHARE *cshare= cache->share;
854
cshare->running_threads= cshare->total_threads;
855
pthread_cond_broadcast(&cshare->cond);
856
pthread_mutex_unlock(&cshare->mutex);
862
Read from IO_CACHE when it is shared between several threads.
866
cache IO_CACHE pointer
867
Buffer Buffer to retrieve count bytes from file
868
Count Number of bytes to read into Buffer
871
This function is only called from the my_b_read() macro when there
872
isn't enough characters in the buffer to satisfy the request.
876
It works as follows: when a thread tries to read from a file (that
877
is, after using all the data from the (shared) buffer), it just
878
hangs on lock_io_cache(), waiting for other threads. When the very
879
last thread attempts a read, lock_io_cache() returns 1, the thread
880
does actual IO and unlock_io_cache(), which signals all the waiting
881
threads that data is in the buffer.
885
When changing this function, be careful with handling file offsets
886
(end-of_file, pos_in_file). Do not cast them to possibly smaller
887
types than my_off_t unless you can be sure that their value fits.
888
Same applies to differences of file offsets. (Bug #11527)
890
When changing this function, check _my_b_read(). It might need the
894
0 we succeeded in reading all data
895
1 Error: can't read requested characters
898
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
900
my_off_t pos_in_file;
901
size_t length, diff_length, left_length;
902
IO_CACHE_SHARE *cshare= cache->share;
904
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
906
assert(Count >= left_length); /* User is not using my_b_read() */
907
memcpy(Buffer, cache->read_pos, left_length);
908
Buffer+= left_length;
915
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
916
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
917
length=IO_ROUND_UP(Count+diff_length)-diff_length;
918
length= ((length <= cache->read_length) ?
919
length + IO_ROUND_DN(cache->read_length - length) :
920
length - IO_ROUND_UP(length - cache->read_length));
921
if (cache->type != READ_FIFO &&
922
(length > (cache->end_of_file - pos_in_file)))
923
length= (size_t) (cache->end_of_file - pos_in_file);
926
cache->error= (int) left_length;
929
if (lock_io_cache(cache, pos_in_file))
931
/* With a synchronized write/read cache we won't come here... */
932
assert(!cshare->source_cache);
934
... unless the writer has gone before this thread entered the
935
lock. Simulate EOF in this case. It can be distinguished by
943
Whenever a function which operates on IO_CACHE flushes/writes
944
some part of the IO_CACHE to disk it will set the property
945
"seek_not_done" to indicate this to other functions operating
948
if (cache->seek_not_done)
950
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
954
unlock_io_cache(cache);
958
len= my_read(cache->file, cache->buffer, length, cache->myflags);
960
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
961
cache->error= (len == length ? 0 : (int) len);
962
cache->pos_in_file= pos_in_file;
964
/* Copy important values to the share. */
965
cshare->error= cache->error;
966
cshare->read_end= cache->read_end;
967
cshare->pos_in_file= pos_in_file;
969
/* Mark all threads as running and wake them. */
970
unlock_io_cache(cache);
975
With a synchronized write/read cache readers always come here.
976
Copy important values from the share.
978
cache->error= cshare->error;
979
cache->read_end= cshare->read_end;
980
cache->pos_in_file= cshare->pos_in_file;
982
len= ((cache->error == -1) ? (size_t) -1 :
983
(size_t) (cache->read_end - cache->buffer));
985
cache->read_pos= cache->buffer;
986
cache->seek_not_done= 0;
987
if (len == 0 || len == (size_t) -1)
989
cache->error= (int) left_length;
992
cnt= (len > Count) ? Count : len;
993
memcpy(Buffer, cache->read_pos, cnt);
997
cache->read_pos+= cnt;
1004
Copy data from write cache to read cache.
1007
copy_to_read_buffer()
1008
write_cache The write cache.
1009
write_buffer The source of data, mostly the cache buffer.
1010
write_length The number of bytes to copy.
1013
The write thread will wait for all read threads to join the cache
1014
lock. Then it copies the data over and wakes the read threads.
1020
static void copy_to_read_buffer(IO_CACHE *write_cache,
1021
const unsigned char *write_buffer, size_t write_length)
1023
IO_CACHE_SHARE *cshare= write_cache->share;
1025
assert(cshare->source_cache == write_cache);
1027
write_length is usually less or equal to buffer_length.
1028
It can be bigger if _my_b_write() is called with a big length.
1030
while (write_length)
1032
size_t copy_length= cmin(write_length, write_cache->buffer_length);
1035
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1036
/* The writing thread does always have the lock when it awakes. */
1039
memcpy(cshare->buffer, write_buffer, copy_length);
1042
cshare->read_end= cshare->buffer + copy_length;
1043
cshare->pos_in_file= write_cache->pos_in_file;
1045
/* Mark all threads as running and wake them. */
1046
unlock_io_cache(write_cache);
1048
write_buffer+= copy_length;
1049
write_length-= copy_length;
1055
Do sequential read from the SEQ_READ_APPEND cache.
1057
We do this in three stages:
1058
- first read from info->buffer
1059
- then if there are still data to read, try the file descriptor
1060
- afterwards, if there are still data to read, try append buffer
1067
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1069
size_t length, diff_length, left_length, save_count, max_length;
1070
my_off_t pos_in_file;
1073
/* first, read the regular buffer */
1074
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1076
assert(Count > left_length); /* User is not using my_b_read() */
1077
memcpy(Buffer,info->read_pos, left_length);
1078
Buffer+=left_length;
1081
lock_append_buffer(info);
1083
/* pos_in_file always point on where info->buffer was read */
1084
if ((pos_in_file=info->pos_in_file +
1085
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1086
goto read_append_buffer;
1089
With read-append cache we must always do a seek before we read,
1090
because the write could have moved the file pointer astray
1092
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1095
unlock_append_buffer(info);
1098
info->seek_not_done=0;
1100
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1102
/* now the second stage begins - read from file descriptor */
1103
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1105
/* Fill first intern buffer */
1108
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1109
if ((read_length= my_read(info->file,Buffer, length,
1110
info->myflags)) == (size_t) -1)
1113
unlock_append_buffer(info);
1117
Buffer+=read_length;
1118
pos_in_file+=read_length;
1120
if (read_length != length)
1123
We only got part of data; Read the rest of the data from the
1126
goto read_append_buffer;
1128
left_length+=length;
1132
max_length= info->read_length-diff_length;
1133
if (max_length > (info->end_of_file - pos_in_file))
1134
max_length= (size_t) (info->end_of_file - pos_in_file);
1138
goto read_append_buffer;
1139
length=0; /* Didn't read any more chars */
1143
length= my_read(info->file,info->buffer, max_length, info->myflags);
1144
if (length == (size_t) -1)
1147
unlock_append_buffer(info);
1152
memcpy(Buffer, info->buffer, length);
1157
added the line below to make
1158
assert(pos_in_file==info->end_of_file) pass.
1159
otherwise this does not appear to be needed
1161
pos_in_file += length;
1162
goto read_append_buffer;
1165
unlock_append_buffer(info);
1166
info->read_pos=info->buffer+Count;
1167
info->read_end=info->buffer+length;
1168
info->pos_in_file=pos_in_file;
1169
memcpy(Buffer,info->buffer,(size_t) Count);
1175
Read data from the current write buffer.
1176
Count should never be == 0 here (The code will work even if count is 0)
1180
/* First copy the data to Count */
1181
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1183
size_t transfer_len;
1185
assert(info->append_read_pos <= info->write_pos);
1187
TODO: figure out if the assert below is needed or correct.
1189
assert(pos_in_file == info->end_of_file);
1190
copy_len=cmin(Count, len_in_buff);
1191
memcpy(Buffer, info->append_read_pos, copy_len);
1192
info->append_read_pos += copy_len;
1195
info->error = save_count - Count;
1197
/* Fill read buffer with data from write buffer */
1198
memcpy(info->buffer, info->append_read_pos,
1199
(size_t) (transfer_len=len_in_buff - copy_len));
1200
info->read_pos= info->buffer;
1201
info->read_end= info->buffer+transfer_len;
1202
info->append_read_pos=info->write_pos;
1203
info->pos_in_file=pos_in_file+copy_len;
1204
info->end_of_file+=len_in_buff;
1206
unlock_append_buffer(info);
1207
return Count ? 1 : 0;
541
1211
#ifdef HAVE_AIOWAIT
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1214
Read from the IO_CACHE into a buffer and feed asynchronously
1215
from disk when needed.
1219
info IO_CACHE pointer
1220
Buffer Buffer to retrieve count bytes from file
1221
Count Number of bytes to read into Buffer
1224
-1 An error has occurred; my_errno is set.
1226
1 An error has occurred; IO_CACHE to error state.
1229
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1231
size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1232
size_t max_length;
559
1233
my_off_t next_pos_in_file;
560
1234
unsigned char *read_buffer;
761
1436
if (Count >= IO_SIZE)
762
1437
{ /* Fill first intern buffer */
763
length_local=Count & (size_t) ~(IO_SIZE-1);
1438
length=Count & (size_t) ~(IO_SIZE-1);
764
1439
if (info->seek_not_done)
767
Whenever a function which operates on st_io_cache flushes/writes
768
some part of the st_io_cache to disk it will set the property
1442
Whenever a function which operates on IO_CACHE flushes/writes
1443
some part of the IO_CACHE to disk it will set the property
769
1444
"seek_not_done" to indicate this to other functions operating
772
if (lseek(info->file,info->pos_in_file,SEEK_SET))
1447
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
774
1449
info->error= -1;
777
1452
info->seek_not_done=0;
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data goes through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1454
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1455
return info->error= -1;
1458
In case of a shared I/O cache with a writer we normally do direct
1459
write cache to read cache copy. Simulate this here by direct
1460
caller buffer to read cache copy. Do it after the write so that
1461
the cache readers actions on the flushed part can go in parallel
1462
with the write of the extra stuff. copy_to_read_buffer()
1463
synchronizes writer and readers so that after this call the
1464
readers can act on the extra stuff while the writer can go ahead
1465
and prepare the next output. copy_to_read_buffer() relies on
1469
copy_to_read_buffer(info, Buffer, length);
1473
info->pos_in_file+=length;
1475
memcpy(info->write_pos,Buffer,(size_t) Count);
1476
info->write_pos+=Count;
1482
Append a block to the write buffer.
1483
This is done with the buffer locked to ensure that we don't read from
1484
the write buffer before we are ready with it.
1487
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1489
size_t rest_length,length;
1492
Assert that we cannot come here with a shared cache. If we do one
1493
day, we might need to add a call to copy_to_read_buffer().
1495
assert(!info->share);
1497
lock_append_buffer(info);
1498
rest_length= (size_t) (info->write_end - info->write_pos);
1499
if (Count <= rest_length)
1501
memcpy(info->write_pos, Buffer, rest_length);
1502
Buffer+=rest_length;
1504
info->write_pos+=rest_length;
1505
if (my_b_flush_io_cache(info,0))
1507
unlock_append_buffer(info);
1510
if (Count >= IO_SIZE)
1511
{ /* Fill first intern buffer */
1512
length=Count & (size_t) ~(IO_SIZE-1);
1513
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1515
unlock_append_buffer(info);
1516
return info->error= -1;
1520
info->end_of_file+=length;
1524
memcpy(info->write_pos,Buffer,(size_t) Count);
1525
info->write_pos+=Count;
1526
unlock_append_buffer(info);
1531
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1534
Sasha: We are not writing this with the ? operator to avoid hitting
1535
a possible compiler bug. At least gcc 2.95 cannot deal with
1536
several layers of ternary operators that evaluated comma(,) operator
1537
expressions inside - I do have a test case if somebody wants it
1539
if (info->type == SEQ_READ_APPEND)
1540
return my_b_append(info, Buffer, Count);
1541
return my_b_write(info, Buffer, Count);
1546
Write a block to disk where part of the data may be inside the record
1547
buffer. As all write calls to the data goes through the cache,
1548
we will never get a seek over the end of the buffer
1551
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1558
Assert that we cannot come here with a shared cache. If we do one
1559
day, we might need to add a call to copy_to_read_buffer().
1561
assert(!info->share);
803
1563
if (pos < info->pos_in_file)
805
1565
/* Of no overlap, write everything without buffering */
806
1566
if (pos + Count <= info->pos_in_file)
807
1567
return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1568
/* Write the part of the block that is before buffer */
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1569
length= (uint) (info->pos_in_file - pos);
1570
if (pwrite(info->file, Buffer, length, pos) == 0)
811
1571
info->error= error= -1;
812
Buffer+=length_local;
814
Count-= length_local;
1576
info->seek_not_done=1;
817
1580
/* Check if we want to write inside the used part of the buffer.*/
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1581
length= (size_t) (info->write_end - info->buffer);
1582
if (pos < info->pos_in_file + length)
821
1584
size_t offset= (size_t) (pos - info->pos_in_file);
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length_local of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1588
memcpy(info->buffer+offset, Buffer, length);
1591
/* Fix length of buffer if the new data was larger */
1592
if (info->buffer+length > info->write_pos)
1593
info->write_pos=info->buffer+length;
909
unlock_append_buffer(info, need_append_buffer_lock);
1690
UNLOCK_APPEND_BUFFER;
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1695
Free an IO_CACHE object
1699
info IO_CACHE Handle to free
1702
It's currently safe to call this if one has called init_io_cache()
1703
on the 'info' object, even if init_io_cache() failed.
1704
This function is also safe to call twice with the same handle.
1711
int end_io_cache(IO_CACHE *info)
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1714
IO_CACHE_CALLBACK pre_close;
1717
Every thread must call remove_io_thread(). The last one destroys
1720
assert(!info->share || !info->share->total_threads);
1722
if ((pre_close=info->pre_close))
1727
if (info->alloced_buffer)
1729
info->alloced_buffer=0;
1730
if (info->file != -1) /* File doesn't exist */
1731
error= my_b_flush_io_cache(info,1);
1732
free((unsigned char*) info->buffer);
1733
info->buffer=info->read_pos=(unsigned char*) 0;
1735
if (info->type == SEQ_READ_APPEND)
1737
/* Destroy allocated mutex */
1738
info->type= TYPE_NOT_SET;
1739
pthread_mutex_destroy(&info->append_buffer_lock);
948
1742
} /* end_io_cache */
950
} /* namespace internal */
951
} /* namespace drizzled */
1745
/**********************************************************************
1746
Testing of MF_IOCACHE
1747
**********************************************************************/
1753
void die(const char* fmt, ...)
1756
va_start(va_args,fmt);
1757
fprintf(stderr,"Error:");
1758
vfprintf(stderr, fmt,va_args);
1759
fprintf(stderr,", errno=%d\n", errno);
1763
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1766
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1767
die("Could not open %s", fname);
1768
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1769
die("failed in init_io_cache()");
1773
void close_file(IO_CACHE* info)
1776
my_close(info->file, MYF(MY_WME));
1779
int main(int argc, char** argv)
1781
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1783
const char* fname="/tmp/iocache.test";
1784
int cache_size=16384;
1786
int max_block,total_bytes=0;
1787
int i,num_loops=100,error=0;
1789
char* block, *block_end;
1791
max_block = cache_size*3;
1792
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1793
die("Not enough memory to allocate test block");
1794
block_end = block + max_block;
1795
for (p = block,i=0; p < block_end;i++)
1799
if (my_stat(fname,&status, MYF(0)) &&
1800
my_delete(fname,MYF(MY_WME)))
1802
die("Delete of %s failed, aborting", fname);
1804
open_file(fname,&sra_cache, cache_size);
1805
for (i = 0; i < num_loops; i++)
1808
int block_size = abs(rand() % max_block);
1809
int4store(buf, block_size);
1810
if (my_b_append(&sra_cache,buf,4) ||
1811
my_b_append(&sra_cache, block, block_size))
1812
die("write failed");
1813
total_bytes += 4+block_size;
1815
close_file(&sra_cache);
1817
if (!my_stat(fname,&status,MYF(MY_WME)))
1818
die("%s failed to stat, but I had just closed it,\
1819
wonder how that happened");
1820
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1821
llstr(status.st_size,llstr_buf),
1823
my_delete(fname, MYF(MY_WME));
1824
/* check correctness of tests */
1825
if (total_bytes != status.st_size)
1827
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1828
supposedly written\n");