~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2000 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
  Caching of files which only do (sequential) reads or writes of fixed-
  length records. A read isn't allowed to go over file-length. A read is ok
  if it ends at file-length and the next read can try to read after
  file-length (and get an EOF error).
  Possibly uses asynchronous io.
  Macros for reads and writes for faster io.
  Used instead of FILE when reading or writing whole files.
  This code makes mf_rec_cache obsolete (currently only used by ISAM).
  One can change info->pos_in_file to a higher value to skip bytes in the
  file if info->read_pos is also set to info->read_end.
  If called through open_cached_file(), then the temporary file will
  only be created if a write exceeds the file buffer or if one calls
  my_b_flush_io_cache().

  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
  reading and another for writing.  Reads are first done from disk and
  then done from the write buffer.  This is an efficient way to read
  from a log file when one is writing to it at the same time.
  For this to work, the file has to be opened in append mode!
  Note that when one uses SEQ_READ_APPEND, one MUST write using
  my_b_append !  This is needed because we need to lock the mutex
  every time we access the write buffer.

TODO:
  When one uses SEQ_READ_APPEND and we are reading and writing at the same
  time, each time the write buffer gets full and is written to disk, we
  will always do a disk read to read a part of the buffer from disk to the
  read buffer.
  This should be fixed so that when we do a my_b_flush_io_cache() and
  we have been reading the write buffer, we should transfer the rest of the
  write buffer to the read buffer before we start to reuse it.
*/
49
50
#define MAP_TO_USE_RAID
51
#include "mysys_priv.h"
52
#include <m_string.h>
53
#ifdef HAVE_AIOWAIT
54
#include "mysys_err.h"
55
static void my_aiowait(my_aio_result *result);
56
#endif
57
#include <errno.h>
58
59
#define lock_append_buffer(info) \
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
61
#define unlock_append_buffer(info) \
62
 pthread_mutex_unlock(&(info)->append_buffer_lock)
63
64
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
65
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
66
67
/*
  Setup internal pointers inside IO_CACHE

  SYNOPSIS
    setup_io_cache()
    info		IO_CACHE handler

  NOTES
    This is called automatically on init or reinit of IO_CACHE.
    It must be called externally if one moves or copies an IO_CACHE
    object.
*/
79
80
void setup_io_cache(IO_CACHE* info)
{
  /*
    Point current_pos/current_end at whichever buffer-pointer pair is
    active for this cache direction, so that my_b_tell() and
    my_b_bytes_in_cache() work for both read and write caches.
  */
  int writing= (info->type == WRITE_CACHE);
  info->current_pos= writing ? &info->write_pos : &info->read_pos;
  info->current_end= writing ? &info->write_end : &info->read_end;
}
94
95
96
static void
97
init_functions(IO_CACHE* info)
98
{
99
  enum cache_type type= info->type;
100
  switch (type) {
101
  case READ_NET:
102
    /*
103
      Must be initialized by the caller. The problem is that
104
      _my_b_net_read has to be defined in sql directory because of
105
      the dependency on THD, and therefore cannot be visible to
106
      programs that link against mysys but know nothing about THD, such
107
      as myisamchk
108
    */
109
    break;
110
  case SEQ_READ_APPEND:
111
    info->read_function = _my_b_seq_read;
112
    info->write_function = 0;			/* Force a core if used */
113
    break;
114
  default:
115
    info->read_function =
116
                          info->share ? _my_b_read_r :
117
                                        _my_b_read;
118
    info->write_function = _my_b_write;
119
  }
120
121
  setup_io_cache(info);
122
}
123
124
125
/*
  Initialize an IO_CACHE object

  SYNOPSIS
    init_io_cache()
    info		cache handler to initialize
    file		File that should be associated to the handler
			If == -1 then real_open_cached_file()
			will be called when it's time to open file.
    cachesize		Size of buffer to allocate for read/write
			If == 0 then use my_default_record_cache_size
    type		Type of cache
    seek_offset		Where cache should start reading/writing
    use_async_io	Set to 1 if we should use async_io (if available)
    cache_myflags	Bitmap of different flags
			MY_WME | MY_FAE | MY_NABP | MY_FNABP |
			MY_DONT_CHECK_FILESIZE

  RETURN
    0  ok
    #  error
*/
147
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
149
		  enum cache_type type, my_off_t seek_offset,
150
		  pbool use_async_io, myf cache_myflags)
151
{
152
  size_t min_cache;
153
  my_off_t pos;
154
  my_off_t end_of_file= ~(my_off_t) 0;
155
  DBUG_ENTER("init_io_cache");
156
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
157
		      (ulong) info, (int) type, (ulong) seek_offset));
158
159
  info->file= file;
160
  info->type= TYPE_NOT_SET;	    /* Don't set it until mutex are created */
161
  info->pos_in_file= seek_offset;
162
  info->pre_close = info->pre_read = info->post_read = 0;
163
  info->arg = 0;
164
  info->alloced_buffer = 0;
165
  info->buffer=0;
166
  info->seek_not_done= 0;
167
168
  if (file >= 0)
169
  {
170
    pos= my_tell(file, MYF(0));
171
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
172
    {
173
      /*
174
         This kind of object doesn't support seek() or tell(). Don't set a
175
         flag that will make us again try to seek() later and fail.
176
      */
177
      info->seek_not_done= 0;
178
      /*
179
        Additionally, if we're supposed to start somewhere other than the
180
        the beginning of whatever this file is, then somebody made a bad
181
        assumption.
182
      */
183
      DBUG_ASSERT(seek_offset == 0);
184
    }
185
    else
186
      info->seek_not_done= test(seek_offset != pos);
187
  }
188
189
  info->disk_writes= 0;
190
  info->share=0;
191
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
193
    DBUG_RETURN(1);				/* No cache requested */
194
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
195
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
196
  {						/* Assume file isn't growing */
197
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
198
    {
199
      /* Calculate end of file to avoid allocating oversized buffers */
200
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
201
      /* Need to reset seek_not_done now that we just did a seek. */
202
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
203
      if (end_of_file < seek_offset)
204
	end_of_file=seek_offset;
205
      /* Trim cache size if the file is very small */
206
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
207
      {
208
	cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
209
	use_async_io=0;				/* No need to use async */
210
      }
211
    }
212
  }
213
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
214
  if (type != READ_NET && type != WRITE_NET)
215
  {
216
    /* Retry allocating memory in smaller blocks until we get one */
217
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
218
    for (;;)
219
    {
220
      size_t buffer_block;
221
      if (cachesize < min_cache)
222
	cachesize = min_cache;
223
      buffer_block= cachesize;
224
      if (type == SEQ_READ_APPEND)
225
	buffer_block *= 2;
226
      if ((info->buffer=
227
	   (uchar*) my_malloc(buffer_block,
228
			     MYF((cache_myflags & ~ MY_WME) |
229
				 (cachesize == min_cache ? MY_WME : 0)))) != 0)
230
      {
231
	info->write_buffer=info->buffer;
232
	if (type == SEQ_READ_APPEND)
233
	  info->write_buffer = info->buffer + cachesize;
234
	info->alloced_buffer=1;
235
	break;					/* Enough memory found */
236
      }
237
      if (cachesize == min_cache)
238
	DBUG_RETURN(2);				/* Can't alloc cache */
239
      /* Try with less memory */
240
      cachesize= (cachesize*3/4 & ~(min_cache-1));
241
    }
242
  }
