~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

move functions from item.cc/item.h to item directory

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
 
50
#include "mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
60
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
61
 
 
62
#define lock_append_buffer(info) \
 
63
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
64
#define unlock_append_buffer(info) \
 
65
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
66
 
 
67
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
68
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
69
 
 
70
/*
 
71
  Setup internal pointers inside IO_CACHE
 
72
 
 
73
  SYNOPSIS
 
74
    setup_io_cache()
 
75
    info                IO_CACHE handler
 
76
 
 
77
  NOTES
 
78
    This is called on automaticly on init or reinit of IO_CACHE
 
79
    It must be called externally if one moves or copies an IO_CACHE
 
80
    object.
 
81
*/
 
82
 
 
83
void setup_io_cache(IO_CACHE* info)
126
84
{
127
85
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
86
  if (info->type == WRITE_CACHE)
129
87
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
88
    info->current_pos= &info->write_pos;
 
89
    info->current_end= &info->write_end;
132
90
  }
133
91
  else
134
92
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
93
    info->current_pos= &info->read_pos;
 
94
    info->current_end= &info->read_end;
137
95
  }
138
96
}
139
97
 
140
98
 
141
 
void st_io_cache::init_functions()
 
99
static void
 
100
init_functions(IO_CACHE* info)
142
101
{
 
102
  enum cache_type type= info->type;
143
103
  switch (type) {
144
104
  case READ_NET:
145
105
    /*
150
110
      as myisamchk
151
111
    */
152
112
    break;
 
113
  case SEQ_READ_APPEND:
 
114
    info->read_function = _my_b_seq_read;
 
115
    info->write_function = 0;                   /* Force a core if used */
 
116
    break;
153
117
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
118
    info->read_function =
 
119
                          info->share ? _my_b_read_r :
 
120
                                        _my_b_read;
 
121
    info->write_function = _my_b_write;
156
122
  }
157
123
 
158
 
  setup_io_cache();
 
124
  setup_io_cache(info);
159
125
}
160
126
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
127
 
 
128
/*
 
129
  Initialize an IO_CACHE object
 
130
 
 
131
  SYNOPSOS
 
132
    init_io_cache()
 
133
    info                cache handler to initialize
 
134
    file                File that should be associated to to the handler
 
135
                        If == -1 then real_open_cached_file()
 
136
                        will be called when it's time to open file.
 
137
    cachesize           Size of buffer to allocate for read/write
 
138
                        If == 0 then use my_default_record_cache_size
 
139
    type                Type of cache
 
140
    seek_offset         Where cache should start reading/writing
 
141
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
142
    cache_myflags       Bitmap of differnt flags
 
143
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
144
                        MY_DONT_CHECK_FILESIZE
 
145
 
 
146
  RETURN
 
147
    0  ok
 
148
    #  error
 
149
*/
 
150
 
 
151
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
152
                  enum cache_type type, my_off_t seek_offset,
 
153
                  bool use_async_io, myf cache_myflags)
181
154
{
182
155
  size_t min_cache;
183
 
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
156
  my_off_t pos;
 
157
  my_off_t end_of_file= ~(my_off_t) 0;
185
158
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
159
  info->file= file;
 
160
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
161
  info->pos_in_file= seek_offset;
 
162
  info->pre_close = info->pre_read = info->post_read = 0;
 
163
  info->arg = 0;
 
164
  info->alloced_buffer = 0;
 
165
  info->buffer=0;
 
166
  info->seek_not_done= 0;
194
167
 
195
168
  if (file >= 0)
196
169
  {
197
 
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
170
    pos= my_tell(file);
 
171
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
199
172
    {
200
173
      /*
201
174
         This kind of object doesn't support seek() or tell(). Don't set a
202
175
         flag that will make us again try to seek() later and fail.
203
176
      */
204
 
      seek_not_done= 0;
 
177
      info->seek_not_done= 0;
205
178
      /*
206
179
        Additionally, if we're supposed to start somewhere other than the
207
180
        the beginning of whatever this file is, then somebody made a bad
210
183
      assert(seek_offset == 0);
211
184
    }
212
185
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
186
      info->seek_not_done= test(seek_offset != pos);
214
187
  }
215
188
 
 
189
  info->disk_writes= 0;
 
190
  info->share=0;
 
191
 
216
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
193
    return(1);                          /* No cache requested */
218
194
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
195
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
196
  {                                             /* Assume file isn't growing */
221
197
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
198
    {
223
199
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
200
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
225
201
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
202
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
203
      if (end_of_file < seek_offset)
 
204
        end_of_file=seek_offset;
229
205
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
206
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
207
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
208
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
209
        use_async_io=0;                         /* No need to use async */
234
210
      }
235
211
    }
236
212
  }
237
213
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
214
  if (type != READ_NET && type != WRITE_NET)
239
215
  {
240
216
    /* Retry allocating memory in smaller blocks until we get one */
241
217
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
221
      if (cachesize < min_cache)
246
222
        cachesize = min_cache;
247
223
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
255
 
           (unsigned char*) malloc(buffer_block)) != 0)
256
 
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
224
      if (type == SEQ_READ_APPEND)
 
225
        buffer_block *= 2;
 
226
      if ((info->buffer=
 
227
           (unsigned char*) my_malloc(buffer_block,
 
228
                             MYF((cache_myflags & ~ MY_WME) |
 
229
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
230
      {
 
231
        info->write_buffer=info->buffer;
 
232
        if (type == SEQ_READ_APPEND)
 
233
          info->write_buffer = info->buffer + cachesize;
 
234
        info->alloced_buffer=1;
259
235
        break;                                  /* Enough memory found */
260
236
      }
261
237
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
238
        return(2);                              /* Can't alloc cache */
267
239
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
240
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
241
    }
272
242
  }
273
243
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
244
  info->read_length=info->buffer_length=cachesize;
 
245
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
246
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
247
  if (type == SEQ_READ_APPEND)
 
248
  {
 
249
    info->append_read_pos = info->write_pos = info->write_buffer;
 
250
    info->write_end = info->write_buffer + info->buffer_length;
 
251
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
252
  }
 
253
#if defined(SAFE_MUTEX)
 
254
  else
 
255
  {
 
256
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
257
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
258
  }
 
259
#endif
277
260
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
261
  if (type == WRITE_CACHE)
 
262
    info->write_end=
 
263
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
264
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
265
    info->read_end=info->buffer;                /* Nothing in cache */
283
266
 
284
267
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
268
  info->end_of_file= end_of_file;
 
269
  info->error=0;
 
270
  info->type= type;
 
271
  init_functions(info);
289
272
#ifdef HAVE_AIOWAIT
290
273
  if (use_async_io && ! my_disable_async_io)
291
274
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
275
    info->read_length/=2;
 
276
    info->read_function=_my_b_async_read;
294
277
  }
295
 
  inited= aio_result.pending= 0;
 
278
  info->inited=info->aio_result.pending=0;
296
279
#endif
297
 
  return 0;
 
280
  return(0);
298
281
}                                               /* init_io_cache */
299
282
 
300
283
        /* Wait until current request is ready */
319
302
        break;
320
303
    }
