~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Brian Aker
  • Date: 2010-11-22 00:16:44 UTC
  • mto: (1945.2.1 quick)
  • mto: This revision was merged to the branch mainline in revision 1947.
  • Revision ID: brian@tangent.org-20101122001644-pi6jv0d65e82xn38
Merge in lock refactor, this just encapsulates.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
51
51
 
52
52
#include "drizzled/internal/my_sys.h"
53
53
#include "drizzled/internal/m_string.h"
 
54
#include "drizzled/drizzled.h"
54
55
#ifdef HAVE_AIOWAIT
55
56
#include "drizzled/error.h"
56
57
#include "drizzled/internal/aio_result.h"
69
70
namespace internal
70
71
{
71
72
 
72
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
73
 
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
75
 
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
76
 
 
77
 
#define lock_append_buffer(info) \
78
 
 pthread_mutex_lock(&(info)->append_buffer_lock)
79
 
#define unlock_append_buffer(info) \
80
 
 pthread_mutex_unlock(&(info)->append_buffer_lock)
81
 
 
82
 
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
83
 
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
84
 
 
85
 
/*
86
 
  Setup internal pointers inside IO_CACHE
87
 
 
88
 
  SYNOPSIS
89
 
    setup_io_cache()
90
 
    info                IO_CACHE handler
91
 
 
92
 
  NOTES
93
 
    This is called on automaticly on init or reinit of IO_CACHE
94
 
    It must be called externally if one moves or copies an IO_CACHE
95
 
    object.
96
 
*/
97
 
 
98
 
void setup_io_cache(IO_CACHE* info)
 
73
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
 
74
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
 
75
 
 
76
/**
 
77
 * @brief
 
78
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
 
79
 */
 
80
inline
 
81
static void lock_append_buffer(register st_io_cache *, int )
 
82
{
 
83
}
 
84
 
 
85
/**
 
86
 * @brief
 
87
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
 
88
 */
 
89
inline
 
90
static void unlock_append_buffer(register st_io_cache *, int )
 
91
{
 
92
}
 
93
 
 
94
/**
 
95
 * @brief
 
96
 *   Round up to the nearest IO_SIZE boundary 
 
97
 */
 
98
inline
 
99
static size_t io_round_up(size_t x)
 
100
{
 
101
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
 
102
}
 
103
 
 
104
/**
 
105
 * @brief
 
106
 *   Round down to the nearest IO_SIZE boundary   
 
107
 */
 
108
inline
 
109
static size_t io_round_dn(size_t x)
 
110
{
 
111
  return (x & ~(IO_SIZE-1));
 
112
}
 
113
 
 
114
 
 
115
/**
 
116
 * @brief 
 
117
 *   Setup internal pointers inside st_io_cache
 
118
 * 
 
119
 * @details
 
120
 *   This is called on automatically on init or reinit of st_io_cache
 
121
 *   It must be called externally if one moves or copies an st_io_cache object.
 
122
 * 
 
123
 * @param info Cache handler to setup
 
124
 */
 
125
void st_io_cache::setup_io_cache()
99
126
{
100
127
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
101
 
  if (info->type == WRITE_CACHE)
 
128
  if (type == WRITE_CACHE)
102
129
  {
103
 
    info->current_pos= &info->write_pos;
104
 
    info->current_end= &info->write_end;
 
130
    current_pos= &write_pos;
 
131
    current_end= &write_end;
105
132
  }
106
133
  else
107
134
  {
108
 
    info->current_pos= &info->read_pos;
109
 
    info->current_end= &info->read_end;
 
135
    current_pos= &read_pos;
 
136
    current_end= &read_end;
110
137
  }
111
138
}
112
139
 
113
140
 
114
 
static void
115
 
init_functions(IO_CACHE* info)
 
141
void st_io_cache::init_functions()
116
142
{
117
 
  enum cache_type type= info->type;
118
143
  switch (type) {
119
144
  case READ_NET:
120
145
    /*
125
150
      as myisamchk
126
151
    */
127
152
    break;
128
 
  case SEQ_READ_APPEND:
129
 
    info->read_function = _my_b_seq_read;
130
 
    info->write_function = 0;                   /* Force a core if used */
131
 
    break;
132
153
  default:
133
 
    info->read_function =
134
 
                          info->share ? _my_b_read_r :
135
 
                                        _my_b_read;
136
 
    info->write_function = _my_b_write;
 
154
    read_function = _my_b_read;
 
155
    write_function = _my_b_write;
137
156
  }
138
157
 
139
 
  setup_io_cache(info);
 
158
  setup_io_cache();
140
159
}
141
160
 
142
 
 
143
 
/*
144
 
  Initialize an IO_CACHE object
145
 
 
146
 
  SYNOPSOS
147
 
    init_io_cache()
148
 
    info                cache handler to initialize
149
 
    file                File that should be associated to to the handler
150
 
                        If == -1 then real_open_cached_file()
151
 
                        will be called when it's time to open file.
152
 
    cachesize           Size of buffer to allocate for read/write
153
 
                        If == 0 then use my_default_record_cache_size
154
 
    type                Type of cache
155
 
    seek_offset         Where cache should start reading/writing
156
 
    use_async_io        Set to 1 of we should use async_io (if avaiable)
157
 
    cache_myflags       Bitmap of differnt flags
158
 
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
159
 
                        MY_DONT_CHECK_FILESIZE
160
 
 
161
 
  RETURN
162
 
    0  ok
163
 
    #  error
164
 
*/
165
 
 
166
 
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
167
 
                  enum cache_type type, my_off_t seek_offset,
168
 
                  bool use_async_io, myf cache_myflags)
 
161
/**
 
162
 * @brief
 
163
 *   Initialize an st_io_cache object
 
164
 *
 
165
 * @param file File that should be associated with the handler
 
166
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
 
167
 * @param cachesize Size of buffer to allocate for read/write
 
168
 *                      If == 0 then use my_default_record_cache_size
 
169
 * @param type Type of cache
 
170
 * @param seek_offset Where cache should start reading/writing
 
171
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
 
172
 * @param cache_myflags Bitmap of different flags
 
173
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
 
174
 * 
 
175
 * @retval 0 success
 
176
 * @retval # error
 
177
 */
 
178
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
 
179
                               enum cache_type type_arg, my_off_t seek_offset,
 
180
                               bool use_async_io, myf cache_myflags)