243
244
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
245
  info->read_length=info->buffer_length=cachesize;
246
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
247
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
248
  if (type == SEQ_READ_APPEND)
249
  {
250
    info->append_read_pos = info->write_pos = info->write_buffer;
251
    info->write_end = info->write_buffer + info->buffer_length;
252
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
253
  }
28.1.35 by Monty Taylor
Removed all references to THREAD.
254
#if defined(SAFE_MUTEX)
1 by brian
clean slate
255
  else
256
  {
257
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
258
    bzero((char*) &info->append_buffer_lock, sizeof(info));
259
  }
260
#endif
261
262
  if (type == WRITE_CACHE)
263
    info->write_end=
264
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
265
  else
266
    info->read_end=info->buffer;		/* Nothing in cache */
267
268
  /* End_of_file may be changed by user later */
269
  info->end_of_file= end_of_file;
270
  info->error=0;
271
  info->type= type;
272
  init_functions(info);
273
#ifdef HAVE_AIOWAIT
274
  if (use_async_io && ! my_disable_async_io)
275
  {
276
    DBUG_PRINT("info",("Using async io"));
277
    info->read_length/=2;
278
    info->read_function=_my_b_async_read;
279
  }
280
  info->inited=info->aio_result.pending=0;
281
#endif
282
  DBUG_RETURN(0);
283
}						/* init_io_cache */
284
285
	/* Wait until current request is ready */
286
287
#ifdef HAVE_AIOWAIT
288
/* Wait until the pending async read request for 'result' is ready. */
static void my_aiowait(my_aio_result *result)
{
  if (result->pending)
  {
    struct aio_result_t *tmp;
    for (;;)
    {
      /*
        BUG FIX: compare the returned pointer against the error sentinel
        as a pointer.  The old code cast the pointer to int, which
        truncates it on LP64 platforms and could mistake a valid address
        for -1 (or miss a real error).
      */
      if ((tmp=aiowait((struct timeval *) 0)) == (struct aio_result_t *) -1)
      {
	if (errno == EINTR)
	  continue;			/* Interrupted; retry the wait */
	DBUG_PRINT("error",("No aio request, error: %d",errno));
	result->pending=0;			/* Assume everythings is ok */
	break;
      }
      /* Some request completed; mark it done and stop if it was ours. */
      ((my_aio_result*) tmp)->pending=0;
      if ((my_aio_result*) tmp == result)
	break;
    }
  }
  return;
}
310
#endif
311
312
313
/*
314
  Use this to reset cache to re-start reading or to change the type
315
  between READ_CACHE <-> WRITE_CACHE
316
  If we are doing a reinit of a cache where we have the start of the file
317
  in the cache, we are reusing this memory without flushing it to disk.
318
*/
319
320
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
			my_off_t seek_offset,
			pbool use_async_io __attribute__((unused)),
			pbool clear_cache)
{
  DBUG_ENTER("reinit_io_cache");
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
		      (ulong) info, type, (ulong) seek_offset,
		      (int) clear_cache));

  /* One can't do reinit with the following types */
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
	      type != WRITE_NET && info->type != WRITE_NET &&
	      type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);

  /* If the whole file is in memory, avoid flushing to disk */
  if (! clear_cache &&
      seek_offset >= info->pos_in_file &&
      seek_offset <= my_b_tell(info))
  {
    /* Reuse current buffer without flushing it to disk */
    uchar *pos;
    if (info->type == WRITE_CACHE && type == READ_CACHE)
    {
      /* Everything written so far becomes the readable contents */
      info->read_end=info->write_pos;
      info->end_of_file=my_b_tell(info);
      /*
        Trigger a new seek only if we have a valid
        file handle.
      */
      info->seek_not_done= (info->file != -1);
    }
    else if (type == WRITE_CACHE)
    {
      if (info->type == READ_CACHE)
      {
	info->write_end=info->write_buffer+info->buffer_length;
	info->seek_not_done=1;
      }
      /* A write cache has no known end; the user may extend the file */
      info->end_of_file = ~(my_off_t) 0;
    }
    /* Reposition inside the kept buffer to match seek_offset */
    pos=info->request_pos+(seek_offset-info->pos_in_file);
    if (type == WRITE_CACHE)
      info->write_pos=pos;
    else
      info->read_pos= pos;
#ifdef HAVE_AIOWAIT
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
#endif
  }
  else
  {
    /*
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
      after the current positions should be ignored
    */
    if (info->type == WRITE_CACHE && type == READ_CACHE)
      info->end_of_file=my_b_tell(info);
    /* flush cache if we want to reuse it */
    if (!clear_cache && my_b_flush_io_cache(info,1))
      DBUG_RETURN(1);
    info->pos_in_file=seek_offset;
    /* Better to do always do a seek */
    info->seek_not_done=1;
    /* Discard buffered data: all pointers back to the start of the buffer */
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
    if (type == READ_CACHE)
    {
      info->read_end=info->buffer;		/* Nothing in cache */
    }
    else
    {
      /* Align write_end so the first flush ends on an IO_SIZE boundary */
      info->write_end=(info->buffer + info->buffer_length -
		       (seek_offset & (IO_SIZE-1)));
      info->end_of_file= ~(my_off_t) 0;
    }
  }
  info->type=type;
  info->error=0;
  init_functions(info);

#ifdef HAVE_AIOWAIT
  /* Only worth using async io if the remaining data exceeds the buffer */
  if (use_async_io && ! my_disable_async_io &&
      ((ulong) info->buffer_length <
       (ulong) (info->end_of_file - seek_offset)))
  {
    info->read_length=info->buffer_length/2;
    info->read_function=_my_b_async_read;
  }
  info->inited=0;
