~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Monty Taylor
  • Date: 2009-01-29 19:04:39 UTC
  • mto: (779.3.29 devel)
  • mto: This revision was merged to the branch mainline in revision 823.
  • Revision ID: mordred@inaugust.com-20090129190439-vfr95s6gaudjacm7
Add timegm which is missing on Solaris.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
 
50
#include "mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
60
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
61
 
 
62
#define lock_append_buffer(info) \
 
63
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
64
#define unlock_append_buffer(info) \
 
65
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
66
 
 
67
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
68
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
69
 
 
70
/*
 
71
  Setup internal pointers inside IO_CACHE
 
72
 
 
73
  SYNOPSIS
 
74
    setup_io_cache()
 
75
    info                IO_CACHE handler
 
76
 
 
77
  NOTES
 
78
    This is called on automaticly on init or reinit of IO_CACHE
 
79
    It must be called externally if one moves or copies an IO_CACHE
 
80
    object.
 
81
*/
 
82
 
 
83
void setup_io_cache(IO_CACHE* info)
126
84
{
127
85
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
86
  if (info->type == WRITE_CACHE)
129
87
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
88
    info->current_pos= &info->write_pos;
 
89
    info->current_end= &info->write_end;
132
90
  }
133
91
  else
134
92
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
93
    info->current_pos= &info->read_pos;
 
94
    info->current_end= &info->read_end;
137
95
  }
138
96
}
139
97
 
140
98
 
141
 
void st_io_cache::init_functions()
 
99
static void
 
100
init_functions(IO_CACHE* info)
142
101
{
 
102
  enum cache_type type= info->type;
143
103
  switch (type) {
144
104
  case READ_NET:
145
105
    /*
150
110
      as myisamchk
151
111
    */
152
112
    break;
 
113
  case SEQ_READ_APPEND:
 
114
    info->read_function = _my_b_seq_read;
 
115
    info->write_function = 0;                   /* Force a core if used */
 
116
    break;
153
117
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
118
    info->read_function =
 
119
                          info->share ? _my_b_read_r :
 
120
                                        _my_b_read;
 
121
    info->write_function = _my_b_write;
156
122
  }
157
123
 
158
 
  setup_io_cache();
 
124
  setup_io_cache(info);
159
125
}
160
126
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
127
 
 
128
/*
 
129
  Initialize an IO_CACHE object
 
130
 
 
131
  SYNOPSOS
 
132
    init_io_cache()
 
133
    info                cache handler to initialize
 
134
    file                File that should be associated to to the handler
 
135
                        If == -1 then real_open_cached_file()
 
136
                        will be called when it's time to open file.
 
137
    cachesize           Size of buffer to allocate for read/write
 
138
                        If == 0 then use my_default_record_cache_size
 
139
    type                Type of cache
 
140
    seek_offset         Where cache should start reading/writing
 
141
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
142
    cache_myflags       Bitmap of differnt flags
 
143
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
144
                        MY_DONT_CHECK_FILESIZE
 
145
 
 
146
  RETURN
 
147
    0  ok
 
148
    #  error
 
149
*/
 
150
 
 
151
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
152
                  enum cache_type type, my_off_t seek_offset,
 
153
                  bool use_async_io, myf cache_myflags)
181
154
{
182
155
  size_t min_cache;
183
156
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
157
  my_off_t end_of_file= ~(my_off_t) 0;
185
158
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
159
  info->file= file;
 
160
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
161
  info->pos_in_file= seek_offset;
 
162
  info->pre_close = info->pre_read = info->post_read = 0;
 
163
  info->arg = 0;
 
164
  info->alloced_buffer = 0;
 
165
  info->buffer=0;
 
166
  info->seek_not_done= 0;
194
167
 
195
168
  if (file >= 0)
196
169
  {
197
170
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
171
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
199
172
    {
200
173
      /*
201
174
         This kind of object doesn't support seek() or tell(). Don't set a
202
175
         flag that will make us again try to seek() later and fail.
203
176
      */
204
 
      seek_not_done= 0;
 
177
      info->seek_not_done= 0;
205
178
      /*
206
179
        Additionally, if we're supposed to start somewhere other than the
207
180
        the beginning of whatever this file is, then somebody made a bad
210
183
      assert(seek_offset == 0);
211
184
    }
212
185
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
186
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
214
187
  }
215
188
 
 
189
  info->disk_writes= 0;
 
190
  info->share=0;
 
191
 
216
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
193
    return(1);                          /* No cache requested */
218
194
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
195
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
196
  {                                             /* Assume file isn't growing */
221
197
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
198
    {
223
199
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
200
      end_of_file=lseek(file,0L,SEEK_END);
225
201
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
202
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
203
      if (end_of_file < seek_offset)
 
204
        end_of_file=seek_offset;
229
205
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
206
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
207
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
208
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
209
        use_async_io=0;                         /* No need to use async */
234
210
      }
235
211
    }
236
212
  }
237
213
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
214
  if (type != READ_NET && type != WRITE_NET)
239
215
  {
240
216
    /* Retry allocating memory in smaller blocks until we get one */
241
217
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
221
      if (cachesize < min_cache)
246
222
        cachesize = min_cache;
247
223
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
 
224
      if (type == SEQ_READ_APPEND)
 
225
        buffer_block *= 2;
 
226
      if ((info->buffer=
255
227
           (unsigned char*) malloc(buffer_block)) != 0)
256
228
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
229
        info->write_buffer=info->buffer;
 
230
        if (type == SEQ_READ_APPEND)
 
231
          info->write_buffer = info->buffer + cachesize;
 
232
        info->alloced_buffer=1;
259
233
        break;                                  /* Enough memory found */
260
234
      }
261
235
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
236
        return(2);                              /* Can't alloc cache */
267
237
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
238
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
239
    }
272
240
  }
273
241
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
242
  info->read_length=info->buffer_length=cachesize;
 
243
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
244
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
245
  if (type == SEQ_READ_APPEND)
 
246
  {
 
247
    info->append_read_pos = info->write_pos = info->write_buffer;
 
248
    info->write_end = info->write_buffer + info->buffer_length;
 
249
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
250
  }
 
251
#if defined(SAFE_MUTEX)
 
252
  else
 
253
  {
 
254
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
255
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
256
  }
 
257
#endif
277
258
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
259
  if (type == WRITE_CACHE)
 
260
    info->write_end=
 
261
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
262
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
263
    info->read_end=info->buffer;                /* Nothing in cache */
283
264
 
284
265
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
266
  info->end_of_file= end_of_file;
 
267
  info->error=0;
 
268
  info->type= type;
 
269
  init_functions(info);
289
270
#ifdef HAVE_AIOWAIT
290
271
  if (use_async_io && ! my_disable_async_io)
291
272
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
273
    info->read_length/=2;
 
274
    info->read_function=_my_b_async_read;
294
275
  }
295
 
  inited= aio_result.pending= 0;
 
276
  info->inited=info->aio_result.pending=0;
