~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2000 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Caching of files which only do (sequential) reads or writes of fixed-
18
  length records. A read isn't allowed to go over file-length. A read is ok
19
  if it ends at file-length and next read can try to read after file-length
20
  (and get a EOF-error).
21
  Possible use of asynchronous io.
22
  macros for read and writes for faster io.
23
  Used instead of FILE when reading or writing whole files.
24
  This code makes mf_rec_cache obsolete (currently only used by ISAM)
25
  One can change info->pos_in_file to a higher value to skip bytes in file if
26
  also info->read_pos is set to info->read_end.
27
  If called through open_cached_file(), then the temporary file will
28
  only be created if a write exceeds the file buffer or if one calls
29
  my_b_flush_io_cache().
30
31
  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32
  reading and another for writing.  Reads are first done from disk and
33
  then done from the write buffer.  This is an efficient way to read
34
  from a log file when one is writing to it at the same time.
35
  For this to work, the file has to be opened in append mode!
36
  Note that when one uses SEQ_READ_APPEND, one MUST write using
37
  my_b_append !  This is needed because we need to lock the mutex
38
  every time we access the write buffer.
39
40
TODO:
41
  When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
42
  each time the write buffer gets full and it's written to disk, we will
43
  always do a disk read to read a part of the buffer from disk to the
44
  read buffer.
45
  This should be fixed so that when we do a my_b_flush_io_cache() and
46
  we have been reading the write buffer, we should transfer the rest of the
47
  write buffer to the read buffer before we start to reuse it.
48
*/
49
50
#define MAP_TO_USE_RAID
51
#include "mysys_priv.h"
212.5.18 by Monty Taylor
Moved m_ctype, m_string and my_bitmap. Removed t_ctype.
52
#include <mystrings/m_string.h>
1 by brian
clean slate
53
#ifdef HAVE_AIOWAIT
54
#include "mysys_err.h"
55
static void my_aiowait(my_aio_result *result);
56
#endif
57
#include <errno.h>
58
59
/*
  Take / release the mutex stored in info->append_buffer_lock.
  'info' is expected to be a pointer to a structure that has an
  append_buffer_lock member usable with pthread_mutex_lock().
*/
#define lock_append_buffer(info) \
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
61
#define unlock_append_buffer(info) \
62
 pthread_mutex_unlock(&(info)->append_buffer_lock)
63
64
/*
  Round X up / down to a multiple of IO_SIZE by masking the low bits.
  NOTE(review): this only yields a true multiple if IO_SIZE is a power
  of two; IO_SIZE is defined outside this file - confirm.
*/
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
65
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
66
67
/*
68
  Setup internal pointers inside IO_CACHE
69
70
  SYNOPSIS
71
    setup_io_cache()
72
    info		IO_CACHE handler
73
74
  NOTES
75
    This is called automatically on init or reinit of IO_CACHE
76
    It must be called externally if one moves or copies an IO_CACHE
77
    object.
78
*/
79
80
void setup_io_cache(IO_CACHE* info)
{
  /*
    Point current_pos/current_end at the pointer pair that matches the
    cache type, so that my_b_tell() and my_b_bytes_in_cache work.
  */
  if (info->type != WRITE_CACHE)
  {
    /* Every non-write cache type tracks the read pointers. */
    info->current_pos= &info->read_pos;
    info->current_end= &info->read_end;
    return;
  }

  info->current_pos= &info->write_pos;
  info->current_end= &info->write_end;
}
94
95
96
static void
97
init_functions(IO_CACHE* info)
98
{
99
  enum cache_type type= info->type;
100
  switch (type) {
101
  case READ_NET:
102
    /*
103
      Must be initialized by the caller. The problem is that
104
      _my_b_net_read has to be defined in sql directory because of
105
      the dependency on THD, and therefore cannot be visible to
106
      programs that link against mysys but know nothing about THD, such
107
      as myisamchk
108
    */
109
    break;
110
  case SEQ_READ_APPEND:
111
    info->read_function = _my_b_seq_read;
112
    info->write_function = 0;			/* Force a core if used */
113
    break;
114
  default:
115
    info->read_function =
116
                          info->share ? _my_b_read_r :
117
                                        _my_b_read;
118
    info->write_function = _my_b_write;
119
  }
120
121
  setup_io_cache(info);
122
}
123
124
125
/*
126
  Initialize an IO_CACHE object
127
128
  SYNOPSIS
129
    init_io_cache()
130
    info		cache handler to initialize
131
    file		File that should be associated with the handler
132
			If == -1 then real_open_cached_file()
133
			will be called when it's time to open file.
134
    cachesize		Size of buffer to allocate for read/write
135
			If == 0 then use my_default_record_cache_size
136
    type		Type of cache
137
    seek_offset		Where cache should start reading/writing
138
    use_async_io	Set to 1 if we should use async_io (if available)
139
    cache_myflags	Bitmap of different flags
140
			MY_WME | MY_FAE | MY_NABP | MY_FNABP |
141
			MY_DONT_CHECK_FILESIZE
142
143
  RETURN
144
    0  ok
145
    #  error
146
*/
147
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
149
		  enum cache_type type, my_off_t seek_offset,
154 by Brian Aker
Removed oddball types in my_global.h
150
		  bool use_async_io, myf cache_myflags)
1 by brian
clean slate
151
{
152
  size_t min_cache;
153
  my_off_t pos;
154
  my_off_t end_of_file= ~(my_off_t) 0;
155
156
  info->file= file;
157
  info->type= TYPE_NOT_SET;	    /* Don't set it until mutex are created */
158
  info->pos_in_file= seek_offset;
159
  info->pre_close = info->pre_read = info->post_read = 0;
160
  info->arg = 0;
161
  info->alloced_buffer = 0;
162
  info->buffer=0;
163
  info->seek_not_done= 0;
164
165
  if (file >= 0)
166
  {
167
    pos= my_tell(file, MYF(0));
168
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
169
    {
170
      /*
171
         This kind of object doesn't support seek() or tell(). Don't set a
172
         flag that will make us again try to seek() later and fail.
173
      */
174
      info->seek_not_done= 0;
175
      /*
176
        Additionally, if we're supposed to start somewhere other than the
177
        the beginning of whatever this file is, then somebody made a bad
178
        assumption.
179
      */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
180
      assert(seek_offset == 0);
1 by brian
clean slate
181
    }
182
    else
183
      info->seek_not_done= test(seek_offset != pos);
184
  }
185
186
  info->disk_writes= 0;
187
  info->share=0;
188
189
  if (!cachesize && !(cachesize= my_default_record_cache_size))
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
190
    return(1);				/* No cache requested */
1 by brian
clean slate
191
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
192
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
193
  {						/* Assume file isn't growing */
194
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
195
    {
196
      /* Calculate end of file to avoid allocating oversized buffers */
197
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
198
      /* Need to reset seek_not_done now that we just did a seek. */
199
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200
      if (end_of_file < seek_offset)
201
	end_of_file=seek_offset;
202
      /* Trim cache size if the file is very small */
203
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
204
      {
205
	cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
206
	use_async_io=0;				/* No need to use async */
207
      }
208
    }
209
  }
210
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
211
  if (type != READ_NET && type != WRITE_NET)
212
  {
213
    /* Retry allocating memory in smaller blocks until we get one */
214
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
215
    for (;;)
216
    {
217
      size_t buffer_block;
218
      if (cachesize < min_cache)
219
	cachesize = min_cache;
220
      buffer_block= cachesize;
221
      if (type == SEQ_READ_APPEND)
222
	buffer_block *= 2;
223
      if ((info->buffer=
224
	   (uchar*) my_malloc(buffer_block,
225
			     MYF((cache_myflags & ~ MY_WME) |
226
				 (cachesize == min_cache ? MY_WME : 0)))) != 0)
227
      {
228
	info->write_buffer=info->buffer;
229
	if (type == SEQ_READ_APPEND)
230
	  info->write_buffer = info->buffer + cachesize;
231
	info->alloced_buffer=1;
232
	break;					/* Enough memory found */
233
      }
234
      if (cachesize == min_cache)
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
235
	return(2);				/* Can't alloc cache */
1 by brian
clean slate
236
      /* Try with less memory */
237
      cachesize= (cachesize*3/4 & ~(min_cache-1));
238
    }
239
  }
240
241
  info->read_length=info->buffer_length=cachesize;
242
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
243
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
244
  if (type == SEQ_READ_APPEND)
245
  {
246
    info->append_read_pos = info->write_pos = info->write_buffer;
247
    info->write_end = info->write_buffer + info->buffer_length;
248
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
249
  }
28.1.35 by Monty Taylor
Removed all references to THREAD.
250
#if defined(SAFE_MUTEX)
1 by brian
clean slate
251
  else
252
  {
253
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
212.6.1 by Mats Kindahl
Replacing all bzero() calls with memset() calls and removing the bzero.c file.
254
    memset((char*) &info->append_buffer_lock, 0, sizeof(info));
1 by brian
clean slate
255
  }
256
#endif
257
258
  if (type == WRITE_CACHE)
259
    info->write_end=
260
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
261
  else
262
    info->read_end=info->buffer;		/* Nothing in cache */
263
264
  /* End_of_file may be changed by user later */
265
  info->end_of_file= end_of_file;
266
  info->error=0;
267
  info->type= type;
268
  init_functions(info);
269
#ifdef HAVE_AIOWAIT
270
  if (use_async_io && ! my_disable_async_io)
271
  {
272
    info->read_length/=2;
273
    info->read_function=_my_b_async_read;
274
  }
275
  info->inited=info->aio_result.pending=0;
276
#endif
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
277
  return(0);
1 by brian
clean slate
278
}						/* init_io_cache */
279
280
	/* Wait until current request is ready */
