~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: devananda
  • Date: 2009-07-01 17:38:47 UTC
  • mto: (1093.1.7 captain)
  • mto: This revision was merged to the branch mainline in revision 1095.
  • Revision ID: devananda.vdv@gmail.com-20090701173847-3n3mbtessg5ff35e
refactored function/benchmark into plugin/benchmark

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
 
50
#include "mysys/mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys/mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
60
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
61
 
 
62
#define lock_append_buffer(info) \
 
63
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
64
#define unlock_append_buffer(info) \
 
65
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
66
 
 
67
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
68
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
69
 
 
70
/*
 
71
  Setup internal pointers inside IO_CACHE
 
72
 
 
73
  SYNOPSIS
 
74
    setup_io_cache()
 
75
    info                IO_CACHE handler
 
76
 
 
77
  NOTES
 
78
    This is called on automaticly on init or reinit of IO_CACHE
 
79
    It must be called externally if one moves or copies an IO_CACHE
 
80
    object.
 
81
*/
 
82
 
 
83
void setup_io_cache(IO_CACHE* info)
126
84
{
127
85
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
86
  if (info->type == WRITE_CACHE)
129
87
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
88
    info->current_pos= &info->write_pos;
 
89
    info->current_end= &info->write_end;
132
90
  }
133
91
  else
134
92
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
93
    info->current_pos= &info->read_pos;
 
94
    info->current_end= &info->read_end;
137
95
  }
138
96
}
139
97
 
140
98
 
141
 
void st_io_cache::init_functions()
 
99
static void
 
100
init_functions(IO_CACHE* info)
142
101
{
 
102
  enum cache_type type= info->type;
143
103
  switch (type) {
144
104
  case READ_NET:
145
105
    /*
150
110
      as myisamchk
151
111
    */
152
112
    break;
 
113
  case SEQ_READ_APPEND:
 
114
    info->read_function = _my_b_seq_read;
 
115
    info->write_function = 0;                   /* Force a core if used */
 
116
    break;
153
117
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
118
    info->read_function =
 
119
                          info->share ? _my_b_read_r :
 
120
                                        _my_b_read;
 
121
    info->write_function = _my_b_write;
156
122
  }
157
123
 
158
 
  setup_io_cache();
 
124
  setup_io_cache(info);
159
125
}
160
126
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
127
 
 
128
/*
 
129
  Initialize an IO_CACHE object
 
130
 
 
131
  SYNOPSOS
 
132
    init_io_cache()
 
133
    info                cache handler to initialize
 
134
    file                File that should be associated to to the handler
 
135
                        If == -1 then real_open_cached_file()
 
136
                        will be called when it's time to open file.
 
137
    cachesize           Size of buffer to allocate for read/write
 
138
                        If == 0 then use my_default_record_cache_size
 
139
    type                Type of cache
 
140
    seek_offset         Where cache should start reading/writing
 
141
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
142
    cache_myflags       Bitmap of differnt flags
 
143
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
144
                        MY_DONT_CHECK_FILESIZE
 
145
 
 
146
  RETURN
 
147
    0  ok
 
148
    #  error
 
149
*/
 
150
 
 
151
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
152
                  enum cache_type type, my_off_t seek_offset,
 
153
                  bool use_async_io, myf cache_myflags)
181
154
{
182
155
  size_t min_cache;
183
156
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
157
  my_off_t end_of_file= ~(my_off_t) 0;
185
158
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
159
  info->file= file;
 
160
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
161
  info->pos_in_file= seek_offset;
 
162
  info->pre_close = info->pre_read = info->post_read = 0;
 
163
  info->arg = 0;
 
164
  info->alloced_buffer = 0;
 
165
  info->buffer=0;
 
166
  info->seek_not_done= 0;
194
167
 
195
168
  if (file >= 0)
196
169
  {
197
170
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
171
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
199
172
    {
200
173
      /*
201
174
         This kind of object doesn't support seek() or tell(). Don't set a
202
175
         flag that will make us again try to seek() later and fail.
203
176
      */
204
 
      seek_not_done= 0;
 
177
      info->seek_not_done= 0;
205
178
      /*
206
179
        Additionally, if we're supposed to start somewhere other than the
207
180
        the beginning of whatever this file is, then somebody made a bad
210
183
      assert(seek_offset == 0);
211
184
    }
212
185
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
186
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
214
187
  }
215
188
 
 
189
  info->share=0;
 
190
 
216
191
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
192
    return(1);                          /* No cache requested */
218
193
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
194
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
195
  {                                             /* Assume file isn't growing */
221
196
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
197
    {
223
198
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
199
      end_of_file=lseek(file,0L,SEEK_END);
225
200
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
201
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
202
      if (end_of_file < seek_offset)
 
203
        end_of_file=seek_offset;
229
204
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
205
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
206
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
207
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
208
        use_async_io=0;                         /* No need to use async */
234
209
      }
235
210
    }
236
211
  }
237
212
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
213
  if (type != READ_NET && type != WRITE_NET)
239
214
  {
240
215
    /* Retry allocating memory in smaller blocks until we get one */
241
216
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
220
      if (cachesize < min_cache)
246
221
        cachesize = min_cache;
247
222
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
 
223
      if (type == SEQ_READ_APPEND)
 
224
        buffer_block *= 2;
 
225
      if ((info->buffer=
255
226
           (unsigned char*) malloc(buffer_block)) != 0)
256
227
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
228
        info->write_buffer=info->buffer;
 
229
        if (type == SEQ_READ_APPEND)
 
230
          info->write_buffer = info->buffer + cachesize;
 
231
        info->alloced_buffer=1;
259
232
        break;                                  /* Enough memory found */
260
233
      }
261
234
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
235
        return(2);                              /* Can't alloc cache */
267
236
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
237
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
238
    }
272
239
  }
273
240
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
241
  info->read_length=info->buffer_length=cachesize;
 
242
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
243
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
244
  if (type == SEQ_READ_APPEND)
 
245
  {
 
246
    info->append_read_pos = info->write_pos = info->write_buffer;
 
247
    info->write_end = info->write_buffer + info->buffer_length;
 
248
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
249
  }
277
250
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
251
  if (type == WRITE_CACHE)
 
252
    info->write_end=
 
253
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
254
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
255
    info->read_end=info->buffer;                /* Nothing in cache */
283
256
 
284
257
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
258
  info->end_of_file= end_of_file;
 
259
  info->error=0;
 
260
  info->type= type;
 
261
  init_functions(info);
289
262
#ifdef HAVE_AIOWAIT
290
263
  if (use_async_io && ! my_disable_async_io)
291
264
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
265
    info->read_length/=2;
 
266
    info->read_function=_my_b_async_read;
294
267
  }
295
 
  inited= aio_result.pending= 0;
 
268
  info->inited=info->aio_result.pending=0;
296
269
#endif
297
 
  return 0;
 
270
  return(0);
298
271
}                                               /* init_io_cache */
299
272
 