169
181
{
170
182
  size_t min_cache;
171
183
  off_t pos;
172
 
  my_off_t end_of_file= ~(my_off_t) 0;
 
184
  my_off_t end_of_file_local= ~(my_off_t) 0;
173
185
 
174
 
  info->file= file;
175
 
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
176
 
  info->pos_in_file= seek_offset;
177
 
  info->pre_close = info->pre_read = info->post_read = 0;
178
 
  info->arg = 0;
179
 
  info->alloced_buffer = 0;
180
 
  info->buffer=0;
181
 
  info->seek_not_done= 0;
 
186
  file= file_arg;
 
187
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
 
188
  pos_in_file= seek_offset;
 
189
  pre_close = pre_read = post_read = 0;
 
190
  arg = 0;
 
191
  alloced_buffer = 0;
 
192
  buffer=0;
 
193
  seek_not_done= 0;
182
194
 
183
195
  if (file >= 0)
184
196
  {
189
201
         This kind of object doesn't support seek() or tell(). Don't set a
190
202
         flag that will make us again try to seek() later and fail.
191
203
      */
192
 
      info->seek_not_done= 0;
 
204
      seek_not_done= 0;
193
205
      /*
194
206
        Additionally, if we're supposed to start somewhere other than the
195
207
        the beginning of whatever this file is, then somebody made a bad
198
210
      assert(seek_offset == 0);
199
211
    }
200
212
    else
201
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
213
      seek_not_done= test(seek_offset != (my_off_t)pos);
202
214
  }
203
215
 
204
 
  info->share=0;
205
 
 
206
216
  if (!cachesize && !(cachesize= my_default_record_cache_size))
207
 
    return(1);                          /* No cache requested */
 
217
    return 1;                           /* No cache requested */
208
218
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
209
 
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
 
219
  if (type_arg == READ_CACHE)
210
220
  {                                             /* Assume file isn't growing */
211
221
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
212
222
    {
213
223
      /* Calculate end of file to avoid allocating oversized buffers */
214
 
      end_of_file=lseek(file,0L,SEEK_END);
 
224
      end_of_file_local=lseek(file,0L,SEEK_END);
215
225
      /* Need to reset seek_not_done now that we just did a seek. */
216
 
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
217
 
      if (end_of_file < seek_offset)
218
 
        end_of_file=seek_offset;
 
226
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
 
227
      if (end_of_file_local < seek_offset)
 
228
        end_of_file_local=seek_offset;
219
229
      /* Trim cache size if the file is very small */
220
 
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
 
230
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
221
231
      {
222
 
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
 
232
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
223
233
        use_async_io=0;                         /* No need to use async */
224
234
      }
225
235
    }
226
236
  }
227
237
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
228
 
  if (type != READ_NET && type != WRITE_NET)
 
238
  if (type_arg != READ_NET && type_arg != WRITE_NET)
229
239
  {
230
240
    /* Retry allocating memory in smaller blocks until we get one */
231
241
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
235
245
      if (cachesize < min_cache)
236
246
        cachesize = min_cache;
237
247
      buffer_block= cachesize;
238
 
      if (type == SEQ_READ_APPEND)
239
 
        buffer_block *= 2;
240
 
      if ((info->buffer=
 
248
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
 
249
      {
 
250
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
 
251
        return 2;
 
252
      }
 
253
 
 
254
      if ((buffer=
241
255
           (unsigned char*) malloc(buffer_block)) != 0)
242
256
      {
243
 
        info->write_buffer=info->buffer;
244
 
        if (type == SEQ_READ_APPEND)
245
 
          info->write_buffer = info->buffer + cachesize;
246
 
        info->alloced_buffer=1;
 
257
        write_buffer=buffer;
 
258
        alloced_buffer= true;
247
259
        break;                                  /* Enough memory found */
248
260
      }
249
261
      if (cachesize == min_cache)
250
 
        return(2);                              /* Can't alloc cache */
 
262
      {
 
263
        if (type_arg == READ_CACHE)
 
264
          global_read_buffer.sub(buffer_block);
 
265
        return 2;                               /* Can't alloc cache */
 
266
      }
251
267
      /* Try with less memory */
 
268
      if (type_arg == READ_CACHE)
 
269
        global_read_buffer.sub(buffer_block);
252
270
      cachesize= (cachesize*3/4 & ~(min_cache-1));
253
271
    }
254
272
  }
255
273
 
256
 
  info->read_length=info->buffer_length=cachesize;
257
 
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
258
 
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
259
 
  if (type == SEQ_READ_APPEND)
260
 
  {
261
 
    info->append_read_pos = info->write_pos = info->write_buffer;
262
 
    info->write_end = info->write_buffer + info->buffer_length;
263
 
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
264
 
  }
 
274
  read_length=buffer_length=cachesize;
 
275
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
276
  request_pos= read_pos= write_pos = buffer;
265
277
 
266
 
  if (type == WRITE_CACHE)
267
 
    info->write_end=
268
 
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
 
278
  if (type_arg == WRITE_CACHE)
 
279
    write_end=
 
280
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
269
281
  else
270
 
    info->read_end=info->buffer;                /* Nothing in cache */
 
282
    read_end=buffer;            /* Nothing in cache */
271
283
 
272
284
  /* End_of_file may be changed by user later */
273
 
  info->end_of_file= end_of_file;
274
 
  info->error=0;
275
 
  info->type= type;
276
 
  init_functions(info);
 
285
  end_of_file= end_of_file_local;
 
286
  error= 0;
 
287
  type= type_arg;
 
288
  init_functions();
277
289
#ifdef HAVE_AIOWAIT
278
290
  if (use_async_io && ! my_disable_async_io)
279
291
  {
280
 
    info->read_length/=2;
281
 
    info->read_function=_my_b_async_read;
 
292
    read_length/=2;
 
293
    read_function=_my_b_async_read;
282
294
  }
283
 
  info->inited=info->aio_result.pending=0;
 
295
  inited= aio_result.pending= 0;
284
296
#endif
285
 
  return(0);
 
297
  return 0;
286
298
}                                               /* init_io_cache */
287
299
 
288
300
        /* Wait until current request is ready */
307
319
        break;
308
320
    }
309
321
  }
310
 
  return;
311
322
}
312
323
#endif
313
324
 
314
 
 
315
 
/*
316
 
  Use this to reset cache to re-start reading or to change the type
317
 
  between READ_CACHE <-> WRITE_CACHE
318
 
  If we are doing a reinit of a cache where we have the start of the file
319
 
  in the cache, we are reusing this memory without flushing it to disk.
320
 
*/
321
 
 
322
 
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
323
 
                        my_off_t seek_offset,
324
 
                        bool use_async_io,
325
 
                        bool clear_cache)
 
325
/**
 
326
 * @brief 
 
327
 *   Reset the cache
 
328
 * 
 
329
 * @detail
 
330
 *   Use this to reset cache to re-start reading or to change the type 
 
331
 *   between READ_CACHE <-> WRITE_CACHE
 
332
 *   If we are doing a reinit of a cache where we have the start of the file
 
333
 *   in the cache, we are reusing this memory without flushing it to disk.
 
334
 */
 
335
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
 
336
                                  my_off_t seek_offset,
 
337
                                  bool use_async_io,
 
338
                                  bool clear_cache)
