~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

Renamed more stuff to drizzle.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
 
50
#define MAP_TO_USE_RAID
 
51
#include "mysys_priv.h"
 
52
#include <m_string.h>
54
53
#ifdef HAVE_AIOWAIT
55
 
#include "drizzled/error.h"
56
 
#include "drizzled/internal/aio_result.h"
 
54
#include "mysys_err.h"
57
55
static void my_aiowait(my_aio_result *result);
58
56
#endif
59
 
#include "drizzled/internal/iocache.h"
60
57
#include <errno.h>
61
 
#include <drizzled/util/test.h>
62
 
#include <stdlib.h>
63
 
#include <algorithm>
64
 
 
65
 
using namespace std;
66
 
 
67
 
namespace drizzled
68
 
{
69
 
namespace internal
70
 
{
71
 
 
72
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
73
 
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
74
 
 
75
 
/**
76
 
 * @brief
77
 
 *   Lock appends for the IO_CACHE if required (need_append_buffer_lock)   
78
 
 */
79
 
inline
80
 
static void lock_append_buffer(register IO_CACHE *, int )
81
 
{
82
 
}
83
 
 
84
 
/**
85
 
 * @brief
86
 
 *   Unlock appends for the IO_CACHE if required (need_append_buffer_lock)   
87
 
 */
88
 
inline
89
 
static void unlock_append_buffer(register IO_CACHE *, int )
90
 
{
91
 
}
92
 
 
93
 
/**
94
 
 * @brief
95
 
 *   Round up to the nearest IO_SIZE boundary 
96
 
 */
97
 
inline
98
 
static size_t io_round_up(size_t x)
99
 
{
100
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
101
 
}
102
 
 
103
 
/**
104
 
 * @brief
105
 
 *   Round down to the nearest IO_SIZE boundary   
106
 
 */
107
 
inline
108
 
static size_t io_round_dn(size_t x)
109
 
{
110
 
  return (x & ~(IO_SIZE-1));
111
 
}
112
 
 
113
 
 
114
 
/**
115
 
 * @brief 
116
 
 *   Setup internal pointers inside IO_CACHE
117
 
 * 
118
 
 * @details
119
 
 *   This is called on automatically on init or reinit of IO_CACHE
120
 
 *   It must be called externally if one moves or copies an IO_CACHE object.
121
 
 * 
122
 
 * @param info Cache handler to setup
123
 
 */
 
58
 
 
59
#define lock_append_buffer(info) \
 
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
61
#define unlock_append_buffer(info) \
 
62
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
63
 
 
64
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
65
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
66
 
 
67
/*
 
68
  Setup internal pointers inside IO_CACHE
 
69
 
 
70
  SYNOPSIS
 
71
    setup_io_cache()
 
72
    info                IO_CACHE handler
 
73
 
 
74
  NOTES
 
75
    This is called on automaticly on init or reinit of IO_CACHE
 
76
    It must be called externally if one moves or copies an IO_CACHE
 
77
    object.
 
78
*/
 
79
 
124
80
void setup_io_cache(IO_CACHE* info)
125
81
{
126
82
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
151
107
      as myisamchk
152
108
    */
153
109
    break;
 
110
  case SEQ_READ_APPEND:
 
111
    info->read_function = _my_b_seq_read;
 
112
    info->write_function = 0;                   /* Force a core if used */
 
113
    break;
154
114
  default:
155
 
    info->read_function = _my_b_read;
 
115
    info->read_function =
 
116
                          info->share ? _my_b_read_r :
 
117
                                        _my_b_read;
156
118
    info->write_function = _my_b_write;
157
119
  }
158
120
 
159
121
  setup_io_cache(info);
160
122
}
161
123
 
162
 
/**
163
 
 * @brief
164
 
 *   Initialize an IO_CACHE object
165
 
 *
166
 
 * @param info Cache handler to initialize
167
 
 * @param file File that should be associated with the handler
168
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
169
 
 * @param cachesize Size of buffer to allocate for read/write
170
 
 *                      If == 0 then use my_default_record_cache_size
171
 
 * @param type Type of cache
172
 
 * @param seek_offset Where cache should start reading/writing
173
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
174
 
 * @param cache_myflags Bitmap of different flags
175
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
176
 
 * 
177
 
 * @retval 0 success
178
 
 * @retval # error
179
 
 */
180
 
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
 
124
 
 
125
/*
 
126
  Initialize an IO_CACHE object
 
127
 
 
128
  SYNOPSOS
 
129
    init_io_cache()
 
130
    info                cache handler to initialize
 
131
    file                File that should be associated to to the handler
 
132
                        If == -1 then real_open_cached_file()
 
133
                        will be called when it's time to open file.
 
134
    cachesize           Size of buffer to allocate for read/write
 
135
                        If == 0 then use my_default_record_cache_size
 
136
    type                Type of cache
 
137
    seek_offset         Where cache should start reading/writing
 
138
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
139
    cache_myflags       Bitmap of differnt flags
 
140
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
141
                        MY_DONT_CHECK_FILESIZE
 
142
 
 
143
  RETURN
 
144
    0  ok
 
145
    #  error
 
146
*/
 
147
 
 
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
181
149
                  enum cache_type type, my_off_t seek_offset,
182
 
                  bool use_async_io, myf cache_myflags)
 
150
                  pbool use_async_io, myf cache_myflags)
183
151
{
184
152
  size_t min_cache;
185
 
  off_t pos;
 
153
  my_off_t pos;
186
154
  my_off_t end_of_file= ~(my_off_t) 0;
 
155
  DBUG_ENTER("init_io_cache");
 
156
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
157
                      (ulong) info, (int) type, (ulong) seek_offset));
187
158
 
188
159
  info->file= file;
189
160
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
196
167
 
197
168
  if (file >= 0)
198
169
  {
199
 
    pos= lseek(file, 0, SEEK_CUR);
200
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
170
    pos= my_tell(file, MYF(0));
 
171
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
201
172
    {
202
173
      /*
203
174
         This kind of object doesn't support seek() or tell(). Don't set a
209
180
        the beginning of whatever this file is, then somebody made a bad
210
181
        assumption.
211
182
      */
212
 
      assert(seek_offset == 0);
 
183
      DBUG_ASSERT(seek_offset == 0);
213
184
    }
214
185
    else
215
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
186
      info->seek_not_done= test(seek_offset != pos);
216
187
  }
217
188
 
 
189
  info->disk_writes= 0;
 
190
  info->share=0;
 
191
 
218
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
219
 
    return(1);                          /* No cache requested */
 
193
    DBUG_RETURN(1);                             /* No cache requested */
220
194
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
221
 
  if (type == READ_CACHE)
 
