~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

Merge/fix in FAQ.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include <config.h>
51
 
 
52
 
#include <drizzled/internal/my_sys.h>
53
 
#include <drizzled/internal/m_string.h>
54
 
#include <drizzled/drizzled.h>
 
50
#define MAP_TO_USE_RAID
 
51
#include "mysys_priv.h"
 
52
#include <m_string.h>
55
53
#ifdef HAVE_AIOWAIT
56
 
#include <drizzled/error.h>
57
 
#include <drizzled/internal/aio_result.h>
 
54
#include "mysys_err.h"
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include <drizzled/internal/iocache.h>
61
57
#include <errno.h>
62
 
#include <drizzled/util/test.h>
63
 
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
58
 
 
59
#define lock_append_buffer(info) \
 
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
61
#define unlock_append_buffer(info) \
 
62
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
63
 
 
64
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
65
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
66
 
 
67
/*
 
68
  Setup internal pointers inside IO_CACHE
 
69
 
 
70
  SYNOPSIS
 
71
    setup_io_cache()
 
72
    info                IO_CACHE handler
 
73
 
 
74
  NOTES
 
75
    This is called on automaticly on init or reinit of IO_CACHE
 
76
    It must be called externally if one moves or copies an IO_CACHE
 
77
    object.
 
78
*/
 
79
 
 
80
void setup_io_cache(IO_CACHE* info)
126
81
{
127
82
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
83
  if (info->type == WRITE_CACHE)
129
84
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
85
    info->current_pos= &info->write_pos;
 
86
    info->current_end= &info->write_end;
132
87
  }
133
88
  else
134
89
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
90
    info->current_pos= &info->read_pos;
 
91
    info->current_end= &info->read_end;
137
92
  }
138
93
}
139
94
 
140
95
 
141
 
void st_io_cache::init_functions()
 
96
static void
 
97
init_functions(IO_CACHE* info)
142
98
{
 
99
  enum cache_type type= info->type;
143
100
  switch (type) {
144
101
  case READ_NET:
145
102
    /*
150
107
      as myisamchk
151
108
    */
152
109
    break;
 
110
  case SEQ_READ_APPEND:
 
111
    info->read_function = _my_b_seq_read;
 
112
    info->write_function = 0;                   /* Force a core if used */
 
113
    break;
153
114
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
115
    info->read_function =
 
116
                          info->share ? _my_b_read_r :
 
117
                                        _my_b_read;
 
118
    info->write_function = _my_b_write;
156
119
  }
157
120
 
158
 
  setup_io_cache();
 
121
  setup_io_cache(info);
159
122
}
160
123
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
124
 
 
125
/*
 
126
  Initialize an IO_CACHE object
 
127
 
 
128
  SYNOPSOS
 
129
    init_io_cache()
 
130
    info                cache handler to initialize
 
131
    file                File that should be associated to to the handler
 
132
                        If == -1 then real_open_cached_file()
 
133
                        will be called when it's time to open file.
 
134
    cachesize           Size of buffer to allocate for read/write
 
135
                        If == 0 then use my_default_record_cache_size
 
136
    type                Type of cache
 
137
    seek_offset         Where cache should start reading/writing
 
138
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
139
    cache_myflags       Bitmap of differnt flags
 
140
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
141
                        MY_DONT_CHECK_FILESIZE
 
142
 
 
143
  RETURN
 
144
    0  ok
 
145
    #  error
 
146
*/
 
147
 
 
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
149
                  enum cache_type type, my_off_t seek_offset,
 
150
                  bool use_async_io, myf cache_myflags)
181
151
{
182
152
  size_t min_cache;
183
 
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
153
  my_off_t pos;
 
154
  my_off_t end_of_file= ~(my_off_t) 0;
185
155
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
156
  info->file= file;
 
157
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
158
  info->pos_in_file= seek_offset;
 
159
  info->pre_close = info->pre_read = info->post_read = 0;
 
160
  info->arg = 0;
 
161
  info->alloced_buffer = 0;
 
162
  info->buffer=0;
 
163
  info->seek_not_done= 0;
194
164
 
195
165
  if (file >= 0)
196
166
  {
197
 
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
167
    pos= my_tell(file, MYF(0));
 
168
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
199
169
    {
200
170
      /*
201
171
         This kind of object doesn't support seek() or tell(). Don't set a
202
172
         flag that will make us again try to seek() later and fail.
203
173
      */
204
 
      seek_not_done= 0;
 
174
      info->seek_not_done= 0;
205
175
      /*
206
176
        Additionally, if we're supposed to start somewhere other than the
207
177
        the beginning of whatever this file is, then somebody made a bad
210
180
      assert(seek_offset == 0);
211
181
    }
212
182
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
183
      info->seek_not_done= test(seek_offset != pos);
214
184
  }
215
185
 
 
186
  info->disk_writes= 0;
 
187
  info->share=0;
 
188
 
216
189
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
190
    return(1);                          /* No cache requested */
218
191
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
192
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
193
  {                                             /* Assume file isn't growing */
221
194
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
195
    {
223
196
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
197
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
225
198
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
199
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
200
      if (end_of_file < seek_offset)
 
201
        end_of_file=seek_offset;
229
202
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
203
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
204
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
205
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
206
        use_async_io=0;                         /* No need to use async */
234
207
      }
235
208
    }
236
209
  }
237
210
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
211
  if (type != READ_NET && type != WRITE_NET)
239
212
  {
240
213
    /* Retry allocating memory in smaller blocks until we get one */
241
214
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
218
      if (cachesize < min_cache)
246
219
        cachesize = min_cache;
247
220
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
255
 
           (unsigned char*) malloc(buffer_block)) != 0)
256
 
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
221
      if (type == SEQ_READ_APPEND)
 
222
        buffer_block *= 2;
 
223
      if ((info->buffer=
 
224
           (uchar*) my_malloc(buffer_block,
 
225
                             MYF((cache_myflags & ~ MY_WME) |
 
226
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
227
      {
 
228
        info->write_buffer=info->buffer;
 
229
        if (type == SEQ_READ_APPEND)
 
230
          info->write_buffer = info->buffer + cachesize;
 
231
        info->alloced_buffer=1;
259
232
        break;                                  /* Enough memory found */
260
233
      }
261
234
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
235
        return(2);                              /* Can't alloc cache */
267
236
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
237
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
238
    }
272
239
  }
273
240
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
241
  info->read_length=info->buffer_length=cachesize;
 
242
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
243
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
244
  if (type == SEQ_READ_APPEND)
 
245
  {
 
246
    info->append_read_pos = info->write_pos = info->write_buffer;
 
247
    info->write_end = info->write_buffer + info->buffer_length;
 
248
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
249
  }
 
250
#if defined(SAFE_MUTEX)
 
251
  else
 
252
  {
 
253
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
254
    bzero((char*) &info->append_buffer_lock, sizeof(info));
 
255
  }
 
256
#endif
277
257
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
258
  if (type == WRITE_CACHE)
 
259
    info->write_end=
 
260
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
261
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
262
    info->read_end=info->buffer;                /* Nothing in cache */
283
263
 
284
264
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
265
  info->end_of_file= end_of_file;
 
266
  info->error=0;
 
267
  info->type= type;
 
268
  init_functions(info);
289
269
#ifdef HAVE_AIOWAIT
290
270
  if (use_async_io && ! my_disable_async_io)
291
271
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
272
    info->read_length/=2;
 
273
    info->read_function=_my_b_async_read;
294
274
  }
295
 
  inited= aio_result.pending= 0;
 
275
  info->inited=info->aio_result.pending=0;
296
276
#endif
297
 
  return 0;
 
277
  return(0);
298
278
}                                               /* init_io_cache */
299
279
 
300
280
        /* Wait until current request is ready */
319
299
        break;
320
300
    }