326
339
{
327
340
  /* One can't do reinit with the following types */
328
 
  assert(type != READ_NET && info->type != READ_NET &&
329
 
              type != WRITE_NET && info->type != WRITE_NET &&
330
 
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
 
341
  assert(type_arg != READ_NET && type != READ_NET &&
 
342
              type_arg != WRITE_NET && type != WRITE_NET);
331
343
 
332
344
  /* If the whole file is in memory, avoid flushing to disk */
333
345
  if (! clear_cache &&
334
 
      seek_offset >= info->pos_in_file &&
335
 
      seek_offset <= my_b_tell(info))
 
346
      seek_offset >= pos_in_file &&
 
347
      seek_offset <= my_b_tell(this))
336
348
  {
337
349
    /* Reuse current buffer without flushing it to disk */
338
350
    unsigned char *pos;
339
 
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
351
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
340
352
    {
341
 
      info->read_end=info->write_pos;
342
 
      info->end_of_file=my_b_tell(info);
 
353
      read_end=write_pos;
 
354
      end_of_file=my_b_tell(this);
343
355
      /*
344
356
        Trigger a new seek only if we have a valid
345
357
        file handle.
346
358
      */
347
 
      info->seek_not_done= (info->file != -1);
 
359
      seek_not_done= (file != -1);
348
360
    }
349
 
    else if (type == WRITE_CACHE)
 
361
    else if (type_arg == WRITE_CACHE)
350
362
    {
351
 
      if (info->type == READ_CACHE)
 
363
      if (type == READ_CACHE)
352
364
      {
353
 
        info->write_end=info->write_buffer+info->buffer_length;
354
 
        info->seek_not_done=1;
 
365
        write_end=write_buffer+buffer_length;
 
366
        seek_not_done=1;
355
367
      }
356
 
      info->end_of_file = ~(my_off_t) 0;
 
368
      end_of_file = ~(my_off_t) 0;
357
369
    }
358
 
    pos=info->request_pos+(seek_offset-info->pos_in_file);
359
 
    if (type == WRITE_CACHE)
360
 
      info->write_pos=pos;
 
370
    pos=request_pos+(seek_offset-pos_in_file);
 
371
    if (type_arg == WRITE_CACHE)
 
372
      write_pos=pos;
361
373
    else
362
 
      info->read_pos= pos;
 
374
      read_pos= pos;
363
375
#ifdef HAVE_AIOWAIT
364
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
376
    my_aiowait(&aio_result);            /* Wait for outstanding req */
365
377
#endif
366
378
  }
367
379
  else
370
382
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
371
383
      after the current positions should be ignored
372
384
    */
373
 
    if (info->type == WRITE_CACHE && type == READ_CACHE)
374
 
      info->end_of_file=my_b_tell(info);
 
385
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
386
      end_of_file=my_b_tell(this);
375
387
    /* flush cache if we want to reuse it */
376
 
    if (!clear_cache && my_b_flush_io_cache(info,1))
377
 
      return(1);
378
 
    info->pos_in_file=seek_offset;
 
388
    if (!clear_cache && my_b_flush_io_cache(this, 1))
 
389
      return 1;
 
390
    pos_in_file=seek_offset;
379
391
    /* Better to do always do a seek */
380
 
    info->seek_not_done=1;
381
 
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
382
 
    if (type == READ_CACHE)
 
392
    seek_not_done=1;
 
393
    request_pos=read_pos=write_pos=buffer;
 
394
    if (type_arg == READ_CACHE)
383
395
    {
384
 
      info->read_end=info->buffer;              /* Nothing in cache */
 
396
      read_end=buffer;          /* Nothing in cache */
385
397
    }
386
398
    else
387
399
    {
388
 
      info->write_end=(info->buffer + info->buffer_length -
 
400
      write_end=(buffer + buffer_length -
389
401
                       (seek_offset & (IO_SIZE-1)));
390
 
      info->end_of_file= ~(my_off_t) 0;
 
402
      end_of_file= ~(my_off_t) 0;
391
403
    }
392
404
  }
393
 
  info->type=type;
394
 
  info->error=0;
395
 
  init_functions(info);
 
405
  type= type_arg;
 
406
  error=0;
 
407
  init_functions();
396
408
 
397
409
#ifdef HAVE_AIOWAIT
398
410
  if (use_async_io && ! my_disable_async_io &&
399
 
      ((uint32_t) info->buffer_length <
400
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
411
      ((uint32_t) buffer_length <
 
412
       (uint32_t) (end_of_file - seek_offset)))
401
413
  {
402
 
    info->read_length=info->buffer_length/2;
403
 
    info->read_function=_my_b_async_read;
 
414
    read_length=buffer_length/2;
 
415
    read_function=_my_b_async_read;
404
416
  }
405
 
  info->inited=0;
 
417
  inited= 0;
406
418
#else
407
419
  (void)use_async_io;
408
420
#endif
409
 
  return(0);
 
421
  return 0;
410
422
} /* reinit_io_cache */
411
423
 
412
 
 
413
 
 
414
 
/*
415
 
  Read buffered.
416
 
 
417
 
  SYNOPSIS
418
 
    _my_b_read()
419
 
      info                      IO_CACHE pointer
420
 
      Buffer                    Buffer to retrieve count bytes from file
421
 
      Count                     Number of bytes to read into Buffer
422
 
 
423
 
  NOTE
424
 
    This function is only called from the my_b_read() macro when there
425
 
    isn't enough characters in the buffer to satisfy the request.
426
 
 
427
 
  WARNING
428
 
 
429
 
    When changing this function, be careful with handling file offsets
430
 
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
431
 
    types than my_off_t unless you can be sure that their value fits.
432
 
    Same applies to differences of file offsets.
433
 
 
434
 
    When changing this function, check _my_b_read_r(). It might need the
435
 
    same change.
436
 
 
437
 
  RETURN
438
 
    0      we succeeded in reading all data
439
 
    1      Error: can't read requested characters
440
 
*/
441
 
 
442
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
424
/**
 
425
 * @brief 
 
426
 *   Read buffered.
 
427
 * 
 
428
 * @detail
 
429
 *   This function is only called from the my_b_read() macro when there
 
430
 *   aren't enough characters in the buffer to satisfy the request.
 
431
 *
 
432
 * WARNING
 
433
 *   When changing this function, be careful with handling file offsets
 
434
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
435
 *   types than my_off_t unless you can be sure that their value fits.
 
436
 *   Same applies to differences of file offsets.
 
437
 *
 
438
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
 
439
 * from file @param Count Number of bytes to read into Buffer
 
440
 * 
 
441
 * @retval 0 We succeeded in reading all data
 
442
 * @retval 1 Error: can't read requested characters
 
443
 */
 
444
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
443
445
{
444
 
  size_t length,diff_length,left_length, max_length;
445
 
  my_off_t pos_in_file;
 
446
  size_t length_local,diff_length,left_length, max_length;
 
447
  my_off_t pos_in_file_local;
446
448
 
447
449
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
448
450
  {
453
455
  }
454
456
 
455
457
  /* pos_in_file always point on where info->buffer was read */
456
 
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
458
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
457
459
 
458
460
  /*
459
 
    Whenever a function which operates on IO_CACHE flushes/writes
460
 
    some part of the IO_CACHE to disk it will set the property
 
461
    Whenever a function which operates on st_io_cache flushes/writes
 
462
    some part of the st_io_cache to disk it will set the property
461
463
    "seek_not_done" to indicate this to other functions operating
462
 
    on the IO_CACHE.
 
464
    on the st_io_cache.
463
465
  */
464
466
  if (info->seek_not_done)
465
467
  {
466
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
468
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
467
469
    {
468
470
      /* No error, reset seek_not_done flag. */
469
471
      info->seek_not_done= 0;
481
483
    }
482
484
  }
483
485
 
484
 
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
486
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
485
487
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
486
488
  {                                     /* Fill first intern buffer */
487
489
    size_t read_length;
488
 
    if (info->end_of_file <= pos_in_file)
 
490
    if (info->end_of_file <= pos_in_file_local)
489
491
    {                                   /* End of file */
490
492
      info->error= (int) left_length;
491
493
      return(1);
492
494
    }
493
 
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
494
 
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
495
 
        != length)
 
495
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
496
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
496
497
    {
497
498
      info->error= (read_length == (size_t) -1 ? -1 :
498
499
                    (int) (read_length+left_length));
499
500
      return(1);
500
501
    }
501
 
    Count-=length;
502
 
    Buffer+=length;
503
 
    pos_in_file+=length;
504
 
    left_length+=length;
 
502
    Count-= length_local;
 
503
    Buffer+= length_local;
 
504
    pos_in_file_local+= length_local;
 
505
    left_length+= length_local;
505
506
    diff_length=0;
506
507
  }
507
508
 
508
509
  max_length= info->read_length-diff_length;
509
510
  if (info->type != READ_FIFO &&
510
 
      max_length > (info->end_of_file - pos_in_file))
511
 
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
511
      max_length > (info->end_of_file - pos_in_file_local))
 
512
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
512
513
  if (!max_length)
513
514
  {
514
515
    if (Count)
516
517
      info->error= static_cast<int>(left_length);       /* We only got this many char */
517
518
      return(1);
518
519
    }
519
 
    length=0;                           /* Didn't read any chars */
 
520
     length_local=0;                            /* Didn't read any chars */
520
521
  }
521
 
  else if ((length= my_read(info->file,info->buffer, max_length,
 
522
  else if (( length_local= my_read(info->file,info->buffer, max_length,
522
523
                            info->myflags)) < Count ||
523
 
           length == (size_t) -1)
 
524
            length_local == (size_t) -1)
524
525
  {
525
 
    if (length != (size_t) -1)
526
 
      memcpy(Buffer, info->buffer, length);
527
 
    info->pos_in_file= pos_in_file;
528
 
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
 
526
    if ( length_local != (size_t) -1)
 
527
      memcpy(Buffer, info->buffer,  length_local);
 
528
    info->pos_in_file= pos_in_file_local;
 
529
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
529
530
    info->read_pos=info->read_end=info->buffer;
530
531
    return(1);
531
532
  }
532
533
  info->read_pos=info->buffer+Count;
533
 
  info->read_end=info->buffer+length;
534
 
  info->pos_in_file=pos_in_file;
 
534
  info->read_end=info->buffer+ length_local;
 
535
  info->pos_in_file=pos_in_file_local;
535
536
  memcpy(Buffer, info->buffer, Count);
536
537
  return(0);
537
538
}
538
539
 
539
540
 
540
 
/*
541
 
  Prepare IO_CACHE for shared use.
542
 
 
543
 
  SYNOPSIS
544
 
    init_io_cache_share()
545
 
      read_cache                A read cache. This will be copied for
546
 
                                every thread after setup.
547
 
      cshare                    The share.
548
 
      write_cache               If non-NULL a write cache that is to be
549
 
                                synchronized with the read caches.
550
 
      num_threads               Number of threads sharing the cache
551
 
                                including the write thread if any.
552
 
 
553
 
  DESCRIPTION
554
 
 
555
 
    The shared cache is used so: One IO_CACHE is initialized with
556
 
    init_io_cache(). This includes the allocation of a buffer. Then a
557
 
    share is allocated and init_io_cache_share() is called with the io
558
 
    cache and the share. Then the io cache is copied for each thread. So
559
 
    every thread has its own copy of IO_CACHE. But the allocated buffer
560
 
    is shared because cache->buffer is the same for all caches.
561
 
 
562
 
    One thread reads data from the file into the buffer. All threads
563
 
    read from the buffer, but every thread maintains its own set of
564
 
    pointers into the buffer. When all threads have used up the buffer
565
 
    contents, one of the threads reads the next block of data into the
566
 
    buffer. To accomplish this, each thread enters the cache lock before
567
 
    accessing the buffer. They wait in lock_io_cache() until all threads
568
 
    joined the lock. The last thread entering the lock is in charge of
569
 
    reading from file to buffer. It wakes all threads when done.
570
 
 
571
 
    Synchronizing a write cache to the read caches works so: Whenever
572
 
    the write buffer needs a flush, the write thread enters the lock and
573
 
    waits for all other threads to enter the lock too. They do this when
574
 
    they have used up the read buffer. When all threads are in the lock,
575
 
    the write thread copies the write buffer to the read buffer and
576
 
    wakes all threads.
577
 
 
578
 
    share->running_threads is the number of threads not being in the
579
 
    cache lock. When entering lock_io_cache() the number is decreased.
580
 
    When the thread that fills the buffer enters unlock_io_cache() the
581
 
    number is reset to the number of threads. The condition
582
 
    running_threads == 0 means that all threads are in the lock. Bumping
583
 
    up the number to the full count is non-intuitive. But increasing the
584
 
    number by one for each thread that leaves the lock could lead to a
585
 
    solo run of one thread. The last thread to join a lock reads from
586
 
    file to buffer, wakes the other threads, processes the data in the
587
 
    cache and enters the lock again. If no other thread left the lock
588
 
    meanwhile, it would think it's the last one again and read the next
589
 
    block...
590
 
 
591
 
    The share has copies of 'error', 'buffer', 'read_end', and
592
 
    'pos_in_file' from the thread that filled the buffer. We may not be
593
 
    able to access this information directly from its cache because the
594
 
    thread may be removed from the share before the variables could be
595
 
    copied by all other threads. Or, if a write buffer is synchronized,
596
 
    it would change its 'pos_in_file' after waking the other threads,
597
 
    possibly before they could copy its value.
598
 
 
599
 
    However, the 'buffer' variable in the share is for a synchronized
600
 
    write cache. It needs to know where to put the data. Otherwise it
601
 
    would need access to the read cache of one of the threads that is
602
 
    not yet removed from the share.
603
 
 
604
 
  RETURN
605
 
    void
606
 
*/
607
 
 
608
 
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
609
 
                         IO_CACHE *write_cache, uint32_t num_threads)