321
304
  }
 
305
  return;
322
306
}
323
307
#endif
324
308
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
309
 
 
310
/*
 
311
  Use this to reset cache to re-start reading or to change the type
 
312
  between READ_CACHE <-> WRITE_CACHE
 
313
  If we are doing a reinit of a cache where we have the start of the file
 
314
  in the cache, we are reusing this memory without flushing it to disk.
 
315
*/
 
316
 
 
317
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
318
                        my_off_t seek_offset,
 
319
                        bool use_async_io,
 
320
                        bool clear_cache)
339
321
{
340
322
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
323
  assert(type != READ_NET && info->type != READ_NET &&
 
324
              type != WRITE_NET && info->type != WRITE_NET &&
 
325
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
326
 
344
327
  /* If the whole file is in memory, avoid flushing to disk */
345
328
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
329
      seek_offset >= info->pos_in_file &&
 
330
      seek_offset <= my_b_tell(info))
348
331
  {
349
332
    /* Reuse current buffer without flushing it to disk */
350
333
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
334
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
335
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
336
      info->read_end=info->write_pos;
 
337
      info->end_of_file=my_b_tell(info);
355
338
      /*
356
339
        Trigger a new seek only if we have a valid
357
340
        file handle.
358
341
      */
359
 
      seek_not_done= (file != -1);
 
342
      info->seek_not_done= (info->file != -1);
360
343
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
344
    else if (type == WRITE_CACHE)
362
345
    {
363
 
      if (type == READ_CACHE)
 
346
      if (info->type == READ_CACHE)
364
347
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
348
        info->write_end=info->write_buffer+info->buffer_length;
 
349
        info->seek_not_done=1;
367
350
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
351
      info->end_of_file = ~(my_off_t) 0;
369
352
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
353
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
354
    if (type == WRITE_CACHE)
 
355
      info->write_pos=pos;
373
356
    else
374
 
      read_pos= pos;
 
357
      info->read_pos= pos;
375
358
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
359
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
360
#endif
378
361
  }
379
362
  else
382
365
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
366
      after the current positions should be ignored
384
367
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
368
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
369
      info->end_of_file=my_b_tell(info);
387
370
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
371
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
372
      return(1);
 
373
    info->pos_in_file=seek_offset;
391
374
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
375
    info->seek_not_done=1;
 
376
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
377
    if (type == READ_CACHE)
395
378
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
379
      info->read_end=info->buffer;              /* Nothing in cache */
397
380
    }
398
381
    else
399
382
    {
400
 
      write_end=(buffer + buffer_length -
 
383
      info->write_end=(info->buffer + info->buffer_length -
401
384
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
385
      info->end_of_file= ~(my_off_t) 0;
403
386
    }
404
387
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
388
  info->type=type;
 
389
  info->error=0;
 
390
  init_functions(info);
408
391
 
409
392
#ifdef HAVE_AIOWAIT
410
393
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
394
      ((uint32_t) info->buffer_length <
 
395
       (uint32_t) (info->end_of_file - seek_offset)))
413
396
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
397
    info->read_length=info->buffer_length/2;
 
398
    info->read_function=_my_b_async_read;
416
399
  }
417
 
  inited= 0;
 
400
  info->inited=0;
418
401
#else
419
402
  (void)use_async_io;
420
403
#endif
421
 
  return 0;
 
404
  return(0);
422
405
} /* reinit_io_cache */
423
406
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
407
 
 
408
 
 
409
/*
 
410
  Read buffered.
 
411
 
 
412
  SYNOPSIS
 
413
    _my_b_read()
 
414
      info                      IO_CACHE pointer
 
415
      Buffer                    Buffer to retrieve count bytes from file
 
416
      Count                     Number of bytes to read into Buffer
 
417
 
 
418
  NOTE
 
419
    This function is only called from the my_b_read() macro when there
 
420
    isn't enough characters in the buffer to satisfy the request.
 
421
 
 
422
  WARNING
 
423
 
 
424
    When changing this function, be careful with handling file offsets
 
425
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
426
    types than my_off_t unless you can be sure that their value fits.
 
427
    Same applies to differences of file offsets.
 
428
 
 
429
    When changing this function, check _my_b_read_r(). It might need the
 
430
    same change.
 
431
 
 
432
  RETURN
 
433
    0      we succeeded in reading all data
 
434
    1      Error: can't read requested characters
 
435
*/
 
436
 
 
437
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
438
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
439
  size_t length,diff_length,left_length, max_length;
 
440
  my_off_t pos_in_file;
448
441
 
449
442
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
443
  {
455
448
  }
456
449
 
457
450
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
451
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
452
 
460
 
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
453
  /* 
 
454
    Whenever a function which operates on IO_CACHE flushes/writes
 
455
    some part of the IO_CACHE to disk it will set the property
463
456
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
457
    on the IO_CACHE.
465
458
  */
466
459
  if (info->seek_not_done)
467
460
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
461
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
462
        != MY_FILEPOS_ERROR))
469
463
    {
470
464
      /* No error, reset seek_not_done flag. */
471
465
      info->seek_not_done= 0;
477
471
        info->file is a pipe or socket or FIFO.  We never should have tried
478
472
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
473
      */
480
 
      assert(errno != ESPIPE);
 
474
      assert(my_errno != ESPIPE);
481
475
      info->error= -1;
482
476
      return(1);
483
477
    }
484
478
  }
485
479
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
480
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
481
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
482
  {                                     /* Fill first intern buffer */
489
483
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
484
    if (info->end_of_file <= pos_in_file)
491
485
    {                                   /* End of file */
492
486
      info->error= (int) left_length;
493
487
      return(1);
494
488
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
489
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
490
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
491
        != length)
497
492
    {
498
493
      info->error= (read_length == (size_t) -1 ? -1 :
499
494
                    (int) (read_length+left_length));
500
495
      return(1);
501
496
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
497
    Count-=length;
 
498
    Buffer+=length;
 
499
    pos_in_file+=length;
 
500
    left_length+=length;
506
501
    diff_length=0;
507
502
  }
508
503
 
509
504
  max_length= info->read_length-diff_length;
510
505
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
506
      max_length > (info->end_of_file - pos_in_file))
 
507
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
508
  if (!max_length)
