~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Djellel E. Difallah
  • Date: 2010-03-27 10:10:49 UTC
  • mto: This revision was merged to the branch mainline in revision 1429.
  • Revision ID: ded@ubuntu-20100327101049-oo3arvatjoyku124
merge my_decimal and decimal

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include <config.h>
 
50
#include "config.h"
51
51
 
52
 
#include <drizzled/internal/my_sys.h>
53
 
#include <drizzled/internal/m_string.h>
54
 
#include <drizzled/drizzled.h>
 
52
#include "drizzled/internal/my_sys.h"
 
53
#include "drizzled/internal/m_string.h"
55
54
#ifdef HAVE_AIOWAIT
56
 
#include <drizzled/error.h>
57
 
#include <drizzled/internal/aio_result.h>
 
55
#include "drizzled/error.h"
 
56
#include "drizzled/internal/aio_result.h"
58
57
static void my_aiowait(my_aio_result *result);
59
58
#endif
60
 
#include <drizzled/internal/iocache.h>
 
59
#include "drizzled/internal/iocache.h"
61
60
#include <errno.h>
62
61
#include <drizzled/util/test.h>
63
62
#include <stdlib.h>
70
69
namespace internal
71
70
{
72
71
 
73
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
72
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
 
73
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
 
74
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
 
75
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
 
76
 
 
77
#define lock_append_buffer(info) \
 
78
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
79
#define unlock_append_buffer(info) \
 
80
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
81
 
 
82
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
83
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
84
 
 
85
/*
 
86
  Setup internal pointers inside IO_CACHE
 
87
 
 
88
  SYNOPSIS
 
89
    setup_io_cache()
 
90
    info                IO_CACHE handler
 
91
 
 
92
  NOTES
 
93
    This is called on automaticly on init or reinit of IO_CACHE
 
94
    It must be called externally if one moves or copies an IO_CACHE
 
95
    object.
 
96
*/
 
97
 
 
98
void setup_io_cache(IO_CACHE* info)
126
99
{
127
100
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
101
  if (info->type == WRITE_CACHE)
129
102
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
103
    info->current_pos= &info->write_pos;
 
104
    info->current_end= &info->write_end;
132
105
  }
133
106
  else
134
107
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
108
    info->current_pos= &info->read_pos;
 
109
    info->current_end= &info->read_end;
137
110
  }
138
111
}
139
112
 
140
113
 
141
 
void st_io_cache::init_functions()
 
114
static void
 
115
init_functions(IO_CACHE* info)
142
116
{
 
117
  enum cache_type type= info->type;
143
118
  switch (type) {
144
119
  case READ_NET:
145
120
    /*
150
125
      as myisamchk
151
126
    */
152
127
    break;
 
128
  case SEQ_READ_APPEND:
 
129
    info->read_function = _my_b_seq_read;
 
130
    info->write_function = 0;                   /* Force a core if used */
 
131
    break;
153
132
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
133
    info->read_function =
 
134
                          info->share ? _my_b_read_r :
 
135
                                        _my_b_read;
 
136
    info->write_function = _my_b_write;
156
137
  }
157
138
 
158
 
  setup_io_cache();
 
139
  setup_io_cache(info);
159
140
}
160
141
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
142
 
 
143
/*
 
144
  Initialize an IO_CACHE object
 
145
 
 
146
  SYNOPSOS
 
147
    init_io_cache()
 
148
    info                cache handler to initialize
 
149
    file                File that should be associated to to the handler
 
150
                        If == -1 then real_open_cached_file()
 
151
                        will be called when it's time to open file.
 
152
    cachesize           Size of buffer to allocate for read/write
 
153
                        If == 0 then use my_default_record_cache_size
 
154
    type                Type of cache
 
155
    seek_offset         Where cache should start reading/writing
 
156
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
157
    cache_myflags       Bitmap of differnt flags
 
158
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
159
                        MY_DONT_CHECK_FILESIZE
 
160
 
 
161
  RETURN
 
162
    0  ok
 
163
    #  error
 
164
*/
 
165
 
 
166
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
 
167
                  enum cache_type type, my_off_t seek_offset,
 
168
                  bool use_async_io, myf cache_myflags)
181
169
{
182
170
  size_t min_cache;
183
171
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
172
  my_off_t end_of_file= ~(my_off_t) 0;
185
173
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
174
  info->file= file;
 
175
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
176
  info->pos_in_file= seek_offset;
 
177
  info->pre_close = info->pre_read = info->post_read = 0;
 
178
  info->arg = 0;
 
179
  info->alloced_buffer = 0;
 
180
  info->buffer=0;
 
181
  info->seek_not_done= 0;
194
182
 
195
183
  if (file >= 0)
196
184
  {
201
189
         This kind of object doesn't support seek() or tell(). Don't set a
202
190
         flag that will make us again try to seek() later and fail.
203
191
      */
204
 
      seek_not_done= 0;
 
192
      info->seek_not_done= 0;
205
193
      /*
206
194
        Additionally, if we're supposed to start somewhere other than the
207
195
        the beginning of whatever this file is, then somebody made a bad
210
198
      assert(seek_offset == 0);
211
199
    }
212
200
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
201
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
214
202
  }
215
203
 
 
204
  info->share=0;
 
205
 
216
206
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
207
    return(1);                          /* No cache requested */
218
208
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
209
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
210
  {                                             /* Assume file isn't growing */
221
211
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
212
    {
223
213
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
214
      end_of_file=lseek(file,0L,SEEK_END);
225
215
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
216
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
217
      if (end_of_file < seek_offset)
 
218
        end_of_file=seek_offset;
229
219
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
220
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
221
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
222
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
223
        use_async_io=0;                         /* No need to use async */
234
224
      }
235
225
    }
236
226
  }
237
227
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
228
  if (type != READ_NET && type != WRITE_NET)
239
229
  {
240
230
    /* Retry allocating memory in smaller blocks until we get one */
241
231
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
235
      if (cachesize < min_cache)
246
236
        cachesize = min_cache;
247
237
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
 
238
      if (type == SEQ_READ_APPEND)
 
239
        buffer_block *= 2;
 
240
      if ((info->buffer=
255
241
           (unsigned char*) malloc(buffer_block)) != 0)
256
242
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
243
        info->write_buffer=info->buffer;
 
244
        if (type == SEQ_READ_APPEND)
 
245
          info->write_buffer = info->buffer + cachesize;
 
246
        info->alloced_buffer=1;
259
247
        break;                                  /* Enough memory found */
260
248
      }
261
249
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
250
        return(2);                              /* Can't alloc cache */
267
251
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
252
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
253
    }