281
282
#ifdef HAVE_AIOWAIT
283
/*
  Block until the asynchronous request described by 'result' is no
  longer pending. Every request reported by aiowait() along the way is
  marked as not pending as well.
*/
static void my_aiowait(my_aio_result *result)
{
  struct aio_result_t *finished;

  if (!result->pending)
    return;                                     /* Nothing outstanding */

  for (;;)
  {
    finished= aiowait((struct timeval *) 0);
    if ((int) finished == -1)
    {
      if (errno != EINTR)
      {
        result->pending= 0;                     /* Assume everything is ok */
        return;
      }
      continue;                                 /* Interrupted; wait again */
    }
    ((my_aio_result*) finished)->pending= 0;
    if ((my_aio_result*) finished == result)
      return;                                   /* Our request completed */
  }
}
304
#endif
305
306
307
/*
308
  Use this to reset cache to re-start reading or to change the type
309
  between READ_CACHE <-> WRITE_CACHE
310
  If we are doing a reinit of a cache where we have the start of the file
311
  in the cache, we are reusing this memory without flushing it to disk.
312
*/
313
146 by Brian Aker
my_bool cleanup.
314
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
1 by brian
clean slate
315
			my_off_t seek_offset,
154 by Brian Aker
Removed oddball types in my_global.h
316
			bool use_async_io __attribute__((unused)),
317
			bool clear_cache)