#endif
  DBUG_RETURN(0);
} /* reinit_io_cache */
412
413
414
415
/*
416
  Read buffered.
417
418
  SYNOPSIS
419
    _my_b_read()
420
      info                      IO_CACHE pointer
421
      Buffer                    Buffer to retrieve count bytes from file
422
      Count                     Number of bytes to read into Buffer
423
424
  NOTE
425
    This function is only called from the my_b_read() macro when there
426
    isn't enough characters in the buffer to satisfy the request.
427
428
  WARNING
429
430
    When changing this function, be careful with handling file offsets
431
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
432
    types than my_off_t unless you can be sure that their value fits.
433
    Same applies to differences of file offsets.
434
435
    When changing this function, check _my_b_read_r(). It might need the
436
    same change.
437
438
  RETURN
439
    0      we succeeded in reading all data
440
    1      Error: can't read requested characters
441
*/
442
443
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length,diff_length,left_length, max_length;
  my_off_t pos_in_file;
  DBUG_ENTER("_my_b_read");

  /* First serve whatever is still buffered in the cache */
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
  {
    DBUG_ASSERT(Count >= left_length);	/* User is not using my_b_read() */
    memcpy(Buffer,info->read_pos, left_length);
    Buffer+=left_length;
    Count-=left_length;
  }

  /* pos_in_file always point on where info->buffer was read */
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);

  /* 
    Whenever a function which operates on IO_CACHE flushes/writes
    some part of the IO_CACHE to disk it will set the property
    "seek_not_done" to indicate this to other functions operating
    on the IO_CACHE.
  */
  if (info->seek_not_done)
  {
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
        != MY_FILEPOS_ERROR))
    {
      /* No error, reset seek_not_done flag. */
      info->seek_not_done= 0;
    }
    else
    {
      /*
        If the seek failed and the error number is ESPIPE, it is because
        info->file is a pipe or socket or FIFO.  We never should have tried
        to seek on that.  See Bugs#25807 and #22828 for more info.
      */
      DBUG_ASSERT(my_errno != ESPIPE);
      info->error= -1;
      DBUG_RETURN(1);
    }
  }

  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {					/* Fill first intern buffer */
    size_t read_length;
    if (info->end_of_file <= pos_in_file)
    {					/* End of file */
      info->error= (int) left_length;
      DBUG_RETURN(1);
    }
    /* Read an IO_SIZE-aligned chunk directly into the caller's buffer */
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
	!= length)
    {
      info->error= (read_length == (size_t) -1 ? -1 :
		    (int) (read_length+left_length));
      DBUG_RETURN(1);
    }
    Count-=length;
    Buffer+=length;
    pos_in_file+=length;
    left_length+=length;
    diff_length=0;
  }

  /* Refill the cache buffer, but never read past end_of_file */
  max_length= info->read_length-diff_length;
  if (info->type != READ_FIFO &&
      max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
    {
      /*
        Cast explicitly, consistent with the other places where
        info->error is assigned from left_length (size_t -> int).
      */
      info->error= (int) left_length;	/* We only got this many char */
      DBUG_RETURN(1);
    }
    length=0;				/* Didn't read any chars */
  }
  else if ((length= my_read(info->file,info->buffer, max_length,
                            info->myflags)) < Count ||
	   length == (size_t) -1)
  {
    /* Short read or error: hand over what we got and reset the cache */
    if (length != (size_t) -1)
      memcpy(Buffer, info->buffer, length);
    info->pos_in_file= pos_in_file;
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
    info->read_pos=info->read_end=info->buffer;
    DBUG_RETURN(1);
  }
  /* Success: give the caller Count bytes; the rest stays cached */
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer, info->buffer, Count);
  DBUG_RETURN(0);
}
541
542
543
/*
544
  Prepare IO_CACHE for shared use.
545
546
  SYNOPSIS
547
    init_io_cache_share()
548
      read_cache                A read cache. This will be copied for
549
                                every thread after setup.
550
      cshare                    The share.
551
      write_cache               If non-NULL a write cache that is to be
552
                                synchronized with the read caches.
553
      num_threads               Number of threads sharing the cache
554
                                including the write thread if any.
555
556
  DESCRIPTION
557
558
    The shared cache is used so: One IO_CACHE is initialized with
559
    init_io_cache(). This includes the allocation of a buffer. Then a
560
    share is allocated and init_io_cache_share() is called with the io
561
    cache and the share. Then the io cache is copied for each thread. So
562
    every thread has its own copy of IO_CACHE. But the allocated buffer
563
    is shared because cache->buffer is the same for all caches.
564
565
    One thread reads data from the file into the buffer. All threads
566
    read from the buffer, but every thread maintains its own set of
567
    pointers into the buffer. When all threads have used up the buffer
568
    contents, one of the threads reads the next block of data into the
569
    buffer. To accomplish this, each thread enters the cache lock before
570
    accessing the buffer. They wait in lock_io_cache() until all threads
571
    joined the lock. The last thread entering the lock is in charge of
572
    reading from file to buffer. It wakes all threads when done.
573
574
    Synchronizing a write cache to the read caches works so: Whenever
575
    the write buffer needs a flush, the write thread enters the lock and
576
    waits for all other threads to enter the lock too. They do this when
577
    they have used up the read buffer. When all threads are in the lock,
578
    the write thread copies the write buffer to the read buffer and
579
    wakes all threads.
580
581
    share->running_threads is the number of threads not being in the
582
    cache lock. When entering lock_io_cache() the number is decreased.
583
    When the thread that fills the buffer enters unlock_io_cache() the
584
    number is reset to the number of threads. The condition
585
    running_threads == 0 means that all threads are in the lock. Bumping
586
    up the number to the full count is non-intuitive. But increasing the
587
    number by one for each thread that leaves the lock could lead to a
588
    solo run of one thread. The last thread to join a lock reads from
589
    file to buffer, wakes the other threads, processes the data in the
590
    cache and enters the lock again. If no other thread left the lock
591
    meanwhile, it would think it's the last one again and read the next
592
    block...
593
594
    The share has copies of 'error', 'buffer', 'read_end', and
595
    'pos_in_file' from the thread that filled the buffer. We may not be
596
    able to access this information directly from its cache because the
597
    thread may be removed from the share before the variables could be
598
    copied by all other threads. Or, if a write buffer is synchronized,
599
    it would change its 'pos_in_file' after waking the other threads,
600
    possibly before they could copy its value.
601
602
    However, the 'buffer' variable in the share is for a synchronized
603
    write cache. It needs to know where to put the data. Otherwise it
604
    would need access to the read cache of one of the threads that is
605
    not yet removed from the share.
606
607
  RETURN
608
    void
609
*/
610
611
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                         IO_CACHE *write_cache, uint num_threads)
{
  DBUG_ENTER("init_io_cache_share");
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
                                "write_cache: 0x%lx  threads: %u",
                                (long) read_cache, (long) cshare,
                                (long) write_cache, num_threads));

  /* Sharing makes no sense for fewer than two threads. */
  DBUG_ASSERT(num_threads > 1);
  /* Only a read cache can be shared; an optional write cache feeds it. */
  DBUG_ASSERT(read_cache->type == READ_CACHE);
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));

  /* Primitives used by lock_io_cache()/unlock_io_cache(). */
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&cshare->cond, 0);
  pthread_cond_init(&cshare->cond_writer, 0);

  /* Shared bookkeeping for all participating threads. */
  cshare->total_threads= num_threads;
  cshare->running_threads= num_threads;
  cshare->source_cache= write_cache;    /* Can be NULL. */
  cshare->buffer= read_cache->buffer;
  cshare->error= 0;                     /* Initialize. */
  /* read_end/pos_in_file: see function comment of lock_io_cache(). */
  cshare->read_end= NULL;
  cshare->pos_in_file= 0;

  /* Attach the read cache (and, if present, the write cache) to the share. */
  read_cache->share= cshare;
  read_cache->read_function= _my_b_read_r;
  read_cache->current_pos= NULL;
  read_cache->current_end= NULL;

  if (write_cache)
    write_cache->share= cshare;

  DBUG_VOID_RETURN;
}
646
647
648
/*
649
  Remove a thread from shared access to IO_CACHE.
650
651
  SYNOPSIS
652
    remove_io_thread()
653
      cache                     The IO_CACHE to be removed from the share.
654
655
  NOTE
656
657
    Every thread must do that on exit for not to deadlock other threads.
658
659
    The last thread destroys the pthread resources.
660
661
    A writer flushes its cache first.
662
663
  RETURN
664
    void
665
*/
666
667
void remove_io_thread(IO_CACHE *cache)
{
  IO_CACHE_SHARE *cshare= cache->share;
  uint total;
  DBUG_ENTER("remove_io_thread");

  /* If the writer goes, it needs to flush the write cache. */
  if (cache == cshare->source_cache)
    flush_io_cache(cache);

  pthread_mutex_lock(&cshare->mutex);
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
                                (cache == cshare->source_cache) ?
                                "writer" : "reader", (long) cache));

  /* Remove from share. */
  total= --cshare->total_threads;
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));

  /* Detach from share. */
  cache->share= NULL;

  /* If the writer goes, let the readers know. */
  if (cache == cshare->source_cache)
  {
    DBUG_PRINT("io_cache_share", ("writer leaves"));
    cshare->source_cache= NULL;
  }

  /* If all threads are waiting for me to join the lock, wake them. */
  if (!--cshare->running_threads)
  {
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
    pthread_cond_signal(&cshare->cond_writer);
    pthread_cond_broadcast(&cshare->cond);
  }

  pthread_mutex_unlock(&cshare->mutex);

  /*
    'total' was sampled under the mutex; if it reached zero we were the
    last thread in the share and nobody else can touch it anymore, so
    destroying the primitives after releasing the lock is safe.
  */
  if (!total)
  {
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
    pthread_cond_destroy (&cshare->cond_writer);
    pthread_cond_destroy (&cshare->cond);
    pthread_mutex_destroy(&cshare->mutex);
  }

  DBUG_VOID_RETURN;
}
716
717
718
/*
719
  Lock IO cache and wait for all other threads to join.
720
721
  SYNOPSIS
722
    lock_io_cache()
723
      cache                     The cache of the thread entering the lock.
724
      pos                       File position of the block to read.
725
                                Unused for the write thread.
726
727
  DESCRIPTION
728
729
    Wait for all threads to finish with the current buffer. We want
730
    all threads to proceed in concert. The last thread to join
731
    lock_io_cache() will read the block from file and all threads start
732
    to use it. Then they will join again for reading the next block.
733
734
    The waiting threads detect a fresh buffer by comparing
735
    cshare->pos_in_file with the position they want to process next.
736
    Since the first block may start at position 0, we take
737
    cshare->read_end as an additional condition. This variable is
738
    initialized to NULL and will be set after a block of data is written
739
    to the buffer.
740
741
  RETURN
742
    1           OK, lock in place, go ahead and read.
743
    0           OK, unlocked, another thread did the read.
744
*/
745
746
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("lock_io_cache");

  /*
    NOTE: on a return value of 1 the mutex is still held; the caller is
    expected to fill the buffer and release it via unlock_io_cache().
    On a return value of 0 the mutex has already been released here.
  */

  /* Enter the lock. */
  pthread_mutex_lock(&cshare->mutex);
  cshare->running_threads--;
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
                                (cache == cshare->source_cache) ?
                                "writer" : "reader", (long) cache, (ulong) pos,
                                cshare->running_threads));

  if (cshare->source_cache)
  {
    /* A write cache is synchronized to the read caches. */

    if (cache == cshare->source_cache)
    {
      /* The writer waits until all readers are here. */
      while (cshare->running_threads)
      {
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
      }
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));

      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* The last thread wakes the writer. */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("waking writer"));
      pthread_cond_signal(&cshare->cond_writer);
    }

    /*
      Readers wait until the data is copied from the writer. Another
      reason to stop waiting is the removal of the write thread. If this
      happens, we leave the lock with old data in the buffer.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->source_cache)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. When we awake here then because the last
      joining thread signalled us. If the writer is not the last, it
      will not signal. So it is safe to clear the buffer here.
    */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
      cshare->read_end= cshare->buffer; /* Empty buffer. */
      cshare->error= 0; /* EOF is not an error. */
    }
  }
  else
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock, we have to continue also. The first of the awaken threads
      will then do the read.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* Another thread did read the block already. */
  }
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
                                (uint) (cshare->read_end ? (size_t)
                                        (cshare->read_end - cshare->buffer) :
                                        0)));

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  pthread_mutex_unlock(&cshare->mutex);
  DBUG_RETURN(0);
}
860
861
862
/*
863
  Unlock IO cache.
864
865
  SYNOPSIS
866
    unlock_io_cache()
867
      cache                     The cache of the thread leaving the lock.
868
869
  NOTE
870
    This is called by the thread that filled the buffer. It marks all
871
    threads as running and awakes them. This must not be done by any
872
    other thread.
873
874
    Do not signal cond_writer. Either there is no writer or the writer
875
    is the only one who can call this function.
876
877
    The reason for resetting running_threads to total_threads before
878
    waking all other threads is that it could be possible that this
879
    thread is so fast with processing the buffer that it enters the lock
880
    before even one other thread has left it. If every awoken thread
881
    would increase running_threads by one, this thread could think that
882
    he is again the last to join and would not wait for the other
883
    threads to process the data.
884
885
  RETURN
886
    void
887
*/
888
889
static void unlock_io_cache(IO_CACHE *cache)
890
{
891
  IO_CACHE_SHARE *cshare= cache->share;
892
  DBUG_ENTER("unlock_io_cache");
893
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
894
                                (cache == cshare->source_cache) ?
895
                                "writer" : "reader",
896
                                (long) cache, (ulong) cshare->pos_in_file,
897
                                cshare->total_threads));