296
277
#endif
297
 
  return 0;
 
278
  return(0);
298
279
}                                               /* init_io_cache */
299
280
 
300
281
        /* Wait until current request is ready */
319
300
        break;
320
301
    }
321
302
  }
 
303
  return;
322
304
}
323
305
#endif
324
306
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
307
 
 
308
/*
 
309
  Use this to reset cache to re-start reading or to change the type
 
310
  between READ_CACHE <-> WRITE_CACHE
 
311
  If we are doing a reinit of a cache where we have the start of the file
 
312
  in the cache, we are reusing this memory without flushing it to disk.
 
313
*/
 
314
 
 
315
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
316
                        my_off_t seek_offset,
 
317
                        bool use_async_io,
 
318
                        bool clear_cache)
339
319
{
340
320
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
321
  assert(type != READ_NET && info->type != READ_NET &&
 
322
              type != WRITE_NET && info->type != WRITE_NET &&
 
323
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
324
 
344
325
  /* If the whole file is in memory, avoid flushing to disk */
345
326
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
327
      seek_offset >= info->pos_in_file &&
 
328
      seek_offset <= my_b_tell(info))
348
329
  {
349
330
    /* Reuse current buffer without flushing it to disk */
350
331
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
332
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
333
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
334
      info->read_end=info->write_pos;
 
335
      info->end_of_file=my_b_tell(info);
355
336
      /*
356
337
        Trigger a new seek only if we have a valid
357
338
        file handle.
358
339
      */
359
 
      seek_not_done= (file != -1);
 
340
      info->seek_not_done= (info->file != -1);
360
341
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
342
    else if (type == WRITE_CACHE)
362
343
    {
363
 
      if (type == READ_CACHE)
 
344
      if (info->type == READ_CACHE)
364
345
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
346
        info->write_end=info->write_buffer+info->buffer_length;
 
347
        info->seek_not_done=1;
367
348
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
349
      info->end_of_file = ~(my_off_t) 0;
369
350
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
351
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
352
    if (type == WRITE_CACHE)
 
353
      info->write_pos=pos;
373
354
    else
374
 
      read_pos= pos;
 
355
      info->read_pos= pos;
375
356
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
357
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
358
#endif
378
359
  }
379
360
  else
382
363
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
364
      after the current positions should be ignored
384
365
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
366
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
367
      info->end_of_file=my_b_tell(info);
387
368
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
369
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
370
      return(1);
 
371
    info->pos_in_file=seek_offset;
391
372
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
373
    info->seek_not_done=1;
 
374
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
375
    if (type == READ_CACHE)
395
376
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
377
      info->read_end=info->buffer;              /* Nothing in cache */
397
378
    }
398
379
    else
399
380
    {
400
 
      write_end=(buffer + buffer_length -
 
381
      info->write_end=(info->buffer + info->buffer_length -
401
382
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
383
      info->end_of_file= ~(my_off_t) 0;
403
384
    }
404
385
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
386
  info->type=type;
 
387
  info->error=0;
 
388
  init_functions(info);
408
389
 
409
390
#ifdef HAVE_AIOWAIT
410
391
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
392
      ((uint32_t) info->buffer_length <
 
393
       (uint32_t) (info->end_of_file - seek_offset)))
413
394
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
395
    info->read_length=info->buffer_length/2;
 
396
    info->read_function=_my_b_async_read;
416
397
  }
417
 
  inited= 0;
 
398
  info->inited=0;
418
399
#else
419
400
  (void)use_async_io;
420
401
#endif
421
 
  return 0;
 
402
  return(0);
422
403
} /* reinit_io_cache */
423
404
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
405
 
 
406
 
 
407
/*
 
408
  Read buffered.
 
409
 
 
410
  SYNOPSIS
 
411
    _my_b_read()
 
412
      info                      IO_CACHE pointer
 
413
      Buffer                    Buffer to retrieve count bytes from file
 
414
      Count                     Number of bytes to read into Buffer
 
415
 
 
416
  NOTE
 
417
    This function is only called from the my_b_read() macro when there
 
418
    isn't enough characters in the buffer to satisfy the request.
 
419
 
 
420
  WARNING
 
421
 
 
422
    When changing this function, be careful with handling file offsets
 
423
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
424
    types than my_off_t unless you can be sure that their value fits.
 
425
    Same applies to differences of file offsets.
 
426
 
 
427
    When changing this function, check _my_b_read_r(). It might need the
 
428
    same change.
 
429
 
 
430
  RETURN
 
431
    0      we succeeded in reading all data
 
432
    1      Error: can't read requested characters
 
433
*/
 
434
 
 
435
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
436
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
437
  size_t length,diff_length,left_length, max_length;
 
438
  my_off_t pos_in_file;
448
439
 
449
440
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
441
  {
455
446
  }
456
447
 
457
448
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
449
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
450
 
460
451
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
452
    Whenever a function which operates on IO_CACHE flushes/writes
 
453
    some part of the IO_CACHE to disk it will set the property
463
454
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
455
    on the IO_CACHE.
465
456
  */
466
457
  if (info->seek_not_done)
467
458
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
459
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
469
460
    {
470
461
      /* No error, reset seek_not_done flag. */
471
462
      info->seek_not_done= 0;
477
468
        info->file is a pipe or socket or FIFO.  We never should have tried
478
469
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
470
      */
480
 
      assert(errno != ESPIPE);
 
471
      assert(my_errno != ESPIPE);
481
472
      info->error= -1;
482
473
      return(1);
483
474
    }
484
475
  }
485
476
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
477
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
478
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
479
  {                                     /* Fill first intern buffer */
489
480
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
481
    if (info->end_of_file <= pos_in_file)
491
482
    {                                   /* End of file */
492
483
      info->error= (int) left_length;
493
484
      return(1);
494
485
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
486
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
487
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
488
        != length)
497
489
    {
498
490
      info->error= (read_length == (size_t) -1 ? -1 :
499
491
                    (int) (read_length+left_length));
500
492
      return(1);
501
493
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
494
    Count-=length;
 
495
    Buffer+=length;
 
496
    pos_in_file+=length;
 
497
    left_length+=length;
506
498
    diff_length=0;
507
499
  }
508
500
 
509
501
  max_length= info->read_length-diff_length;
510
502
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
503
      max_length > (info->end_of_file - pos_in_file))
 
504
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
505
  if (!max_length)