272
254
  }
273
255
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
256
  info->read_length=info->buffer_length=cachesize;
 
257
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
258
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
259
  if (type == SEQ_READ_APPEND)
 
260
  {
 
261
    info->append_read_pos = info->write_pos = info->write_buffer;
 
262
    info->write_end = info->write_buffer + info->buffer_length;
 
263
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
264
  }
277
265
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
266
  if (type == WRITE_CACHE)
 
267
    info->write_end=
 
268
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
269
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
270
    info->read_end=info->buffer;                /* Nothing in cache */
283
271
 
284
272
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
273
  info->end_of_file= end_of_file;
 
274
  info->error=0;
 
275
  info->type= type;
 
276
  init_functions(info);
289
277
#ifdef HAVE_AIOWAIT
290
278
  if (use_async_io && ! my_disable_async_io)
291
279
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
280
    info->read_length/=2;
 
281
    info->read_function=_my_b_async_read;
294
282
  }
295
 
  inited= aio_result.pending= 0;
 
283
  info->inited=info->aio_result.pending=0;
296
284
#endif
297
 
  return 0;
 
285
  return(0);
298
286
}                                               /* init_io_cache */
299
287
 
300
288
        /* Wait until current request is ready */
319
307
        break;
320
308
    }
321
309
  }
 
310
  return;
322
311
}
323
312
#endif
324
313
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
314
 
 
315
/*
 
316
  Use this to reset cache to re-start reading or to change the type
 
317
  between READ_CACHE <-> WRITE_CACHE
 
318
  If we are doing a reinit of a cache where we have the start of the file
 
319
  in the cache, we are reusing this memory without flushing it to disk.
 
320
*/
 
321
 
 
322
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
323
                        my_off_t seek_offset,
 
324
                        bool use_async_io,
 
325
                        bool clear_cache)
339
326
{
340
327
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
328
  assert(type != READ_NET && info->type != READ_NET &&
 
329
              type != WRITE_NET && info->type != WRITE_NET &&
 
330
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
331
 
344
332
  /* If the whole file is in memory, avoid flushing to disk */
345
333
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
334
      seek_offset >= info->pos_in_file &&
 
335
      seek_offset <= my_b_tell(info))
348
336
  {
349
337
    /* Reuse current buffer without flushing it to disk */
350
338
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
339
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
340
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
341
      info->read_end=info->write_pos;
 
342
      info->end_of_file=my_b_tell(info);
355
343
      /*
356
344
        Trigger a new seek only if we have a valid
357
345
        file handle.
358
346
      */
359
 
      seek_not_done= (file != -1);
 
347
      info->seek_not_done= (info->file != -1);
360
348
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
349
    else if (type == WRITE_CACHE)
362
350
    {
363
 
      if (type == READ_CACHE)
 
351
      if (info->type == READ_CACHE)
364
352
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
353
        info->write_end=info->write_buffer+info->buffer_length;
 
354
        info->seek_not_done=1;
367
355
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
356
      info->end_of_file = ~(my_off_t) 0;
369
357
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
358
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
359
    if (type == WRITE_CACHE)
 
360
      info->write_pos=pos;
373
361
    else
374
 
      read_pos= pos;
 
362
      info->read_pos= pos;
375
363
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
364
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
365
#endif
378
366
  }
379
367
  else
382
370
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
371
      after the current positions should be ignored
384
372
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
373
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
374
      info->end_of_file=my_b_tell(info);
387
375
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
376
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
377
      return(1);
 
378
    info->pos_in_file=seek_offset;
391
379
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
380
    info->seek_not_done=1;
 
381
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
382
    if (type == READ_CACHE)
395
383
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
384
      info->read_end=info->buffer;              /* Nothing in cache */
397
385
    }
398
386
    else
399
387
    {
400
 
      write_end=(buffer + buffer_length -
 
388
      info->write_end=(info->buffer + info->buffer_length -
401
389
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
390
      info->end_of_file= ~(my_off_t) 0;
403
391
    }
404
392
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
393
  info->type=type;
 
394
  info->error=0;
 
395
  init_functions(info);
408
396
 
409
397
#ifdef HAVE_AIOWAIT
410
398
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
399
      ((uint32_t) info->buffer_length <
 
400
       (uint32_t) (info->end_of_file - seek_offset)))
413
401
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
402
    info->read_length=info->buffer_length/2;
 
403
    info->read_function=_my_b_async_read;
416
404
  }
417
 
  inited= 0;
 
405
  info->inited=0;
418
406
#else
419
407
  (void)use_async_io;
420
408
#endif
421
 
  return 0;
 
409
  return(0);
422
410
} /* reinit_io_cache */
423
411
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
412
 
 
413
 
 
414
/*
 
415
  Read buffered.
 
416
 
 
417
  SYNOPSIS
 
418
    _my_b_read()
 
419
      info                      IO_CACHE pointer
 
420
      Buffer                    Buffer to retrieve count bytes from file
 
421
      Count                     Number of bytes to read into Buffer
 
422
 
 
423
  NOTE
 
424
    This function is only called from the my_b_read() macro when there
 
425
    isn't enough characters in the buffer to satisfy the request.
 
426
 
 
427
  WARNING
 
428
 
 
429
    When changing this function, be careful with handling file offsets
 
430
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
431
    types than my_off_t unless you can be sure that their value fits.
 
432
    Same applies to differences of file offsets.
 
433
 
 
434
    When changing this function, check _my_b_read_r(). It might need the
 
435
    same change.
 
436
 
 
437
  RETURN
 
438
    0      we succeeded in reading all data
 
439
    1      Error: can't read requested characters
 
440
*/
 
441
 
 
442
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
443
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
444
  size_t length,diff_length,left_length, max_length;
 
445
  my_off_t pos_in_file;
448
446
 
449
447
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
448
  {
455
453
  }
456
454
 
457
455
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
456
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
457
 
460
458
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
459
    Whenever a function which operates on IO_CACHE flushes/writes
 
460
    some part of the IO_CACHE to disk it will set the property
463
461
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
462
    on the IO_CACHE.
465
463
  */
466
464
  if (info->seek_not_done)
467
465
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
466
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
469
467
    {
470
468
      /* No error, reset seek_not_done flag. */
471
469
      info->seek_not_done= 0;
483
481
    }
484
482
  }