321
301
  }
 
302
  return;
322
303
}
323
304
#endif
324
305
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
306
 
 
307
/*
 
308
  Use this to reset cache to re-start reading or to change the type
 
309
  between READ_CACHE <-> WRITE_CACHE
 
310
  If we are doing a reinit of a cache where we have the start of the file
 
311
  in the cache, we are reusing this memory without flushing it to disk.
 
312
*/
 
313
 
 
314
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
315
                        my_off_t seek_offset,
 
316
                        bool use_async_io __attribute__((unused)),
 
317
                        bool clear_cache)
339
318
{
340
319
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
320
  assert(type != READ_NET && info->type != READ_NET &&
 
321
              type != WRITE_NET && info->type != WRITE_NET &&
 
322
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
323
 
344
324
  /* If the whole file is in memory, avoid flushing to disk */
345
325
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
326
      seek_offset >= info->pos_in_file &&
 
327
      seek_offset <= my_b_tell(info))
348
328
  {
349
329
    /* Reuse current buffer without flushing it to disk */
350
 
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
330
    uchar *pos;
 
331
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
332
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
333
      info->read_end=info->write_pos;
 
334
      info->end_of_file=my_b_tell(info);
355
335
      /*
356
336
        Trigger a new seek only if we have a valid
357
337
        file handle.
358
338
      */
359
 
      seek_not_done= (file != -1);
 
339
      info->seek_not_done= (info->file != -1);
360
340
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
341
    else if (type == WRITE_CACHE)
362
342
    {
363
 
      if (type == READ_CACHE)
 
343
      if (info->type == READ_CACHE)
364
344
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
345
        info->write_end=info->write_buffer+info->buffer_length;
 
346
        info->seek_not_done=1;
367
347
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
348
      info->end_of_file = ~(my_off_t) 0;
369
349
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
350
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
351
    if (type == WRITE_CACHE)
 
352
      info->write_pos=pos;
373
353
    else
374
 
      read_pos= pos;
 
354
      info->read_pos= pos;
375
355
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
356
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
357
#endif
378
358
  }
379
359
  else
382
362
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
363
      after the current positions should be ignored
384
364
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
365
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
366
      info->end_of_file=my_b_tell(info);
387
367
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
368
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
369
      return(1);
 
370
    info->pos_in_file=seek_offset;
391
371
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
372
    info->seek_not_done=1;
 
373
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
374
    if (type == READ_CACHE)
395
375
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
376
      info->read_end=info->buffer;              /* Nothing in cache */
397
377
    }
398
378
    else
399
379
    {
400
 
      write_end=(buffer + buffer_length -
 
380
      info->write_end=(info->buffer + info->buffer_length -
401
381
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
382
      info->end_of_file= ~(my_off_t) 0;
403
383
    }
404
384
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
385
  info->type=type;
 
386
  info->error=0;
 
387
  init_functions(info);
408
388
 
409
389
#ifdef HAVE_AIOWAIT
410
390
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
391
      ((ulong) info->buffer_length <
 
392
       (ulong) (info->end_of_file - seek_offset)))
413
393
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
394
    info->read_length=info->buffer_length/2;
 
395
    info->read_function=_my_b_async_read;
416
396
  }
417
 
  inited= 0;
418
 
#else
419
 
  (void)use_async_io;
 
397
  info->inited=0;
420
398
#endif
421
 
  return 0;
 
399
  return(0);
422
400
} /* reinit_io_cache */
423
401
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
402
 
 
403
 
 
404
/*
 
405
  Read buffered.
 
406
 
 
407
  SYNOPSIS
 
408
    _my_b_read()
 
409
      info                      IO_CACHE pointer
 
410
      Buffer                    Buffer to retrieve count bytes from file
 
411
      Count                     Number of bytes to read into Buffer
 
412
 
 
413
  NOTE
 
414
    This function is only called from the my_b_read() macro when there
 
415
    isn't enough characters in the buffer to satisfy the request.
 
416
 
 
417
  WARNING
 
418
 
 
419
    When changing this function, be careful with handling file offsets
 
420
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
421
    types than my_off_t unless you can be sure that their value fits.
 
422
    Same applies to differences of file offsets.
 
423
 
 
424
    When changing this function, check _my_b_read_r(). It might need the
 
425
    same change.
 
426
 
 
427
  RETURN
 
428
    0      we succeeded in reading all data
 
429
    1      Error: can't read requested characters
 
430
*/
 
431
 
 
432
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
445
433
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
434
  size_t length,diff_length,left_length, max_length;
 
435
  my_off_t pos_in_file;
448
436
 
449
437
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
438
  {
455
443
  }
456
444
 
457
445
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
446
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
447
 
460
 
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
448
  /* 
 
449
    Whenever a function which operates on IO_CACHE flushes/writes
 
450
    some part of the IO_CACHE to disk it will set the property
463
451
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
452
    on the IO_CACHE.
465
453
  */
466
454
  if (info->seek_not_done)
467
455
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
456
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
457
        != MY_FILEPOS_ERROR))
469
458
    {
470
459
      /* No error, reset seek_not_done flag. */
471
460
      info->seek_not_done= 0;
477
466
        info->file is a pipe or socket or FIFO.  We never should have tried
478
467
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
468
      */
480
 
      assert(errno != ESPIPE);
 
469
      assert(my_errno != ESPIPE);
481
470
      info->error= -1;
482
471
      return(1);
483
472
    }
484
473
  }
485
474
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
475
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
476
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
477
  {                                     /* Fill first intern buffer */
489
478
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
479
    if (info->end_of_file <= pos_in_file)
491
480
    {                                   /* End of file */
492
481
      info->error= (int) left_length;
493
482
      return(1);
494
483
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
484
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
485
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
486
        != length)
497
487
    {
498
488
      info->error= (read_length == (size_t) -1 ? -1 :
499
489
                    (int) (read_length+left_length));
500
490
      return(1);
501
491
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
492
    Count-=length;
 
493
    Buffer+=length;
 
494
    pos_in_file+=length;
 
495
    left_length+=length;
506
496
    diff_length=0;
507
497
  }
508
498
 
509
499
  max_length= info->read_length-diff_length;
510
500
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
501
      max_length > (info->end_of_file - pos_in_file))
 
502
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
503
  if (!max_length)