898
899
  cshare->running_threads= cshare->total_threads;
900
  pthread_cond_broadcast(&cshare->cond);
901
  pthread_mutex_unlock(&cshare->mutex);
902
  DBUG_VOID_RETURN;
903
}
904
905
906
/*
  Read from IO_CACHE when it is shared between several threads.

  SYNOPSIS
    _my_b_read_r()
      cache                     IO_CACHE pointer
      Buffer                    Buffer to retrieve count bytes from file
      Count                     Number of bytes to read into Buffer

  NOTE
    This function is only called from the my_b_read() macro when there
    isn't enough characters in the buffer to satisfy the request.

  IMPLEMENTATION

    It works as follows: when a thread tries to read from a file (that
    is, after using all the data from the (shared) buffer), it just
    hangs on lock_io_cache(), waiting for other threads. When the very
    last thread attempts a read, lock_io_cache() returns 1, the thread
    does actual IO and unlock_io_cache(), which signals all the waiting
    threads that data is in the buffer.

  WARNING

    When changing this function, be careful with handling file offsets
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
    types than my_off_t unless you can be sure that their value fits.
    Same applies to differences of file offsets. (Bug #11527)

    When changing this function, check _my_b_read(). It might need the
    same change.

  RETURN
    0      we succeeded in reading all data
    1      Error: can't read requested characters
*/