610
 
{
611
 
  assert(num_threads > 1);
612
 
  assert(read_cache->type == READ_CACHE);
613
 
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
614
 
 
615
 
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
616
 
  pthread_cond_init(&cshare->cond, 0);
617
 
  pthread_cond_init(&cshare->cond_writer, 0);
618
 
 
619
 
  cshare->running_threads= num_threads;
620
 
  cshare->total_threads=   num_threads;
621
 
  cshare->error=           0;    /* Initialize. */
622
 
  cshare->buffer=          read_cache->buffer;
623
 
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
624
 
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
625
 
  cshare->source_cache=    write_cache; /* Can be NULL. */
626
 
 
627
 
  read_cache->share=         cshare;
628
 
  read_cache->read_function= _my_b_read_r;
629
 
  read_cache->current_pos=   NULL;
630
 
  read_cache->current_end=   NULL;
631
 
 
632
 
  if (write_cache)
633
 
    write_cache->share= cshare;
634
 
 
635
 
  return;
636
 
}
637
 
 
638
 
 
639
 
/*
640
 
  Remove a thread from shared access to IO_CACHE.
641
 
 
642
 
  SYNOPSIS
643
 
    remove_io_thread()
644
 
      cache                     The IO_CACHE to be removed from the share.
645
 
 
646
 
  NOTE
647
 
 
648
 
    Every thread must do that on exit for not to deadlock other threads.
649
 
 
650
 
    The last thread destroys the pthread resources.
651
 
 
652
 
    A writer flushes its cache first.
653
 
 
654
 
  RETURN
655
 
    void
656
 
*/
657
 
 
658
 
void remove_io_thread(IO_CACHE *cache)
659
 
{
660
 
  IO_CACHE_SHARE *cshare= cache->share;
661
 
  uint32_t total;
662
 
 
663
 
  /* If the writer goes, it needs to flush the write cache. */
664
 
  if (cache == cshare->source_cache)
665
 
    flush_io_cache(cache);
666
 
 
667
 
  pthread_mutex_lock(&cshare->mutex);
668
 
 
669
 
  /* Remove from share. */
670
 
  total= --cshare->total_threads;
671
 
 
672
 
  /* Detach from share. */
673
 
  cache->share= NULL;
674
 
 
675
 
  /* If the writer goes, let the readers know. */
676
 
  if (cache == cshare->source_cache)
677
 
  {
678
 
    cshare->source_cache= NULL;
679
 
  }
680
 
 
681
 
  /* If all threads are waiting for me to join the lock, wake them. */
682
 
  if (!--cshare->running_threads)
683
 
  {
684
 
    pthread_cond_signal(&cshare->cond_writer);
685
 
    pthread_cond_broadcast(&cshare->cond);
686
 
  }
687
 
 
688
 
  pthread_mutex_unlock(&cshare->mutex);
689
 
 
690
 
  if (!total)
691
 
  {
692
 
    pthread_cond_destroy (&cshare->cond_writer);
693
 
    pthread_cond_destroy (&cshare->cond);
694
 
    pthread_mutex_destroy(&cshare->mutex);
695
 
  }
696
 
 
697
 
  return;
698
 
}
699
 
 
700
 
 
701
 
/*
702
 
  Lock IO cache and wait for all other threads to join.
703
 
 
704
 
  SYNOPSIS
705
 
    lock_io_cache()
706
 
      cache                     The cache of the thread entering the lock.
707
 
      pos                       File position of the block to read.
708
 
                                Unused for the write thread.
709
 
 
710
 
  DESCRIPTION
711
 
 
712
 
    Wait for all threads to finish with the current buffer. We want
713
 
    all threads to proceed in concert. The last thread to join
714
 
    lock_io_cache() will read the block from file and all threads start
715
 
    to use it. Then they will join again for reading the next block.
716
 
 
717
 
    The waiting threads detect a fresh buffer by comparing
718
 
    cshare->pos_in_file with the position they want to process next.
719
 
    Since the first block may start at position 0, we take
720
 
    cshare->read_end as an additional condition. This variable is
721
 
    initialized to NULL and will be set after a block of data is written
722
 
    to the buffer.
723
 
 
724
 
  RETURN
725
 
    1           OK, lock in place, go ahead and read.
726
 
    0           OK, unlocked, another thread did the read.
727
 
*/
728
 
 
729
 
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
730
 
{
731
 
  IO_CACHE_SHARE *cshare= cache->share;
732
 
 
733
 
  /* Enter the lock. */
734
 
  pthread_mutex_lock(&cshare->mutex);
735
 
  cshare->running_threads--;
736
 
 
737
 
  if (cshare->source_cache)
738
 
  {
739
 
    /* A write cache is synchronized to the read caches. */
740
 
 
741
 
    if (cache == cshare->source_cache)
742
 
    {
743
 
      /* The writer waits until all readers are here. */
744
 
      while (cshare->running_threads)
745
 
      {
746
 
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
747
 
      }
748
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
749
 
      return(1);
750
 
    }
751
 
 
752
 
    /* The last thread wakes the writer. */
753
 
    if (!cshare->running_threads)
754
 
    {
755
 
      pthread_cond_signal(&cshare->cond_writer);
756
 
    }
757
 
 
758
 
    /*
759
 
      Readers wait until the data is copied from the writer. Another
760
 
      reason to stop waiting is the removal of the write thread. If this
761
 
      happens, we leave the lock with old data in the buffer.
762
 
    */
763
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
764
 
           cshare->source_cache)
765
 
    {
766
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
767
 
    }
768
 
 
769
 
    /*
770
 
      If the writer was removed from the share while this thread was
771
 
      asleep, we need to simulate an EOF condition. The writer cannot
772
 
      reset the share variables as they might still be in use by readers
773
 
      of the last block. When we awake here then because the last
774
 
      joining thread signalled us. If the writer is not the last, it
775
 
      will not signal. So it is safe to clear the buffer here.
776
 
    */
777
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
778
 
    {
779
 
      cshare->read_end= cshare->buffer; /* Empty buffer. */
780
 
      cshare->error= 0; /* EOF is not an error. */
781
 
    }
782
 
  }
783
 
  else
784
 
  {
785
 
    /*
786
 
      There are read caches only. The last thread arriving in
787
 
      lock_io_cache() continues with a locked cache and reads the block.
788
 
    */
789
 
    if (!cshare->running_threads)
790
 
    {
791
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
792
 
      return(1);
793
 
    }
794
 
 
795
 
    /*
796
 
      All other threads wait until the requested block is read by the
797
 
      last thread arriving. Another reason to stop waiting is the
798
 
      removal of a thread. If this leads to all threads being in the
799
 
      lock, we have to continue also. The first of the awaken threads
800
 
      will then do the read.
801
 
    */
802
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
803
 
           cshare->running_threads)