514
506
  {
515
507
    if (Count)
516
508
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
509
      info->error= left_length;         /* We only got this many char */
518
510
      return(1);
519
511
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
512
    length=0;                           /* Didn't read any chars */
521
513
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
514
  else if ((length= my_read(info->file,info->buffer, max_length,
523
515
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
516
           length == (size_t) -1)
525
517
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
518
    if (length != (size_t) -1)
 
519
      memcpy(Buffer, info->buffer, length);
 
520
    info->pos_in_file= pos_in_file;
 
521
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
522
    info->read_pos=info->read_end=info->buffer;
531
523
    return(1);
532
524
  }
533
525
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
526
  info->read_end=info->buffer+length;
 
527
  info->pos_in_file=pos_in_file;
536
528
  memcpy(Buffer, info->buffer, Count);
537
529
  return(0);
538
530
}
539
531
 
540
532
 
 
533
/*
 
534
  Prepare IO_CACHE for shared use.
 
535
 
 
536
  SYNOPSIS
 
537
    init_io_cache_share()
 
538
      read_cache                A read cache. This will be copied for
 
539
                                every thread after setup.
 
540
      cshare                    The share.
 
541
      write_cache               If non-NULL a write cache that is to be
 
542
                                synchronized with the read caches.
 
543
      num_threads               Number of threads sharing the cache
 
544
                                including the write thread if any.
 
545
 
 
546
  DESCRIPTION
 
547
 
 
548
    The shared cache is used so: One IO_CACHE is initialized with
 
549
    init_io_cache(). This includes the allocation of a buffer. Then a
 
550
    share is allocated and init_io_cache_share() is called with the io
 
551
    cache and the share. Then the io cache is copied for each thread. So
 
552
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
553
    is shared because cache->buffer is the same for all caches.
 
554
 
 
555
    One thread reads data from the file into the buffer. All threads
 
556
    read from the buffer, but every thread maintains its own set of
 
557
    pointers into the buffer. When all threads have used up the buffer
 
558
    contents, one of the threads reads the next block of data into the
 
559
    buffer. To accomplish this, each thread enters the cache lock before
 
560
    accessing the buffer. They wait in lock_io_cache() until all threads
 
561
    joined the lock. The last thread entering the lock is in charge of
 
562
    reading from file to buffer. It wakes all threads when done.
 
563
 
 
564
    Synchronizing a write cache to the read caches works so: Whenever
 
565
    the write buffer needs a flush, the write thread enters the lock and
 
566
    waits for all other threads to enter the lock too. They do this when
 
567
    they have used up the read buffer. When all threads are in the lock,
 
568
    the write thread copies the write buffer to the read buffer and
 
569
    wakes all threads.
 
570
 
 
571
    share->running_threads is the number of threads not being in the
 
572
    cache lock. When entering lock_io_cache() the number is decreased.
 
573
    When the thread that fills the buffer enters unlock_io_cache() the
 
574
    number is reset to the number of threads. The condition
 
575
    running_threads == 0 means that all threads are in the lock. Bumping
 
576
    up the number to the full count is non-intuitive. But increasing the
 
577
    number by one for each thread that leaves the lock could lead to a
 
578
    solo run of one thread. The last thread to join a lock reads from
 
579
    file to buffer, wakes the other threads, processes the data in the
 
580
    cache and enters the lock again. If no other thread left the lock
 
581
    meanwhile, it would think it's the last one again and read the next
 
582
    block...
 
583
 
 
584
    The share has copies of 'error', 'buffer', 'read_end', and
 
585
    'pos_in_file' from the thread that filled the buffer. We may not be
 
586
    able to access this information directly from its cache because the
 
587
    thread may be removed from the share before the variables could be
 
588
    copied by all other threads. Or, if a write buffer is synchronized,
 
589
    it would change its 'pos_in_file' after waking the other threads,
 
590
    possibly before they could copy its value.
 
591
 
 
592
    However, the 'buffer' variable in the share is for a synchronized
 
593
    write cache. It needs to know where to put the data. Otherwise it
 
594
    would need access to the read cache of one of the threads that is
 
595
    not yet removed from the share.
 
596
 
 
597
  RETURN
 
598
    void
 
599
*/
 
600
 
 
601
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
602
                         IO_CACHE *write_cache, uint32_t num_threads)
 
603
{
 
604
  assert(num_threads > 1);
 
605
  assert(read_cache->type == READ_CACHE);
 
606
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
607
 
 
608
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
609
  pthread_cond_init(&cshare->cond, 0);
 
610
  pthread_cond_init(&cshare->cond_writer, 0);
 
611
 
 
612
  cshare->running_threads= num_threads;
 
613
  cshare->total_threads=   num_threads;
 
614
  cshare->error=           0;    /* Initialize. */
 
615
  cshare->buffer=          read_cache->buffer;
 
616
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
617
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
618
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
619
 
 
620
  read_cache->share=         cshare;
 
621
  read_cache->read_function= _my_b_read_r;
 
622
  read_cache->current_pos=   NULL;
 
623
  read_cache->current_end=   NULL;
 
624
 
 
625
  if (write_cache)
 
626
    write_cache->share= cshare;
 
627
 
 
628
  return;
 
629
}
 
630
 
 
631
 
 
632
/*
 
633
  Remove a thread from shared access to IO_CACHE.
 
634
 
 
635
  SYNOPSIS
 
636
    remove_io_thread()
 
637
      cache                     The IO_CACHE to be removed from the share.
 
638
 
 
639
  NOTE
 
640
 
 
641
    Every thread must do that on exit for not to deadlock other threads.
 
642
 
 
643
    The last thread destroys the pthread resources.
 
644
 
 
645
    A writer flushes its cache first.
 
646
 
 
647
  RETURN
 
648
    void
 
649
*/
 
650
 
 
651
void remove_io_thread(IO_CACHE *cache)
 
652
{
 
653
  IO_CACHE_SHARE *cshare= cache->share;
 
654
  uint32_t total;
 
655
 
 
656
  /* If the writer goes, it needs to flush the write cache. */
 
657
  if (cache == cshare->source_cache)
 
658
    flush_io_cache(cache);
 
659
 
 
660
  pthread_mutex_lock(&cshare->mutex);
 
661
 
 
662
  /* Remove from share. */
 
663
  total= --cshare->total_threads;
 
664
 
 
665
  /* Detach from share. */
 
666
  cache->share= NULL;
 
667
 
 
668
  /* If the writer goes, let the readers know. */
 
669
  if (cache == cshare->source_cache)
 
670
  {
 
671
    cshare->source_cache= NULL;
 
672
  }
 
673
 
 
674
  /* If all threads are waiting for me to join the lock, wake them. */
 
675
  if (!--cshare->running_threads)
 
676
  {
 
677
    pthread_cond_signal(&cshare->cond_writer);
 
678
    pthread_cond_broadcast(&cshare->cond);
 
679
  }
 
680
 
 
681
  pthread_mutex_unlock(&cshare->mutex);
 
682
 
 
683
  if (!total)
 
684
  {
 
685
    pthread_cond_destroy (&cshare->cond_writer);
 
686
    pthread_cond_destroy (&cshare->cond);
 
687
    pthread_mutex_destroy(&cshare->mutex);
 
688
  }
 
689
 
 
690
  return;
 
691
}
 