300
273
        /* Wait until current request is ready */
319
292
        break;
320
293
    }
321
294
  }
 
295
  return;
322
296
}
323
297
#endif
324
298
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
299
 
 
300
/*
 
301
  Use this to reset cache to re-start reading or to change the type
 
302
  between READ_CACHE <-> WRITE_CACHE
 
303
  If we are doing a reinit of a cache where we have the start of the file
 
304
  in the cache, we are reusing this memory without flushing it to disk.
 
305
*/
 
306
 
 
307
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
308
                        my_off_t seek_offset,
 
309
                        bool use_async_io,
 
310
                        bool clear_cache)
339
311
{
340
312
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
313
  assert(type != READ_NET && info->type != READ_NET &&
 
314
              type != WRITE_NET && info->type != WRITE_NET &&
 
315
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
316
 
344
317
  /* If the whole file is in memory, avoid flushing to disk */
345
318
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
319
      seek_offset >= info->pos_in_file &&
 
320
      seek_offset <= my_b_tell(info))
348
321
  {
349
322
    /* Reuse current buffer without flushing it to disk */
350
323
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
324
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
325
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
326
      info->read_end=info->write_pos;
 
327
      info->end_of_file=my_b_tell(info);
355
328
      /*
356
329
        Trigger a new seek only if we have a valid
357
330
        file handle.
358
331
      */
359
 
      seek_not_done= (file != -1);
 
332
      info->seek_not_done= (info->file != -1);
360
333
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
334
    else if (type == WRITE_CACHE)
362
335
    {
363
 
      if (type == READ_CACHE)
 
336
      if (info->type == READ_CACHE)
364
337
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
338
        info->write_end=info->write_buffer+info->buffer_length;
 
339
        info->seek_not_done=1;
367
340
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
341
      info->end_of_file = ~(my_off_t) 0;
369
342
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
343
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
344
    if (type == WRITE_CACHE)
 
345
      info->write_pos=pos;
373
346
    else
374
 
      read_pos= pos;
 
347
      info->read_pos= pos;
375
348
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
349
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
350
#endif
378
351
  }
379
352
  else
382
355
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
356
      after the current positions should be ignored
384
357
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
358
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
359
      info->end_of_file=my_b_tell(info);
387
360
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
361
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
362
      return(1);
 
363
    info->pos_in_file=seek_offset;
391
364
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
365
    info->seek_not_done=1;
 
366
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
367
    if (type == READ_CACHE)
395
368
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
369
      info->read_end=info->buffer;              /* Nothing in cache */
397
370
    }
398
371
    else
399
372
    {
400
 
      write_end=(buffer + buffer_length -
 
373
      info->write_end=(info->buffer + info->buffer_length -
401
374
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
375
      info->end_of_file= ~(my_off_t) 0;
403
376
    }
404
377
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
378
  info->type=type;
 
379
  info->error=0;
 
380
  init_functions(info);
408
381
 
409
382
#ifdef HAVE_AIOWAIT
410
383
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
384
      ((uint32_t) info->buffer_length <
 
385
       (uint32_t) (info->end_of_file - seek_offset)))
413
386
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
387
    info->read_length=info->buffer_length/2;
 
388
    info->read_function=_my_b_async_read;
416
389
  }
417
 
  inited= 0;
 
390
  info->inited=0;
418
391
#else
419
392
  (void)use_async_io;
420
393
#endif
421
 
  return 0;
 
394
  return(0);
422
395
} /* reinit_io_cache */
423
396
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
397
 
 
398
 
 
399
/*
 
400
  Read buffered.
 
401
 
 
402
  SYNOPSIS
 
403
    _my_b_read()
 
404
      info                      IO_CACHE pointer
 
405
      Buffer                    Buffer to retrieve count bytes from file
 
406
      Count                     Number of bytes to read into Buffer
 
407
 
 
408
  NOTE
 
409
    This function is only called from the my_b_read() macro when there
 
410
    isn't enough characters in the buffer to satisfy the request.
 
411
 
 
412
  WARNING
 
413
 
 
414
    When changing this function, be careful with handling file offsets
 
415
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
416
    types than my_off_t unless you can be sure that their value fits.
 
417
    Same applies to differences of file offsets.
 
418
 
 
419
    When changing this function, check _my_b_read_r(). It might need the
 
420
    same change.
 
421
 
 
422
  RETURN
 
423
    0      we succeeded in reading all data
 
424
    1      Error: can't read requested characters
 
425
*/
 
426
 
 
427
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
428
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
429
  size_t length,diff_length,left_length, max_length;
 
430
  my_off_t pos_in_file;
448
431
 
449
432
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
433
  {
455
438
  }
456
439
 
457
440
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
441
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
442
 
460
443
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
444
    Whenever a function which operates on IO_CACHE flushes/writes
 
445
    some part of the IO_CACHE to disk it will set the property
463
446
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
447
    on the IO_CACHE.
465
448
  */
466
449
  if (info->seek_not_done)
467
450
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
451
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
469
452
    {
470
453
      /* No error, reset seek_not_done flag. */
471
454
      info->seek_not_done= 0;
477
460
        info->file is a pipe or socket or FIFO.  We never should have tried
478
461
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
462
      */
480
 
      assert(errno != ESPIPE);
 
463
      assert(my_errno != ESPIPE);
481
464
      info->error= -1;
482
465
      return(1);
483
466
    }
484
467
  }
485
468
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
469
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
470
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
471
  {                                     /* Fill first intern buffer */
489
472
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
473
    if (info->end_of_file <= pos_in_file)
491
474
    {                                   /* End of file */
492
475
      info->error= (int) left_length;
493
476
      return(1);
494
477
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
478
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
479
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
480
        != length)
497
481
    {
498
482
      info->error= (read_length == (size_t) -1 ? -1 :
499
483
                    (int) (read_length+left_length));
500
484
      return(1);
501
485
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
486
    Count-=length;
 
487
    Buffer+=length;
 
488
    pos_in_file+=length;
 
489
    left_length+=length;
506
490
    diff_length=0;
507
491
  }
508
492
 
509
493
  max_length= info->read_length-diff_length;
510
494
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
495
      max_length > (info->end_of_file - pos_in_file))
 
496
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
497
  if (!max_length)
