463
477
info->file is a pipe or socket or FIFO. We never should have tried
464
478
to seek on that. See Bugs#25807 and #22828 for more info.
466
assert(my_errno != ESPIPE);
480
assert(errno != ESPIPE);
472
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
473
487
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
474
488
{ /* Fill first intern buffer */
475
489
size_t read_length;
476
if (info->end_of_file <= pos_in_file)
490
if (info->end_of_file <= pos_in_file_local)
477
491
{ /* End of file */
478
492
info->error= (int) left_length;
481
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
482
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
485
498
info->error= (read_length == (size_t) -1 ? -1 :
486
499
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
496
509
max_length= info->read_length-diff_length;
497
510
if (info->type != READ_FIFO &&
498
max_length > (info->end_of_file - pos_in_file))
499
max_length= (size_t) (info->end_of_file - pos_in_file);
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
504
info->error= left_length; /* We only got this many char */
517
info->error= static_cast<int>(left_length); /* We only got this many char */
507
length=0; /* Didn't read any chars */
520
length_local=0; /* Didn't read any chars */
509
else if ((length= my_read(info->file,info->buffer, max_length,
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
510
523
info->myflags)) < Count ||
511
length == (size_t) -1)
524
length_local == (size_t) -1)
513
if (length != (size_t) -1)
514
memcpy(Buffer, info->buffer, length);
515
info->pos_in_file= pos_in_file;
516
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
517
530
info->read_pos=info->read_end=info->buffer;
520
533
info->read_pos=info->buffer+Count;
521
info->read_end=info->buffer+length;
522
info->pos_in_file=pos_in_file;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
523
536
memcpy(Buffer, info->buffer, Count);
529
Prepare IO_CACHE for shared use.
532
init_io_cache_share()
533
read_cache A read cache. This will be copied for
534
every thread after setup.
536
write_cache If non-NULL a write cache that is to be
537
synchronized with the read caches.
538
num_threads Number of threads sharing the cache
539
including the write thread if any.
543
The shared cache is used as follows: One IO_CACHE is initialized with
544
init_io_cache(). This includes the allocation of a buffer. Then a
545
share is allocated and init_io_cache_share() is called with the io
546
cache and the share. Then the io cache is copied for each thread. So
547
every thread has its own copy of IO_CACHE. But the allocated buffer
548
is shared because cache->buffer is the same for all caches.
550
One thread reads data from the file into the buffer. All threads
551
read from the buffer, but every thread maintains its own set of
552
pointers into the buffer. When all threads have used up the buffer
553
contents, one of the threads reads the next block of data into the
554
buffer. To accomplish this, each thread enters the cache lock before
555
accessing the buffer. They wait in lock_io_cache() until all threads
556
joined the lock. The last thread entering the lock is in charge of
557
reading from file to buffer. It wakes all threads when done.
559
Synchronizing a write cache to the read caches works as follows: Whenever
560
the write buffer needs a flush, the write thread enters the lock and
561
waits for all other threads to enter the lock too. They do this when
562
they have used up the read buffer. When all threads are in the lock,
563
the write thread copies the write buffer to the read buffer and
566
share->running_threads is the number of threads not being in the
567
cache lock. When entering lock_io_cache() the number is decreased.
568
When the thread that fills the buffer enters unlock_io_cache() the
569
number is reset to the number of threads. The condition
570
running_threads == 0 means that all threads are in the lock. Bumping
571
up the number to the full count is non-intuitive. But increasing the
572
number by one for each thread that leaves the lock could lead to a
573
solo run of one thread. The last thread to join a lock reads from
574
file to buffer, wakes the other threads, processes the data in the
575
cache and enters the lock again. If no other thread left the lock
576
meanwhile, it would think it's the last one again and read the next
579
The share has copies of 'error', 'buffer', 'read_end', and
580
'pos_in_file' from the thread that filled the buffer. We may not be
581
able to access this information directly from its cache because the
582
thread may be removed from the share before the variables could be
583
copied by all other threads. Or, if a write buffer is synchronized,
584
it would change its 'pos_in_file' after waking the other threads,
585
possibly before they could copy its value.
587
However, the 'buffer' variable in the share is for a synchronized
588
write cache. It needs to know where to put the data. Otherwise it
589
would need access to the read cache of one of the threads that is
590
not yet removed from the share.
596
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
597
IO_CACHE *write_cache, uint32_t num_threads)
599
assert(num_threads > 1);
600
assert(read_cache->type == READ_CACHE);
601
assert(!write_cache || (write_cache->type == WRITE_CACHE));
603
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
604
pthread_cond_init(&cshare->cond, 0);
605
pthread_cond_init(&cshare->cond_writer, 0);
607
cshare->running_threads= num_threads;
608
cshare->total_threads= num_threads;
609
cshare->error= 0; /* Initialize. */
610
cshare->buffer= read_cache->buffer;
611
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
612
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
613
cshare->source_cache= write_cache; /* Can be NULL. */
615
read_cache->share= cshare;
616
read_cache->read_function= _my_b_read_r;
617
read_cache->current_pos= NULL;
618
read_cache->current_end= NULL;
621
write_cache->share= cshare;
628
Remove a thread from shared access to IO_CACHE.
632
cache The IO_CACHE to be removed from the share.
636
Every thread must do that on exit so as not to deadlock other threads.
638
The last thread destroys the pthread resources.
640
A writer flushes its cache first.
646
void remove_io_thread(IO_CACHE *cache)
648
IO_CACHE_SHARE *cshare= cache->share;
651
/* If the writer goes, it needs to flush the write cache. */
652
if (cache == cshare->source_cache)
653
flush_io_cache(cache);
655
pthread_mutex_lock(&cshare->mutex);
657
/* Remove from share. */
658
total= --cshare->total_threads;
660
/* Detach from share. */
663
/* If the writer goes, let the readers know. */
664
if (cache == cshare->source_cache)
666
cshare->source_cache= NULL;
669
/* If all threads are waiting for me to join the lock, wake them. */
670
if (!--cshare->running_threads)
672
pthread_cond_signal(&cshare->cond_writer);
673
pthread_cond_broadcast(&cshare->cond);
676
pthread_mutex_unlock(&cshare->mutex);
680
pthread_cond_destroy (&cshare->cond_writer);
681
pthread_cond_destroy (&cshare->cond);
682
pthread_mutex_destroy(&cshare->mutex);
690
Lock IO cache and wait for all other threads to join.
694
cache The cache of the thread entering the lock.
695
pos File position of the block to read.
696
Unused for the write thread.
700
Wait for all threads to finish with the current buffer. We want
701
all threads to proceed in concert. The last thread to join
702
lock_io_cache() will read the block from file and all threads start
703
to use it. Then they will join again for reading the next block.
705
The waiting threads detect a fresh buffer by comparing
706
cshare->pos_in_file with the position they want to process next.
707
Since the first block may start at position 0, we take
708
cshare->read_end as an additional condition. This variable is
709
initialized to NULL and will be set after a block of data is written
713
1 OK, lock in place, go ahead and read.
714
0 OK, unlocked, another thread did the read.
717
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
719
IO_CACHE_SHARE *cshare= cache->share;
721
/* Enter the lock. */
722
pthread_mutex_lock(&cshare->mutex);
723
cshare->running_threads--;
725
if (cshare->source_cache)
727
/* A write cache is synchronized to the read caches. */
729
if (cache == cshare->source_cache)
731
/* The writer waits until all readers are here. */
732
while (cshare->running_threads)
734
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
736
/* Stay locked. Leave the lock later by unlock_io_cache(). */
740
/* The last thread wakes the writer. */
741
if (!cshare->running_threads)
743
pthread_cond_signal(&cshare->cond_writer);
747
Readers wait until the data is copied from the writer. Another
748
reason to stop waiting is the removal of the write thread. If this
749
happens, we leave the lock with old data in the buffer.
751
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
752
cshare->source_cache)
754
pthread_cond_wait(&cshare->cond, &cshare->mutex);
758
If the writer was removed from the share while this thread was
759
asleep, we need to simulate an EOF condition. The writer cannot
760
reset the share variables as they might still be in use by readers
761
of the last block. When we awake here, it is because the last
762
joining thread signalled us. If the writer is not the last, it
763
will not signal. So it is safe to clear the buffer here.
765
if (!cshare->read_end || (cshare->pos_in_file < pos))
767
cshare->read_end= cshare->buffer; /* Empty buffer. */
768
cshare->error= 0; /* EOF is not an error. */
774
There are read caches only. The last thread arriving in
775
lock_io_cache() continues with a locked cache and reads the block.
777
if (!cshare->running_threads)
779
/* Stay locked. Leave the lock later by unlock_io_cache(). */
784
All other threads wait until the requested block is read by the
785
last thread arriving. Another reason to stop waiting is the
786
removal of a thread. If this leads to all threads being in the
787
lock, we have to continue also. The first of the awakened threads
788
will then do the read.
790
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
791
cshare->running_threads)
793
pthread_cond_wait(&cshare->cond, &cshare->mutex);
796
/* If the block is not yet read, continue with a locked cache and read. */
797
if (!cshare->read_end || (cshare->pos_in_file < pos))
799
/* Stay locked. Leave the lock later by unlock_io_cache(). */
803
/* Another thread did read the block already. */
807
Leave the lock. Do not call unlock_io_cache() later. The thread that
808
filled the buffer did this and marked all threads as running.
810
pthread_mutex_unlock(&cshare->mutex);
820
cache The cache of the thread leaving the lock.
823
This is called by the thread that filled the buffer. It marks all
824
threads as running and awakes them. This must not be done by any
827
Do not signal cond_writer. Either there is no writer or the writer
828
is the only one who can call this function.
830
The reason for resetting running_threads to total_threads before
831
waking all other threads is that it could be possible that this
832
thread is so fast with processing the buffer that it enters the lock
833
before even one other thread has left it. If every awoken thread
834
would increase running_threads by one, this thread could think that
835
it is again the last to join and would not wait for the other
836
threads to process the data.
842
static void unlock_io_cache(IO_CACHE *cache)
844
IO_CACHE_SHARE *cshare= cache->share;
846
cshare->running_threads= cshare->total_threads;
847
pthread_cond_broadcast(&cshare->cond);
848
pthread_mutex_unlock(&cshare->mutex);
854
Read from IO_CACHE when it is shared between several threads.
858
cache IO_CACHE pointer
859
Buffer Buffer to retrieve count bytes from file
860
Count Number of bytes to read into Buffer
863
This function is only called from the my_b_read() macro when there
864
aren't enough characters in the buffer to satisfy the request.
868
It works as follows: when a thread tries to read from a file (that
869
is, after using all the data from the (shared) buffer), it just
870
hangs on lock_io_cache(), waiting for other threads. When the very
871
last thread attempts a read, lock_io_cache() returns 1, the thread
872
does actual IO and unlock_io_cache(), which signals all the waiting
873
threads that data is in the buffer.
877
When changing this function, be careful with handling file offsets
878
(end_of_file, pos_in_file). Do not cast them to possibly smaller
879
types than my_off_t unless you can be sure that their value fits.
880
Same applies to differences of file offsets. (Bug #11527)
882
When changing this function, check _my_b_read(). It might need the
886
0 we succeeded in reading all data
887
1 Error: can't read requested characters
890
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
892
my_off_t pos_in_file;
893
size_t length, diff_length, left_length;
894
IO_CACHE_SHARE *cshare= cache->share;
896
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
898
assert(Count >= left_length); /* User is not using my_b_read() */
899
memcpy(Buffer, cache->read_pos, left_length);
900
Buffer+= left_length;
907
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
908
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
909
length=IO_ROUND_UP(Count+diff_length)-diff_length;
910
length= ((length <= cache->read_length) ?
911
length + IO_ROUND_DN(cache->read_length - length) :
912
length - IO_ROUND_UP(length - cache->read_length));
913
if (cache->type != READ_FIFO &&
914
(length > (cache->end_of_file - pos_in_file)))
915
length= (size_t) (cache->end_of_file - pos_in_file);
918
cache->error= (int) left_length;
921
if (lock_io_cache(cache, pos_in_file))
923
/* With a synchronized write/read cache we won't come here... */
924
assert(!cshare->source_cache);
926
... unless the writer has gone before this thread entered the
927
lock. Simulate EOF in this case. It can be distinguished by
935
Whenever a function which operates on IO_CACHE flushes/writes
936
some part of the IO_CACHE to disk it will set the property
937
"seek_not_done" to indicate this to other functions operating
940
if (cache->seek_not_done)
942
if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
945
unlock_io_cache(cache);
949
len= my_read(cache->file, cache->buffer, length, cache->myflags);
951
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
952
cache->error= (len == length ? 0 : (int) len);
953
cache->pos_in_file= pos_in_file;
955
/* Copy important values to the share. */
956
cshare->error= cache->error;
957
cshare->read_end= cache->read_end;
958
cshare->pos_in_file= pos_in_file;
960
/* Mark all threads as running and wake them. */
961
unlock_io_cache(cache);
966
With a synchronized write/read cache readers always come here.
967
Copy important values from the share.
969
cache->error= cshare->error;
970
cache->read_end= cshare->read_end;
971
cache->pos_in_file= cshare->pos_in_file;
973
len= ((cache->error == -1) ? (size_t) -1 :
974
(size_t) (cache->read_end - cache->buffer));
976
cache->read_pos= cache->buffer;
977
cache->seek_not_done= 0;
978
if (len == 0 || len == (size_t) -1)
980
cache->error= (int) left_length;
983
cnt= (len > Count) ? Count : len;
984
memcpy(Buffer, cache->read_pos, cnt);
988
cache->read_pos+= cnt;
995
Copy data from write cache to read cache.
998
copy_to_read_buffer()
999
write_cache The write cache.
1000
write_buffer The source of data, mostly the cache buffer.
1001
write_length The number of bytes to copy.
1004
The write thread will wait for all read threads to join the cache
1005
lock. Then it copies the data over and wakes the read threads.
1011
static void copy_to_read_buffer(IO_CACHE *write_cache,
1012
const unsigned char *write_buffer, size_t write_length)
1014
IO_CACHE_SHARE *cshare= write_cache->share;
1016
assert(cshare->source_cache == write_cache);
1018
write_length is usually less or equal to buffer_length.
1019
It can be bigger if _my_b_write() is called with a big length.
1021
while (write_length)
1023
size_t copy_length= min(write_length, write_cache->buffer_length);
1026
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1027
/* The writing thread does always have the lock when it awakes. */
1030
memcpy(cshare->buffer, write_buffer, copy_length);
1033
cshare->read_end= cshare->buffer + copy_length;
1034
cshare->pos_in_file= write_cache->pos_in_file;
1036
/* Mark all threads as running and wake them. */
1037
unlock_io_cache(write_cache);
1039
write_buffer+= copy_length;
1040
write_length-= copy_length;
1046
Do sequential read from the SEQ_READ_APPEND cache.
1048
We do this in three stages:
1049
- first read from info->buffer
1050
- then if there are still data to read, try the file descriptor
1051
- afterwards, if there are still data to read, try append buffer
1058
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1060
size_t length, diff_length, left_length, save_count, max_length;
1061
my_off_t pos_in_file;
1064
/* first, read the regular buffer */
1065
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1067
assert(Count > left_length); /* User is not using my_b_read() */
1068
memcpy(Buffer,info->read_pos, left_length);
1069
Buffer+=left_length;
1072
lock_append_buffer(info);
1074
/* pos_in_file always points to where info->buffer was read */
1075
if ((pos_in_file=info->pos_in_file +
1076
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1077
goto read_append_buffer;
1080
With read-append cache we must always do a seek before we read,
1081
because the write could have moved the file pointer astray
1083
if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1086
unlock_append_buffer(info);
1089
info->seek_not_done=0;
1091
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1093
/* now the second stage begins - read from file descriptor */
1094
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1096
/* Fill first intern buffer */
1099
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1100
if ((read_length= my_read(info->file,Buffer, length,
1101
info->myflags)) == (size_t) -1)
1104
unlock_append_buffer(info);
1108
Buffer+=read_length;
1109
pos_in_file+=read_length;
1111
if (read_length != length)
1114
We only got part of data; Read the rest of the data from the
1117
goto read_append_buffer;
1119
left_length+=length;
1123
max_length= info->read_length-diff_length;
1124
if (max_length > (info->end_of_file - pos_in_file))
1125
max_length= (size_t) (info->end_of_file - pos_in_file);
1129
goto read_append_buffer;
1130
length=0; /* Didn't read any more chars */
1134
length= my_read(info->file,info->buffer, max_length, info->myflags);
1135
if (length == (size_t) -1)
1138
unlock_append_buffer(info);
1143
memcpy(Buffer, info->buffer, length);
1148
added the line below to make
1149
assert(pos_in_file==info->end_of_file) pass.
1150
otherwise this does not appear to be needed
1152
pos_in_file += length;
1153
goto read_append_buffer;
1156
unlock_append_buffer(info);
1157
info->read_pos=info->buffer+Count;
1158
info->read_end=info->buffer+length;
1159
info->pos_in_file=pos_in_file;
1160
memcpy(Buffer,info->buffer,(size_t) Count);
1166
Read data from the current write buffer.
1167
Count should never be == 0 here (The code will work even if count is 0)
1171
/* First copy the data to Count */
1172
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1174
size_t transfer_len;
1176
assert(info->append_read_pos <= info->write_pos);
1178
TODO: figure out if the assert below is needed or correct.
1180
assert(pos_in_file == info->end_of_file);
1181
copy_len=min(Count, len_in_buff);
1182
memcpy(Buffer, info->append_read_pos, copy_len);
1183
info->append_read_pos += copy_len;
1186
info->error = save_count - Count;
1188
/* Fill read buffer with data from write buffer */
1189
memcpy(info->buffer, info->append_read_pos,
1190
(size_t) (transfer_len=len_in_buff - copy_len));
1191
info->read_pos= info->buffer;
1192
info->read_end= info->buffer+transfer_len;
1193
info->append_read_pos=info->write_pos;
1194
info->pos_in_file=pos_in_file+copy_len;
1195
info->end_of_file+=len_in_buff;
1197
unlock_append_buffer(info);
1198
return Count ? 1 : 0;
1202
541
#ifdef HAVE_AIOWAIT
1205
Read from the IO_CACHE into a buffer and feed asynchronously
1206
from disk when needed.
1210
info IO_CACHE pointer
1211
Buffer Buffer to retrieve count bytes from file
1212
Count Number of bytes to read into Buffer
1215
-1 An error has occurred; my_errno is set.
1217
1 An error has occurred; IO_CACHE to error state.
1220
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1222
size_t length,read_length,diff_length,left_length,use_length,org_Count;
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1223
558
size_t max_length;
1224
559
my_off_t next_pos_in_file;
1225
560
unsigned char *read_buffer;
1442
777
info->seek_not_done=0;
1444
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1445
return info->error= -1;
1448
In case of a shared I/O cache with a writer we normally do direct
1449
write cache to read cache copy. Simulate this here by direct
1450
caller buffer to read cache copy. Do it after the write so that
1451
the cache readers actions on the flushed part can go in parallel
1452
with the write of the extra stuff. copy_to_read_buffer()
1453
synchronizes writer and readers so that after this call the
1454
readers can act on the extra stuff while the writer can go ahead
1455
and prepare the next output. copy_to_read_buffer() relies on
1459
copy_to_read_buffer(info, Buffer, length);
1463
info->pos_in_file+=length;
1465
memcpy(info->write_pos,Buffer,(size_t) Count);
1466
info->write_pos+=Count;
1472
Append a block to the write buffer.
1473
This is done with the buffer locked to ensure that we don't read from
1474
the write buffer before we are ready with it.
1477
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1479
size_t rest_length,length;
1482
Assert that we cannot come here with a shared cache. If we do one
1483
day, we might need to add a call to copy_to_read_buffer().
1485
assert(!info->share);
1487
lock_append_buffer(info);
1488
rest_length= (size_t) (info->write_end - info->write_pos);
1489
if (Count <= rest_length)
1491
memcpy(info->write_pos, Buffer, rest_length);
1492
Buffer+=rest_length;
1494
info->write_pos+=rest_length;
1495
if (my_b_flush_io_cache(info,0))
1497
unlock_append_buffer(info);
1500
if (Count >= IO_SIZE)
1501
{ /* Fill first intern buffer */
1502
length=Count & (size_t) ~(IO_SIZE-1);
1503
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1505
unlock_append_buffer(info);
1506
return info->error= -1;
1510
info->end_of_file+=length;
1514
memcpy(info->write_pos,Buffer,(size_t) Count);
1515
info->write_pos+=Count;
1516
unlock_append_buffer(info);
1521
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1524
Sasha: We are not writing this with the ? operator to avoid hitting
1525
a possible compiler bug. At least gcc 2.95 cannot deal with
1526
several layers of ternary operators that evaluated comma(,) operator
1527
expressions inside - I do have a test case if somebody wants it
1529
if (info->type == SEQ_READ_APPEND)
1530
return my_b_append(info, Buffer, Count);
1531
return my_b_write(info, Buffer, Count);
1536
Write a block to disk where part of the data may be inside the record
1537
buffer. As all write calls to the data go through the cache,
1538
we will never get a seek over the end of the buffer
1541
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data go through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1548
Assert that we cannot come here with a shared cache. If we do one
1549
day, we might need to add a call to copy_to_read_buffer().
1551
assert(!info->share);
1553
803
if (pos < info->pos_in_file)
1555
805
/* If no overlap, write everything without buffering */
1556
806
if (pos + Count <= info->pos_in_file)
1557
807
return (pwrite(info->file, Buffer, Count, pos) == 0);
1558
808
/* Write the part of the block that is before buffer */
1559
length= (uint32_t) (info->pos_in_file - pos);
1560
if (pwrite(info->file, Buffer, length, pos) == 0)
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1561
811
info->error= error= -1;
1566
info->seek_not_done=1;
812
Buffer+=length_local;
814
Count-= length_local;
1570
817
/* Check if we want to write inside the used part of the buffer.*/
1571
length= (size_t) (info->write_end - info->buffer);
1572
if (pos < info->pos_in_file + length)
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1574
821
size_t offset= (size_t) (pos - info->pos_in_file);
1578
memcpy(info->buffer+offset, Buffer, length);
1581
/* Fix length of buffer if the new data was larger */
1582
if (info->buffer+length > info->write_pos)
1583
info->write_pos=info->buffer+length;
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length_local of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1678
UNLOCK_APPEND_BUFFER;
909
unlock_append_buffer(info, need_append_buffer_lock);
1683
Free an IO_CACHE object
1687
info IO_CACHE Handle to free
1690
It's currently safe to call this if one has called init_io_cache()
1691
on the 'info' object, even if init_io_cache() failed.
1692
This function is also safe to call twice with the same handle.
1699
int end_io_cache(IO_CACHE *info)
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1702
IO_CACHE_CALLBACK pre_close;
1705
Every thread must call remove_io_thread(). The last one destroys
1708
assert(!info->share || !info->share->total_threads);
1710
if ((pre_close=info->pre_close))
1715
if (info->alloced_buffer)
1717
info->alloced_buffer=0;
1718
if (info->file != -1) /* File doesn't exist */
1719
error= my_b_flush_io_cache(info,1);
1720
free((unsigned char*) info->buffer);
1721
info->buffer=info->read_pos=(unsigned char*) 0;
1723
if (info->type == SEQ_READ_APPEND)
1725
/* Destroy allocated mutex */
1726
info->type= TYPE_NOT_SET;
1727
pthread_mutex_destroy(&info->append_buffer_lock);
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1730
948
} /* end_io_cache */
1733
/**********************************************************************
1734
Testing of MF_IOCACHE
1735
**********************************************************************/
1741
void die(const char* fmt, ...)
1744
va_start(va_args,fmt);
1745
fprintf(stderr,"Error:");
1746
vfprintf(stderr, fmt,va_args);
1747
fprintf(stderr,", errno=%d\n", errno);
1751
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1754
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1755
die("Could not open %s", fname);
1756
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1757
die("failed in init_io_cache()");
1761
void close_file(IO_CACHE* info)
1764
my_close(info->file, MYF(MY_WME));
1767
int main(int argc, char** argv)
1769
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1771
const char* fname="/tmp/iocache.test";
1772
int cache_size=16384;
1774
int max_block,total_bytes=0;
1775
int i,num_loops=100,error=0;
1777
char* block, *block_end;
1779
max_block = cache_size*3;
1780
if (!(block=(char*)malloc(max_block)))
1781
die("Not enough memory to allocate test block");
1782
block_end = block + max_block;
1783
for (p = block,i=0; p < block_end;i++)
1787
if (my_stat(fname,&status, MYF(0)) &&
1788
my_delete(fname,MYF(MY_WME)))
1790
die("Delete of %s failed, aborting", fname);
1792
open_file(fname,&sra_cache, cache_size);
1793
for (i = 0; i < num_loops; i++)
1796
int block_size = abs(rand() % max_block);
1797
int4store(buf, block_size);
1798
if (my_b_append(&sra_cache,buf,4) ||
1799
my_b_append(&sra_cache, block, block_size))
1800
die("write failed");
1801
total_bytes += 4+block_size;
1803
close_file(&sra_cache);
1805
if (!my_stat(fname,&status,MYF(MY_WME)))
1806
die("%s failed to stat, but I had just closed it,\
1807
wonder how that happened");
1808
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1809
llstr(status.st_size,llstr_buf),
1811
my_delete(fname, MYF(MY_WME));
1812
/* check correctness of tests */
1813
if (total_bytes != status.st_size)
1815
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1816
supposedly written\n");
950
} /* namespace internal */
951
} /* namespace drizzled */