692
 
 
693
 
 
694
/*
 
695
  Lock IO cache and wait for all other threads to join.
 
696
 
 
697
  SYNOPSIS
 
698
    lock_io_cache()
 
699
      cache                     The cache of the thread entering the lock.
 
700
      pos                       File position of the block to read.
 
701
                                Unused for the write thread.
 
702
 
 
703
  DESCRIPTION
 
704
 
 
705
    Wait for all threads to finish with the current buffer. We want
 
706
    all threads to proceed in concert. The last thread to join
 
707
    lock_io_cache() will read the block from file and all threads start
 
708
    to use it. Then they will join again for reading the next block.
 
709
 
 
710
    The waiting threads detect a fresh buffer by comparing
 
711
    cshare->pos_in_file with the position they want to process next.
 
712
    Since the first block may start at position 0, we take
 
713
    cshare->read_end as an additional condition. This variable is
 
714
    initialized to NULL and will be set after a block of data is written
 
715
    to the buffer.
 
716
 
 
717
  RETURN
 
718
    1           OK, lock in place, go ahead and read.
 
719
    0           OK, unlocked, another thread did the read.
 
720
*/
 
721
 
 
722
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
723
{
 
724
  IO_CACHE_SHARE *cshare= cache->share;
 
725
 
 
726
  /* Enter the lock. */
 
727
  pthread_mutex_lock(&cshare->mutex);
 
728
  cshare->running_threads--;
 
729
 
 
730
  if (cshare->source_cache)
 
731
  {
 
732
    /* A write cache is synchronized to the read caches. */
 
733
 
 
734
    if (cache == cshare->source_cache)
 
735
    {
 
736
      /* The writer waits until all readers are here. */
 
737
      while (cshare->running_threads)
 
738
      {
 
739
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
740
      }
 
741
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
742
      return(1);
 
743
    }
 
744
 
 
745
    /* The last thread wakes the writer. */
 
746
    if (!cshare->running_threads)
 
747
    {
 
748
      pthread_cond_signal(&cshare->cond_writer);
 
749
    }
 
750
 
 
751
    /*
 
752
      Readers wait until the data is copied from the writer. Another
 
753
      reason to stop waiting is the removal of the write thread. If this
 
754
      happens, we leave the lock with old data in the buffer.
 
755
    */
 
756
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
757
           cshare->source_cache)
 
758
    {
 
759
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
760
    }
 
761
 
 
762
    /*
 
763
      If the writer was removed from the share while this thread was
 
764
      asleep, we need to simulate an EOF condition. The writer cannot
 
765
      reset the share variables as they might still be in use by readers
 
766
      of the last block. When we awake here then because the last
 
767
      joining thread signalled us. If the writer is not the last, it
 
768
      will not signal. So it is safe to clear the buffer here.
 
769
    */
 
770
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
771
    {
 
772
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
773
      cshare->error= 0; /* EOF is not an error. */
 
774
    }
 
775
  }
 
776
  else
 
777
  {
 
778
    /*
 
779
      There are read caches only. The last thread arriving in
 
780
      lock_io_cache() continues with a locked cache and reads the block.
 
781
    */
 
782
    if (!cshare->running_threads)
 
783
    {
 
784
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
785
      return(1);
 
786
    }
 
787
 
 
788
    /*
 
789
      All other threads wait until the requested block is read by the
 
790
      last thread arriving. Another reason to stop waiting is the
 
791
      removal of a thread. If this leads to all threads being in the
 
792
      lock, we have to continue also. The first of the awaken threads
 
793
      will then do the read.
 
794
    */
 
795
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
796
           cshare->running_threads)
 
797
    {
 
798
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
799
    }
 
800
 
 
801
    /* If the block is not yet read, continue with a locked cache and read. */
 
802
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
803
    {
 
804
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
805
      return(1);
 
806
    }
 
807
 
 
808
    /* Another thread did read the block already. */
 
809
  }
 
810
 
 
811
  /*
 
812
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
813
    filled the buffer did this and marked all threads as running.
 
814
  */
 
815
  pthread_mutex_unlock(&cshare->mutex);
 
816
  return(0);
 
817
}
 
818
 
 
819
 
 
820
/*
 
821
  Unlock IO cache.
 
822
 
 
823
  SYNOPSIS
 
824
    unlock_io_cache()
 
825
      cache                     The cache of the thread leaving the lock.
 
826
 
 
827
  NOTE
 
828
    This is called by the thread that filled the buffer. It marks all
 
829
    threads as running and awakes them. This must not be done by any
 
830
    other thread.
 
831
 
 
832
    Do not signal cond_writer. Either there is no writer or the writer
 
833
    is the only one who can call this function.
 
834
 
 
835
    The reason for resetting running_threads to total_threads before
 
836
    waking all other threads is that it could be possible that this
 
837
    thread is so fast with processing the buffer that it enters the lock
 
838
    before even one other thread has left it. If every awoken thread
 
839
    would increase running_threads by one, this thread could think that
 
840
    he is again the last to join and would not wait for the other
 
841
    threads to process the data.
 
842
 
 
843
  RETURN
 
844
    void
 
845
*/
 
846
 
 
847
static void unlock_io_cache(IO_CACHE *cache)
 
848
{
 
849
  IO_CACHE_SHARE *cshare= cache->share;
 
850
 
 
851
  cshare->running_threads= cshare->total_threads;
 
852
  pthread_cond_broadcast(&cshare->cond);
 
853
  pthread_mutex_unlock(&cshare->mutex);
 
854
  return;
 
855
}
 
856
 
 
857
 
 
858
/*
 
859
  Read from IO_CACHE when it is shared between several threads.
 
860
 
 
861
  SYNOPSIS
 
862
    _my_b_read_r()
 
863
      cache                     IO_CACHE pointer
 
864
      Buffer                    Buffer to retrieve count bytes from file
 
865
      Count                     Number of bytes to read into Buffer
 
866
 
 
867
  NOTE
 
868
    This function is only called from the my_b_read() macro when there
 
869
    isn't enough characters in the buffer to satisfy the request.
 
870
 
 
871
  IMPLEMENTATION
 
872
 
 
873
    It works as follows: when a thread tries to read from a file (that
 
874
    is, after using all the data from the (shared) buffer), it just
 
875
    hangs on lock_io_cache(), waiting for other threads. When the very
 
876
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
877
    does actual IO and unlock_io_cache(), which signals all the waiting
 
878
    threads that data is in the buffer.
 
879
 
 
880
  WARNING
 
881
 
 
882
    When changing this function, be careful with handling file offsets
 
883
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
884
    types than my_off_t unless you can be sure that their value fits.
 
885
    Same applies to differences of file offsets. (Bug #11527)
 
886
 
 
887
    When changing this function, check _my_b_read(). It might need the
 
888
    same change.
 
889
 
 
890
  RETURN
 
891
    0      we succeeded in reading all data
 
892
    1      Error: can't read requested characters
 
893
*/
 