485
483
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
484
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
485
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
486
  {                                     /* Fill first intern buffer */
489
487
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
488
    if (info->end_of_file <= pos_in_file)
491
489
    {                                   /* End of file */
492
490
      info->error= (int) left_length;
493
491
      return(1);
494
492
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
493
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
494
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
495
        != length)
497
496
    {
498
497
      info->error= (read_length == (size_t) -1 ? -1 :
499
498
                    (int) (read_length+left_length));
500
499
      return(1);
501
500
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
501
    Count-=length;
 
502
    Buffer+=length;
 
503
    pos_in_file+=length;
 
504
    left_length+=length;
506
505
    diff_length=0;
507
506
  }
508
507
 
509
508
  max_length= info->read_length-diff_length;
510
509
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
510
      max_length > (info->end_of_file - pos_in_file))
 
511
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
512
  if (!max_length)
514
513
  {
515
514
    if (Count)
517
516
      info->error= static_cast<int>(left_length);       /* We only got this many char */
518
517
      return(1);
519
518
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
519
    length=0;                           /* Didn't read any chars */
521
520
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
521
  else if ((length= my_read(info->file,info->buffer, max_length,
523
522
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
523
           length == (size_t) -1)
525
524
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
525
    if (length != (size_t) -1)
 
526
      memcpy(Buffer, info->buffer, length);
 
527
    info->pos_in_file= pos_in_file;
 
528
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
529
    info->read_pos=info->read_end=info->buffer;
531
530
    return(1);
532
531
  }
533
532
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
533
  info->read_end=info->buffer+length;
 
534
  info->pos_in_file=pos_in_file;
536
535
  memcpy(Buffer, info->buffer, Count);
537
536
  return(0);
538
537
}
539
538
 
540
539
 
 
540
/*
 
541
  Prepare IO_CACHE for shared use.
 
542
 
 
543
  SYNOPSIS
 
544
    init_io_cache_share()
 
545
      read_cache                A read cache. This will be copied for
 
546
                                every thread after setup.
 
547
      cshare                    The share.
 
548
      write_cache               If non-NULL a write cache that is to be
 
549
                                synchronized with the read caches.
 
550
      num_threads               Number of threads sharing the cache
 
551
                                including the write thread if any.
 
552
 
 
553
  DESCRIPTION
 
554
 
 
555
    The shared cache is used so: One IO_CACHE is initialized with
 
556
    init_io_cache(). This includes the allocation of a buffer. Then a
 
557
    share is allocated and init_io_cache_share() is called with the io
 
558
    cache and the share. Then the io cache is copied for each thread. So
 
559
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
560
    is shared because cache->buffer is the same for all caches.
 
561
 
 
562
    One thread reads data from the file into the buffer. All threads
 
563
    read from the buffer, but every thread maintains its own set of
 
564
    pointers into the buffer. When all threads have used up the buffer
 
565
    contents, one of the threads reads the next block of data into the
 
566
    buffer. To accomplish this, each thread enters the cache lock before
 
567
    accessing the buffer. They wait in lock_io_cache() until all threads
 
568
    joined the lock. The last thread entering the lock is in charge of
 
569
    reading from file to buffer. It wakes all threads when done.
 
570
 
 
571
    Synchronizing a write cache to the read caches works so: Whenever
 
572
    the write buffer needs a flush, the write thread enters the lock and
 
573
    waits for all other threads to enter the lock too. They do this when
 
574
    they have used up the read buffer. When all threads are in the lock,
 
575
    the write thread copies the write buffer to the read buffer and
 
576
    wakes all threads.
 
577
 
 
578
    share->running_threads is the number of threads not being in the
 
579
    cache lock. When entering lock_io_cache() the number is decreased.
 
580
    When the thread that fills the buffer enters unlock_io_cache() the
 
581
    number is reset to the number of threads. The condition
 
582
    running_threads == 0 means that all threads are in the lock. Bumping
 
583
    up the number to the full count is non-intuitive. But increasing the
 
584
    number by one for each thread that leaves the lock could lead to a
 
585
    solo run of one thread. The last thread to join a lock reads from
 
586
    file to buffer, wakes the other threads, processes the data in the
 
587
    cache and enters the lock again. If no other thread left the lock
 
588
    meanwhile, it would think it's the last one again and read the next
 
589
    block...
 
590
 
 
591
    The share has copies of 'error', 'buffer', 'read_end', and
 
592
    'pos_in_file' from the thread that filled the buffer. We may not be
 
593
    able to access this information directly from its cache because the
 
594
    thread may be removed from the share before the variables could be
 
595
    copied by all other threads. Or, if a write buffer is synchronized,
 
596
    it would change its 'pos_in_file' after waking the other threads,
 
597
    possibly before they could copy its value.
 
598
 
 
599
    However, the 'buffer' variable in the share is for a synchronized
 
600
    write cache. It needs to know where to put the data. Otherwise it
 
601
    would need access to the read cache of one of the threads that is
 
602
    not yet removed from the share.
 
603
 
 
604
  RETURN
 
605
    void
 
606
*/
 
607
 
 
608
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
609
                         IO_CACHE *write_cache, uint32_t num_threads)
 
610
{
 
611
  assert(num_threads > 1);
 
612
  assert(read_cache->type == READ_CACHE);
 
613
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
614
 
 
615
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
616
  pthread_cond_init(&cshare->cond, 0);
 
617
  pthread_cond_init(&cshare->cond_writer, 0);
 
618
 
 
619
  cshare->running_threads= num_threads;
 
620
  cshare->total_threads=   num_threads;
 
621
  cshare->error=           0;    /* Initialize. */
 
622
  cshare->buffer=          read_cache->buffer;
 
623
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
624
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
625
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
626
 
 
627
  read_cache->share=         cshare;
 
628
  read_cache->read_function= _my_b_read_r;
 
629
  read_cache->current_pos=   NULL;
 
630
  read_cache->current_end=   NULL;
 
631
 
 
632
  if (write_cache)
 
633
    write_cache->share= cshare;
 
634
 
 
635
  return;
 
636
}
 
637
 
 
638
 
 
639
/*
 
640
  Remove a thread from shared access to IO_CACHE.
 
641
 
 
642
  SYNOPSIS
 
643
    remove_io_thread()
 
644
      cache                     The IO_CACHE to be removed from the share.
 
645
 
 
646
  NOTE
 
647
 
 
648
    Every thread must do that on exit for not to deadlock other threads.
 
649
 
 
650
    The last thread destroys the pthread resources.
 
651
 
 
652
    A writer flushes its cache first.
 
653
 
 
654
  RETURN
 
655
    void
 
656
*/
 
657
 
 
658
void remove_io_thread(IO_CACHE *cache)
 