int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
{
  my_off_t pos_in_file;
  size_t length, diff_length, left_length;
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("_my_b_read_r");

  /* First hand out whatever is still unread in the cache buffer. */
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
  {
    DBUG_ASSERT(Count >= left_length);	/* User is not using my_b_read() */
    memcpy(Buffer, cache->read_pos, left_length);
    Buffer+= left_length;
    Count-= left_length;
  }
  while (Count)
  {
    size_t cnt, len;

    /* File offset of the first byte after the current buffer contents. */
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
    /* Misalignment of that offset relative to an IO_SIZE boundary. */
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
    /*
      Choose a read size that ends on an IO_SIZE boundary and is as
      close as possible to read_length without exceeding it by a
      full block.
    */
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
    length= ((length <= cache->read_length) ?
             length + IO_ROUND_DN(cache->read_length - length) :
             length - IO_ROUND_UP(length - cache->read_length));
    /* A READ_FIFO cache has no fixed end; others must not pass EOF. */
    if (cache->type != READ_FIFO &&
	(length > (cache->end_of_file - pos_in_file)))
      length= (size_t) (cache->end_of_file - pos_in_file);
    if (length == 0)
    {
      /* Nothing more to read: report how many bytes were delivered. */
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    if (lock_io_cache(cache, pos_in_file))
    {
      /* This thread was elected to do the physical read. */
      /* With a synchronized write/read cache we won't come here... */
      DBUG_ASSERT(!cshare->source_cache);
      /*
        ... unless the writer has gone before this thread entered the
        lock. Simulate EOF in this case. It can be distinguished by
        cache->file.
      */
      if (cache->file < 0)
        len= 0;
      else
      {
        /*
          Whenever a function which operates on IO_CACHE flushes/writes
          some part of the IO_CACHE to disk it will set the property
          "seek_not_done" to indicate this to other functions operating
          on the IO_CACHE.
        */
        if (cache->seek_not_done)
        {
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
              == MY_FILEPOS_ERROR)
          {
            cache->error= -1;
            /* Release the waiters even on error, or they hang forever. */
            unlock_io_cache(cache);
            DBUG_RETURN(1);
          }
        }
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
      }
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));

      /* len == (size_t)-1 signals a read error; treat buffer as empty. */
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
      cache->error=       (len == length ? 0 : (int) len);
      cache->pos_in_file= pos_in_file;

      /* Copy important values to the share. */
      cshare->error=       cache->error;
      cshare->read_end=    cache->read_end;
      cshare->pos_in_file= pos_in_file;

      /* Mark all threads as running and wake them. */
      unlock_io_cache(cache);
    }
    else
    {
      /*
        With a synchronized write/read cache readers always come here.
        Copy important values from the share.
      */
      cache->error=       cshare->error;
      cache->read_end=    cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;

      /* Reconstruct 'len' from the shared state filled by the reader. */
      len= ((cache->error == -1) ? (size_t) -1 :
            (size_t) (cache->read_end - cache->buffer));
    }
    cache->read_pos=      cache->buffer;
    cache->seek_not_done= 0;
    if (len == 0 || len == (size_t) -1)
    {
      /* EOF or read error: report the byte count delivered so far. */
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
                                    (ulong) len, (ulong) left_length));
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    cnt= (len > Count) ? Count : len;
    memcpy(Buffer, cache->read_pos, cnt);
    Count -= cnt;
    Buffer+= cnt;
    left_length+= cnt;
    cache->read_pos+= cnt;
  }
  DBUG_RETURN(0);
}
1051
1052
1053
/*
  Copy data from write cache to read cache.

  SYNOPSIS
    copy_to_read_buffer()
      write_cache               The write cache.
      write_buffer              The source of data, mostly the cache buffer.
      write_length              The number of bytes to copy.

  NOTE
    The write thread will wait for all read threads to join the cache
    lock. Then it copies the data over and wakes the read threads.

  RETURN
    void
*/

static void copy_to_read_buffer(IO_CACHE *write_cache,
                                const uchar *write_buffer, size_t write_length)
{
  IO_CACHE_SHARE *cshare= write_cache->share;

  /* Only the share's registered writer may publish into the read buffer. */
  DBUG_ASSERT(cshare->source_cache == write_cache);
  /*
    write_length is usually less or equal to buffer_length.
    It can be bigger if _my_b_write() is called with a big length.
  */
  while (write_length)
  {
    /* Publish at most one shared buffer's worth per handshake round. */
    size_t copy_length= min(write_length, write_cache->buffer_length);
    int  __attribute__((unused)) rc;

    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
    /* The writing thread does always have the lock when it awakes. */
    DBUG_ASSERT(rc);

    memcpy(cshare->buffer, write_buffer, copy_length);

    /* Make the new chunk visible to the readers via the share. */
    cshare->error=       0;
    cshare->read_end=    cshare->buffer + copy_length;
    cshare->pos_in_file= write_cache->pos_in_file;

    /* Mark all threads as running and wake them. */
    unlock_io_cache(write_cache);

    write_buffer+= copy_length;
    write_length-= copy_length;
  }
}
1102
1103
1104
/*
  Do sequential read from the SEQ_READ_APPEND cache.

  We do this in three stages:
   - first read from info->buffer
   - then if there are still data to read, try the file descriptor
   - afterwards, if there are still data to read, try append buffer

  RETURNS
    0  Success
    1  Failed to read
*/

int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length, diff_length, left_length, save_count, max_length;
  my_off_t pos_in_file;
  /* Remember the original request size to compute info->error on EOF. */
  save_count=Count;

  /* first, read the regular buffer */
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
  {
    DBUG_ASSERT(Count > left_length);	/* User is not using my_b_read() */
    memcpy(Buffer,info->read_pos, left_length);
    Buffer+=left_length;
    Count-=left_length;
  }
  /* From here on we may touch the append (write) buffer: keep it locked. */
  lock_append_buffer(info);

  /* pos_in_file always point on where info->buffer was read */
  if ((pos_in_file=info->pos_in_file +
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
    goto read_append_buffer;

  /*
    With read-append cache we must always do a seek before we read,
    because the write could have moved the file pointer astray
  */
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
  {
   info->error= -1;
   unlock_append_buffer(info);
   return (1);
  }
  info->seek_not_done=0;

  /* Misalignment of the current position relative to IO_SIZE. */
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));

  /* now the second stage begins - read from file descriptor */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {
    /* Fill first intern buffer */
    size_t read_length;

    /* Read whole IO_SIZE blocks straight into the caller's buffer. */
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
    if ((read_length= my_read(info->file,Buffer, length,
                              info->myflags)) == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    Count-=read_length;
    Buffer+=read_length;
    pos_in_file+=read_length;

    if (read_length != length)
    {
      /*
	We only got part of data;  Read the rest of the data from the
	write buffer
      */
      goto read_append_buffer;
    }
    left_length+=length;
    diff_length=0;
  }

  /* Refill info->buffer with up to read_length bytes, capped at EOF. */
  max_length= info->read_length-diff_length;
  if (max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
      goto read_append_buffer;
    length=0;				/* Didn't read any more chars */
  }
  else
  {
    length= my_read(info->file,info->buffer, max_length, info->myflags);
    if (length == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    if (length < Count)
    {
      /* File ran short: take what we got, finish from the append buffer. */
      memcpy(Buffer, info->buffer, length);
      Count -= length;
      Buffer += length;

      /*
	 added the line below to make
	 DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
	 otherwise this does not appear to be needed
      */
      pos_in_file += length;
      goto read_append_buffer;
    }
  }
  unlock_append_buffer(info);
  /* Request fully satisfied from the refilled cache buffer. */
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer,info->buffer,(size_t) Count);
  return 0;

read_append_buffer:

  /*
     Read data from the current write buffer.
     Count should never be == 0 here (The code will work even if count is 0)
  */

  {
    /* First copy the data to Count */
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
    size_t copy_len;
    size_t transfer_len;

    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
    /*
      TODO: figure out if the assert below is needed or correct.
    */
    DBUG_ASSERT(pos_in_file == info->end_of_file);
    copy_len=min(Count, len_in_buff);
    memcpy(Buffer, info->append_read_pos, copy_len);
    info->append_read_pos += copy_len;
    Count -= copy_len;
    if (Count)
      info->error = save_count - Count;

    /* Fill read buffer with data from write buffer */
    memcpy(info->buffer, info->append_read_pos,
	   (size_t) (transfer_len=len_in_buff - copy_len));
    info->read_pos= info->buffer;
    info->read_end= info->buffer+transfer_len;
    info->append_read_pos=info->write_pos;
    info->pos_in_file=pos_in_file+copy_len;
    /* The append buffer contents now count as part of the readable file. */
    info->end_of_file+=len_in_buff;
  }
  unlock_append_buffer(info);
  return Count ? 1 : 0;
}
1259
1260
1261
#ifdef HAVE_AIOWAIT
1262
1263
/*
  Read from the IO_CACHE into a buffer and feed asynchronously
  from disk when needed.

  SYNOPSIS
    _my_b_async_read()
      info                      IO_CACHE pointer
      Buffer                    Buffer to retrieve count bytes from file
      Count                     Number of bytes to read into Buffer

  RETURN VALUE
    -1          An error has occurred; my_errno is set.
     0          Success
     1          An error has occurred; IO_CACHE to error state.
*/

