~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Brian Aker
  • Date: 2009-08-18 07:19:56 UTC
  • mfrom: (1116.1.3 stewart)
  • mto: This revision was merged to the branch mainline in revision 1118.
  • Revision ID: brian@gaz-20090818071956-nfpoe9rp3i7p50kx
Merge my branch from Stewart into one branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include <config.h>
51
 
 
52
 
#include <drizzled/internal/my_sys.h>
53
 
#include <drizzled/internal/m_string.h>
54
 
#include <drizzled/drizzled.h>
 
50
#include "mysys/mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include <drizzled/error.h>
57
 
#include <drizzled/internal/aio_result.h>
 
53
#include "mysys/mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include <drizzled/internal/iocache.h>
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
60
#include <stdlib.h>
65
62
 
66
63
using namespace std;
67
64
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
65
#define lock_append_buffer(info) \
 
66
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
67
#define unlock_append_buffer(info) \
 
68
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
69
 
 
70
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
71
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
72
 
 
73
/*
 
74
  Setup internal pointers inside IO_CACHE
 
75
 
 
76
  SYNOPSIS
 
77
    setup_io_cache()
 
78
    info                IO_CACHE handler
 
79
 
 
80
  NOTES
 
81
    This is called on automaticly on init or reinit of IO_CACHE
 
82
    It must be called externally if one moves or copies an IO_CACHE
 
83
    object.
 
84
*/
 
85
 
 
86
void setup_io_cache(IO_CACHE* info)
126
87
{
127
88
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
89
  if (info->type == WRITE_CACHE)
129
90
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
91
    info->current_pos= &info->write_pos;
 
92
    info->current_end= &info->write_end;
132
93
  }
133
94
  else
134
95
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
96
    info->current_pos= &info->read_pos;
 
97
    info->current_end= &info->read_end;
137
98
  }
138
99
}
139
100
 
140
101
 
141
 
void st_io_cache::init_functions()
 
102
static void
 
103
init_functions(IO_CACHE* info)
142
104
{
 
105
  enum cache_type type= info->type;
143
106
  switch (type) {
144
107
  case READ_NET:
145
108
    /*
150
113
      as myisamchk
151
114
    */
152
115
    break;
 
116
  case SEQ_READ_APPEND:
 
117
    info->read_function = _my_b_seq_read;
 
118
    info->write_function = 0;                   /* Force a core if used */
 
119
    break;
153
120
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
121
    info->read_function =
 
122
                          info->share ? _my_b_read_r :
 
123
                                        _my_b_read;
 
124
    info->write_function = _my_b_write;
156
125
  }
157
126
 
158
 
  setup_io_cache();
 
127
  setup_io_cache(info);
159
128
}
160
129
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
130
 
 
131
/*
 
132
  Initialize an IO_CACHE object
 
133
 
 
134
  SYNOPSOS
 
135
    init_io_cache()
 
136
    info                cache handler to initialize
 
137
    file                File that should be associated to to the handler
 
138
                        If == -1 then real_open_cached_file()
 
139
                        will be called when it's time to open file.
 
140
    cachesize           Size of buffer to allocate for read/write
 
141
                        If == 0 then use my_default_record_cache_size
 
142
    type                Type of cache
 
143
    seek_offset         Where cache should start reading/writing
 
144
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
145
    cache_myflags       Bitmap of differnt flags
 
146
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
147
                        MY_DONT_CHECK_FILESIZE
 
148
 
 
149
  RETURN
 
150
    0  ok
 
151
    #  error
 
152
*/
 
153
 
 
154
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
155
                  enum cache_type type, my_off_t seek_offset,
 
156
                  bool use_async_io, myf cache_myflags)
181
157
{
182
158
  size_t min_cache;
183
159
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
160
  my_off_t end_of_file= ~(my_off_t) 0;
185
161
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
162
  info->file= file;
 
163
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
164
  info->pos_in_file= seek_offset;
 
165
  info->pre_close = info->pre_read = info->post_read = 0;
 
166
  info->arg = 0;
 
167
  info->alloced_buffer = 0;
 
168
  info->buffer=0;
 
169
  info->seek_not_done= 0;
194
170
 
195
171
  if (file >= 0)
196
172
  {
197
173
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
174
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
199
175
    {
200
176
      /*
201
177
         This kind of object doesn't support seek() or tell(). Don't set a
202
178
         flag that will make us again try to seek() later and fail.
203
179
      */
204
 
      seek_not_done= 0;
 
180
      info->seek_not_done= 0;
205
181
      /*
206
182
        Additionally, if we're supposed to start somewhere other than the
207
183
        the beginning of whatever this file is, then somebody made a bad
210
186
      assert(seek_offset == 0);
211
187
    }
212
188
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
189
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
214
190
  }
215
191
 
 
192
  info->share=0;
 
193
 
216
194
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
195
    return(1);                          /* No cache requested */
218
196
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
197
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
198
  {                                             /* Assume file isn't growing */
221
199
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
200
    {
223
201
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
202
      end_of_file=lseek(file,0L,SEEK_END);
225
203
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
204
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
205
      if (end_of_file < seek_offset)
 
206
        end_of_file=seek_offset;
229
207
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
208
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
209
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
210
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
211
        use_async_io=0;                         /* No need to use async */
234
212
      }
235
213
    }
236
214
  }
237
215
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
216
  if (type != READ_NET && type != WRITE_NET)
239
217
  {
240
218
    /* Retry allocating memory in smaller blocks until we get one */
241
219
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
223
      if (cachesize < min_cache)
246
224
        cachesize = min_cache;
247
225
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
 
226
      if (type == SEQ_READ_APPEND)
 
227
        buffer_block *= 2;
 
228
      if ((info->buffer=
255
229
           (unsigned char*) malloc(buffer_block)) != 0)
256
230
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
231
        info->write_buffer=info->buffer;
 
232
        if (type == SEQ_READ_APPEND)
 
233
          info->write_buffer = info->buffer + cachesize;
 
234
        info->alloced_buffer=1;
259
235
        break;                                  /* Enough memory found */
260
236
      }
261
237
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
238
        return(2);                              /* Can't alloc cache */
267
239
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
240
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
241
    }
272
242
  }
273
243
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
244
  info->read_length=info->buffer_length=cachesize;
 
245
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
246
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
247
  if (type == SEQ_READ_APPEND)
 
248
  {
 
249
    info->append_read_pos = info->write_pos = info->write_buffer;
 
250
    info->write_end = info->write_buffer + info->buffer_length;
 
251
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
252
  }
277
253
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
254
  if (type == WRITE_CACHE)
 
255
    info->write_end=
 
256
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
257
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
258
    info->read_end=info->buffer;                /* Nothing in cache */
283
259
 
284
260
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
261
  info->end_of_file= end_of_file;
 
262
  info->error=0;
 
263
  info->type= type;
 
264
  init_functions(info);
289
265
#ifdef HAVE_AIOWAIT
290
266
  if (use_async_io && ! my_disable_async_io)