514
498
  {
515
499
    if (Count)
516
500
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
501
      info->error= left_length;         /* We only got this many char */
518
502
      return(1);
519
503
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
504
    length=0;                           /* Didn't read any chars */
521
505
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
506
  else if ((length= my_read(info->file,info->buffer, max_length,
523
507
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
508
           length == (size_t) -1)
525
509
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
510
    if (length != (size_t) -1)
 
511
      memcpy(Buffer, info->buffer, length);
 
512
    info->pos_in_file= pos_in_file;
 
513
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
514
    info->read_pos=info->read_end=info->buffer;
531
515
    return(1);
532
516
  }
533
517
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
518
  info->read_end=info->buffer+length;
 
519
  info->pos_in_file=pos_in_file;
536
520
  memcpy(Buffer, info->buffer, Count);
537
521
  return(0);
538
522
}
539
523
 
540
524
 
 
525
/*
 
526
  Prepare IO_CACHE for shared use.
 
527
 
 
528
  SYNOPSIS
 
529
    init_io_cache_share()
 
530
      read_cache                A read cache. This will be copied for
 
531
                                every thread after setup.
 
532
      cshare                    The share.
 
533
      write_cache               If non-NULL a write cache that is to be
 
534
                                synchronized with the read caches.
 
535
      num_threads               Number of threads sharing the cache
 
536
                                including the write thread if any.
 
537
 
 
538
  DESCRIPTION
 
539
 
 
540
    The shared cache is used so: One IO_CACHE is initialized with
 
541
    init_io_cache(). This includes the allocation of a buffer. Then a
 
542
    share is allocated and init_io_cache_share() is called with the io
 
543
    cache and the share. Then the io cache is copied for each thread. So
 
544
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
545
    is shared because cache->buffer is the same for all caches.
 
546
 
 
547
    One thread reads data from the file into the buffer. All threads
 
548
    read from the buffer, but every thread maintains its own set of
 
549
    pointers into the buffer. When all threads have used up the buffer
 
550
    contents, one of the threads reads the next block of data into the
 
551
    buffer. To accomplish this, each thread enters the cache lock before
 
552
    accessing the buffer. They wait in lock_io_cache() until all threads
 
553
    joined the lock. The last thread entering the lock is in charge of
 
554
    reading from file to buffer. It wakes all threads when done.
 
555
 
 
556
    Synchronizing a write cache to the read caches works so: Whenever
 
557
    the write buffer needs a flush, the write thread enters the lock and
 
558
    waits for all other threads to enter the lock too. They do this when
 
559
    they have used up the read buffer. When all threads are in the lock,
 
560
    the write thread copies the write buffer to the read buffer and
 
561
    wakes all threads.
 
562
 
 
563
    share->running_threads is the number of threads not being in the
 
564
    cache lock. When entering lock_io_cache() the number is decreased.
 
565
    When the thread that fills the buffer enters unlock_io_cache() the
 
566
    number is reset to the number of threads. The condition
 
567
    running_threads == 0 means that all threads are in the lock. Bumping
 
568
    up the number to the full count is non-intuitive. But increasing the
 
569
    number by one for each thread that leaves the lock could lead to a
 
570
    solo run of one thread. The last thread to join a lock reads from
 
571
    file to buffer, wakes the other threads, processes the data in the
 
572
    cache and enters the lock again. If no other thread left the lock
 
573
    meanwhile, it would think it's the last one again and read the next
 
574
    block...
 
575
 
 
576
    The share has copies of 'error', 'buffer', 'read_end', and
 
577
    'pos_in_file' from the thread that filled the buffer. We may not be
 
578
    able to access this information directly from its cache because the
 
579
    thread may be removed from the share before the variables could be
 
580
    copied by all other threads. Or, if a write buffer is synchronized,
 
581
    it would change its 'pos_in_file' after waking the other threads,
 
582
    possibly before they could copy its value.
 
583
 
 
584
    However, the 'buffer' variable in the share is for a synchronized
 
585
    write cache. It needs to know where to put the data. Otherwise it
 
586
    would need access to the read cache of one of the threads that is
 
587
    not yet removed from the share.
 
588
 
 
589
  RETURN
 
590
    void
 
591
*/
 
592
 
 
593
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
594
                         IO_CACHE *write_cache, uint32_t num_threads)
 
595
{
 
596
  assert(num_threads > 1);
 
597
  assert(read_cache->type == READ_CACHE);
 
598
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
599
 
 
600
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
601
  pthread_cond_init(&cshare->cond, 0);
 
602
  pthread_cond_init(&cshare->cond_writer, 0);
 
603
 
 
604
  cshare->running_threads= num_threads;
 
605
  cshare->total_threads=   num_threads;
 
606
  cshare->error=           0;    /* Initialize. */
 
607
  cshare->buffer=          read_cache->buffer;
 
608
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
609
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
610
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
611
 
 
612
  read_cache->share=         cshare;
 
613
  read_cache->read_function= _my_b_read_r;
 
614
  read_cache->current_pos=   NULL;
 
615
  read_cache->current_end=   NULL;
 
616
 
 
617
  if (write_cache)
 
618
    write_cache->share= cshare;
 
619
 
 
620
  return;
 
621
}
 
622
 
 
623
 
 
624
/*
 
625
  Remove a thread from shared access to IO_CACHE.
 
626
 
 
627
  SYNOPSIS
 
628
    remove_io_thread()
 
629
      cache                     The IO_CACHE to be removed from the share.
 
630
 
 
631
  NOTE
 
632
 
 
633
    Every thread must do that on exit for not to deadlock other threads.
 
634
 
 
635
    The last thread destroys the pthread resources.
 
636
 
 
637
    A writer flushes its cache first.
 
638
 
 
639
  RETURN
 
640
    void
 
641
*/
 
642
 
 
643
void remove_io_thread(IO_CACHE *cache)
 
644
{
 
645
  IO_CACHE_SHARE *cshare= cache->share;
 
646
  uint32_t total;
 
647
 
 
648
  /* If the writer goes, it needs to flush the write cache. */
 
649
  if (cache == cshare->source_cache)
 
650
    flush_io_cache(cache);
 
651
 
 
652
  pthread_mutex_lock(&cshare->mutex);
 
653
 
 
654
  /* Remove from share. */
 
655
  total= --cshare->total_threads;
 
656
 
 
657
  /* Detach from share. */
 
658
  cache->share= NULL;
 
659
 
 
660
  /* If the writer goes, let the readers know. */
 
661
  if (cache == cshare->source_cache)
 
662
  {
 
663
    cshare->source_cache= NULL;
 
664
  }
 
665
 
 
666
  /* If all threads are waiting for me to join the lock, wake them. */
 
667
  if (!--cshare->running_threads)
 
668
  {
 
669
    pthread_cond_signal(&cshare->cond_writer);
 
670
    pthread_cond_broadcast(&cshare->cond);
 
671
  }
 
672
 
 
673
  pthread_mutex_unlock(&cshare->mutex);
 
674
 
 
675
  if (!total)
 
676
  {
 
677
    pthread_cond_destroy (&cshare->cond_writer);
 
678
    pthread_cond_destroy (&cshare->cond);
 
679
    pthread_mutex_destroy(&cshare->mutex);
 
680
  }
 
681
 
 
682
  return;
 
683
}
 
