1
/* Copyright (C) 2000 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Caching of files that only do (sequential) reads or writes of fixed-
length records. A read isn't allowed to go over file-length. A read is ok
19
if it ends at file-length and next read can try to read after file-length
20
(and get a EOF-error).
21
Possible use of asynchronous io.
22
macros for read and writes for faster io.
23
Used instead of FILE when reading or writing whole files.
24
This code makes mf_rec_cache obsolete (currently only used by ISAM)
25
One can change info->pos_in_file to a higher value to skip bytes in file if
26
also info->read_pos is set to info->read_end.
27
If called through open_cached_file(), then the temporary file will
28
only be created if a write exceeds the file buffer or if one calls
29
my_b_flush_io_cache().
31
If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32
reading and another for writing. Reads are first done from disk and
33
then done from the write buffer. This is an efficient way to read
34
from a log file when one is writing to it at the same time.
35
For this to work, the file has to be opened in append mode!
36
Note that when one uses SEQ_READ_APPEND, one MUST write using
37
my_b_append ! This is needed because we need to lock the mutex
38
every time we access the write buffer.
41
When one SEQ_READ_APPEND and we are reading and writing at the same time,
42
each time the write buffer gets full and it's written to disk, we will
43
always do a disk read to read a part of the buffer from disk to the
45
This should be fixed so that when we do a my_b_flush_io_cache() and
46
we have been reading the write buffer, we should transfer the rest of the
47
write buffer to the read buffer before we start to reuse it.
50
#define MAP_TO_USE_RAID
51
#include "mysys_priv.h"
54
#include "mysys_err.h"
55
static void my_aiowait(my_aio_result *result);
/*
  Lock/unlock the SEQ_READ_APPEND write buffer.
  In threaded builds these serialize access to info->append_buffer_lock;
  in non-threaded builds they expand to nothing.
*/
#ifdef THREAD
#define lock_append_buffer(info) \
  pthread_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
  pthread_mutex_unlock(&(info)->append_buffer_lock)