659
{
 
660
  IO_CACHE_SHARE *cshare= cache->share;
 
661
  uint32_t total;
 
662
 
 
663
  /* If the writer goes, it needs to flush the write cache. */
 
664
  if (cache == cshare->source_cache)
 
665
    flush_io_cache(cache);
 
666
 
 
667
  pthread_mutex_lock(&cshare->mutex);
 
668
 
 
669
  /* Remove from share. */
 
670
  total= --cshare->total_threads;
 
671
 
 
672
  /* Detach from share. */
 
673
  cache->share= NULL;
 
674
 
 
675
  /* If the writer goes, let the readers know. */
 
676
  if (cache == cshare->source_cache)
 
677
  {
 
678
    cshare->source_cache= NULL;
 
679
  }
 
680
 
 
681
  /* If all threads are waiting for me to join the lock, wake them. */
 
682
  if (!--cshare->running_threads)
 
683
  {
 
684
    pthread_cond_signal(&cshare->cond_writer);
 
685
    pthread_cond_broadcast(&cshare->cond);
 
686
  }
 
687
 
 
688
  pthread_mutex_unlock(&cshare->mutex);
 
689
 
 
690
  if (!total)
 
691
  {
 
692
    pthread_cond_destroy (&cshare->cond_writer);
 
693
    pthread_cond_destroy (&cshare->cond);
 
694
    pthread_mutex_destroy(&cshare->mutex);
 
695
  }
 
696
 
 
697
  return;
 
698
}
 
699
 
 
700
 
 
701
/*
 
702
  Lock IO cache and wait for all other threads to join.
 
703
 
 
704
  SYNOPSIS
 
705
    lock_io_cache()
 
706
      cache                     The cache of the thread entering the lock.
 
707
      pos                       File position of the block to read.
 
708
                                Unused for the write thread.
 
709
 
 
710
  DESCRIPTION
 
711
 
 
712
    Wait for all threads to finish with the current buffer. We want
 
713
    all threads to proceed in concert. The last thread to join
 
714
    lock_io_cache() will read the block from file and all threads start
 
715
    to use it. Then they will join again for reading the next block.
 
716
 
 
717
    The waiting threads detect a fresh buffer by comparing
 
718
    cshare->pos_in_file with the position they want to process next.
 
719
    Since the first block may start at position 0, we take
 
720
    cshare->read_end as an additional condition. This variable is
 
721
    initialized to NULL and will be set after a block of data is written
 
722
    to the buffer.
 
723
 
 
724
  RETURN
 
725
    1           OK, lock in place, go ahead and read.
 
726
    0           OK, unlocked, another thread did the read.
 
727
*/
 
728
 
 
729
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
730
{
 
731
  IO_CACHE_SHARE *cshare= cache->share;
 
732
 
 
733
  /* Enter the lock. */
 
734
  pthread_mutex_lock(&cshare->mutex);
 
735
  cshare->running_threads--;
 
736
 
 
737
  if (cshare->source_cache)
 
738
  {
 
739
    /* A write cache is synchronized to the read caches. */
 
740
 
 
741
    if (cache == cshare->source_cache)
 
742
    {
 
743
      /* The writer waits until all readers are here. */
 
744
      while (cshare->running_threads)
 
745
      {
 
746
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
747
      }
 
748
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
749
      return(1);
 
750
    }
 
751
 
 
752
    /* The last thread wakes the writer. */
 
753
    if (!cshare->running_threads)
 
754
    {
 
755
      pthread_cond_signal(&cshare->cond_writer);
 
756
    }
 
757
 
 
758
    /*
 
759
      Readers wait until the data is copied from the writer. Another
 
760
      reason to stop waiting is the removal of the write thread. If this
 
761
      happens, we leave the lock with old data in the buffer.
 
762
    */
 
763
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
764
           cshare->source_cache)
 
765
    {
 
766
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
767
    }
 
768
 
 
769
    /*
 
770
      If the writer was removed from the share while this thread was
 
771
      asleep, we need to simulate an EOF condition. The writer cannot
 
772
      reset the share variables as they might still be in use by readers
 
773
      of the last block. When we awake here then because the last
 
774
      joining thread signalled us. If the writer is not the last, it
 
775
      will not signal. So it is safe to clear the buffer here.
 
776
    */
 
777
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
778
    {
 
779
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
780
      cshare->error= 0; /* EOF is not an error. */
 
781
    }
 
782
  }
 
783
  else
 
784
  {
 
785
    /*
 
786
      There are read caches only. The last thread arriving in
 
787
      lock_io_cache() continues with a locked cache and reads the block.
 
788
    */
 
789
    if (!cshare->running_threads)
 
790
    {
 
791
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
792
      return(1);
 
793
    }
 
794
 
 
795
    /*
 
796
      All other threads wait until the requested block is read by the
 
797
      last thread arriving. Another reason to stop waiting is the
 
798
      removal of a thread. If this leads to all threads being in the
 
799
      lock, we have to continue also. The first of the awaken threads
 
800
      will then do the read.
 
801
    */
 
802
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
803
           cshare->running_threads)
 
804
    {
 
805
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
806
    }
 
807
 
 
808
    /* If the block is not yet read, continue with a locked cache and read. */
 
809
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
810
    {
 
811
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
812
      return(1);
 
813
    }
 
814
 
 
815
    /* Another thread did read the block already. */
 
816
  }
 
817
 
 
818
  /*
 
819
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
820
    filled the buffer did this and marked all threads as running.
 
821
  */
 
822
  pthread_mutex_unlock(&cshare->mutex);
 
823
  return(0);
 
824
}
 
825
 
 
826
 
 
827
/*
 
828
  Unlock IO cache.
 
829
 
 
830
  SYNOPSIS
 
831
    unlock_io_cache()
 
832
      cache                     The cache of the thread leaving the lock.
 
833
 
 
834
  NOTE
 
835
    This is called by the thread that filled the buffer. It marks all
 
836
    threads as running and awakes them. This must not be done by any
 
837
    other thread.
 
838
 
 
839
    Do not signal cond_writer. Either there is no writer or the writer
 
840
    is the only one who can call this function.
 
841
 
 
842
    The reason for resetting running_threads to total_threads before
 
843
    waking all other threads is that it could be possible that this
 
844
    thread is so fast with processing the buffer that it enters the lock
 
845
    before even one other thread has left it. If every awoken thread
 
846
    would increase running_threads by one, this thread could think that
 
847
    he is again the last to join and would not wait for the other
 
848
    threads to process the data.
 
849
 
 
850
  RETURN
 
851
    void
 
852
*/
 