804
 
    {
805
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
806
 
    }
807
 
 
808
 
    /* If the block is not yet read, continue with a locked cache and read. */
809
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
810
 
    {
811
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
812
 
      return(1);
813
 
    }
814
 
 
815
 
    /* Another thread did read the block already. */
816
 
  }
817
 
 
818
 
  /*
819
 
    Leave the lock. Do not call unlock_io_cache() later. The thread that
820
 
    filled the buffer did this and marked all threads as running.
821
 
  */
822
 
  pthread_mutex_unlock(&cshare->mutex);
823
 
  return(0);
824
 
}
825
 
 
826
 
 
827
 
/*
828
 
  Unlock IO cache.
829
 
 
830
 
  SYNOPSIS
831
 
    unlock_io_cache()
832
 
      cache                     The cache of the thread leaving the lock.
833
 
 
834
 
  NOTE
835
 
    This is called by the thread that filled the buffer. It marks all
836
 
    threads as running and awakes them. This must not be done by any
837
 
    other thread.
838
 
 
839
 
    Do not signal cond_writer. Either there is no writer or the writer
840
 
    is the only one who can call this function.
841
 
 
842
 
    The reason for resetting running_threads to total_threads before
843
 
    waking all other threads is that it could be possible that this
844
 
    thread is so fast with processing the buffer that it enters the lock
845
 
    before even one other thread has left it. If every awoken thread
846
 
    would increase running_threads by one, this thread could think that
847
 
    he is again the last to join and would not wait for the other
848
 
    threads to process the data.
849
 
 
850
 
  RETURN
851
 
    void
852
 
*/
853
 
 
854
 
static void unlock_io_cache(IO_CACHE *cache)
855
 
{
856
 
  IO_CACHE_SHARE *cshare= cache->share;
857
 
 
858
 
  cshare->running_threads= cshare->total_threads;
859
 
  pthread_cond_broadcast(&cshare->cond);
860
 
  pthread_mutex_unlock(&cshare->mutex);
861
 
  return;
862
 
}
863
 
 
864
 
 
865
 
/*
866
 
  Read from IO_CACHE when it is shared between several threads.
867
 
 
868
 
  SYNOPSIS
869
 
    _my_b_read_r()
870
 
      cache                     IO_CACHE pointer
871
 
      Buffer                    Buffer to retrieve count bytes from file
872
 
      Count                     Number of bytes to read into Buffer
873
 
 
874
 
  NOTE
875
 
    This function is only called from the my_b_read() macro when there
876
 
    isn't enough characters in the buffer to satisfy the request.
877
 
 
878
 
  IMPLEMENTATION
879
 
 
880
 
    It works as follows: when a thread tries to read from a file (that
881
 
    is, after using all the data from the (shared) buffer), it just
882
 
    hangs on lock_io_cache(), waiting for other threads. When the very
883
 
    last thread attempts a read, lock_io_cache() returns 1, the thread
884
 
    does actual IO and unlock_io_cache(), which signals all the waiting
885
 
    threads that data is in the buffer.
886
 
 
887
 
  WARNING
888
 
 
889
 
    When changing this function, be careful with handling file offsets
890
 
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
891
 
    types than my_off_t unless you can be sure that their value fits.
892
 
    Same applies to differences of file offsets. (Bug #11527)
893
 
 
894
 
    When changing this function, check _my_b_read(). It might need the
895
 
    same change.
896
 
 
897
 
  RETURN
898
 
    0      we succeeded in reading all data
899
 
    1      Error: can't read requested characters
900
 
*/
901
 
 
902
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
903
 
{
904
 
  my_off_t pos_in_file;
905
 
  size_t length, diff_length, left_length;
906
 
  IO_CACHE_SHARE *cshare= cache->share;
907
 
 
908
 
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
909
 
  {
910
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
911
 
    memcpy(Buffer, cache->read_pos, left_length);
912
 
    Buffer+= left_length;
913
 
    Count-= left_length;
914
 
  }
915
 
  while (Count)
916
 
  {
917
 
    size_t cnt, len;
918
 
 
919
 
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
920
 
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
921
 
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
922
 
    length= ((length <= cache->read_length) ?
923
 
             length + IO_ROUND_DN(cache->read_length - length) :
924
 
             length - IO_ROUND_UP(length - cache->read_length));
925
 
    if (cache->type != READ_FIFO &&
926
 
        (length > (cache->end_of_file - pos_in_file)))
927
 
      length= (size_t) (cache->end_of_file - pos_in_file);
928
 
    if (length == 0)
929
 
    {
930
 
      cache->error= (int) left_length;
931
 
      return(1);
932
 
    }
933
 
    if (lock_io_cache(cache, pos_in_file))
934
 
    {
935
 
      /* With a synchronized write/read cache we won't come here... */
936
 
      assert(!cshare->source_cache);
937
 
      /*
938
 
        ... unless the writer has gone before this thread entered the
939
 
        lock. Simulate EOF in this case. It can be distinguished by
940
 
        cache->file.
941
 
      */
942
 
      if (cache->file < 0)
943
 
        len= 0;
944
 
      else
945
 
      {
946
 
        /*
947
 
          Whenever a function which operates on IO_CACHE flushes/writes
948
 
          some part of the IO_CACHE to disk it will set the property
949
 
          "seek_not_done" to indicate this to other functions operating
950
 
          on the IO_CACHE.
951
 
        */
952
 
        if (cache->seek_not_done)
953
 
        {
954
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
955
 
          {
956
 
            cache->error= -1;
957
 
            unlock_io_cache(cache);
958
 
            return(1);
959
 
          }
960
 
        }
961
 
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
962
 
      }
963
 
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
964
 
      cache->error=       (len == length ? 0 : (int) len);
965
 
      cache->pos_in_file= pos_in_file;
966
 
 
967
 
      /* Copy important values to the share. */
968
 
      cshare->error=       cache->error;
969
 
      cshare->read_end=    cache->read_end;
970
 
      cshare->pos_in_file= pos_in_file;
971
 
 
972
 
      /* Mark all threads as running and wake them. */
973
 
      unlock_io_cache(cache);
974
 
    }
975
 
    else
976
 
    {
977
 
      /*
978
 
        With a synchronized write/read cache readers always come here.
979
 
        Copy important values from the share.
980
 
      */
981
 
      cache->error=       cshare->error;
982
 
      cache->read_end=    cshare->read_end;
983
 
      cache->pos_in_file= cshare->pos_in_file;
984
 
 
985
 
      len= ((cache->error == -1) ? (size_t) -1 :
986
 
            (size_t) (cache->read_end - cache->buffer));
987
 
    }
988
 
    cache->read_pos=      cache->buffer;
989
 
    cache->seek_not_done= 0;
990
 
    if (len == 0 || len == (size_t) -1)
991
 
    {
992
 
      cache->error= (int) left_length;
993
 
      return(1);
994
 
    }
995
 
    cnt= (len > Count) ? Count : len;
996
 
    memcpy(Buffer, cache->read_pos, cnt);
997
 
    Count -= cnt;
998
 
    Buffer+= cnt;
999
 
    left_length+= cnt;
1000
 
    cache->read_pos+= cnt;
1001
 
  }
1002
 
  return(0);
1003
 
}
1004
 
 
1005
 
 
1006
 
/*
1007
 
  Copy data from write cache to read cache.
1008
 
 
1009
 
  SYNOPSIS
1010
 
    copy_to_read_buffer()
1011
 
      write_cache               The write cache.
1012
 
      write_buffer              The source of data, mostly the cache buffer.
1013
 
      write_length              The number of bytes to copy.
1014
 
 
1015
 
  NOTE
1016
 
    The write thread will wait for all read threads to join the cache
1017
 
    lock. Then it copies the data over and wakes the read threads.
1018
 
 
1019
 
  RETURN
1020
 
    void
1021
 
*/
1022
 
 
1023
 
static void copy_to_read_buffer(IO_CACHE *write_cache,
1024
 
                                const unsigned char *write_buffer, size_t write_length)
1025
 
{
1026
 
  IO_CACHE_SHARE *cshare= write_cache->share;
1027
 
 
1028
 
  assert(cshare->source_cache == write_cache);
1029
 
  /*
1030
 
    write_length is usually less or equal to buffer_length.
1031
 
    It can be bigger if _my_b_write() is called with a big length.
1032
 
  */
1033
 
  while (write_length)
1034
 
  {
1035
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
1036
 
    int  rc;
1037
 
 
1038
 
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1039
 
    /* The writing thread does always have the lock when it awakes. */
1040
 
    assert(rc);
1041
 
 
1042
 
    memcpy(cshare->buffer, write_buffer, copy_length);
1043
 
 
1044
 
    cshare->error=       0;
1045
 
    cshare->read_end=    cshare->buffer + copy_length;
1046
 
    cshare->pos_in_file= write_cache->pos_in_file;
1047
 
 
1048
 
    /* Mark all threads as running and wake them. */
1049
 
    unlock_io_cache(write_cache);
1050
 
 
1051
 
    write_buffer+= copy_length;
1052
 
    write_length-= copy_length;
1053
 
  }
1054
 
}
1055
 
 
1056
 
 
1057
 