894
 
 
895
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
896
{
 
897
  my_off_t pos_in_file;
 
898
  size_t length, diff_length, left_length;
 
899
  IO_CACHE_SHARE *cshare= cache->share;
 
900
 
 
901
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
902
  {
 
903
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
904
    memcpy(Buffer, cache->read_pos, left_length);
 
905
    Buffer+= left_length;
 
906
    Count-= left_length;
 
907
  }
 
908
  while (Count)
 
909
  {
 
910
    size_t cnt, len;
 
911
 
 
912
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
913
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
914
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
915
    length= ((length <= cache->read_length) ?
 
916
             length + IO_ROUND_DN(cache->read_length - length) :
 
917
             length - IO_ROUND_UP(length - cache->read_length));
 
918
    if (cache->type != READ_FIFO &&
 
919
        (length > (cache->end_of_file - pos_in_file)))
 
920
      length= (size_t) (cache->end_of_file - pos_in_file);
 
921
    if (length == 0)
 
922
    {
 
923
      cache->error= (int) left_length;
 
924
      return(1);
 
925
    }
 
926
    if (lock_io_cache(cache, pos_in_file))
 
927
    {
 
928
      /* With a synchronized write/read cache we won't come here... */
 
929
      assert(!cshare->source_cache);
 
930
      /*
 
931
        ... unless the writer has gone before this thread entered the
 
932
        lock. Simulate EOF in this case. It can be distinguished by
 
933
        cache->file.
 
934
      */
 
935
      if (cache->file < 0)
 
936
        len= 0;
 
937
      else
 
938
      {
 
939
        /*
 
940
          Whenever a function which operates on IO_CACHE flushes/writes
 
941
          some part of the IO_CACHE to disk it will set the property
 
942
          "seek_not_done" to indicate this to other functions operating
 
943
          on the IO_CACHE.
 
944
        */
 
945
        if (cache->seek_not_done)
 
946
        {
 
947
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
948
          {
 
949
            cache->error= -1;
 
950
            unlock_io_cache(cache);
 
951
            return(1);
 
952
          }
 
953
        }
 
954
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
955
      }
 
956
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
957
      cache->error=       (len == length ? 0 : (int) len);
 
958
      cache->pos_in_file= pos_in_file;
 
959
 
 
960
      /* Copy important values to the share. */
 
961
      cshare->error=       cache->error;
 
962
      cshare->read_end=    cache->read_end;
 
963
      cshare->pos_in_file= pos_in_file;
 
964
 
 
965
      /* Mark all threads as running and wake them. */
 
966
      unlock_io_cache(cache);
 
967
    }
 
968
    else
 
969
    {
 
970
      /*
 
971
        With a synchronized write/read cache readers always come here.
 
972
        Copy important values from the share.
 
973
      */
 
974
      cache->error=       cshare->error;
 
975
      cache->read_end=    cshare->read_end;
 
976
      cache->pos_in_file= cshare->pos_in_file;
 
977
 
 
978
      len= ((cache->error == -1) ? (size_t) -1 :
 
979
            (size_t) (cache->read_end - cache->buffer));
 
980
    }
 
981
    cache->read_pos=      cache->buffer;
 
982
    cache->seek_not_done= 0;
 
983
    if (len == 0 || len == (size_t) -1)
 
984
    {
 
985
      cache->error= (int) left_length;
 
986
      return(1);
 
987
    }
 
988
    cnt= (len > Count) ? Count : len;
 
989
    memcpy(Buffer, cache->read_pos, cnt);
 
990
    Count -= cnt;
 
991
    Buffer+= cnt;
 
992
    left_length+= cnt;
 
993
    cache->read_pos+= cnt;
 
994
  }
 
995
  return(0);
 
996
}
 
997
 
 
998
 
 
999
/*
 
1000
  Copy data from write cache to read cache.
 
1001
 
 
1002
  SYNOPSIS
 
1003
    copy_to_read_buffer()
 
1004
      write_cache               The write cache.
 
1005
      write_buffer              The source of data, mostly the cache buffer.
 
1006
      write_length              The number of bytes to copy.
 
1007
 
 
1008
  NOTE
 
1009
    The write thread will wait for all read threads to join the cache
 
1010
    lock. Then it copies the data over and wakes the read threads.
 
1011
 
 
1012
  RETURN
 
1013
    void
 
1014
*/
 
1015
 
 
1016
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1017
                                const unsigned char *write_buffer, size_t write_length)
 
1018
{
 
1019
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1020
 
 
1021
  assert(cshare->source_cache == write_cache);
 
1022
  /*
 
1023
    write_length is usually less or equal to buffer_length.
 
1024
    It can be bigger if _my_b_write() is called with a big length.
 
1025
  */
 
1026
  while (write_length)
 
1027
  {
 
1028
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1029
    int  rc;
 
1030
 
 
1031
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1032
    /* The writing thread does always have the lock when it awakes. */
 
1033
    assert(rc);
 
1034
 
 
1035
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1036
 
 
1037
    cshare->error=       0;
 
1038
    cshare->read_end=    cshare->buffer + copy_length;
 
1039
    cshare->pos_in_file= write_cache->pos_in_file;
 
1040
 
 
1041
    /* Mark all threads as running and wake them. */
 
1042
    unlock_io_cache(write_cache);
 
1043
 
 
1044
    write_buffer+= copy_length;
 
1045
    write_length-= copy_length;
 
1046
  }
 
1047
}
 
1048
 
 
1049
 
 
1050
/*
 
1051
  Do sequential read from the SEQ_READ_APPEND cache.
 
1052
 
 
1053
  We do this in three stages:
 
1054
   - first read from info->buffer
 
1055
   - then if there are still data to read, try the file descriptor
 
1056
   - afterwards, if there are still data to read, try append buffer
 
1057
 
 
1058
  RETURNS
 
1059
    0  Success
 
1060
    1  Failed to read
 
1061
*/
 