853
 
 
854
static void unlock_io_cache(IO_CACHE *cache)
 
855
{
 
856
  IO_CACHE_SHARE *cshare= cache->share;
 
857
 
 
858
  cshare->running_threads= cshare->total_threads;
 
859
  pthread_cond_broadcast(&cshare->cond);
 
860
  pthread_mutex_unlock(&cshare->mutex);
 
861
  return;
 
862
}
 
863
 
 
864
 
 
865
/*
 
866
  Read from IO_CACHE when it is shared between several threads.
 
867
 
 
868
  SYNOPSIS
 
869
    _my_b_read_r()
 
870
      cache                     IO_CACHE pointer
 
871
      Buffer                    Buffer to retrieve count bytes from file
 
872
      Count                     Number of bytes to read into Buffer
 
873
 
 
874
  NOTE
 
875
    This function is only called from the my_b_read() macro when there
 
876
    isn't enough characters in the buffer to satisfy the request.
 
877
 
 
878
  IMPLEMENTATION
 
879
 
 
880
    It works as follows: when a thread tries to read from a file (that
 
881
    is, after using all the data from the (shared) buffer), it just
 
882
    hangs on lock_io_cache(), waiting for other threads. When the very
 
883
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
884
    does actual IO and unlock_io_cache(), which signals all the waiting
 
885
    threads that data is in the buffer.
 
886
 
 
887
  WARNING
 
888
 
 
889
    When changing this function, be careful with handling file offsets
 
890
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
891
    types than my_off_t unless you can be sure that their value fits.
 
892
    Same applies to differences of file offsets. (Bug #11527)
 
893
 
 
894
    When changing this function, check _my_b_read(). It might need the
 
895
    same change.
 
896
 
 
897
  RETURN
 
898
    0      we succeeded in reading all data
 
899
    1      Error: can't read requested characters
 
900
*/
 
901
 
 
902
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
903
{
 
904
  my_off_t pos_in_file;
 
905
  size_t length, diff_length, left_length;
 
906
  IO_CACHE_SHARE *cshare= cache->share;
 
907
 
 
908
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
909
  {
 
910
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
911
    memcpy(Buffer, cache->read_pos, left_length);
 
912
    Buffer+= left_length;
 
913
    Count-= left_length;
 
914
  }
 
915
  while (Count)
 
916
  {
 
917
    size_t cnt, len;
 
918
 
 
919
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
920
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
921
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
922
    length= ((length <= cache->read_length) ?
 
923
             length + IO_ROUND_DN(cache->read_length - length) :
 
924
             length - IO_ROUND_UP(length - cache->read_length));
 
925
    if (cache->type != READ_FIFO &&
 
926
        (length > (cache->end_of_file - pos_in_file)))
 
927
      length= (size_t) (cache->end_of_file - pos_in_file);
 
928
    if (length == 0)
 
929
    {
 
930
      cache->error= (int) left_length;
 
931
      return(1);
 
932
    }
 
933
    if (lock_io_cache(cache, pos_in_file))
 
934
    {
 
935
      /* With a synchronized write/read cache we won't come here... */
 
936
      assert(!cshare->source_cache);
 
937
      /*
 
938
        ... unless the writer has gone before this thread entered the
 
939
        lock. Simulate EOF in this case. It can be distinguished by
 
940
        cache->file.
 
941
      */
 
942
      if (cache->file < 0)
 
943
        len= 0;
 
944
      else
 
945
      {
 
946
        /*
 
947
          Whenever a function which operates on IO_CACHE flushes/writes
 
948
          some part of the IO_CACHE to disk it will set the property
 
949
          "seek_not_done" to indicate this to other functions operating
 
950
          on the IO_CACHE.
 
951
        */
 
952
        if (cache->seek_not_done)
 
953
        {
 
954
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
955
          {
 
956
            cache->error= -1;
 
957
            unlock_io_cache(cache);
 
958
            return(1);
 
959
          }
 
960
        }
 
961
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
962
      }
 
963
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
964
      cache->error=       (len == length ? 0 : (int) len);
 
965
      cache->pos_in_file= pos_in_file;
 
966
 
 
967
      /* Copy important values to the share. */
 
968
      cshare->error=       cache->error;
 
969
      cshare->read_end=    cache->read_end;
 
970
      cshare->pos_in_file= pos_in_file;
 
971
 
 
972
      /* Mark all threads as running and wake them. */
 
973
      unlock_io_cache(cache);
 
974
    }
 
975
    else
 
976
    {
 
977
      /*
 
978
        With a synchronized write/read cache readers always come here.
 
979
        Copy important values from the share.
 
980
      */
 
981
      cache->error=       cshare->error;
 
982
      cache->read_end=    cshare->read_end;
 
983
      cache->pos_in_file= cshare->pos_in_file;
 
984
 
 
985
      len= ((cache->error == -1) ? (size_t) -1 :
 
986
            (size_t) (cache->read_end - cache->buffer));
 
987
    }
 
988
    cache->read_pos=      cache->buffer;
 
989
    cache->seek_not_done= 0;
 
990
    if (len == 0 || len == (size_t) -1)
 
991
    {
 
992
      cache->error= (int) left_length;
 
993
      return(1);
 
994
    }
 
995
    cnt= (len > Count) ? Count : len;
 
996
    memcpy(Buffer, cache->read_pos, cnt);
 
997
    Count -= cnt;
 
998
    Buffer+= cnt;
 
999
    left_length+= cnt;
 
1000
    cache->read_pos+= cnt;
 
1001
  }
 
1002
  return(0);
 
1003
}
 
1004
 
 
1005
 
 
1006
/*
 
1007
  Copy data from write cache to read cache.
 
1008
 
 
1009
  SYNOPSIS
 
1010
    copy_to_read_buffer()
 
1011
      write_cache               The write cache.
 
1012
      write_buffer              The source of data, mostly the cache buffer.
 
1013
      write_length              The number of bytes to copy.
 
1014
 
 
1015
  NOTE
 
1016
    The write thread will wait for all read threads to join the cache
 
1017
    lock. Then it copies the data over and wakes the read threads.
 
1018
 
 
1019
  RETURN
 
1020
    void
 
1021
*/
 
1022
 
 
1023
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1024
                                const unsigned char *write_buffer, size_t write_length)
 