/*
1058
 
  Do sequential read from the SEQ_READ_APPEND cache.
1059
 
 
1060
 
  We do this in three stages:
1061
 
   - first read from info->buffer
1062
 
   - then if there are still data to read, try the file descriptor
1063
 
   - afterwards, if there are still data to read, try append buffer
1064
 
 
1065
 
  RETURNS
1066
 
    0  Success
1067
 
    1  Failed to read
1068
 
*/
1069
 
 
1070
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1071
 
{
1072
 
  size_t length, diff_length, left_length, save_count, max_length;
1073
 
  my_off_t pos_in_file;
1074
 
  save_count=Count;
1075
 
 
1076
 
  /* first, read the regular buffer */
1077
 
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1078
 
  {
1079
 
    assert(Count > left_length);        /* User is not using my_b_read() */
1080
 
    memcpy(Buffer,info->read_pos, left_length);
1081
 
    Buffer+=left_length;
1082
 
    Count-=left_length;
1083
 
  }
1084
 
  lock_append_buffer(info);
1085
 
 
1086
 
  /* pos_in_file always point on where info->buffer was read */
1087
 
  if ((pos_in_file=info->pos_in_file +
1088
 
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1089
 
    goto read_append_buffer;
1090
 
 
1091
 
  /*
1092
 
    With read-append cache we must always do a seek before we read,
1093
 
    because the write could have moved the file pointer astray
1094
 
  */
1095
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1096
 
  {
1097
 
   info->error= -1;
1098
 
   unlock_append_buffer(info);
1099
 
   return (1);
1100
 
  }
1101
 
  info->seek_not_done=0;
1102
 
 
1103
 
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1104
 
 
1105
 
  /* now the second stage begins - read from file descriptor */
1106
 
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1107
 
  {
1108
 
    /* Fill first intern buffer */
1109
 
    size_t read_length;
1110
 
 
1111
 
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1112
 
    if ((read_length= my_read(info->file,Buffer, length,
1113
 
                              info->myflags)) == (size_t) -1)
1114
 
    {
1115
 
      info->error= -1;
1116
 
      unlock_append_buffer(info);
1117
 
      return 1;
1118
 
    }
1119
 
    Count-=read_length;
1120
 
    Buffer+=read_length;
1121
 
    pos_in_file+=read_length;
1122
 
 
1123
 
    if (read_length != length)
1124
 
    {
1125
 
      /*
1126
 
        We only got part of data;  Read the rest of the data from the
1127
 
        write buffer
1128
 
      */
1129
 
      goto read_append_buffer;
1130
 
    }
1131
 
    left_length+=length;
1132
 
    diff_length=0;
1133
 
  }
1134
 
 
1135
 
  max_length= info->read_length-diff_length;
1136
 
  if (max_length > (info->end_of_file - pos_in_file))
1137
 
    max_length= (size_t) (info->end_of_file - pos_in_file);
1138
 
  if (!max_length)
1139
 
  {
1140
 
    if (Count)
1141
 
      goto read_append_buffer;
1142
 
    length=0;                           /* Didn't read any more chars */
1143
 
  }
1144
 
  else
1145
 
  {
1146
 
    length= my_read(info->file,info->buffer, max_length, info->myflags);
1147
 
    if (length == (size_t) -1)
1148
 
    {
1149
 
      info->error= -1;
1150
 
      unlock_append_buffer(info);
1151
 
      return 1;
1152
 
    }
1153
 
    if (length < Count)
1154
 
    {
1155
 
      memcpy(Buffer, info->buffer, length);
1156
 
      Count -= length;
1157
 
      Buffer += length;
1158
 
 
1159
 
      /*
1160
 
         added the line below to make
1161
 
         assert(pos_in_file==info->end_of_file) pass.
1162
 
         otherwise this does not appear to be needed
1163
 
      */
1164
 
      pos_in_file += length;
1165
 
      goto read_append_buffer;
1166
 
    }
1167
 
  }
1168
 
  unlock_append_buffer(info);
1169
 
  info->read_pos=info->buffer+Count;
1170
 
  info->read_end=info->buffer+length;
1171
 
  info->pos_in_file=pos_in_file;
1172
 
  memcpy(Buffer,info->buffer,(size_t) Count);
1173
 
  return 0;
1174
 
 
1175
 
read_append_buffer:
1176
 
 
1177
 
  /*
1178
 
     Read data from the current write buffer.
1179
 
     Count should never be == 0 here (The code will work even if count is 0)
1180
 
  */
1181
 
 
1182
 
  {
1183
 
    /* First copy the data to Count */
1184
 
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1185
 
    size_t copy_len;
1186
 
    size_t transfer_len;
1187
 
 
1188
 
    assert(info->append_read_pos <= info->write_pos);
1189
 
    /*
1190
 
      TODO: figure out if the assert below is needed or correct.
1191
 
    */
1192
 
    assert(pos_in_file == info->end_of_file);
1193
 
    copy_len=min(Count, len_in_buff);
1194
 
    memcpy(Buffer, info->append_read_pos, copy_len);
1195
 
    info->append_read_pos += copy_len;
1196
 
    Count -= copy_len;
1197
 
    if (Count)
1198
 
      info->error = static_cast<int>(save_count - Count);
1199
 
 
1200
 
    /* Fill read buffer with data from write buffer */
1201
 
    memcpy(info->buffer, info->append_read_pos,
1202
 
           (size_t) (transfer_len=len_in_buff - copy_len));
1203
 
    info->read_pos= info->buffer;
1204
 
    info->read_end= info->buffer+transfer_len;
1205
 
    info->append_read_pos=info->write_pos;
1206
 
    info->pos_in_file=pos_in_file+copy_len;
1207
 
    info->end_of_file+=len_in_buff;
1208
 
  }
1209
 
  unlock_append_buffer(info);
1210
 
  return Count ? 1 : 0;
1211
 
}
1212
 
 
1213
 
 
1214
541
#ifdef HAVE_AIOWAIT
1215
542
 
1216
 
/*
1217
 
  Read from the IO_CACHE into a buffer and feed asynchronously
1218
 
  from disk when needed.
1219
 
 
1220
 
  SYNOPSIS
1221
 
    _my_b_async_read()
1222
 
      info                      IO_CACHE pointer
1223
 
      Buffer                    Buffer to retrieve count bytes from file
1224
 
      Count                     Number of bytes to read into Buffer
1225
 
 
1226
 
  RETURN VALUE
1227
 
    -1          An error has occurred; errno is set.
1228
 
     0          Success
1229
 
     1          An error has occurred; IO_CACHE to error state.
1230
 
*/
1231
 
 
1232
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
543
/**
 
544
 * @brief
 
545
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
 
546
 *
 
547
 * @param info st_io_cache pointer
 
548
 * @param Buffer Buffer to retrieve count bytes from file
 
549
 * @param Count Number of bytes to read into Buffer
 
550
 * 
 
551
 * @retval -1 An error has occurred; errno is set.
 
552
 * @retval 0 Success
 
553
 * @retval 1 An error has occurred; st_io_cache to error state.
 
554
 */
 
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1233
556
{
1234
 
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
 
557
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1235
558
  size_t max_length;
1236
559
  my_off_t next_pos_in_file;
1237
560
  unsigned char *read_buffer;
1292
615
      }
1293
616
    }
1294
617
        /* Copy found bytes to buffer */
1295
 
    length=min(Count,read_length);
1296
 
    memcpy(Buffer,info->read_pos,(size_t) length);
1297
 
    Buffer+=length;
1298
 
    Count-=length;
1299
 
    left_length+=length;
 
618
    length_local=min(Count,read_length);
 
619
    memcpy(Buffer,info->read_pos,(size_t) length_local);
 
620
    Buffer+=length_local;
 
621
    Count-=length_local;
 
622
    left_length+=length_local;
1300
623
    info->read_end=info->rc_pos+read_length;
1301
 
    info->read_pos+=length;
 
624
    info->read_pos+=length_local;
1302
625
  }
1303
626
  else