#else
#define lock_append_buffer(info)
#define unlock_append_buffer(info)
#endif
/*
  Round X up (IO_ROUND_UP) or down (IO_ROUND_DN) to a multiple of IO_SIZE.
  IO_SIZE must be a power of two for the mask arithmetic to be valid.
*/
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
73
Setup internal pointers inside IO_CACHE
80
This is called automatically on init or reinit of IO_CACHE
81
It must be called externally if one moves or copies an IO_CACHE
85
void setup_io_cache(IO_CACHE* info)
87
/* Ensure that my_b_tell() and my_b_bytes_in_cache works */
88
if (info->type == WRITE_CACHE)
90
info->current_pos= &info->write_pos;
91
info->current_end= &info->write_end;
95
info->current_pos= &info->read_pos;
96
info->current_end= &info->read_end;
102
init_functions(IO_CACHE* info)
104
enum cache_type type= info->type;
108
Must be initialized by the caller. The problem is that
109
_my_b_net_read has to be defined in sql directory because of
110
the dependency on THD, and therefore cannot be visible to
111
programs that link against mysys but know nothing about THD, such
115
case SEQ_READ_APPEND:
116
info->read_function = _my_b_seq_read;
117
info->write_function = 0; /* Force a core if used */
120
info->read_function =
122
info->share ? _my_b_read_r :
125
info->write_function = _my_b_write;
128
setup_io_cache(info);
133
Initialize an IO_CACHE object
137
info cache handler to initialize
138
file File that should be associated to to the handler
139
If == -1 then real_open_cached_file()
140
will be called when it's time to open file.
141
cachesize Size of buffer to allocate for read/write
142
If == 0 then use my_default_record_cache_size
144
seek_offset Where cache should start reading/writing
145
use_async_io Set to 1 if we should use async_io (if available)
146
cache_myflags Bitmap of different flags
147
MY_WME | MY_FAE | MY_NABP | MY_FNABP |
148
MY_DONT_CHECK_FILESIZE
155
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
156
enum cache_type type, my_off_t seek_offset,
157
pbool use_async_io, myf cache_myflags)
161
my_off_t end_of_file= ~(my_off_t) 0;
162
DBUG_ENTER("init_io_cache");
163
DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
164
(ulong) info, (int) type, (ulong) seek_offset));
167
info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
168
info->pos_in_file= seek_offset;
169
info->pre_close = info->pre_read = info->post_read = 0;
171
info->alloced_buffer = 0;
173
info->seek_not_done= 0;
177
pos= my_tell(file, MYF(0));
178
if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
181
This kind of object doesn't support seek() or tell(). Don't set a
182
flag that will make us again try to seek() later and fail.
184
info->seek_not_done= 0;
186
Additionally, if we're supposed to start somewhere other than
187
the beginning of whatever this file is, then somebody made a bad
190
DBUG_ASSERT(seek_offset == 0);
193
info->seek_not_done= test(seek_offset != pos);
196
info->disk_writes= 0;
201
if (!cachesize && !(cachesize= my_default_record_cache_size))
202
DBUG_RETURN(1); /* No cache requested */
203
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
204
if (type == READ_CACHE || type == SEQ_READ_APPEND)
205
{ /* Assume file isn't growing */
206
if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
208
/* Calculate end of file to avoid allocating oversized buffers */
209
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
210
/* Need to reset seek_not_done now that we just did a seek. */
211
info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
212
if (end_of_file < seek_offset)
213
end_of_file=seek_offset;
214
/* Trim cache size if the file is very small */
215
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
217
cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
218
use_async_io=0; /* No need to use async */
222
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
223
if (type != READ_NET && type != WRITE_NET)
225
/* Retry allocating memory in smaller blocks until we get one */
226
cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
230
if (cachesize < min_cache)
231
cachesize = min_cache;
232
buffer_block= cachesize;
233
if (type == SEQ_READ_APPEND)
236
(uchar*) my_malloc(buffer_block,
237
MYF((cache_myflags & ~ MY_WME) |
238
(cachesize == min_cache ? MY_WME : 0)))) != 0)
240
info->write_buffer=info->buffer;
241
if (type == SEQ_READ_APPEND)
242
info->write_buffer = info->buffer + cachesize;
243
info->alloced_buffer=1;
244
break; /* Enough memory found */
246
if (cachesize == min_cache)
247
DBUG_RETURN(2); /* Can't alloc cache */
248
/* Try with less memory */
249
cachesize= (cachesize*3/4 & ~(min_cache-1));
253
DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
254
info->read_length=info->buffer_length=cachesize;
255
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
256
info->request_pos= info->read_pos= info->write_pos = info->buffer;
257
if (type == SEQ_READ_APPEND)
259
info->append_read_pos = info->write_pos = info->write_buffer;
260
info->write_end = info->write_buffer + info->buffer_length;
262
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
265
#if defined(SAFE_MUTEX) && defined(THREAD)
268
/* Clear mutex so that safe_mutex will notice that it's not initialized */
269
bzero((char*) &info->append_buffer_lock, sizeof(info));
273
if (type == WRITE_CACHE)
275
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
277
info->read_end=info->buffer; /* Nothing in cache */
279
/* End_of_file may be changed by user later */
280
info->end_of_file= end_of_file;
283
init_functions(info);
285
if (use_async_io && ! my_disable_async_io)
287
DBUG_PRINT("info",("Using async io"));
288
info->read_length/=2;
289
info->read_function=_my_b_async_read;
291
info->inited=info->aio_result.pending=0;
294
} /* init_io_cache */
296
/* Wait until current request is ready */
299
static void my_aiowait(my_aio_result *result)
303
struct aio_result_t *tmp;
306
if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
310
DBUG_PRINT("error",("No aio request, error: %d",errno));
311
result->pending=0; /* Assume everythings is ok */
314
((my_aio_result*) tmp)->pending=0;
315
if ((my_aio_result*) tmp == result)
325
Use this to reset cache to re-start reading or to change the type
326
between READ_CACHE <-> WRITE_CACHE
327
If we are doing a reinit of a cache where we have the start of the file
328
in the cache, we are reusing this memory without flushing it to disk.
331
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
332
my_off_t seek_offset,
333
pbool use_async_io __attribute__((unused)),
336
DBUG_ENTER("reinit_io_cache");
337
DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
338
(ulong) info, type, (ulong) seek_offset,
341
/* One can't do reinit with the following types */
342
DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
343
type != WRITE_NET && info->type != WRITE_NET &&
344
type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
346
/* If the whole file is in memory, avoid flushing to disk */
348
seek_offset >= info->pos_in_file &&
349
seek_offset <= my_b_tell(info))
351
/* Reuse current buffer without flushing it to disk */
353
if (info->type == WRITE_CACHE && type == READ_CACHE)
355
info->read_end=info->write_pos;
356
info->end_of_file=my_b_tell(info);
358
Trigger a new seek only if we have a valid
361
info->seek_not_done= (info->file != -1);
363
else if (type == WRITE_CACHE)
365
if (info->type == READ_CACHE)
367
info->write_end=info->write_buffer+info->buffer_length;
368
info->seek_not_done=1;
370
info->end_of_file = ~(my_off_t) 0;
372
pos=info->request_pos+(seek_offset-info->pos_in_file);
373
if (type == WRITE_CACHE)
378
my_aiowait(&info->aio_result); /* Wait for outstanding req */
384
If we change from WRITE_CACHE to READ_CACHE, assume that everything
385
after the current positions should be ignored
387
if (info->type == WRITE_CACHE && type == READ_CACHE)
388
info->end_of_file=my_b_tell(info);
389
/* flush cache if we want to reuse it */
390
if (!clear_cache && my_b_flush_io_cache(info,1))
392
info->pos_in_file=seek_offset;
393
/* Better to always do a seek */
394
info->seek_not_done=1;
395
info->request_pos=info->read_pos=info->write_pos=info->buffer;
396
if (type == READ_CACHE)
398
info->read_end=info->buffer; /* Nothing in cache */
402
info->write_end=(info->buffer + info->buffer_length -
403
(seek_offset & (IO_SIZE-1)));
404
info->end_of_file= ~(my_off_t) 0;
409
init_functions(info);
412
if (use_async_io && ! my_disable_async_io &&
413
((ulong) info->buffer_length <
414
(ulong) (info->end_of_file - seek_offset)))
416
info->read_length=info->buffer_length/2;
417
info->read_function=_my_b_async_read;
422
} /* reinit_io_cache */
431
info IO_CACHE pointer
432
Buffer Buffer to retrieve count bytes from file
433
Count Number of bytes to read into Buffer
436
This function is only called from the my_b_read() macro when there
437
isn't enough characters in the buffer to satisfy the request.
441
When changing this function, be careful with handling file offsets
442
(end-of_file, pos_in_file). Do not cast them to possibly smaller
443
types than my_off_t unless you can be sure that their value fits.
444
Same applies to differences of file offsets.
446
When changing this function, check _my_b_read_r(). It might need the
450
0 we succeeded in reading all data
451
1 Error: can't read requested characters
454
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
456
size_t length,diff_length,left_length, max_length;
457
my_off_t pos_in_file;
458
DBUG_ENTER("_my_b_read");
460
if ((left_length= (size_t) (info->read_end-info->read_pos)))
462
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
463
memcpy(Buffer,info->read_pos, left_length);
468
/* pos_in_file always point on where info->buffer was read */
469
pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
472
Whenever a function which operates on IO_CACHE flushes/writes
473
some part of the IO_CACHE to disk it will set the property
474
"seek_not_done" to indicate this to other functions operating
477
if (info->seek_not_done)
479
if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))
480
!= MY_FILEPOS_ERROR))
482
/* No error, reset seek_not_done flag. */
483
info->seek_not_done= 0;
488
If the seek failed and the error number is ESPIPE, it is because
489
info->file is a pipe or socket or FIFO. We never should have tried
490
to seek on that. See Bugs#25807 and #22828 for more info.
492
DBUG_ASSERT(my_errno != ESPIPE);
498
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
499
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
500
{ /* Fill first intern buffer */
502
if (info->end_of_file <= pos_in_file)
504
info->error= (int) left_length;
507
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
508
if ((read_length= my_read(info->file,Buffer, length, info->myflags))
511
info->error= (read_length == (size_t) -1 ? -1 :
512
(int) (read_length+left_length));
522
max_length= info->read_length-diff_length;
523
if (info->type != READ_FIFO &&
524
max_length > (info->end_of_file - pos_in_file))
525
max_length= (size_t) (info->end_of_file - pos_in_file);
530
info->error= left_length; /* We only got this many char */
533
length=0; /* Didn't read any chars */
535
else if ((length= my_read(info->file,info->buffer, max_length,
536
info->myflags)) < Count ||
537
length == (size_t) -1)
539
if (length != (size_t) -1)
540
memcpy(Buffer, info->buffer, length);
541
info->pos_in_file= pos_in_file;
542
info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
543
info->read_pos=info->read_end=info->buffer;
546
info->read_pos=info->buffer+Count;
547
info->read_end=info->buffer+length;
548
info->pos_in_file=pos_in_file;
549
memcpy(Buffer, info->buffer, Count);
556
Prepare IO_CACHE for shared use.
559
init_io_cache_share()
560
read_cache A read cache. This will be copied for
561
every thread after setup.
563
write_cache If non-NULL a write cache that is to be
564
synchronized with the read caches.
565
num_threads Number of threads sharing the cache
566
including the write thread if any.
570
The shared cache is used so: One IO_CACHE is initialized with
571
init_io_cache(). This includes the allocation of a buffer. Then a
572
share is allocated and init_io_cache_share() is called with the io
573
cache and the share. Then the io cache is copied for each thread. So
574
every thread has its own copy of IO_CACHE. But the allocated buffer
575
is shared because cache->buffer is the same for all caches.
577
One thread reads data from the file into the buffer. All threads
578
read from the buffer, but every thread maintains its own set of
579
pointers into the buffer. When all threads have used up the buffer
580
contents, one of the threads reads the next block of data into the
581
buffer. To accomplish this, each thread enters the cache lock before
582
accessing the buffer. They wait in lock_io_cache() until all threads
583
joined the lock. The last thread entering the lock is in charge of
584
reading from file to buffer. It wakes all threads when done.
586
Synchronizing a write cache to the read caches works so: Whenever
587
the write buffer needs a flush, the write thread enters the lock and
588
waits for all other threads to enter the lock too. They do this when
589
they have used up the read buffer. When all threads are in the lock,
590
the write thread copies the write buffer to the read buffer and
593
share->running_threads is the number of threads not being in the
594
cache lock. When entering lock_io_cache() the number is decreased.
595
When the thread that fills the buffer enters unlock_io_cache() the
596
number is reset to the number of threads. The condition
597
running_threads == 0 means that all threads are in the lock. Bumping
598
up the number to the full count is non-intuitive. But increasing the
599
number by one for each thread that leaves the lock could lead to a
600
solo run of one thread. The last thread to join a lock reads from
601
file to buffer, wakes the other threads, processes the data in the
602
cache and enters the lock again. If no other thread left the lock
603
meanwhile, it would think it's the last one again and read the next
606
The share has copies of 'error', 'buffer', 'read_end', and
607
'pos_in_file' from the thread that filled the buffer. We may not be
608
able to access this information directly from its cache because the
609
thread may be removed from the share before the variables could be
610
copied by all other threads. Or, if a write buffer is synchronized,
611
it would change its 'pos_in_file' after waking the other threads,
612
possibly before they could copy its value.
614
However, the 'buffer' variable in the share is for a synchronized
615
write cache. It needs to know where to put the data. Otherwise it
616
would need access to the read cache of one of the threads that is
617
not yet removed from the share.
623
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
624
IO_CACHE *write_cache, uint num_threads)
626
DBUG_ENTER("init_io_cache_share");
627
DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
628
"write_cache: 0x%lx threads: %u",
629
(long) read_cache, (long) cshare,
630
(long) write_cache, num_threads));
632
DBUG_ASSERT(num_threads > 1);
633
DBUG_ASSERT(read_cache->type == READ_CACHE);
634
DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
636
pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
637
pthread_cond_init(&cshare->cond, 0);
638
pthread_cond_init(&cshare->cond_writer, 0);
640
cshare->running_threads= num_threads;
641
cshare->total_threads= num_threads;
642
cshare->error= 0; /* Initialize. */
643
cshare->buffer= read_cache->buffer;
644
cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
645
cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
646
cshare->source_cache= write_cache; /* Can be NULL. */
648
read_cache->share= cshare;
649
read_cache->read_function= _my_b_read_r;
650
read_cache->current_pos= NULL;
651
read_cache->current_end= NULL;
654
write_cache->share= cshare;
661
Remove a thread from shared access to IO_CACHE.
665
cache The IO_CACHE to be removed from the share.
669
Every thread must do this on exit so as not to deadlock other threads.
671
The last thread destroys the pthread resources.
673
A writer flushes its cache first.
679
void remove_io_thread(IO_CACHE *cache)
681
IO_CACHE_SHARE *cshare= cache->share;
683
DBUG_ENTER("remove_io_thread");
685
/* If the writer goes, it needs to flush the write cache. */
686
if (cache == cshare->source_cache)
687
flush_io_cache(cache);
689
pthread_mutex_lock(&cshare->mutex);
690
DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
691
(cache == cshare->source_cache) ?
692
"writer" : "reader", (long) cache));
694
/* Remove from share. */
695
total= --cshare->total_threads;
696
DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
698
/* Detach from share. */
701
/* If the writer goes, let the readers know. */
702
if (cache == cshare->source_cache)
704
DBUG_PRINT("io_cache_share", ("writer leaves"));
705
cshare->source_cache= NULL;
708
/* If all threads are waiting for me to join the lock, wake them. */
709
if (!--cshare->running_threads)
711
DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
712
pthread_cond_signal(&cshare->cond_writer);
713
pthread_cond_broadcast(&cshare->cond);
716
pthread_mutex_unlock(&cshare->mutex);
720
DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
721
pthread_cond_destroy (&cshare->cond_writer);
722
pthread_cond_destroy (&cshare->cond);
723
pthread_mutex_destroy(&cshare->mutex);
731
Lock IO cache and wait for all other threads to join.
735
cache The cache of the thread entering the lock.
736
pos File position of the block to read.
737
Unused for the write thread.
741
Wait for all threads to finish with the current buffer. We want
742
all threads to proceed in concert. The last thread to join
743
lock_io_cache() will read the block from file and all threads start
744
to use it. Then they will join again for reading the next block.
746
The waiting threads detect a fresh buffer by comparing
747
cshare->pos_in_file with the position they want to process next.
748
Since the first block may start at position 0, we take
749
cshare->read_end as an additional condition. This variable is
750
initialized to NULL and will be set after a block of data is written
754
1 OK, lock in place, go ahead and read.
755
0 OK, unlocked, another thread did the read.
758
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
760
IO_CACHE_SHARE *cshare= cache->share;
761
DBUG_ENTER("lock_io_cache");
763
/* Enter the lock. */
764
pthread_mutex_lock(&cshare->mutex);
765
cshare->running_threads--;
766
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
767
(cache == cshare->source_cache) ?
768
"writer" : "reader", (long) cache, (ulong) pos,
769
cshare->running_threads));
771
if (cshare->source_cache)
773
/* A write cache is synchronized to the read caches. */
775
if (cache == cshare->source_cache)
777
/* The writer waits until all readers are here. */
778
while (cshare->running_threads)
780
DBUG_PRINT("io_cache_share", ("writer waits in lock"));
781
pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
783
DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
785
/* Stay locked. Leave the lock later by unlock_io_cache(). */
789
/* The last thread wakes the writer. */
790
if (!cshare->running_threads)
792
DBUG_PRINT("io_cache_share", ("waking writer"));
793
pthread_cond_signal(&cshare->cond_writer);
797
Readers wait until the data is copied from the writer. Another
798
reason to stop waiting is the removal of the write thread. If this
799
happens, we leave the lock with old data in the buffer.
801
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
802
cshare->source_cache)
804
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
805
pthread_cond_wait(&cshare->cond, &cshare->mutex);
809
If the writer was removed from the share while this thread was
810
asleep, we need to simulate an EOF condition. The writer cannot
811
reset the share variables as they might still be in use by readers
812
of the last block. When we awake here then because the last
813
joining thread signalled us. If the writer is not the last, it
814
will not signal. So it is safe to clear the buffer here.
816
if (!cshare->read_end || (cshare->pos_in_file < pos))
818
DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
819
cshare->read_end= cshare->buffer; /* Empty buffer. */
820
cshare->error= 0; /* EOF is not an error. */
826
There are read caches only. The last thread arriving in
827
lock_io_cache() continues with a locked cache and reads the block.
829
if (!cshare->running_threads)
831
DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
832
/* Stay locked. Leave the lock later by unlock_io_cache(). */
837
All other threads wait until the requested block is read by the
838
last thread arriving. Another reason to stop waiting is the
839
removal of a thread. If this leads to all threads being in the
840
lock, we have to continue also. The first of the awaken threads
841
will then do the read.
843
while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
844
cshare->running_threads)
846
DBUG_PRINT("io_cache_share", ("reader waits in lock"));
847
pthread_cond_wait(&cshare->cond, &cshare->mutex);
850
/* If the block is not yet read, continue with a locked cache and read. */
851
if (!cshare->read_end || (cshare->pos_in_file < pos))
853
DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
854
/* Stay locked. Leave the lock later by unlock_io_cache(). */
858
/* Another thread did read the block already. */
860
DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
861
(uint) (cshare->read_end ? (size_t)
862
(cshare->read_end - cshare->buffer) :
866
Leave the lock. Do not call unlock_io_cache() later. The thread that
867
filled the buffer did this and marked all threads as running.
869
pthread_mutex_unlock(&cshare->mutex);
879
cache The cache of the thread leaving the lock.
882
This is called by the thread that filled the buffer. It marks all
883
threads as running and awakes them. This must not be done by any
886
Do not signal cond_writer. Either there is no writer or the writer
887
is the only one who can call this function.
889
The reason for resetting running_threads to total_threads before
890
waking all other threads is that it could be possible that this
891
thread is so fast with processing the buffer that it enters the lock
892
before even one other thread has left it. If every awoken thread
893
would increase running_threads by one, this thread could think that
894
he is again the last to join and would not wait for the other
895
threads to process the data.
901
static void unlock_io_cache(IO_CACHE *cache)
903
IO_CACHE_SHARE *cshare= cache->share;
904
DBUG_ENTER("unlock_io_cache");
905
DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
906
(cache == cshare->source_cache) ?
908
(long) cache, (ulong) cshare->pos_in_file,
909
cshare->total_threads));
911
cshare->running_threads= cshare->total_threads;
912
pthread_cond_broadcast(&cshare->cond);
913
pthread_mutex_unlock(&cshare->mutex);
919
Read from IO_CACHE when it is shared between several threads.
923
cache IO_CACHE pointer
924
Buffer Buffer to retrieve count bytes from file
925
Count Number of bytes to read into Buffer
928
This function is only called from the my_b_read() macro when there
929
isn't enough characters in the buffer to satisfy the request.
933
It works as follows: when a thread tries to read from a file (that
934
is, after using all the data from the (shared) buffer), it just
935
hangs on lock_io_cache(), waiting for other threads. When the very
936
last thread attempts a read, lock_io_cache() returns 1, the thread
937
does actual IO and unlock_io_cache(), which signals all the waiting
938
threads that data is in the buffer.
942
When changing this function, be careful with handling file offsets
943
(end-of_file, pos_in_file). Do not cast them to possibly smaller
944
types than my_off_t unless you can be sure that their value fits.
945
Same applies to differences of file offsets. (Bug #11527)
947
When changing this function, check _my_b_read(). It might need the
951
0 we succeeded in reading all data
952
1 Error: can't read requested characters
955
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
957
my_off_t pos_in_file;
958
size_t length, diff_length, left_length;
959
IO_CACHE_SHARE *cshare= cache->share;
960
DBUG_ENTER("_my_b_read_r");
962
if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
964
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
965
memcpy(Buffer, cache->read_pos, left_length);
966
Buffer+= left_length;
973
pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
974
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
975
length=IO_ROUND_UP(Count+diff_length)-diff_length;
976
length= ((length <= cache->read_length) ?
977
length + IO_ROUND_DN(cache->read_length - length) :
978
length - IO_ROUND_UP(length - cache->read_length));
979
if (cache->type != READ_FIFO &&
980
(length > (cache->end_of_file - pos_in_file)))
981
length= (size_t) (cache->end_of_file - pos_in_file);
984
cache->error= (int) left_length;
987
if (lock_io_cache(cache, pos_in_file))
989
/* With a synchronized write/read cache we won't come here... */
990
DBUG_ASSERT(!cshare->source_cache);
992
... unless the writer has gone before this thread entered the
993
lock. Simulate EOF in this case. It can be distinguished by
1001
Whenever a function which operates on IO_CACHE flushes/writes
1002
some part of the IO_CACHE to disk it will set the property
1003
"seek_not_done" to indicate this to other functions operating
1006
if (cache->seek_not_done)
1008
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1009
== MY_FILEPOS_ERROR)
1012
unlock_io_cache(cache);
1016
len= my_read(cache->file, cache->buffer, length, cache->myflags);
1018
DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1020
cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1021
cache->error= (len == length ? 0 : (int) len);
1022
cache->pos_in_file= pos_in_file;
1024
/* Copy important values to the share. */
1025
cshare->error= cache->error;
1026
cshare->read_end= cache->read_end;
1027
cshare->pos_in_file= pos_in_file;
1029
/* Mark all threads as running and wake them. */
1030
unlock_io_cache(cache);
1035
With a synchronized write/read cache readers always come here.
1036
Copy important values from the share.
1038
cache->error= cshare->error;
1039
cache->read_end= cshare->read_end;
1040
cache->pos_in_file= cshare->pos_in_file;
1042
len= ((cache->error == -1) ? (size_t) -1 :
1043
(size_t) (cache->read_end - cache->buffer));
1045
cache->read_pos= cache->buffer;
1046
cache->seek_not_done= 0;
1047
if (len == 0 || len == (size_t) -1)
1049
DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1050
(ulong) len, (ulong) left_length));
1051
cache->error= (int) left_length;
1054
cnt= (len > Count) ? Count : len;
1055
memcpy(Buffer, cache->read_pos, cnt);
1059
cache->read_pos+= cnt;
1066
Copy data from write cache to read cache.
1069
copy_to_read_buffer()
1070
write_cache The write cache.
1071
write_buffer The source of data, mostly the cache buffer.
1072
write_length The number of bytes to copy.
1075
The write thread will wait for all read threads to join the cache
1076
lock. Then it copies the data over and wakes the read threads.
1082
static void copy_to_read_buffer(IO_CACHE *write_cache,
1083
const uchar *write_buffer, size_t write_length)
1085
IO_CACHE_SHARE *cshare= write_cache->share;
1087
DBUG_ASSERT(cshare->source_cache == write_cache);
1089
write_length is usually less or equal to buffer_length.
1090
It can be bigger if _my_b_write() is called with a big length.
1092
while (write_length)
1094
size_t copy_length= min(write_length, write_cache->buffer_length);
1095
int __attribute__((unused)) rc;
1097
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1098
/* The writing thread does always have the lock when it awakes. */
1101
memcpy(cshare->buffer, write_buffer, copy_length);
1104
cshare->read_end= cshare->buffer + copy_length;
1105
cshare->pos_in_file= write_cache->pos_in_file;
1107
/* Mark all threads as running and wake them. */
1108
unlock_io_cache(write_cache);
1110
write_buffer+= copy_length;
1111
write_length-= copy_length;
1118
Do sequential read from the SEQ_READ_APPEND cache.
1120
We do this in three stages:
1121
- first read from info->buffer
1122
- then if there are still data to read, try the file descriptor
1123
- afterwards, if there are still data to read, try append buffer
1130
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1132
size_t length, diff_length, left_length, save_count, max_length;
1133
my_off_t pos_in_file;
1136
/* first, read the regular buffer */
1137
if ((left_length=(size_t) (info->read_end-info->read_pos)))
1139
DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1140
memcpy(Buffer,info->read_pos, left_length);
1141
Buffer+=left_length;
1144
lock_append_buffer(info);
1146
/* pos_in_file always point on where info->buffer was read */
1147
if ((pos_in_file=info->pos_in_file +
1148
(size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1149
goto read_append_buffer;
1152
With read-append cache we must always do a seek before we read,
1153
because the write could have moved the file pointer astray
1155
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1158
unlock_append_buffer(info);
1161
info->seek_not_done=0;
1163
diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1165
/* now the second stage begins - read from file descriptor */
1166
if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1168
/* Fill first intern buffer */
1171
length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1172
if ((read_length= my_read(info->file,Buffer, length,
1173
info->myflags)) == (size_t) -1)
1176
unlock_append_buffer(info);
1180
Buffer+=read_length;
1181
pos_in_file+=read_length;
1183
if (read_length != length)
1186
We only got part of data; Read the rest of the data from the
1189
goto read_append_buffer;
1191
left_length+=length;
1195
max_length= info->read_length-diff_length;
1196
if (max_length > (info->end_of_file - pos_in_file))
1197
max_length= (size_t) (info->end_of_file - pos_in_file);
1201
goto read_append_buffer;
1202
length=0; /* Didn't read any more chars */
1206
length= my_read(info->file,info->buffer, max_length, info->myflags);
1207
if (length == (size_t) -1)
1210
unlock_append_buffer(info);
1215
memcpy(Buffer, info->buffer, length);
1220
added the line below to make
1221
DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1222
otherwise this does not appear to be needed
1224
pos_in_file += length;
1225
goto read_append_buffer;
1228
unlock_append_buffer(info);
1229
info->read_pos=info->buffer+Count;
1230
info->read_end=info->buffer+length;
1231
info->pos_in_file=pos_in_file;
1232
memcpy(Buffer,info->buffer,(size_t) Count);
1238
Read data from the current write buffer.
1239
Count should never be == 0 here (The code will work even if count is 0)
1243
/* First copy the data to Count */
1244
size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1246
size_t transfer_len;
1248
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1250
TODO: figure out if the assert below is needed or correct.
1252
DBUG_ASSERT(pos_in_file == info->end_of_file);
1253
copy_len=min(Count, len_in_buff);
1254
memcpy(Buffer, info->append_read_pos, copy_len);
1255
info->append_read_pos += copy_len;
1258
info->error = save_count - Count;
1260
/* Fill read buffer with data from write buffer */
1261
memcpy(info->buffer, info->append_read_pos,
1262
(size_t) (transfer_len=len_in_buff - copy_len));
1263
info->read_pos= info->buffer;
1264
info->read_end= info->buffer+transfer_len;
1265
info->append_read_pos=info->write_pos;
1266
info->pos_in_file=pos_in_file+copy_len;
1267
info->end_of_file+=len_in_buff;
1269
unlock_append_buffer(info);
1270
return Count ? 1 : 0;
1277
Read from the IO_CACHE into a buffer and feed asynchronously
1278
from disk when needed.
1282
info IO_CACHE pointer
1283
Buffer Buffer to retrieve count bytes from file
1284
Count Number of bytes to read into Buffer
1287
-1 An error has occurred; my_errno is set.
1289
1 An error has occurred; IO_CACHE to error state.
1292
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1294
size_t length,read_length,diff_length,left_length,use_length,org_Count;
1296
my_off_t next_pos_in_file;
1299
memcpy(Buffer,info->read_pos,
1300
(left_length= (size_t) (info->read_end-info->read_pos)));
1301
Buffer+=left_length;
1306
{ /* wait for read block */
1307
info->inited=0; /* No more block to read */
1308
my_aiowait(&info->aio_result); /* Wait for outstanding req */
1309
if (info->aio_result.result.aio_errno)
1311
if (info->myflags & MY_WME)
1312
my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1313
my_filename(info->file),
1314
info->aio_result.result.aio_errno);
1315
my_errno=info->aio_result.result.aio_errno;
1319
if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1320
read_length == (size_t) -1)
1322
my_errno=0; /* For testing */
1323
info->error= (read_length == (size_t) -1 ? -1 :
1324
(int) (read_length+left_length));
1327
info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
1329
if (info->request_pos != info->buffer)
1330
info->request_pos=info->buffer;
1332
info->request_pos=info->buffer+info->read_length;
1333
info->read_pos=info->request_pos;
1334
next_pos_in_file=info->aio_read_pos+read_length;
1336
/* Check if pos_in_file is changed
1337
(_ni_read_cache may have skipped some bytes) */
1339
if (info->aio_read_pos < info->pos_in_file)
1340
{ /* Fix if skipped bytes */
1341
if (info->aio_read_pos + read_length < info->pos_in_file)
1343
read_length=0; /* Skip block */
1344
next_pos_in_file=info->pos_in_file;
1348
my_off_t offset= (info->pos_in_file - info->aio_read_pos);
1349
info->pos_in_file=info->aio_read_pos; /* Whe are here */
1350
info->read_pos=info->request_pos+offset;
1351
read_length-=offset; /* Bytes left from read_pos */
1355
if (info->aio_read_pos > info->pos_in_file)
1358
return(info->read_length= (size_t) -1);
1361
/* Copy found bytes to buffer */
1362
length=min(Count,read_length);
1363
memcpy(Buffer,info->read_pos,(size_t) length);
1366
left_length+=length;
1367
info->read_end=info->rc_pos+read_length;
1368
info->read_pos+=length;
1371
next_pos_in_file=(info->pos_in_file+ (size_t)
1372
(info->read_end - info->request_pos));
1374
/* If reading large blocks, or first read or read with skip */
1377
if (next_pos_in_file == info->end_of_file)
1379
info->error=(int) (read_length+left_length);
1383
if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1384
== MY_FILEPOS_ERROR)
1390
read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
1391
if (Count < read_length)
1392
{ /* Small block, read to cache */
1393
if ((read_length=my_read(info->file,info->request_pos,
1394
read_length, info->myflags)) == (size_t) -1)
1395
return info->error= -1;
1396
use_length=min(Count,read_length);
1397
memcpy(Buffer,info->request_pos,(size_t) use_length);
1398
info->read_pos=info->request_pos+Count;
1399
info->read_end=info->request_pos+read_length;
1400
info->pos_in_file=next_pos_in_file; /* Start of block in cache */
1401
next_pos_in_file+=read_length;
1403
if (Count != use_length)
1404
{ /* Didn't find hole block */
1405
if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1406
my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1407
my_filename(info->file),my_errno);
1408
info->error=(int) (read_length+left_length);
1413
{ /* Big block, don't cache it */
1414
if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
1417
info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
1420
info->read_pos=info->read_end=info->request_pos;
1421
info->pos_in_file=(next_pos_in_file+=Count);
1425
/* Read next block with asyncronic io */
1426
diff_length=(next_pos_in_file & (IO_SIZE-1));
1427
max_length= info->read_length - diff_length;
1428
if (max_length > info->end_of_file - next_pos_in_file)
1429
max_length= (size_t) (info->end_of_file - next_pos_in_file);
1431
if (info->request_pos != info->buffer)
1432
read_buffer=info->buffer;
1434
read_buffer=info->buffer+info->read_length;
1435
info->aio_read_pos=next_pos_in_file;
1438
info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
1439
DBUG_PRINT("aioread",("filepos: %ld length: %lu",
1440
(ulong) next_pos_in_file, (ulong) max_length));
1441
if (aioread(info->file,read_buffer, max_length,
1442
(my_off_t) next_pos_in_file,MY_SEEK_SET,
1443
&info->aio_result.result))
1444
{ /* Skip async io */
1446
DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
1447
errno, info->aio_result.result.aio_errno));
1448
if (info->request_pos != info->buffer)
1450
bmove(info->buffer,info->request_pos,
1451
(size_t) (info->read_end - info->read_pos));
1452
info->request_pos=info->buffer;
1453
info->read_pos-=info->read_length;
1454
info->read_end-=info->read_length;
1456
info->read_length=info->buffer_length; /* Use hole buffer */
1457
info->read_function=_my_b_read; /* Use normal IO_READ next */
1460
info->inited=info->aio_result.pending=1;
1462
return 0; /* Block read, async in use */
1463
} /* _my_b_async_read */
1467
/* Read one byte when buffer is empty */
1469
int _my_b_get(IO_CACHE *info)
1472
IO_CACHE_CALLBACK pre_read,post_read;
1473
if ((pre_read = info->pre_read))
1475
if ((*(info)->read_function)(info,&buff,1))
1477
if ((post_read = info->post_read))
1479
return (int) (uchar) buff;
1483
Write a byte buffer to IO_CACHE and flush to disk
1484
if IO_CACHE is full.
1489
-1 On error; my_errno contains error code.
1492
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1494
size_t rest_length,length;
1496
if (info->pos_in_file+info->buffer_length > info->end_of_file)
1498
my_errno=errno=EFBIG;
1499
return info->error = -1;
1502
rest_length= (size_t) (info->write_end - info->write_pos);
1503
memcpy(info->write_pos,Buffer,(size_t) rest_length);
1504
Buffer+=rest_length;
1506
info->write_pos+=rest_length;
1508
if (my_b_flush_io_cache(info,1))
1510
if (Count >= IO_SIZE)
1511
{ /* Fill first intern buffer */
1512
length=Count & (size_t) ~(IO_SIZE-1);
1513
if (info->seek_not_done)
1516
Whenever a function which operates on IO_CACHE flushes/writes
1517
some part of the IO_CACHE to disk it will set the property
1518
"seek_not_done" to indicate this to other functions operating
1521
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1526
info->seek_not_done=0;
1528
if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1529
return info->error= -1;
1533
In case of a shared I/O cache with a writer we normally do direct
1534
write cache to read cache copy. Simulate this here by direct
1535
caller buffer to read cache copy. Do it after the write so that
1536
the cache readers actions on the flushed part can go in parallel
1537
with the write of the extra stuff. copy_to_read_buffer()
1538
synchronizes writer and readers so that after this call the
1539
readers can act on the extra stuff while the writer can go ahead
1540
and prepare the next output. copy_to_read_buffer() relies on
1544
copy_to_read_buffer(info, Buffer, length);
1549
info->pos_in_file+=length;
1551
memcpy(info->write_pos,Buffer,(size_t) Count);
1552
info->write_pos+=Count;
1558
Append a block to the write buffer.
1559
This is done with the buffer locked to ensure that we don't read from
1560
the write buffer before we are ready with it.
1563
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1565
size_t rest_length,length;
1569
Assert that we cannot come here with a shared cache. If we do one
1570
day, we might need to add a call to copy_to_read_buffer().
1572
DBUG_ASSERT(!info->share);
1575
lock_append_buffer(info);
1576
rest_length= (size_t) (info->write_end - info->write_pos);
1577
if (Count <= rest_length)
1579
memcpy(info->write_pos, Buffer, rest_length);
1580
Buffer+=rest_length;
1582
info->write_pos+=rest_length;
1583
if (my_b_flush_io_cache(info,0))
1585
unlock_append_buffer(info);
1588
if (Count >= IO_SIZE)
1589
{ /* Fill first intern buffer */
1590
length=Count & (size_t) ~(IO_SIZE-1);
1591
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1593
unlock_append_buffer(info);
1594
return info->error= -1;
1598
info->end_of_file+=length;
1602
memcpy(info->write_pos,Buffer,(size_t) Count);
1603
info->write_pos+=Count;
1604
unlock_append_buffer(info);
1609
/* Dispatch a write to the append path or the plain write path. */
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Sasha: We are not writing this with the ? operator to avoid hitting
    a possible compiler bug. At least gcc 2.95 cannot deal with
    several layers of ternary operators that evaluated comma(,) operator
    expressions inside - I do have a test case if somebody wants it
  */
  if (info->type == SEQ_READ_APPEND)
    return my_b_append(info, Buffer, Count);
  return my_b_write(info, Buffer, Count);
}
1624
Write a block to disk where part of the data may be inside the record
1625
buffer. As all write calls to the data goes through the cache,
1626
we will never get a seek over the end of the buffer
1629
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1637
Assert that we cannot come here with a shared cache. If we do one
1638
day, we might need to add a call to copy_to_read_buffer().
1640
DBUG_ASSERT(!info->share);
1643
if (pos < info->pos_in_file)
1645
/* Of no overlap, write everything without buffering */
1646
if (pos + Count <= info->pos_in_file)
1647
return my_pwrite(info->file, Buffer, Count, pos,
1648
info->myflags | MY_NABP);
1649
/* Write the part of the block that is before buffer */
1650
length= (uint) (info->pos_in_file - pos);
1651
if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1652
info->error= error= -1;
1657
info->seek_not_done=1;
1661
/* Check if we want to write inside the used part of the buffer.*/
1662
length= (size_t) (info->write_end - info->buffer);
1663
if (pos < info->pos_in_file + length)
1665
size_t offset= (size_t) (pos - info->pos_in_file);
1669
memcpy(info->buffer+offset, Buffer, length);
1672
/* Fix length of buffer if the new data was larger */
1673
if (info->buffer+length > info->write_pos)
1674
info->write_pos=info->buffer+length;
1678
/* Write at the end of the current buffer; This is the normal case */
1679
if (_my_b_write(info, Buffer, Count))
1685
/* Flush write cache */

/*
  With threads, the append-buffer mutex is taken only when the caller
  asked for it; without threads the macros expand to nothing.
*/
#ifdef THREAD
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 lock_append_buffer(info);
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 unlock_append_buffer(info);
#else
#define LOCK_APPEND_BUFFER
#define UNLOCK_APPEND_BUFFER
#endif
1698
/*
  Flush the write buffer of an IO_CACHE to disk.

  NOTE(review): reconstructed from a corrupted chunk; verify against
  pristine mysys/mf_iocache.c.

  RETURN VALUE
    0   ok
   -1   error (info->error is set)
*/

int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  size_t length;
  my_bool append_cache;
  my_off_t pos_in_file;
  DBUG_ENTER("my_b_flush_io_cache");
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));

  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
    need_append_buffer_lock=0;

  if (info->type == WRITE_CACHE || append_cache)
  {
    if (info->file == -1)
    {
      /* Temporary file was never created; create it now. */
      if (real_open_cached_file(info))
        DBUG_RETURN((info->error= -1));
    }
    LOCK_APPEND_BUFFER;

    if ((length=(size_t) (info->write_pos - info->write_buffer)))
    {
      /*
        In case of a shared I/O cache with a writer we do direct write
        cache to read cache copy. Do it before the write here so that
        the readers can work in parallel with the write.
        copy_to_read_buffer() relies on info->pos_in_file.
      */
      if (info->share)
        copy_to_read_buffer(info, info->write_buffer, length);

      pos_in_file=info->pos_in_file;
      /*
        If we have append cache, we always open the file with
        O_APPEND which moves the pos to EOF automatically on every write
      */
      if (!append_cache && info->seek_not_done)
      {					/* File touched, do seek */
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
            MY_FILEPOS_ERROR)
        {
          UNLOCK_APPEND_BUFFER;
          DBUG_RETURN((info->error= -1));
        }
        if (!append_cache)
          info->seek_not_done=0;
      }
      if (!append_cache)
        info->pos_in_file+=length;
      info->write_end= (info->write_buffer+info->buffer_length-
                        ((pos_in_file+length) & (IO_SIZE-1)));

      if (my_write(info->file,info->write_buffer,length,
                   info->myflags | MY_NABP))
        info->error= -1;
      else
        info->error= 0;
      if (!append_cache)
      {
        set_if_bigger(info->end_of_file,(pos_in_file+length));
      }
      else
      {
        info->end_of_file+=(info->write_pos-info->append_read_pos);
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
      }

      info->append_read_pos=info->write_pos=info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      DBUG_RETURN(info->error);
    }
  }