1025
{
 
1026
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1027
 
 
1028
  assert(cshare->source_cache == write_cache);
 
1029
  /*
 
1030
    write_length is usually less or equal to buffer_length.
 
1031
    It can be bigger if _my_b_write() is called with a big length.
 
1032
  */
 
1033
  while (write_length)
 
1034
  {
 
1035
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1036
    int  rc;
 
1037
 
 
1038
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1039
    /* The writing thread does always have the lock when it awakes. */
 
1040
    assert(rc);
 
1041
 
 
1042
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1043
 
 
1044
    cshare->error=       0;
 
1045
    cshare->read_end=    cshare->buffer + copy_length;
 
1046
    cshare->pos_in_file= write_cache->pos_in_file;
 
1047
 
 
1048
    /* Mark all threads as running and wake them. */
 
1049
    unlock_io_cache(write_cache);
 
1050
 
 
1051
    write_buffer+= copy_length;
 
1052
    write_length-= copy_length;
 
1053
  }
 
1054
}
 
1055
 
 
1056
 
 
1057
/*
 
1058
  Do sequential read from the SEQ_READ_APPEND cache.
 
1059
 
 
1060
  We do this in three stages:
 
1061
   - first read from info->buffer
 
1062
   - then if there are still data to read, try the file descriptor
 
1063
   - afterwards, if there are still data to read, try append buffer
 
1064
 
 
1065
  RETURNS
 
1066
    0  Success
 
1067
    1  Failed to read
 
1068
*/
 
1069
 
 
1070
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1071
{
 
1072
  size_t length, diff_length, left_length, save_count, max_length;
 
1073
  my_off_t pos_in_file;
 
1074
  save_count=Count;
 
1075
 
 
1076
  /* first, read the regular buffer */
 
1077
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1078
  {
 
1079
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1080
    memcpy(Buffer,info->read_pos, left_length);
 
1081
    Buffer+=left_length;
 
1082
    Count-=left_length;
 
1083
  }
 
1084
  lock_append_buffer(info);
 
1085
 
 
1086
  /* pos_in_file always point on where info->buffer was read */
 
1087
  if ((pos_in_file=info->pos_in_file +
 
1088
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1089
    goto read_append_buffer;
 
1090
 
 
1091
  /*
 
1092
    With read-append cache we must always do a seek before we read,
 
1093
    because the write could have moved the file pointer astray
 
1094
  */
 
1095
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1096
  {
 
1097
   info->error= -1;
 
1098
   unlock_append_buffer(info);
 
1099
   return (1);
 
1100
  }
 
1101
  info->seek_not_done=0;
 
1102
 
 
1103
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1104
 
 
1105
  /* now the second stage begins - read from file descriptor */
 
1106
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1107
  {
 
1108
    /* Fill first intern buffer */
 
1109
    size_t read_length;
 
1110
 
 
1111
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1112
    if ((read_length= my_read(info->file,Buffer, length,
 
1113
                              info->myflags)) == (size_t) -1)
 
1114
    {
 
1115
      info->error= -1;
 
1116
      unlock_append_buffer(info);
 
1117
      return 1;
 
1118
    }
 
1119
    Count-=read_length;
 
1120
    Buffer+=read_length;
 
1121
    pos_in_file+=read_length;
 
1122
 
 
1123
    if (read_length != length)
 
1124
    {
 
1125
      /*
 
1126
        We only got part of data;  Read the rest of the data from the
 
1127
        write buffer
 
1128
      */
 
1129
      goto read_append_buffer;
 
1130
    }
 
1131
    left_length+=length;
 
1132
    diff_length=0;
 
1133
  }
 
1134
 
 
1135
  max_length= info->read_length-diff_length;
 
1136
  if (max_length > (info->end_of_file - pos_in_file))
 
1137
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1138
  if (!max_length)
 
1139
  {
 
1140
    if (Count)
 
1141
      goto read_append_buffer;
 
1142
    length=0;                           /* Didn't read any more chars */
 
1143
  }
 
1144
  else
 
1145
  {
 
1146
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1147
    if (length == (size_t) -1)
 
1148
    {
 
1149
      info->error= -1;
 
1150
      unlock_append_buffer(info);
 
1151
      return 1;
 
1152
    }
 
1153
    if (length < Count)
 
1154
    {
 
1155
      memcpy(Buffer, info->buffer, length);
 
1156
      Count -= length;
 
1157
      Buffer += length;
 
1158
 
 
1159
      /*
 
1160
         added the line below to make
 
1161
         assert(pos_in_file==info->end_of_file) pass.
 
1162
         otherwise this does not appear to be needed
 
1163
      */
 
1164
      pos_in_file += length;
 
1165
      goto read_append_buffer;
 
1166
    }
 
1167
  }
 
1168
  unlock_append_buffer(info);
 
1169
  info->read_pos=info->buffer+Count;
 
1170
  info->read_end=info->buffer+length;
 
1171
  info->pos_in_file=pos_in_file;
 
1172
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1173
  return 0;
 
1174
 
 
1175
read_append_buffer:
 
1176
 
 
1177
  /*
 
1178
     Read data from the current write buffer.
 
1179
     Count should never be == 0 here (The code will work even if count is 0)
 
1180
  */
 
1181
 
 
1182
  {
 
1183
    /* First copy the data to Count */
 
1184
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1185
    size_t copy_len;
 
1186
    size_t transfer_len;
 
1187
 
 
1188
    assert(info->append_read_pos <= info->write_pos);
 
1189
    /*
 
1190
      TODO: figure out if the assert below is needed or correct.
 
1191
    */
 
1192
    assert(pos_in_file == info->end_of_file);
 
1193
    copy_len=min(Count, len_in_buff);
 
1194
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1195
    info->append_read_pos += copy_len;
 
1196
    Count -= copy_len;
 
1197
    if (Count)
 
1198
      info->error = static_cast<int>(save_count - Count);
 
1199
 
 
1200
    /* Fill read buffer with data from write buffer */
 
1201
    memcpy(info->buffer, info->append_read_pos,
 
1202
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1203
    info->read_pos= info->buffer;
 
1204
    info->read_end= info->buffer+transfer_len;
 
1205
    info->append_read_pos=info->write_pos;
 
1206
    info->pos_in_file=pos_in_file+copy_len;
 
1207
    info->end_of_file+=len_in_buff;
 
1208
  }
 
1209
  unlock_append_buffer(info);
 
1210
  return Count ? 1 : 0;
 
1211
}
 
1212
 
 
1213
 
541
1214
#ifdef HAVE_AIOWAIT
542
1215
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1216
/*
 
1217
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1218
  from disk when needed.
 
1219
 
 
1220
  SYNOPSIS
 
1221
    _my_b_async_read()
 
1222
      info                      IO_CACHE pointer
 
1223
      Buffer                    Buffer to retrieve count bytes from file
 
1224
      Count                     Number of bytes to read into Buffer
 
1225
 
 
1226
  RETURN VALUE
 
1227
    -1          An error has occurred; errno is set.
 
1228
     0          Success
 
1229
     1          An error has occurred; IO_CACHE to error state.
 
1230
*/
 
1231
 
 
1232
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1233
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1234
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1235
  size_t max_length;
559
1236
  my_off_t next_pos_in_file;
560
1237
  unsigned char *read_buffer;
615
1292
      }
616
1293
    }
617
1294
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1295
    length=min(Count,read_length);
 
1296
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1297
    Buffer+=length;
 
1298
    Count-=length;
 
1299
    left_length+=length;
623
1300
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1301
    info->read_pos+=length;
625
1302
  }