514
504
  {
515
505
    if (Count)
516
506
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
507
      info->error= left_length;         /* We only got this many char */
518
508
      return(1);
519
509
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
510
    length=0;                           /* Didn't read any chars */
521
511
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
512
  else if ((length= my_read(info->file,info->buffer, max_length,
523
513
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
514
           length == (size_t) -1)
525
515
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
516
    if (length != (size_t) -1)
 
517
      memcpy(Buffer, info->buffer, length);
 
518
    info->pos_in_file= pos_in_file;
 
519
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
520
    info->read_pos=info->read_end=info->buffer;
531
521
    return(1);
532
522
  }
533
523
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
524
  info->read_end=info->buffer+length;
 
525
  info->pos_in_file=pos_in_file;
536
526
  memcpy(Buffer, info->buffer, Count);
537
527
  return(0);
538
528
}
539
529
 
540
530
 
 
531
/*
 
532
  Prepare IO_CACHE for shared use.
 
533
 
 
534
  SYNOPSIS
 
535
    init_io_cache_share()
 
536
      read_cache                A read cache. This will be copied for
 
537
                                every thread after setup.
 
538
      cshare                    The share.
 
539
      write_cache               If non-NULL a write cache that is to be
 
540
                                synchronized with the read caches.
 
541
      num_threads               Number of threads sharing the cache
 
542
                                including the write thread if any.
 
543
 
 
544
  DESCRIPTION
 
545
 
 
546
    The shared cache is used so: One IO_CACHE is initialized with
 
547
    init_io_cache(). This includes the allocation of a buffer. Then a
 
548
    share is allocated and init_io_cache_share() is called with the io
 
549
    cache and the share. Then the io cache is copied for each thread. So
 
550
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
551
    is shared because cache->buffer is the same for all caches.
 
552
 
 
553
    One thread reads data from the file into the buffer. All threads
 
554
    read from the buffer, but every thread maintains its own set of
 
555
    pointers into the buffer. When all threads have used up the buffer
 
556
    contents, one of the threads reads the next block of data into the
 
557
    buffer. To accomplish this, each thread enters the cache lock before
 
558
    accessing the buffer. They wait in lock_io_cache() until all threads
 
559
    joined the lock. The last thread entering the lock is in charge of
 
560
    reading from file to buffer. It wakes all threads when done.
 
561
 
 
562
    Synchronizing a write cache to the read caches works so: Whenever
 
563
    the write buffer needs a flush, the write thread enters the lock and
 
564
    waits for all other threads to enter the lock too. They do this when
 
565
    they have used up the read buffer. When all threads are in the lock,
 
566
    the write thread copies the write buffer to the read buffer and
 
567
    wakes all threads.
 
568
 
 
569
    share->running_threads is the number of threads not being in the
 
570
    cache lock. When entering lock_io_cache() the number is decreased.
 
571
    When the thread that fills the buffer enters unlock_io_cache() the
 
572
    number is reset to the number of threads. The condition
 
573
    running_threads == 0 means that all threads are in the lock. Bumping
 
574
    up the number to the full count is non-intuitive. But increasing the
 
575
    number by one for each thread that leaves the lock could lead to a
 
576
    solo run of one thread. The last thread to join a lock reads from
 
577
    file to buffer, wakes the other threads, processes the data in the
 
578
    cache and enters the lock again. If no other thread left the lock
 
579
    meanwhile, it would think it's the last one again and read the next
 
580
    block...
 
581
 
 
582
    The share has copies of 'error', 'buffer', 'read_end', and
 
583
    'pos_in_file' from the thread that filled the buffer. We may not be
 
584
    able to access this information directly from its cache because the
 
585
    thread may be removed from the share before the variables could be
 
586
    copied by all other threads. Or, if a write buffer is synchronized,
 
587
    it would change its 'pos_in_file' after waking the other threads,
 
588
    possibly before they could copy its value.
 
589
 
 
590
    However, the 'buffer' variable in the share is for a synchronized
 
591
    write cache. It needs to know where to put the data. Otherwise it
 
592
    would need access to the read cache of one of the threads that is
 
593
    not yet removed from the share.
 
594
 
 
595
  RETURN
 
596
    void
 
597
*/
 
598
 
 
599
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
600
                         IO_CACHE *write_cache, uint num_threads)
 
601
{
 
602
  assert(num_threads > 1);
 
603
  assert(read_cache->type == READ_CACHE);
 
604
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
605
 
 
606
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
607
  pthread_cond_init(&cshare->cond, 0);
 
608
  pthread_cond_init(&cshare->cond_writer, 0);
 
609
 
 
610
  cshare->running_threads= num_threads;
 
611
  cshare->total_threads=   num_threads;
 
612
  cshare->error=           0;    /* Initialize. */
 
613
  cshare->buffer=          read_cache->buffer;
 
614
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
615
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
616
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
617
 
 
618
  read_cache->share=         cshare;
 
619
  read_cache->read_function= _my_b_read_r;
 
620
  read_cache->current_pos=   NULL;
 
621
  read_cache->current_end=   NULL;
 
622
 
 
623
  if (write_cache)
 
624
    write_cache->share= cshare;
 
625
 
 
626
  return;
 
627
}
 
628
 
 
629
 
 
630
/*
 
631
  Remove a thread from shared access to IO_CACHE.
 
632
 
 
633
  SYNOPSIS
 
634
    remove_io_thread()
 
635
      cache                     The IO_CACHE to be removed from the share.
 
636
 
 
637
  NOTE
 
638
 
 
639
    Every thread must do that on exit for not to deadlock other threads.
 
640
 
 
641
    The last thread destroys the pthread resources.
 
642
 
 
643
    A writer flushes its cache first.
 
644
 
 
645
  RETURN
 
646
    void
 
647
*/
 
648
 
 
649
void remove_io_thread(IO_CACHE *cache)
 
650
{
 
651
  IO_CACHE_SHARE *cshare= cache->share;
 
652
  uint total;
 
653
 
 
654
  /* If the writer goes, it needs to flush the write cache. */
 
655
  if (cache == cshare->source_cache)
 
656
    flush_io_cache(cache);
 
657
 
 
658
  pthread_mutex_lock(&cshare->mutex);
 
659
 
 
660
  /* Remove from share. */
 
661
  total= --cshare->total_threads;
 
662
 
 
663
  /* Detach from share. */
 
664
  cache->share= NULL;
 
665
 
 
666
  /* If the writer goes, let the readers know. */
 
667
  if (cache == cshare->source_cache)
 
668
  {
 
669
    cshare->source_cache= NULL;
 
670
  }
 
671
 
 
672
  /* If all threads are waiting for me to join the lock, wake them. */
 
673
  if (!--cshare->running_threads)
 
674
  {
 
675
    pthread_cond_signal(&cshare->cond_writer);
 
676
    pthread_cond_broadcast(&cshare->cond);
 
677
  }
 
678
 
 
679
  pthread_mutex_unlock(&cshare->mutex);
 
680
 
 
681
  if (!total)
 
682
  {
 
683
    pthread_cond_destroy (&cshare->cond_writer);
 
684
    pthread_cond_destroy (&cshare->cond);
 
685
    pthread_mutex_destroy(&cshare->mutex);
 
686
  }
 
687
 
 
688
  return;
 
689
}
 