1062
 
 
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1064
{
 
1065
  size_t length, diff_length, left_length, save_count, max_length;
 
1066
  my_off_t pos_in_file;
 
1067
  save_count=Count;
 
1068
 
 
1069
  /* first, read the regular buffer */
 
1070
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1071
  {
 
1072
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1073
    memcpy(Buffer,info->read_pos, left_length);
 
1074
    Buffer+=left_length;
 
1075
    Count-=left_length;
 
1076
  }
 
1077
  lock_append_buffer(info);
 
1078
 
 
1079
  /* pos_in_file always point on where info->buffer was read */
 
1080
  if ((pos_in_file=info->pos_in_file +
 
1081
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1082
    goto read_append_buffer;
 
1083
 
 
1084
  /*
 
1085
    With read-append cache we must always do a seek before we read,
 
1086
    because the write could have moved the file pointer astray
 
1087
  */
 
1088
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1089
  {
 
1090
   info->error= -1;
 
1091
   unlock_append_buffer(info);
 
1092
   return (1);
 
1093
  }
 
1094
  info->seek_not_done=0;
 
1095
 
 
1096
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1097
 
 
1098
  /* now the second stage begins - read from file descriptor */
 
1099
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1100
  {
 
1101
    /* Fill first intern buffer */
 
1102
    size_t read_length;
 
1103
 
 
1104
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1105
    if ((read_length= my_read(info->file,Buffer, length,
 
1106
                              info->myflags)) == (size_t) -1)
 
1107
    {
 
1108
      info->error= -1;
 
1109
      unlock_append_buffer(info);
 
1110
      return 1;
 
1111
    }
 
1112
    Count-=read_length;
 
1113
    Buffer+=read_length;
 
1114
    pos_in_file+=read_length;
 
1115
 
 
1116
    if (read_length != length)
 
1117
    {
 
1118
      /*
 
1119
        We only got part of data;  Read the rest of the data from the
 
1120
        write buffer
 
1121
      */
 
1122
      goto read_append_buffer;
 
1123
    }
 
1124
    left_length+=length;
 
1125
    diff_length=0;
 
1126
  }
 
1127
 
 
1128
  max_length= info->read_length-diff_length;
 
1129
  if (max_length > (info->end_of_file - pos_in_file))
 
1130
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1131
  if (!max_length)
 
1132
  {
 
1133
    if (Count)
 
1134
      goto read_append_buffer;
 
1135
    length=0;                           /* Didn't read any more chars */
 
1136
  }
 
1137
  else
 
1138
  {
 
1139
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1140
    if (length == (size_t) -1)
 
1141
    {
 
1142
      info->error= -1;
 
1143
      unlock_append_buffer(info);
 
1144
      return 1;
 
1145
    }
 
1146
    if (length < Count)
 
1147
    {
 
1148
      memcpy(Buffer, info->buffer, length);
 
1149
      Count -= length;
 
1150
      Buffer += length;
 
1151
 
 
1152
      /*
 
1153
         added the line below to make
 
1154
         assert(pos_in_file==info->end_of_file) pass.
 
1155
         otherwise this does not appear to be needed
 
1156
      */
 
1157
      pos_in_file += length;
 
1158
      goto read_append_buffer;
 
1159
    }
 
1160
  }
 
1161
  unlock_append_buffer(info);
 
1162
  info->read_pos=info->buffer+Count;
 
1163
  info->read_end=info->buffer+length;
 
1164
  info->pos_in_file=pos_in_file;
 
1165
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1166
  return 0;
 
1167
 
 
1168
read_append_buffer:
 
1169
 
 
1170
  /*
 
1171
     Read data from the current write buffer.
 
1172
     Count should never be == 0 here (The code will work even if count is 0)
 
1173
  */
 
1174
 
 
1175
  {
 
1176
    /* First copy the data to Count */
 
1177
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1178
    size_t copy_len;
 
1179
    size_t transfer_len;
 
1180
 
 
1181
    assert(info->append_read_pos <= info->write_pos);
 
1182
    /*
 
1183
      TODO: figure out if the assert below is needed or correct.
 
1184
    */
 
1185
    assert(pos_in_file == info->end_of_file);
 
1186
    copy_len=cmin(Count, len_in_buff);
 
1187
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1188
    info->append_read_pos += copy_len;
 
1189
    Count -= copy_len;
 
1190
    if (Count)
 
1191
      info->error = save_count - Count;
 
1192
 
 
1193
    /* Fill read buffer with data from write buffer */
 
1194
    memcpy(info->buffer, info->append_read_pos,
 
1195
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1196
    info->read_pos= info->buffer;
 
1197
    info->read_end= info->buffer+transfer_len;
 
1198
    info->append_read_pos=info->write_pos;
 
1199
    info->pos_in_file=pos_in_file+copy_len;
 
1200
    info->end_of_file+=len_in_buff;
 
1201
  }
 
1202
  unlock_append_buffer(info);
 
1203
  return Count ? 1 : 0;
 
1204
}
 
1205
 
 
1206
 
541
1207
#ifdef HAVE_AIOWAIT
542
1208
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1209
/*
 
1210
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1211
  from disk when needed.
 
1212
 
 
1213
  SYNOPSIS
 
1214
    _my_b_async_read()
 
1215
      info                      IO_CACHE pointer
 
1216
      Buffer                    Buffer to retrieve count bytes from file
 
1217
      Count                     Number of bytes to read into Buffer
 
1218
 
 
1219
  RETURN VALUE
 
1220
    -1          An error has occurred; my_errno is set.
 
1221
     0          Success
 
1222
     1          An error has occurred; IO_CACHE to error state.
 
1223
*/
 
1224
 
 
1225
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1226
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1227
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1228
  size_t max_length;
559
1229
  my_off_t next_pos_in_file;
560
1230
  unsigned char *read_buffer;
575
1245
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1246
                 my_filename(info->file),
577
1247
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1248
      my_errno=info->aio_result.result.aio_errno;
579
1249
      info->error= -1;
580
1250
      return(1);
581
1251
    }
582
1252
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1253
        read_length == (size_t) -1)
584
1254
    {
585
 
      errno=0;                          /* For testing */
 
1255
      my_errno=0;                               /* For testing */
586
1256
      info->error= (read_length == (size_t) -1 ? -1 :
587
1257
                    (int) (read_length+left_length));
588
1258
      return(1);
615
1285
      }
616
1286
    }
617
1287
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1288
    length=cmin(Count,read_length);
 
1289
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1290
    Buffer+=length;
 
1291
    Count-=length;
 
1292
    left_length+=length;
623
1293
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1294
    info->read_pos+=length;
625
1295
  }
626
1296
  else
627
1297
    next_pos_in_file=(info->pos_in_file+ (size_t)
648
1318
      if ((read_length=my_read(info->file,info->request_pos,
649
1319
                               read_length, info->myflags)) == (size_t) -1)
650
1320
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
 
1321
      use_length=cmin(Count,read_length);
652
1322
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
1323
      info->read_pos=info->request_pos+Count;
654
1324
      info->read_end=info->request_pos+read_length;
659
1329
      {                                 /* Didn't find hole block */
660
1330
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1331
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1332
                   my_filename(info->file),my_errno);
663
1333
        info->error=(int) (read_length+left_length);
664
1334
        return 1;
665
1335
      }
695
1365
                (my_off_t) next_pos_in_file,SEEK_SET,
696
1366
                &info->aio_result.result))
697
1367
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1368
      my_errno=errno;
699
1369
      if (info->request_pos != info->buffer)