626
1303
  else
627
1304
    next_pos_in_file=(info->pos_in_file+ (size_t)
715
1392
#endif
716
1393
 
717
1394
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1395
/* Read one byte when buffer is empty */
 
1396
 
 
1397
int _my_b_get(IO_CACHE *info)
723
1398
{
724
1399
  unsigned char buff;
725
1400
  IO_CACHE_CALLBACK pre_read,post_read;
732
1407
  return (int) (unsigned char) buff;
733
1408
}
734
1409
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1410
/*
 
1411
   Write a byte buffer to IO_CACHE and flush to disk
 
1412
   if IO_CACHE is full.
 
1413
 
 
1414
   RETURN VALUE
 
1415
    1 On error on write
 
1416
    0 On success
 
1417
   -1 On error; errno contains error code.
 
1418
*/
 
1419
 
 
1420
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1421
{
745
 
  size_t rest_length,length_local;
 
1422
  size_t rest_length,length;
746
1423
 
747
1424
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1425
  {
749
 
    errno=EFBIG;
 
1426
    errno=errno=EFBIG;
750
1427
    return info->error = -1;
751
1428
  }
752
1429
 
760
1437
    return 1;
761
1438
  if (Count >= IO_SIZE)
762
1439
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1440
    length=Count & (size_t) ~(IO_SIZE-1);
764
1441
    if (info->seek_not_done)
765
1442
    {
766
1443
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1444
        Whenever a function which operates on IO_CACHE flushes/writes
 
1445
        some part of the IO_CACHE to disk it will set the property
769
1446
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1447
        on the IO_CACHE.
771
1448
      */
772
1449
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
1450
      {
776
1453
      }
777
1454
      info->seek_not_done=0;
778
1455
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
 
1456
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
780
1457
      return info->error= -1;
781
1458
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
 
1459
    /*
 
1460
      In case of a shared I/O cache with a writer we normally do direct
 
1461
      write cache to read cache copy. Simulate this here by direct
 
1462
      caller buffer to read cache copy. Do it after the write so that
 
1463
      the cache readers actions on the flushed part can go in parallel
 
1464
      with the write of the extra stuff. copy_to_read_buffer()
 
1465
      synchronizes writer and readers so that after this call the
 
1466
      readers can act on the extra stuff while the writer can go ahead
 
1467
      and prepare the next output. copy_to_read_buffer() relies on
 
1468
      info->pos_in_file.
 
1469
    */
 
1470
    if (info->share)
 
1471
      copy_to_read_buffer(info, Buffer, length);
 
1472
 
 
1473
    Count-=length;
 
1474
    Buffer+=length;
 
1475
    info->pos_in_file+=length;
785
1476
  }
786
1477
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
1478
  info->write_pos+=Count;
788
1479
  return 0;
789
1480
}
790
1481
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1482
/*
 
1483
  Write a block to disk where part of the data may be inside the record
 
1484
  buffer.  As all write calls to the data goes through the cache,
 
1485
  we will never get a seek over the end of the buffer
 
1486
*/
 
1487
 
 
1488
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1489
                   my_off_t pos)
799
1490
{
800
 
  size_t length_local;
 
1491
  size_t length;
801
1492
  int error=0;
802
1493
 
 
1494
  /*
 
1495
    Assert that we cannot come here with a shared cache. If we do one
 
1496
    day, we might need to add a call to copy_to_read_buffer().
 
1497
  */
 
1498
  assert(!info->share);
 
1499
 
803
1500
  if (pos < info->pos_in_file)
804
1501
  {
805
1502
    /* Of no overlap, write everything without buffering */
806
1503
    if (pos + Count <= info->pos_in_file)
807
1504
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1505
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1506
    length= (uint32_t) (info->pos_in_file - pos);
 
1507
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1508
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1509
    Buffer+=length;
 
1510
    pos+=  length;
 
1511
    Count-= length;
815
1512
  }
816
1513
 
817
1514
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1515
  length= (size_t) (info->write_end - info->buffer);
 
1516
  if (pos < info->pos_in_file + length)
820
1517
  {
821
1518
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1519
    length-=offset;
 
1520
    if (length > Count)
 
1521
      length=Count;
 
1522
    memcpy(info->buffer+offset, Buffer, length);
 
1523
    Buffer+=length;
 
1524
    Count-= length;
 
1525
    /* Fix length of buffer if the new data was larger */
 
1526
    if (info->buffer+length > info->write_pos)
 
1527
      info->write_pos=info->buffer+length;
831
1528
    if (!Count)
832
1529
      return (error);
833
1530
  }
837
1534
  return error;
838
1535
}
839
1536
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1537
 
 
1538
        /* Flush write cache */
 
1539
 
 
1540
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1541
  lock_append_buffer(info);
 
1542
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1543
  unlock_append_buffer(info);
 
1544
 
 
1545
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1546
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1547
  size_t length;
 
1548
  bool append_cache;
 
1549
  my_off_t pos_in_file;
 
1550
 
 
1551
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1552
    need_append_buffer_lock=0;
849
1553
 
850
1554
  if (info->type == WRITE_CACHE || append_cache)
851
1555
  {
852
1556
    if (info->file == -1)
853
1557
    {
854
 
      if (info->real_open_cached_file())
 
1558
      if (real_open_cached_file(info))
855
1559
        return((info->error= -1));
856
1560
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1561
    LOCK_APPEND_BUFFER;
858
1562
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1563
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1564
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1565
      /*
 
1566
        In case of a shared I/O cache with a writer we do direct write
 
1567
        cache to read cache copy. Do it before the write here so that
 
1568
        the readers can work in parallel with the write.
 
1569
        copy_to_read_buffer() relies on info->pos_in_file.
 
1570
      */
 
1571
      if (info->share)
 
1572
        copy_to_read_buffer(info, info->write_buffer, length);
 
1573
 
 
1574
      pos_in_file=info->pos_in_file;
862
1575
      /*
863
1576
        If we have append cache, we always open the file with
864
1577
        O_APPEND which moves the pos to EOF automatically on every write
865
1578
      */
866
1579
      if (!append_cache && info->seek_not_done)
867
1580
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1581
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
869
1582
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1583
          UNLOCK_APPEND_BUFFER;
871
1584
          return((info->error= -1));
872
1585
        }
873
1586
        if (!append_cache)
874
1587
          info->seek_not_done=0;
875
1588
      }