684
 
 
685
 
 
686
/*
 
687
  Lock IO cache and wait for all other threads to join.
 
688
 
 
689
  SYNOPSIS
 
690
    lock_io_cache()
 
691
      cache                     The cache of the thread entering the lock.
 
692
      pos                       File position of the block to read.
 
693
                                Unused for the write thread.
 
694
 
 
695
  DESCRIPTION
 
696
 
 
697
    Wait for all threads to finish with the current buffer. We want
 
698
    all threads to proceed in concert. The last thread to join
 
699
    lock_io_cache() will read the block from file and all threads start
 
700
    to use it. Then they will join again for reading the next block.
 
701
 
 
702
    The waiting threads detect a fresh buffer by comparing
 
703
    cshare->pos_in_file with the position they want to process next.
 
704
    Since the first block may start at position 0, we take
 
705
    cshare->read_end as an additional condition. This variable is
 
706
    initialized to NULL and will be set after a block of data is written
 
707
    to the buffer.
 
708
 
 
709
  RETURN
 
710
    1           OK, lock in place, go ahead and read.
 
711
    0           OK, unlocked, another thread did the read.
 
712
*/
 
713
 
 
714
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
715
{
 
716
  IO_CACHE_SHARE *cshare= cache->share;
 
717
 
 
718
  /* Enter the lock. */
 
719
  pthread_mutex_lock(&cshare->mutex);
 
720
  cshare->running_threads--;
 
721
 
 
722
  if (cshare->source_cache)
 
723
  {
 
724
    /* A write cache is synchronized to the read caches. */
 
725
 
 
726
    if (cache == cshare->source_cache)
 
727
    {
 
728
      /* The writer waits until all readers are here. */
 
729
      while (cshare->running_threads)
 
730
      {
 
731
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
732
      }
 
733
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
734
      return(1);
 
735
    }
 
736
 
 
737
    /* The last thread wakes the writer. */
 
738
    if (!cshare->running_threads)
 
739
    {
 
740
      pthread_cond_signal(&cshare->cond_writer);
 
741
    }
 
742
 
 
743
    /*
 
744
      Readers wait until the data is copied from the writer. Another
 
745
      reason to stop waiting is the removal of the write thread. If this
 
746
      happens, we leave the lock with old data in the buffer.
 
747
    */
 
748
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
749
           cshare->source_cache)
 
750
    {
 
751
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
752
    }
 
753
 
 
754
    /*
 
755
      If the writer was removed from the share while this thread was
 
756
      asleep, we need to simulate an EOF condition. The writer cannot
 
757
      reset the share variables as they might still be in use by readers
 
758
      of the last block. When we awake here then because the last
 
759
      joining thread signalled us. If the writer is not the last, it
 
760
      will not signal. So it is safe to clear the buffer here.
 
761
    */
 
762
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
763
    {
 
764
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
765
      cshare->error= 0; /* EOF is not an error. */
 
766
    }
 
767
  }
 
768
  else
 
769
  {
 
770
    /*
 
771
      There are read caches only. The last thread arriving in
 
772
      lock_io_cache() continues with a locked cache and reads the block.
 
773
    */
 
774
    if (!cshare->running_threads)
 
775
    {
 
776
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
777
      return(1);
 
778
    }
 
779
 
 
780
    /*
 
781
      All other threads wait until the requested block is read by the
 
782
      last thread arriving. Another reason to stop waiting is the
 
783
      removal of a thread. If this leads to all threads being in the
 
784
      lock, we have to continue also. The first of the awaken threads
 
785
      will then do the read.
 
786
    */
 
787
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
788
           cshare->running_threads)
 
789
    {
 
790
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
791
    }
 
792
 
 
793
    /* If the block is not yet read, continue with a locked cache and read. */
 
794
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
795
    {
 
796
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
797
      return(1);
 
798
    }
 
799
 
 
800
    /* Another thread did read the block already. */
 
801
  }
 
802
 
 
803
  /*
 
804
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
805
    filled the buffer did this and marked all threads as running.
 
806
  */
 
807
  pthread_mutex_unlock(&cshare->mutex);
 
808
  return(0);
 
809
}
 
810
 
 
811
 
 
812
/*
 
813
  Unlock IO cache.
 
814
 
 
815
  SYNOPSIS
 
816
    unlock_io_cache()
 
817
      cache                     The cache of the thread leaving the lock.
 
818
 
 
819
  NOTE
 
820
    This is called by the thread that filled the buffer. It marks all
 
821
    threads as running and awakes them. This must not be done by any
 
822
    other thread.
 
823
 
 
824
    Do not signal cond_writer. Either there is no writer or the writer
 
825
    is the only one who can call this function.
 
826
 
 
827
    The reason for resetting running_threads to total_threads before
 
828
    waking all other threads is that it could be possible that this
 
829
    thread is so fast with processing the buffer that it enters the lock
 
830
    before even one other thread has left it. If every awoken thread
 
831
    would increase running_threads by one, this thread could think that
 
832
    he is again the last to join and would not wait for the other
 
833
    threads to process the data.
 
834
 
 
835
  RETURN
 
836
    void
 
837
*/
 
838
 
 
839
static void unlock_io_cache(IO_CACHE *cache)
 
840
{
 
841
  IO_CACHE_SHARE *cshare= cache->share;
 
842
 
 
843
  cshare->running_threads= cshare->total_threads;
 
844
  pthread_cond_broadcast(&cshare->cond);
 
845
  pthread_mutex_unlock(&cshare->mutex);
 
846
  return;
 
847
}
 
848
 
 
849
 
 
850
/*
 
851
  Read from IO_CACHE when it is shared between several threads.
 
852
 
 
853
  SYNOPSIS
 
854
    _my_b_read_r()
 
855
      cache                     IO_CACHE pointer
 
856
      Buffer                    Buffer to retrieve count bytes from file
 
857
      Count                     Number of bytes to read into Buffer
 
858
 
 
859
  NOTE
 
860
    This function is only called from the my_b_read() macro when there
 
861
    isn't enough characters in the buffer to satisfy the request.
 
862
 
 
863
  IMPLEMENTATION
 
864
 
 
865
    It works as follows: when a thread tries to read from a file (that
 
866
    is, after using all the data from the (shared) buffer), it just
 
867
    hangs on lock_io_cache(), waiting for other threads. When the very
 
868
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
869
    does actual IO and unlock_io_cache(), which signals all the waiting
 
870
    threads that data is in the buffer.
 
871
 
 
872
  WARNING
 
873
 
 
874
    When changing this function, be careful with handling file offsets
 
875
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
876
    types than my_off_t unless you can be sure that their value fits.
 
877
    Same applies to differences of file offsets. (Bug #11527)
 
878
 
 
879
    When changing this function, check _my_b_read(). It might need the
 
880
    same change.
 
881
 
 
882
  RETURN
 
883
    0      we succeeded in reading all data
 
884
    1      Error: can't read requested characters
 
885
*/
 