700
1370
      {
701
1371
        memmove(info->buffer, info->request_pos,
715
1385
#endif
716
1386
 
717
1387
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1388
/* Read one byte when buffer is empty */
 
1389
 
 
1390
int _my_b_get(IO_CACHE *info)
723
1391
{
724
1392
  unsigned char buff;
725
1393
  IO_CACHE_CALLBACK pre_read,post_read;
732
1400
  return (int) (unsigned char) buff;
733
1401
}
734
1402
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1403
/*
 
1404
   Write a byte buffer to IO_CACHE and flush to disk
 
1405
   if IO_CACHE is full.
 
1406
 
 
1407
   RETURN VALUE
 
1408
    1 On error on write
 
1409
    0 On success
 
1410
   -1 On error; my_errno contains error code.
 
1411
*/
 
1412
 
 
1413
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1414
{
745
 
  size_t rest_length,length_local;
 
1415
  size_t rest_length,length;
746
1416
 
747
1417
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1418
  {
749
 
    errno=EFBIG;
 
1419
    my_errno=errno=EFBIG;
750
1420
    return info->error = -1;
751
1421
  }
752
1422
 
760
1430
    return 1;
761
1431
  if (Count >= IO_SIZE)
762
1432
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1433
    length=Count & (size_t) ~(IO_SIZE-1);
764
1434
    if (info->seek_not_done)
765
1435
    {
766
1436
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1437
        Whenever a function which operates on IO_CACHE flushes/writes
 
1438
        some part of the IO_CACHE to disk it will set the property
769
1439
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1440
        on the IO_CACHE.
771
1441
      */
772
1442
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
1443
      {
776
1446
      }
777
1447
      info->seek_not_done=0;
778
1448
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1449
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1450
      return info->error= -1;
 
1451
 
 
1452
    /*
 
1453
      In case of a shared I/O cache with a writer we normally do direct
 
1454
      write cache to read cache copy. Simulate this here by direct
 
1455
      caller buffer to read cache copy. Do it after the write so that
 
1456
      the cache readers actions on the flushed part can go in parallel
 
1457
      with the write of the extra stuff. copy_to_read_buffer()
 
1458
      synchronizes writer and readers so that after this call the
 
1459
      readers can act on the extra stuff while the writer can go ahead
 
1460
      and prepare the next output. copy_to_read_buffer() relies on
 
1461
      info->pos_in_file.
 
1462
    */
 
1463
    if (info->share)
 
1464
      copy_to_read_buffer(info, Buffer, length);
 
1465
 
 
1466
    Count-=length;
 
1467
    Buffer+=length;
 
1468
    info->pos_in_file+=length;
 
1469
  }
 
1470
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1471
  info->write_pos+=Count;
 
1472
  return 0;
 
1473
}
 
1474
 
 
1475
 
 
1476
/*
 
1477
  Append a block to the write buffer.
 
1478
  This is done with the buffer locked to ensure that we don't read from
 
1479
  the write buffer before we are ready with it.
 
1480
*/
 
1481
 
 
1482
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1483
{
 
1484
  size_t rest_length,length;
 
1485
 
 
1486
  /*
 
1487
    Assert that we cannot come here with a shared cache. If we do one
 
1488
    day, we might need to add a call to copy_to_read_buffer().
 
1489
  */
 
1490
  assert(!info->share);
 
1491
 
 
1492
  lock_append_buffer(info);
 
1493
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1494
  if (Count <= rest_length)
 
1495
    goto end;
 
1496
  memcpy(info->write_pos, Buffer, rest_length);
 
1497
  Buffer+=rest_length;
 
1498
  Count-=rest_length;
 
1499
  info->write_pos+=rest_length;
 
1500
  if (my_b_flush_io_cache(info,0))
 
1501
  {
 
1502
    unlock_append_buffer(info);
 
1503
    return 1;
 
1504
  }
 
1505
  if (Count >= IO_SIZE)
 
1506
  {                                     /* Fill first intern buffer */
 
1507
    length=Count & (size_t) ~(IO_SIZE-1);
 
1508
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1509
    {
 
1510
      unlock_append_buffer(info);
 
1511
      return info->error= -1;
 
1512
    }
 
1513
    Count-=length;
 
1514
    Buffer+=length;
 
1515
    info->end_of_file+=length;
 
1516
  }
 
1517
 
 
1518
end:
 
1519
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1520
  info->write_pos+=Count;
 
1521
  unlock_append_buffer(info);
 
1522
  return 0;
 
1523
}
 
1524
 
 
1525
 
 
1526
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1527
{
 
1528
  /*
 
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with
 
1531
    several layers of ternary operators that evaluated comma(,) operator
 
1532
    expressions inside - I do have a test case if somebody wants it
 
1533
  */
 
1534
  if (info->type == SEQ_READ_APPEND)
 
1535
    return my_b_append(info, Buffer, Count);
 
1536
  return my_b_write(info, Buffer, Count);
 
1537
}
 
1538
 
 
1539
 
 
1540
/*
 
1541
  Write a block to disk where part of the data may be inside the record
 
1542
  buffer.  As all write calls to the data goes through the cache,
 
1543
  we will never get a seek over the end of the buffer
 
1544
*/
 
1545
 
 
1546
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1547
                   my_off_t pos)
799
1548
{
800
 
  size_t length_local;
 
1549
  size_t length;
801
1550
  int error=0;
802
1551
 
 
1552
  /*
 
1553
    Assert that we cannot come here with a shared cache. If we do one
 
1554
    day, we might need to add a call to copy_to_read_buffer().
 
1555
  */
 
1556
  assert(!info->share);
 
1557
 
803
1558
  if (pos < info->pos_in_file)
804
1559
  {
805
1560
    /* Of no overlap, write everything without buffering */
806
1561
    if (pos + Count <= info->pos_in_file)
807
1562
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1563
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1564
    length= (uint) (info->pos_in_file - pos);
 
1565
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1566
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1567
    Buffer+=length;
 
1568
    pos+=  length;
 
1569
    Count-= length;
 
1570
#ifndef HAVE_PREAD
 
1571
    info->seek_not_done=1;
 
1572
#endif
815
1573
  }
816
1574
 
817
1575
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1576
  length= (size_t) (info->write_end - info->buffer);
 
1577
  if (pos < info->pos_in_file + length)
820
1578
  {
821
1579
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1580
    length-=offset;
 
1581
    if (length > Count)
 
1582
      length=Count;
 
1583
    memcpy(info->buffer+offset, Buffer, length);
 
1584
    Buffer+=length;
 
1585
    Count-= length;
 
1586
    /* Fix length of buffer if the new data was larger */
 
1587
    if (info->buffer+length > info->write_pos)
 
1588
      info->write_pos=info->buffer+length;
831
1589
    if (!Count)
832
1590
      return (error);
833
1591
  }
837
1595
  return error;
838
1596
}
839
1597
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1598
 
 
1599
        /* Flush write cache */
 
1600
 
 
1601
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1602
  lock_append_buffer(info);
 
1603
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1604
  unlock_append_buffer(info);
 
1605
 
 
1606
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1607
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1608
  size_t length;
 
1609
  bool append_cache;
 
1610
  my_off_t pos_in_file;
 
1611
 
 
1612
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1613
    need_append_buffer_lock=0;
849
1614
 
850
1615
  if (info->type == WRITE_CACHE || append_cache)
851
1616
  {
852
1617
    if (info->file == -1)
853
1618
    {
854
 
      if (info->real_open_cached_file())
 
1619
      if (real_open_cached_file(info))
855
1620
        return((info->error= -1));
856
1621
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1622
    LOCK_APPEND_BUFFER;
858
1623
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1624
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1625
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1626
      /*
 
1627
        In case of a shared I/O cache with a writer we do direct write
 
1628
        cache to read cache copy. Do it before the write here so that
 
1629
        the readers can work in parallel with the write.
 
1630
        copy_to_read_buffer() relies on info->pos_in_file.
 
1631
      */
 
1632
      if (info->share)
 
1633
        copy_to_read_buffer(info, info->write_buffer, length);
 
1634
 
 
1635
      pos_in_file=info->pos_in_file;
862
1636
      /*
863
1637
        If we have append cache, we always open the file with
864
1638
        O_APPEND which moves the pos to EOF automatically on every write
865
1639
      */
866
1640
      if (!append_cache && info->seek_not_done)
867
1641
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1642
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
869
1643
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1644
          UNLOCK_APPEND_BUFFER;
871
1645
          return((info->error= -1));
872
1646
        }
873
1647
        if (!append_cache)
874
1648
          info->seek_not_done=0;
875
1649
      }
876
1650
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1651
        info->pos_in_file+=length;
878
1652
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1653
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1654
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1655
      if (my_write(info->file,info->write_buffer,length,
882
1656
                   info->myflags | MY_NABP))
883
1657
        info->error= -1;
884
1658
      else
885
1659
        info->error= 0;
886
1660
      if (!append_cache)
887
1661
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1662
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1663
      }