690
 
 
691
 
 
692
/*
 
693
  Lock IO cache and wait for all other threads to join.
 
694
 
 
695
  SYNOPSIS
 
696
    lock_io_cache()
 
697
      cache                     The cache of the thread entering the lock.
 
698
      pos                       File position of the block to read.
 
699
                                Unused for the write thread.
 
700
 
 
701
  DESCRIPTION
 
702
 
 
703
    Wait for all threads to finish with the current buffer. We want
 
704
    all threads to proceed in concert. The last thread to join
 
705
    lock_io_cache() will read the block from file and all threads start
 
706
    to use it. Then they will join again for reading the next block.
 
707
 
 
708
    The waiting threads detect a fresh buffer by comparing
 
709
    cshare->pos_in_file with the position they want to process next.
 
710
    Since the first block may start at position 0, we take
 
711
    cshare->read_end as an additional condition. This variable is
 
712
    initialized to NULL and will be set after a block of data is written
 
713
    to the buffer.
 
714
 
 
715
  RETURN
 
716
    1           OK, lock in place, go ahead and read.
 
717
    0           OK, unlocked, another thread did the read.
 
718
*/
 
719
 
 
720
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
721
{
 
722
  IO_CACHE_SHARE *cshare= cache->share;
 
723
 
 
724
  /* Enter the lock. */
 
725
  pthread_mutex_lock(&cshare->mutex);
 
726
  cshare->running_threads--;
 
727
 
 
728
  if (cshare->source_cache)
 
729
  {
 
730
    /* A write cache is synchronized to the read caches. */
 
731
 
 
732
    if (cache == cshare->source_cache)
 
733
    {
 
734
      /* The writer waits until all readers are here. */
 
735
      while (cshare->running_threads)
 
736
      {
 
737
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
738
      }
 
739
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
740
      return(1);
 
741
    }
 
742
 
 
743
    /* The last thread wakes the writer. */
 
744
    if (!cshare->running_threads)
 
745
    {
 
746
      pthread_cond_signal(&cshare->cond_writer);
 
747
    }
 
748
 
 
749
    /*
 
750
      Readers wait until the data is copied from the writer. Another
 
751
      reason to stop waiting is the removal of the write thread. If this
 
752
      happens, we leave the lock with old data in the buffer.
 
753
    */
 
754
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
755
           cshare->source_cache)
 
756
    {
 
757
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
758
    }
 
759
 
 
760
    /*
 
761
      If the writer was removed from the share while this thread was
 
762
      asleep, we need to simulate an EOF condition. The writer cannot
 
763
      reset the share variables as they might still be in use by readers
 
764
      of the last block. When we awake here then because the last
 
765
      joining thread signalled us. If the writer is not the last, it
 
766
      will not signal. So it is safe to clear the buffer here.
 
767
    */
 
768
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
769
    {
 
770
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
771
      cshare->error= 0; /* EOF is not an error. */
 
772
    }
 
773
  }
 
774
  else
 
775
  {
 
776
    /*
 
777
      There are read caches only. The last thread arriving in
 
778
      lock_io_cache() continues with a locked cache and reads the block.
 
779
    */
 
780
    if (!cshare->running_threads)
 
781
    {
 
782
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
783
      return(1);
 
784
    }
 
785
 
 
786
    /*
 
787
      All other threads wait until the requested block is read by the
 
788
      last thread arriving. Another reason to stop waiting is the
 
789
      removal of a thread. If this leads to all threads being in the
 
790
      lock, we have to continue also. The first of the awaken threads
 
791
      will then do the read.
 
792
    */
 
793
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
794
           cshare->running_threads)
 
795
    {
 
796
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
797
    }
 
798
 
 
799
    /* If the block is not yet read, continue with a locked cache and read. */
 
800
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
801
    {
 
802
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
803
      return(1);
 
804
    }
 
805
 
 
806
    /* Another thread did read the block already. */
 
807
  }
 
808
 
 
809
  /*
 
810
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
811
    filled the buffer did this and marked all threads as running.
 
812
  */
 
813
  pthread_mutex_unlock(&cshare->mutex);
 
814
  return(0);
 
815
}
 
816
 
 
817
 
 
818
/*
 
819
  Unlock IO cache.
 
820
 
 
821
  SYNOPSIS
 
822
    unlock_io_cache()
 
823
      cache                     The cache of the thread leaving the lock.
 
824
 
 
825
  NOTE
 
826
    This is called by the thread that filled the buffer. It marks all
 
827
    threads as running and awakes them. This must not be done by any
 
828
    other thread.
 
829
 
 
830
    Do not signal cond_writer. Either there is no writer or the writer
 
831
    is the only one who can call this function.
 
832
 
 
833
    The reason for resetting running_threads to total_threads before
 
834
    waking all other threads is that it could be possible that this
 
835
    thread is so fast with processing the buffer that it enters the lock
 
836
    before even one other thread has left it. If every awoken thread
 
837
    would increase running_threads by one, this thread could think that
 
838
    he is again the last to join and would not wait for the other
 
839
    threads to process the data.
 
840
 
 
841
  RETURN
 
842
    void
 
843
*/
 
844
 
 
845
static void unlock_io_cache(IO_CACHE *cache)
 
846
{
 
847
  IO_CACHE_SHARE *cshare= cache->share;
 
848
 
 
849
  cshare->running_threads= cshare->total_threads;
 
850
  pthread_cond_broadcast(&cshare->cond);
 
851
  pthread_mutex_unlock(&cshare->mutex);
 
852
  return;
 
853
}
 
854
 
 
855
 
 
856
/*
 
857
  Read from IO_CACHE when it is shared between several threads.
 
858
 
 
859
  SYNOPSIS
 
860
    _my_b_read_r()
 
861
      cache                     IO_CACHE pointer
 
862
      Buffer                    Buffer to retrieve count bytes from file
 
863
      Count                     Number of bytes to read into Buffer
 
864
 
 
865
  NOTE
 
866
    This function is only called from the my_b_read() macro when there
 
867
    isn't enough characters in the buffer to satisfy the request.
 
868
 
 
869
  IMPLEMENTATION
 
870
 
 
871
    It works as follows: when a thread tries to read from a file (that
 
872
    is, after using all the data from the (shared) buffer), it just
 
873
    hangs on lock_io_cache(), waiting for other threads. When the very
 
874
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
875
    does actual IO and unlock_io_cache(), which signals all the waiting
 
876
    threads that data is in the buffer.
 
877
 
 
878
  WARNING
 
879
 
 
880
    When changing this function, be careful with handling file offsets
 
881
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
882
    types than my_off_t unless you can be sure that their value fits.
 
883
    Same applies to differences of file offsets. (Bug #11527)
 
884
 
 
885
    When changing this function, check _my_b_read(). It might need the
 
886
    same change.
 
887
 
 
888
  RETURN
 
889
    0      we succeeded in reading all data
 
890
    1      Error: can't read requested characters
 
891
*/
 