1 by brian
clean slate
318
{
319
  /* One can't do reinit with the following types */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
320
  assert(type != READ_NET && info->type != READ_NET &&
1 by brian
clean slate
321
	      type != WRITE_NET && info->type != WRITE_NET &&
322
	      type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
323
324
  /* If the whole file is in memory, avoid flushing to disk */
325
  if (! clear_cache &&
326
      seek_offset >= info->pos_in_file &&
327
      seek_offset <= my_b_tell(info))
328
  {
329
    /* Reuse current buffer without flushing it to disk */
330
    uchar *pos;
331
    if (info->type == WRITE_CACHE && type == READ_CACHE)
332
    {
333
      info->read_end=info->write_pos;
334
      info->end_of_file=my_b_tell(info);
335
      /*
336
        Trigger a new seek only if we have a valid
337
        file handle.
338
      */
339
      info->seek_not_done= (info->file != -1);
340
    }
341
    else if (type == WRITE_CACHE)
342
    {
343
      if (info->type == READ_CACHE)
344
      {
345
	info->write_end=info->write_buffer+info->buffer_length;
346
	info->seek_not_done=1;
347
      }
348
      info->end_of_file = ~(my_off_t) 0;
349
    }
350
    pos=info->request_pos+(seek_offset-info->pos_in_file);
351
    if (type == WRITE_CACHE)
352
      info->write_pos=pos;
353
    else
354
      info->read_pos= pos;
355
#ifdef HAVE_AIOWAIT
356
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
357
#endif
358
  }
359
  else
360
  {
361
    /*
362
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
363
      after the current positions should be ignored
364
    */
365
    if (info->type == WRITE_CACHE && type == READ_CACHE)
366
      info->end_of_file=my_b_tell(info);
367
    /* flush cache if we want to reuse it */
368
    if (!clear_cache && my_b_flush_io_cache(info,1))
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
369
      return(1);
1 by brian
clean slate
370
    info->pos_in_file=seek_offset;
371
    /* Better to do always do a seek */
372
    info->seek_not_done=1;
373
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
374
    if (type == READ_CACHE)
375
    {
376
      info->read_end=info->buffer;		/* Nothing in cache */
377
    }
378
    else
379
    {
380
      info->write_end=(info->buffer + info->buffer_length -
381
		       (seek_offset & (IO_SIZE-1)));
382
      info->end_of_file= ~(my_off_t) 0;
383
    }
384
  }
385
  info->type=type;
386
  info->error=0;
387
  init_functions(info);
388
389
#ifdef HAVE_AIOWAIT
390
  if (use_async_io && ! my_disable_async_io &&
391
      ((ulong) info->buffer_length <
392
       (ulong) (info->end_of_file - seek_offset)))
393
  {
394
    info->read_length=info->buffer_length/2;
395
    info->read_function=_my_b_async_read;
396
  }
397
  info->inited=0;
398
#endif
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
399
  return(0);
1 by brian
clean slate
400
} /* reinit_io_cache */
401
402
403
404
/*
405
  Read buffered.
406
407
  SYNOPSIS
408
    _my_b_read()
409
      info                      IO_CACHE pointer
410
      Buffer                    Buffer to retrieve count bytes from file
411
      Count                     Number of bytes to read into Buffer
412
413
  NOTE
414
    This function is only called from the my_b_read() macro when there
415
    isn't enough characters in the buffer to satisfy the request.
416
417
  WARNING
418
419
    When changing this function, be careful with handling file offsets
420
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
421
    types than my_off_t unless you can be sure that their value fits.
422
    Same applies to differences of file offsets.
423
424
    When changing this function, check _my_b_read_r(). It might need the
425
    same change.
426
427
  RETURN
428
    0      we succeeded in reading all data
429
    1      Error: can't read requested characters
430
*/
431
432
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t got;                   /* bytes returned by the last my_read() */
  size_t block_off;             /* offset of file_pos inside an IO_SIZE block */
  size_t delivered;             /* bytes already handed to the caller */
  size_t want;                  /* bytes to request for the cache refill */
  my_off_t file_pos;

  /* First hand out whatever is still sitting in the cache. */
  delivered= (size_t) (info->read_end - info->read_pos);
  if (delivered)
  {
    assert(Count >= delivered);	/* User is not using my_b_read() */
    memcpy(Buffer, info->read_pos, delivered);
    Buffer+= delivered;
    Count-= delivered;
  }

  /* pos_in_file always point on where info->buffer was read */
  file_pos= info->pos_in_file + (size_t) (info->read_end - info->buffer);

  /*
    Whenever a function which operates on IO_CACHE flushes/writes
    some part of the IO_CACHE to disk it will set the property
    "seek_not_done" to indicate this to other functions operating
    on the IO_CACHE.
  */
  if (info->seek_not_done)
  {
    if (my_seek(info->file, file_pos, MY_SEEK_SET, MYF(0))
        == MY_FILEPOS_ERROR)
    {
      /*
        If the seek failed and the error number is ESPIPE, it is because
        info->file is a pipe or socket or FIFO.  We never should have tried
        to seek on that.  See Bugs#25807 and #22828 for more info.
      */
      assert(my_errno != ESPIPE);
      info->error= -1;
      return 1;
    }
    /* No error, reset seek_not_done flag. */
    info->seek_not_done= 0;
  }

  block_off= (size_t) (file_pos & (IO_SIZE-1));
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-block_off)))
  {
    /*
      Large request: read the block-aligned bulk straight into the
      caller's buffer, bypassing the cache.
    */
    size_t direct_len;
    size_t direct_got;

    if (info->end_of_file <= file_pos)
    {                                           /* End of file */
      info->error= (int) delivered;
      return 1;
    }
    direct_len= (Count & (size_t) ~(IO_SIZE-1)) - block_off;
    direct_got= my_read(info->file, Buffer, direct_len, info->myflags);
    if (direct_got != direct_len)
    {
      if (direct_got == (size_t) -1)
        info->error= -1;
      else
        info->error= (int) (direct_got + delivered);
      return 1;
    }
    Count-= direct_len;
    Buffer+= direct_len;
    file_pos+= direct_len;
    delivered+= direct_len;
    block_off= 0;
  }

  /* Refill the cache, but never beyond end_of_file (except for FIFOs). */
  want= info->read_length - block_off;
  if (info->type != READ_FIFO &&
      want > (info->end_of_file - file_pos))
    want= (size_t) (info->end_of_file - file_pos);

  if (want == 0)
  {
    if (Count)
    {
      info->error= delivered;                   /* We only got this many char */
      return 1;
    }
    got= 0;                                     /* Didn't read any chars */
  }
  else
  {
    got= my_read(info->file, info->buffer, want, info->myflags);
    if (got < Count || got == (size_t) -1)
    {
      /* Short read or error: pass on what we have and empty the cache. */
      if (got != (size_t) -1)
        memcpy(Buffer, info->buffer, got);
      info->pos_in_file= file_pos;
      if (got == (size_t) -1)
        info->error= -1;
      else
        info->error= (int) (got + delivered);
      info->read_pos= info->read_end= info->buffer;
      return 1;
    }
  }

  info->read_pos= info->buffer + Count;
  info->read_end= info->buffer + got;
  info->pos_in_file= file_pos;
  memcpy(Buffer, info->buffer, Count);
  return 0;
}
529
530
531
/*
532
  Prepare IO_CACHE for shared use.
533
534
  SYNOPSIS
535
    init_io_cache_share()
536
      read_cache                A read cache. This will be copied for
537
                                every thread after setup.
538
      cshare                    The share.
539
      write_cache               If non-NULL a write cache that is to be
540
                                synchronized with the read caches.
541
      num_threads               Number of threads sharing the cache
542
                                including the write thread if any.
543
544
  DESCRIPTION
545
546
    The shared cache is used so: One IO_CACHE is initialized with
547
    init_io_cache(). This includes the allocation of a buffer. Then a
548
    share is allocated and init_io_cache_share() is called with the io
549
    cache and the share. Then the io cache is copied for each thread. So
550
    every thread has its own copy of IO_CACHE. But the allocated buffer
551
    is shared because cache->buffer is the same for all caches.
552
553
    One thread reads data from the file into the buffer. All threads
554
    read from the buffer, but every thread maintains its own set of
555
    pointers into the buffer. When all threads have used up the buffer
556
    contents, one of the threads reads the next block of data into the
557
    buffer. To accomplish this, each thread enters the cache lock before
558
    accessing the buffer. They wait in lock_io_cache() until all threads
559
    joined the lock. The last thread entering the lock is in charge of
560
    reading from file to buffer. It wakes all threads when done.
561
562
    Synchronizing a write cache to the read caches works so: Whenever
563
    the write buffer needs a flush, the write thread enters the lock and
564
    waits for all other threads to enter the lock too. They do this when
565
    they have used up the read buffer. When all threads are in the lock,
566
    the write thread copies the write buffer to the read buffer and
567
    wakes all threads.
568
569
    share->running_threads is the number of threads not being in the
570
    cache lock. When entering lock_io_cache() the number is decreased.
571
    When the thread that fills the buffer enters unlock_io_cache() the
572
    number is reset to the number of threads. The condition
573
    running_threads == 0 means that all threads are in the lock. Bumping
574
    up the number to the full count is non-intuitive. But increasing the
575
    number by one for each thread that leaves the lock could lead to a
576
    solo run of one thread. The last thread to join a lock reads from
577
    file to buffer, wakes the other threads, processes the data in the
578
    cache and enters the lock again. If no other thread left the lock
579
    meanwhile, it would think it's the last one again and read the next
580
    block...
581
582
    The share has copies of 'error', 'buffer', 'read_end', and
583
    'pos_in_file' from the thread that filled the buffer. We may not be
584
    able to access this information directly from its cache because the
585
    thread may be removed from the share before the variables could be
586
    copied by all other threads. Or, if a write buffer is synchronized,
587
    it would change its 'pos_in_file' after waking the other threads,
588
    possibly before they could copy its value.
589
590
    However, the 'buffer' variable in the share is for a synchronized
591
    write cache. It needs to know where to put the data. Otherwise it
592
    would need access to the read cache of one of the threads that is
593
    not yet removed from the share.
594
595
  RETURN
596
    void
597
*/
598
599
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                         IO_CACHE *write_cache, uint num_threads)
{
  assert(num_threads > 1);
  assert(read_cache->type == READ_CACHE);
  assert(!write_cache || (write_cache->type == WRITE_CACHE));

  /* Synchronization objects of the share. */
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&cshare->cond, 0);
  pthread_cond_init(&cshare->cond_writer, 0);

  /* At the start every thread counts as running (outside the lock). */
  cshare->total_threads=   num_threads;
  cshare->running_threads= num_threads;

  /* State of the shared buffer; nothing has been read into it yet. */
  cshare->buffer=          read_cache->buffer;
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
  cshare->error=           0;    /* Initialize. */
  cshare->source_cache=    write_cache; /* Can be NULL. */

  /* Hook the read cache up to the share and its synchronized reader. */
  read_cache->share=         cshare;
  read_cache->read_function= _my_b_read_r;
  read_cache->current_pos=   NULL;
  read_cache->current_end=   NULL;

  /* The optional write cache only needs to know the share. */
  if (write_cache != NULL)
    write_cache->share= cshare;
}
628
629
630
/*
631
  Remove a thread from shared access to IO_CACHE.
632
633
  SYNOPSIS
634
    remove_io_thread()
635
      cache                     The IO_CACHE to be removed from the share.
636
637
  NOTE
638
639
    Every thread must do this on exit so as not to deadlock other threads.
640
641
    The last thread destroys the pthread resources.
642
643
    A writer flushes its cache first.
644
645
  RETURN
646
    void
647
*/
648
649
void remove_io_thread(IO_CACHE *cache)
{
  IO_CACHE_SHARE *share= cache->share;
  uint threads_left;

  /* If the writer goes, it needs to flush the write cache. */
  if (share->source_cache == cache)
    flush_io_cache(cache);

  pthread_mutex_lock(&share->mutex);

  /* Remove from share and remember how many threads remain. */
  share->total_threads--;
  threads_left= share->total_threads;

  /* Detach from share. */
  cache->share= NULL;

  /* If the writer goes, let the readers know. */
  if (share->source_cache == cache)
    share->source_cache= NULL;

  /* If all threads are waiting for me to join the lock, wake them. */
  share->running_threads--;
  if (share->running_threads == 0)
  {
    pthread_cond_signal(&share->cond_writer);
    pthread_cond_broadcast(&share->cond);
  }

  pthread_mutex_unlock(&share->mutex);

  /* The last thread to leave destroys the pthread resources. */
  if (threads_left == 0)
  {
    pthread_cond_destroy (&share->cond_writer);
    pthread_cond_destroy (&share->cond);
    pthread_mutex_destroy(&share->mutex);
  }
}
690
691
692
/*
693
  Lock IO cache and wait for all other threads to join.
694
695
  SYNOPSIS
696
    lock_io_cache()
697
      cache                     The cache of the thread entering the lock.
698
      pos                       File position of the block to read.
699
                                Unused for the write thread.
700
701
  DESCRIPTION
702
703
    Wait for all threads to finish with the current buffer. We want
704
    all threads to proceed in concert. The last thread to join
705
    lock_io_cache() will read the block from file and all threads start
706
    to use it. Then they will join again for reading the next block.
707
708
    The waiting threads detect a fresh buffer by comparing
709
    cshare->pos_in_file with the position they want to process next.
710
    Since the first block may start at position 0, we take
711
    cshare->read_end as an additional condition. This variable is
712
    initialized to NULL and will be set after a block of data is written
713
    to the buffer.
714
715
  RETURN
716
    1           OK, lock in place, go ahead and read.
717
    0           OK, unlocked, another thread did the read.
718
*/
719
720
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *share= cache->share;

  /* Enter the lock and count this thread as no longer running. */
  pthread_mutex_lock(&share->mutex);
  share->running_threads--;

  if (!share->source_cache)
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (share->running_threads == 0)
      return 1;                 /* Stay locked until unlock_io_cache(). */

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock, we have to continue also. The first of the awaken threads
      will then do the read.
    */
    while (share->running_threads != 0 &&
           !(share->read_end && share->pos_in_file >= pos))
      pthread_cond_wait(&share->cond, &share->mutex);

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!(share->read_end && share->pos_in_file >= pos))
      return 1;                 /* Stay locked until unlock_io_cache(). */

    /* Another thread did read the block already. */
  }
  else if (share->source_cache == cache)
  {
    /*
      A write cache is synchronized to the read caches and this is the
      writer: it waits until all readers are here.
    */
    while (share->running_threads != 0)
      pthread_cond_wait(&share->cond_writer, &share->mutex);

    /* Stay locked. Leave the lock later by unlock_io_cache(). */
    return 1;
  }
  else
  {
    /* A reader of a share that has a writer. The last one wakes the writer. */
    if (share->running_threads == 0)
      pthread_cond_signal(&share->cond_writer);

    /*
      Readers wait until the data is copied from the writer. Another
      reason to stop waiting is the removal of the write thread. If this
      happens, we leave the lock with old data in the buffer.
    */
    while (share->source_cache &&
           !(share->read_end && share->pos_in_file >= pos))
      pthread_cond_wait(&share->cond, &share->mutex);

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. When we awake here then because the last
      joining thread signalled us. If the writer is not the last, it
      will not signal. So it is safe to clear the buffer here.
    */
    if (!(share->read_end && share->pos_in_file >= pos))
    {
      share->read_end= share->buffer;           /* Empty buffer. */
      share->error= 0;                          /* EOF is not an error. */
    }
  }

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  pthread_mutex_unlock(&share->mutex);
  return 0;
}
816
817
818
/*
819
  Unlock IO cache.
820
821
  SYNOPSIS
822
    unlock_io_cache()
823
      cache                     The cache of the thread leaving the lock.
824
825
  NOTE
826
    This is called by the thread that filled the buffer. It marks all
827
    threads as running and awakes them. This must not be done by any
828
    other thread.
829
830
    Do not signal cond_writer. Either there is no writer or the writer
831
    is the only one who can call this function.
832
833
    The reason for resetting running_threads to total_threads before
834
    waking all other threads is that it could be possible that this
835
    thread is so fast with processing the buffer that it enters the lock
836
    before even one other thread has left it. If every awoken thread
837
    would increase running_threads by one, this thread could think that
838
    he is again the last to join and would not wait for the other
839
    threads to process the data.
840
841
  RETURN
842
    void
843
*/
844
845
static void unlock_io_cache(IO_CACHE *cache)
846
{
847
  IO_CACHE_SHARE *cshare= cache->share;
848
849
  cshare->running_threads= cshare->total_threads;
850
  pthread_cond_broadcast(&cshare->cond);
851
  pthread_mutex_unlock(&cshare->mutex);
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
852
  return;
1 by brian
clean slate
853
}
854
855
856
/*
857
  Read from IO_CACHE when it is shared between several threads.
858
859
  SYNOPSIS
860
    _my_b_read_r()
861
      cache                     IO_CACHE pointer
862
      Buffer                    Buffer to retrieve count bytes from file
863
      Count                     Number of bytes to read into Buffer
864
865
  NOTE
866
    This function is only called from the my_b_read() macro when there
867
    isn't enough characters in the buffer to satisfy the request.
868
869
  IMPLEMENTATION
870
871
    It works as follows: when a thread tries to read from a file (that
872
    is, after using all the data from the (shared) buffer), it just
873
    hangs on lock_io_cache(), waiting for other threads. When the very
874
    last thread attempts a read, lock_io_cache() returns 1, the thread
875
    does actual IO and unlock_io_cache(), which signals all the waiting
876
    threads that data is in the buffer.
877
878
  WARNING
879
880
    When changing this function, be careful with handling file offsets
881
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
882
    types than my_off_t unless you can be sure that their value fits.
883
    Same applies to differences of file offsets. (Bug #11527)
884
885
    When changing this function, check _my_b_read(). It might need the
886
    same change.
887
888
  RETURN
889
    0      we succeeded in reading all data
890
    1      Error: can't read requested characters
891
*/
892
893
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
894
{
895
  my_off_t pos_in_file;
896
  size_t length, diff_length, left_length;
897
  IO_CACHE_SHARE *cshare= cache->share;
898
899
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
900
  {
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
901
    assert(Count >= left_length);	/* User is not using my_b_read() */
1 by brian
clean slate
902
    memcpy(Buffer, cache->read_pos, left_length);
903
    Buffer+= left_length;
904
    Count-= left_length;
905
  }
906
  while (Count)
907
  {
908
    size_t cnt, len;
909
910
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
911
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
912
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
913
    length= ((length <= cache->read_length) ?
914
             length + IO_ROUND_DN(cache->read_length - length) :
915
             length - IO_ROUND_UP(length - cache->read_length));
916
    if (cache->type != READ_FIFO &&
917
	(length > (cache->end_of_file - pos_in_file)))
918
      length= (size_t) (cache->end_of_file - pos_in_file);
919
    if (length == 0)
920
    {
921
      cache->error= (int) left_length;
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
922
      return(1);
1 by brian
clean slate
923
    }
924
    if (lock_io_cache(cache, pos_in_file))
925
    {
926
      /* With a synchronized write/read cache we won't come here... */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
927
      assert(!cshare->source_cache);
1 by brian
clean slate
928
      /*
929
        ... unless the writer has gone before this thread entered the
930
        lock. Simulate EOF in this case. It can be distinguished by
931
        cache->file.
932
      */
933
      if (cache->file < 0)
934
        len= 0;
935
      else
936
      {
937
        /*
938
          Whenever a function which operates on IO_CACHE flushes/writes
939
          some part of the IO_CACHE to disk it will set the property
940
          "seek_not_done" to indicate this to other functions operating
941
          on the IO_CACHE.
942
        */
943
        if (cache->seek_not_done)
944
        {
945
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
946
              == MY_FILEPOS_ERROR)
947
          {
948
            cache->error= -1;
949
            unlock_io_cache(cache);
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
950
            return(1);
1 by brian
clean slate
951
          }
952
        }
953
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
954
      }
955
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
956
      cache->error=       (len == length ? 0 : (int) len);
957
      cache->pos_in_file= pos_in_file;
958
959
      /* Copy important values to the share. */
960
      cshare->error=       cache->error;
961
      cshare->read_end=    cache->read_end;
962
      cshare->pos_in_file= pos_in_file;
963
964
      /* Mark all threads as running and wake them. */
965
      unlock_io_cache(cache);
966
    }
967
    else
968
    {
969
      /*
970
        With a synchronized write/read cache readers always come here.
971
        Copy important values from the share.
972
      */
973
      cache->error=       cshare->error;
974
      cache->read_end=    cshare->read_end;
975
      cache->pos_in_file= cshare->pos_in_file;
976
977
      len= ((cache->error == -1) ? (size_t) -1 :
978
            (size_t) (cache->read_end - cache->buffer));
979
    }
980
    cache->read_pos=      cache->buffer;
981
    cache->seek_not_done= 0;
982
    if (len == 0 || len == (size_t) -1)
983
    {
984
      cache->error= (int) left_length;
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
985
      return(1);
1 by brian
clean slate
986
    }
987
    cnt= (len > Count) ? Count : len;
988
    memcpy(Buffer, cache->read_pos, cnt);
989
    Count -= cnt;
990
    Buffer+= cnt;
991
    left_length+= cnt;
992
    cache->read_pos+= cnt;
993
  }
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
994
  return(0);