195
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
222
196
  {                                             /* Assume file isn't growing */
223
197
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
224
198
    {
225
199
      /* Calculate end of file to avoid allocating oversized buffers */
226
 
      end_of_file=lseek(file,0L,SEEK_END);
 
200
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
227
201
      /* Need to reset seek_not_done now that we just did a seek. */
228
202
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
229
203
      if (end_of_file < seek_offset)
247
221
      if (cachesize < min_cache)
248
222
        cachesize = min_cache;
249
223
      buffer_block= cachesize;
 
224
      if (type == SEQ_READ_APPEND)
 
225
        buffer_block *= 2;
250
226
      if ((info->buffer=
251
 
           (unsigned char*) malloc(buffer_block)) != 0)
 
227
           (uchar*) my_malloc(buffer_block,
 
228
                             MYF((cache_myflags & ~ MY_WME) |
 
229
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
252
230
      {
253
231
        info->write_buffer=info->buffer;
254
 
        info->alloced_buffer= true;
 
232
        if (type == SEQ_READ_APPEND)
 
233
          info->write_buffer = info->buffer + cachesize;
 
234
        info->alloced_buffer=1;
255
235
        break;                                  /* Enough memory found */
256
236
      }
257
237
      if (cachesize == min_cache)
258
 
        return(2);                              /* Can't alloc cache */
 
238
        DBUG_RETURN(2);                         /* Can't alloc cache */
259
239
      /* Try with less memory */
260
240
      cachesize= (cachesize*3/4 & ~(min_cache-1));
261
241
    }
262
242
  }
263
243
 
 
244
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
264
245
  info->read_length=info->buffer_length=cachesize;
265
246
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
266
247
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
248
  if (type == SEQ_READ_APPEND)
 
249
  {
 
250
    info->append_read_pos = info->write_pos = info->write_buffer;
 
251
    info->write_end = info->write_buffer + info->buffer_length;
 
252
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
253
  }
 
254
#if defined(SAFE_MUTEX)
 
255
  else
 
256
  {
 
257
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
258
    bzero((char*) &info->append_buffer_lock, sizeof(info));
 
259
  }
 
260
#endif
267
261
 
268
262
  if (type == WRITE_CACHE)
269
263
    info->write_end=
279
273
#ifdef HAVE_AIOWAIT
280
274
  if (use_async_io && ! my_disable_async_io)
281
275
  {
 
276
    DBUG_PRINT("info",("Using async io"));
282
277
    info->read_length/=2;
283
278
    info->read_function=_my_b_async_read;
284
279
  }
285
280
  info->inited=info->aio_result.pending=0;
286
281
#endif
287
 
  return(0);
 
282
  DBUG_RETURN(0);
288
283
}                                               /* init_io_cache */
289
284
 
290
285
        /* Wait until current request is ready */
301
296
      {
302
297
        if (errno == EINTR)
303
298
          continue;
 
299
        DBUG_PRINT("error",("No aio request, error: %d",errno));
304
300
        result->pending=0;                      /* Assume everythings is ok */
305
301
        break;
306
302
      }
313
309
}
314
310
#endif
315
311
 
316
 
/**
317
 
 * @brief 
318
 
 *   Reset the cache
319
 
 * 
320
 
 * @detail
321
 
 *   Use this to reset cache to re-start reading or to change the type 
322
 
 *   between READ_CACHE <-> WRITE_CACHE
323
 
 *   If we are doing a reinit of a cache where we have the start of the file
324
 
 *   in the cache, we are reusing this memory without flushing it to disk.
325
 
 */
326
 
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
312
 
 
313
/*
 
314
  Use this to reset cache to re-start reading or to change the type
 
315
  between READ_CACHE <-> WRITE_CACHE
 
316
  If we are doing a reinit of a cache where we have the start of the file
 
317
  in the cache, we are reusing this memory without flushing it to disk.
 
318
*/
 
319
 
 
320
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
327
321
                        my_off_t seek_offset,
328
 
                        bool use_async_io,
329
 
                        bool clear_cache)
 
322
                        pbool use_async_io __attribute__((unused)),
 
323
                        pbool clear_cache)
330
324
{
 
325
  DBUG_ENTER("reinit_io_cache");
 
326
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
327
                      (ulong) info, type, (ulong) seek_offset,
 
328
                      (int) clear_cache));
 
329
 
331
330
  /* One can't do reinit with the following types */
332
 
  assert(type != READ_NET && info->type != READ_NET &&
333
 
              type != WRITE_NET && info->type != WRITE_NET);
 
331
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
 
332
              type != WRITE_NET && info->type != WRITE_NET &&
 
333
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
334
334
 
335
335
  /* If the whole file is in memory, avoid flushing to disk */
336
336
  if (! clear_cache &&
338
338
      seek_offset <= my_b_tell(info))
339
339
  {
340
340
    /* Reuse current buffer without flushing it to disk */
341
 
    unsigned char *pos;
 
341
    uchar *pos;
342
342
    if (info->type == WRITE_CACHE && type == READ_CACHE)
343
343
    {
344
344
      info->read_end=info->write_pos;
377
377
      info->end_of_file=my_b_tell(info);
378
378
    /* flush cache if we want to reuse it */
379
379
    if (!clear_cache && my_b_flush_io_cache(info,1))
380
 
      return(1);
 
380
      DBUG_RETURN(1);
381
381
    info->pos_in_file=seek_offset;
382
382
    /* Better to do always do a seek */
383
383
    info->seek_not_done=1;
399
399
 
400
400
#ifdef HAVE_AIOWAIT
401
401
  if (use_async_io && ! my_disable_async_io &&
402
 
      ((uint32_t) info->buffer_length <
403
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
402
      ((ulong) info->buffer_length <
 
403
       (ulong) (info->end_of_file - seek_offset)))
404
404
  {
405
405
    info->read_length=info->buffer_length/2;
406
406
    info->read_function=_my_b_async_read;
407
407
  }
408
408
  info->inited=0;
409
 
#else
410
 
  (void)use_async_io;
411
409
#endif
412
 
  return(0);
 
410
  DBUG_RETURN(0);
413
411
} /* reinit_io_cache */
414
412
 
415
 
/**
416
 
 * @brief 
417
 
 *   Read buffered.
418
 
 * 
419
 
 * @detail
420
 
 *   This function is only called from the my_b_read() macro when there
421
 
 *   aren't enough characters in the buffer to satisfy the request.
422
 
 *
423
 
 * WARNING
424
 
 *   When changing this function, be careful with handling file offsets
425
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
426
 
 *   types than my_off_t unless you can be sure that their value fits.
427
 
 *   Same applies to differences of file offsets.
428
 
 *
429
 
 * @param info IO_CACHE pointer @param Buffer Buffer to retrieve count bytes
430
 
 * from file @param Count Number of bytes to read into Buffer
431
 
 * 
432
 
 * @retval 0 We succeeded in reading all data
433
 
 * @retval 1 Error: can't read requested characters
434
 
 */
435
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
413
 
 
414
 
 
415
/*
 
416
  Read buffered.
 
417
 
 
418
  SYNOPSIS
 
419
    _my_b_read()
 
420
      info                      IO_CACHE pointer
 
421
      Buffer                    Buffer to retrieve count bytes from file
 
422
      Count                     Number of bytes to read into Buffer
 
423
 
 
424
  NOTE
 
425
    This function is only called from the my_b_read() macro when there
 
426
    isn't enough characters in the buffer to satisfy the request.
 
427
 
 
428
  WARNING
 
429
 
 
430
    When changing this function, be careful with handling file offsets
 
431
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
432
    types than my_off_t unless you can be sure that their value fits.
 
433
    Same applies to differences of file offsets.
 
434
 
 
435
    When changing this function, check _my_b_read_r(). It might need the
 
436
    same change.
 
437
 
 
438
  RETURN
 
439
    0      we succeeded in reading all data
 
440
    1      Error: can't read requested characters
 
441
*/
 
442
 
 
443
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
436
444
{
437
445
  size_t length,diff_length,left_length, max_length;
438
446
  my_off_t pos_in_file;
 
447
  DBUG_ENTER("_my_b_read");
439
448
 
440
449
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
441
450
  {
442
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
451
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
443
452
    memcpy(Buffer,info->read_pos, left_length);
444
453
    Buffer+=left_length;
445
454
    Count-=left_length;
448
457
  /* pos_in_file always point on where info->buffer was read */
449
458
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
450
459
 
451
 
  /*
 
460
  /* 
452
461
    Whenever a function which operates on IO_CACHE flushes/writes
453
462
    some part of the IO_CACHE to disk it will set the property
454
463
    "seek_not_done" to indicate this to other functions operating
456
465
  */
457
466
  if (info->seek_not_done)
458
467
  {
459
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
468
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
469
        != MY_FILEPOS_ERROR))
460
470
    {
461
471
      /* No error, reset seek_not_done flag. */
462
472
      info->seek_not_done= 0;
468
478
        info->file is a pipe or socket or FIFO.  We never should have tried
469
479
        to seek on that.  See Bugs#25807 and #22828 for more info.
470
480
      */
471
 
      assert(errno != ESPIPE);
 
481
      DBUG_ASSERT(my_errno != ESPIPE);
472
482
      info->error= -1;
473
 
      return(1);
 
483
      DBUG_RETURN(1);
474
484
    }
475
485
  }
476
486
 
481
491
    if (info->end_of_file <= pos_in_file)
482
492
    {                                   /* End of file */
483
493
      info->error= (int) left_length;
484
 
      return(1);
 
494
      DBUG_RETURN(1);
485
495
    }
486
496
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
487
497
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
489
499
    {
490
500
      info->error= (read_length == (size_t) -1 ? -1 :
491
501
                    (int) (read_length+left_length));
492
 
      return(1);
 
502
      DBUG_RETURN(1);
493
503
    }
494
504
    Count-=length;
495
505
    Buffer+=length;
506
516
  {
507
517
    if (Count)
508
518
    {
509
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
510
 
      return(1);
 
519
      info->error= left_length;         /* We only got this many char */
 
520
      DBUG_RETURN(1);
511
521
    }
512
522
    length=0;                           /* Didn't read any chars */
513
523
  }
520
530
    info->pos_in_file= pos_in_file;
521
531
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
522
532
    info->read_pos=info->read_end=info->buffer;
523
 
    return(1);
 
533
    DBUG_RETURN(1);
524
534
  }
525
535
  info->read_pos=info->buffer+Count;
526
536
  info->read_end=info->buffer+length;
527
537
  info->pos_in_file=pos_in_file;
528
538
  memcpy(Buffer, info->buffer, Count);
529
 
  return(0);
 
539
  DBUG_RETURN(0);
 
540
}
 