892
 
 
893
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
894
{
 
895
  my_off_t pos_in_file;
 
896
  size_t length, diff_length, left_length;
 
897
  IO_CACHE_SHARE *cshare= cache->share;
 
898
 
 
899
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
900
  {
 
901
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
902
    memcpy(Buffer, cache->read_pos, left_length);
 
903
    Buffer+= left_length;
 
904
    Count-= left_length;
 
905
  }
 
906
  while (Count)
 
907
  {
 
908
    size_t cnt, len;
 
909
 
 
910
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
911
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
912
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
913
    length= ((length <= cache->read_length) ?
 
914
             length + IO_ROUND_DN(cache->read_length - length) :
 
915
             length - IO_ROUND_UP(length - cache->read_length));
 
916
    if (cache->type != READ_FIFO &&
 
917
        (length > (cache->end_of_file - pos_in_file)))
 
918
      length= (size_t) (cache->end_of_file - pos_in_file);
 
919
    if (length == 0)
 
920
    {
 
921
      cache->error= (int) left_length;
 
922
      return(1);
 
923
    }
 
924
    if (lock_io_cache(cache, pos_in_file))
 
925
    {
 
926
      /* With a synchronized write/read cache we won't come here... */
 
927
      assert(!cshare->source_cache);
 
928
      /*
 
929
        ... unless the writer has gone before this thread entered the
 
930
        lock. Simulate EOF in this case. It can be distinguished by
 
931
        cache->file.
 
932
      */
 
933
      if (cache->file < 0)
 
934
        len= 0;
 
935
      else
 
936
      {
 
937
        /*
 
938
          Whenever a function which operates on IO_CACHE flushes/writes
 
939
          some part of the IO_CACHE to disk it will set the property
 
940
          "seek_not_done" to indicate this to other functions operating
 
941
          on the IO_CACHE.
 
942
        */
 
943
        if (cache->seek_not_done)
 
944
        {
 
945
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
946
              == MY_FILEPOS_ERROR)
 
947
          {
 
948
            cache->error= -1;
 
949
            unlock_io_cache(cache);
 
950
            return(1);
 
951
          }
 
952
        }
 
953
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
954
      }
 
955
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
956
      cache->error=       (len == length ? 0 : (int) len);
 
957
      cache->pos_in_file= pos_in_file;
 
958
 
 
959
      /* Copy important values to the share. */
 
960
      cshare->error=       cache->error;
 
961
      cshare->read_end=    cache->read_end;
 
962
      cshare->pos_in_file= pos_in_file;
 
963
 
 
964
      /* Mark all threads as running and wake them. */
 
965
      unlock_io_cache(cache);
 
966
    }
 
967
    else
 
968
    {
 
969
      /*
 
970
        With a synchronized write/read cache readers always come here.
 
971
        Copy important values from the share.
 
972
      */
 
973
      cache->error=       cshare->error;
 
974
      cache->read_end=    cshare->read_end;
 
975
      cache->pos_in_file= cshare->pos_in_file;
 
976
 
 
977
      len= ((cache->error == -1) ? (size_t) -1 :
 
978
            (size_t) (cache->read_end - cache->buffer));
 
979
    }
 
980
    cache->read_pos=      cache->buffer;
 
981
    cache->seek_not_done= 0;
 
982
    if (len == 0 || len == (size_t) -1)
 
983
    {
 
984
      cache->error= (int) left_length;
 
985
      return(1);
 
986
    }
 
987
    cnt= (len > Count) ? Count : len;
 
988
    memcpy(Buffer, cache->read_pos, cnt);
 
989
    Count -= cnt;
 
990
    Buffer+= cnt;
 
991
    left_length+= cnt;
 
992
    cache->read_pos+= cnt;
 
993
  }
 
994
  return(0);
 
995
}
 
996
 
 
997
 
 
998
/*
 
999
  Copy data from write cache to read cache.
 
1000
 
 
1001
  SYNOPSIS
 
1002
    copy_to_read_buffer()
 
1003
      write_cache               The write cache.
 
1004
      write_buffer              The source of data, mostly the cache buffer.
 
1005
      write_length              The number of bytes to copy.
 
1006
 
 
1007
  NOTE
 
1008
    The write thread will wait for all read threads to join the cache
 
1009
    lock. Then it copies the data over and wakes the read threads.
 
1010
 
 
1011
  RETURN
 
1012
    void
 
1013
*/
 
1014
 
 
1015
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1016
                                const uchar *write_buffer, size_t write_length)
 
1017
{
 
1018
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1019
 
 
1020
  assert(cshare->source_cache == write_cache);
 
1021
  /*
 
1022
    write_length is usually less or equal to buffer_length.
 
1023
    It can be bigger if _my_b_write() is called with a big length.
 
1024
  */
 
1025
  while (write_length)
 
1026
  {
 
1027
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1028
    int  __attribute__((unused)) rc;
 
1029
 
 
1030
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1031
    /* The writing thread does always have the lock when it awakes. */
 
1032
    assert(rc);
 
1033
 
 
1034
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1035
 
 
1036
    cshare->error=       0;
 
1037
    cshare->read_end=    cshare->buffer + copy_length;
 
1038
    cshare->pos_in_file= write_cache->pos_in_file;
 
1039
 
 
1040
    /* Mark all threads as running and wake them. */
 
1041
    unlock_io_cache(write_cache);
 
1042
 
 
1043
    write_buffer+= copy_length;
 
1044
    write_length-= copy_length;
 
1045
  }
 
1046
}
 
1047
 
 
1048
 
 
1049
/*
 
1050
  Do sequential read from the SEQ_READ_APPEND cache.
 
1051
  
 
1052
  We do this in three stages:
 
1053
   - first read from info->buffer
 
1054
   - then if there are still data to read, try the file descriptor
 
1055
   - afterwards, if there are still data to read, try append buffer
 
1056
 
 
1057
  RETURNS
 
1058
    0  Success
 
1059
    1  Failed to read
 
1060
*/
 