1 by brian
clean slate
995
}
996
997
998
/*
999
  Copy data from write cache to read cache.
1000
1001
  SYNOPSIS
1002
    copy_to_read_buffer()
1003
      write_cache               The write cache.
1004
      write_buffer              The source of data, mostly the cache buffer.
1005
      write_length              The number of bytes to copy.
1006
1007
  NOTE
1008
    The write thread will wait for all read threads to join the cache
1009
    lock. Then it copies the data over and wakes the read threads.
1010
1011
  RETURN
1012
    void
1013
*/
1014
1015
static void copy_to_read_buffer(IO_CACHE *write_cache,
1016
                                const uchar *write_buffer, size_t write_length)
1017
{
1018
  IO_CACHE_SHARE *cshare= write_cache->share;
1019
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1020
  assert(cshare->source_cache == write_cache);
1 by brian
clean slate
1021
  /*
1022
    write_length is usually less or equal to buffer_length.
1023
    It can be bigger if _my_b_write() is called with a big length.
1024
  */
1025
  while (write_length)
1026
  {
1027
    size_t copy_length= min(write_length, write_cache->buffer_length);
1028
    int  __attribute__((unused)) rc;
1029
1030
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1031
    /* The writing thread does always have the lock when it awakes. */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1032
    assert(rc);
1 by brian
clean slate
1033
1034
    memcpy(cshare->buffer, write_buffer, copy_length);
1035
1036
    cshare->error=       0;
1037
    cshare->read_end=    cshare->buffer + copy_length;
1038
    cshare->pos_in_file= write_cache->pos_in_file;
1039
1040
    /* Mark all threads as running and wake them. */
1041
    unlock_io_cache(write_cache);
1042
1043
    write_buffer+= copy_length;
1044
    write_length-= copy_length;
1045
  }
1046
}
1047
1048
1049
/*
1050
  Do sequential read from the SEQ_READ_APPEND cache.
1051
  
1052
  We do this in three stages:
1053
   - first read from info->buffer
1054
   - then if there are still data to read, try the file descriptor
1055
   - afterwards, if there are still data to read, try append buffer
1056
1057
  RETURNS
1058
    0  Success
1059
    1  Failed to read
1060
*/
1061
1062
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1063
{
1064
  size_t length, diff_length, left_length, save_count, max_length;
1065
  my_off_t pos_in_file;
1066
  save_count=Count;
1067
1068
  /* first, read the regular buffer */
1069
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1070
  {
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1071
    assert(Count > left_length);	/* User is not using my_b_read() */
1 by brian
clean slate
1072
    memcpy(Buffer,info->read_pos, left_length);
1073
    Buffer+=left_length;
1074
    Count-=left_length;
1075
  }
1076
  lock_append_buffer(info);
1077
1078
  /* pos_in_file always point on where info->buffer was read */
1079
  if ((pos_in_file=info->pos_in_file +
1080
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1081
    goto read_append_buffer;
1082
1083
  /*
1084
    With read-append cache we must always do a seek before we read,
1085
    because the write could have moved the file pointer astray
1086
  */
1087
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1088
  {
1089
   info->error= -1;
1090
   unlock_append_buffer(info);
1091
   return (1);
1092
  }
1093
  info->seek_not_done=0;
1094
1095
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1096
1097
  /* now the second stage begins - read from file descriptor */
1098
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1099
  {
1100
    /* Fill first intern buffer */
1101
    size_t read_length;
1102
1103
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1104
    if ((read_length= my_read(info->file,Buffer, length,
1105
                              info->myflags)) == (size_t) -1)
1106
    {
1107
      info->error= -1;
1108
      unlock_append_buffer(info);
1109
      return 1;
1110
    }
1111
    Count-=read_length;
1112
    Buffer+=read_length;
1113
    pos_in_file+=read_length;
1114
1115
    if (read_length != length)
1116
    {
1117
      /*
1118
	We only got part of data;  Read the rest of the data from the
1119
	write buffer
1120
      */
1121
      goto read_append_buffer;
1122
    }
1123
    left_length+=length;
1124
    diff_length=0;
1125
  }
1126
1127
  max_length= info->read_length-diff_length;
1128
  if (max_length > (info->end_of_file - pos_in_file))
1129
    max_length= (size_t) (info->end_of_file - pos_in_file);
1130
  if (!max_length)
1131
  {
1132
    if (Count)
1133
      goto read_append_buffer;
1134
    length=0;				/* Didn't read any more chars */
1135
  }
1136
  else
1137
  {
1138
    length= my_read(info->file,info->buffer, max_length, info->myflags);
1139
    if (length == (size_t) -1)
1140
    {
1141
      info->error= -1;
1142
      unlock_append_buffer(info);
1143
      return 1;
1144
    }
1145
    if (length < Count)
1146
    {
1147
      memcpy(Buffer, info->buffer, length);
1148
      Count -= length;
1149
      Buffer += length;
1150
1151
      /*
1152
	 added the line below to make
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1153
	 assert(pos_in_file==info->end_of_file) pass.
1 by brian
clean slate
1154
	 otherwise this does not appear to be needed
1155
      */
1156
      pos_in_file += length;
1157
      goto read_append_buffer;
1158
    }
1159
  }
1160
  unlock_append_buffer(info);
1161
  info->read_pos=info->buffer+Count;
1162
  info->read_end=info->buffer+length;
1163
  info->pos_in_file=pos_in_file;
1164
  memcpy(Buffer,info->buffer,(size_t) Count);
1165
  return 0;
1166
1167
read_append_buffer:
1168
1169
  /*
1170
     Read data from the current write buffer.
1171
     Count should never be == 0 here (The code will work even if count is 0)
1172
  */
1173
1174
  {
1175
    /* First copy the data to Count */
1176
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1177
    size_t copy_len;
1178
    size_t transfer_len;
1179
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1180
    assert(info->append_read_pos <= info->write_pos);
1 by brian
clean slate
1181
    /*
1182
      TODO: figure out if the assert below is needed or correct.
1183
    */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1184
    assert(pos_in_file == info->end_of_file);
1 by brian
clean slate
1185
    copy_len=min(Count, len_in_buff);
1186
    memcpy(Buffer, info->append_read_pos, copy_len);
1187
    info->append_read_pos += copy_len;
1188
    Count -= copy_len;
1189
    if (Count)
1190
      info->error = save_count - Count;
1191
1192
    /* Fill read buffer with data from write buffer */
1193
    memcpy(info->buffer, info->append_read_pos,
1194
	   (size_t) (transfer_len=len_in_buff - copy_len));
1195
    info->read_pos= info->buffer;
1196
    info->read_end= info->buffer+transfer_len;
1197
    info->append_read_pos=info->write_pos;
1198
    info->pos_in_file=pos_in_file+copy_len;
1199
    info->end_of_file+=len_in_buff;
1200
  }
1201
  unlock_append_buffer(info);
1202
  return Count ? 1 : 0;
1203
}
1204
1205
1206
#ifdef HAVE_AIOWAIT
1207
1208
/*
1209
  Read from the IO_CACHE into a buffer and feed asynchronously
1210
  from disk when needed.
1211
1212
  SYNOPSIS
1213
    _my_b_async_read()
1214
      info                      IO_CACHE pointer
1215
      Buffer                    Buffer to retrieve count bytes from file
1216
      Count                     Number of bytes to read into Buffer
1217
1218
  RETURN VALUE
1219
    -1          An error has occurred; my_errno is set.
1220
     0          Success
1221
     1          An error has occurred; IO_CACHE to error state.
1222
*/
1223
1224
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1225
{
1226
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1227
  size_t max_length;
1228
  my_off_t next_pos_in_file;
1229
  uchar *read_buffer;
1230
1231
  memcpy(Buffer,info->read_pos,
1232
	 (left_length= (size_t) (info->read_end-info->read_pos)));
1233
  Buffer+=left_length;
1234
  org_Count=Count;
1235
  Count-=left_length;
1236
1237
  if (info->inited)
1238
  {						/* wait for read block */
1239
    info->inited=0;				/* No more block to read */
1240
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
1241
    if (info->aio_result.result.aio_errno)
1242
    {
1243
      if (info->myflags & MY_WME)
1244
	my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1245
		 my_filename(info->file),
1246
		 info->aio_result.result.aio_errno);
1247
      my_errno=info->aio_result.result.aio_errno;
1248
      info->error= -1;
1249
      return(1);
1250
    }
1251
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1252
	read_length == (size_t) -1)
1253
    {
1254
      my_errno=0;				/* For testing */
1255
      info->error= (read_length == (size_t) -1 ? -1 :
1256
		    (int) (read_length+left_length));
1257
      return(1);
1258
    }
1259
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
1260
1261
    if (info->request_pos != info->buffer)
1262
      info->request_pos=info->buffer;
1263
    else
1264
      info->request_pos=info->buffer+info->read_length;
1265
    info->read_pos=info->request_pos;
1266
    next_pos_in_file=info->aio_read_pos+read_length;
1267
1268
	/* Check if pos_in_file is changed
1269
	   (_ni_read_cache may have skipped some bytes) */
1270
1271
    if (info->aio_read_pos < info->pos_in_file)
1272
    {						/* Fix if skipped bytes */
1273
      if (info->aio_read_pos + read_length < info->pos_in_file)
1274
      {
1275
	read_length=0;				/* Skip block */
1276
	next_pos_in_file=info->pos_in_file;
1277
      }
1278
      else
1279
      {
1280
	my_off_t offset= (info->pos_in_file - info->aio_read_pos);
1281
	info->pos_in_file=info->aio_read_pos; /* Whe are here */
1282
	info->read_pos=info->request_pos+offset;
1283
	read_length-=offset;			/* Bytes left from read_pos */
1284
      }
1285
    }
1286
	/* Copy found bytes to buffer */
1287
    length=min(Count,read_length);
1288
    memcpy(Buffer,info->read_pos,(size_t) length);
1289
    Buffer+=length;
1290
    Count-=length;
1291
    left_length+=length;
1292
    info->read_end=info->rc_pos+read_length;
1293
    info->read_pos+=length;
1294
  }
1295
  else
1296
    next_pos_in_file=(info->pos_in_file+ (size_t)
1297
		      (info->read_end - info->request_pos));
1298
1299
	/* If reading large blocks, or first read or read with skip */
1300
  if (Count)
1301
  {
1302
    if (next_pos_in_file == info->end_of_file)
1303
    {
1304
      info->error=(int) (read_length+left_length);
1305
      return 1;
1306
    }
1307
    
1308
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1309
        == MY_FILEPOS_ERROR)
1310
    {
1311
      info->error= -1;
1312
      return (1);
1313
    }
1314
1315
    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
1316
    if (Count < read_length)
1317
    {					/* Small block, read to cache */
1318
      if ((read_length=my_read(info->file,info->request_pos,
1319
			       read_length, info->myflags)) == (size_t) -1)
1320
        return info->error= -1;
1321
      use_length=min(Count,read_length);
1322
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1323
      info->read_pos=info->request_pos+Count;
1324
      info->read_end=info->request_pos+read_length;
1325
      info->pos_in_file=next_pos_in_file;	/* Start of block in cache */
1326
      next_pos_in_file+=read_length;
1327
1328
      if (Count != use_length)
1329
      {					/* Didn't find hole block */
1330
	if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1331
	  my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1332
		   my_filename(info->file),my_errno);
1333
	info->error=(int) (read_length+left_length);
1334
	return 1;
1335
      }
1336
    }