541
 
 
542
 
 
543
/*
 
544
  Prepare IO_CACHE for shared use.
 
545
 
 
546
  SYNOPSIS
 
547
    init_io_cache_share()
 
548
      read_cache                A read cache. This will be copied for
 
549
                                every thread after setup.
 
550
      cshare                    The share.
 
551
      write_cache               If non-NULL a write cache that is to be
 
552
                                synchronized with the read caches.
 
553
      num_threads               Number of threads sharing the cache
 
554
                                including the write thread if any.
 
555
 
 
556
  DESCRIPTION
 
557
 
 
558
    The shared cache is used so: One IO_CACHE is initialized with
 
559
    init_io_cache(). This includes the allocation of a buffer. Then a
 
560
    share is allocated and init_io_cache_share() is called with the io
 
561
    cache and the share. Then the io cache is copied for each thread. So
 
562
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
563
    is shared because cache->buffer is the same for all caches.
 
564
 
 
565
    One thread reads data from the file into the buffer. All threads
 
566
    read from the buffer, but every thread maintains its own set of
 
567
    pointers into the buffer. When all threads have used up the buffer
 
568
    contents, one of the threads reads the next block of data into the
 
569
    buffer. To accomplish this, each thread enters the cache lock before
 
570
    accessing the buffer. They wait in lock_io_cache() until all threads
 
571
    joined the lock. The last thread entering the lock is in charge of
 
572
    reading from file to buffer. It wakes all threads when done.
 
573
 
 
574
    Synchronizing a write cache to the read caches works so: Whenever
 
575
    the write buffer needs a flush, the write thread enters the lock and
 
576
    waits for all other threads to enter the lock too. They do this when
 
577
    they have used up the read buffer. When all threads are in the lock,
 
578
    the write thread copies the write buffer to the read buffer and
 
579
    wakes all threads.
 
580
 
 
581
    share->running_threads is the number of threads not being in the
 
582
    cache lock. When entering lock_io_cache() the number is decreased.
 
583
    When the thread that fills the buffer enters unlock_io_cache() the
 
584
    number is reset to the number of threads. The condition
 
585
    running_threads == 0 means that all threads are in the lock. Bumping
 
586
    up the number to the full count is non-intuitive. But increasing the
 
587
    number by one for each thread that leaves the lock could lead to a
 
588
    solo run of one thread. The last thread to join a lock reads from
 
589
    file to buffer, wakes the other threads, processes the data in the
 
590
    cache and enters the lock again. If no other thread left the lock
 
591
    meanwhile, it would think it's the last one again and read the next
 
592
    block...
 
593
 
 
594
    The share has copies of 'error', 'buffer', 'read_end', and
 
595
    'pos_in_file' from the thread that filled the buffer. We may not be
 
596
    able to access this information directly from its cache because the
 
597
    thread may be removed from the share before the variables could be
 
598
    copied by all other threads. Or, if a write buffer is synchronized,
 
599
    it would change its 'pos_in_file' after waking the other threads,
 
600
    possibly before they could copy its value.
 
601
 
 
602
    However, the 'buffer' variable in the share is for a synchronized
 
603
    write cache. It needs to know where to put the data. Otherwise it
 
604
    would need access to the read cache of one of the threads that is
 
605
    not yet removed from the share.
 
606
 
 
607
  RETURN
 
608
    void
 
609
*/
 
610
 
 
611
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
612
                         IO_CACHE *write_cache, uint num_threads)
 
613
{
 
614
  DBUG_ENTER("init_io_cache_share");
 
615
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
616
                                "write_cache: 0x%lx  threads: %u",
 
617
                                (long) read_cache, (long) cshare,
 
618
                                (long) write_cache, num_threads));
 
619
 
 
620
  DBUG_ASSERT(num_threads > 1);
 
621
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
622
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
 
623
 
 
624
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
625
  pthread_cond_init(&cshare->cond, 0);
 
626
  pthread_cond_init(&cshare->cond_writer, 0);
 
627
 
 
628
  cshare->running_threads= num_threads;
 
629
  cshare->total_threads=   num_threads;
 
630
  cshare->error=           0;    /* Initialize. */
 
631
  cshare->buffer=          read_cache->buffer;
 
632
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
633
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
634
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
635
 
 
636
  read_cache->share=         cshare;
 
637
  read_cache->read_function= _my_b_read_r;
 
638
  read_cache->current_pos=   NULL;
 
639
  read_cache->current_end=   NULL;
 
640
 
 
641
  if (write_cache)
 
642
    write_cache->share= cshare;
 
643
 
 
644
  DBUG_VOID_RETURN;
 
645
}
 
646
 
 
647
 
 
648
/*
 
649
  Remove a thread from shared access to IO_CACHE.
 
650
 
 
651
  SYNOPSIS
 
652
    remove_io_thread()
 
653
      cache                     The IO_CACHE to be removed from the share.
 
654
 
 
655
  NOTE
 
656
 
 
657
    Every thread must do that on exit for not to deadlock other threads.
 
658
 
 
659
    The last thread destroys the pthread resources.
 
660
 
 
661
    A writer flushes its cache first.
 
662
 
 
663
  RETURN
 
664
    void
 
665
*/
 
666
 
 
667
void remove_io_thread(IO_CACHE *cache)
 
668
{
 
669
  IO_CACHE_SHARE *cshare= cache->share;
 
670
  uint total;
 
671
  DBUG_ENTER("remove_io_thread");
 
672
 
 
673
  /* If the writer goes, it needs to flush the write cache. */
 
674
  if (cache == cshare->source_cache)
 
675
    flush_io_cache(cache);
 
676
 
 
677
  pthread_mutex_lock(&cshare->mutex);
 
678
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
679
                                (cache == cshare->source_cache) ?
 
680
                                "writer" : "reader", (long) cache));
 
681
 
 
682
  /* Remove from share. */
 
683
  total= --cshare->total_threads;
 
684
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
 
685
 
 
686
  /* Detach from share. */
 
687
  cache->share= NULL;
 
688
 
 
689
  /* If the writer goes, let the readers know. */
 
690
  if (cache == cshare->source_cache)
 
691
  {
 
692
    DBUG_PRINT("io_cache_share", ("writer leaves"));
 
693
    cshare->source_cache= NULL;
 
694
  }
 
695
 
 
696
  /* If all threads are waiting for me to join the lock, wake them. */
 
697
  if (!--cshare->running_threads)
 
698
  {
 
699
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
 
700
    pthread_cond_signal(&cshare->cond_writer);
 
701
    pthread_cond_broadcast(&cshare->cond);
 
702
  }
 