int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
  size_t max_length;
  my_off_t next_pos_in_file;
  uchar *read_buffer;

  /* Hand out what is still unread in the current buffer half. */
  memcpy(Buffer,info->read_pos,
	 (left_length= (size_t) (info->read_end-info->read_pos)));
  Buffer+=left_length;
  org_Count=Count;
  Count-=left_length;

  if (info->inited)
  {						/* wait for read block */
    info->inited=0;				/* No more block to read */
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
    if (info->aio_result.result.aio_errno)
    {
      if (info->myflags & MY_WME)
	my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
		 my_filename(info->file),
		 info->aio_result.result.aio_errno);
      my_errno=info->aio_result.result.aio_errno;
      info->error= -1;
      return(1);
    }
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
	read_length == (size_t) -1)
    {
      /* EOF (0 bytes) or failed async read. */
      my_errno=0;				/* For testing */
      info->error= (read_length == (size_t) -1 ? -1 :
		    (int) (read_length+left_length));
      return(1);
    }
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);

    /* Flip between the two halves of the double buffer. */
    if (info->request_pos != info->buffer)
      info->request_pos=info->buffer;
    else
      info->request_pos=info->buffer+info->read_length;
    info->read_pos=info->request_pos;
    next_pos_in_file=info->aio_read_pos+read_length;

	/* Check if pos_in_file is changed
	   (_ni_read_cache may have skipped some bytes) */

    if (info->aio_read_pos < info->pos_in_file)
    {						/* Fix if skipped bytes */
      if (info->aio_read_pos + read_length < info->pos_in_file)
      {
	read_length=0;				/* Skip block */
	next_pos_in_file=info->pos_in_file;
      }
      else
      {
	my_off_t offset= (info->pos_in_file - info->aio_read_pos);
	info->pos_in_file=info->aio_read_pos; /* Whe are here */
	info->read_pos=info->request_pos+offset;
	read_length-=offset;			/* Bytes left from read_pos */
      }
    }
#ifndef DBUG_OFF
    if (info->aio_read_pos > info->pos_in_file)
    {
      my_errno=EINVAL;
      /*
        NOTE(review): this assigns (size_t)-1 to info->read_length and
        returns it truncated to int. Probably meant to set info->error
        and return 1 like the other error paths — confirm before use.
      */
      return(info->read_length= (size_t) -1);
    }
#endif
	/* Copy found bytes to buffer */
    length=min(Count,read_length);
    memcpy(Buffer,info->read_pos,(size_t) length);
    Buffer+=length;
    Count-=length;
    left_length+=length;
    /*
      NOTE(review): 'rc_pos' is not referenced anywhere else in this
      file; other paths use request_pos/read_pos. Looks like a stale
      field name that survives because HAVE_AIOWAIT is rarely compiled
      — verify against the IO_CACHE struct definition.
    */
    info->read_end=info->rc_pos+read_length;
    info->read_pos+=length;
  }
  else
    next_pos_in_file=(info->pos_in_file+ (size_t)
		      (info->read_end - info->request_pos));

	/* If reading large blocks, or first read or read with skip */
  if (Count)
  {
    if (next_pos_in_file == info->end_of_file)
    {
      /*
        NOTE(review): when info->inited was 0, 'read_length' has not
        been assigned on this path — possible use of an uninitialized
        value. Confirm and initialize if the AIO path is revived.
      */
      info->error=(int) (read_length+left_length);
      return 1;
    }
    
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
        == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return (1);
    }

    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
    if (Count < read_length)
    {					/* Small block, read to cache */
      if ((read_length=my_read(info->file,info->request_pos,
			       read_length, info->myflags)) == (size_t) -1)
        return info->error= -1;
      use_length=min(Count,read_length);
      memcpy(Buffer,info->request_pos,(size_t) use_length);
      info->read_pos=info->request_pos+Count;
      info->read_end=info->request_pos+read_length;
      info->pos_in_file=next_pos_in_file;	/* Start of block in cache */
      next_pos_in_file+=read_length;

      if (Count != use_length)
      {					/* Didn't find hole block */
	if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
	  my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
		   my_filename(info->file),my_errno);
	info->error=(int) (read_length+left_length);
	return 1;
      }
    }
    else
    {						/* Big block, don't cache it */
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
	  != Count)
      {
	info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
	return 1;
      }
      info->read_pos=info->read_end=info->request_pos;
      info->pos_in_file=(next_pos_in_file+=Count);
    }
  }

  /* Read next block with asyncronic io */
  diff_length=(next_pos_in_file & (IO_SIZE-1));
  max_length= info->read_length - diff_length;
  if (max_length > info->end_of_file - next_pos_in_file)
    max_length= (size_t) (info->end_of_file - next_pos_in_file);

  /* Prefetch into the half of the double buffer not currently in use. */
  if (info->request_pos != info->buffer)
    read_buffer=info->buffer;
  else
    read_buffer=info->buffer+info->read_length;
  info->aio_read_pos=next_pos_in_file;
  if (max_length)
  {
    info->aio_result.result.aio_errno=AIO_INPROGRESS;	/* Marker for test */
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
			  (ulong) next_pos_in_file, (ulong) max_length));
    if (aioread(info->file,read_buffer, max_length,
		(my_off_t) next_pos_in_file,MY_SEEK_SET,
		&info->aio_result.result))
    {						/* Skip async io */
      my_errno=errno;
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
			  errno, info->aio_result.result.aio_errno));
      if (info->request_pos != info->buffer)
      {
	bmove(info->buffer,info->request_pos,
	      (size_t) (info->read_end - info->read_pos));
	info->request_pos=info->buffer;
	info->read_pos-=info->read_length;
	info->read_end-=info->read_length;
      }
      info->read_length=info->buffer_length;	/* Use hole buffer */
      info->read_function=_my_b_read;		/* Use normal IO_READ next */
    }
    else
      info->inited=info->aio_result.pending=1;
  }
  return 0;					/* Block read, async in use */
} /* _my_b_async_read */
1451
#endif
1452
1453
1454
/* Read one byte when buffer is empty */
1455
1456
int _my_b_get(IO_CACHE *info)
1457
{
1458
  uchar buff;
1459
  IO_CACHE_CALLBACK pre_read,post_read;
1460
  if ((pre_read = info->pre_read))
1461
    (*pre_read)(info);
1462
  if ((*(info)->read_function)(info,&buff,1))
1463
    return my_b_EOF;
1464
  if ((post_read = info->post_read))
1465
    (*post_read)(info);
1466
  return (int) (uchar) buff;
1467
}
1468
1469
/* 
   Write a byte buffer to IO_CACHE and flush to disk
   if IO_CACHE is full.

   NOTE
     Called from the my_b_write() macro only when Count does not fit in
     the remaining buffer space, so Count >= rest_length is assumed
     below (the unchecked Count-=rest_length relies on it).

   RETURN VALUE
    1 On error on write
    0 On success
   -1 On error; my_errno contains error code.
*/