291
267
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
268
    info->read_length/=2;
 
269
    info->read_function=_my_b_async_read;
294
270
  }
295
 
  inited= aio_result.pending= 0;
 
271
  info->inited=info->aio_result.pending=0;
296
272
#endif
297
 
  return 0;
 
273
  return(0);
298
274
}                                               /* init_io_cache */
299
275
 
300
276
        /* Wait until current request is ready */
319
295
        break;
320
296
    }
321
297
  }
 
298
  return;
322
299
}
323
300
#endif
324
301
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
302
 
 
303
/*
 
304
  Use this to reset cache to re-start reading or to change the type
 
305
  between READ_CACHE <-> WRITE_CACHE
 
306
  If we are doing a reinit of a cache where we have the start of the file
 
307
  in the cache, we are reusing this memory without flushing it to disk.
 
308
*/
 
309
 
 
310
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
311
                        my_off_t seek_offset,
 
312
                        bool use_async_io,
 
313
                        bool clear_cache)
339
314
{
340
315
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
316
  assert(type != READ_NET && info->type != READ_NET &&
 
317
              type != WRITE_NET && info->type != WRITE_NET &&
 
318
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
319
 
344
320
  /* If the whole file is in memory, avoid flushing to disk */
345
321
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
322
      seek_offset >= info->pos_in_file &&
 
323
      seek_offset <= my_b_tell(info))
348
324
  {
349
325
    /* Reuse current buffer without flushing it to disk */
350
326
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
327
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
328
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
329
      info->read_end=info->write_pos;
 
330
      info->end_of_file=my_b_tell(info);
355
331
      /*
356
332
        Trigger a new seek only if we have a valid
357
333
        file handle.
358
334
      */
359
 
      seek_not_done= (file != -1);
 
335
      info->seek_not_done= (info->file != -1);
360
336
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
337
    else if (type == WRITE_CACHE)
362
338
    {
363
 
      if (type == READ_CACHE)
 
339
      if (info->type == READ_CACHE)
364
340
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
341
        info->write_end=info->write_buffer+info->buffer_length;
 
342
        info->seek_not_done=1;
367
343
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
344
      info->end_of_file = ~(my_off_t) 0;
369
345
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
346
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
347
    if (type == WRITE_CACHE)
 
348
      info->write_pos=pos;
373
349
    else
374
 
      read_pos= pos;
 
350
      info->read_pos= pos;
375
351
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
352
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
353
#endif
378
354
  }
379
355
  else
382
358
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
359
      after the current positions should be ignored
384
360
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
361
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
362
      info->end_of_file=my_b_tell(info);
387
363
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
364
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
365
      return(1);
 
366
    info->pos_in_file=seek_offset;
391
367
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
368
    info->seek_not_done=1;
 
369
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
370
    if (type == READ_CACHE)
395
371
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
372
      info->read_end=info->buffer;              /* Nothing in cache */
397
373
    }
398
374
    else
399
375
    {
400
 
      write_end=(buffer + buffer_length -
 
376
      info->write_end=(info->buffer + info->buffer_length -
401
377
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
378
      info->end_of_file= ~(my_off_t) 0;
403
379
    }
404
380
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
381
  info->type=type;
 
382
  info->error=0;
 
383
  init_functions(info);
408
384
 
409
385
#ifdef HAVE_AIOWAIT
410
386
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
387
      ((uint32_t) info->buffer_length <
 
388
       (uint32_t) (info->end_of_file - seek_offset)))
413
389
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
390
    info->read_length=info->buffer_length/2;
 
391
    info->read_function=_my_b_async_read;
416
392
  }
417
 
  inited= 0;
 
393
  info->inited=0;
418
394
#else
419
395
  (void)use_async_io;
420
396
#endif
421
 
  return 0;
 
397
  return(0);
422
398
} /* reinit_io_cache */
423
399
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
400
 
 
401
 
 
402
/*
 
403
  Read buffered.
 
404
 
 
405
  SYNOPSIS
 
406
    _my_b_read()
 
407
      info                      IO_CACHE pointer
 
408
      Buffer                    Buffer to retrieve count bytes from file
 
409
      Count                     Number of bytes to read into Buffer
 
410
 
 
411
  NOTE
 
412
    This function is only called from the my_b_read() macro when there
 
413
    isn't enough characters in the buffer to satisfy the request.
 
414
 
 
415
  WARNING
 
416
 
 
417
    When changing this function, be careful with handling file offsets
 
418
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
419
    types than my_off_t unless you can be sure that their value fits.
 
420
    Same applies to differences of file offsets.
 
421
 
 
422
    When changing this function, check _my_b_read_r(). It might need the
 
423
    same change.
 
424
 
 
425
  RETURN
 
426
    0      we succeeded in reading all data
 
427
    1      Error: can't read requested characters
 
428
*/
 
429
 
 
430
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
431
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
432
  size_t length,diff_length,left_length, max_length;
 
433
  my_off_t pos_in_file;
448
434
 
449
435
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
436
  {
455
441
  }
456
442
 
457
443
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
444
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
445
 
460
446
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
447
    Whenever a function which operates on IO_CACHE flushes/writes
 
448
    some part of the IO_CACHE to disk it will set the property
463
449
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
450
    on the IO_CACHE.
465
451
  */
466
452
  if (info->seek_not_done)
467
453
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
454
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
469
455
    {
470
456
      /* No error, reset seek_not_done flag. */
471
457
      info->seek_not_done= 0;
477
463
        info->file is a pipe or socket or FIFO.  We never should have tried
478
464
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
465
      */
480
 
      assert(errno != ESPIPE);
 
466
      assert(my_errno != ESPIPE);
481
467
      info->error= -1;
482
468
      return(1);
483
469
    }
484
470
  }
485
471
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
472
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
473
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
474
  {                                     /* Fill first intern buffer */
489
475
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
476
    if (info->end_of_file <= pos_in_file)
491
477
    {                                   /* End of file */
492
478
      info->error= (int) left_length;
493
479
      return(1);
494
480
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
481
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
482
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
483
        != length)
497
484
    {
498
485
      info->error= (read_length == (size_t) -1 ? -1 :
499
486
                    (int) (read_length+left_length));
500
487
      return(1);
501
488
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
489
    Count-=length;
 
490
    Buffer+=length;
 
491
    pos_in_file+=length;
 
492
    left_length+=length;
506
493
    diff_length=0;
507
494
  }
508
495
 
509
496
  max_length= info->read_length-diff_length;
510
497
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
498
      max_length > (info->end_of_file - pos_in_file))
 
499
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
500
  if (!max_length)