703
 
 
704
  pthread_mutex_unlock(&cshare->mutex);
 
705
 
 
706
  if (!total)
 
707
  {
 
708
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
 
709
    pthread_cond_destroy (&cshare->cond_writer);
 
710
    pthread_cond_destroy (&cshare->cond);
 
711
    pthread_mutex_destroy(&cshare->mutex);
 
712
  }
 
713
 
 
714
  DBUG_VOID_RETURN;
 
715
}
 
716
 
 
717
 
 
718
/*
 
719
  Lock IO cache and wait for all other threads to join.
 
720
 
 
721
  SYNOPSIS
 
722
    lock_io_cache()
 
723
      cache                     The cache of the thread entering the lock.
 
724
      pos                       File position of the block to read.
 
725
                                Unused for the write thread.
 
726
 
 
727
  DESCRIPTION
 
728
 
 
729
    Wait for all threads to finish with the current buffer. We want
 
730
    all threads to proceed in concert. The last thread to join
 
731
    lock_io_cache() will read the block from file and all threads start
 
732
    to use it. Then they will join again for reading the next block.
 
733
 
 
734
    The waiting threads detect a fresh buffer by comparing
 
735
    cshare->pos_in_file with the position they want to process next.
 
736
    Since the first block may start at position 0, we take
 
737
    cshare->read_end as an additional condition. This variable is
 
738
    initialized to NULL and will be set after a block of data is written
 
739
    to the buffer.
 
740
 
 
741
  RETURN
 
742
    1           OK, lock in place, go ahead and read.
 
743
    0           OK, unlocked, another thread did the read.
 
744
*/
 
745
 
 
746
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
747
{
 
748
  IO_CACHE_SHARE *cshare= cache->share;
 
749
  DBUG_ENTER("lock_io_cache");
 
750
 
 
751
  /* Enter the lock. */
 
752
  pthread_mutex_lock(&cshare->mutex);
 
753
  cshare->running_threads--;
 
754
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
755
                                (cache == cshare->source_cache) ?
 
756
                                "writer" : "reader", (long) cache, (ulong) pos,
 
757
                                cshare->running_threads));
 
758
 
 
759
  if (cshare->source_cache)
 
760
  {
 
761
    /* A write cache is synchronized to the read caches. */
 
762
 
 
763
    if (cache == cshare->source_cache)
 
764
    {
 
765
      /* The writer waits until all readers are here. */
 
766
      while (cshare->running_threads)
 
767
      {
 
768
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
 
769
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
770
      }
 
771
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
772
 
 
773
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
774
      DBUG_RETURN(1);
 
775
    }
 
776
 
 
777
    /* The last thread wakes the writer. */
 
778
    if (!cshare->running_threads)
 
779
    {
 
780
      DBUG_PRINT("io_cache_share", ("waking writer"));
 
781
      pthread_cond_signal(&cshare->cond_writer);
 
782
    }
 
783
 
 
784
    /*
 
785
      Readers wait until the data is copied from the writer. Another
 
786
      reason to stop waiting is the removal of the write thread. If this
 
787
      happens, we leave the lock with old data in the buffer.
 
788
    */
 
789
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
790
           cshare->source_cache)
 
791
    {
 
792
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
793
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
794
    }
 
795
 
 
796
    /*
 
797
      If the writer was removed from the share while this thread was
 
798
      asleep, we need to simulate an EOF condition. The writer cannot
 
799
      reset the share variables as they might still be in use by readers
 
800
      of the last block. When we awake here then because the last
 
801
      joining thread signalled us. If the writer is not the last, it
 
802
      will not signal. So it is safe to clear the buffer here.
 
803
    */
 
804
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
805
    {
 
806
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
 
807
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
808
      cshare->error= 0; /* EOF is not an error. */
 
809
    }
 
810
  }
 
811
  else
 
812
  {
 
813
    /*
 
814
      There are read caches only. The last thread arriving in
 
815
      lock_io_cache() continues with a locked cache and reads the block.
 
816
    */
 
817
    if (!cshare->running_threads)
 
818
    {
 
819
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
 
820
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
821
      DBUG_RETURN(1);
 
822
    }
 
823
 
 
824
    /*
 
825
      All other threads wait until the requested block is read by the
 
826
      last thread arriving. Another reason to stop waiting is the
 
827
      removal of a thread. If this leads to all threads being in the
 
828
      lock, we have to continue also. The first of the awaken threads
 
829
      will then do the read.
 
830
    */
 
831
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
832
           cshare->running_threads)
 
833
    {
 
834
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
835
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
836
    }
 
837
 
 
838
    /* If the block is not yet read, continue with a locked cache and read. */
 
839
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
840
    {
 
841
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
 
842
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
843
      DBUG_RETURN(1);
 
844
    }
 
845
 
 
846
    /* Another thread did read the block already. */
 
847
  }
 
848
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
849
                                (uint) (cshare->read_end ? (size_t)
 
850
                                        (cshare->read_end - cshare->buffer) :
 
851
                                        0)));
 
852
 
 
853
  /*
 
854
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
855
    filled the buffer did this and marked all threads as running.
 
856
  */
 
857
  pthread_mutex_unlock(&cshare->mutex);
 
858
  DBUG_RETURN(0);
 
859
}
 
860
 
 
861
 
 
862
/*
 
863
  Unlock IO cache.
 
864
 
 
865
  SYNOPSIS
 
866
    unlock_io_cache()
 
867
      cache                     The cache of the thread leaving the lock.
 
868
 
 
869
  NOTE
 
870
    This is called by the thread that filled the buffer. It marks all
 
871
    threads as running and awakes them. This must not be done by any
 
872
    other thread.
 
873
 
 
874
    Do not signal cond_writer. Either there is no writer or the writer
 
875
    is the only one who can call this function.
 
876
 
 
877
    The reason for resetting running_threads to total_threads before
 
878
    waking all other threads is that it could be possible that this
 
879
    thread is so fast with processing the buffer that it enters the lock
 
880
    before even one other thread has left it. If every awoken thread
 
881
    would increase running_threads by one, this thread could think that
 
882
    he is again the last to join and would not wait for the other
 
883
    threads to process the data.
 
884
 
 
885
  RETURN
 
886
    void
 
887
*/
 
888
 
 
889
static void unlock_io_cache(IO_CACHE *cache)
 
890
{
 
891
  IO_CACHE_SHARE *cshare= cache->share;
 
892
  DBUG_ENTER("unlock_io_cache");
 
893
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
894
                                (cache == cshare->source_cache) ?
 
895
                                "writer" : "reader",
 
896
                                (long) cache, (ulong) cshare->pos_in_file,
 
897
                                cshare->total_threads));
 
898
 
 
899
  cshare->running_threads= cshare->total_threads;
 
900
  pthread_cond_broadcast(&cshare->cond);
 
901
  pthread_mutex_unlock(&cshare->mutex);
 
902
  DBUG_VOID_RETURN;
 
903
}
 
904
 
 
905
 
 
906
/*
 
907
  Read from IO_CACHE when it is shared between several threads.
 
908
 
 
909
  SYNOPSIS
 
910
    _my_b_read_r()
 
911
      cache                     IO_CACHE pointer
 
912
      Buffer                    Buffer to retrieve count bytes from file
 
913
      Count                     Number of bytes to read into Buffer
 
914
 
 
915
  NOTE
 
916
    This function is only called from the my_b_read() macro when there
 
917
    isn't enough characters in the buffer to satisfy the request.
 
918
 
 
919
  IMPLEMENTATION
 
920
 
 
921
    It works as follows: when a thread tries to read from a file (that
 
922
    is, after using all the data from the (shared) buffer), it just
 
923
    hangs on lock_io_cache(), waiting for other threads. When the very
 
924
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
925
    does actual IO and unlock_io_cache(), which signals all the waiting
 
926
    threads that data is in the buffer.
 
927
 
 
928
  WARNING
 
929
 
 
930
    When changing this function, be careful with handling file offsets
 
931
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
932
    types than my_off_t unless you can be sure that their value fits.
 
933
    Same applies to differences of file offsets. (Bug #11527)
 
934
 
 
935
    When changing this function, check _my_b_read(). It might need the
 
936
    same change.
 
937
 
 
938
  RETURN
 
939
    0      we succeeded in reading all data
 
940
    1      Error: can't read requested characters
 
941
*/
 