876
1589
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1590
        info->pos_in_file+=length;
878
1591
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1592
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1593
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1594
      if (my_write(info->file,info->write_buffer,length,
882
1595
                   info->myflags | MY_NABP))
883
1596
        info->error= -1;
884
1597
      else
885
1598
        info->error= 0;
886
1599
      if (!append_cache)
887
1600
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1601
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1602
      }
890
1603
      else
891
1604
      {
895
1608
      }
896
1609
 
897
1610
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1611
      UNLOCK_APPEND_BUFFER;
899
1612
      return(info->error);
900
1613
    }
901
1614
  }
906
1619
    info->inited=0;
907
1620
  }
908
1621
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1622
  UNLOCK_APPEND_BUFFER;
910
1623
  return(0);
911
1624
}
912
1625
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1626
/*
 
1627
  Free an IO_CACHE object
 
1628
 
 
1629
  SYNOPSOS
 
1630
    end_io_cache()
 
1631
    info                IO_CACHE Handle to free
 
1632
 
 
1633
  NOTES
 
1634
    It's currently safe to call this if one has called init_io_cache()
 
1635
    on the 'info' object, even if init_io_cache() failed.
 
1636
    This function is also safe to call twice with the same handle.
 
1637
 
 
1638
  RETURN
 
1639
   0  ok
 
1640
   #  Error
 
1641
*/
 
1642
 
 
1643
int end_io_cache(IO_CACHE *info)
928
1644
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1645
  int error=0;
 
1646
  IO_CACHE_CALLBACK pre_close;
 
1647
 
 
1648
  /*
 
1649
    Every thread must call remove_io_thread(). The last one destroys
 
1650
    the share elements.
 
1651
  */
 
1652
  assert(!info->share || !info->share->total_threads);
 
1653
 
 
1654
  if ((pre_close=info->pre_close))
 
1655
  {
 
1656
    (*pre_close)(info);
 
1657
    info->pre_close= 0;
 
1658
  }
 
1659
  if (info->alloced_buffer)
 
1660
  {
 
1661
    info->alloced_buffer=0;
 
1662
    if (info->file != -1)                       /* File doesn't exist */
 
1663
      error= my_b_flush_io_cache(info,1);
 
1664
    free((unsigned char*) info->buffer);
 
1665
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1666
  }
 
1667
  if (info->type == SEQ_READ_APPEND)
 
1668
  {
 
1669
    /* Destroy allocated mutex */
 
1670
    info->type= TYPE_NOT_SET;
 
1671
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1672
  }
 
1673
  return(error);
948
1674
} /* end_io_cache */
949
1675
 
950
1676
} /* namespace internal */
951
1677
} /* namespace drizzled */
 
1678
 
 
1679
/**********************************************************************
 
1680
 Testing of MF_IOCACHE
 
1681
**********************************************************************/
 
1682
 
 
1683
#ifdef MAIN
 
1684
 
 
1685
void die(const char* fmt, ...)
 
1686
{
 
1687
  va_list va_args;
 
1688
  va_start(va_args,fmt);
 
1689
  fprintf(stderr,"Error:");
 
1690
  vfprintf(stderr, fmt,va_args);
 
1691
  fprintf(stderr,", errno=%d\n", errno);
 
1692
  exit(1);
 
1693
}
 
1694
 
 
1695
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1696
{
 
1697
  int fd;
 
1698
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1699
    die("Could not open %s", fname);
 
1700
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1701
    die("failed in init_io_cache()");
 
1702
  return fd;
 
1703
}
 
1704
 
 
1705
void close_file(IO_CACHE* info)
 
1706
{
 
1707
  end_io_cache(info);
 
1708
  my_close(info->file, MYF(MY_WME));
 
1709
}
 
1710
 
 
1711
int main(int argc, char** argv)
 
1712
{
 
1713
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1714
  MY_STAT status;
 
1715
  const char* fname="/tmp/iocache.test";
 
1716
  int cache_size=16384;
 
1717
  char llstr_buf[22];
 
1718
  int max_block,total_bytes=0;
 
1719
  int i,num_loops=100,error=0;
 
1720
  char *p;
 
1721
  char* block, *block_end;
 
1722
  MY_INIT(argv[0]);
 
1723
  max_block = cache_size*3;
 
1724
  if (!(block=(char*)malloc(max_block)))
 
1725
    die("Not enough memory to allocate test block");
 
1726
  block_end = block + max_block;
 
1727
  for (p = block,i=0; p < block_end;i++)
 
1728
  {
 
1729
    *p++ = (char)i;
 
1730
  }
 
1731
  if (my_stat(fname,&status, MYF(0)) &&
 
1732
      my_delete(fname,MYF(MY_WME)))
 
1733
    {
 
1734
      die("Delete of %s failed, aborting", fname);
 
1735
    }
 
1736
  open_file(fname,&sra_cache, cache_size);
 
1737
  for (i = 0; i < num_loops; i++)
 
1738
  {
 
1739
    char buf[4];
 
1740
    int block_size = abs(rand() % max_block);
 
1741
    int4store(buf, block_size);
 
1742
    if (my_b_append(&sra_cache,buf,4) ||
 
1743
        my_b_append(&sra_cache, block, block_size))
 
1744
      die("write failed");
 
1745
    total_bytes += 4+block_size;
 
1746
  }
 
1747
  close_file(&sra_cache);
 
1748
  free(block);
 
1749
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1750
    die("%s failed to stat, but I had just closed it,\
 
1751
 wonder how that happened");
 
1752
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1753
         llstr(status.st_size,llstr_buf),
 
1754
         total_bytes);
 
1755
  my_delete(fname, MYF(MY_WME));
 
1756
  /* check correctness of tests */
 
1757
  if (total_bytes != status.st_size)
 
1758
  {
 
1759
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1760
supposedly written\n");
 
1761
    error=1;
 
1762
  }
 
1763
  exit(error);
 
1764
  return 0;
 
1765
}
 
1766
#endif