int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t rest_length,length;

  /* Refuse writes that would extend the file past end_of_file (EFBIG). */
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
  {
    my_errno=errno=EFBIG;
    return info->error = -1;
  }

  /* Top up the current buffer to full, then flush it. */
  rest_length= (size_t) (info->write_end - info->write_pos);
  memcpy(info->write_pos,Buffer,(size_t) rest_length);
  Buffer+=rest_length;
  Count-=rest_length;
  info->write_pos+=rest_length;

  if (my_b_flush_io_cache(info,1))
    return 1;
  if (Count >= IO_SIZE)
  {					/* Fill first intern buffer */
    /* Write all whole IO_SIZE blocks directly, bypassing the buffer. */
    length=Count & (size_t) ~(IO_SIZE-1);
    if (info->seek_not_done)
    {
      /*
        Whenever a function which operates on IO_CACHE flushes/writes
        some part of the IO_CACHE to disk it will set the property
        "seek_not_done" to indicate this to other functions operating
        on the IO_CACHE.
      */
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
      {
        info->error= -1;
        return (1);
      }
      info->seek_not_done=0;
    }
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
      return info->error= -1;

    /*
      In case of a shared I/O cache with a writer we normally do direct
      write cache to read cache copy. Simulate this here by direct
      caller buffer to read cache copy. Do it after the write so that
      the cache readers actions on the flushed part can go in parallel
      with the write of the extra stuff. copy_to_read_buffer()
      synchronizes writer and readers so that after this call the
      readers can act on the extra stuff while the writer can go ahead
      and prepare the next output. copy_to_read_buffer() relies on
      info->pos_in_file.
    */
    if (info->share)
      copy_to_read_buffer(info, Buffer, length);

    Count-=length;
    Buffer+=length;
    info->pos_in_file+=length;
  }
  /* Buffer the sub-IO_SIZE remainder for a later flush. */
  memcpy(info->write_pos,Buffer,(size_t) Count);
  info->write_pos+=Count;
  return 0;
}
1540
1541
1542
/*
1543
  Append a block to the write buffer.
1544
  This is done with the buffer locked to ensure that we don't read from
1545
  the write buffer before we are ready with it.
1546
*/
1547
1548
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1549
{
1550
  size_t rest_length,length;
1551
1552
  /*
1553
    Assert that we cannot come here with a shared cache. If we do one
1554
    day, we might need to add a call to copy_to_read_buffer().
1555
  */
1556
  DBUG_ASSERT(!info->share);
1557
1558
  lock_append_buffer(info);
1559
  rest_length= (size_t) (info->write_end - info->write_pos);
1560
  if (Count <= rest_length)
1561
    goto end;
1562
  memcpy(info->write_pos, Buffer, rest_length);
1563
  Buffer+=rest_length;
1564
  Count-=rest_length;
1565
  info->write_pos+=rest_length;
1566
  if (my_b_flush_io_cache(info,0))
1567
  {
1568
    unlock_append_buffer(info);
1569
    return 1;
1570
  }
1571
  if (Count >= IO_SIZE)
1572
  {					/* Fill first intern buffer */
1573
    length=Count & (size_t) ~(IO_SIZE-1);
1574
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1575
    {
1576
      unlock_append_buffer(info);
1577
      return info->error= -1;
1578
    }
1579
    Count-=length;
1580
    Buffer+=length;
1581
    info->end_of_file+=length;
1582
  }
1583
1584
end:
1585
  memcpy(info->write_pos,Buffer,(size_t) Count);
1586
  info->write_pos+=Count;
1587
  unlock_append_buffer(info);
1588
  return 0;
1589
}
1590
1591
1592
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Dispatch on cache type with plain if/return statements instead of a
    ?: expression: nested ternaries over comma-expressions used to
    trigger a code-generation bug in old compilers (gcc 2.95).
  */
  if (info->type != SEQ_READ_APPEND)
    return my_b_write(info, Buffer, Count);
  return my_b_append(info, Buffer, Count);
}
1604
1605
1606
/*
1607
  Write a block to disk where part of the data may be inside the record
1608
  buffer.  As all write calls to the data goes through the cache,
1609
  we will never get a seek over the end of the buffer
1610
*/
1611
1612
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1613
		   my_off_t pos)
1614
{
1615
  size_t length;
1616
  int error=0;
1617
1618
  /*
1619
    Assert that we cannot come here with a shared cache. If we do one
1620
    day, we might need to add a call to copy_to_read_buffer().
1621
  */
1622
  DBUG_ASSERT(!info->share);
1623
1624
  if (pos < info->pos_in_file)
1625
  {
1626
    /* Of no overlap, write everything without buffering */
1627
    if (pos + Count <= info->pos_in_file)
32 by Brian Aker
More cleanup on pread()
1628
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1 by brian
clean slate
1629
    /* Write the part of the block that is before buffer */
1630
    length= (uint) (info->pos_in_file - pos);
32 by Brian Aker
More cleanup on pread()
1631
    if (pwrite(info->file, Buffer, length, pos) == 0)
1 by brian
clean slate
1632
      info->error= error= -1;
1633
    Buffer+=length;
1634
    pos+=  length;
1635
    Count-= length;
1636
#ifndef HAVE_PREAD
1637
    info->seek_not_done=1;
1638
#endif
1639
  }
1640
1641
  /* Check if we want to write inside the used part of the buffer.*/
1642
  length= (size_t) (info->write_end - info->buffer);
1643
  if (pos < info->pos_in_file + length)
1644
  {
1645
    size_t offset= (size_t) (pos - info->pos_in_file);
1646
    length-=offset;
1647
    if (length > Count)
1648
      length=Count;
1649
    memcpy(info->buffer+offset, Buffer, length);
1650
    Buffer+=length;
1651
    Count-= length;
1652
    /* Fix length of buffer if the new data was larger */
1653
    if (info->buffer+length > info->write_pos)
1654
      info->write_pos=info->buffer+length;
1655
    if (!Count)
1656
      return (error);
1657
  }
1658
  /* Write at the end of the current buffer; This is the normal case */
1659
  if (_my_b_write(info, Buffer, Count))
1660
    error= -1;
1661
  return error;
1662
}
1663
1664
1665
	/* Flush write cache */
1666
1667
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1668
  lock_append_buffer(info);
1669
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1670
  unlock_append_buffer(info);
1671
1672
/*
  Flush the write buffer of an IO_CACHE to disk.

  SYNOPSIS
    my_b_flush_io_cache()
      info                      Cache to flush.
      need_append_buffer_lock   Non-zero if the append buffer mutex must
                                be taken (only honoured for
                                SEQ_READ_APPEND caches).

  RETURN
    0   ok
    -1  error (also stored in info->error)
*/