886
 
 
887
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
888
{
 
889
  my_off_t pos_in_file;
 
890
  size_t length, diff_length, left_length;
 
891
  IO_CACHE_SHARE *cshare= cache->share;
 
892
 
 
893
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
894
  {
 
895
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
896
    memcpy(Buffer, cache->read_pos, left_length);
 
897
    Buffer+= left_length;
 
898
    Count-= left_length;
 
899
  }
 
900
  while (Count)
 
901
  {
 
902
    size_t cnt, len;
 
903
 
 
904
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
905
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
906
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
907
    length= ((length <= cache->read_length) ?
 
908
             length + IO_ROUND_DN(cache->read_length - length) :
 
909
             length - IO_ROUND_UP(length - cache->read_length));
 
910
    if (cache->type != READ_FIFO &&
 
911
        (length > (cache->end_of_file - pos_in_file)))
 
912
      length= (size_t) (cache->end_of_file - pos_in_file);
 
913
    if (length == 0)
 
914
    {
 
915
      cache->error= (int) left_length;
 
916
      return(1);
 
917
    }
 
918
    if (lock_io_cache(cache, pos_in_file))
 
919
    {
 
920
      /* With a synchronized write/read cache we won't come here... */
 
921
      assert(!cshare->source_cache);
 
922
      /*
 
923
        ... unless the writer has gone before this thread entered the
 
924
        lock. Simulate EOF in this case. It can be distinguished by
 
925
        cache->file.
 
926
      */
 
927
      if (cache->file < 0)
 
928
        len= 0;
 
929
      else
 
930
      {
 
931
        /*
 
932
          Whenever a function which operates on IO_CACHE flushes/writes
 
933
          some part of the IO_CACHE to disk it will set the property
 
934
          "seek_not_done" to indicate this to other functions operating
 
935
          on the IO_CACHE.
 
936
        */
 
937
        if (cache->seek_not_done)
 
938
        {
 
939
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
940
          {
 
941
            cache->error= -1;
 
942
            unlock_io_cache(cache);
 
943
            return(1);
 
944
          }
 
945
        }
 
946
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
947
      }
 
948
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
949
      cache->error=       (len == length ? 0 : (int) len);
 
950
      cache->pos_in_file= pos_in_file;
 
951
 
 
952
      /* Copy important values to the share. */
 
953
      cshare->error=       cache->error;
 
954
      cshare->read_end=    cache->read_end;
 
955
      cshare->pos_in_file= pos_in_file;
 
956
 
 
957
      /* Mark all threads as running and wake them. */
 
958
      unlock_io_cache(cache);
 
959
    }
 
960
    else
 
961
    {
 
962
      /*
 
963
        With a synchronized write/read cache readers always come here.
 
964
        Copy important values from the share.
 
965
      */
 
966
      cache->error=       cshare->error;
 
967
      cache->read_end=    cshare->read_end;
 
968
      cache->pos_in_file= cshare->pos_in_file;
 
969
 
 
970
      len= ((cache->error == -1) ? (size_t) -1 :
 
971
            (size_t) (cache->read_end - cache->buffer));
 
972
    }
 
973
    cache->read_pos=      cache->buffer;
 
974
    cache->seek_not_done= 0;
 
975
    if (len == 0 || len == (size_t) -1)
 
976
    {
 
977
      cache->error= (int) left_length;
 
978
      return(1);
 
979
    }
 
980
    cnt= (len > Count) ? Count : len;
 
981
    memcpy(Buffer, cache->read_pos, cnt);
 
982
    Count -= cnt;
 
983
    Buffer+= cnt;
 
984
    left_length+= cnt;
 
985
    cache->read_pos+= cnt;
 
986
  }
 
987
  return(0);
 
988
}
 
989
 
 
990
 
 
991
/*
 
992
  Copy data from write cache to read cache.
 
993
 
 
994
  SYNOPSIS
 
995
    copy_to_read_buffer()
 
996
      write_cache               The write cache.
 
997
      write_buffer              The source of data, mostly the cache buffer.
 
998
      write_length              The number of bytes to copy.
 
999
 
 
1000
  NOTE
 
1001
    The write thread will wait for all read threads to join the cache
 
1002
    lock. Then it copies the data over and wakes the read threads.
 
1003
 
 
1004
  RETURN
 
1005
    void
 
1006
*/
 
1007
 
 
1008
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1009
                                const unsigned char *write_buffer, size_t write_length)
 
1010
{
 
1011
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1012
 
 
1013
  assert(cshare->source_cache == write_cache);
 
1014
  /*
 
1015
    write_length is usually less or equal to buffer_length.
 
1016
    It can be bigger if _my_b_write() is called with a big length.
 
1017
  */
 
1018
  while (write_length)
 
1019
  {
 
1020
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1021
    int  rc;
 
1022
 
 
1023
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1024
    /* The writing thread does always have the lock when it awakes. */
 
1025
    assert(rc);
 
1026
 
 
1027
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1028
 
 
1029
    cshare->error=       0;
 
1030
    cshare->read_end=    cshare->buffer + copy_length;
 
1031
    cshare->pos_in_file= write_cache->pos_in_file;
 
1032
 
 
1033
    /* Mark all threads as running and wake them. */
 
1034
    unlock_io_cache(write_cache);
 
1035
 
 
1036
    write_buffer+= copy_length;
 
1037
    write_length-= copy_length;
 
1038
  }
 
1039
}
 
1040
 
 
1041
 
 
1042
/*
 
1043
  Do sequential read from the SEQ_READ_APPEND cache.
 
1044
 
 
1045
  We do this in three stages:
 
1046
   - first read from info->buffer
 
1047
   - then if there are still data to read, try the file descriptor
 
1048
   - afterwards, if there are still data to read, try append buffer
 
1049
 
 
1050
  RETURNS
 
1051
    0  Success
 
1052
    1  Failed to read
 
1053
*/
 