514
501
  {
515
502
    if (Count)
516
503
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
504
      info->error= left_length;         /* We only got this many char */
518
505
      return(1);
519
506
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
507
    length=0;                           /* Didn't read any chars */
521
508
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
509
  else if ((length= my_read(info->file,info->buffer, max_length,
523
510
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
511
           length == (size_t) -1)
525
512
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
513
    if (length != (size_t) -1)
 
514
      memcpy(Buffer, info->buffer, length);
 
515
    info->pos_in_file= pos_in_file;
 
516
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
517
    info->read_pos=info->read_end=info->buffer;
531
518
    return(1);
532
519
  }
533
520
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
521
  info->read_end=info->buffer+length;
 
522
  info->pos_in_file=pos_in_file;
536
523
  memcpy(Buffer, info->buffer, Count);
537
524
  return(0);
538
525
}
539
526
 
540
527
 
 
528
/*
 
529
  Prepare IO_CACHE for shared use.
 
530
 
 
531
  SYNOPSIS
 
532
    init_io_cache_share()
 
533
      read_cache                A read cache. This will be copied for
 
534
                                every thread after setup.
 
535
      cshare                    The share.
 
536
      write_cache               If non-NULL a write cache that is to be
 
537
                                synchronized with the read caches.
 
538
      num_threads               Number of threads sharing the cache
 
539
                                including the write thread if any.
 
540
 
 
541
  DESCRIPTION
 
542
 
 
543
    The shared cache is used so: One IO_CACHE is initialized with
 
544
    init_io_cache(). This includes the allocation of a buffer. Then a
 
545
    share is allocated and init_io_cache_share() is called with the io
 
546
    cache and the share. Then the io cache is copied for each thread. So
 
547
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
548
    is shared because cache->buffer is the same for all caches.
 
549
 
 
550
    One thread reads data from the file into the buffer. All threads
 
551
    read from the buffer, but every thread maintains its own set of
 
552
    pointers into the buffer. When all threads have used up the buffer
 
553
    contents, one of the threads reads the next block of data into the
 
554
    buffer. To accomplish this, each thread enters the cache lock before
 
555
    accessing the buffer. They wait in lock_io_cache() until all threads
 
556
    joined the lock. The last thread entering the lock is in charge of
 
557
    reading from file to buffer. It wakes all threads when done.
 
558
 
 
559
    Synchronizing a write cache to the read caches works so: Whenever
 
560
    the write buffer needs a flush, the write thread enters the lock and
 
561
    waits for all other threads to enter the lock too. They do this when
 
562
    they have used up the read buffer. When all threads are in the lock,
 
563
    the write thread copies the write buffer to the read buffer and
 
564
    wakes all threads.
 
565
 
 
566
    share->running_threads is the number of threads not being in the
 
567
    cache lock. When entering lock_io_cache() the number is decreased.
 
568
    When the thread that fills the buffer enters unlock_io_cache() the
 
569
    number is reset to the number of threads. The condition
 
570
    running_threads == 0 means that all threads are in the lock. Bumping
 
571
    up the number to the full count is non-intuitive. But increasing the
 
572
    number by one for each thread that leaves the lock could lead to a
 
573
    solo run of one thread. The last thread to join a lock reads from
 
574
    file to buffer, wakes the other threads, processes the data in the
 
575
    cache and enters the lock again. If no other thread left the lock
 
576
    meanwhile, it would think it's the last one again and read the next
 
577
    block...
 
578
 
 
579
    The share has copies of 'error', 'buffer', 'read_end', and
 
580
    'pos_in_file' from the thread that filled the buffer. We may not be
 
581
    able to access this information directly from its cache because the
 
582
    thread may be removed from the share before the variables could be
 
583
    copied by all other threads. Or, if a write buffer is synchronized,
 
584
    it would change its 'pos_in_file' after waking the other threads,
 
585
    possibly before they could copy its value.
 
586
 
 
587
    However, the 'buffer' variable in the share is for a synchronized
 
588
    write cache. It needs to know where to put the data. Otherwise it
 
589
    would need access to the read cache of one of the threads that is
 
590
    not yet removed from the share.
 
591
 
 
592
  RETURN
 
593
    void
 
594
*/
 
595
 
 
596
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
597
                         IO_CACHE *write_cache, uint32_t num_threads)
 
598
{
 
599
  assert(num_threads > 1);
 
600
  assert(read_cache->type == READ_CACHE);
 
601
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
602
 
 
603
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
604
  pthread_cond_init(&cshare->cond, 0);
 
605
  pthread_cond_init(&cshare->cond_writer, 0);
 
606
 
 
607
  cshare->running_threads= num_threads;
 
608
  cshare->total_threads=   num_threads;
 
609
  cshare->error=           0;    /* Initialize. */
 
610
  cshare->buffer=          read_cache->buffer;
 
611
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
612
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
613
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
614
 
 
615
  read_cache->share=         cshare;
 
616
  read_cache->read_function= _my_b_read_r;
 
617
  read_cache->current_pos=   NULL;
 
618
  read_cache->current_end=   NULL;
 
619
 
 
620
  if (write_cache)
 
621
    write_cache->share= cshare;
 
622
 
 
623
  return;
 
624
}
 
625
 
 
626
 
 
627
/*
 
628
  Remove a thread from shared access to IO_CACHE.
 
629
 
 
630
  SYNOPSIS
 
631
    remove_io_thread()
 
632
      cache                     The IO_CACHE to be removed from the share.
 
633
 
 
634
  NOTE
 
635
 
 
636
    Every thread must do that on exit for not to deadlock other threads.
 
637
 
 
638
    The last thread destroys the pthread resources.
 
639
 
 
640
    A writer flushes its cache first.
 
641
 
 
642
  RETURN
 
643
    void
 
644
*/
 
645
 
 
646
void remove_io_thread(IO_CACHE *cache)
 
647
{
 
648
  IO_CACHE_SHARE *cshare= cache->share;
 
649
  uint32_t total;
 
650
 
 
651
  /* If the writer goes, it needs to flush the write cache. */
 
652
  if (cache == cshare->source_cache)
 
653
    flush_io_cache(cache);
 
654
 
 
655
  pthread_mutex_lock(&cshare->mutex);
 
656
 
 
657
  /* Remove from share. */
 
658
  total= --cshare->total_threads;
 
659
 
 
660
  /* Detach from share. */
 
661
  cache->share= NULL;
 
662
 
 
663
  /* If the writer goes, let the readers know. */
 
664
  if (cache == cshare->source_cache)
 
665
  {
 
666
    cshare->source_cache= NULL;
 
667
  }
 
668
 
 
669
  /* If all threads are waiting for me to join the lock, wake them. */
 
670
  if (!--cshare->running_threads)
 
671
  {
 
672
    pthread_cond_signal(&cshare->cond_writer);
 
673
    pthread_cond_broadcast(&cshare->cond);
 
674
  }
 
675
 
 
676
  pthread_mutex_unlock(&cshare->mutex);
 
677
 
 
678
  if (!total)
 
679
  {
 
680
    pthread_cond_destroy (&cshare->cond_writer);
 
681
    pthread_cond_destroy (&cshare->cond);
 
682
    pthread_mutex_destroy(&cshare->mutex);
 
683
  }
 
684
 
 
685
  return;
 
686
}
 