1337
    else
1338
    {						/* Big block, don't cache it */
1339
      if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
1340
	  != Count)
1341
      {
1342
	info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
1343
	return 1;
1344
      }
1345
      info->read_pos=info->read_end=info->request_pos;
1346
      info->pos_in_file=(next_pos_in_file+=Count);
1347
    }
1348
  }
1349
1350
  /* Read next block with asyncronic io */
1351
  diff_length=(next_pos_in_file & (IO_SIZE-1));
1352
  max_length= info->read_length - diff_length;
1353
  if (max_length > info->end_of_file - next_pos_in_file)
1354
    max_length= (size_t) (info->end_of_file - next_pos_in_file);
1355
1356
  if (info->request_pos != info->buffer)
1357
    read_buffer=info->buffer;
1358
  else
1359
    read_buffer=info->buffer+info->read_length;
1360
  info->aio_read_pos=next_pos_in_file;
1361
  if (max_length)
1362
  {
1363
    info->aio_result.result.aio_errno=AIO_INPROGRESS;	/* Marker for test */
1364
    if (aioread(info->file,read_buffer, max_length,
1365
		(my_off_t) next_pos_in_file,MY_SEEK_SET,
1366
		&info->aio_result.result))
1367
    {						/* Skip async io */
1368
      my_errno=errno;
1369
      if (info->request_pos != info->buffer)
1370
      {
212.6.3 by Mats Kindahl
Removing deprecated functions from code and replacing them with C99 equivalents:
1371
	memcpy(info->buffer, info->request_pos,
1372
               (size_t) (info->read_end - info->read_pos));
1 by brian
clean slate
1373
	info->request_pos=info->buffer;
1374
	info->read_pos-=info->read_length;
1375
	info->read_end-=info->read_length;
1376
      }
1377
      info->read_length=info->buffer_length;	/* Use hole buffer */
1378
      info->read_function=_my_b_read;		/* Use normal IO_READ next */
1379
    }
1380
    else
1381
      info->inited=info->aio_result.pending=1;
1382
  }