942
 
 
943
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
944
{
 
945
  my_off_t pos_in_file;
 
946
  size_t length, diff_length, left_length;
 
947
  IO_CACHE_SHARE *cshare= cache->share;
 
948
  DBUG_ENTER("_my_b_read_r");
 
949
 
 
950
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
951
  {
 
952
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
953
    memcpy(Buffer, cache->read_pos, left_length);
 
954
    Buffer+= left_length;
 
955
    Count-= left_length;
 
956
  }
 
957
  while (Count)
 
958
  {
 
959
    size_t cnt, len;
 
960
 
 
961
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
962
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
963
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
964
    length= ((length <= cache->read_length) ?
 
965
             length + IO_ROUND_DN(cache->read_length - length) :
 
966
             length - IO_ROUND_UP(length - cache->read_length));
 
967
    if (cache->type != READ_FIFO &&
 
968
        (length > (cache->end_of_file - pos_in_file)))
 
969
      length= (size_t) (cache->end_of_file - pos_in_file);
 
970
    if (length == 0)
 
971
    {
 
972
      cache->error= (int) left_length;
 
973
      DBUG_RETURN(1);
 
974
    }
 
975
    if (lock_io_cache(cache, pos_in_file))
 
976
    {
 
977
      /* With a synchronized write/read cache we won't come here... */
 
978
      DBUG_ASSERT(!cshare->source_cache);
 
979
      /*
 
980
        ... unless the writer has gone before this thread entered the
 
981
        lock. Simulate EOF in this case. It can be distinguished by
 
982
        cache->file.
 
983
      */
 
984
      if (cache->file < 0)
 
985
        len= 0;
 
986
      else
 
987
      {
 
988
        /*
 
989
          Whenever a function which operates on IO_CACHE flushes/writes
 
990
          some part of the IO_CACHE to disk it will set the property
 
991
          "seek_not_done" to indicate this to other functions operating
 
992
          on the IO_CACHE.
 
993
        */
 
994
        if (cache->seek_not_done)
 
995
        {
 
996
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
997
              == MY_FILEPOS_ERROR)
 
998
          {
 
999
            cache->error= -1;
 
1000
            unlock_io_cache(cache);
 
1001
            DBUG_RETURN(1);
 
1002
          }
 
1003
        }
 
1004
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
1005
      }
 
1006
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1007
 
 
1008
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
1009
      cache->error=       (len == length ? 0 : (int) len);
 
1010
      cache->pos_in_file= pos_in_file;
 
1011
 
 
1012
      /* Copy important values to the share. */
 
1013
      cshare->error=       cache->error;
 
1014
      cshare->read_end=    cache->read_end;
 
1015
      cshare->pos_in_file= pos_in_file;
 
1016
 
 
1017
      /* Mark all threads as running and wake them. */
 
1018
      unlock_io_cache(cache);
 
1019
    }
 
1020
    else
 
1021
    {
 
1022
      /*
 
1023
        With a synchronized write/read cache readers always come here.
 
1024
        Copy important values from the share.
 
1025
      */
 
1026
      cache->error=       cshare->error;
 
1027
      cache->read_end=    cshare->read_end;
 
1028
      cache->pos_in_file= cshare->pos_in_file;
 
1029
 
 
1030
      len= ((cache->error == -1) ? (size_t) -1 :
 
1031
            (size_t) (cache->read_end - cache->buffer));
 
1032
    }
 
1033
    cache->read_pos=      cache->buffer;
 
1034
    cache->seek_not_done= 0;
 
1035
    if (len == 0 || len == (size_t) -1)
 
1036
    {
 
1037
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1038
                                    (ulong) len, (ulong) left_length));
 
1039
      cache->error= (int) left_length;
 
1040
      DBUG_RETURN(1);
 
1041
    }
 
1042
    cnt= (len > Count) ? Count : len;
 
1043
    memcpy(Buffer, cache->read_pos, cnt);
 
1044
    Count -= cnt;
 
1045
    Buffer+= cnt;
 
1046
    left_length+= cnt;
 
1047
    cache->read_pos+= cnt;
 
1048
  }
 
1049
  DBUG_RETURN(0);
 
1050
}
 
1051
 
 
1052
 
 
1053
/*
 
1054
  Copy data from write cache to read cache.
 
1055
 
 
1056
  SYNOPSIS
 
1057
    copy_to_read_buffer()
 
1058
      write_cache               The write cache.
 
1059
      write_buffer              The source of data, mostly the cache buffer.
 
1060
      write_length              The number of bytes to copy.
 
1061
 
 
1062
  NOTE
 
1063
    The write thread will wait for all read threads to join the cache
 
1064
    lock. Then it copies the data over and wakes the read threads.
 
1065
 
 
1066
  RETURN
 
1067
    void
 
1068
*/
 
1069
 
 
1070
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1071
                                const uchar *write_buffer, size_t write_length)
 
1072
{
 
1073
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1074
 
 
1075
  DBUG_ASSERT(cshare->source_cache == write_cache);
 
1076
  /*
 
1077
    write_length is usually less or equal to buffer_length.
 
1078
    It can be bigger if _my_b_write() is called with a big length.
 
1079
  */
 
1080
  while (write_length)
 
1081
  {
 
1082
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1083
    int  __attribute__((unused)) rc;
 
1084
 
 
1085
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1086
    /* The writing thread does always have the lock when it awakes. */
 
1087
    DBUG_ASSERT(rc);
 
1088
 
 
1089
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1090
 
 
1091
    cshare->error=       0;
 
1092
    cshare->read_end=    cshare->buffer + copy_length;
 
1093
    cshare->pos_in_file= write_cache->pos_in_file;
 
1094
 
 
1095
    /* Mark all threads as running and wake them. */
 
1096
    unlock_io_cache(write_cache);
 
1097
 
 
1098
    write_buffer+= copy_length;
 
1099
    write_length-= copy_length;
 
1100
  }
 
1101
}
 
1102
 
 
1103
 
 
1104
/*
 
1105
  Do sequential read from the SEQ_READ_APPEND cache.
 
1106
  
 
1107
  We do this in three stages:
 
1108
   - first read from info->buffer
 
1109
   - then if there are still data to read, try the file descriptor
 
1110
   - afterwards, if there are still data to read, try append buffer
 
1111
 
 
1112
  RETURNS
 
1113
    0  Success
 
1114
    1  Failed to read
 
1115
*/
 