514
509
  {
515
510
    if (Count)
516
511
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
512
      info->error= left_length;         /* We only got this many char */
518
513
      return(1);
519
514
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
515
    length=0;                           /* Didn't read any chars */
521
516
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
517
  else if ((length= my_read(info->file,info->buffer, max_length,
523
518
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
519
           length == (size_t) -1)
525
520
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
521
    if (length != (size_t) -1)
 
522
      memcpy(Buffer, info->buffer, length);
 
523
    info->pos_in_file= pos_in_file;
 
524
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
525
    info->read_pos=info->read_end=info->buffer;
531
526
    return(1);
532
527
  }
533
528
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
529
  info->read_end=info->buffer+length;
 
530
  info->pos_in_file=pos_in_file;
536
531
  memcpy(Buffer, info->buffer, Count);
537
532
  return(0);
538
533
}
539
534
 
540
535
 
 
536
/*
 
537
  Prepare IO_CACHE for shared use.
 
538
 
 
539
  SYNOPSIS
 
540
    init_io_cache_share()
 
541
      read_cache                A read cache. This will be copied for
 
542
                                every thread after setup.
 
543
      cshare                    The share.
 
544
      write_cache               If non-NULL a write cache that is to be
 
545
                                synchronized with the read caches.
 
546
      num_threads               Number of threads sharing the cache
 
547
                                including the write thread if any.
 
548
 
 
549
  DESCRIPTION
 
550
 
 
551
    The shared cache is used so: One IO_CACHE is initialized with
 
552
    init_io_cache(). This includes the allocation of a buffer. Then a
 
553
    share is allocated and init_io_cache_share() is called with the io
 
554
    cache and the share. Then the io cache is copied for each thread. So
 
555
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
556
    is shared because cache->buffer is the same for all caches.
 
557
 
 
558
    One thread reads data from the file into the buffer. All threads
 
559
    read from the buffer, but every thread maintains its own set of
 
560
    pointers into the buffer. When all threads have used up the buffer
 
561
    contents, one of the threads reads the next block of data into the
 
562
    buffer. To accomplish this, each thread enters the cache lock before
 
563
    accessing the buffer. They wait in lock_io_cache() until all threads
 
564
    joined the lock. The last thread entering the lock is in charge of
 
565
    reading from file to buffer. It wakes all threads when done.
 
566
 
 
567
    Synchronizing a write cache to the read caches works so: Whenever
 
568
    the write buffer needs a flush, the write thread enters the lock and
 
569
    waits for all other threads to enter the lock too. They do this when
 
570
    they have used up the read buffer. When all threads are in the lock,
 
571
    the write thread copies the write buffer to the read buffer and
 
572
    wakes all threads.
 
573
 
 
574
    share->running_threads is the number of threads not being in the
 
575
    cache lock. When entering lock_io_cache() the number is decreased.
 
576
    When the thread that fills the buffer enters unlock_io_cache() the
 
577
    number is reset to the number of threads. The condition
 
578
    running_threads == 0 means that all threads are in the lock. Bumping
 
579
    up the number to the full count is non-intuitive. But increasing the
 
580
    number by one for each thread that leaves the lock could lead to a
 
581
    solo run of one thread. The last thread to join a lock reads from
 
582
    file to buffer, wakes the other threads, processes the data in the
 
583
    cache and enters the lock again. If no other thread left the lock
 
584
    meanwhile, it would think it's the last one again and read the next
 
585
    block...
 
586
 
 
587
    The share has copies of 'error', 'buffer', 'read_end', and
 
588
    'pos_in_file' from the thread that filled the buffer. We may not be
 
589
    able to access this information directly from its cache because the
 
590
    thread may be removed from the share before the variables could be
 
591
    copied by all other threads. Or, if a write buffer is synchronized,
 
592
    it would change its 'pos_in_file' after waking the other threads,
 
593
    possibly before they could copy its value.
 
594
 
 
595
    However, the 'buffer' variable in the share is for a synchronized
 
596
    write cache. It needs to know where to put the data. Otherwise it
 
597
    would need access to the read cache of one of the threads that is
 
598
    not yet removed from the share.
 
599
 
 
600
  RETURN
 
601
    void
 
602
*/
 
603
 
 
604
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
605
                         IO_CACHE *write_cache, uint32_t num_threads)
 
606
{
 
607
  assert(num_threads > 1);
 
608
  assert(read_cache->type == READ_CACHE);
 
609
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
610
 
 
611
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
612
  pthread_cond_init(&cshare->cond, 0);
 
613
  pthread_cond_init(&cshare->cond_writer, 0);
 
614
 
 
615
  cshare->running_threads= num_threads;
 
616
  cshare->total_threads=   num_threads;
 
617
  cshare->error=           0;    /* Initialize. */
 
618
  cshare->buffer=          read_cache->buffer;
 
619
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
620
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
621
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
622
 
 
623
  read_cache->share=         cshare;
 
624
  read_cache->read_function= _my_b_read_r;
 
625
  read_cache->current_pos=   NULL;
 
626
  read_cache->current_end=   NULL;
 
627
 
 
628
  if (write_cache)
 
629
    write_cache->share= cshare;
 
630
 
 
631
  return;
 
632
}
 
633
 
 
634
 
 
635
/*
 
636
  Remove a thread from shared access to IO_CACHE.
 
637
 
 
638
  SYNOPSIS
 
639
    remove_io_thread()
 
640
      cache                     The IO_CACHE to be removed from the share.
 
641
 
 
642
  NOTE
 
643
 
 
644
    Every thread must do that on exit for not to deadlock other threads.
 
645
 
 
646
    The last thread destroys the pthread resources.
 
647
 
 
648
    A writer flushes its cache first.
 
649
 
 
650
  RETURN
 
651
    void
 
652
*/
 
653
 
 
654
void remove_io_thread(IO_CACHE *cache)
 
655
{
 
656
  IO_CACHE_SHARE *cshare= cache->share;
 
657
  uint32_t total;
 
658
 
 
659
  /* If the writer goes, it needs to flush the write cache. */
 
660
  if (cache == cshare->source_cache)
 
661
    flush_io_cache(cache);
 
662
 
 
663
  pthread_mutex_lock(&cshare->mutex);
 
664
 
 
665
  /* Remove from share. */
 
666
  total= --cshare->total_threads;
 
667
 
 
668
  /* Detach from share. */
 
669
  cache->share= NULL;
 
670
 
 
671
  /* If the writer goes, let the readers know. */
 
672
  if (cache == cshare->source_cache)
 
673
  {
 
674
    cshare->source_cache= NULL;
 
675
  }
 
676
 
 
677
  /* If all threads are waiting for me to join the lock, wake them. */
 
678
  if (!--cshare->running_threads)
 
679
  {
 
680
    pthread_cond_signal(&cshare->cond_writer);
 
681
    pthread_cond_broadcast(&cshare->cond);
 
682
  }
 
683
 
 
684
  pthread_mutex_unlock(&cshare->mutex);
 
685
 
 
686
  if (!total)
 
687
  {
 
688
    pthread_cond_destroy (&cshare->cond_writer);
 
689
    pthread_cond_destroy (&cshare->cond);
 
690
    pthread_mutex_destroy(&cshare->mutex);
 
691
  }
 
692
 
 
693
  return;
 
694
}
 