1061
 
 
1062
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1063
{
 
1064
  size_t length, diff_length, left_length, save_count, max_length;
 
1065
  my_off_t pos_in_file;
 
1066
  save_count=Count;
 
1067
 
 
1068
  /* first, read the regular buffer */
 
1069
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1070
  {
 
1071
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1072
    memcpy(Buffer,info->read_pos, left_length);
 
1073
    Buffer+=left_length;
 
1074
    Count-=left_length;
 
1075
  }
 
1076
  lock_append_buffer(info);
 
1077
 
 
1078
  /* pos_in_file always point on where info->buffer was read */
 
1079
  if ((pos_in_file=info->pos_in_file +
 
1080
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1081
    goto read_append_buffer;
 
1082
 
 
1083
  /*
 
1084
    With read-append cache we must always do a seek before we read,
 
1085
    because the write could have moved the file pointer astray
 
1086
  */
 
1087
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1088
  {
 
1089
   info->error= -1;
 
1090
   unlock_append_buffer(info);
 
1091
   return (1);
 
1092
  }
 
1093
  info->seek_not_done=0;
 
1094
 
 
1095
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1096
 
 
1097
  /* now the second stage begins - read from file descriptor */
 
1098
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1099
  {
 
1100
    /* Fill first intern buffer */
 
1101
    size_t read_length;
 
1102
 
 
1103
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1104
    if ((read_length= my_read(info->file,Buffer, length,
 
1105
                              info->myflags)) == (size_t) -1)
 
1106
    {
 
1107
      info->error= -1;
 
1108
      unlock_append_buffer(info);
 
1109
      return 1;
 
1110
    }
 
1111
    Count-=read_length;
 
1112
    Buffer+=read_length;
 
1113
    pos_in_file+=read_length;
 
1114
 
 
1115
    if (read_length != length)
 
1116
    {
 
1117
      /*
 
1118
        We only got part of data;  Read the rest of the data from the
 
1119
        write buffer
 
1120
      */
 
1121
      goto read_append_buffer;
 
1122
    }
 
1123
    left_length+=length;
 
1124
    diff_length=0;
 
1125
  }
 
1126
 
 
1127
  max_length= info->read_length-diff_length;
 
1128
  if (max_length > (info->end_of_file - pos_in_file))
 
1129
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1130
  if (!max_length)
 
1131
  {
 
1132
    if (Count)
 
1133
      goto read_append_buffer;
 
1134
    length=0;                           /* Didn't read any more chars */
 
1135
  }
 
1136
  else
 
1137
  {
 
1138
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1139
    if (length == (size_t) -1)
 
1140
    {
 
1141
      info->error= -1;
 
1142
      unlock_append_buffer(info);
 
1143
      return 1;
 
1144
    }
 
1145
    if (length < Count)
 
1146
    {
 
1147
      memcpy(Buffer, info->buffer, length);
 
1148
      Count -= length;
 
1149
      Buffer += length;
 
1150
 
 
1151
      /*
 
1152
         added the line below to make
 
1153
         assert(pos_in_file==info->end_of_file) pass.
 
1154
         otherwise this does not appear to be needed
 
1155
      */
 
1156
      pos_in_file += length;
 
1157
      goto read_append_buffer;
 
1158
    }
 
1159
  }
 
1160
  unlock_append_buffer(info);
 
1161
  info->read_pos=info->buffer+Count;
 
1162
  info->read_end=info->buffer+length;
 
1163
  info->pos_in_file=pos_in_file;
 
1164
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1165
  return 0;
 
1166
 
 
1167
read_append_buffer:
 
1168
 
 
1169
  /*
 
1170
     Read data from the current write buffer.
 
1171
     Count should never be == 0 here (The code will work even if count is 0)
 
1172
  */
 
1173
 
 
1174
  {
 
1175
    /* First copy the data to Count */
 
1176
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1177
    size_t copy_len;
 
1178
    size_t transfer_len;
 
1179
 
 
1180
    assert(info->append_read_pos <= info->write_pos);
 
1181
    /*
 
1182
      TODO: figure out if the assert below is needed or correct.
 
1183
    */
 
1184
    assert(pos_in_file == info->end_of_file);
 
1185
    copy_len=min(Count, len_in_buff);
 
1186
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1187
    info->append_read_pos += copy_len;
 
1188
    Count -= copy_len;
 
1189
    if (Count)
 
1190
      info->error = save_count - Count;
 
1191
 
 
1192
    /* Fill read buffer with data from write buffer */
 
1193
    memcpy(info->buffer, info->append_read_pos,
 
1194
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1195
    info->read_pos= info->buffer;
 
1196
    info->read_end= info->buffer+transfer_len;
 
1197
    info->append_read_pos=info->write_pos;
 
1198
    info->pos_in_file=pos_in_file+copy_len;
 
1199
    info->end_of_file+=len_in_buff;
 
1200
  }
 
1201
  unlock_append_buffer(info);
 
1202
  return Count ? 1 : 0;
 
1203
}
 
1204
 
 
1205
 
541
1206
#ifdef HAVE_AIOWAIT
542
1207
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1208
/*
 
1209
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1210
  from disk when needed.
 
1211
 
 
1212
  SYNOPSIS
 
1213
    _my_b_async_read()
 
1214
      info                      IO_CACHE pointer
 
1215
      Buffer                    Buffer to retrieve count bytes from file
 
1216
      Count                     Number of bytes to read into Buffer
 
1217
 
 
1218
  RETURN VALUE
 
1219
    -1          An error has occurred; my_errno is set.
 
1220
     0          Success
 
1221
     1          An error has occurred; IO_CACHE to error state.
 
1222
*/
 
1223
 
 
1224
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
556
1225
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1226
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1227
  size_t max_length;
559
1228
  my_off_t next_pos_in_file;
560
 
  unsigned char *read_buffer;
 
1229
  uchar *read_buffer;
561
1230
 
562
1231
  memcpy(Buffer,info->read_pos,
563
1232
         (left_length= (size_t) (info->read_end-info->read_pos)));
575
1244
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1245
                 my_filename(info->file),
577
1246
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1247
      my_errno=info->aio_result.result.aio_errno;
579
1248
      info->error= -1;
580
1249
      return(1);
581
1250
    }
582
1251
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1252
        read_length == (size_t) -1)
584
1253
    {
585
 
      errno=0;                          /* For testing */
 
1254
      my_errno=0;                               /* For testing */
586
1255
      info->error= (read_length == (size_t) -1 ? -1 :
587
1256
                    (int) (read_length+left_length));
588
1257
      return(1);
615
1284
      }
616
1285
    }
617
1286
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1287
    length=min(Count,read_length);
 
1288
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1289
    Buffer+=length;
 
1290
    Count-=length;
 
1291
    left_length+=length;
623
1292
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1293
    info->read_pos+=length;
625
1294
  }
626
1295
  else
627
1296
    next_pos_in_file=(info->pos_in_file+ (size_t)
635
1304
      info->error=(int) (read_length+left_length);
636
1305
      return 1;
637
1306
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1307
    
 
1308
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1309
        == MY_FILEPOS_ERROR)
640
1310
    {
641
1311
      info->error= -1;
642
1312
      return (1);
659
1329
      {                                 /* Didn't find hole block */
660
1330
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1331
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1332
                   my_filename(info->file),my_errno);
663
1333
        info->error=(int) (read_length+left_length);
664
1334
        return 1;
665
1335
      }
692
1362
  {
693
1363
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
694
1364
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1365
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
696
1366
                &info->aio_result.result))
697
1367
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1368
      my_errno=errno;
699
1369
      if (info->request_pos != info->buffer)