687
 
 
688
 
 
689
/*
 
690
  Lock IO cache and wait for all other threads to join.
 
691
 
 
692
  SYNOPSIS
 
693
    lock_io_cache()
 
694
      cache                     The cache of the thread entering the lock.
 
695
      pos                       File position of the block to read.
 
696
                                Unused for the write thread.
 
697
 
 
698
  DESCRIPTION
 
699
 
 
700
    Wait for all threads to finish with the current buffer. We want
 
701
    all threads to proceed in concert. The last thread to join
 
702
    lock_io_cache() will read the block from file and all threads start
 
703
    to use it. Then they will join again for reading the next block.
 
704
 
 
705
    The waiting threads detect a fresh buffer by comparing
 
706
    cshare->pos_in_file with the position they want to process next.
 
707
    Since the first block may start at position 0, we take
 
708
    cshare->read_end as an additional condition. This variable is
 
709
    initialized to NULL and will be set after a block of data is written
 
710
    to the buffer.
 
711
 
 
712
  RETURN
 
713
    1           OK, lock in place, go ahead and read.
 
714
    0           OK, unlocked, another thread did the read.
 
715
*/
 
716
 
 
717
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
718
{
 
719
  IO_CACHE_SHARE *cshare= cache->share;
 
720
 
 
721
  /* Enter the lock. */
 
722
  pthread_mutex_lock(&cshare->mutex);
 
723
  cshare->running_threads--;
 
724
 
 
725
  if (cshare->source_cache)
 
726
  {
 
727
    /* A write cache is synchronized to the read caches. */
 
728
 
 
729
    if (cache == cshare->source_cache)
 
730
    {
 
731
      /* The writer waits until all readers are here. */
 
732
      while (cshare->running_threads)
 
733
      {
 
734
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
735
      }
 
736
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
737
      return(1);
 
738
    }
 
739
 
 
740
    /* The last thread wakes the writer. */
 
741
    if (!cshare->running_threads)
 
742
    {
 
743
      pthread_cond_signal(&cshare->cond_writer);
 
744
    }
 
745
 
 
746
    /*
 
747
      Readers wait until the data is copied from the writer. Another
 
748
      reason to stop waiting is the removal of the write thread. If this
 
749
      happens, we leave the lock with old data in the buffer.
 
750
    */
 
751
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
752
           cshare->source_cache)
 
753
    {
 
754
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
755
    }
 
756
 
 
757
    /*
 
758
      If the writer was removed from the share while this thread was
 
759
      asleep, we need to simulate an EOF condition. The writer cannot
 
760
      reset the share variables as they might still be in use by readers
 
761
      of the last block. When we awake here then because the last
 
762
      joining thread signalled us. If the writer is not the last, it
 
763
      will not signal. So it is safe to clear the buffer here.
 
764
    */
 
765
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
766
    {
 
767
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
768
      cshare->error= 0; /* EOF is not an error. */
 
769
    }
 
770
  }
 
771
  else
 
772
  {
 
773
    /*
 
774
      There are read caches only. The last thread arriving in
 
775
      lock_io_cache() continues with a locked cache and reads the block.
 
776
    */
 
777
    if (!cshare->running_threads)
 
778
    {
 
779
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
780
      return(1);
 
781
    }
 
782
 
 
783
    /*
 
784
      All other threads wait until the requested block is read by the
 
785
      last thread arriving. Another reason to stop waiting is the
 
786
      removal of a thread. If this leads to all threads being in the
 
787
      lock, we have to continue also. The first of the awaken threads
 
788
      will then do the read.
 
789
    */
 
790
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
791
           cshare->running_threads)
 
792
    {
 
793
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
794
    }
 
795
 
 
796
    /* If the block is not yet read, continue with a locked cache and read. */
 
797
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
798
    {
 
799
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
800
      return(1);
 
801
    }
 
802
 
 
803
    /* Another thread did read the block already. */
 
804
  }
 
805
 
 
806
  /*
 
807
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
808
    filled the buffer did this and marked all threads as running.
 
809
  */
 
810
  pthread_mutex_unlock(&cshare->mutex);
 
811
  return(0);
 
812
}
 
813
 
 
814
 
 
815
/*
 
816
  Unlock IO cache.
 
817
 
 
818
  SYNOPSIS
 
819
    unlock_io_cache()
 
820
      cache                     The cache of the thread leaving the lock.
 
821
 
 
822
  NOTE
 
823
    This is called by the thread that filled the buffer. It marks all
 
824
    threads as running and awakes them. This must not be done by any
 
825
    other thread.
 
826
 
 
827
    Do not signal cond_writer. Either there is no writer or the writer
 
828
    is the only one who can call this function.
 
829
 
 
830
    The reason for resetting running_threads to total_threads before
 
831
    waking all other threads is that it could be possible that this
 
832
    thread is so fast with processing the buffer that it enters the lock
 
833
    before even one other thread has left it. If every awoken thread
 
834
    would increase running_threads by one, this thread could think that
 
835
    he is again the last to join and would not wait for the other
 
836
    threads to process the data.
 
837
 
 
838
  RETURN
 
839
    void
 
840
*/
 
841
 
 
842
static void unlock_io_cache(IO_CACHE *cache)
 
843
{
 
844
  IO_CACHE_SHARE *cshare= cache->share;
 
845
 
 
846
  cshare->running_threads= cshare->total_threads;
 
847
  pthread_cond_broadcast(&cshare->cond);
 
848
  pthread_mutex_unlock(&cshare->mutex);
 
849
  return;
 
850
}
 
851
 
 
852
 
 
853
/*
 
854
  Read from IO_CACHE when it is shared between several threads.
 
855
 
 
856
  SYNOPSIS
 
857
    _my_b_read_r()
 
858
      cache                     IO_CACHE pointer
 
859
      Buffer                    Buffer to retrieve count bytes from file
 
860
      Count                     Number of bytes to read into Buffer
 
861
 
 
862
  NOTE
 
863
    This function is only called from the my_b_read() macro when there
 
864
    isn't enough characters in the buffer to satisfy the request.
 
865
 
 
866
  IMPLEMENTATION
 
867
 
 
868
    It works as follows: when a thread tries to read from a file (that
 
869
    is, after using all the data from the (shared) buffer), it just
 
870
    hangs on lock_io_cache(), waiting for other threads. When the very
 
871
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
872
    does actual IO and unlock_io_cache(), which signals all the waiting
 
873
    threads that data is in the buffer.
 
874
 
 
875
  WARNING
 
876
 
 
877
    When changing this function, be careful with handling file offsets
 
878
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
879
    types than my_off_t unless you can be sure that their value fits.
 
880
    Same applies to differences of file offsets. (Bug #11527)
 
881
 
 
882
    When changing this function, check _my_b_read(). It might need the
 
883
    same change.
 
884
 
 
885
  RETURN
 
886
    0      we succeeded in reading all data
 
887
    1      Error: can't read requested characters
 
888
*/
 