695
 
 
696
 
 
697
/*
 
698
  Lock IO cache and wait for all other threads to join.
 
699
 
 
700
  SYNOPSIS
 
701
    lock_io_cache()
 
702
      cache                     The cache of the thread entering the lock.
 
703
      pos                       File position of the block to read.
 
704
                                Unused for the write thread.
 
705
 
 
706
  DESCRIPTION
 
707
 
 
708
    Wait for all threads to finish with the current buffer. We want
 
709
    all threads to proceed in concert. The last thread to join
 
710
    lock_io_cache() will read the block from file and all threads start
 
711
    to use it. Then they will join again for reading the next block.
 
712
 
 
713
    The waiting threads detect a fresh buffer by comparing
 
714
    cshare->pos_in_file with the position they want to process next.
 
715
    Since the first block may start at position 0, we take
 
716
    cshare->read_end as an additional condition. This variable is
 
717
    initialized to NULL and will be set after a block of data is written
 
718
    to the buffer.
 
719
 
 
720
  RETURN
 
721
    1           OK, lock in place, go ahead and read.
 
722
    0           OK, unlocked, another thread did the read.
 
723
*/
 
724
 
 
725
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
726
{
 
727
  IO_CACHE_SHARE *cshare= cache->share;
 
728
 
 
729
  /* Enter the lock. */
 
730
  pthread_mutex_lock(&cshare->mutex);
 
731
  cshare->running_threads--;
 
732
 
 
733
  if (cshare->source_cache)
 
734
  {
 
735
    /* A write cache is synchronized to the read caches. */
 
736
 
 
737
    if (cache == cshare->source_cache)
 
738
    {
 
739
      /* The writer waits until all readers are here. */
 
740
      while (cshare->running_threads)
 
741
      {
 
742
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
743
      }
 
744
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
745
      return(1);
 
746
    }
 
747
 
 
748
    /* The last thread wakes the writer. */
 
749
    if (!cshare->running_threads)
 
750
    {
 
751
      pthread_cond_signal(&cshare->cond_writer);
 
752
    }
 
753
 
 
754
    /*
 
755
      Readers wait until the data is copied from the writer. Another
 
756
      reason to stop waiting is the removal of the write thread. If this
 
757
      happens, we leave the lock with old data in the buffer.
 
758
    */
 
759
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
760
           cshare->source_cache)
 
761
    {
 
762
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
763
    }
 
764
 
 
765
    /*
 
766
      If the writer was removed from the share while this thread was
 
767
      asleep, we need to simulate an EOF condition. The writer cannot
 
768
      reset the share variables as they might still be in use by readers
 
769
      of the last block. When we awake here then because the last
 
770
      joining thread signalled us. If the writer is not the last, it
 
771
      will not signal. So it is safe to clear the buffer here.
 
772
    */
 
773
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
774
    {
 
775
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
776
      cshare->error= 0; /* EOF is not an error. */
 
777
    }
 
778
  }
 
779
  else
 
780
  {
 
781
    /*
 
782
      There are read caches only. The last thread arriving in
 
783
      lock_io_cache() continues with a locked cache and reads the block.
 
784
    */
 
785
    if (!cshare->running_threads)
 
786
    {
 
787
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
788
      return(1);
 
789
    }
 
790
 
 
791
    /*
 
792
      All other threads wait until the requested block is read by the
 
793
      last thread arriving. Another reason to stop waiting is the
 
794
      removal of a thread. If this leads to all threads being in the
 
795
      lock, we have to continue also. The first of the awaken threads
 
796
      will then do the read.
 
797
    */
 
798
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
799
           cshare->running_threads)
 
800
    {
 
801
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
802
    }
 
803
 
 
804
    /* If the block is not yet read, continue with a locked cache and read. */
 
805
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
806
    {
 
807
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
808
      return(1);
 
809
    }
 
810
 
 
811
    /* Another thread did read the block already. */
 
812
  }
 
813
 
 
814
  /*
 
815
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
816
    filled the buffer did this and marked all threads as running.
 
817
  */
 
818
  pthread_mutex_unlock(&cshare->mutex);
 
819
  return(0);
 
820
}
 
821
 
 
822
 
 
823
/*
 
824
  Unlock IO cache.
 
825
 
 
826
  SYNOPSIS
 
827
    unlock_io_cache()
 
828
      cache                     The cache of the thread leaving the lock.
 
829
 
 
830
  NOTE
 
831
    This is called by the thread that filled the buffer. It marks all
 
832
    threads as running and awakes them. This must not be done by any
 
833
    other thread.
 
834
 
 
835
    Do not signal cond_writer. Either there is no writer or the writer
 
836
    is the only one who can call this function.
 
837
 
 
838
    The reason for resetting running_threads to total_threads before
 
839
    waking all other threads is that it could be possible that this
 
840
    thread is so fast with processing the buffer that it enters the lock
 
841
    before even one other thread has left it. If every awoken thread
 
842
    would increase running_threads by one, this thread could think that
 
843
    he is again the last to join and would not wait for the other
 
844
    threads to process the data.
 
845
 
 
846
  RETURN
 
847
    void
 
848
*/
 
849
 
 
850
static void unlock_io_cache(IO_CACHE *cache)
 
851
{
 
852
  IO_CACHE_SHARE *cshare= cache->share;
 
853
 
 
854
  cshare->running_threads= cshare->total_threads;
 
855
  pthread_cond_broadcast(&cshare->cond);
 
856
  pthread_mutex_unlock(&cshare->mutex);
 
857
  return;
 
858
}
 
859
 
 
860
 
 
861
/*
 
862
  Read from IO_CACHE when it is shared between several threads.
 
863
 
 
864
  SYNOPSIS
 
865
    _my_b_read_r()
 
866
      cache                     IO_CACHE pointer
 
867
      Buffer                    Buffer to retrieve count bytes from file
 
868
      Count                     Number of bytes to read into Buffer
 
869
 
 
870
  NOTE
 
871
    This function is only called from the my_b_read() macro when there
 
872
    isn't enough characters in the buffer to satisfy the request.
 
873
 
 
874
  IMPLEMENTATION
 
875
 
 
876
    It works as follows: when a thread tries to read from a file (that
 
877
    is, after using all the data from the (shared) buffer), it just
 
878
    hangs on lock_io_cache(), waiting for other threads. When the very
 
879
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
880
    does actual IO and unlock_io_cache(), which signals all the waiting
 
881
    threads that data is in the buffer.
 
882
 
 
883
  WARNING
 
884
 
 
885
    When changing this function, be careful with handling file offsets
 
886
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
887
    types than my_off_t unless you can be sure that their value fits.
 
888
    Same applies to differences of file offsets. (Bug #11527)
 
889
 
 
890
    When changing this function, check _my_b_read(). It might need the
 
891
    same change.
 
892
 
 
893
  RETURN
 
894
    0      we succeeded in reading all data
 
895
    1      Error: can't read requested characters
 
896
*/
 