1116
 
 
1117
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1118
{
 
1119
  size_t length, diff_length, left_length, save_count, max_length;
 
1120
  my_off_t pos_in_file;
 
1121
  save_count=Count;
 
1122
 
 
1123
  /* first, read the regular buffer */
 
1124
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1125
  {
 
1126
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
 
1127
    memcpy(Buffer,info->read_pos, left_length);
 
1128
    Buffer+=left_length;
 
1129
    Count-=left_length;
 
1130
  }
 
1131
  lock_append_buffer(info);
 
1132
 
 
1133
  /* pos_in_file always point on where info->buffer was read */
 
1134
  if ((pos_in_file=info->pos_in_file +
 
1135
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1136
    goto read_append_buffer;
 
1137
 
 
1138
  /*
 
1139
    With read-append cache we must always do a seek before we read,
 
1140
    because the write could have moved the file pointer astray
 
1141
  */
 
1142
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1143
  {
 
1144
   info->error= -1;
 
1145
   unlock_append_buffer(info);
 
1146
   return (1);
 
1147
  }
 
1148
  info->seek_not_done=0;
 
1149
 
 
1150
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1151
 
 
1152
  /* now the second stage begins - read from file descriptor */
 
1153
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1154
  {
 
1155
    /* Fill first intern buffer */
 
1156
    size_t read_length;
 
1157
 
 
1158
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1159
    if ((read_length= my_read(info->file,Buffer, length,
 
1160
                              info->myflags)) == (size_t) -1)
 
1161
    {
 
1162
      info->error= -1;
 
1163
      unlock_append_buffer(info);
 
1164
      return 1;
 
1165
    }
 
1166
    Count-=read_length;
 
1167
    Buffer+=read_length;
 
1168
    pos_in_file+=read_length;
 
1169
 
 
1170
    if (read_length != length)
 
1171
    {
 
1172
      /*
 
1173
        We only got part of data;  Read the rest of the data from the
 
1174
        write buffer
 
1175
      */
 
1176
      goto read_append_buffer;
 
1177
    }
 
1178
    left_length+=length;
 
1179
    diff_length=0;
 
1180
  }
 
1181
 
 
1182
  max_length= info->read_length-diff_length;
 
1183
  if (max_length > (info->end_of_file - pos_in_file))
 
1184
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1185
  if (!max_length)
 
1186
  {
 
1187
    if (Count)
 
1188
      goto read_append_buffer;
 
1189
    length=0;                           /* Didn't read any more chars */
 
1190
  }
 
1191
  else
 
1192
  {
 
1193
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1194
    if (length == (size_t) -1)
 
1195
    {
 
1196
      info->error= -1;
 
1197
      unlock_append_buffer(info);
 
1198
      return 1;
 
1199
    }
 
1200
    if (length < Count)
 
1201
    {
 
1202
      memcpy(Buffer, info->buffer, length);
 
1203
      Count -= length;
 
1204
      Buffer += length;
 
1205
 
 
1206
      /*
 
1207
         added the line below to make
 
1208
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
 
1209
         otherwise this does not appear to be needed
 
1210
      */
 
1211
      pos_in_file += length;
 
1212
      goto read_append_buffer;
 
1213
    }
 
1214
  }
 
1215
  unlock_append_buffer(info);
 
1216
  info->read_pos=info->buffer+Count;
 
1217
  info->read_end=info->buffer+length;
 
1218
  info->pos_in_file=pos_in_file;
 
1219
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1220
  return 0;
 
1221
 
 
1222
read_append_buffer:
 
1223
 
 
1224
  /*
 
1225
     Read data from the current write buffer.
 
1226
     Count should never be == 0 here (The code will work even if count is 0)
 
1227
  */
 
1228
 
 
1229
  {
 
1230
    /* First copy the data to Count */
 
1231
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1232
    size_t copy_len;
 
1233
    size_t transfer_len;
 
1234
 
 
1235
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
 
1236
    /*
 
1237
      TODO: figure out if the assert below is needed or correct.
 
1238
    */
 
1239
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1240
    copy_len=min(Count, len_in_buff);
 
1241
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1242
    info->append_read_pos += copy_len;
 
1243
    Count -= copy_len;
 
1244
    if (Count)
 
1245
      info->error = save_count - Count;
 
1246
 
 
1247
    /* Fill read buffer with data from write buffer */
 
1248
    memcpy(info->buffer, info->append_read_pos,
 
1249
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1250
    info->read_pos= info->buffer;
 
1251
    info->read_end= info->buffer+transfer_len;
 
1252
    info->append_read_pos=info->write_pos;
 
1253
    info->pos_in_file=pos_in_file+copy_len;
 
1254
    info->end_of_file+=len_in_buff;
 
1255
  }
 
1256
  unlock_append_buffer(info);
 
1257
  return Count ? 1 : 0;
530
1258
}
531
1259
 
532
1260
 
533
1261
#ifdef HAVE_AIOWAIT
534
1262
 
535
 
/**
536
 
 * @brief
537
 
 *   Read from the IO_CACHE into a buffer and feed asynchronously from disk when needed.
538
 
 *
539
 
 * @param info IO_CACHE pointer
540
 
 * @param Buffer Buffer to retrieve count bytes from file
541
 
 * @param Count Number of bytes to read into Buffer
542
 
 * 
543
 
 * @retval -1 An error has occurred; errno is set.
544
 
 * @retval 0 Success
545
 
 * @retval 1 An error has occurred; IO_CACHE to error state.
546
 
 */
547
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1263
/*
 
1264
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1265
  from disk when needed.
 
1266
 
 
1267
  SYNOPSIS
 
1268
    _my_b_async_read()
 
1269
      info                      IO_CACHE pointer
 
1270
      Buffer                    Buffer to retrieve count bytes from file
 
1271
      Count                     Number of bytes to read into Buffer
 
1272
 
 
1273
  RETURN VALUE
 
1274
    -1          An error has occurred; my_errno is set.
 
1275
     0          Success
 
1276
     1          An error has occurred; IO_CACHE to error state.
 
1277
*/
 
1278
 
 
1279
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
548
1280
{
549
1281
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
550
1282
  size_t max_length;
551
1283
  my_off_t next_pos_in_file;
552
 
  unsigned char *read_buffer;
 
1284
  uchar *read_buffer;
553
1285
 
554
1286
  memcpy(Buffer,info->read_pos,
555
1287
         (left_length= (size_t) (info->read_end-info->read_pos)));
567
1299
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
568
1300
                 my_filename(info->file),
569
1301
                 info->aio_result.result.aio_errno);
570
 
      errno=info->aio_result.result.aio_errno;
 
1302
      my_errno=info->aio_result.result.aio_errno;
571
1303
      info->error= -1;
572
1304
      return(1);
573
1305
    }
574
1306
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
575
1307
        read_length == (size_t) -1)
576
1308
    {
577
 
      errno=0;                          /* For testing */
 
1309
      my_errno=0;                               /* For testing */
578
1310
      info->error= (read_length == (size_t) -1 ? -1 :
579
1311
                    (int) (read_length+left_length));
580
1312
      return(1);
606
1338
        read_length-=offset;                    /* Bytes left from read_pos */
607
1339
      }
608
1340
    }
 
1341
#ifndef DBUG_OFF
 
1342
    if (info->aio_read_pos > info->pos_in_file)
 
1343
    {
 
1344
      my_errno=EINVAL;
 
1345
      return(info->read_length= (size_t) -1);
 
1346
    }
 
1347
#endif
609
1348
        /* Copy found bytes to buffer */
610
1349
    length=min(Count,read_length);
611
1350
    memcpy(Buffer,info->read_pos,(size_t) length);
627
1366
      info->error=(int) (read_length+left_length);
628
1367
      return 1;
629
1368
    }
630
 
 
631
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1369
    
 
1370
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1371
        == MY_FILEPOS_ERROR)
632
1372
    {
633
1373
      info->error= -1;
634
1374
      return (1);
651
1391
      {                                 /* Didn't find hole block */
652
1392
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
653
1393
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
654
 
                   my_filename(info->file),errno);
 
1394
                   my_filename(info->file),my_errno);
655
1395
        info->error=(int) (read_length+left_length);
656
1396
        return 1;
657
1397
      }
683
1423
  if (max_length)
684
1424
  {
685
1425
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1426
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1427
                          (ulong) next_pos_in_file, (ulong) max_length));
686
1428
    if (aioread(info->file,read_buffer, max_length,
687
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1429
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
688
1430
                &info->aio_result.result))
689
1431
    {                                           /* Skip async io */
690
 
      errno=errno;
 
1432
      my_errno=errno;
 
1433
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1434
                          errno, info->aio_result.result.aio_errno));
691
1435
      if (info->request_pos != info->buffer)