1383
  return 0;					/* Block read, async in use */
1384
} /* _my_b_async_read */
1385
#endif
1386
1387
1388
/* Read one byte when buffer is empty */
1389
1390
int _my_b_get(IO_CACHE *info)
1391
{
1392
  uchar buff;
1393
  IO_CACHE_CALLBACK pre_read,post_read;
1394
  if ((pre_read = info->pre_read))
1395
    (*pre_read)(info);
1396
  if ((*(info)->read_function)(info,&buff,1))
1397
    return my_b_EOF;
1398
  if ((post_read = info->post_read))
1399
    (*post_read)(info);
1400
  return (int) (uchar) buff;
1401
}
1402
1403
/* 
1404
   Write a byte buffer to IO_CACHE and flush to disk
1405
   if IO_CACHE is full.
1406
1407
   RETURN VALUE
1408
    1 On error on write
1409
    0 On success
1410
   -1 On error; my_errno contains error code.
1411
*/
1412
1413
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1414
{
1415
  size_t rest_length,length;
1416
1417
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1418
  {
1419
    my_errno=errno=EFBIG;
1420
    return info->error = -1;
1421
  }
1422
1423
  rest_length= (size_t) (info->write_end - info->write_pos);
1424
  memcpy(info->write_pos,Buffer,(size_t) rest_length);
1425
  Buffer+=rest_length;
1426
  Count-=rest_length;
1427
  info->write_pos+=rest_length;
1428
1429
  if (my_b_flush_io_cache(info,1))
1430
    return 1;
1431
  if (Count >= IO_SIZE)
1432
  {					/* Fill first intern buffer */
1433
    length=Count & (size_t) ~(IO_SIZE-1);
1434
    if (info->seek_not_done)
1435
    {
1436
      /*
1437
        Whenever a function which operates on IO_CACHE flushes/writes
1438
        some part of the IO_CACHE to disk it will set the property
1439
        "seek_not_done" to indicate this to other functions operating
1440
        on the IO_CACHE.
1441
      */
1442
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1443
      {
1444
        info->error= -1;
1445
        return (1);
1446
      }
1447
      info->seek_not_done=0;
1448
    }
1449
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1450
      return info->error= -1;
1451
1452
    /*
1453
      In case of a shared I/O cache with a writer we normally do direct
1454
      write cache to read cache copy. Simulate this here by direct
1455
      caller buffer to read cache copy. Do it after the write so that
1456
      the cache readers actions on the flushed part can go in parallel
1457
      with the write of the extra stuff. copy_to_read_buffer()
1458
      synchronizes writer and readers so that after this call the
1459
      readers can act on the extra stuff while the writer can go ahead
1460
      and prepare the next output. copy_to_read_buffer() relies on
1461
      info->pos_in_file.
1462
    */
1463
    if (info->share)
1464
      copy_to_read_buffer(info, Buffer, length);
1465
1466
    Count-=length;
1467
    Buffer+=length;
1468
    info->pos_in_file+=length;
1469
  }
1470
  memcpy(info->write_pos,Buffer,(size_t) Count);
1471
  info->write_pos+=Count;
1472
  return 0;
1473
}
1474
1475
1476
/*
1477
  Append a block to the write buffer.
1478
  This is done with the buffer locked to ensure that we don't read from
1479
  the write buffer before we are ready with it.
1480
*/
1481
1482
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1483
{
1484
  size_t rest_length,length;
1485
1486
  /*
1487
    Assert that we cannot come here with a shared cache. If we do one
1488
    day, we might need to add a call to copy_to_read_buffer().
1489
  */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1490
  assert(!info->share);
1 by brian
clean slate
1491
1492
  lock_append_buffer(info);
1493
  rest_length= (size_t) (info->write_end - info->write_pos);
1494
  if (Count <= rest_length)
1495
    goto end;
1496
  memcpy(info->write_pos, Buffer, rest_length);
1497
  Buffer+=rest_length;
1498
  Count-=rest_length;
1499
  info->write_pos+=rest_length;
1500
  if (my_b_flush_io_cache(info,0))
1501
  {
1502
    unlock_append_buffer(info);
1503
    return 1;
1504
  }
1505
  if (Count >= IO_SIZE)
1506
  {					/* Fill first intern buffer */
1507
    length=Count & (size_t) ~(IO_SIZE-1);
1508
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1509
    {
1510
      unlock_append_buffer(info);
1511
      return info->error= -1;
1512
    }
1513
    Count-=length;
1514
    Buffer+=length;
1515
    info->end_of_file+=length;
1516
  }
1517
1518
end:
1519
  memcpy(info->write_pos,Buffer,(size_t) Count);
1520
  info->write_pos+=Count;
1521
  unlock_append_buffer(info);
1522
  return 0;
1523
}
1524
1525
1526
/*
  Write through the function that is safe for the cache type:
  my_b_append() for a SEQ_READ_APPEND cache, my_b_write() otherwise.

  RETURN
    The result of the write function that was used.
*/