897
 
 
898
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
899
{
 
900
  my_off_t pos_in_file;
 
901
  size_t length, diff_length, left_length;
 
902
  IO_CACHE_SHARE *cshare= cache->share;
 
903
 
 
904
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
905
  {
 
906
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
907
    memcpy(Buffer, cache->read_pos, left_length);
 
908
    Buffer+= left_length;
 
909
    Count-= left_length;
 
910
  }
 
911
  while (Count)
 
912
  {
 
913
    size_t cnt, len;
 
914
 
 
915
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
916
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
917
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
918
    length= ((length <= cache->read_length) ?
 
919
             length + IO_ROUND_DN(cache->read_length - length) :
 
920
             length - IO_ROUND_UP(length - cache->read_length));
 
921
    if (cache->type != READ_FIFO &&
 
922
        (length > (cache->end_of_file - pos_in_file)))
 
923
      length= (size_t) (cache->end_of_file - pos_in_file);
 
924
    if (length == 0)
 
925
    {
 
926
      cache->error= (int) left_length;
 
927
      return(1);
 
928
    }
 
929
    if (lock_io_cache(cache, pos_in_file))
 
930
    {
 
931
      /* With a synchronized write/read cache we won't come here... */
 
932
      assert(!cshare->source_cache);
 
933
      /*
 
934
        ... unless the writer has gone before this thread entered the
 
935
        lock. Simulate EOF in this case. It can be distinguished by
 
936
        cache->file.
 
937
      */
 
938
      if (cache->file < 0)
 
939
        len= 0;
 
940
      else
 
941
      {
 
942
        /*
 
943
          Whenever a function which operates on IO_CACHE flushes/writes
 
944
          some part of the IO_CACHE to disk it will set the property
 
945
          "seek_not_done" to indicate this to other functions operating
 
946
          on the IO_CACHE.
 
947
        */
 
948
        if (cache->seek_not_done)
 
949
        {
 
950
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
951
              == MY_FILEPOS_ERROR)
 
952
          {
 
953
            cache->error= -1;
 
954
            unlock_io_cache(cache);
 
955
            return(1);
 
956
          }
 
957
        }
 
958
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
959
      }
 
960
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
961
      cache->error=       (len == length ? 0 : (int) len);
 
962
      cache->pos_in_file= pos_in_file;
 
963
 
 
964
      /* Copy important values to the share. */
 
965
      cshare->error=       cache->error;
 
966
      cshare->read_end=    cache->read_end;
 
967
      cshare->pos_in_file= pos_in_file;
 
968
 
 
969
      /* Mark all threads as running and wake them. */
 
970
      unlock_io_cache(cache);
 
971
    }
 
972
    else
 
973
    {
 
974
      /*
 
975
        With a synchronized write/read cache readers always come here.
 
976
        Copy important values from the share.
 
977
      */
 
978
      cache->error=       cshare->error;
 
979
      cache->read_end=    cshare->read_end;
 
980
      cache->pos_in_file= cshare->pos_in_file;
 
981
 
 
982
      len= ((cache->error == -1) ? (size_t) -1 :
 
983
            (size_t) (cache->read_end - cache->buffer));
 
984
    }
 
985
    cache->read_pos=      cache->buffer;
 
986
    cache->seek_not_done= 0;
 
987
    if (len == 0 || len == (size_t) -1)
 
988
    {
 
989
      cache->error= (int) left_length;
 
990
      return(1);
 
991
    }
 
992
    cnt= (len > Count) ? Count : len;
 
993
    memcpy(Buffer, cache->read_pos, cnt);
 
994
    Count -= cnt;
 
995
    Buffer+= cnt;
 
996
    left_length+= cnt;
 
997
    cache->read_pos+= cnt;
 
998
  }
 
999
  return(0);
 
1000
}
 
1001
 
 
1002
 
 
1003
/*
 
1004
  Copy data from write cache to read cache.
 
1005
 
 
1006
  SYNOPSIS
 
1007
    copy_to_read_buffer()
 
1008
      write_cache               The write cache.
 
1009
      write_buffer              The source of data, mostly the cache buffer.
 
1010
      write_length              The number of bytes to copy.
 
1011
 
 
1012
  NOTE
 
1013
    The write thread will wait for all read threads to join the cache
 
1014
    lock. Then it copies the data over and wakes the read threads.
 
1015
 
 
1016
  RETURN
 
1017
    void
 
1018
*/
 
1019
 
 
1020
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1021
                                const unsigned char *write_buffer, size_t write_length)
 
1022
{
 
1023
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1024
 
 
1025
  assert(cshare->source_cache == write_cache);
 
1026
  /*
 
1027
    write_length is usually less or equal to buffer_length.
 
1028
    It can be bigger if _my_b_write() is called with a big length.
 
1029
  */
 
1030
  while (write_length)
 
1031
  {
 
1032
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1033
    int  rc;
 
1034
 
 
1035
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1036
    /* The writing thread does always have the lock when it awakes. */
 
1037
    assert(rc);
 
1038
 
 
1039
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1040
 
 
1041
    cshare->error=       0;
 
1042
    cshare->read_end=    cshare->buffer + copy_length;
 
1043
    cshare->pos_in_file= write_cache->pos_in_file;
 
1044
 
 
1045
    /* Mark all threads as running and wake them. */
 
1046
    unlock_io_cache(write_cache);
 
1047
 
 
1048
    write_buffer+= copy_length;
 
1049
    write_length-= copy_length;
 
1050
  }
 
1051
}
 
1052
 
 
1053
 
 
1054
/*
 
1055
  Do sequential read from the SEQ_READ_APPEND cache.
 
1056
  
 
1057
  We do this in three stages:
 
1058
   - first read from info->buffer
 
1059
   - then if there are still data to read, try the file descriptor
 
1060
   - afterwards, if there are still data to read, try append buffer
 
1061
 
 
1062
  RETURNS
 
1063
    0  Success
 
1064
    1  Failed to read
 
1065
*/
 