1304
627
    next_pos_in_file=(info->pos_in_file+ (size_t)
1392
715
#endif
1393
716
 
1394
717
 
1395
 
/* Read one byte when buffer is empty */
1396
 
 
1397
 
int _my_b_get(IO_CACHE *info)
 
718
/**
 
719
 * @brief
 
720
 *   Read one byte when buffer is empty
 
721
 */
 
722
int _my_b_get(st_io_cache *info)
1398
723
{
1399
724
  unsigned char buff;
1400
725
  IO_CACHE_CALLBACK pre_read,post_read;
1407
732
  return (int) (unsigned char) buff;
1408
733
}
1409
734
 
1410
 
/*
1411
 
   Write a byte buffer to IO_CACHE and flush to disk
1412
 
   if IO_CACHE is full.
1413
 
 
1414
 
   RETURN VALUE
1415
 
    1 On error on write
1416
 
    0 On success
1417
 
   -1 On error; errno contains error code.
1418
 
*/
1419
 
 
1420
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
735
/**
 
736
 * @brief
 
737
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
 
738
 *
 
739
 * @retval -1 On error; errno contains error code.
 
740
 * @retval 0 On success
 
741
 * @retval 1 On error on write
 
742
 */
 
743
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
1421
744
{
1422
 
  size_t rest_length,length;
 
745
  size_t rest_length,length_local;
1423
746
 
1424
747
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1425
748
  {
1426
 
    errno=errno=EFBIG;
 
749
    errno=EFBIG;
1427
750
    return info->error = -1;
1428
751
  }
1429
752
 
1437
760
    return 1;
1438
761
  if (Count >= IO_SIZE)
1439
762
  {                                     /* Fill first intern buffer */
1440
 
    length=Count & (size_t) ~(IO_SIZE-1);
 
763
    length_local=Count & (size_t) ~(IO_SIZE-1);
1441
764
    if (info->seek_not_done)
1442
765
    {
1443
766
      /*
1444
 
        Whenever a function which operates on IO_CACHE flushes/writes
1445
 
        some part of the IO_CACHE to disk it will set the property
 
767
        Whenever a function which operates on st_io_cache flushes/writes
 
768
        some part of the st_io_cache to disk it will set the property
1446
769
        "seek_not_done" to indicate this to other functions operating
1447
 
        on the IO_CACHE.
 
770
        on the st_io_cache.
1448
771
      */
1449
772
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
1450
773
      {
1453
776
      }
1454
777
      info->seek_not_done=0;
1455
778
    }
1456
 
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
779
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
1457
780
      return info->error= -1;
1458
781
 
1459
 
    /*
1460
 
      In case of a shared I/O cache with a writer we normally do direct
1461
 
      write cache to read cache copy. Simulate this here by direct
1462
 
      caller buffer to read cache copy. Do it after the write so that
1463
 
      the cache readers actions on the flushed part can go in parallel
1464
 
      with the write of the extra stuff. copy_to_read_buffer()
1465
 
      synchronizes writer and readers so that after this call the
1466
 
      readers can act on the extra stuff while the writer can go ahead
1467
 
      and prepare the next output. copy_to_read_buffer() relies on
1468
 
      info->pos_in_file.
1469
 
    */
1470
 
    if (info->share)
1471
 
      copy_to_read_buffer(info, Buffer, length);
1472
 
 
1473
 
    Count-=length;
1474
 
    Buffer+=length;
1475
 
    info->pos_in_file+=length;
 
782
    Count-=length_local;
 
783
    Buffer+=length_local;
 
784
    info->pos_in_file+=length_local;
1476
785
  }
1477
786
  memcpy(info->write_pos,Buffer,(size_t) Count);
1478
787
  info->write_pos+=Count;
1479
788
  return 0;
1480
789
}
1481
790
 
1482
 
/*
1483
 
  Write a block to disk where part of the data may be inside the record
1484
 
  buffer.  As all write calls to the data goes through the cache,
1485
 
  we will never get a seek over the end of the buffer
1486
 
*/
1487
 
 
1488
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
791
/**
 
792
 * @brief
 
793
 *   Write a block to disk where part of the data may be inside the record buffer.
 
794
 *   As all write calls to the data goes through the cache,
 
795
 *   we will never get a seek over the end of the buffer.
 
796
 */
 
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1489
798
                   my_off_t pos)
1490
799
{
1491
 
  size_t length;
 
800
  size_t length_local;
1492
801
  int error=0;
1493
802
 
1494
 
  /*
1495
 
    Assert that we cannot come here with a shared cache. If we do one
1496
 
    day, we might need to add a call to copy_to_read_buffer().
1497
 
  */
1498
 
  assert(!info->share);
1499
 
 
1500
803
  if (pos < info->pos_in_file)
1501
804
  {
1502
805
    /* Of no overlap, write everything without buffering */
1503
806
    if (pos + Count <= info->pos_in_file)
1504
807
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1505
808
    /* Write the part of the block that is before buffer */
1506
 
    length= (uint32_t) (info->pos_in_file - pos);
1507
 
    if (pwrite(info->file, Buffer, length, pos) == 0)
 
809
    length_local= (uint32_t) (info->pos_in_file - pos);
 
810
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
1508
811
      info->error= error= -1;
1509
 
    Buffer+=length;
1510
 
    pos+=  length;
1511
 
    Count-= length;
 
812
    Buffer+=length_local;
 
813
    pos+=  length_local;
 
814
    Count-= length_local;
1512
815
  }
1513
816
 
1514
817
  /* Check if we want to write inside the used part of the buffer.*/
1515
 
  length= (size_t) (info->write_end - info->buffer);
1516
 
  if (pos < info->pos_in_file + length)
 
818
  length_local= (size_t) (info->write_end - info->buffer);
 
819
  if (pos < info->pos_in_file + length_local)
1517
820
  {
1518
821
    size_t offset= (size_t) (pos - info->pos_in_file);
1519
 
    length-=offset;
1520
 
    if (length > Count)
1521
 
      length=Count;
1522
 
    memcpy(info->buffer+offset, Buffer, length);
1523
 
    Buffer+=length;
1524
 
    Count-= length;
1525
 
    /* Fix length of buffer if the new data was larger */
1526
 
    if (info->buffer+length > info->write_pos)
1527
 
      info->write_pos=info->buffer+length;
 
822
    length_local-=offset;
 
823
    if (length_local > Count)
 
824
      length_local=Count;
 
825
    memcpy(info->buffer+offset, Buffer, length_local);
 
826
    Buffer+=length_local;
 
827
    Count-= length_local;
 
828
    /* Fix length_local of buffer if the new data was larger */
 
829
    if (info->buffer+length_local > info->write_pos)
 
830
      info->write_pos=info->buffer+length_local;
1528
831
    if (!Count)
1529
832
      return (error);
1530
833
  }
1534
837
  return error;
1535
838
}
1536
839
 
1537
 
 
1538
 
        /* Flush write cache */
1539
 
 
1540
 
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1541
 
  lock_append_buffer(info);
1542
 
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1543
 
  unlock_append_buffer(info);
1544
 
 
1545
 
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
 
840
/**
 
841
 * @brief
 
842
 *   Flush write cache 
 
843
 */
 