int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  int result;

  /*
    Sasha: deliberately not written with the ? operator. At least
    gcc 2.95 cannot deal with several layers of ternary operators that
    evaluate comma(,) operator expressions inside.
  */
  if (info->type != SEQ_READ_APPEND)
    result= my_b_write(info, Buffer, Count);
  else
    result= my_b_append(info, Buffer, Count);
  return result;
}
1538
1539
1540
/*
1541
  Write a block to disk where part of the data may be inside the record
1542
  buffer.  As all write calls to the data goes through the cache,
1543
  we will never get a seek over the end of the buffer
1544
*/
1545
1546
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1547
		   my_off_t pos)
1548
{
1549
  size_t length;
1550
  int error=0;
1551
1552
  /*
1553
    Assert that we cannot come here with a shared cache. If we do one
1554
    day, we might need to add a call to copy_to_read_buffer().
1555
  */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1556
  assert(!info->share);
1 by brian
clean slate
1557
1558
  if (pos < info->pos_in_file)
1559
  {
1560
    /* Of no overlap, write everything without buffering */
1561
    if (pos + Count <= info->pos_in_file)
32 by Brian Aker
More cleanup on pread()
1562
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1 by brian
clean slate
1563
    /* Write the part of the block that is before buffer */
1564
    length= (uint) (info->pos_in_file - pos);
32 by Brian Aker
More cleanup on pread()
1565
    if (pwrite(info->file, Buffer, length, pos) == 0)
1 by brian
clean slate
1566
      info->error= error= -1;
1567
    Buffer+=length;
1568
    pos+=  length;
1569
    Count-= length;
1570
#ifndef HAVE_PREAD
1571
    info->seek_not_done=1;
1572
#endif
1573
  }
1574
1575
  /* Check if we want to write inside the used part of the buffer.*/
1576
  length= (size_t) (info->write_end - info->buffer);
1577
  if (pos < info->pos_in_file + length)
1578
  {
1579
    size_t offset= (size_t) (pos - info->pos_in_file);
1580
    length-=offset;
1581
    if (length > Count)
1582
      length=Count;
1583
    memcpy(info->buffer+offset, Buffer, length);
1584
    Buffer+=length;
1585
    Count-= length;
1586
    /* Fix length of buffer if the new data was larger */
1587
    if (info->buffer+length > info->write_pos)
1588
      info->write_pos=info->buffer+length;
1589
    if (!Count)
1590
      return (error);
1591
  }
1592
  /* Write at the end of the current buffer; This is the normal case */
1593
  if (_my_b_write(info, Buffer, Count))
1594
    error= -1;
1595
  return error;
1596
}
1597
1598
1599
	/* Flush write cache */
1600
1601
/*
  Take / release the append buffer lock of the local 'info', but only
  when the local 'need_append_buffer_lock' is set. Both names are taken
  from the scope where the macro is expanded.

  Wrapped in do { } while (0) so that each macro is exactly one
  statement and "if (x) LOCK_APPEND_BUFFER; else ..." parses as written.
*/
#define LOCK_APPEND_BUFFER \
  do { if (need_append_buffer_lock) lock_append_buffer(info); } while (0)
#define UNLOCK_APPEND_BUFFER \
  do { if (need_append_buffer_lock) unlock_append_buffer(info); } while (0)
1605
1606
/*
  Flush the write cache of an IO_CACHE to its file.

  SYNOPSIS
    my_b_flush_io_cache()
      info                      IO_CACHE to flush
      need_append_buffer_lock   Non-zero if the append buffer lock has
                                to be taken here. Ignored (treated as
                                0) unless the cache is SEQ_READ_APPEND.

  NOTE
    For WRITE_CACHE and SEQ_READ_APPEND caches a not yet existing
    temporary file (info->file == -1) is created first, then the
    content of the write buffer is written out.

  RETURN
    0   ok (also when there was nothing to flush)
   -1   error; info->error is set to -1
*/