889
 
 
890
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
891
{
 
892
  my_off_t pos_in_file;
 
893
  size_t length, diff_length, left_length;
 
894
  IO_CACHE_SHARE *cshare= cache->share;
 
895
 
 
896
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
897
  {
 
898
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
899
    memcpy(Buffer, cache->read_pos, left_length);
 
900
    Buffer+= left_length;
 
901
    Count-= left_length;
 
902
  }
 
903
  while (Count)
 
904
  {
 
905
    size_t cnt, len;
 
906
 
 
907
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
908
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
909
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
910
    length= ((length <= cache->read_length) ?
 
911
             length + IO_ROUND_DN(cache->read_length - length) :
 
912
             length - IO_ROUND_UP(length - cache->read_length));
 
913
    if (cache->type != READ_FIFO &&
 
914
        (length > (cache->end_of_file - pos_in_file)))
 
915
      length= (size_t) (cache->end_of_file - pos_in_file);
 
916
    if (length == 0)
 
917
    {
 
918
      cache->error= (int) left_length;
 
919
      return(1);
 
920
    }
 
921
    if (lock_io_cache(cache, pos_in_file))
 
922
    {
 
923
      /* With a synchronized write/read cache we won't come here... */
 
924
      assert(!cshare->source_cache);
 
925
      /*
 
926
        ... unless the writer has gone before this thread entered the
 
927
        lock. Simulate EOF in this case. It can be distinguished by
 
928
        cache->file.
 
929
      */
 
930
      if (cache->file < 0)
 
931
        len= 0;
 
932
      else
 
933
      {
 
934
        /*
 
935
          Whenever a function which operates on IO_CACHE flushes/writes
 
936
          some part of the IO_CACHE to disk it will set the property
 
937
          "seek_not_done" to indicate this to other functions operating
 
938
          on the IO_CACHE.
 
939
        */
 
940
        if (cache->seek_not_done)
 
941
        {
 
942
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
943
          {
 
944
            cache->error= -1;
 
945
            unlock_io_cache(cache);
 
946
            return(1);
 
947
          }
 
948
        }
 
949
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
950
      }
 
951
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
952
      cache->error=       (len == length ? 0 : (int) len);
 
953
      cache->pos_in_file= pos_in_file;
 
954
 
 
955
      /* Copy important values to the share. */
 
956
      cshare->error=       cache->error;
 
957
      cshare->read_end=    cache->read_end;
 
958
      cshare->pos_in_file= pos_in_file;
 
959
 
 
960
      /* Mark all threads as running and wake them. */
 
961
      unlock_io_cache(cache);
 
962
    }
 
963
    else
 
964
    {
 
965
      /*
 
966
        With a synchronized write/read cache readers always come here.
 
967
        Copy important values from the share.
 
968
      */
 
969
      cache->error=       cshare->error;
 
970
      cache->read_end=    cshare->read_end;
 
971
      cache->pos_in_file= cshare->pos_in_file;
 
972
 
 
973
      len= ((cache->error == -1) ? (size_t) -1 :
 
974
            (size_t) (cache->read_end - cache->buffer));
 
975
    }
 
976
    cache->read_pos=      cache->buffer;
 
977
    cache->seek_not_done= 0;
 
978
    if (len == 0 || len == (size_t) -1)
 
979
    {
 
980
      cache->error= (int) left_length;
 
981
      return(1);
 
982
    }
 
983
    cnt= (len > Count) ? Count : len;
 
984
    memcpy(Buffer, cache->read_pos, cnt);
 
985
    Count -= cnt;
 
986
    Buffer+= cnt;
 
987
    left_length+= cnt;
 
988
    cache->read_pos+= cnt;
 
989
  }
 
990
  return(0);
 
991
}
 
992
 
 
993
 
 
994
/*
 
995
  Copy data from write cache to read cache.
 
996
 
 
997
  SYNOPSIS
 
998
    copy_to_read_buffer()
 
999
      write_cache               The write cache.
 
1000
      write_buffer              The source of data, mostly the cache buffer.
 
1001
      write_length              The number of bytes to copy.
 
1002
 
 
1003
  NOTE
 
1004
    The write thread will wait for all read threads to join the cache
 
1005
    lock. Then it copies the data over and wakes the read threads.
 
1006
 
 
1007
  RETURN
 
1008
    void
 
1009
*/
 
1010
 
 
1011
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1012
                                const unsigned char *write_buffer, size_t write_length)
 
1013
{
 
1014
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1015
 
 
1016
  assert(cshare->source_cache == write_cache);
 
1017
  /*
 
1018
    write_length is usually less or equal to buffer_length.
 
1019
    It can be bigger if _my_b_write() is called with a big length.
 
1020
  */
 
1021
  while (write_length)
 
1022
  {
 
1023
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1024
    int  rc;
 
1025
 
 
1026
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1027
    /* The writing thread does always have the lock when it awakes. */
 
1028
    assert(rc);
 
1029
 
 
1030
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1031
 
 
1032
    cshare->error=       0;
 
1033
    cshare->read_end=    cshare->buffer + copy_length;
 
1034
    cshare->pos_in_file= write_cache->pos_in_file;
 
1035
 
 
1036
    /* Mark all threads as running and wake them. */
 
1037
    unlock_io_cache(write_cache);
 
1038
 
 
1039
    write_buffer+= copy_length;
 
1040
    write_length-= copy_length;
 
1041
  }
 
1042
}
 
1043
 
 
1044
 
 
1045
/*
 
1046
  Do sequential read from the SEQ_READ_APPEND cache.
 
1047
 
 
1048
  We do this in three stages:
 
1049
   - first read from info->buffer
 
1050
   - then if there are still data to read, try the file descriptor
 
1051
   - afterwards, if there are still data to read, try append buffer
 
1052
 
 
1053
  RETURNS
 
1054
    0  Success
 
1055
    1  Failed to read
 
1056
*/
 