844
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
1546
845
{
1547
 
  size_t length;
1548
 
  bool append_cache;
1549
 
  my_off_t pos_in_file;
1550
 
 
1551
 
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1552
 
    need_append_buffer_lock=0;
 
846
  size_t length_local;
 
847
  bool append_cache= false;
 
848
  my_off_t pos_in_file_local;
1553
849
 
1554
850
  if (info->type == WRITE_CACHE || append_cache)
1555
851
  {
1556
852
    if (info->file == -1)
1557
853
    {
1558
 
      if (real_open_cached_file(info))
 
854
      if (info->real_open_cached_file())
1559
855
        return((info->error= -1));
1560
856
    }
1561
 
    LOCK_APPEND_BUFFER;
 
857
    lock_append_buffer(info, need_append_buffer_lock);
1562
858
 
1563
 
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
 
859
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
1564
860
    {
1565
 
      /*
1566
 
        In case of a shared I/O cache with a writer we do direct write
1567
 
        cache to read cache copy. Do it before the write here so that
1568
 
        the readers can work in parallel with the write.
1569
 
        copy_to_read_buffer() relies on info->pos_in_file.
1570
 
      */
1571
 
      if (info->share)
1572
 
        copy_to_read_buffer(info, info->write_buffer, length);
1573
 
 
1574
 
      pos_in_file=info->pos_in_file;
 
861
      pos_in_file_local=info->pos_in_file;
1575
862
      /*
1576
863
        If we have append cache, we always open the file with
1577
864
        O_APPEND which moves the pos to EOF automatically on every write
1578
865
      */
1579
866
      if (!append_cache && info->seek_not_done)
1580
867
      {                                 /* File touched, do seek */
1581
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
868
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
1582
869
        {
1583
 
          UNLOCK_APPEND_BUFFER;
 
870
          unlock_append_buffer(info, need_append_buffer_lock);
1584
871
          return((info->error= -1));
1585
872
        }
1586
873
        if (!append_cache)
1587
874
          info->seek_not_done=0;
1588
875
      }
1589
876
      if (!append_cache)
1590
 
        info->pos_in_file+=length;
 
877
        info->pos_in_file+=length_local;
1591
878
      info->write_end= (info->write_buffer+info->buffer_length-
1592
 
                        ((pos_in_file+length) & (IO_SIZE-1)));
 
879
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
1593
880
 
1594
 
      if (my_write(info->file,info->write_buffer,length,
 
881
      if (my_write(info->file,info->write_buffer,length_local,
1595
882
                   info->myflags | MY_NABP))
1596
883
        info->error= -1;
1597
884
      else
1598
885
        info->error= 0;
1599
886
      if (!append_cache)
1600
887
      {
1601
 
        set_if_bigger(info->end_of_file,(pos_in_file+length));
 
888
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
1602
889
      }
1603
890
      else
1604
891
      {
1608
895
      }
1609
896
 
1610
897
      info->append_read_pos=info->write_pos=info->write_buffer;
1611
 
      UNLOCK_APPEND_BUFFER;
 
898
      unlock_append_buffer(info, need_append_buffer_lock);
1612
899
      return(info->error);
1613
900
    }
1614
901
  }
1619
906
    info->inited=0;
1620
907
  }
1621
908
#endif
1622
 
  UNLOCK_APPEND_BUFFER;
 
909
  unlock_append_buffer(info, need_append_buffer_lock);
1623
910
  return(0);
1624
911
}
1625
912
 
1626
 
/*
1627
 
  Free an IO_CACHE object
1628
 
 
1629
 
  SYNOPSOS
1630
 
    end_io_cache()
1631
 
    info                IO_CACHE Handle to free
1632
 
 
1633
 
  NOTES
1634
 
    It's currently safe to call this if one has called init_io_cache()
1635
 
    on the 'info' object, even if init_io_cache() failed.
1636
 
    This function is also safe to call twice with the same handle.
1637
 
 
1638
 
  RETURN
1639
 
   0  ok
1640
 
   #  Error
1641
 
*/
1642
 
 
1643
 
int end_io_cache(IO_CACHE *info)
 
913
/**
 
914
 * @brief
 
915
 *   Free an st_io_cache object
 
916
 * 
 
917
 * @detail
 
918
 *   It's currently safe to call this if one has called init_io_cache()
 
919
 *   on the 'info' object, even if init_io_cache() failed.
 
920
 *   This function is also safe to call twice with the same handle.
 
921
 * 
 
922
 * @param info st_io_cache Handle to free
 
923
 * 
 
924
 * @retval 0 ok
 
925
 * @retval # Error
 
926
 */
 
927
int st_io_cache::end_io_cache()
1644
928
{
1645
 
  int error=0;
1646
 
  IO_CACHE_CALLBACK pre_close;
1647
 
 
1648
 
  /*
1649
 
    Every thread must call remove_io_thread(). The last one destroys
1650
 
    the share elements.
1651
 
  */
1652
 
  assert(!info->share || !info->share->total_threads);
1653
 
 
1654
 
  if ((pre_close=info->pre_close))
1655
 
  {
1656
 
    (*pre_close)(info);
1657
 
    info->pre_close= 0;
1658
 
  }
1659
 
  if (info->alloced_buffer)
1660
 
  {
1661
 
    info->alloced_buffer=0;
1662
 
    if (info->file != -1)                       /* File doesn't exist */
1663
 
      error= my_b_flush_io_cache(info,1);
1664
 
    free((unsigned char*) info->buffer);
1665
 
    info->buffer=info->read_pos=(unsigned char*) 0;
1666
 
  }
1667
 
  if (info->type == SEQ_READ_APPEND)
1668
 
  {
1669
 
    /* Destroy allocated mutex */
1670
 
    info->type= TYPE_NOT_SET;
1671
 
    pthread_mutex_destroy(&info->append_buffer_lock);
1672
 
  }
1673
 
  return(error);
 
929
  int _error=0;
 
930
 
 
931
  if (pre_close)
 
932
  {
 
933
    (*pre_close)(this);
 
934
    pre_close= 0;
 
935
  }
 
936
  if (alloced_buffer)
 
937
  {
 
938
    if (type == READ_CACHE)
 
939
      global_read_buffer.sub(buffer_length);
 
940
    alloced_buffer=0;
 
941
    if (file != -1)                     /* File doesn't exist */
 
942
      _error= my_b_flush_io_cache(this, 1);
 
943
    free((unsigned char*) buffer);
 
944
    buffer=read_pos=(unsigned char*) 0;
 
945
  }
 
946
 
 
947
  return _error;
1674
948
} /* end_io_cache */
1675
949
 
1676
950
} /* namespace internal */
1677
951
} /* namespace drizzled */
1678
 
 
1679
 
/**********************************************************************
1680
 
 Testing of MF_IOCACHE
1681
 
**********************************************************************/
1682
 
 
1683
 
#ifdef MAIN
1684
 
 
1685
 
void die(const char* fmt, ...)
1686
 
{
1687
 
  va_list va_args;
1688
 
  va_start(va_args,fmt);
1689
 
  fprintf(stderr,"Error:");
1690
 
  vfprintf(stderr, fmt,va_args);
1691
 
  fprintf(stderr,", errno=%d\n", errno);
1692
 
  exit(1);
1693
 
}
1694
 
 
1695
 
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1696
 
{
1697
 
  int fd;
1698
 
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1699
 
    die("Could not open %s", fname);
1700
 
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1701
 
    die("failed in init_io_cache()");
1702
 
  return fd;
1703
 
}
1704
 
 
1705
 
void close_file(IO_CACHE* info)
1706
 
{
1707
 
  end_io_cache(info);
1708
 
  my_close(info->file, MYF(MY_WME));
1709
 
}
1710
 
 
1711
 
int main(int argc, char** argv)
1712
 
{
1713
 
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1714
 
  MY_STAT status;
1715
 
  const char* fname="/tmp/iocache.test";
1716
 
  int cache_size=16384;
1717
 
  char llstr_buf[22];
1718
 
  int max_block,total_bytes=0;
1719
 
  int i,num_loops=100,error=0;
1720
 
  char *p;
1721
 
  char* block, *block_end;
1722
 
  MY_INIT(argv[0]);
1723
 
  max_block = cache_size*3;
1724
 
  if (!(block=(char*)malloc(max_block)))
1725
 
    die("Not enough memory to allocate test block");
1726
 
  block_end = block + max_block;
1727
 
  for (p = block,i=0; p < block_end;i++)
1728
 
  {
1729
 
    *p++ = (char)i;
1730
 
  }
1731
 
  if (my_stat(fname,&status, MYF(0)) &&
1732
 
      my_delete(fname,MYF(MY_WME)))
1733
 
    {
1734
 
      die("Delete of %s failed, aborting", fname);
1735
 
    }
1736
 
  open_file(fname,&sra_cache, cache_size);
1737
 
  for (i = 0; i < num_loops; i++)
1738
 
  {
1739
 
    char buf[4];
1740
 
    int block_size = abs(rand() % max_block);
1741
 
    int4store(buf, block_size);
1742
 
    if (my_b_append(&sra_cache,buf,4) ||
1743
 
        my_b_append(&sra_cache, block, block_size))
1744
 
      die("write failed");
1745
 
    total_bytes += 4+block_size;
1746
 
  }
1747
 
  close_file(&sra_cache);
1748
 
  free(block);
1749
 
  if (!my_stat(fname,&status,MYF(MY_WME)))
1750
 
    die("%s failed to stat, but I had just closed it,\
1751
 
 wonder how that happened");
1752
 
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1753
 
         llstr(status.st_size,llstr_buf),
1754
 
         total_bytes);
1755
 
  my_delete(fname, MYF(MY_WME));
1756
 
  /* check correctness of tests */
1757
 
  if (total_bytes != status.st_size)
1758
 
  {
1759
 
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
1760
 
supposedly written\n");
1761
 
    error=1;
1762
 
  }
1763
 
  exit(error);
1764
 
  return 0;
1765
 
}
1766
 
#endif