#ifdef HAVE_AIOWAIT
  else if (info->type != READ_NET)
  {
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
    info->inited=0;
  }
#endif
  UNLOCK_APPEND_BUFFER;
  DBUG_RETURN(0);
}
1785
/*
  Free an IO_CACHE object

  SYNOPSIS
    end_io_cache()
      info                IO_CACHE Handle to free

  NOTES
    It's currently safe to call this if one has called init_io_cache()
    on the 'info' object, even if init_io_cache() failed.
    This function is also safe to call twice with the same handle.

  RETURN
    0  ok
    #  Error
*/

int end_io_cache(IO_CACHE *info)
{
  int error=0;
  IO_CACHE_CALLBACK pre_close;
  DBUG_ENTER("end_io_cache");
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));

  /*
    Every thread must call remove_io_thread(). The last one destroys
    the share elements.
  */
  DBUG_ASSERT(!info->share || !info->share->total_threads);

  if ((pre_close=info->pre_close))
  {
    (*pre_close)(info);
    info->pre_close= 0;
  }
  if (info->alloced_buffer)
  {
    info->alloced_buffer=0;
    if (info->file != -1)			/* File doesn't exist */
      error= my_b_flush_io_cache(info,1);
    my_free((uchar*) info->buffer,MYF(MY_WME));
    info->buffer=info->read_pos=(uchar*) 0;
  }
  if (info->type == SEQ_READ_APPEND)
  {
    /* Destroy allocated mutex */
    info->type= TYPE_NOT_SET;
    pthread_mutex_destroy(&info->append_buffer_lock);
  }
  DBUG_RETURN(error);
} /* end_io_cache */
1841
/**********************************************************************
1842
Testing of MF_IOCACHE
1843
**********************************************************************/
1849
/* Print a formatted error message plus errno to stderr and abort the test. */
void die(const char* fmt, ...)
{
  va_list va_args;
  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  fprintf(stderr,", errno=%d\n", errno);
  va_end(va_args);	/* fix: original never called va_end after va_start */
  exit(1);
}
1859
/* Open/create the test file and wrap it in a SEQ_READ_APPEND IO_CACHE. */
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
  int fd;
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
    die("Could not open %s", fname);
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
    die("failed in init_io_cache()");
  return fd;
}
1869
/* Flush and tear down the cache, then close the underlying file. */
void close_file(IO_CACHE* info)
{
  end_io_cache(info);
  my_close(info->file, MYF(MY_WME));
}
1875
int main(int argc, char** argv)
1877
IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1879
const char* fname="/tmp/iocache.test";
1880
int cache_size=16384;
1882
int max_block,total_bytes=0;
1883
int i,num_loops=100,error=0;
1885
char* block, *block_end;
1887
max_block = cache_size*3;
1888
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1889
die("Not enough memory to allocate test block");
1890
block_end = block + max_block;
1891
for (p = block,i=0; p < block_end;i++)
1895
if (my_stat(fname,&status, MYF(0)) &&
1896
my_delete(fname,MYF(MY_WME)))
1898
die("Delete of %s failed, aborting", fname);
1900
open_file(fname,&sra_cache, cache_size);
1901
for (i = 0; i < num_loops; i++)
1904
int block_size = abs(rand() % max_block);
1905
int4store(buf, block_size);
1906
if (my_b_append(&sra_cache,buf,4) ||
1907
my_b_append(&sra_cache, block, block_size))
1908
die("write failed");
1909
total_bytes += 4+block_size;
1911
close_file(&sra_cache);
1912
my_free(block,MYF(MY_WME));
1913
if (!my_stat(fname,&status,MYF(MY_WME)))
1914
die("%s failed to stat, but I had just closed it,\
1915
wonder how that happened");
1916
printf("Final size of %s is %s, wrote %d bytes\n",fname,
1917
llstr(status.st_size,llstr_buf),
1919
my_delete(fname, MYF(MY_WME));
1920
/* check correctness of tests */
1921
if (total_bytes != status.st_size)
1923
fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1924
supposedly written\n");