692
1436
      {
693
 
        memmove(info->buffer, info->request_pos,
694
 
                (size_t) (info->read_end - info->read_pos));
 
1437
        bmove(info->buffer,info->request_pos,
 
1438
              (size_t) (info->read_end - info->read_pos));
695
1439
        info->request_pos=info->buffer;
696
1440
        info->read_pos-=info->read_length;
697
1441
        info->read_end-=info->read_length;
707
1451
#endif
708
1452
 
709
1453
 
710
 
/**
711
 
 * @brief
712
 
 *   Read one byte when buffer is empty
713
 
 */
 
1454
/* Read one byte when buffer is empty */
 
1455
 
714
1456
int _my_b_get(IO_CACHE *info)
715
1457
{
716
 
  unsigned char buff;
 
1458
  uchar buff;
717
1459
  IO_CACHE_CALLBACK pre_read,post_read;
718
1460
  if ((pre_read = info->pre_read))
719
1461
    (*pre_read)(info);
721
1463
    return my_b_EOF;
722
1464
  if ((post_read = info->post_read))
723
1465
    (*post_read)(info);
724
 
  return (int) (unsigned char) buff;
 
1466
  return (int) (uchar) buff;
725
1467
}
726
1468
 
727
 
/**
728
 
 * @brief
729
 
 *   Write a byte buffer to IO_CACHE and flush to disk if IO_CACHE is full.
730
 
 *
731
 
 * @retval -1 On error; errno contains error code.
732
 
 * @retval 0 On success
733
 
 * @retval 1 On error on write
734
 
 */
735
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1469
/* 
 
1470
   Write a byte buffer to IO_CACHE and flush to disk
 
1471
   if IO_CACHE is full.
 
1472
 
 
1473
   RETURN VALUE
 
1474
    1 On error on write
 
1475
    0 On success
 
1476
   -1 On error; my_errno contains error code.
 
1477
*/
 
1478
 
 
1479
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
736
1480
{
737
1481
  size_t rest_length,length;
738
1482
 
739
1483
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
740
1484
  {
741
 
    errno=errno=EFBIG;
 
1485
    my_errno=errno=EFBIG;
742
1486
    return info->error = -1;
743
1487
  }
744
1488
 
761
1505
        "seek_not_done" to indicate this to other functions operating
762
1506
        on the IO_CACHE.
763
1507
      */
764
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1508
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
765
1509
      {
766
1510
        info->error= -1;
767
1511
        return (1);
771
1515
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
772
1516
      return info->error= -1;
773
1517
 
 
1518
    /*
 
1519
      In case of a shared I/O cache with a writer we normally do direct
 
1520
      write cache to read cache copy. Simulate this here by direct
 
1521
      caller buffer to read cache copy. Do it after the write so that
 
1522
      the cache readers actions on the flushed part can go in parallel
 
1523
      with the write of the extra stuff. copy_to_read_buffer()
 
1524
      synchronizes writer and readers so that after this call the
 
1525
      readers can act on the extra stuff while the writer can go ahead
 
1526
      and prepare the next output. copy_to_read_buffer() relies on
 
1527
      info->pos_in_file.
 
1528
    */
 
1529
    if (info->share)
 
1530
      copy_to_read_buffer(info, Buffer, length);
 
1531
 
774
1532
    Count-=length;
775
1533
    Buffer+=length;
776
1534
    info->pos_in_file+=length;
780
1538
  return 0;
781
1539
}
782
1540
 
783
 
/**
784
 
 * @brief
785
 
 *   Write a block to disk where part of the data may be inside the record buffer.
786
 
 *   As all write calls to the data goes through the cache,
787
 
 *   we will never get a seek over the end of the buffer.
788
 
 */
789
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
1541
 
 
1542
/*
 
1543
  Append a block to the write buffer.
 
1544
  This is done with the buffer locked to ensure that we don't read from
 
1545
  the write buffer before we are ready with it.
 
1546
*/
 
1547
 
 
1548
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1549
{
 
1550
  size_t rest_length,length;
 
1551
 
 
1552
  /*
 
1553
    Assert that we cannot come here with a shared cache. If we do one
 
1554
    day, we might need to add a call to copy_to_read_buffer().
 
1555
  */
 
1556
  DBUG_ASSERT(!info->share);
 
1557
 
 
1558
  lock_append_buffer(info);
 
1559
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1560
  if (Count <= rest_length)
 
1561
    goto end;
 
1562
  memcpy(info->write_pos, Buffer, rest_length);
 
1563
  Buffer+=rest_length;
 
1564
  Count-=rest_length;
 
1565
  info->write_pos+=rest_length;
 
1566
  if (my_b_flush_io_cache(info,0))
 
1567
  {
 
1568
    unlock_append_buffer(info);
 
1569
    return 1;
 
1570
  }
 
1571
  if (Count >= IO_SIZE)
 
1572
  {                                     /* Fill first intern buffer */
 
1573
    length=Count & (size_t) ~(IO_SIZE-1);
 
1574
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1575
    {
 
1576
      unlock_append_buffer(info);
 
1577
      return info->error= -1;
 
1578
    }
 
1579
    Count-=length;
 
1580
    Buffer+=length;
 
1581
    info->end_of_file+=length;
 
1582
  }
 
1583
 
 
1584
end:
 
1585
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1586
  info->write_pos+=Count;
 
1587
  unlock_append_buffer(info);
 
1588
  return 0;
 
1589
}
 
1590
 
 
1591
 
 
1592
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1593
{
 
1594
  /*
 
1595
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1596
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1597
    several layers of ternary operators that evaluated comma(,) operator
 
1598
    expressions inside - I do have a test case if somebody wants it
 
1599
  */
 
1600
  if (info->type == SEQ_READ_APPEND)
 
1601
    return my_b_append(info, Buffer, Count);
 
1602
  return my_b_write(info, Buffer, Count);
 
1603
}
 
1604
 
 
1605
 
 
1606
/*
 
1607
  Write a block to disk where part of the data may be inside the record
 
1608
  buffer.  As all write calls to the data goes through the cache,
 
1609
  we will never get a seek over the end of the buffer
 
1610
*/
 
1611
 
 
1612
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
790
1613
                   my_off_t pos)
791
1614
{
792
1615
  size_t length;
793
1616
  int error=0;
794
1617
 
 
1618
  /*
 
1619
    Assert that we cannot come here with a shared cache. If we do one
 
1620
    day, we might need to add a call to copy_to_read_buffer().
 
1621
  */
 
1622
  DBUG_ASSERT(!info->share);
 
1623
 
795
1624
  if (pos < info->pos_in_file)
796
1625
  {
797
1626
    /* Of no overlap, write everything without buffering */
798
1627
    if (pos + Count <= info->pos_in_file)
799
1628
      return (pwrite(info->file, Buffer, Count, pos) == 0);
800
1629
    /* Write the part of the block that is before buffer */
801
 
    length= (uint32_t) (info->pos_in_file - pos);
 
1630
    length= (uint) (info->pos_in_file - pos);
802
1631
    if (pwrite(info->file, Buffer, length, pos) == 0)
803
1632
      info->error= error= -1;
804
1633
    Buffer+=length;
805
1634
    pos+=  length;
806
1635
    Count-= length;
 
1636
#ifndef HAVE_PREAD
 
1637
    info->seek_not_done=1;
 
1638
#endif
807
1639
  }
808
1640
 
809
1641
  /* Check if we want to write inside the used part of the buffer.*/
829
1661
  return error;
830
1662
}
831
1663
 
832
 
/**
833
 
 * @brief
834
 
 *   Flush write cache 
835
 
 */
 
1664
 
 
1665
        /* Flush write cache */
 
1666
 
 
1667
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1668
  lock_append_buffer(info);
 
1669
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1670
  unlock_append_buffer(info);
 
1671
 
836
1672
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
837
1673
{
838
1674
  size_t length;
839
 
  bool append_cache= false;
 
1675
  my_bool append_cache;
840
1676
  my_off_t pos_in_file;
 
1677
  DBUG_ENTER("my_b_flush_io_cache");
 
1678
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
 
1679
 
 
1680
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1681
    need_append_buffer_lock=0;
841
1682
 
842
1683
  if (info->type == WRITE_CACHE || append_cache)
843
1684
  {
844
1685
    if (info->file == -1)
845
1686
    {
846
1687
      if (real_open_cached_file(info))
847
 
        return((info->error= -1));
 
1688
        DBUG_RETURN((info->error= -1));
848
1689
    }
849
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1690
    LOCK_APPEND_BUFFER;
850
1691
 
851
1692
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
852
1693
    {
 
1694
      /*
 
1695
        In case of a shared I/O cache with a writer we do direct write
 
1696
        cache to read cache copy. Do it before the write here so that
 
1697
        the readers can work in parallel with the write.
 
1698
        copy_to_read_buffer() relies on info->pos_in_file.
 
1699
      */
 
1700
      if (info->share)
 
1701
        copy_to_read_buffer(info, info->write_buffer, length);
 
1702
 
853
1703
      pos_in_file=info->pos_in_file;
854
1704
      /*
855
1705
        If we have append cache, we always open the file with
857
1707
      */
858
1708
      if (!append_cache && info->seek_not_done)
859
1709
      {                                 /* File touched, do seek */
860
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1710
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1711
            MY_FILEPOS_ERROR)
861
1712
        {
862
 
          unlock_append_buffer(info, need_append_buffer_lock);
863
 
          return((info->error= -1));
 
1713
          UNLOCK_APPEND_BUFFER;
 
1714
          DBUG_RETURN((info->error= -1));
864
1715
        }
865
1716
        if (!append_cache)
866
1717
          info->seek_not_done=0;
882
1733
      else
883
1734
      {
884
1735
        info->end_of_file+=(info->write_pos-info->append_read_pos);
885
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
886
 
        assert(info->end_of_file == tell_ret);
 
1736
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
887
1737
      }
888
1738
 
889
1739
      info->append_read_pos=info->write_pos=info->write_buffer;
890
 
      unlock_append_buffer(info, need_append_buffer_lock);
891
 
      return(info->error);
 
1740
      ++info->disk_writes;
 
1741
      UNLOCK_APPEND_BUFFER;
 
1742
      DBUG_RETURN(info->error);
892
1743
    }
893
1744
  }
894
1745
#ifdef HAVE_AIOWAIT
898
1749
    info->inited=0;
899
1750
  }
900
1751
#endif
901
 
  unlock_append_buffer(info, need_append_buffer_lock);
902
 
  return(0);
 
1752
  UNLOCK_APPEND_BUFFER;
 
1753
  DBUG_RETURN(0);
903
1754
}
904
1755
 