1057
 
 
1058
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1059
{
 
1060
  size_t length, diff_length, left_length, save_count, max_length;
 
1061
  my_off_t pos_in_file;
 
1062
  save_count=Count;
 
1063
 
 
1064
  /* first, read the regular buffer */
 
1065
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1066
  {
 
1067
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1068
    memcpy(Buffer,info->read_pos, left_length);
 
1069
    Buffer+=left_length;
 
1070
    Count-=left_length;
 
1071
  }
 
1072
  lock_append_buffer(info);
 
1073
 
 
1074
  /* pos_in_file always point on where info->buffer was read */
 
1075
  if ((pos_in_file=info->pos_in_file +
 
1076
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1077
    goto read_append_buffer;
 
1078
 
 
1079
  /*
 
1080
    With read-append cache we must always do a seek before we read,
 
1081
    because the write could have moved the file pointer astray
 
1082
  */
 
1083
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1084
  {
 
1085
   info->error= -1;
 
1086
   unlock_append_buffer(info);
 
1087
   return (1);
 
1088
  }
 
1089
  info->seek_not_done=0;
 
1090
 
 
1091
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1092
 
 
1093
  /* now the second stage begins - read from file descriptor */
 
1094
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1095
  {
 
1096
    /* Fill first intern buffer */
 
1097
    size_t read_length;
 
1098
 
 
1099
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1100
    if ((read_length= my_read(info->file,Buffer, length,
 
1101
                              info->myflags)) == (size_t) -1)
 
1102
    {
 
1103
      info->error= -1;
 
1104
      unlock_append_buffer(info);
 
1105
      return 1;
 
1106
    }
 
1107
    Count-=read_length;
 
1108
    Buffer+=read_length;
 
1109
    pos_in_file+=read_length;
 
1110
 
 
1111
    if (read_length != length)
 
1112
    {
 
1113
      /*
 
1114
        We only got part of data;  Read the rest of the data from the
 
1115
        write buffer
 
1116
      */
 
1117
      goto read_append_buffer;
 
1118
    }
 
1119
    left_length+=length;
 
1120
    diff_length=0;
 
1121
  }
 
1122
 
 
1123
  max_length= info->read_length-diff_length;
 
1124
  if (max_length > (info->end_of_file - pos_in_file))
 
1125
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1126
  if (!max_length)
 
1127
  {
 
1128
    if (Count)
 
1129
      goto read_append_buffer;
 
1130
    length=0;                           /* Didn't read any more chars */
 
1131
  }
 
1132
  else
 
1133
  {
 
1134
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1135
    if (length == (size_t) -1)
 
1136
    {
 
1137
      info->error= -1;
 
1138
      unlock_append_buffer(info);
 
1139
      return 1;
 
1140
    }
 
1141
    if (length < Count)
 
1142
    {
 
1143
      memcpy(Buffer, info->buffer, length);
 
1144
      Count -= length;
 
1145
      Buffer += length;
 
1146
 
 
1147
      /*
 
1148
         added the line below to make
 
1149
         assert(pos_in_file==info->end_of_file) pass.
 
1150
         otherwise this does not appear to be needed
 
1151
      */
 
1152
      pos_in_file += length;
 
1153
      goto read_append_buffer;
 
1154
    }
 
1155
  }
 
1156
  unlock_append_buffer(info);
 
1157
  info->read_pos=info->buffer+Count;
 
1158
  info->read_end=info->buffer+length;
 
1159
  info->pos_in_file=pos_in_file;
 
1160
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1161
  return 0;
 
1162
 
 
1163
read_append_buffer:
 
1164
 
 
1165
  /*
 
1166
     Read data from the current write buffer.
 
1167
     Count should never be == 0 here (The code will work even if count is 0)
 
1168
  */
 
1169
 
 
1170
  {
 
1171
    /* First copy the data to Count */
 
1172
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1173
    size_t copy_len;
 
1174
    size_t transfer_len;
 
1175
 
 
1176
    assert(info->append_read_pos <= info->write_pos);
 
1177
    /*
 
1178
      TODO: figure out if the assert below is needed or correct.
 
1179
    */
 
1180
    assert(pos_in_file == info->end_of_file);
 
1181
    copy_len=min(Count, len_in_buff);
 
1182
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1183
    info->append_read_pos += copy_len;
 
1184
    Count -= copy_len;
 
1185
    if (Count)
 
1186
      info->error = save_count - Count;
 
1187
 
 
1188
    /* Fill read buffer with data from write buffer */
 
1189
    memcpy(info->buffer, info->append_read_pos,
 
1190
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1191
    info->read_pos= info->buffer;
 
1192
    info->read_end= info->buffer+transfer_len;
 
1193
    info->append_read_pos=info->write_pos;
 
1194
    info->pos_in_file=pos_in_file+copy_len;
 
1195
    info->end_of_file+=len_in_buff;
 
1196
  }
 
1197
  unlock_append_buffer(info);
 
1198
  return Count ? 1 : 0;
 
1199
}
 
1200
 
 
1201
 
541
1202
#ifdef HAVE_AIOWAIT
542
1203
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1204
/*
 
1205
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1206
  from disk when needed.
 
1207
 
 
1208
  SYNOPSIS
 
1209
    _my_b_async_read()
 
1210
      info                      IO_CACHE pointer
 
1211
      Buffer                    Buffer to retrieve count bytes from file
 
1212
      Count                     Number of bytes to read into Buffer
 
1213
 
 
1214
  RETURN VALUE
 
1215
    -1          An error has occurred; my_errno is set.
 
1216
     0          Success
 
1217
     1          An error has occurred; IO_CACHE to error state.
 
1218
*/
 
1219
 
 
1220
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1221
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1222
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1223
  size_t max_length;
559
1224
  my_off_t next_pos_in_file;
560
1225
  unsigned char *read_buffer;
575
1240
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1241
                 my_filename(info->file),
577
1242
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1243
      my_errno=info->aio_result.result.aio_errno;
579
1244
      info->error= -1;
580
1245
      return(1);
581
1246
    }
582
1247
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1248
        read_length == (size_t) -1)
584
1249
    {
585
 
      errno=0;                          /* For testing */
 
1250
      my_errno=0;                               /* For testing */
586
1251
      info->error= (read_length == (size_t) -1 ? -1 :
587
1252
                    (int) (read_length+left_length));
588
1253
      return(1);
615
1280
      }
616
1281
    }
617
1282
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1283
    length=min(Count,read_length);
 
1284
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1285
    Buffer+=length;
 
1286
    Count-=length;
 
1287
    left_length+=length;
623
1288
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1289
    info->read_pos+=length;
625
1290
  }
626
1291
  else
627
1292
    next_pos_in_file=(info->pos_in_file+ (size_t)
659
1324
      {                                 /* Didn't find hole block */
660
1325
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1326
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1327
                   my_filename(info->file),my_errno);
663
1328
        info->error=(int) (read_length+left_length);
664
1329
        return 1;
665
1330
      }
695
1360
                (my_off_t) next_pos_in_file,SEEK_SET,
696
1361
                &info->aio_result.result))
697
1362
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1363
      my_errno=errno;
699
1364
      if (info->request_pos != info->buffer)