int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  const bool is_append= (info->type == SEQ_READ_APPEND);
  size_t length;
  my_off_t pos_in_file;

  /* Only a SEQ_READ_APPEND cache uses the append buffer lock. */
  if (!is_append)
    need_append_buffer_lock= 0;

  if (info->type == WRITE_CACHE || is_append)
  {
    if (info->file == -1 && real_open_cached_file(info))
      return((info->error= -1));
    LOCK_APPEND_BUFFER;

    length= (size_t) (info->write_pos - info->write_buffer);
    if (length)
    {
      /*
        In case of a shared I/O cache with a writer the data is copied
        from the write cache to the read cache directly. This is done
        before the write so that the readers can work in parallel with
        it. copy_to_read_buffer() relies on info->pos_in_file.
      */
      if (info->share)
        copy_to_read_buffer(info, info->write_buffer, length);

      pos_in_file= info->pos_in_file;
      /*
        An append cache is always opened with O_APPEND, which moves
        the position to EOF on every write; no seek is needed there.
      */
      if (!is_append && info->seek_not_done)
      {					/* File touched, do seek */
        if (my_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) ==
            MY_FILEPOS_ERROR)
        {
          UNLOCK_APPEND_BUFFER;
          return((info->error= -1));
        }
        info->seek_not_done= 0;
      }
      if (!is_append)
        info->pos_in_file+= length;
      info->write_end= (info->write_buffer+info->buffer_length-
                        ((pos_in_file+length) & (IO_SIZE-1)));

      info->error= my_write(info->file, info->write_buffer, length,
                            info->myflags | MY_NABP) ? -1 : 0;

      if (is_append)
      {
        info->end_of_file+= (info->write_pos-info->append_read_pos);
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
      }
      else
      {
        set_if_bigger(info->end_of_file,(pos_in_file+length));
      }

      /* The write buffer is empty again. */
      info->append_read_pos= info->write_pos= info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      return(info->error);
    }
  }
#ifdef HAVE_AIOWAIT
  else if (info->type != READ_NET)
  {
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
    info->inited=0;
  }
#endif
  UNLOCK_APPEND_BUFFER;
  return(0);
}
1687
1688
/*
1689
  Free an IO_CACHE object
1690
1691
  SYNOPSOS
1692
    end_io_cache()
1693
    info		IO_CACHE Handle to free
1694
1695
  NOTES
1696
    It's currently safe to call this if one has called init_io_cache()
1697
    on the 'info' object, even if init_io_cache() failed.
1698
    This function is also safe to call twice with the same handle.
1699
1700
  RETURN
1701
   0  ok
1702
   #  Error
1703
*/
1704
1705
int end_io_cache(IO_CACHE *info)
1706
{
1707
  int error=0;
1708
  IO_CACHE_CALLBACK pre_close;
1709
1710
  /*
1711
    Every thread must call remove_io_thread(). The last one destroys
1712
    the share elements.
1713
  */
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1714
  assert(!info->share || !info->share->total_threads);
1 by brian
clean slate
1715
1716
  if ((pre_close=info->pre_close))
1717
  {
1718
    (*pre_close)(info);
1719
    info->pre_close= 0;
1720
  }
1721
  if (info->alloced_buffer)
1722
  {
1723
    info->alloced_buffer=0;
1724
    if (info->file != -1)			/* File doesn't exist */
1725
      error= my_b_flush_io_cache(info,1);
1726
    my_free((uchar*) info->buffer,MYF(MY_WME));
1727
    info->buffer=info->read_pos=(uchar*) 0;
1728
  }
1729
  if (info->type == SEQ_READ_APPEND)
1730
  {
1731
    /* Destroy allocated mutex */
1732
    info->type= TYPE_NOT_SET;
1733
    pthread_mutex_destroy(&info->append_buffer_lock);
1734
  }
51.3.22 by Jay Pipes
Final round of removal of DBUG in mysys/, including Makefile
1735
  return(error);
1 by brian
clean slate
1736
} /* end_io_cache */
1737
1738
1739
/**********************************************************************
1740
 Testing of MF_IOCACHE
1741
**********************************************************************/
1742
1743
#ifdef MAIN
1744
1745
#include <my_dir.h>
1746
1747
/*
  Print a formatted error message followed by the errno value to stderr
  and terminate the process with exit status 1.

  fmt   printf-style format string; remaining arguments are its operands.

  This function does not return.
*/
void die(const char* fmt, ...)
{
  /*
    Capture errno before any stdio call: fprintf()/vfprintf() are allowed
    to modify it, which would make the reported value meaningless.
  */
  int saved_errno= errno;
  va_list va_args;

  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  va_end(va_args);			/* every va_start needs a va_end */
  fprintf(stderr,", errno=%d\n", saved_errno);
  exit(1);
}
1756
1757
/*
  Open (creating it if needed) the file 'fname' for reading and writing
  and attach 'info' to it as a SEQ_READ_APPEND cache of 'cache_size'
  bytes.

  Any failure is fatal: die() is called and the process exits.

  Returns the opened file descriptor.
*/
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
  int fd= my_open(fname, O_CREAT | O_RDWR, MYF(MY_WME));

  if (fd < 0)
    die("Could not open %s", fname);

  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0, 0,
                    MYF(MY_WME)) != 0)
    die("failed in init_io_cache()");

  return fd;
}
1766
1767
/*
  Tear down the cache 'info' and close the file it was attached to.

  end_io_cache() returns non-zero when the final flush of the cache
  fails; that is treated as fatal here, in line with the other helpers
  of this test program, instead of being silently dropped.
*/
void close_file(IO_CACHE* info)
{
  if (end_io_cache(info))
    die("end_io_cache() failed");
  /* end_io_cache() leaves info->file untouched, so it is still valid. */
  my_close(info->file, MYF(MY_WME));
}
1772
1773
int main(int argc, char** argv)
1774
{
1775
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1776
  MY_STAT status;
1777
  const char* fname="/tmp/iocache.test";
1778
  int cache_size=16384;
1779
  char llstr_buf[22];
1780
  int max_block,total_bytes=0;
1781
  int i,num_loops=100,error=0;
1782
  char *p;
1783
  char* block, *block_end;
1784
  MY_INIT(argv[0]);
1785
  max_block = cache_size*3;
1786
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1787
    die("Not enough memory to allocate test block");
1788
  block_end = block + max_block;
1789
  for (p = block,i=0; p < block_end;i++)
1790
  {
1791
    *p++ = (char)i;
1792
  }
1793
  if (my_stat(fname,&status, MYF(0)) &&
1794
      my_delete(fname,MYF(MY_WME)))
1795
    {
1796
      die("Delete of %s failed, aborting", fname);
1797
    }
1798
  open_file(fname,&sra_cache, cache_size);
1799
  for (i = 0; i < num_loops; i++)
1800
  {
1801
    char buf[4];
1802
    int block_size = abs(rand() % max_block);
1803
    int4store(buf, block_size);
1804
    if (my_b_append(&sra_cache,buf,4) ||
1805
	my_b_append(&sra_cache, block, block_size))
1806
      die("write failed");
1807
    total_bytes += 4+block_size;
1808
  }
1809
  close_file(&sra_cache);
1810
  my_free(block,MYF(MY_WME));
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
1812
    die("%s failed to stat, but I had just closed it,\
1813
 wonder how that happened");
1814
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1815
	 llstr(status.st_size,llstr_buf),
1816
	 total_bytes);
1817
  my_delete(fname, MYF(MY_WME));
1818
  /* check correctness of tests */
1819
  if (total_bytes != status.st_size)
1820
  {
1821
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
1822
supposedly written\n");
1823
    error=1;
1824
  }
1825
  exit(error);
1826
  return 0;
1827
}
1828
#endif