905
 
/**
906
 
 * @brief
907
 
 *   Free an IO_CACHE object
908
 
 * 
909
 
 * @detail
910
 
 *   It's currently safe to call this if one has called init_io_cache()
911
 
 *   on the 'info' object, even if init_io_cache() failed.
912
 
 *   This function is also safe to call twice with the same handle.
913
 
 * 
914
 
 * @param info IO_CACHE Handle to free
915
 
 * 
916
 
 * @retval 0 ok
917
 
 * @retval # Error
918
 
 */
 
1756
/*
 
1757
  Free an IO_CACHE object
 
1758
 
 
1759
  SYNOPSOS
 
1760
    end_io_cache()
 
1761
    info                IO_CACHE Handle to free
 
1762
 
 
1763
  NOTES
 
1764
    It's currently safe to call this if one has called init_io_cache()
 
1765
    on the 'info' object, even if init_io_cache() failed.
 
1766
    This function is also safe to call twice with the same handle.
 
1767
 
 
1768
  RETURN
 
1769
   0  ok
 
1770
   #  Error
 
1771
*/
 
1772
 
919
1773
int end_io_cache(IO_CACHE *info)
920
1774
{
921
1775
  int error=0;
922
1776
  IO_CACHE_CALLBACK pre_close;
 
1777
  DBUG_ENTER("end_io_cache");
 
1778
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
 
1779
 
 
1780
  /*
 
1781
    Every thread must call remove_io_thread(). The last one destroys
 
1782
    the share elements.
 
1783
  */
 
1784
  DBUG_ASSERT(!info->share || !info->share->total_threads);
923
1785
 
924
1786
  if ((pre_close=info->pre_close))
925
1787
  {
931
1793
    info->alloced_buffer=0;
932
1794
    if (info->file != -1)                       /* File doesn't exist */
933
1795
      error= my_b_flush_io_cache(info,1);
934
 
    free((unsigned char*) info->buffer);
935
 
    info->buffer=info->read_pos=(unsigned char*) 0;
936
 
  }
937
 
 
938
 
  return(error);
 
1796
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1797
    info->buffer=info->read_pos=(uchar*) 0;
 
1798
  }
 
1799
  if (info->type == SEQ_READ_APPEND)
 
1800
  {
 
1801
    /* Destroy allocated mutex */
 
1802
    info->type= TYPE_NOT_SET;
 
1803
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1804
  }
 
1805
  DBUG_RETURN(error);
939
1806
} /* end_io_cache */
940
1807
 
941
 
} /* namespace internal */
942
 
} /* namespace drizzled */
 
1808
 
 
1809
/**********************************************************************
 
1810
 Testing of MF_IOCACHE
 
1811
**********************************************************************/
 
1812
 
 
1813
#ifdef MAIN
 
1814
 
 
1815
#include <my_dir.h>
 
1816
 
 
1817
void die(const char* fmt, ...)
 
1818
{
 
1819
  va_list va_args;
 
1820
  va_start(va_args,fmt);
 
1821
  fprintf(stderr,"Error:");
 
1822
  vfprintf(stderr, fmt,va_args);
 
1823
  fprintf(stderr,", errno=%d\n", errno);
 
1824
  exit(1);
 
1825
}
 
1826
 
 
1827
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1828
{
 
1829
  int fd;
 
1830
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1831
    die("Could not open %s", fname);
 
1832
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1833
    die("failed in init_io_cache()");
 
1834
  return fd;
 
1835
}
 
1836
 
 
1837
void close_file(IO_CACHE* info)
 
1838
{
 
1839
  end_io_cache(info);
 
1840
  my_close(info->file, MYF(MY_WME));
 
1841
}
 
1842
 
 
1843
int main(int argc, char** argv)
 
1844
{
 
1845
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1846
  MY_STAT status;
 
1847
  const char* fname="/tmp/iocache.test";
 
1848
  int cache_size=16384;
 
1849
  char llstr_buf[22];
 
1850
  int max_block,total_bytes=0;
 
1851
  int i,num_loops=100,error=0;
 
1852
  char *p;
 
1853
  char* block, *block_end;
 
1854
  MY_INIT(argv[0]);
 
1855
  max_block = cache_size*3;
 
1856
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1857
    die("Not enough memory to allocate test block");
 
1858
  block_end = block + max_block;
 
1859
  for (p = block,i=0; p < block_end;i++)
 
1860
  {
 
1861
    *p++ = (char)i;
 
1862
  }
 
1863
  if (my_stat(fname,&status, MYF(0)) &&
 
1864
      my_delete(fname,MYF(MY_WME)))
 
1865
    {
 
1866
      die("Delete of %s failed, aborting", fname);
 
1867
    }
 
1868
  open_file(fname,&sra_cache, cache_size);
 
1869
  for (i = 0; i < num_loops; i++)
 
1870
  {
 
1871
    char buf[4];
 
1872
    int block_size = abs(rand() % max_block);
 
1873
    int4store(buf, block_size);
 
1874
    if (my_b_append(&sra_cache,buf,4) ||
 
1875
        my_b_append(&sra_cache, block, block_size))
 
1876
      die("write failed");
 
1877
    total_bytes += 4+block_size;
 
1878
  }
 
1879
  close_file(&sra_cache);
 
1880
  my_free(block,MYF(MY_WME));
 
1881
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1882
    die("%s failed to stat, but I had just closed it,\
 
1883
 wonder how that happened");
 
1884
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1885
         llstr(status.st_size,llstr_buf),
 
1886
         total_bytes);
 
1887
  my_delete(fname, MYF(MY_WME));
 
1888
  /* check correctness of tests */
 
1889
  if (total_bytes != status.st_size)
 
1890
  {
 
1891
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1892
supposedly written\n");
 
1893
    error=1;
 
1894
  }
 
1895
  exit(error);
 
1896
  return 0;
 
1897
}
 
1898
#endif