477
468
info->file is a pipe or socket or FIFO. We never should have tried
478
469
to seek on that. See Bugs#25807 and #22828 for more info.
480
assert(errno != ESPIPE);
471
assert(my_errno != ESPIPE);
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
477
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
478
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
479
{ /* Fill first intern buffer */
489
480
size_t read_length;
490
if (info->end_of_file <= pos_in_file_local)
481
if (info->end_of_file <= pos_in_file)
491
482
{ /* End of file */
492
483
info->error= (int) left_length;
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
486
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
487
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
498
490
info->error= (read_length == (size_t) -1 ? -1 :
499
491
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
509
501
max_length= info->read_length-diff_length;
510
502
if (info->type != READ_FIFO &&
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
503
max_length > (info->end_of_file - pos_in_file))
504
max_length= (size_t) (info->end_of_file - pos_in_file);
517
info->error= static_cast<int>(left_length); /* We only got this many char */
509
info->error= left_length; /* We only got this many char */
520
length_local=0; /* Didn't read any chars */
512
length=0; /* Didn't read any chars */
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
514
else if ((length= my_read(info->file,info->buffer, max_length,
523
515
info->myflags)) < Count ||
524
length_local == (size_t) -1)
516
length == (size_t) -1)
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
518
if (length != (size_t) -1)
519
memcpy(Buffer, info->buffer, length);
520
info->pos_in_file= pos_in_file;
521
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
522
info->read_pos=info->read_end=info->buffer;
533
525
info->read_pos=info->buffer+Count;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
526
info->read_end=info->buffer+length;
527
info->pos_in_file=pos_in_file;
536
528
memcpy(Buffer, info->buffer, Count);
534
Prepare IO_CACHE for shared use.
537
init_io_cache_share()
538
read_cache A read cache. This will be copied for
539
every thread after setup.
541
write_cache If non-NULL a write cache that is to be
542
synchronized with the read caches.
543
num_threads Number of threads sharing the cache
544
including the write thread if any.
548
The shared cache is used so: One IO_CACHE is initialized with
549
init_io_cache(). This includes the allocation of a buffer. Then a
550
share is allocated and init_io_cache_share() is called with the io
551
cache and the share. Then the io cache is copied for each thread. So
552
every thread has its own copy of IO_CACHE. But the allocated buffer
553
is shared because cache->buffer is the same for all caches.
555
One thread reads data from the file into the buffer. All threads
556
read from the buffer, but every thread maintains its own set of
557
pointers into the buffer. When all threads have used up the buffer
558
contents, one of the threads reads the next block of data into the
559
buffer. To accomplish this, each thread enters the cache lock before
560
accessing the buffer. They wait in lock_io_cache() until all threads
561
joined the lock. The last thread entering the lock is in charge of
562
reading from file to buffer. It wakes all threads when done.
564
Synchronizing a write cache to the read caches works so: Whenever
565
the write buffer needs a flush, the write thread enters the lock and
566
waits for all other threads to enter the lock too. They do this when
567
they have used up the read buffer. When all threads are in the lock,
568
the write thread copies the write buffer to the read buffer and
571
share->running_threads is the number of threads not being in the
572
cache lock. When entering lock_io_cache() the number is decreased.
573
When the thread that fills the buffer enters unlock_io_cache() the
574
number is reset to the number of threads. The condition
575
running_threads == 0 means that all threads are in the lock. Bumping
576
up the number to the full count is non-intuitive. But increasing the
577
number by one for each thread that leaves the lock could lead to a
578
solo run of one thread. The last thread to join a lock reads from
579
file to buffer, wakes the other threads, processes the data in the
580
cache and enters the lock again. If no other thread left the lock
581
meanwhile, it would think it's the last one again and read the next
584
The share has copies of 'error', 'buffer', 'read_end', and
585
'pos_in_file' from the thread that filled the buffer. We may not be
586
able to access this information directly from its cache because the
587
thread may be removed from the share before the variables could be
588
copied by all other threads. Or, if a write buffer is synchronized,
589
it would change its 'pos_in_file' after waking the other threads,
590
possibly before they could copy its value.
592
However, the 'buffer' variable in the share is for a synchronized
593
write cache. It needs to know where to put the data. Otherwise it
594
would need access to the read cache of one of the threads that is
595
not yet removed from the share.
601
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
602
IO_CACHE *write_cache, uint32_t num_threads)
604
assert(num_threads > 1);
605
assert(read_cache->type == READ_CACHE);
606
assert(!write_cache || (write_cache->type == WRITE_CACHE));
608
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
609
pthread_cond_init(&cshare->cond, 0);
610
pthread_cond_init(&cshare->cond_writer, 0);
612
cshare->running_threads= num_threads;
613
cshare->total_threads= num_threads;
614
cshare->error= 0; /* Initialize. */
615
cshare->buffer= read_cache->buffer;
616
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
617
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
618
cshare->source_cache= write_cache; /* Can be NULL. */
620
read_cache->share= cshare;
621
read_cache->read_function= _my_b_read_r;
622
read_cache->current_pos= NULL;
623
read_cache->current_end= NULL;
626
write_cache->share= cshare;
633
Remove a thread from shared access to IO_CACHE.
637
cache The IO_CACHE to be removed from the share.
641
Every thread must do that on exit so as not to deadlock other threads.
643
The last thread destroys the pthread resources.
645
A writer flushes its cache first.
651
void remove_io_thread(IO_CACHE *cache)
653
IO_CACHE_SHARE *cshare= cache->share;
656
/* If the writer goes, it needs to flush the write cache. */
657
if (cache == cshare->source_cache)
658
flush_io_cache(cache);
660
pthread_mutex_lock(&cshare->mutex);
662
/* Remove from share. */
663
total= --cshare->total_threads;
665
/* Detach from share. */
668
/* If the writer goes, let the readers know. */
669
if (cache == cshare->source_cache)
671
cshare->source_cache= NULL;
674
/* If all threads are waiting for me to join the lock, wake them. */
675
if (!--cshare->running_threads)
677
pthread_cond_signal(&cshare->cond_writer);
678
pthread_cond_broadcast(&cshare->cond);
681
pthread_mutex_unlock(&cshare->mutex);
685
pthread_cond_destroy (&cshare->cond_writer);
686
pthread_cond_destroy (&cshare->cond);
687
pthread_mutex_destroy(&cshare->mutex);
695
Lock IO cache and wait for all other threads to join.
699
cache The cache of the thread entering the lock.
700
pos File position of the block to read.
701
Unused for the write thread.
705
Wait for all threads to finish with the current buffer. We want
706
all threads to proceed in concert. The last thread to join
707
lock_io_cache() will read the block from file and all threads start
708
to use it. Then they will join again for reading the next block.
710
The waiting threads detect a fresh buffer by comparing
711
cshare->pos_in_file with the position they want to process next.
712
Since the first block may start at position 0, we take
713
cshare->read_end as an additional condition. This variable is
714
initialized to NULL and will be set after a block of data is written
718
1 OK, lock in place, go ahead and read.
719
0 OK, unlocked, another thread did the read.
722
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
724
IO_CACHE_SHARE *cshare= cache->share;
726
/* Enter the lock. */
727
pthread_mutex_lock(&cshare->mutex);
728
cshare->running_threads--;
730
if (cshare->source_cache)
732
/* A write cache is synchronized to the read caches. */
734
if (cache == cshare->source_cache)
736
/* The writer waits until all readers are here. */
737
while (cshare->running_threads)
739
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
741
/* Stay locked. Leave the lock later by unlock_io_cache(). */
745
/* The last thread wakes the writer. */
746
if (!cshare->running_threads)
748
pthread_cond_signal(&cshare->cond_writer);
752
Readers wait until the data is copied from the writer. Another
753
reason to stop waiting is the removal of the write thread. If this
754
happens, we leave the lock with old data in the buffer.
756
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
757
cshare->source_cache)
759
pthread_cond_wait(&cshare->cond, &cshare->mutex);
763
If the writer was removed from the share while this thread was
764
asleep, we need to simulate an EOF condition. The writer cannot
765
reset the share variables as they might still be in use by readers
766
of the last block. When we awake here, it is because the last
767
joining thread signalled us. If the writer is not the last, it
768
will not signal. So it is safe to clear the buffer here.
770
if (!cshare->read_end || (cshare->pos_in_file < pos))
772
cshare->read_end= cshare->buffer; /* Empty buffer. */
773
cshare->error= 0; /* EOF is not an error. */
779
There are read caches only. The last thread arriving in
780
lock_io_cache() continues with a locked cache and reads the block.
782
if (!cshare->running_threads)
784
/* Stay locked. Leave the lock later by unlock_io_cache(). */
789
All other threads wait until the requested block is read by the
790
last thread arriving. Another reason to stop waiting is the
791
removal of a thread. If this leads to all threads being in the
792
lock, we have to continue also. The first of the awakened threads
793
will then do the read.
795
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
796
cshare->running_threads)
798
pthread_cond_wait(&cshare->cond, &cshare->mutex);
801
/* If the block is not yet read, continue with a locked cache and read. */
802
if (!cshare->read_end || (cshare->pos_in_file < pos))
804
/* Stay locked. Leave the lock later by unlock_io_cache(). */
808
/* Another thread did read the block already. */
812
Leave the lock. Do not call unlock_io_cache() later. The thread that
813
filled the buffer did this and marked all threads as running.
815
pthread_mutex_unlock(&cshare->mutex);
825
cache The cache of the thread leaving the lock.
828
This is called by the thread that filled the buffer. It marks all
829
threads as running and awakes them. This must not be done by any
832
Do not signal cond_writer. Either there is no writer or the writer
833
is the only one who can call this function.
835
The reason for resetting running_threads to total_threads before
836
waking all other threads is that it could be possible that this
837
thread is so fast with processing the buffer that it enters the lock
838
before even one other thread has left it. If every awoken thread
839
would increase running_threads by one, this thread could think that
840
it is again the last to join and would not wait for the other
841
threads to process the data.
847
static void unlock_io_cache(IO_CACHE *cache)
849
IO_CACHE_SHARE *cshare= cache->share;
851
cshare->running_threads= cshare->total_threads;
852
pthread_cond_broadcast(&cshare->cond);
853
pthread_mutex_unlock(&cshare->mutex);
859
Read from IO_CACHE when it is shared between several threads.
863
cache IO_CACHE pointer
864
Buffer Buffer to retrieve count bytes from file
865
Count Number of bytes to read into Buffer
868
This function is only called from the my_b_read() macro when there
869
aren't enough characters in the buffer to satisfy the request.
873
It works as follows: when a thread tries to read from a file (that
874
is, after using all the data from the (shared) buffer), it just
875
hangs on lock_io_cache(), waiting for other threads. When the very
876
last thread attempts a read, lock_io_cache() returns 1, the thread
877
does actual IO and unlock_io_cache(), which signals all the waiting
878
threads that data is in the buffer.
882
When changing this function, be careful with handling file offsets
883
(end_of_file, pos_in_file). Do not cast them to possibly smaller
884
types than my_off_t unless you can be sure that their value fits.
885
Same applies to differences of file offsets. (Bug #11527)
887
When changing this function, check _my_b_read(). It might need the
891
0 we succeeded in reading all data
892
1 Error: can't read requested characters
895
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
897
my_off_t pos_in_file;
898
size_t length, diff_length, left_length;
899
IO_CACHE_SHARE *cshare= cache->share;
901
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
903
assert(Count >= left_length); /* User is not using my_b_read() */
904
memcpy(Buffer, cache->read_pos, left_length);
905
Buffer+= left_length;
912
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
913
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
914
length=IO_ROUND_UP(Count+diff_length)-diff_length;
915
length= ((length <= cache->read_length) ?
916
length + IO_ROUND_DN(cache->read_length - length) :
917
length - IO_ROUND_UP(length - cache->read_length));
918
if (cache->type != READ_FIFO &&
919
(length > (cache->end_of_file - pos_in_file)))
920
length= (size_t) (cache->end_of_file - pos_in_file);
923
cache->error= (int) left_length;
926
if (lock_io_cache(cache, pos_in_file))
928
/* With a synchronized write/read cache we won't come here... */
929
assert(!cshare->source_cache);
931
... unless the writer has gone before this thread entered the
932
lock. Simulate EOF in this case. It can be distinguished by
940
Whenever a function which operates on IO_CACHE flushes/writes
941
some part of the IO_CACHE to disk it will set the property
942
"seek_not_done" to indicate this to other functions operating
945
if (cache->seek_not_done)
947
if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
950
unlock_io_cache(cache);
954
len= my_read(cache->file, cache->buffer, length, cache->myflags);
956
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
957
cache->error= (len == length ? 0 : (int) len);
958
cache->pos_in_file= pos_in_file;
960
/* Copy important values to the share. */
961
cshare->error= cache->error;
962
cshare->read_end= cache->read_end;
963
cshare->pos_in_file= pos_in_file;
965
/* Mark all threads as running and wake them. */
966
unlock_io_cache(cache);
971
With a synchronized write/read cache readers always come here.
972
Copy important values from the share.
974
cache->error= cshare->error;
975
cache->read_end= cshare->read_end;
976
cache->pos_in_file= cshare->pos_in_file;
978
len= ((cache->error == -1) ? (size_t) -1 :
979
(size_t) (cache->read_end - cache->buffer));
981
cache->read_pos= cache->buffer;
982
cache->seek_not_done= 0;
983
if (len == 0 || len == (size_t) -1)
985
cache->error= (int) left_length;
988
cnt= (len > Count) ? Count : len;
989
memcpy(Buffer, cache->read_pos, cnt);
993
cache->read_pos+= cnt;
1000
Copy data from write cache to read cache.
1003
copy_to_read_buffer()
1004
write_cache The write cache.
1005
write_buffer The source of data, mostly the cache buffer.
1006
write_length The number of bytes to copy.
1009
The write thread will wait for all read threads to join the cache
1010
lock. Then it copies the data over and wakes the read threads.
1016
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
const unsigned char *write_buffer, size_t write_length)
1019
IO_CACHE_SHARE *cshare= write_cache->share;
1021
assert(cshare->source_cache == write_cache);
1023
write_length is usually less than or equal to buffer_length.
1024
It can be bigger if _my_b_write() is called with a big length.
1026
while (write_length)
1028
size_t copy_length= cmin(write_length, write_cache->buffer_length);
1031
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
/* The writing thread does always have the lock when it awakes. */
1035
memcpy(cshare->buffer, write_buffer, copy_length);
1038
cshare->read_end= cshare->buffer + copy_length;
1039
cshare->pos_in_file= write_cache->pos_in_file;
1041
/* Mark all threads as running and wake them. */
1042
unlock_io_cache(write_cache);
1044
write_buffer+= copy_length;
1045
write_length-= copy_length;
1051
Do sequential read from the SEQ_READ_APPEND cache.
1053
We do this in three stages:
1054
- first read from info->buffer
1055
- then if there are still data to read, try the file descriptor
1056
- afterwards, if there are still data to read, try append buffer
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1065
size_t length, diff_length, left_length, save_count, max_length;
1066
my_off_t pos_in_file;
1069
/* first, read the regular buffer */
1070
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1072
assert(Count > left_length); /* User is not using my_b_read() */
1073
memcpy(Buffer,info->read_pos, left_length);
1074
Buffer+=left_length;
1077
lock_append_buffer(info);
1079
/* pos_in_file always points to where info->buffer was read */
1080
if ((pos_in_file=info->pos_in_file +
1081
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1082
goto read_append_buffer;
1085
With read-append cache we must always do a seek before we read,
1086
because the write could have moved the file pointer astray
1088
if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1091
unlock_append_buffer(info);
1094
info->seek_not_done=0;
1096
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1098
/* now the second stage begins - read from file descriptor */
1099
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1101
/* Fill first intern buffer */
1104
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1105
if ((read_length= my_read(info->file,Buffer, length,
1106
info->myflags)) == (size_t) -1)
1109
unlock_append_buffer(info);
1113
Buffer+=read_length;
1114
pos_in_file+=read_length;
1116
if (read_length != length)
1119
We only got part of data; Read the rest of the data from the
1122
goto read_append_buffer;
1124
left_length+=length;
1128
max_length= info->read_length-diff_length;
1129
if (max_length > (info->end_of_file - pos_in_file))
1130
max_length= (size_t) (info->end_of_file - pos_in_file);
1134
goto read_append_buffer;
1135
length=0; /* Didn't read any more chars */
1139
length= my_read(info->file,info->buffer, max_length, info->myflags);
1140
if (length == (size_t) -1)
1143
unlock_append_buffer(info);
1148
memcpy(Buffer, info->buffer, length);
1153
added the line below to make
1154
assert(pos_in_file==info->end_of_file) pass.
1155
otherwise this does not appear to be needed
1157
pos_in_file += length;
1158
goto read_append_buffer;
1161
unlock_append_buffer(info);
1162
info->read_pos=info->buffer+Count;
1163
info->read_end=info->buffer+length;
1164
info->pos_in_file=pos_in_file;
1165
memcpy(Buffer,info->buffer,(size_t) Count);
1171
Read data from the current write buffer.
1172
Count should never be == 0 here (The code will work even if count is 0)
1176
/* First copy the data to Count */
1177
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1179
size_t transfer_len;
1181
assert(info->append_read_pos <= info->write_pos);
1183
TODO: figure out if the assert below is needed or correct.
1185
assert(pos_in_file == info->end_of_file);
1186
copy_len=cmin(Count, len_in_buff);
1187
memcpy(Buffer, info->append_read_pos, copy_len);
1188
info->append_read_pos += copy_len;
1191
info->error = save_count - Count;
1193
/* Fill read buffer with data from write buffer */
1194
memcpy(info->buffer, info->append_read_pos,
1195
(size_t) (transfer_len=len_in_buff - copy_len));
1196
info->read_pos= info->buffer;
1197
info->read_end= info->buffer+transfer_len;
1198
info->append_read_pos=info->write_pos;
1199
info->pos_in_file=pos_in_file+copy_len;
1200
info->end_of_file+=len_in_buff;
1202
unlock_append_buffer(info);
1203
return Count ? 1 : 0;
541
1207
#ifdef HAVE_AIOWAIT
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1210
Read from the IO_CACHE into a buffer and feed asynchronously
1211
from disk when needed.
1215
info IO_CACHE pointer
1216
Buffer Buffer to retrieve count bytes from file
1217
Count Number of bytes to read into Buffer
1220
-1 An error has occurred; my_errno is set.
1222
1 An error has occurred; IO_CACHE to error state.
1225
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1227
size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1228
size_t max_length;
559
1229
my_off_t next_pos_in_file;
560
1230
unsigned char *read_buffer;
777
1447
info->seek_not_done=0;
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data goes through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1449
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1450
return info->error= -1;
1453
In case of a shared I/O cache with a writer we normally do direct
1454
write cache to read cache copy. Simulate this here by direct
1455
caller buffer to read cache copy. Do it after the write so that
1456
the cache readers actions on the flushed part can go in parallel
1457
with the write of the extra stuff. copy_to_read_buffer()
1458
synchronizes writer and readers so that after this call the
1459
readers can act on the extra stuff while the writer can go ahead
1460
and prepare the next output. copy_to_read_buffer() relies on
1464
copy_to_read_buffer(info, Buffer, length);
1468
info->pos_in_file+=length;
1470
memcpy(info->write_pos,Buffer,(size_t) Count);
1471
info->write_pos+=Count;
1477
Append a block to the write buffer.
1478
This is done with the buffer locked to ensure that we don't read from
1479
the write buffer before we are ready with it.
1482
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1484
size_t rest_length,length;
1487
Assert that we cannot come here with a shared cache. If we do one
1488
day, we might need to add a call to copy_to_read_buffer().
1490
assert(!info->share);
1492
lock_append_buffer(info);
1493
rest_length= (size_t) (info->write_end - info->write_pos);
1494
if (Count <= rest_length)
1496
memcpy(info->write_pos, Buffer, rest_length);
1497
Buffer+=rest_length;
1499
info->write_pos+=rest_length;
1500
if (my_b_flush_io_cache(info,0))
1502
unlock_append_buffer(info);
1505
if (Count >= IO_SIZE)
1506
{ /* Fill first intern buffer */
1507
length=Count & (size_t) ~(IO_SIZE-1);
1508
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1510
unlock_append_buffer(info);
1511
return info->error= -1;
1515
info->end_of_file+=length;
1519
memcpy(info->write_pos,Buffer,(size_t) Count);
1520
info->write_pos+=Count;
1521
unlock_append_buffer(info);
1526
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1529
Sasha: We are not writing this with the ? operator to avoid hitting
1530
a possible compiler bug. At least gcc 2.95 cannot deal with
1531
several layers of ternary operators that evaluated comma(,) operator
1532
expressions inside - I do have a test case if somebody wants it
1534
if (info->type == SEQ_READ_APPEND)
1535
return my_b_append(info, Buffer, Count);
1536
return my_b_write(info, Buffer, Count);
1541
Write a block to disk where part of the data may be inside the record
1542
buffer. As all write calls to the data goes through the cache,
1543
we will never get a seek over the end of the buffer
1546
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1553
Assert that we cannot come here with a shared cache. If we do one
1554
day, we might need to add a call to copy_to_read_buffer().
1556
assert(!info->share);
803
1558
if (pos < info->pos_in_file)
805
1560
/* If no overlap, write everything without buffering */
806
1561
if (pos + Count <= info->pos_in_file)
807
1562
return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1563
/* Write the part of the block that is before buffer */
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1564
length= (uint) (info->pos_in_file - pos);
1565
if (pwrite(info->file, Buffer, length, pos) == 0)
811
1566
info->error= error= -1;
812
Buffer+=length_local;
814
Count-= length_local;
1571
info->seek_not_done=1;
817
1575
/* Check if we want to write inside the used part of the buffer.*/
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1576
length= (size_t) (info->write_end - info->buffer);
1577
if (pos < info->pos_in_file + length)
821
1579
size_t offset= (size_t) (pos - info->pos_in_file);
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1583
memcpy(info->buffer+offset, Buffer, length);
1586
/* Fix length of buffer if the new data was larger */
1587
if (info->buffer+length > info->write_pos)
1588
info->write_pos=info->buffer+length;
909
unlock_append_buffer(info, need_append_buffer_lock);
1684
UNLOCK_APPEND_BUFFER;
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1689
Free an IO_CACHE object
1693
info IO_CACHE Handle to free
1696
It's currently safe to call this if one has called init_io_cache()
1697
on the 'info' object, even if init_io_cache() failed.
1698
This function is also safe to call twice with the same handle.
1705
int end_io_cache(IO_CACHE *info)
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1708
IO_CACHE_CALLBACK pre_close;
1711
Every thread must call remove_io_thread(). The last one destroys
1714
assert(!info->share || !info->share->total_threads);
1716
if ((pre_close=info->pre_close))
1721
if (info->alloced_buffer)
1723
info->alloced_buffer=0;
1724
if (info->file != -1) /* File doesn't exist */
1725
error= my_b_flush_io_cache(info,1);
1726
free((unsigned char*) info->buffer);
1727
info->buffer=info->read_pos=(unsigned char*) 0;
1729
if (info->type == SEQ_READ_APPEND)
1731
/* Destroy allocated mutex */
1732
info->type= TYPE_NOT_SET;
1733
pthread_mutex_destroy(&info->append_buffer_lock);
948
1736
} /* end_io_cache */
950
} /* namespace internal */
951
} /* namespace drizzled */
1739
/**********************************************************************
1740
Testing of MF_IOCACHE
1741
**********************************************************************/
1747
void die(const char* fmt, ...)
1750
va_start(va_args,fmt);
1751
fprintf(stderr,"Error:");
1752
vfprintf(stderr, fmt,va_args);
1753
fprintf(stderr,", errno=%d\n", errno);
1757
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1760
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1761
die("Could not open %s", fname);
1762
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1763
die("failed in init_io_cache()");
1767
void close_file(IO_CACHE* info)
1770
my_close(info->file, MYF(MY_WME));
1773
int main(int argc, char** argv)
1775
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1777
const char* fname="/tmp/iocache.test";
1778
int cache_size=16384;
1780
int max_block,total_bytes=0;
1781
int i,num_loops=100,error=0;
1783
char* block, *block_end;
1785
max_block = cache_size*3;
1786
if (!(block=(char*)malloc(max_block)))
1787
die("Not enough memory to allocate test block");
1788
block_end = block + max_block;
1789
for (p = block,i=0; p < block_end;i++)
1793
if (my_stat(fname,&status, MYF(0)) &&
1794
my_delete(fname,MYF(MY_WME)))
1796
die("Delete of %s failed, aborting", fname);
1798
open_file(fname,&sra_cache, cache_size);
1799
for (i = 0; i < num_loops; i++)
1802
int block_size = abs(rand() % max_block);
1803
int4store(buf, block_size);
1804
if (my_b_append(&sra_cache,buf,4) ||
1805
my_b_append(&sra_cache, block, block_size))
1806
die("write failed");
1807
total_bytes += 4+block_size;
1809
close_file(&sra_cache);
1811
if (!my_stat(fname,&status,MYF(MY_WME)))
1812
die("%s failed to stat, but I had just closed it,\
1813
wonder how that happened");
1814
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1815
llstr(status.st_size,llstr_buf),
1817
my_delete(fname, MYF(MY_WME));
1818
/* check correctness of tests */
1819
if (total_bytes != status.st_size)
1821
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1822
supposedly written\n");