700
1370
      {
701
 
        memmove(info->buffer, info->request_pos,
702
 
                (size_t) (info->read_end - info->read_pos));
 
1371
        bmove(info->buffer,info->request_pos,
 
1372
              (size_t) (info->read_end - info->read_pos));
703
1373
        info->request_pos=info->buffer;
704
1374
        info->read_pos-=info->read_length;
705
1375
        info->read_end-=info->read_length;
715
1385
#endif
716
1386
 
717
1387
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1388
/* Read one byte when buffer is empty */
 
1389
 
 
1390
int _my_b_get(IO_CACHE *info)
723
1391
{
724
 
  unsigned char buff;
 
1392
  uchar buff;
725
1393
  IO_CACHE_CALLBACK pre_read,post_read;
726
1394
  if ((pre_read = info->pre_read))
727
1395
    (*pre_read)(info);
729
1397
    return my_b_EOF;
730
1398
  if ((post_read = info->post_read))
731
1399
    (*post_read)(info);
732
 
  return (int) (unsigned char) buff;
 
1400
  return (int) (uchar) buff;
733
1401
}
734
1402
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1403
/* 
 
1404
   Write a byte buffer to IO_CACHE and flush to disk
 
1405
   if IO_CACHE is full.
 
1406
 
 
1407
   RETURN VALUE
 
1408
    1 On error on write
 
1409
    0 On success
 
1410
   -1 On error; my_errno contains error code.
 
1411
*/
 
1412
 
 
1413
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
744
1414
{
745
 
  size_t rest_length,length_local;
 
1415
  size_t rest_length,length;
746
1416
 
747
1417
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1418
  {
749
 
    errno=EFBIG;
 
1419
    my_errno=errno=EFBIG;
750
1420
    return info->error = -1;
751
1421
  }
752
1422
 
760
1430
    return 1;
761
1431
  if (Count >= IO_SIZE)
762
1432
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1433
    length=Count & (size_t) ~(IO_SIZE-1);
764
1434
    if (info->seek_not_done)
765
1435
    {
766
1436
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1437
        Whenever a function which operates on IO_CACHE flushes/writes
 
1438
        some part of the IO_CACHE to disk it will set the property
769
1439
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1440
        on the IO_CACHE.
771
1441
      */
772
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1442
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
773
1443
      {
774
1444
        info->error= -1;
775
1445
        return (1);
776
1446
      }
777
1447
      info->seek_not_done=0;
778
1448
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1449
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1450
      return info->error= -1;
 
1451
 
 
1452
    /*
 
1453
      In case of a shared I/O cache with a writer we normally do direct
 
1454
      write cache to read cache copy. Simulate this here by direct
 
1455
      caller buffer to read cache copy. Do it after the write so that
 
1456
      the cache readers actions on the flushed part can go in parallel
 
1457
      with the write of the extra stuff. copy_to_read_buffer()
 
1458
      synchronizes writer and readers so that after this call the
 
1459
      readers can act on the extra stuff while the writer can go ahead
 
1460
      and prepare the next output. copy_to_read_buffer() relies on
 
1461
      info->pos_in_file.
 
1462
    */
 
1463
    if (info->share)
 
1464
      copy_to_read_buffer(info, Buffer, length);
 
1465
 
 
1466
    Count-=length;
 
1467
    Buffer+=length;
 
1468
    info->pos_in_file+=length;
 
1469
  }
 
1470
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1471
  info->write_pos+=Count;
 
1472
  return 0;
 
1473
}
 
1474
 
 
1475
 
 
1476
/*
 
1477
  Append a block to the write buffer.
 
1478
  This is done with the buffer locked to ensure that we don't read from
 
1479
  the write buffer before we are ready with it.
 
1480
*/
 
1481
 
 
1482
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1483
{
 
1484
  size_t rest_length,length;
 
1485
 
 
1486
  /*
 
1487
    Assert that we cannot come here with a shared cache. If we do one
 
1488
    day, we might need to add a call to copy_to_read_buffer().
 
1489
  */
 
1490
  assert(!info->share);
 
1491
 
 
1492
  lock_append_buffer(info);
 
1493
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1494
  if (Count <= rest_length)
 
1495
    goto end;
 
1496
  memcpy(info->write_pos, Buffer, rest_length);
 
1497
  Buffer+=rest_length;
 
1498
  Count-=rest_length;
 
1499
  info->write_pos+=rest_length;
 
1500
  if (my_b_flush_io_cache(info,0))
 
1501
  {
 
1502
    unlock_append_buffer(info);
 
1503
    return 1;
 
1504
  }
 
1505
  if (Count >= IO_SIZE)
 
1506
  {                                     /* Fill first intern buffer */
 
1507
    length=Count & (size_t) ~(IO_SIZE-1);
 
1508
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1509
    {
 
1510
      unlock_append_buffer(info);
 
1511
      return info->error= -1;
 
1512
    }
 
1513
    Count-=length;
 
1514
    Buffer+=length;
 
1515
    info->end_of_file+=length;
 
1516
  }
 
1517
 
 
1518
end:
 
1519
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1520
  info->write_pos+=Count;
 
1521
  unlock_append_buffer(info);
 
1522
  return 0;
 
1523
}
 
1524
 
 
1525
 
 
1526
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1527
{
 
1528
  /*
 
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1531
    several layers of ternary operators that evaluated comma(,) operator
 
1532
    expressions inside - I do have a test case if somebody wants it
 
1533
  */
 
1534
  if (info->type == SEQ_READ_APPEND)
 
1535
    return my_b_append(info, Buffer, Count);
 
1536
  return my_b_write(info, Buffer, Count);
 
1537
}
 
1538
 
 
1539
 
 
1540
/*
 
1541
  Write a block to disk where part of the data may be inside the record
 
1542
  buffer.  As all write calls to the data goes through the cache,
 
1543
  we will never get a seek over the end of the buffer
 
1544
*/
 
1545
 
 
1546
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
798
1547
                   my_off_t pos)
799
1548
{
800
 
  size_t length_local;
 
1549
  size_t length;
801
1550
  int error=0;
802
1551
 
 
1552
  /*
 
1553
    Assert that we cannot come here with a shared cache. If we do one
 
1554
    day, we might need to add a call to copy_to_read_buffer().
 
1555
  */
 
1556
  assert(!info->share);
 
1557
 
803
1558
  if (pos < info->pos_in_file)
804
1559
  {
805
1560
    /* Of no overlap, write everything without buffering */
806
1561
    if (pos + Count <= info->pos_in_file)
807
1562
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1563
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1564
    length= (uint) (info->pos_in_file - pos);
 
1565
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1566
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1567
    Buffer+=length;
 
1568
    pos+=  length;
 
1569
    Count-= length;
 
1570
#ifndef HAVE_PREAD
 
1571
    info->seek_not_done=1;
 
1572
#endif
815
1573
  }
816
1574
 
817
1575
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1576
  length= (size_t) (info->write_end - info->buffer);
 
1577
  if (pos < info->pos_in_file + length)
820
1578
  {
821
1579
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1580
    length-=offset;
 
1581
    if (length > Count)
 
1582
      length=Count;
 
1583
    memcpy(info->buffer+offset, Buffer, length);
 
1584
    Buffer+=length;
 
1585
    Count-= length;
 
1586
    /* Fix length of buffer if the new data was larger */
 
1587
    if (info->buffer+length > info->write_pos)
 
1588
      info->write_pos=info->buffer+length;
831
1589
    if (!Count)
832
1590
      return (error);
833
1591
  }
837
1595
  return error;
838
1596
}
839
1597
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1598
 
 
1599
        /* Flush write cache */
 
1600
 
 
1601
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1602
  lock_append_buffer(info);
 
1603
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1604
  unlock_append_buffer(info);
 
1605
 
 
1606
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1607
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1608
  size_t length;
 
1609
  bool append_cache;
 
1610
  my_off_t pos_in_file;
 
1611
 
 
1612
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1613
    need_append_buffer_lock=0;
849
1614
 
850
1615
  if (info->type == WRITE_CACHE || append_cache)