890
1664
      else
891
1665
      {
895
1669
      }
896
1670
 
897
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1672
      ++info->disk_writes;
 
1673
      UNLOCK_APPEND_BUFFER;
899
1674
      return(info->error);
900
1675
    }
901
1676
  }
906
1681
    info->inited=0;
907
1682
  }
908
1683
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1684
  UNLOCK_APPEND_BUFFER;
910
1685
  return(0);
911
1686
}
912
1687
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1688
/*
 
1689
  Free an IO_CACHE object
 
1690
 
 
1691
  SYNOPSOS
 
1692
    end_io_cache()
 
1693
    info                IO_CACHE Handle to free
 
1694
 
 
1695
  NOTES
 
1696
    It's currently safe to call this if one has called init_io_cache()
 
1697
    on the 'info' object, even if init_io_cache() failed.
 
1698
    This function is also safe to call twice with the same handle.
 
1699
 
 
1700
  RETURN
 
1701
   0  ok
 
1702
   #  Error
 
1703
*/
 
1704
 
 
1705
int end_io_cache(IO_CACHE *info)
928
1706
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1707
  int error=0;
 
1708
  IO_CACHE_CALLBACK pre_close;
 
1709
 
 
1710
  /*
 
1711
    Every thread must call remove_io_thread(). The last one destroys
 
1712
    the share elements.
 
1713
  */
 
1714
  assert(!info->share || !info->share->total_threads);
 
1715
 
 
1716
  if ((pre_close=info->pre_close))
 
1717
  {
 
1718
    (*pre_close)(info);
 
1719
    info->pre_close= 0;
 
1720
  }
 
1721
  if (info->alloced_buffer)
 
1722
  {
 
1723
    info->alloced_buffer=0;
 
1724
    if (info->file != -1)                       /* File doesn't exist */
 
1725
      error= my_b_flush_io_cache(info,1);
 
1726
    free((unsigned char*) info->buffer);
 
1727
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1728
  }
 
1729
  if (info->type == SEQ_READ_APPEND)
 
1730
  {
 
1731
    /* Destroy allocated mutex */
 
1732
    info->type= TYPE_NOT_SET;
 
1733
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1734
  }
 
1735
  return(error);
948
1736
} /* end_io_cache */
949
1737
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1738
 
 
1739
/**********************************************************************
 
1740
 Testing of MF_IOCACHE
 
1741
**********************************************************************/
 
1742
 
 
1743
#ifdef MAIN
 
1744
 
 
1745
#include <my_dir.h>
 
1746
 
 
1747
void die(const char* fmt, ...)
 
1748
{
 
1749
  va_list va_args;
 
1750
  va_start(va_args,fmt);
 
1751
  fprintf(stderr,"Error:");
 
1752
  vfprintf(stderr, fmt,va_args);
 
1753
  fprintf(stderr,", errno=%d\n", errno);
 
1754
  exit(1);
 
1755
}
 
1756
 
 
1757
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1758
{
 
1759
  int fd;
 
1760
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1761
    die("Could not open %s", fname);
 
1762
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1763
    die("failed in init_io_cache()");
 
1764
  return fd;
 
1765
}
 
1766
 
 
1767
void close_file(IO_CACHE* info)
 
1768
{
 
1769
  end_io_cache(info);
 
1770
  my_close(info->file, MYF(MY_WME));
 
1771
}
 
1772
 
 
1773
int main(int argc, char** argv)
 
1774
{
 
1775
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1776
  MY_STAT status;
 
1777
  const char* fname="/tmp/iocache.test";
 
1778
  int cache_size=16384;
 
1779
  char llstr_buf[22];
 
1780
  int max_block,total_bytes=0;
 
1781
  int i,num_loops=100,error=0;
 
1782
  char *p;
 
1783
  char* block, *block_end;
 
1784
  MY_INIT(argv[0]);
 
1785
  max_block = cache_size*3;
 
1786
  if (!(block=(char*)malloc(max_block)))
 
1787
    die("Not enough memory to allocate test block");
 
1788
  block_end = block + max_block;
 
1789
  for (p = block,i=0; p < block_end;i++)
 
1790
  {
 
1791
    *p++ = (char)i;
 
1792
  }
 
1793
  if (my_stat(fname,&status, MYF(0)) &&
 
1794
      my_delete(fname,MYF(MY_WME)))
 
1795
    {
 
1796
      die("Delete of %s failed, aborting", fname);
 
1797
    }
 
1798
  open_file(fname,&sra_cache, cache_size);
 
1799
  for (i = 0; i < num_loops; i++)
 
1800
  {
 
1801
    char buf[4];
 
1802
    int block_size = abs(rand() % max_block);
 
1803
    int4store(buf, block_size);
 
1804
    if (my_b_append(&sra_cache,buf,4) ||
 
1805
        my_b_append(&sra_cache, block, block_size))
 
1806
      die("write failed");
 
1807
    total_bytes += 4+block_size;
 
1808
  }
 
1809
  close_file(&sra_cache);
 
1810
  free(block);
 
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1812
    die("%s failed to stat, but I had just closed it,\
 
1813
 wonder how that happened");
 
1814
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1815
         llstr(status.st_size,llstr_buf),
 
1816
         total_bytes);
 
1817
  my_delete(fname, MYF(MY_WME));
 
1818
  /* check correctness of tests */
 
1819
  if (total_bytes != status.st_size)
 
1820
  {
 
1821
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1822
supposedly written\n");
 
1823
    error=1;
 
1824
  }
 
1825
  exit(error);
 
1826
  return 0;
 
1827
}
 
1828
#endif