1066
 
 
1067
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1068
{
 
1069
  size_t length, diff_length, left_length, save_count, max_length;
 
1070
  my_off_t pos_in_file;
 
1071
  save_count=Count;
 
1072
 
 
1073
  /* first, read the regular buffer */
 
1074
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1075
  {
 
1076
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1077
    memcpy(Buffer,info->read_pos, left_length);
 
1078
    Buffer+=left_length;
 
1079
    Count-=left_length;
 
1080
  }
 
1081
  lock_append_buffer(info);
 
1082
 
 
1083
  /* pos_in_file always point on where info->buffer was read */
 
1084
  if ((pos_in_file=info->pos_in_file +
 
1085
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1086
    goto read_append_buffer;
 
1087
 
 
1088
  /*
 
1089
    With read-append cache we must always do a seek before we read,
 
1090
    because the write could have moved the file pointer astray
 
1091
  */
 
1092
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1093
  {
 
1094
   info->error= -1;
 
1095
   unlock_append_buffer(info);
 
1096
   return (1);
 
1097
  }
 
1098
  info->seek_not_done=0;
 
1099
 
 
1100
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1101
 
 
1102
  /* now the second stage begins - read from file descriptor */
 
1103
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1104
  {
 
1105
    /* Fill first intern buffer */
 
1106
    size_t read_length;
 
1107
 
 
1108
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1109
    if ((read_length= my_read(info->file,Buffer, length,
 
1110
                              info->myflags)) == (size_t) -1)
 
1111
    {
 
1112
      info->error= -1;
 
1113
      unlock_append_buffer(info);
 
1114
      return 1;
 
1115
    }
 
1116
    Count-=read_length;
 
1117
    Buffer+=read_length;
 
1118
    pos_in_file+=read_length;
 
1119
 
 
1120
    if (read_length != length)
 
1121
    {
 
1122
      /*
 
1123
        We only got part of data;  Read the rest of the data from the
 
1124
        write buffer
 
1125
      */
 
1126
      goto read_append_buffer;
 
1127
    }
 
1128
    left_length+=length;
 
1129
    diff_length=0;
 
1130
  }
 
1131
 
 
1132
  max_length= info->read_length-diff_length;
 
1133
  if (max_length > (info->end_of_file - pos_in_file))
 
1134
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1135
  if (!max_length)
 
1136
  {
 
1137
    if (Count)
 
1138
      goto read_append_buffer;
 
1139
    length=0;                           /* Didn't read any more chars */
 
1140
  }
 
1141
  else
 
1142
  {
 
1143
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1144
    if (length == (size_t) -1)
 
1145
    {
 
1146
      info->error= -1;
 
1147
      unlock_append_buffer(info);
 
1148
      return 1;
 
1149
    }
 
1150
    if (length < Count)
 
1151
    {
 
1152
      memcpy(Buffer, info->buffer, length);
 
1153
      Count -= length;
 
1154
      Buffer += length;
 
1155
 
 
1156
      /*
 
1157
         added the line below to make
 
1158
         assert(pos_in_file==info->end_of_file) pass.
 
1159
         otherwise this does not appear to be needed
 
1160
      */
 
1161
      pos_in_file += length;
 
1162
      goto read_append_buffer;
 
1163
    }
 
1164
  }
 
1165
  unlock_append_buffer(info);
 
1166
  info->read_pos=info->buffer+Count;
 
1167
  info->read_end=info->buffer+length;
 
1168
  info->pos_in_file=pos_in_file;
 
1169
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1170
  return 0;
 
1171
 
 
1172
read_append_buffer:
 
1173
 
 
1174
  /*
 
1175
     Read data from the current write buffer.
 
1176
     Count should never be == 0 here (The code will work even if count is 0)
 
1177
  */
 
1178
 
 
1179
  {
 
1180
    /* First copy the data to Count */
 
1181
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1182
    size_t copy_len;
 
1183
    size_t transfer_len;
 
1184
 
 
1185
    assert(info->append_read_pos <= info->write_pos);
 
1186
    /*
 
1187
      TODO: figure out if the assert below is needed or correct.
 
1188
    */
 
1189
    assert(pos_in_file == info->end_of_file);
 
1190
    copy_len=cmin(Count, len_in_buff);
 
1191
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1192
    info->append_read_pos += copy_len;
 
1193
    Count -= copy_len;
 
1194
    if (Count)
 
1195
      info->error = save_count - Count;
 
1196
 
 
1197
    /* Fill read buffer with data from write buffer */
 
1198
    memcpy(info->buffer, info->append_read_pos,
 
1199
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1200
    info->read_pos= info->buffer;
 
1201
    info->read_end= info->buffer+transfer_len;
 
1202
    info->append_read_pos=info->write_pos;
 
1203
    info->pos_in_file=pos_in_file+copy_len;
 
1204
    info->end_of_file+=len_in_buff;
 
1205
  }
 
1206
  unlock_append_buffer(info);
 
1207
  return Count ? 1 : 0;
 
1208
}
 
1209
 
 
1210
 
541
1211
#ifdef HAVE_AIOWAIT
542
1212
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1213
/*
 
1214
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1215
  from disk when needed.
 
1216
 
 
1217
  SYNOPSIS
 
1218
    _my_b_async_read()
 
1219
      info                      IO_CACHE pointer
 
1220
      Buffer                    Buffer to retrieve count bytes from file
 
1221
      Count                     Number of bytes to read into Buffer
 
1222
 
 
1223
  RETURN VALUE
 
1224
    -1          An error has occurred; my_errno is set.
 
1225
     0          Success
 
1226
     1          An error has occurred; IO_CACHE to error state.
 
1227
*/
 
1228
 
 
1229
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1230
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1231
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1232
  size_t max_length;
559
1233
  my_off_t next_pos_in_file;
560
1234
  unsigned char *read_buffer;
575
1249
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1250
                 my_filename(info->file),
577
1251
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1252
      my_errno=info->aio_result.result.aio_errno;
579
1253
      info->error= -1;
580
1254
      return(1);
581
1255
    }
582
1256
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1257
        read_length == (size_t) -1)
584
1258
    {
585
 
      errno=0;                          /* For testing */
 
1259
      my_errno=0;                               /* For testing */
586
1260
      info->error= (read_length == (size_t) -1 ? -1 :
587
1261
                    (int) (read_length+left_length));
588
1262
      return(1);
615
1289
      }
616
1290
    }
617
1291
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1292
    length=cmin(Count,read_length);
 
1293
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1294
    Buffer+=length;
 
1295
    Count-=length;
 
1296
    left_length+=length;
623
1297
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1298
    info->read_pos+=length;
625
1299
  }
626
1300
  else
627
1301
    next_pos_in_file=(info->pos_in_file+ (size_t)
635
1309
      info->error=(int) (read_length+left_length);
636
1310
      return 1;
637
1311
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1312
    
 
1313
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1314
        == MY_FILEPOS_ERROR)
640
1315
    {
641
1316
      info->error= -1;
642
1317
      return (1);
648
1323
      if ((read_length=my_read(info->file,info->request_pos,
649
1324
                               read_length, info->myflags)) == (size_t) -1)
650
1325
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
 
1326
      use_length=cmin(Count,read_length);
652
1327
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
1328
      info->read_pos=info->request_pos+Count;
654
1329
      info->read_end=info->request_pos+read_length;
659
1334
      {                                 /* Didn't find hole block */
660
1335
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1336
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1337
                   my_filename(info->file),my_errno);
663
1338
        info->error=(int) (read_length+left_length);
664
1339
        return 1;
665
1340
      }
692
1367
  {
693
1368
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
694
1369
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1370
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
696
1371
                &info->aio_result.result))
697
1372
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1373
      my_errno=errno;
699
1374
      if (info->request_pos != info->buffer)