1054
 
 
1055
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1056
{
 
1057
  size_t length, diff_length, left_length, save_count, max_length;
 
1058
  my_off_t pos_in_file;
 
1059
  save_count=Count;
 
1060
 
 
1061
  /* first, read the regular buffer */
 
1062
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1063
  {
 
1064
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1065
    memcpy(Buffer,info->read_pos, left_length);
 
1066
    Buffer+=left_length;
 
1067
    Count-=left_length;
 
1068
  }
 
1069
  lock_append_buffer(info);
 
1070
 
 
1071
  /* pos_in_file always point on where info->buffer was read */
 
1072
  if ((pos_in_file=info->pos_in_file +
 
1073
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1074
    goto read_append_buffer;
 
1075
 
 
1076
  /*
 
1077
    With read-append cache we must always do a seek before we read,
 
1078
    because the write could have moved the file pointer astray
 
1079
  */
 
1080
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1081
  {
 
1082
   info->error= -1;
 
1083
   unlock_append_buffer(info);
 
1084
   return (1);
 
1085
  }
 
1086
  info->seek_not_done=0;
 
1087
 
 
1088
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1089
 
 
1090
  /* now the second stage begins - read from file descriptor */
 
1091
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1092
  {
 
1093
    /* Fill first intern buffer */
 
1094
    size_t read_length;
 
1095
 
 
1096
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1097
    if ((read_length= my_read(info->file,Buffer, length,
 
1098
                              info->myflags)) == (size_t) -1)
 
1099
    {
 
1100
      info->error= -1;
 
1101
      unlock_append_buffer(info);
 
1102
      return 1;
 
1103
    }
 
1104
    Count-=read_length;
 
1105
    Buffer+=read_length;
 
1106
    pos_in_file+=read_length;
 
1107
 
 
1108
    if (read_length != length)
 
1109
    {
 
1110
      /*
 
1111
        We only got part of data;  Read the rest of the data from the
 
1112
        write buffer
 
1113
      */
 
1114
      goto read_append_buffer;
 
1115
    }
 
1116
    left_length+=length;
 
1117
    diff_length=0;
 
1118
  }
 
1119
 
 
1120
  max_length= info->read_length-diff_length;
 
1121
  if (max_length > (info->end_of_file - pos_in_file))
 
1122
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1123
  if (!max_length)
 
1124
  {
 
1125
    if (Count)
 
1126
      goto read_append_buffer;
 
1127
    length=0;                           /* Didn't read any more chars */
 
1128
  }
 
1129
  else
 
1130
  {
 
1131
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1132
    if (length == (size_t) -1)
 
1133
    {
 
1134
      info->error= -1;
 
1135
      unlock_append_buffer(info);
 
1136
      return 1;
 
1137
    }
 
1138
    if (length < Count)
 
1139
    {
 
1140
      memcpy(Buffer, info->buffer, length);
 
1141
      Count -= length;
 
1142
      Buffer += length;
 
1143
 
 
1144
      /*
 
1145
         added the line below to make
 
1146
         assert(pos_in_file==info->end_of_file) pass.
 
1147
         otherwise this does not appear to be needed
 
1148
      */
 
1149
      pos_in_file += length;
 
1150
      goto read_append_buffer;
 
1151
    }
 
1152
  }
 
1153
  unlock_append_buffer(info);
 
1154
  info->read_pos=info->buffer+Count;
 
1155
  info->read_end=info->buffer+length;
 
1156
  info->pos_in_file=pos_in_file;
 
1157
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1158
  return 0;
 
1159
 
 
1160
read_append_buffer:
 
1161
 
 
1162
  /*
 
1163
     Read data from the current write buffer.
 
1164
     Count should never be == 0 here (The code will work even if count is 0)
 
1165
  */
 
1166
 
 
1167
  {
 
1168
    /* First copy the data to Count */
 
1169
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1170
    size_t copy_len;
 
1171
    size_t transfer_len;
 
1172
 
 
1173
    assert(info->append_read_pos <= info->write_pos);
 
1174
    /*
 
1175
      TODO: figure out if the assert below is needed or correct.
 
1176
    */
 
1177
    assert(pos_in_file == info->end_of_file);
 
1178
    copy_len=cmin(Count, len_in_buff);
 
1179
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1180
    info->append_read_pos += copy_len;
 
1181
    Count -= copy_len;
 
1182
    if (Count)
 
1183
      info->error = save_count - Count;
 
1184
 
 
1185
    /* Fill read buffer with data from write buffer */
 
1186
    memcpy(info->buffer, info->append_read_pos,
 
1187
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1188
    info->read_pos= info->buffer;
 
1189
    info->read_end= info->buffer+transfer_len;
 
1190
    info->append_read_pos=info->write_pos;
 
1191
    info->pos_in_file=pos_in_file+copy_len;
 
1192
    info->end_of_file+=len_in_buff;
 
1193
  }
 
1194
  unlock_append_buffer(info);
 
1195
  return Count ? 1 : 0;
 
1196
}
 
1197
 
 
1198
 
541
1199
#ifdef HAVE_AIOWAIT
542
1200
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1201
/*
 
1202
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1203
  from disk when needed.
 
1204
 
 
1205
  SYNOPSIS
 
1206
    _my_b_async_read()
 
1207
      info                      IO_CACHE pointer
 
1208
      Buffer                    Buffer to retrieve count bytes from file
 
1209
      Count                     Number of bytes to read into Buffer
 
1210
 
 
1211
  RETURN VALUE
 
1212
    -1          An error has occurred; my_errno is set.
 
1213
     0          Success
 
1214
     1          An error has occurred; IO_CACHE to error state.
 
1215
*/
 
1216
 
 
1217
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1218
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1219
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1220
  size_t max_length;
559
1221
  my_off_t next_pos_in_file;
560
1222
  unsigned char *read_buffer;
575
1237
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1238
                 my_filename(info->file),
577
1239
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1240
      my_errno=info->aio_result.result.aio_errno;
579
1241
      info->error= -1;
580
1242
      return(1);
581
1243
    }
582
1244
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1245
        read_length == (size_t) -1)
584
1246
    {
585
 
      errno=0;                          /* For testing */
 
1247
      my_errno=0;                               /* For testing */
586
1248
      info->error= (read_length == (size_t) -1 ? -1 :
587
1249
                    (int) (read_length+left_length));
588
1250
      return(1);
615
1277
      }
616
1278
    }
617
1279
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1280
    length=cmin(Count,read_length);
 
1281
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1282
    Buffer+=length;
 
1283
    Count-=length;
 
1284
    left_length+=length;
623
1285
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1286
    info->read_pos+=length;
625
1287
  }
626
1288
  else
627
1289
    next_pos_in_file=(info->pos_in_file+ (size_t)
648
1310
      if ((read_length=my_read(info->file,info->request_pos,
649
1311
                               read_length, info->myflags)) == (size_t) -1)
650
1312
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
 
1313
      use_length=cmin(Count,read_length);
652
1314
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
1315
      info->read_pos=info->request_pos+Count;
654
1316
      info->read_end=info->request_pos+read_length;
659
1321
      {                                 /* Didn't find hole block */
660
1322
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1323
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1324
                   my_filename(info->file),my_errno);
663
1325
        info->error=(int) (read_length+left_length);
664
1326
        return 1;
665
1327
      }
695
1357
                (my_off_t) next_pos_in_file,SEEK_SET,
696
1358
                &info->aio_result.result))
697
1359
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1360
      my_errno=errno;
699
1361
      if (info->request_pos != info->buffer)
