477
460
info->file is a pipe or socket or FIFO. We never should have tried
478
461
to seek on that. See Bugs#25807 and #22828 for more info.
480
assert(errno != ESPIPE);
463
assert(my_errno != ESPIPE);
486
diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
469
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
470
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
471
{ /* Fill first intern buffer */
489
472
size_t read_length;
490
if (info->end_of_file <= pos_in_file_local)
473
if (info->end_of_file <= pos_in_file)
491
474
{ /* End of file */
492
475
info->error= (int) left_length;
495
length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
478
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
479
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
498
482
info->error= (read_length == (size_t) -1 ? -1 :
499
483
(int) (read_length+left_length));
502
Count-= length_local;
503
Buffer+= length_local;
504
pos_in_file_local+= length_local;
505
left_length+= length_local;
509
493
max_length= info->read_length-diff_length;
510
494
if (info->type != READ_FIFO &&
511
max_length > (info->end_of_file - pos_in_file_local))
512
max_length= (size_t) (info->end_of_file - pos_in_file_local);
495
max_length > (info->end_of_file - pos_in_file))
496
max_length= (size_t) (info->end_of_file - pos_in_file);
517
info->error= static_cast<int>(left_length); /* We only got this many char */
501
info->error= left_length; /* We only got this many char */
520
length_local=0; /* Didn't read any chars */
504
length=0; /* Didn't read any chars */
522
else if (( length_local= my_read(info->file,info->buffer, max_length,
506
else if ((length= my_read(info->file,info->buffer, max_length,
523
507
info->myflags)) < Count ||
524
length_local == (size_t) -1)
508
length == (size_t) -1)
526
if ( length_local != (size_t) -1)
527
memcpy(Buffer, info->buffer, length_local);
528
info->pos_in_file= pos_in_file_local;
529
info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
510
if (length != (size_t) -1)
511
memcpy(Buffer, info->buffer, length);
512
info->pos_in_file= pos_in_file;
513
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
514
info->read_pos=info->read_end=info->buffer;
533
517
info->read_pos=info->buffer+Count;
534
info->read_end=info->buffer+ length_local;
535
info->pos_in_file=pos_in_file_local;
518
info->read_end=info->buffer+length;
519
info->pos_in_file=pos_in_file;
536
520
memcpy(Buffer, info->buffer, Count);
526
Prepare IO_CACHE for shared use.
529
init_io_cache_share()
530
read_cache A read cache. This will be copied for
531
every thread after setup.
533
write_cache If non-NULL a write cache that is to be
534
synchronized with the read caches.
535
num_threads Number of threads sharing the cache
536
including the write thread if any.
540
The shared cache is used as follows: One IO_CACHE is initialized with
541
init_io_cache(). This includes the allocation of a buffer. Then a
542
share is allocated and init_io_cache_share() is called with the io
543
cache and the share. Then the io cache is copied for each thread. So
544
every thread has its own copy of IO_CACHE. But the allocated buffer
545
is shared because cache->buffer is the same for all caches.
547
One thread reads data from the file into the buffer. All threads
548
read from the buffer, but every thread maintains its own set of
549
pointers into the buffer. When all threads have used up the buffer
550
contents, one of the threads reads the next block of data into the
551
buffer. To accomplish this, each thread enters the cache lock before
552
accessing the buffer. They wait in lock_io_cache() until all threads
553
joined the lock. The last thread entering the lock is in charge of
554
reading from file to buffer. It wakes all threads when done.
556
Synchronizing a write cache to the read caches works as follows: Whenever
557
the write buffer needs a flush, the write thread enters the lock and
558
waits for all other threads to enter the lock too. They do this when
559
they have used up the read buffer. When all threads are in the lock,
560
the write thread copies the write buffer to the read buffer and
563
share->running_threads is the number of threads not being in the
564
cache lock. When entering lock_io_cache() the number is decreased.
565
When the thread that fills the buffer enters unlock_io_cache() the
566
number is reset to the number of threads. The condition
567
running_threads == 0 means that all threads are in the lock. Bumping
568
up the number to the full count is non-intuitive. But increasing the
569
number by one for each thread that leaves the lock could lead to a
570
solo run of one thread. The last thread to join a lock reads from
571
file to buffer, wakes the other threads, processes the data in the
572
cache and enters the lock again. If no other thread left the lock
573
meanwhile, it would think it's the last one again and read the next
576
The share has copies of 'error', 'buffer', 'read_end', and
577
'pos_in_file' from the thread that filled the buffer. We may not be
578
able to access this information directly from its cache because the
579
thread may be removed from the share before the variables could be
580
copied by all other threads. Or, if a write buffer is synchronized,
581
it would change its 'pos_in_file' after waking the other threads,
582
possibly before they could copy its value.
584
However, the 'buffer' variable in the share is for a synchronized
585
write cache. It needs to know where to put the data. Otherwise it
586
would need access to the read cache of one of the threads that is
587
not yet removed from the share.
593
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
594
IO_CACHE *write_cache, uint32_t num_threads)
596
assert(num_threads > 1);
597
assert(read_cache->type == READ_CACHE);
598
assert(!write_cache || (write_cache->type == WRITE_CACHE));
600
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
601
pthread_cond_init(&cshare->cond, 0);
602
pthread_cond_init(&cshare->cond_writer, 0);
604
cshare->running_threads= num_threads;
605
cshare->total_threads= num_threads;
606
cshare->error= 0; /* Initialize. */
607
cshare->buffer= read_cache->buffer;
608
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
609
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
610
cshare->source_cache= write_cache; /* Can be NULL. */
612
read_cache->share= cshare;
613
read_cache->read_function= _my_b_read_r;
614
read_cache->current_pos= NULL;
615
read_cache->current_end= NULL;
618
write_cache->share= cshare;
625
Remove a thread from shared access to IO_CACHE.
629
cache The IO_CACHE to be removed from the share.
633
Every thread must do this on exit so as not to deadlock other threads.
635
The last thread destroys the pthread resources.
637
A writer flushes its cache first.
643
void remove_io_thread(IO_CACHE *cache)
645
IO_CACHE_SHARE *cshare= cache->share;
648
/* If the writer goes, it needs to flush the write cache. */
649
if (cache == cshare->source_cache)
650
flush_io_cache(cache);
652
pthread_mutex_lock(&cshare->mutex);
654
/* Remove from share. */
655
total= --cshare->total_threads;
657
/* Detach from share. */
660
/* If the writer goes, let the readers know. */
661
if (cache == cshare->source_cache)
663
cshare->source_cache= NULL;
666
/* If all threads are waiting for me to join the lock, wake them. */
667
if (!--cshare->running_threads)
669
pthread_cond_signal(&cshare->cond_writer);
670
pthread_cond_broadcast(&cshare->cond);
673
pthread_mutex_unlock(&cshare->mutex);
677
pthread_cond_destroy (&cshare->cond_writer);
678
pthread_cond_destroy (&cshare->cond);
679
pthread_mutex_destroy(&cshare->mutex);
687
Lock IO cache and wait for all other threads to join.
691
cache The cache of the thread entering the lock.
692
pos File position of the block to read.
693
Unused for the write thread.
697
Wait for all threads to finish with the current buffer. We want
698
all threads to proceed in concert. The last thread to join
699
lock_io_cache() will read the block from file and all threads start
700
to use it. Then they will join again for reading the next block.
702
The waiting threads detect a fresh buffer by comparing
703
cshare->pos_in_file with the position they want to process next.
704
Since the first block may start at position 0, we take
705
cshare->read_end as an additional condition. This variable is
706
initialized to NULL and will be set after a block of data is written
710
1 OK, lock in place, go ahead and read.
711
0 OK, unlocked, another thread did the read.
714
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
716
IO_CACHE_SHARE *cshare= cache->share;
718
/* Enter the lock. */
719
pthread_mutex_lock(&cshare->mutex);
720
cshare->running_threads--;
722
if (cshare->source_cache)
724
/* A write cache is synchronized to the read caches. */
726
if (cache == cshare->source_cache)
728
/* The writer waits until all readers are here. */
729
while (cshare->running_threads)
731
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
733
/* Stay locked. Leave the lock later by unlock_io_cache(). */
737
/* The last thread wakes the writer. */
738
if (!cshare->running_threads)
740
pthread_cond_signal(&cshare->cond_writer);
744
Readers wait until the data is copied from the writer. Another
745
reason to stop waiting is the removal of the write thread. If this
746
happens, we leave the lock with old data in the buffer.
748
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
749
cshare->source_cache)
751
pthread_cond_wait(&cshare->cond, &cshare->mutex);
755
If the writer was removed from the share while this thread was
756
asleep, we need to simulate an EOF condition. The writer cannot
757
reset the share variables as they might still be in use by readers
758
of the last block. When we awake here, it is because the last
759
joining thread signalled us. If the writer is not the last, it
760
will not signal. So it is safe to clear the buffer here.
762
if (!cshare->read_end || (cshare->pos_in_file < pos))
764
cshare->read_end= cshare->buffer; /* Empty buffer. */
765
cshare->error= 0; /* EOF is not an error. */
771
There are read caches only. The last thread arriving in
772
lock_io_cache() continues with a locked cache and reads the block.
774
if (!cshare->running_threads)
776
/* Stay locked. Leave the lock later by unlock_io_cache(). */
781
All other threads wait until the requested block is read by the
782
last thread arriving. Another reason to stop waiting is the
783
removal of a thread. If this leads to all threads being in the
784
lock, we have to continue also. The first of the awakened threads
785
will then do the read.
787
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
788
cshare->running_threads)
790
pthread_cond_wait(&cshare->cond, &cshare->mutex);
793
/* If the block is not yet read, continue with a locked cache and read. */
794
if (!cshare->read_end || (cshare->pos_in_file < pos))
796
/* Stay locked. Leave the lock later by unlock_io_cache(). */
800
/* Another thread did read the block already. */
804
Leave the lock. Do not call unlock_io_cache() later. The thread that
805
filled the buffer did this and marked all threads as running.
807
pthread_mutex_unlock(&cshare->mutex);
817
cache The cache of the thread leaving the lock.
820
This is called by the thread that filled the buffer. It marks all
821
threads as running and awakes them. This must not be done by any
824
Do not signal cond_writer. Either there is no writer or the writer
825
is the only one who can call this function.
827
The reason for resetting running_threads to total_threads before
828
waking all other threads is that it could be possible that this
829
thread is so fast with processing the buffer that it enters the lock
830
before even one other thread has left it. If every awoken thread
831
would increase running_threads by one, this thread could think that
832
it is again the last to join and would not wait for the other
833
threads to process the data.
839
static void unlock_io_cache(IO_CACHE *cache)
841
IO_CACHE_SHARE *cshare= cache->share;
843
cshare->running_threads= cshare->total_threads;
844
pthread_cond_broadcast(&cshare->cond);
845
pthread_mutex_unlock(&cshare->mutex);
851
Read from IO_CACHE when it is shared between several threads.
855
cache IO_CACHE pointer
856
Buffer Buffer to retrieve count bytes from file
857
Count Number of bytes to read into Buffer
860
This function is only called from the my_b_read() macro when there
861
aren't enough characters in the buffer to satisfy the request.
865
It works as follows: when a thread tries to read from a file (that
866
is, after using all the data from the (shared) buffer), it just
867
hangs on lock_io_cache(), waiting for other threads. When the very
868
last thread attempts a read, lock_io_cache() returns 1, the thread
869
does actual IO and unlock_io_cache(), which signals all the waiting
870
threads that data is in the buffer.
874
When changing this function, be careful with handling file offsets
875
(end_of_file, pos_in_file). Do not cast them to possibly smaller
876
types than my_off_t unless you can be sure that their value fits.
877
Same applies to differences of file offsets. (Bug #11527)
879
When changing this function, check _my_b_read(). It might need the
883
0 we succeeded in reading all data
884
1 Error: can't read requested characters
887
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
889
my_off_t pos_in_file;
890
size_t length, diff_length, left_length;
891
IO_CACHE_SHARE *cshare= cache->share;
893
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
895
assert(Count >= left_length); /* User is not using my_b_read() */
896
memcpy(Buffer, cache->read_pos, left_length);
897
Buffer+= left_length;
904
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
905
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
906
length=IO_ROUND_UP(Count+diff_length)-diff_length;
907
length= ((length <= cache->read_length) ?
908
length + IO_ROUND_DN(cache->read_length - length) :
909
length - IO_ROUND_UP(length - cache->read_length));
910
if (cache->type != READ_FIFO &&
911
(length > (cache->end_of_file - pos_in_file)))
912
length= (size_t) (cache->end_of_file - pos_in_file);
915
cache->error= (int) left_length;
918
if (lock_io_cache(cache, pos_in_file))
920
/* With a synchronized write/read cache we won't come here... */
921
assert(!cshare->source_cache);
923
... unless the writer has gone before this thread entered the
924
lock. Simulate EOF in this case. It can be distinguished by
932
Whenever a function which operates on IO_CACHE flushes/writes
933
some part of the IO_CACHE to disk it will set the property
934
"seek_not_done" to indicate this to other functions operating
937
if (cache->seek_not_done)
939
if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
942
unlock_io_cache(cache);
946
len= my_read(cache->file, cache->buffer, length, cache->myflags);
948
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
949
cache->error= (len == length ? 0 : (int) len);
950
cache->pos_in_file= pos_in_file;
952
/* Copy important values to the share. */
953
cshare->error= cache->error;
954
cshare->read_end= cache->read_end;
955
cshare->pos_in_file= pos_in_file;
957
/* Mark all threads as running and wake them. */
958
unlock_io_cache(cache);
963
With a synchronized write/read cache readers always come here.
964
Copy important values from the share.
966
cache->error= cshare->error;
967
cache->read_end= cshare->read_end;
968
cache->pos_in_file= cshare->pos_in_file;
970
len= ((cache->error == -1) ? (size_t) -1 :
971
(size_t) (cache->read_end - cache->buffer));
973
cache->read_pos= cache->buffer;
974
cache->seek_not_done= 0;
975
if (len == 0 || len == (size_t) -1)
977
cache->error= (int) left_length;
980
cnt= (len > Count) ? Count : len;
981
memcpy(Buffer, cache->read_pos, cnt);
985
cache->read_pos+= cnt;
992
Copy data from write cache to read cache.
995
copy_to_read_buffer()
996
write_cache The write cache.
997
write_buffer The source of data, mostly the cache buffer.
998
write_length The number of bytes to copy.
1001
The write thread will wait for all read threads to join the cache
1002
lock. Then it copies the data over and wakes the read threads.
1008
static void copy_to_read_buffer(IO_CACHE *write_cache,
1009
const unsigned char *write_buffer, size_t write_length)
1011
IO_CACHE_SHARE *cshare= write_cache->share;
1013
assert(cshare->source_cache == write_cache);
1015
write_length is usually less or equal to buffer_length.
1016
It can be bigger if _my_b_write() is called with a big length.
1018
while (write_length)
1020
size_t copy_length= cmin(write_length, write_cache->buffer_length);
1023
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1024
/* The writing thread does always have the lock when it awakes. */
1027
memcpy(cshare->buffer, write_buffer, copy_length);
1030
cshare->read_end= cshare->buffer + copy_length;
1031
cshare->pos_in_file= write_cache->pos_in_file;
1033
/* Mark all threads as running and wake them. */
1034
unlock_io_cache(write_cache);
1036
write_buffer+= copy_length;
1037
write_length-= copy_length;
1043
Do sequential read from the SEQ_READ_APPEND cache.
1045
We do this in three stages:
1046
- first read from info->buffer
1047
- then if there are still data to read, try the file descriptor
1048
- afterwards, if there are still data to read, try append buffer
1055
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1057
size_t length, diff_length, left_length, save_count, max_length;
1058
my_off_t pos_in_file;
1061
/* first, read the regular buffer */
1062
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1064
assert(Count > left_length); /* User is not using my_b_read() */
1065
memcpy(Buffer,info->read_pos, left_length);
1066
Buffer+=left_length;
1069
lock_append_buffer(info);
1071
/* pos_in_file always points to where info->buffer was read */
1072
if ((pos_in_file=info->pos_in_file +
1073
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1074
goto read_append_buffer;
1077
With read-append cache we must always do a seek before we read,
1078
because the write could have moved the file pointer astray
1080
if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1083
unlock_append_buffer(info);
1086
info->seek_not_done=0;
1088
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1090
/* now the second stage begins - read from file descriptor */
1091
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1093
/* Fill first intern buffer */
1096
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1097
if ((read_length= my_read(info->file,Buffer, length,
1098
info->myflags)) == (size_t) -1)
1101
unlock_append_buffer(info);
1105
Buffer+=read_length;
1106
pos_in_file+=read_length;
1108
if (read_length != length)
1111
We only got part of data; Read the rest of the data from the
1114
goto read_append_buffer;
1116
left_length+=length;
1120
max_length= info->read_length-diff_length;
1121
if (max_length > (info->end_of_file - pos_in_file))
1122
max_length= (size_t) (info->end_of_file - pos_in_file);
1126
goto read_append_buffer;
1127
length=0; /* Didn't read any more chars */
1131
length= my_read(info->file,info->buffer, max_length, info->myflags);
1132
if (length == (size_t) -1)
1135
unlock_append_buffer(info);
1140
memcpy(Buffer, info->buffer, length);
1145
added the line below to make
1146
assert(pos_in_file==info->end_of_file) pass.
1147
otherwise this does not appear to be needed
1149
pos_in_file += length;
1150
goto read_append_buffer;
1153
unlock_append_buffer(info);
1154
info->read_pos=info->buffer+Count;
1155
info->read_end=info->buffer+length;
1156
info->pos_in_file=pos_in_file;
1157
memcpy(Buffer,info->buffer,(size_t) Count);
1163
Read data from the current write buffer.
1164
Count should never be == 0 here (The code will work even if count is 0)
1168
/* First copy the data to Count */
1169
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1171
size_t transfer_len;
1173
assert(info->append_read_pos <= info->write_pos);
1175
TODO: figure out if the assert below is needed or correct.
1177
assert(pos_in_file == info->end_of_file);
1178
copy_len=cmin(Count, len_in_buff);
1179
memcpy(Buffer, info->append_read_pos, copy_len);
1180
info->append_read_pos += copy_len;
1183
info->error = save_count - Count;
1185
/* Fill read buffer with data from write buffer */
1186
memcpy(info->buffer, info->append_read_pos,
1187
(size_t) (transfer_len=len_in_buff - copy_len));
1188
info->read_pos= info->buffer;
1189
info->read_end= info->buffer+transfer_len;
1190
info->append_read_pos=info->write_pos;
1191
info->pos_in_file=pos_in_file+copy_len;
1192
info->end_of_file+=len_in_buff;
1194
unlock_append_buffer(info);
1195
return Count ? 1 : 0;
541
1199
#ifdef HAVE_AIOWAIT
545
* Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
547
* @param info st_io_cache pointer
548
* @param Buffer Buffer to retrieve count bytes from file
549
* @param Count Number of bytes to read into Buffer
551
* @retval -1 An error has occurred; errno is set.
553
* @retval 1 An error has occurred; st_io_cache to error state.
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1202
Read from the IO_CACHE into a buffer and feed asynchronously
1203
from disk when needed.
1207
info IO_CACHE pointer
1208
Buffer Buffer to retrieve count bytes from file
1209
Count Number of bytes to read into Buffer
1212
-1 An error has occurred; my_errno is set.
1214
1 An error has occurred; IO_CACHE to error state.
1217
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
557
size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1219
size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1220
size_t max_length;
559
1221
my_off_t next_pos_in_file;
560
1222
unsigned char *read_buffer;
777
1439
info->seek_not_done=0;
779
if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
return info->error= -1;
783
Buffer+=length_local;
784
info->pos_in_file+=length_local;
786
memcpy(info->write_pos,Buffer,(size_t) Count);
787
info->write_pos+=Count;
793
* Write a block to disk where part of the data may be inside the record buffer.
794
* As all write calls to the data goes through the cache,
795
* we will never get a seek over the end of the buffer.
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1441
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1442
return info->error= -1;
1445
In case of a shared I/O cache with a writer we normally do direct
1446
write cache to read cache copy. Simulate this here by direct
1447
caller buffer to read cache copy. Do it after the write so that
1448
the cache readers actions on the flushed part can go in parallel
1449
with the write of the extra stuff. copy_to_read_buffer()
1450
synchronizes writer and readers so that after this call the
1451
readers can act on the extra stuff while the writer can go ahead
1452
and prepare the next output. copy_to_read_buffer() relies on
1456
copy_to_read_buffer(info, Buffer, length);
1460
info->pos_in_file+=length;
1462
memcpy(info->write_pos,Buffer,(size_t) Count);
1463
info->write_pos+=Count;
1469
Append a block to the write buffer.
1470
This is done with the buffer locked to ensure that we don't read from
1471
the write buffer before we are ready with it.
1474
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1476
size_t rest_length,length;
1479
Assert that we cannot come here with a shared cache. If we do one
1480
day, we might need to add a call to copy_to_read_buffer().
1482
assert(!info->share);
1484
lock_append_buffer(info);
1485
rest_length= (size_t) (info->write_end - info->write_pos);
1486
if (Count <= rest_length)
1488
memcpy(info->write_pos, Buffer, rest_length);
1489
Buffer+=rest_length;
1491
info->write_pos+=rest_length;
1492
if (my_b_flush_io_cache(info,0))
1494
unlock_append_buffer(info);
1497
if (Count >= IO_SIZE)
1498
{ /* Fill first intern buffer */
1499
length=Count & (size_t) ~(IO_SIZE-1);
1500
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1502
unlock_append_buffer(info);
1503
return info->error= -1;
1507
info->end_of_file+=length;
1511
memcpy(info->write_pos,Buffer,(size_t) Count);
1512
info->write_pos+=Count;
1513
unlock_append_buffer(info);
1518
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1521
Sasha: We are not writing this with the ? operator to avoid hitting
1522
a possible compiler bug. At least gcc 2.95 cannot deal with
1523
several layers of ternary operators that evaluated comma(,) operator
1524
expressions inside - I do have a test case if somebody wants it
1526
if (info->type == SEQ_READ_APPEND)
1527
return my_b_append(info, Buffer, Count);
1528
return my_b_write(info, Buffer, Count);
1533
Write a block to disk where part of the data may be inside the record
1534
buffer. As all write calls to the data goes through the cache,
1535
we will never get a seek over the end of the buffer
1538
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1545
Assert that we cannot come here with a shared cache. If we do one
1546
day, we might need to add a call to copy_to_read_buffer().
1548
assert(!info->share);
803
1550
if (pos < info->pos_in_file)
805
1552
/* If no overlap, write everything without buffering */
806
1553
if (pos + Count <= info->pos_in_file)
807
1554
return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1555
/* Write the part of the block that is before buffer */
809
length_local= (uint32_t) (info->pos_in_file - pos);
810
if (pwrite(info->file, Buffer, length_local, pos) == 0)
1556
length= (uint32_t) (info->pos_in_file - pos);
1557
if (pwrite(info->file, Buffer, length, pos) == 0)
811
1558
info->error= error= -1;
812
Buffer+=length_local;
814
Count-= length_local;
1563
info->seek_not_done=1;
817
1567
/* Check if we want to write inside the used part of the buffer.*/
818
length_local= (size_t) (info->write_end - info->buffer);
819
if (pos < info->pos_in_file + length_local)
1568
length= (size_t) (info->write_end - info->buffer);
1569
if (pos < info->pos_in_file + length)
821
1571
size_t offset= (size_t) (pos - info->pos_in_file);
822
length_local-=offset;
823
if (length_local > Count)
825
memcpy(info->buffer+offset, Buffer, length_local);
826
Buffer+=length_local;
827
Count-= length_local;
828
/* Fix length_local of buffer if the new data was larger */
829
if (info->buffer+length_local > info->write_pos)
830
info->write_pos=info->buffer+length_local;
1575
memcpy(info->buffer+offset, Buffer, length);
1578
/* Fix length of buffer if the new data was larger */
1579
if (info->buffer+length > info->write_pos)
1580
info->write_pos=info->buffer+length;
909
unlock_append_buffer(info, need_append_buffer_lock);
1675
UNLOCK_APPEND_BUFFER;
915
* Free an st_io_cache object
918
* It's currently safe to call this if one has called init_io_cache()
919
* on the 'info' object, even if init_io_cache() failed.
920
* This function is also safe to call twice with the same handle.
922
* @param info st_io_cache Handle to free
927
int st_io_cache::end_io_cache()
1680
Free an IO_CACHE object
1684
info IO_CACHE Handle to free
1687
It's currently safe to call this if one has called init_io_cache()
1688
on the 'info' object, even if init_io_cache() failed.
1689
This function is also safe to call twice with the same handle.
1696
int end_io_cache(IO_CACHE *info)
938
if (type == READ_CACHE)
939
global_read_buffer.sub(buffer_length);
941
if (file != -1) /* File doesn't exist */
942
_error= my_b_flush_io_cache(this, 1);
943
free((unsigned char*) buffer);
944
buffer=read_pos=(unsigned char*) 0;
1699
IO_CACHE_CALLBACK pre_close;
1702
Every thread must call remove_io_thread(). The last one destroys
1705
assert(!info->share || !info->share->total_threads);
1707
if ((pre_close=info->pre_close))
1712
if (info->alloced_buffer)
1714
info->alloced_buffer=0;
1715
if (info->file != -1) /* File doesn't exist */
1716
error= my_b_flush_io_cache(info,1);
1717
free((unsigned char*) info->buffer);
1718
info->buffer=info->read_pos=(unsigned char*) 0;
1720
if (info->type == SEQ_READ_APPEND)
1722
/* Destroy allocated mutex */
1723
info->type= TYPE_NOT_SET;
1724
pthread_mutex_destroy(&info->append_buffer_lock);
948
1727
} /* end_io_cache */
950
} /* namespace internal */
951
} /* namespace drizzled */
1730
/**********************************************************************
1731
Testing of MF_IOCACHE
1732
**********************************************************************/
1738
void die(const char* fmt, ...)
1741
va_start(va_args,fmt);
1742
fprintf(stderr,"Error:");
1743
vfprintf(stderr, fmt,va_args);
1744
fprintf(stderr,", errno=%d\n", errno);
1748
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1751
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1752
die("Could not open %s", fname);
1753
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1754
die("failed in init_io_cache()");
1758
void close_file(IO_CACHE* info)
1761
my_close(info->file, MYF(MY_WME));
1764
int main(int argc, char** argv)
1766
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1768
const char* fname="/tmp/iocache.test";
1769
int cache_size=16384;
1771
int max_block,total_bytes=0;
1772
int i,num_loops=100,error=0;
1774
char* block, *block_end;
1776
max_block = cache_size*3;
1777
if (!(block=(char*)malloc(max_block)))
1778
die("Not enough memory to allocate test block");
1779
block_end = block + max_block;
1780
for (p = block,i=0; p < block_end;i++)
1784
if (my_stat(fname,&status, MYF(0)) &&
1785
my_delete(fname,MYF(MY_WME)))
1787
die("Delete of %s failed, aborting", fname);
1789
open_file(fname,&sra_cache, cache_size);
1790
for (i = 0; i < num_loops; i++)
1793
int block_size = abs(rand() % max_block);
1794
int4store(buf, block_size);
1795
if (my_b_append(&sra_cache,buf,4) ||
1796
my_b_append(&sra_cache, block, block_size))
1797
die("write failed");
1798
total_bytes += 4+block_size;
1800
close_file(&sra_cache);
1802
if (!my_stat(fname,&status,MYF(MY_WME)))
1803
die("%s failed to stat, but I had just closed it,\
1804
wonder how that happened");
1805
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1806
llstr(status.st_size,llstr_buf),
1808
my_delete(fname, MYF(MY_WME));
1809
/* check correctness of tests */
1810
if (total_bytes != status.st_size)
1812
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1813
supposedly written\n");