700
1365
      {
701
1366
        memmove(info->buffer, info->request_pos,
715
1380
#endif
716
1381
 
717
1382
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1383
/* Read one byte when buffer is empty */
 
1384
 
 
1385
int _my_b_get(IO_CACHE *info)
723
1386
{
724
1387
  unsigned char buff;
725
1388
  IO_CACHE_CALLBACK pre_read,post_read;
732
1395
  return (int) (unsigned char) buff;
733
1396
}
734
1397
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1398
/*
 
1399
   Write a byte buffer to IO_CACHE and flush to disk
 
1400
   if IO_CACHE is full.
 
1401
 
 
1402
   RETURN VALUE
 
1403
    1 On error on write
 
1404
    0 On success
 
1405
   -1 On error; my_errno contains error code.
 
1406
*/
 
1407
 
 
1408
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1409
{
745
 
  size_t rest_length,length_local;
 
1410
  size_t rest_length,length;
746
1411
 
747
1412
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1413
  {
749
 
    errno=EFBIG;
 
1414
    my_errno=errno=EFBIG;
750
1415
    return info->error = -1;
751
1416
  }
752
1417
 
760
1425
    return 1;
761
1426
  if (Count >= IO_SIZE)
762
1427
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1428
    length=Count & (size_t) ~(IO_SIZE-1);
764
1429
    if (info->seek_not_done)
765
1430
    {
766
1431
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1432
        Whenever a function which operates on IO_CACHE flushes/writes
 
1433
        some part of the IO_CACHE to disk it will set the property
769
1434
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1435
        on the IO_CACHE.
771
1436
      */
772
1437
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
1438
      {
776
1441
      }
777
1442
      info->seek_not_done=0;
778
1443
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1444
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1445
      return info->error= -1;
 
1446
 
 
1447
    /*
 
1448
      In case of a shared I/O cache with a writer we normally do direct
 
1449
      write cache to read cache copy. Simulate this here by direct
 
1450
      caller buffer to read cache copy. Do it after the write so that
 
1451
      the cache readers actions on the flushed part can go in parallel
 
1452
      with the write of the extra stuff. copy_to_read_buffer()
 
1453
      synchronizes writer and readers so that after this call the
 
1454
      readers can act on the extra stuff while the writer can go ahead
 
1455
      and prepare the next output. copy_to_read_buffer() relies on
 
1456
      info->pos_in_file.
 
1457
    */
 
1458
    if (info->share)
 
1459
      copy_to_read_buffer(info, Buffer, length);
 
1460
 
 
1461
    Count-=length;
 
1462
    Buffer+=length;
 
1463
    info->pos_in_file+=length;
 
1464
  }
 
1465
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1466
  info->write_pos+=Count;
 
1467
  return 0;
 
1468
}
 
1469
 
 
1470
 
 
1471
/*
 
1472
  Append a block to the write buffer.
 
1473
  This is done with the buffer locked to ensure that we don't read from
 
1474
  the write buffer before we are ready with it.
 
1475
*/
 
1476
 
 
1477
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1478
{
 
1479
  size_t rest_length,length;
 
1480
 
 
1481
  /*
 
1482
    Assert that we cannot come here with a shared cache. If we do one
 
1483
    day, we might need to add a call to copy_to_read_buffer().
 
1484
  */
 
1485
  assert(!info->share);
 
1486
 
 
1487
  lock_append_buffer(info);
 
1488
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1489
  if (Count <= rest_length)
 
1490
    goto end;
 
1491
  memcpy(info->write_pos, Buffer, rest_length);
 
1492
  Buffer+=rest_length;
 
1493
  Count-=rest_length;
 
1494
  info->write_pos+=rest_length;
 
1495
  if (my_b_flush_io_cache(info,0))
 
1496
  {
 
1497
    unlock_append_buffer(info);
 
1498
    return 1;
 
1499
  }
 
1500
  if (Count >= IO_SIZE)
 
1501
  {                                     /* Fill first intern buffer */
 
1502
    length=Count & (size_t) ~(IO_SIZE-1);
 
1503
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1504
    {
 
1505
      unlock_append_buffer(info);
 
1506
      return info->error= -1;
 
1507
    }
 
1508
    Count-=length;
 
1509
    Buffer+=length;
 
1510
    info->end_of_file+=length;
 
1511
  }
 
1512
 
 
1513
end:
 
1514
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1515
  info->write_pos+=Count;
 
1516
  unlock_append_buffer(info);
 
1517
  return 0;
 
1518
}
 
1519
 
 
1520
 
 
1521
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1522
{
 
1523
  /*
 
1524
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1525
    a possible compiler bug. At least gcc 2.95 cannot deal with
 
1526
    several layers of ternary operators that evaluated comma(,) operator
 
1527
    expressions inside - I do have a test case if somebody wants it
 
1528
  */
 
1529
  if (info->type == SEQ_READ_APPEND)
 
1530
    return my_b_append(info, Buffer, Count);
 
1531
  return my_b_write(info, Buffer, Count);
 
1532
}
 
1533
 
 
1534
 
 
1535
/*
 
1536
  Write a block to disk where part of the data may be inside the record
 
1537
  buffer.  As all write calls to the data goes through the cache,
 
1538
  we will never get a seek over the end of the buffer
 
1539
*/
 
1540
 
 
1541
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1542
                   my_off_t pos)
799
1543
{
800
 
  size_t length_local;
 
1544
  size_t length;
801
1545
  int error=0;
802
1546
 
 
1547
  /*
 
1548
    Assert that we cannot come here with a shared cache. If we do one
 
1549
    day, we might need to add a call to copy_to_read_buffer().
 
1550
  */
 
1551
  assert(!info->share);
 
1552
 
803
1553
  if (pos < info->pos_in_file)
804
1554
  {
805
1555
    /* Of no overlap, write everything without buffering */
806
1556
    if (pos + Count <= info->pos_in_file)
807
1557
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1558
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1559
    length= (uint32_t) (info->pos_in_file - pos);
 
1560
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1561
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1562
    Buffer+=length;
 
1563
    pos+=  length;
 
1564
    Count-= length;
 
1565
#ifndef HAVE_PREAD
 
1566
    info->seek_not_done=1;
 
1567
#endif
815
1568
  }
816
1569
 
817
1570
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1571
  length= (size_t) (info->write_end - info->buffer);
 
1572
  if (pos < info->pos_in_file + length)
820
1573
  {
821
1574
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1575
    length-=offset;
 
1576
    if (length > Count)
 
1577
      length=Count;
 
1578
    memcpy(info->buffer+offset, Buffer, length);
 
1579
    Buffer+=length;
 
1580
    Count-= length;
 
1581
    /* Fix length of buffer if the new data was larger */
 
1582
    if (info->buffer+length > info->write_pos)
 
1583
      info->write_pos=info->buffer+length;
831
1584
    if (!Count)
832
1585
      return (error);
833
1586
  }
837
1590
  return error;
838
1591
}
839
1592
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1593
 
 
1594
        /* Flush write cache */
 
1595
 
 
1596
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1597
  lock_append_buffer(info);
 
1598
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1599
  unlock_append_buffer(info);
 
1600
 
 
1601
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1602
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1603
  size_t length;
 
1604
  bool append_cache;
 
1605
  my_off_t pos_in_file;
 
1606
 
 
1607
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1608
    need_append_buffer_lock=0;
849
1609
 
850
1610
  if (info->type == WRITE_CACHE || append_cache)
851
1611
  {
852
1612
    if (info->file == -1)
853
1613
    {
854
 
      if (info->real_open_cached_file())
 
1614
      if (real_open_cached_file(info))
855
1615
        return((info->error= -1));
856
1616
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1617
    LOCK_APPEND_BUFFER;
858
1618
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1619
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1620
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1621
      /*
 
1622
        In case of a shared I/O cache with a writer we do direct write
 
1623
        cache to read cache copy. Do it before the write here so that
 
1624
        the readers can work in parallel with the write.
 
1625
        copy_to_read_buffer() relies on info->pos_in_file.
 
1626
      */
 
1627
      if (info->share)
 
1628
        copy_to_read_buffer(info, info->write_buffer, length);
 
1629
 
 
1630
      pos_in_file=info->pos_in_file;
862
1631
      /*
863
1632
        If we have append cache, we always open the file with
864
1633
        O_APPEND which moves the pos to EOF automatically on every write
865
1634
      */
866
1635
      if (!append_cache && info->seek_not_done)
867
1636
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1637
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
869
1638
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1639
          UNLOCK_APPEND_BUFFER;
871
1640
          return((info->error= -1));
872
1641
        }
873
1642
        if (!append_cache)
874
1643
          info->seek_not_done=0;
875
1644
      }
876
1645
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1646
        info->pos_in_file+=length;
878
1647
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1648
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1649
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1650
      if (my_write(info->file,info->write_buffer,length,
882
1651
                   info->myflags | MY_NABP))
883
1652
        info->error= -1;
884
1653
      else
885
1654
        info->error= 0;
886
1655
      if (!append_cache)
887
1656
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1657
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1658
      }