700
1362
      {
701
1363
        memmove(info->buffer, info->request_pos,
715
1377
#endif
716
1378
 
717
1379
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1380
/* Read one byte when buffer is empty */
 
1381
 
 
1382
int _my_b_get(IO_CACHE *info)
723
1383
{
724
1384
  unsigned char buff;
725
1385
  IO_CACHE_CALLBACK pre_read,post_read;
732
1392
  return (int) (unsigned char) buff;
733
1393
}
734
1394
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1395
/*
 
1396
   Write a byte buffer to IO_CACHE and flush to disk
 
1397
   if IO_CACHE is full.
 
1398
 
 
1399
   RETURN VALUE
 
1400
    1 On error on write
 
1401
    0 On success
 
1402
   -1 On error; my_errno contains error code.
 
1403
*/
 
1404
 
 
1405
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1406
{
745
 
  size_t rest_length,length_local;
 
1407
  size_t rest_length,length;
746
1408
 
747
1409
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1410
  {
749
 
    errno=EFBIG;
 
1411
    my_errno=errno=EFBIG;
750
1412
    return info->error = -1;
751
1413
  }
752
1414
 
760
1422
    return 1;
761
1423
  if (Count >= IO_SIZE)
762
1424
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1425
    length=Count & (size_t) ~(IO_SIZE-1);
764
1426
    if (info->seek_not_done)
765
1427
    {
766
1428
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1429
        Whenever a function which operates on IO_CACHE flushes/writes
 
1430
        some part of the IO_CACHE to disk it will set the property
769
1431
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1432
        on the IO_CACHE.
771
1433
      */
772
1434
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
1435
      {
776
1438
      }
777
1439
      info->seek_not_done=0;
778
1440
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1441
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1442
      return info->error= -1;
 
1443
 
 
1444
    /*
 
1445
      In case of a shared I/O cache with a writer we normally do direct
 
1446
      write cache to read cache copy. Simulate this here by direct
 
1447
      caller buffer to read cache copy. Do it after the write so that
 
1448
      the cache readers actions on the flushed part can go in parallel
 
1449
      with the write of the extra stuff. copy_to_read_buffer()
 
1450
      synchronizes writer and readers so that after this call the
 
1451
      readers can act on the extra stuff while the writer can go ahead
 
1452
      and prepare the next output. copy_to_read_buffer() relies on
 
1453
      info->pos_in_file.
 
1454
    */
 
1455
    if (info->share)
 
1456
      copy_to_read_buffer(info, Buffer, length);
 
1457
 
 
1458
    Count-=length;
 
1459
    Buffer+=length;
 
1460
    info->pos_in_file+=length;
 
1461
  }
 
1462
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1463
  info->write_pos+=Count;
 
1464
  return 0;
 
1465
}
 
1466
 
 
1467
 
 
1468
/*
 
1469
  Append a block to the write buffer.
 
1470
  This is done with the buffer locked to ensure that we don't read from
 
1471
  the write buffer before we are ready with it.
 
1472
*/
 
1473
 
 
1474
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1475
{
 
1476
  size_t rest_length,length;
 
1477
 
 
1478
  /*
 
1479
    Assert that we cannot come here with a shared cache. If we do one
 
1480
    day, we might need to add a call to copy_to_read_buffer().
 
1481
  */
 
1482
  assert(!info->share);
 
1483
 
 
1484
  lock_append_buffer(info);
 
1485
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1486
  if (Count <= rest_length)
 
1487
    goto end;
 
1488
  memcpy(info->write_pos, Buffer, rest_length);
 
1489
  Buffer+=rest_length;
 
1490
  Count-=rest_length;
 
1491
  info->write_pos+=rest_length;
 
1492
  if (my_b_flush_io_cache(info,0))
 
1493
  {
 
1494
    unlock_append_buffer(info);
 
1495
    return 1;
 
1496
  }
 
1497
  if (Count >= IO_SIZE)
 
1498
  {                                     /* Fill first intern buffer */
 
1499
    length=Count & (size_t) ~(IO_SIZE-1);
 
1500
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1501
    {
 
1502
      unlock_append_buffer(info);
 
1503
      return info->error= -1;
 
1504
    }
 
1505
    Count-=length;
 
1506
    Buffer+=length;
 
1507
    info->end_of_file+=length;
 
1508
  }
 
1509
 
 
1510
end:
 
1511
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1512
  info->write_pos+=Count;
 
1513
  unlock_append_buffer(info);
 
1514
  return 0;
 
1515
}
 
1516
 
 
1517
 
 
1518
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1519
{
 
1520
  /*
 
1521
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1522
    a possible compiler bug. At least gcc 2.95 cannot deal with
 
1523
    several layers of ternary operators that evaluated comma(,) operator
 
1524
    expressions inside - I do have a test case if somebody wants it
 
1525
  */
 
1526
  if (info->type == SEQ_READ_APPEND)
 
1527
    return my_b_append(info, Buffer, Count);
 
1528
  return my_b_write(info, Buffer, Count);
 
1529
}
 
1530
 
 
1531
 
 
1532
/*
 
1533
  Write a block to disk where part of the data may be inside the record
 
1534
  buffer.  As all write calls to the data goes through the cache,
 
1535
  we will never get a seek over the end of the buffer
 
1536
*/
 
1537
 
 
1538
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1539
                   my_off_t pos)
799
1540
{
800
 
  size_t length_local;
 
1541
  size_t length;
801
1542
  int error=0;
802
1543
 
 
1544
  /*
 
1545
    Assert that we cannot come here with a shared cache. If we do one
 
1546
    day, we might need to add a call to copy_to_read_buffer().
 
1547
  */
 
1548
  assert(!info->share);
 
1549
 
803
1550
  if (pos < info->pos_in_file)
804
1551
  {
805
1552
    /* Of no overlap, write everything without buffering */
806
1553
    if (pos + Count <= info->pos_in_file)
807
1554
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1555
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1556
    length= (uint32_t) (info->pos_in_file - pos);
 
1557
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1558
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1559
    Buffer+=length;
 
1560
    pos+=  length;
 
1561
    Count-= length;
 
1562
#ifndef HAVE_PREAD
 
1563
    info->seek_not_done=1;
 
1564
#endif
815
1565
  }
816
1566
 
817
1567
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1568
  length= (size_t) (info->write_end - info->buffer);
 
1569
  if (pos < info->pos_in_file + length)
820
1570
  {
821
1571
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1572
    length-=offset;
 
1573
    if (length > Count)
 
1574
      length=Count;
 
1575
    memcpy(info->buffer+offset, Buffer, length);
 
1576
    Buffer+=length;
 
1577
    Count-= length;
 
1578
    /* Fix length of buffer if the new data was larger */
 
1579
    if (info->buffer+length > info->write_pos)
 
1580
      info->write_pos=info->buffer+length;
831
1581
    if (!Count)
832
1582
      return (error);
833
1583
  }
837
1587
  return error;
838
1588
}
839
1589
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1590
 
 
1591
        /* Flush write cache */
 
1592
 
 
1593
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1594
  lock_append_buffer(info);
 
1595
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1596
  unlock_append_buffer(info);
 
1597
 
 
1598
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1599
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1600
  size_t length;
 
1601
  bool append_cache;
 
1602
  my_off_t pos_in_file;
 
1603
 
 
1604
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1605
    need_append_buffer_lock=0;
849
1606
 
850
1607
  if (info->type == WRITE_CACHE || append_cache)
851
1608
  {
852
1609
    if (info->file == -1)
853
1610
    {
854
 
      if (info->real_open_cached_file())
 
1611
      if (real_open_cached_file(info))
855
1612
        return((info->error= -1));
856
1613
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1614
    LOCK_APPEND_BUFFER;
858
1615
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1616
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1617
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1618
      /*
 
1619
        In case of a shared I/O cache with a writer we do direct write
 
1620
        cache to read cache copy. Do it before the write here so that
 
1621
        the readers can work in parallel with the write.
 
1622
        copy_to_read_buffer() relies on info->pos_in_file.
 
1623
      */
 
1624
      if (info->share)
 
1625
        copy_to_read_buffer(info, info->write_buffer, length);
 
1626
 
 
1627
      pos_in_file=info->pos_in_file;
862
1628
      /*
863
1629
        If we have append cache, we always open the file with
864
1630
        O_APPEND which moves the pos to EOF automatically on every write
865
1631
      */
866
1632
      if (!append_cache && info->seek_not_done)
867
1633
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1634
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
869
1635
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1636
          UNLOCK_APPEND_BUFFER;
871
1637
          return((info->error= -1));
872
1638
        }
873
1639
        if (!append_cache)
874
1640
          info->seek_not_done=0;
875
1641
      }
876
1642
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1643
        info->pos_in_file+=length;
878
1644
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1645
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1646
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1647
      if (my_write(info->file,info->write_buffer,length,
882
1648
                   info->myflags | MY_NABP))
883
1649
        info->error= -1;
884
1650
      else
885
1651
        info->error= 0;
886
1652
      if (!append_cache)
887
1653
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1654
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1655
      }