int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  size_t length;
  my_bool append_cache;
  my_off_t pos_in_file;
  DBUG_ENTER("my_b_flush_io_cache");
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));

  /* The append-buffer lock is only meaningful for SEQ_READ_APPEND. */
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
    need_append_buffer_lock=0;

  if (info->type == WRITE_CACHE || append_cache)
  {
    /* A cache from open_cached_file() may not have a real file yet. */
    if (info->file == -1)
    {
      if (real_open_cached_file(info))
	DBUG_RETURN((info->error= -1));
    }
    LOCK_APPEND_BUFFER;

    if ((length=(size_t) (info->write_pos - info->write_buffer)))
    {
      /*
        In case of a shared I/O cache with a writer we do direct write
        cache to read cache copy. Do it before the write here so that
        the readers can work in parallel with the write.
        copy_to_read_buffer() relies on info->pos_in_file.
      */
      if (info->share)
        copy_to_read_buffer(info, info->write_buffer, length);

      pos_in_file=info->pos_in_file;
      /*
	If we have append cache, we always open the file with
	O_APPEND which moves the pos to EOF automatically on every write
      */
      if (!append_cache && info->seek_not_done)
      {					/* File touched, do seek */
	if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
	    MY_FILEPOS_ERROR)
	{
	  UNLOCK_APPEND_BUFFER;
	  DBUG_RETURN((info->error= -1));
	}
	/* NOTE(review): always true here — outer guard already
	   requires !append_cache; the inner check is dead. */
	if (!append_cache)
	  info->seek_not_done=0;
      }
      if (!append_cache)
	info->pos_in_file+=length;
      /* Re-align write_end so the next flush ends on an IO_SIZE boundary. */
      info->write_end= (info->write_buffer+info->buffer_length-
			((pos_in_file+length) & (IO_SIZE-1)));

      if (my_write(info->file,info->write_buffer,length,
		   info->myflags | MY_NABP))
	info->error= -1;
      else
	info->error= 0;
      if (!append_cache)
      {
        set_if_bigger(info->end_of_file,(pos_in_file+length));
      }
      else
      {
	/* Append mode: only the not-yet-readable part grows the file. */
	info->end_of_file+=(info->write_pos-info->append_read_pos);
	DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
      }

      info->append_read_pos=info->write_pos=info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      DBUG_RETURN(info->error);
    }
  }
#ifdef HAVE_AIOWAIT
  else if (info->type != READ_NET)
  {
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
    info->inited=0;
  }
#endif
  UNLOCK_APPEND_BUFFER;
  DBUG_RETURN(0);
}
1755
1756
/*
  Free an IO_CACHE object

  SYNOPSOS
    end_io_cache()
    info		IO_CACHE Handle to free

  NOTES
    It's currently safe to call this if one has called init_io_cache()
    on the 'info' object, even if init_io_cache() failed.
    This function is also safe to call twice with the same handle.

    Teardown order matters: pre_close hook, then flush (needs the
    buffer), then free the buffer, then destroy the append mutex.

  RETURN
   0  ok
   #  Error
*/

int end_io_cache(IO_CACHE *info)
{
  int error=0;
  IO_CACHE_CALLBACK pre_close;
  DBUG_ENTER("end_io_cache");
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));

  /*
    Every thread must call remove_io_thread(). The last one destroys
    the share elements.
  */
  DBUG_ASSERT(!info->share || !info->share->total_threads);

  /* Run and clear the pre-close hook (cleared so a second call is a no-op). */
  if ((pre_close=info->pre_close))
  {
    (*pre_close)(info);
    info->pre_close= 0;
  }
  if (info->alloced_buffer)
  {
    /* Clear the flag first so a repeated call does not double-free. */
    info->alloced_buffer=0;
    if (info->file != -1)			/* File doesn't exist */
      error= my_b_flush_io_cache(info,1);
    my_free((uchar*) info->buffer,MYF(MY_WME));
    info->buffer=info->read_pos=(uchar*) 0;
  }
  if (info->type == SEQ_READ_APPEND)
  {
    /* Destroy allocated mutex */
    info->type= TYPE_NOT_SET;
    pthread_mutex_destroy(&info->append_buffer_lock);
  }
  DBUG_RETURN(error);
} /* end_io_cache */
1807
1808
1809
/**********************************************************************
1810
 Testing of MF_IOCACHE
1811
**********************************************************************/
1812
1813
#ifdef MAIN
1814
1815
#include <my_dir.h>
1816
1817
/*
  Print a formatted error message (with errno) to stderr and exit.

  Fix: the original never called va_end() after va_start(); the C
  standard requires a matching va_end() before the function returns or
  exits, otherwise behavior is undefined on some ABIs.
*/
void die(const char* fmt, ...)
{
  va_list va_args;
  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  va_end(va_args);
  fprintf(stderr,", errno=%d\n", errno);
  exit(1);
}
1826
1827
/*
  Open (creating if needed) 'fname' read/write and initialize 'info'
  as a SEQ_READ_APPEND cache over it.  Dies on any failure.
  Returns the file descriptor, which is also stored inside 'info'.
*/
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
  int fd= my_open(fname, O_CREAT | O_RDWR, MYF(MY_WME));
  if (fd < 0)
    die("Could not open %s", fname);
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0, 0, MYF(MY_WME)))
    die("failed in init_io_cache()");
  return fd;
}
1836
1837
/*
  Tear down a cache created by open_file().  end_io_cache() flushes
  and frees the buffer but leaves info->file untouched, so it is
  still valid to close the descriptor afterwards.
*/
void close_file(IO_CACHE* info)
{
  end_io_cache(info);
  my_close(info->file, MYF(MY_WME));
}
1842
1843
int main(int argc, char** argv)
1844
{
1845
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1846
  MY_STAT status;
1847
  const char* fname="/tmp/iocache.test";
1848
  int cache_size=16384;
1849
  char llstr_buf[22];
1850
  int max_block,total_bytes=0;
1851
  int i,num_loops=100,error=0;
1852
  char *p;
1853
  char* block, *block_end;
1854
  MY_INIT(argv[0]);
1855
  max_block = cache_size*3;
1856
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1857
    die("Not enough memory to allocate test block");
1858
  block_end = block + max_block;
1859
  for (p = block,i=0; p < block_end;i++)
1860
  {
1861
    *p++ = (char)i;
1862
  }
1863
  if (my_stat(fname,&status, MYF(0)) &&
1864
      my_delete(fname,MYF(MY_WME)))
1865
    {
1866
      die("Delete of %s failed, aborting", fname);
1867
    }
1868
  open_file(fname,&sra_cache, cache_size);
1869
  for (i = 0; i < num_loops; i++)
1870
  {
1871
    char buf[4];
1872
    int block_size = abs(rand() % max_block);
1873
    int4store(buf, block_size);
1874
    if (my_b_append(&sra_cache,buf,4) ||
1875
	my_b_append(&sra_cache, block, block_size))
1876
      die("write failed");
1877
    total_bytes += 4+block_size;
1878
  }
1879
  close_file(&sra_cache);
1880
  my_free(block,MYF(MY_WME));
1881
  if (!my_stat(fname,&status,MYF(MY_WME)))
1882
    die("%s failed to stat, but I had just closed it,\
1883
 wonder how that happened");
1884
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1885
	 llstr(status.st_size,llstr_buf),
1886
	 total_bytes);
1887
  my_delete(fname, MYF(MY_WME));
1888
  /* check correctness of tests */
1889
  if (total_bytes != status.st_size)
1890
  {
1891
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
1892
supposedly written\n");
1893
    error=1;
1894
  }
1895
  exit(error);
1896
  return 0;
1897
}
1898
#endif