700
1375
      {
701
1376
        memmove(info->buffer, info->request_pos,
715
1390
#endif
716
1391
 
717
1392
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1393
/* Read one byte when buffer is empty */
 
1394
 
 
1395
int _my_b_get(IO_CACHE *info)
723
1396
{
724
1397
  unsigned char buff;
725
1398
  IO_CACHE_CALLBACK pre_read,post_read;
732
1405
  return (int) (unsigned char) buff;
733
1406
}
734
1407
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1408
/* 
 
1409
   Write a byte buffer to IO_CACHE and flush to disk
 
1410
   if IO_CACHE is full.
 
1411
 
 
1412
   RETURN VALUE
 
1413
    1 On error on write
 
1414
    0 On success
 
1415
   -1 On error; my_errno contains error code.
 
1416
*/
 
1417
 
 
1418
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1419
{
745
 
  size_t rest_length,length_local;
 
1420
  size_t rest_length,length;
746
1421
 
747
1422
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1423
  {
749
 
    errno=EFBIG;
 
1424
    my_errno=errno=EFBIG;
750
1425
    return info->error = -1;
751
1426
  }
752
1427
 
760
1435
    return 1;
761
1436
  if (Count >= IO_SIZE)
762
1437
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1438
    length=Count & (size_t) ~(IO_SIZE-1);
764
1439
    if (info->seek_not_done)
765
1440
    {
766
1441
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1442
        Whenever a function which operates on IO_CACHE flushes/writes
 
1443
        some part of the IO_CACHE to disk it will set the property
769
1444
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1445
        on the IO_CACHE.
771
1446
      */
772
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1447
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
773
1448
      {
774
1449
        info->error= -1;
775
1450
        return (1);
776
1451
      }
777
1452
      info->seek_not_done=0;
778
1453
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1454
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1455
      return info->error= -1;
 
1456
 
 
1457
    /*
 
1458
      In case of a shared I/O cache with a writer we normally do direct
 
1459
      write cache to read cache copy. Simulate this here by direct
 
1460
      caller buffer to read cache copy. Do it after the write so that
 
1461
      the cache readers actions on the flushed part can go in parallel
 
1462
      with the write of the extra stuff. copy_to_read_buffer()
 
1463
      synchronizes writer and readers so that after this call the
 
1464
      readers can act on the extra stuff while the writer can go ahead
 
1465
      and prepare the next output. copy_to_read_buffer() relies on
 
1466
      info->pos_in_file.
 
1467
    */
 
1468
    if (info->share)
 
1469
      copy_to_read_buffer(info, Buffer, length);
 
1470
 
 
1471
    Count-=length;
 
1472
    Buffer+=length;
 
1473
    info->pos_in_file+=length;
 
1474
  }
 
1475
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1476
  info->write_pos+=Count;
 
1477
  return 0;
 
1478
}
 
1479
 
 
1480
 
 
1481
/*
 
1482
  Append a block to the write buffer.
 
1483
  This is done with the buffer locked to ensure that we don't read from
 
1484
  the write buffer before we are ready with it.
 
1485
*/
 
1486
 
 
1487
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1488
{
 
1489
  size_t rest_length,length;
 
1490
 
 
1491
  /*
 
1492
    Assert that we cannot come here with a shared cache. If we do one
 
1493
    day, we might need to add a call to copy_to_read_buffer().
 
1494
  */
 
1495
  assert(!info->share);
 
1496
 
 
1497
  lock_append_buffer(info);
 
1498
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1499
  if (Count <= rest_length)
 
1500
    goto end;
 
1501
  memcpy(info->write_pos, Buffer, rest_length);
 
1502
  Buffer+=rest_length;
 
1503
  Count-=rest_length;
 
1504
  info->write_pos+=rest_length;
 
1505
  if (my_b_flush_io_cache(info,0))
 
1506
  {
 
1507
    unlock_append_buffer(info);
 
1508
    return 1;
 
1509
  }
 
1510
  if (Count >= IO_SIZE)
 
1511
  {                                     /* Fill first intern buffer */
 
1512
    length=Count & (size_t) ~(IO_SIZE-1);
 
1513
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1514
    {
 
1515
      unlock_append_buffer(info);
 
1516
      return info->error= -1;
 
1517
    }
 
1518
    Count-=length;
 
1519
    Buffer+=length;
 
1520
    info->end_of_file+=length;
 
1521
  }
 
1522
 
 
1523
end:
 
1524
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1525
  info->write_pos+=Count;
 
1526
  unlock_append_buffer(info);
 
1527
  return 0;
 
1528
}
 
1529
 
 
1530
 
 
1531
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1532
{
 
1533
  /*
 
1534
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1535
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1536
    several layers of ternary operators that evaluated comma(,) operator
 
1537
    expressions inside - I do have a test case if somebody wants it
 
1538
  */
 
1539
  if (info->type == SEQ_READ_APPEND)
 
1540
    return my_b_append(info, Buffer, Count);
 
1541
  return my_b_write(info, Buffer, Count);
 
1542
}
 
1543
 
 
1544
 
 
1545
/*
 
1546
  Write a block to disk where part of the data may be inside the record
 
1547
  buffer.  As all write calls to the data goes through the cache,
 
1548
  we will never get a seek over the end of the buffer
 
1549
*/
 
1550
 
 
1551
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1552
                   my_off_t pos)
799
1553
{
800
 
  size_t length_local;
 
1554
  size_t length;
801
1555
  int error=0;
802
1556
 
 
1557
  /*
 
1558
    Assert that we cannot come here with a shared cache. If we do one
 
1559
    day, we might need to add a call to copy_to_read_buffer().
 
1560
  */
 
1561
  assert(!info->share);
 
1562
 
803
1563
  if (pos < info->pos_in_file)
804
1564
  {
805
1565
    /* Of no overlap, write everything without buffering */
806
1566
    if (pos + Count <= info->pos_in_file)
807
1567
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1568
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1569
    length= (uint) (info->pos_in_file - pos);
 
1570
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1571
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1572
    Buffer+=length;
 
1573
    pos+=  length;
 
1574
    Count-= length;
 
1575
#ifndef HAVE_PREAD
 
1576
    info->seek_not_done=1;
 
1577
#endif
815
1578
  }
816
1579
 
817
1580
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1581
  length= (size_t) (info->write_end - info->buffer);
 
1582
  if (pos < info->pos_in_file + length)
820
1583
  {
821
1584
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1585
    length-=offset;
 
1586
    if (length > Count)
 
1587
      length=Count;
 
1588
    memcpy(info->buffer+offset, Buffer, length);
 
1589
    Buffer+=length;
 
1590
    Count-= length;
 
1591
    /* Fix length of buffer if the new data was larger */
 
1592
    if (info->buffer+length > info->write_pos)
 
1593
      info->write_pos=info->buffer+length;
831
1594
    if (!Count)
832
1595
      return (error);
833
1596
  }
837
1600
  return error;
838
1601
}
839
1602
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1603
 
 
1604
        /* Flush write cache */
 
1605
 
 
1606
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1607
  lock_append_buffer(info);
 
1608
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1609
  unlock_append_buffer(info);
 
1610
 
 
1611
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1612
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1613
  size_t length;
 
1614
  bool append_cache;
 
1615
  my_off_t pos_in_file;
 
1616
 
 
1617
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1618
    need_append_buffer_lock=0;
849
1619
 
850
1620
  if (info->type == WRITE_CACHE || append_cache)
851
1621
  {
852
1622
    if (info->file == -1)
853
1623
    {
854
 
      if (info->real_open_cached_file())
 
1624
      if (real_open_cached_file(info))
855
1625
        return((info->error= -1));
856
1626
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1627
    LOCK_APPEND_BUFFER;
858
1628
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1629
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1630
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1631
      /*
 
1632
        In case of a shared I/O cache with a writer we do direct write
 
1633
        cache to read cache copy. Do it before the write here so that
 
1634
        the readers can work in parallel with the write.
 
1635
        copy_to_read_buffer() relies on info->pos_in_file.
 
1636
      */
 
1637
      if (info->share)
 
1638
        copy_to_read_buffer(info, info->write_buffer, length);
 
1639
 
 
1640
      pos_in_file=info->pos_in_file;
862
1641
      /*
863
1642
        If we have append cache, we always open the file with
864
1643
        O_APPEND which moves the pos to EOF automatically on every write
865
1644
      */
866
1645
      if (!append_cache && info->seek_not_done)
867
1646
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1647
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1648
            MY_FILEPOS_ERROR)
869
1649
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1650
          UNLOCK_APPEND_BUFFER;
871
1651
          return((info->error= -1));
872
1652
        }
873
1653
        if (!append_cache)
874
1654
          info->seek_not_done=0;
875
1655
      }