890
1656
      else
891
1657
      {
895
1661
      }
896
1662
 
897
1663
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1664
      UNLOCK_APPEND_BUFFER;
899
1665
      return(info->error);
900
1666
    }
901
1667
  }
906
1672
    info->inited=0;
907
1673
  }
908
1674
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1675
  UNLOCK_APPEND_BUFFER;
910
1676
  return(0);
911
1677
}
912
1678
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1679
/*
 
1680
  Free an IO_CACHE object
 
1681
 
 
1682
  SYNOPSOS
 
1683
    end_io_cache()
 
1684
    info                IO_CACHE Handle to free
 
1685
 
 
1686
  NOTES
 
1687
    It's currently safe to call this if one has called init_io_cache()
 
1688
    on the 'info' object, even if init_io_cache() failed.
 
1689
    This function is also safe to call twice with the same handle.
 
1690
 
 
1691
  RETURN
 
1692
   0  ok
 
1693
   #  Error
 
1694
*/
 
1695
 
 
1696
int end_io_cache(IO_CACHE *info)
928
1697
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1698
  int error=0;
 
1699
  IO_CACHE_CALLBACK pre_close;
 
1700
 
 
1701
  /*
 
1702
    Every thread must call remove_io_thread(). The last one destroys
 
1703
    the share elements.
 
1704
  */
 
1705
  assert(!info->share || !info->share->total_threads);
 
1706
 
 
1707
  if ((pre_close=info->pre_close))
 
1708
  {
 
1709
    (*pre_close)(info);
 
1710
    info->pre_close= 0;
 
1711
  }
 
1712
  if (info->alloced_buffer)
 
1713
  {
 
1714
    info->alloced_buffer=0;
 
1715
    if (info->file != -1)                       /* File doesn't exist */
 
1716
      error= my_b_flush_io_cache(info,1);
 
1717
    free((unsigned char*) info->buffer);
 
1718
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1719
  }
 
1720
  if (info->type == SEQ_READ_APPEND)
 
1721
  {
 
1722
    /* Destroy allocated mutex */
 
1723
    info->type= TYPE_NOT_SET;
 
1724
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1725
  }
 
1726
  return(error);
948
1727
} /* end_io_cache */
949
1728
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1729
 
 
1730
/**********************************************************************
 
1731
 Testing of MF_IOCACHE
 
1732
**********************************************************************/
 
1733
 
 
1734
#ifdef MAIN
 
1735
 
 
1736
#include <my_dir.h>
 
1737
 
 
1738
void die(const char* fmt, ...)
 
1739
{
 
1740
  va_list va_args;
 
1741
  va_start(va_args,fmt);
 
1742
  fprintf(stderr,"Error:");
 
1743
  vfprintf(stderr, fmt,va_args);
 
1744
  fprintf(stderr,", errno=%d\n", errno);
 
1745
  exit(1);
 
1746
}
 
1747
 
 
1748
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1749
{
 
1750
  int fd;
 
1751
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1752
    die("Could not open %s", fname);
 
1753
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1754
    die("failed in init_io_cache()");
 
1755
  return fd;
 
1756
}
 
1757
 
 
1758
void close_file(IO_CACHE* info)
 
1759
{
 
1760
  end_io_cache(info);
 
1761
  my_close(info->file, MYF(MY_WME));
 
1762
}
 
1763
 
 
1764
int main(int argc, char** argv)
 
1765
{
 
1766
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1767
  MY_STAT status;
 
1768
  const char* fname="/tmp/iocache.test";
 
1769
  int cache_size=16384;
 
1770
  char llstr_buf[22];
 
1771
  int max_block,total_bytes=0;
 
1772
  int i,num_loops=100,error=0;
 
1773
  char *p;
 
1774
  char* block, *block_end;
 
1775
  MY_INIT(argv[0]);
 
1776
  max_block = cache_size*3;
 
1777
  if (!(block=(char*)malloc(max_block)))
 
1778
    die("Not enough memory to allocate test block");
 
1779
  block_end = block + max_block;
 
1780
  for (p = block,i=0; p < block_end;i++)
 
1781
  {
 
1782
    *p++ = (char)i;
 
1783
  }
 
1784
  if (my_stat(fname,&status, MYF(0)) &&
 
1785
      my_delete(fname,MYF(MY_WME)))
 
1786
    {
 
1787
      die("Delete of %s failed, aborting", fname);
 
1788
    }
 
1789
  open_file(fname,&sra_cache, cache_size);
 
1790
  for (i = 0; i < num_loops; i++)
 
1791
  {
 
1792
    char buf[4];
 
1793
    int block_size = abs(rand() % max_block);
 
1794
    int4store(buf, block_size);
 
1795
    if (my_b_append(&sra_cache,buf,4) ||
 
1796
        my_b_append(&sra_cache, block, block_size))
 
1797
      die("write failed");
 
1798
    total_bytes += 4+block_size;
 
1799
  }
 
1800
  close_file(&sra_cache);
 
1801
  free(block);
 
1802
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1803
    die("%s failed to stat, but I had just closed it,\
 
1804
 wonder how that happened");
 
1805
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1806
         llstr(status.st_size,llstr_buf),
 
1807
         total_bytes);
 
1808
  my_delete(fname, MYF(MY_WME));
 
1809
  /* check correctness of tests */
 
1810
  if (total_bytes != status.st_size)
 
1811
  {
 
1812
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1813
supposedly written\n");
 
1814
    error=1;
 
1815
  }
 
1816
  exit(error);
 
1817
  return 0;
 
1818
}
 
1819
#endif