851
1616
  {
852
1617
    if (info->file == -1)
853
1618
    {
854
 
      if (info->real_open_cached_file())
 
1619
      if (real_open_cached_file(info))
855
1620
        return((info->error= -1));
856
1621
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1622
    LOCK_APPEND_BUFFER;
858
1623
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1624
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1625
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1626
      /*
 
1627
        In case of a shared I/O cache with a writer we do direct write
 
1628
        cache to read cache copy. Do it before the write here so that
 
1629
        the readers can work in parallel with the write.
 
1630
        copy_to_read_buffer() relies on info->pos_in_file.
 
1631
      */
 
1632
      if (info->share)
 
1633
        copy_to_read_buffer(info, info->write_buffer, length);
 
1634
 
 
1635
      pos_in_file=info->pos_in_file;
862
1636
      /*
863
1637
        If we have append cache, we always open the file with
864
1638
        O_APPEND which moves the pos to EOF automatically on every write
865
1639
      */
866
1640
      if (!append_cache && info->seek_not_done)
867
1641
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1642
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1643
            MY_FILEPOS_ERROR)
869
1644
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1645
          UNLOCK_APPEND_BUFFER;
871
1646
          return((info->error= -1));
872
1647
        }
873
1648
        if (!append_cache)
874
1649
          info->seek_not_done=0;
875
1650
      }
876
1651
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1652
        info->pos_in_file+=length;
878
1653
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1654
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1655
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1656
      if (my_write(info->file,info->write_buffer,length,
882
1657
                   info->myflags | MY_NABP))
883
1658
        info->error= -1;
884
1659
      else
885
1660
        info->error= 0;
886
1661
      if (!append_cache)
887
1662
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1663
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1664
      }
890
1665
      else
891
1666
      {
892
1667
        info->end_of_file+=(info->write_pos-info->append_read_pos);
893
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
894
 
        assert(info->end_of_file == tell_ret);
 
1668
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
895
1669
      }
896
1670
 
897
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1672
      ++info->disk_writes;
 
1673
      UNLOCK_APPEND_BUFFER;
899
1674
      return(info->error);
900
1675
    }
901
1676
  }
906
1681
    info->inited=0;
907
1682
  }
908
1683
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1684
  UNLOCK_APPEND_BUFFER;
910
1685
  return(0);
911
1686
}
912
1687
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1688
/*
 
1689
  Free an IO_CACHE object
 
1690
 
 
1691
  SYNOPSOS
 
1692
    end_io_cache()
 
1693
    info                IO_CACHE Handle to free
 
1694
 
 
1695
  NOTES
 
1696
    It's currently safe to call this if one has called init_io_cache()
 
1697
    on the 'info' object, even if init_io_cache() failed.
 
1698
    This function is also safe to call twice with the same handle.
 
1699
 
 
1700
  RETURN
 
1701
   0  ok
 
1702
   #  Error
 
1703
*/
 
1704
 
 
1705
int end_io_cache(IO_CACHE *info)
928
1706
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1707
  int error=0;
 
1708
  IO_CACHE_CALLBACK pre_close;
 
1709
 
 
1710
  /*
 
1711
    Every thread must call remove_io_thread(). The last one destroys
 
1712
    the share elements.
 
1713
  */
 
1714
  assert(!info->share || !info->share->total_threads);
 
1715
 
 
1716
  if ((pre_close=info->pre_close))
 
1717
  {
 
1718
    (*pre_close)(info);
 
1719
    info->pre_close= 0;
 
1720
  }
 
1721
  if (info->alloced_buffer)
 
1722
  {
 
1723
    info->alloced_buffer=0;
 
1724
    if (info->file != -1)                       /* File doesn't exist */
 
1725
      error= my_b_flush_io_cache(info,1);
 
1726
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1727
    info->buffer=info->read_pos=(uchar*) 0;
 
1728
  }
 
1729
  if (info->type == SEQ_READ_APPEND)
 
1730
  {
 
1731
    /* Destroy allocated mutex */
 
1732
    info->type= TYPE_NOT_SET;
 
1733
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1734
  }
 
1735
  return(error);
948
1736
} /* end_io_cache */
949
1737
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1738
 
 
1739
/**********************************************************************
 
1740
 Testing of MF_IOCACHE
 
1741
**********************************************************************/
 
1742
 
 
1743
#ifdef MAIN
 
1744
 
 
1745
#include <my_dir.h>
 
1746
 
 
1747
void die(const char* fmt, ...)
 
1748
{
 
1749
  va_list va_args;
 
1750
  va_start(va_args,fmt);
 
1751
  fprintf(stderr,"Error:");
 
1752
  vfprintf(stderr, fmt,va_args);
 
1753
  fprintf(stderr,", errno=%d\n", errno);
 
1754
  exit(1);
 
1755
}
 
1756
 
 
1757
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1758
{
 
1759
  int fd;
 
1760
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1761
    die("Could not open %s", fname);
 
1762
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1763
    die("failed in init_io_cache()");
 
1764
  return fd;
 
1765
}
 
1766
 
 
1767
void close_file(IO_CACHE* info)
 
1768
{
 
1769
  end_io_cache(info);
 
1770
  my_close(info->file, MYF(MY_WME));
 
1771
}
 
1772
 
 
1773
int main(int argc, char** argv)
 
1774
{
 
1775
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1776
  MY_STAT status;
 
1777
  const char* fname="/tmp/iocache.test";
 
1778
  int cache_size=16384;
 
1779
  char llstr_buf[22];
 
1780
  int max_block,total_bytes=0;
 
1781
  int i,num_loops=100,error=0;
 
1782
  char *p;
 
1783
  char* block, *block_end;
 
1784
  MY_INIT(argv[0]);
 
1785
  max_block = cache_size*3;
 
1786
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1787
    die("Not enough memory to allocate test block");
 
1788
  block_end = block + max_block;
 
1789
  for (p = block,i=0; p < block_end;i++)
 
1790
  {
 
1791
    *p++ = (char)i;
 
1792
  }
 
1793
  if (my_stat(fname,&status, MYF(0)) &&
 
1794
      my_delete(fname,MYF(MY_WME)))
 
1795
    {
 
1796
      die("Delete of %s failed, aborting", fname);
 
1797
    }
 
1798
  open_file(fname,&sra_cache, cache_size);
 
1799
  for (i = 0; i < num_loops; i++)
 
1800
  {
 
1801
    char buf[4];
 
1802
    int block_size = abs(rand() % max_block);
 
1803
    int4store(buf, block_size);
 
1804
    if (my_b_append(&sra_cache,buf,4) ||
 
1805
        my_b_append(&sra_cache, block, block_size))
 
1806
      die("write failed");
 
1807
    total_bytes += 4+block_size;
 
1808
  }
 
1809
  close_file(&sra_cache);
 
1810
  my_free(block,MYF(MY_WME));
 
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1812
    die("%s failed to stat, but I had just closed it,\
 
1813
 wonder how that happened");
 
1814
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1815
         llstr(status.st_size,llstr_buf),
 
1816
         total_bytes);
 
1817
  my_delete(fname, MYF(MY_WME));
 
1818
  /* check correctness of tests */
 
1819
  if (total_bytes != status.st_size)
 
1820
  {
 
1821
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1822
supposedly written\n");
 
1823
    error=1;
 
1824
  }
 
1825
  exit(error);
 
1826
  return 0;
 
1827
}
 
1828
#endif