876
1656
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1657
        info->pos_in_file+=length;
878
1658
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1659
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1660
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1661
      if (my_write(info->file,info->write_buffer,length,
882
1662
                   info->myflags | MY_NABP))
883
1663
        info->error= -1;
884
1664
      else
885
1665
        info->error= 0;
886
1666
      if (!append_cache)
887
1667
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1668
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1669
      }
890
1670
      else
891
1671
      {
892
1672
        info->end_of_file+=(info->write_pos-info->append_read_pos);
893
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
 
1673
        my_off_t tell_ret= my_tell(info->file);
894
1674
        assert(info->end_of_file == tell_ret);
895
1675
      }
896
1676
 
897
1677
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1678
      ++info->disk_writes;
 
1679
      UNLOCK_APPEND_BUFFER;
899
1680
      return(info->error);
900
1681
    }
901
1682
  }
906
1687
    info->inited=0;
907
1688
  }
908
1689
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1690
  UNLOCK_APPEND_BUFFER;
910
1691
  return(0);
911
1692
}
912
1693
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1694
/*
 
1695
  Free an IO_CACHE object
 
1696
 
 
1697
  SYNOPSOS
 
1698
    end_io_cache()
 
1699
    info                IO_CACHE Handle to free
 
1700
 
 
1701
  NOTES
 
1702
    It's currently safe to call this if one has called init_io_cache()
 
1703
    on the 'info' object, even if init_io_cache() failed.
 
1704
    This function is also safe to call twice with the same handle.
 
1705
 
 
1706
  RETURN
 
1707
   0  ok
 
1708
   #  Error
 
1709
*/
 
1710
 
 
1711
int end_io_cache(IO_CACHE *info)
928
1712
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1713
  int error=0;
 
1714
  IO_CACHE_CALLBACK pre_close;
 
1715
 
 
1716
  /*
 
1717
    Every thread must call remove_io_thread(). The last one destroys
 
1718
    the share elements.
 
1719
  */
 
1720
  assert(!info->share || !info->share->total_threads);
 
1721
 
 
1722
  if ((pre_close=info->pre_close))
 
1723
  {
 
1724
    (*pre_close)(info);
 
1725
    info->pre_close= 0;
 
1726
  }
 
1727
  if (info->alloced_buffer)
 
1728
  {
 
1729
    info->alloced_buffer=0;
 
1730
    if (info->file != -1)                       /* File doesn't exist */
 
1731
      error= my_b_flush_io_cache(info,1);
 
1732
    free((unsigned char*) info->buffer);
 
1733
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1734
  }
 
1735
  if (info->type == SEQ_READ_APPEND)
 
1736
  {
 
1737
    /* Destroy allocated mutex */
 
1738
    info->type= TYPE_NOT_SET;
 
1739
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1740
  }
 
1741
  return(error);
948
1742
} /* end_io_cache */
949
1743
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1744
 
 
1745
/**********************************************************************
 
1746
 Testing of MF_IOCACHE
 
1747
**********************************************************************/
 
1748
 
 
1749
#ifdef MAIN
 
1750
 
 
1751
#include <my_dir.h>
 
1752
 
 
1753
void die(const char* fmt, ...)
 
1754
{
 
1755
  va_list va_args;
 
1756
  va_start(va_args,fmt);
 
1757
  fprintf(stderr,"Error:");
 
1758
  vfprintf(stderr, fmt,va_args);
 
1759
  fprintf(stderr,", errno=%d\n", errno);
 
1760
  exit(1);
 
1761
}
 
1762
 
 
1763
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1764
{
 
1765
  int fd;
 
1766
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1767
    die("Could not open %s", fname);
 
1768
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1769
    die("failed in init_io_cache()");
 
1770
  return fd;
 
1771
}
 
1772
 
 
1773
void close_file(IO_CACHE* info)
 
1774
{
 
1775
  end_io_cache(info);
 
1776
  my_close(info->file, MYF(MY_WME));
 
1777
}
 
1778
 
 
1779
int main(int argc, char** argv)
 
1780
{
 
1781
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1782
  MY_STAT status;
 
1783
  const char* fname="/tmp/iocache.test";
 
1784
  int cache_size=16384;
 
1785
  char llstr_buf[22];
 
1786
  int max_block,total_bytes=0;
 
1787
  int i,num_loops=100,error=0;
 
1788
  char *p;
 
1789
  char* block, *block_end;
 
1790
  MY_INIT(argv[0]);
 
1791
  max_block = cache_size*3;
 
1792
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1793
    die("Not enough memory to allocate test block");
 
1794
  block_end = block + max_block;
 
1795
  for (p = block,i=0; p < block_end;i++)
 
1796
  {
 
1797
    *p++ = (char)i;
 
1798
  }
 
1799
  if (my_stat(fname,&status, MYF(0)) &&
 
1800
      my_delete(fname,MYF(MY_WME)))
 
1801
    {
 
1802
      die("Delete of %s failed, aborting", fname);
 
1803
    }
 
1804
  open_file(fname,&sra_cache, cache_size);
 
1805
  for (i = 0; i < num_loops; i++)
 
1806
  {
 
1807
    char buf[4];
 
1808
    int block_size = abs(rand() % max_block);
 
1809
    int4store(buf, block_size);
 
1810
    if (my_b_append(&sra_cache,buf,4) ||
 
1811
        my_b_append(&sra_cache, block, block_size))
 
1812
      die("write failed");
 
1813
    total_bytes += 4+block_size;
 
1814
  }
 
1815
  close_file(&sra_cache);
 
1816
  free(block);
 
1817
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1818
    die("%s failed to stat, but I had just closed it,\
 
1819
 wonder how that happened");
 
1820
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1821
         llstr(status.st_size,llstr_buf),
 
1822
         total_bytes);
 
1823
  my_delete(fname, MYF(MY_WME));
 
1824
  /* check correctness of tests */
 
1825
  if (total_bytes != status.st_size)
 
1826
  {
 
1827
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1828
supposedly written\n");
 
1829
    error=1;
 
1830
  }
 
1831
  exit(error);
 
1832
  return 0;
 
1833
}
 
1834
#endif