890
1659
      else
891
1660
      {
895
1664
      }
896
1665
 
897
1666
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1667
      UNLOCK_APPEND_BUFFER;
899
1668
      return(info->error);
900
1669
    }
901
1670
  }
906
1675
    info->inited=0;
907
1676
  }
908
1677
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1678
  UNLOCK_APPEND_BUFFER;
910
1679
  return(0);
911
1680
}
912
1681
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1682
/*
 
1683
  Free an IO_CACHE object
 
1684
 
 
1685
  SYNOPSOS
 
1686
    end_io_cache()
 
1687
    info                IO_CACHE Handle to free
 
1688
 
 
1689
  NOTES
 
1690
    It's currently safe to call this if one has called init_io_cache()
 
1691
    on the 'info' object, even if init_io_cache() failed.
 
1692
    This function is also safe to call twice with the same handle.
 
1693
 
 
1694
  RETURN
 
1695
   0  ok
 
1696
   #  Error
 
1697
*/
 
1698
 
 
1699
int end_io_cache(IO_CACHE *info)
928
1700
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1701
  int error=0;
 
1702
  IO_CACHE_CALLBACK pre_close;
 
1703
 
 
1704
  /*
 
1705
    Every thread must call remove_io_thread(). The last one destroys
 
1706
    the share elements.
 
1707
  */
 
1708
  assert(!info->share || !info->share->total_threads);
 
1709
 
 
1710
  if ((pre_close=info->pre_close))
 
1711
  {
 
1712
    (*pre_close)(info);
 
1713
    info->pre_close= 0;
 
1714
  }
 
1715
  if (info->alloced_buffer)
 
1716
  {
 
1717
    info->alloced_buffer=0;
 
1718
    if (info->file != -1)                       /* File doesn't exist */
 
1719
      error= my_b_flush_io_cache(info,1);
 
1720
    free((unsigned char*) info->buffer);
 
1721
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1722
  }
 
1723
  if (info->type == SEQ_READ_APPEND)
 
1724
  {
 
1725
    /* Destroy allocated mutex */
 
1726
    info->type= TYPE_NOT_SET;
 
1727
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1728
  }
 
1729
  return(error);
948
1730
} /* end_io_cache */
949
1731
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1732
 
 
1733
/**********************************************************************
 
1734
 Testing of MF_IOCACHE
 
1735
**********************************************************************/
 
1736
 
 
1737
#ifdef MAIN
 
1738
 
 
1739
#include <my_dir.h>
 
1740
 
 
1741
void die(const char* fmt, ...)
 
1742
{
 
1743
  va_list va_args;
 
1744
  va_start(va_args,fmt);
 
1745
  fprintf(stderr,"Error:");
 
1746
  vfprintf(stderr, fmt,va_args);
 
1747
  fprintf(stderr,", errno=%d\n", errno);
 
1748
  exit(1);
 
1749
}
 
1750
 
 
1751
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1752
{
 
1753
  int fd;
 
1754
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1755
    die("Could not open %s", fname);
 
1756
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1757
    die("failed in init_io_cache()");
 
1758
  return fd;
 
1759
}
 
1760
 
 
1761
void close_file(IO_CACHE* info)
 
1762
{
 
1763
  end_io_cache(info);
 
1764
  my_close(info->file, MYF(MY_WME));
 
1765
}
 
1766
 
 
1767
int main(int argc, char** argv)
 
1768
{
 
1769
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1770
  MY_STAT status;
 
1771
  const char* fname="/tmp/iocache.test";
 
1772
  int cache_size=16384;
 
1773
  char llstr_buf[22];
 
1774
  int max_block,total_bytes=0;
 
1775
  int i,num_loops=100,error=0;
 
1776
  char *p;
 
1777
  char* block, *block_end;
 
1778
  MY_INIT(argv[0]);
 
1779
  max_block = cache_size*3;
 
1780
  if (!(block=(char*)malloc(max_block)))
 
1781
    die("Not enough memory to allocate test block");
 
1782
  block_end = block + max_block;
 
1783
  for (p = block,i=0; p < block_end;i++)
 
1784
  {
 
1785
    *p++ = (char)i;
 
1786
  }
 
1787
  if (my_stat(fname,&status, MYF(0)) &&
 
1788
      my_delete(fname,MYF(MY_WME)))
 
1789
    {
 
1790
      die("Delete of %s failed, aborting", fname);
 
1791
    }
 
1792
  open_file(fname,&sra_cache, cache_size);
 
1793
  for (i = 0; i < num_loops; i++)
 
1794
  {
 
1795
    char buf[4];
 
1796
    int block_size = abs(rand() % max_block);
 
1797
    int4store(buf, block_size);
 
1798
    if (my_b_append(&sra_cache,buf,4) ||
 
1799
        my_b_append(&sra_cache, block, block_size))
 
1800
      die("write failed");
 
1801
    total_bytes += 4+block_size;
 
1802
  }
 
1803
  close_file(&sra_cache);
 
1804
  free(block);
 
1805
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1806
    die("%s failed to stat, but I had just closed it,\
 
1807
 wonder how that happened");
 
1808
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1809
         llstr(status.st_size,llstr_buf),
 
1810
         total_bytes);
 
1811
  my_delete(fname, MYF(MY_WME));
 
1812
  /* check correctness of tests */
 
1813
  if (total_bytes != status.st_size)
 
1814
  {
 
1815
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1816
supposedly written\n");
 
1817
    error=1;
 
1818
  }
 
1819
  exit(error);
 
1820
  return 0;
 
1821
}
 
1822
#endif