~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Monty Taylor
  • Date: 2008-10-14 21:20:42 UTC
  • mto: (511.1.4 codestyle)
  • mto: This revision was merged to the branch mainline in revision 521.
  • Revision ID: monty@inaugust.com-20081014212042-tef3njx3368b6lwt
Override copy ctr and op= because we have pointer members.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include <config.h>
51
 
 
52
 
#include <drizzled/internal/my_sys.h>
53
 
#include <drizzled/internal/m_string.h>
54
 
#include <drizzled/drizzled.h>
 
50
#include "mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include <drizzled/error.h>
57
 
#include <drizzled/internal/aio_result.h>
 
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include <drizzled/internal/iocache.h>
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
 
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
60
#define lock_append_buffer(info) \
 
61
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
62
#define unlock_append_buffer(info) \
 
63
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
64
 
 
65
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
66
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
67
 
 
68
/*
 
69
  Setup internal pointers inside IO_CACHE
 
70
 
 
71
  SYNOPSIS
 
72
    setup_io_cache()
 
73
    info                IO_CACHE handler
 
74
 
 
75
  NOTES
 
76
    This is called on automaticly on init or reinit of IO_CACHE
 
77
    It must be called externally if one moves or copies an IO_CACHE
 
78
    object.
 
79
*/
 
80
 
 
81
void setup_io_cache(IO_CACHE* info)
126
82
{
127
83
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
84
  if (info->type == WRITE_CACHE)
129
85
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
86
    info->current_pos= &info->write_pos;
 
87
    info->current_end= &info->write_end;
132
88
  }
133
89
  else
134
90
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
91
    info->current_pos= &info->read_pos;
 
92
    info->current_end= &info->read_end;
137
93
  }
138
94
}
139
95
 
140
96
 
141
 
void st_io_cache::init_functions()
 
97
static void
 
98
init_functions(IO_CACHE* info)
142
99
{
 
100
  enum cache_type type= info->type;
143
101
  switch (type) {
144
102
  case READ_NET:
145
103
    /*
150
108
      as myisamchk
151
109
    */
152
110
    break;
 
111
  case SEQ_READ_APPEND:
 
112
    info->read_function = _my_b_seq_read;
 
113
    info->write_function = 0;                   /* Force a core if used */
 
114
    break;
153
115
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
116
    info->read_function =
 
117
                          info->share ? _my_b_read_r :
 
118
                                        _my_b_read;
 
119
    info->write_function = _my_b_write;
156
120
  }
157
121
 
158
 
  setup_io_cache();
 
122
  setup_io_cache(info);
159
123
}
160
124
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
125
 
 
126
/*
 
127
  Initialize an IO_CACHE object
 
128
 
 
129
  SYNOPSOS
 
130
    init_io_cache()
 
131
    info                cache handler to initialize
 
132
    file                File that should be associated to to the handler
 
133
                        If == -1 then real_open_cached_file()
 
134
                        will be called when it's time to open file.
 
135
    cachesize           Size of buffer to allocate for read/write
 
136
                        If == 0 then use my_default_record_cache_size
 
137
    type                Type of cache
 
138
    seek_offset         Where cache should start reading/writing
 
139
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
140
    cache_myflags       Bitmap of differnt flags
 
141
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
142
                        MY_DONT_CHECK_FILESIZE
 
143
 
 
144
  RETURN
 
145
    0  ok
 
146
    #  error
 
147
*/
 
148
 
 
149
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
150
                  enum cache_type type, my_off_t seek_offset,
 
151
                  bool use_async_io, myf cache_myflags)
181
152
{
182
153
  size_t min_cache;
183
 
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
154
  my_off_t pos;
 
155
  my_off_t end_of_file= ~(my_off_t) 0;
185
156
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
157
  info->file= file;
 
158
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
159
  info->pos_in_file= seek_offset;
 
160
  info->pre_close = info->pre_read = info->post_read = 0;
 
161
  info->arg = 0;
 
162
  info->alloced_buffer = 0;
 
163
  info->buffer=0;
 
164
  info->seek_not_done= 0;
194
165
 
195
166
  if (file >= 0)
196
167
  {
197
 
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
168
    pos= my_tell(file, MYF(0));
 
169
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
199
170
    {
200
171
      /*
201
172
         This kind of object doesn't support seek() or tell(). Don't set a
202
173
         flag that will make us again try to seek() later and fail.
203
174
      */
204
 
      seek_not_done= 0;
 
175
      info->seek_not_done= 0;
205
176
      /*
206
177
        Additionally, if we're supposed to start somewhere other than the
207
178
        the beginning of whatever this file is, then somebody made a bad
210
181
      assert(seek_offset == 0);
211
182
    }
212
183
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
184
      info->seek_not_done= test(seek_offset != pos);
214
185
  }
215
186
 
 
187
  info->disk_writes= 0;
 
188
  info->share=0;
 
189
 
216
190
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
191
    return(1);                          /* No cache requested */
218
192
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
193
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
194
  {                                             /* Assume file isn't growing */
221
195
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
196
    {
223
197
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
198
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
225
199
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
200
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
201
      if (end_of_file < seek_offset)
 
202
        end_of_file=seek_offset;
229
203
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
204
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
205
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
206
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
207
        use_async_io=0;                         /* No need to use async */
234
208
      }
235
209
    }
236
210
  }
237
211
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
212
  if (type != READ_NET && type != WRITE_NET)
239
213
  {
240
214
    /* Retry allocating memory in smaller blocks until we get one */
241
215
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
219
      if (cachesize < min_cache)
246
220
        cachesize = min_cache;
247
221
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
255
 
           (unsigned char*) malloc(buffer_block)) != 0)
256
 
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
222
      if (type == SEQ_READ_APPEND)
 
223
        buffer_block *= 2;
 
224
      if ((info->buffer=
 
225
           (unsigned char*) my_malloc(buffer_block,
 
226
                             MYF((cache_myflags & ~ MY_WME) |
 
227
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
228
      {
 
229
        info->write_buffer=info->buffer;
 
230
        if (type == SEQ_READ_APPEND)
 
231
          info->write_buffer = info->buffer + cachesize;
 
232
        info->alloced_buffer=1;
259
233
        break;                                  /* Enough memory found */
260
234
      }
261
235
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
236
        return(2);                              /* Can't alloc cache */
267
237
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
238
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
239
    }
272
240
  }
273
241
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
242
  info->read_length=info->buffer_length=cachesize;
 
243
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
244
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
245
  if (type == SEQ_READ_APPEND)
 
246
  {
 
247
    info->append_read_pos = info->write_pos = info->write_buffer;
 
248
    info->write_end = info->write_buffer + info->buffer_length;
 
249
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
250
  }
 
251
#if defined(SAFE_MUTEX)
 
252
  else
 
253
  {
 
254
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
255
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
256
  }
 
257
#endif
277
258
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
259
  if (type == WRITE_CACHE)
 
260
    info->write_end=
 
261
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
262
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
263
    info->read_end=info->buffer;                /* Nothing in cache */
283
264
 
284
265
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
266
  info->end_of_file= end_of_file;
 
267
  info->error=0;
 
268
  info->type= type;
 
269
  init_functions(info);
289
270
#ifdef HAVE_AIOWAIT
290
271
  if (use_async_io && ! my_disable_async_io)
291
272
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
273
    info->read_length/=2;
 
274
    info->read_function=_my_b_async_read;
294
275
  }
295
 
  inited= aio_result.pending= 0;
 
276
  info->inited=info->aio_result.pending=0;
296
277
#endif
297
 
  return 0;
 
278
  return(0);
298
279
}                                               /* init_io_cache */
299
280
 
300
281
        /* Wait until current request is ready */
319
300
        break;
320
301
    }
321
302
  }
 
303
  return;
322
304
}
323
305
#endif
324
306
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
307
 
 
308
/*
 
309
  Use this to reset cache to re-start reading or to change the type
 
310
  between READ_CACHE <-> WRITE_CACHE
 
311
  If we are doing a reinit of a cache where we have the start of the file
 
312
  in the cache, we are reusing this memory without flushing it to disk.
 
313
*/
 
314
 
 
315
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
316
                        my_off_t seek_offset,
 
317
                        bool use_async_io __attribute__((unused)),
 
318
                        bool clear_cache)
339
319
{
340
320
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
321
  assert(type != READ_NET && info->type != READ_NET &&
 
322
              type != WRITE_NET && info->type != WRITE_NET &&
 
323
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
324
 
344
325
  /* If the whole file is in memory, avoid flushing to disk */
345
326
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
327
      seek_offset >= info->pos_in_file &&
 
328
      seek_offset <= my_b_tell(info))
348
329
  {
349
330
    /* Reuse current buffer without flushing it to disk */
350
331
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
332
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
333
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
334
      info->read_end=info->write_pos;
 
335
      info->end_of_file=my_b_tell(info);
355
336
      /*
356
337
        Trigger a new seek only if we have a valid
357
338
        file handle.
358
339
      */
359
 
      seek_not_done= (file != -1);
 
340
      info->seek_not_done= (info->file != -1);
360
341
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
342
    else if (type == WRITE_CACHE)
362
343
    {
363
 
      if (type == READ_CACHE)
 
344
      if (info->type == READ_CACHE)
364
345
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
346
        info->write_end=info->write_buffer+info->buffer_length;
 
347
        info->seek_not_done=1;
367
348
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
349
      info->end_of_file = ~(my_off_t) 0;
369
350
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
351
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
352
    if (type == WRITE_CACHE)
 
353
      info->write_pos=pos;
373
354
    else
374
 
      read_pos= pos;
 
355
      info->read_pos= pos;
375
356
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
357
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
358
#endif
378
359
  }
379
360
  else
382
363
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
364
      after the current positions should be ignored
384
365
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
366
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
367
      info->end_of_file=my_b_tell(info);
387
368
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
369
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
370
      return(1);
 
371
    info->pos_in_file=seek_offset;
391
372
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
373
    info->seek_not_done=1;
 
374
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
375
    if (type == READ_CACHE)
395
376
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
377
      info->read_end=info->buffer;              /* Nothing in cache */
397
378
    }
398
379
    else
399
380
    {
400
 
      write_end=(buffer + buffer_length -
 
381
      info->write_end=(info->buffer + info->buffer_length -
401
382
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
383
      info->end_of_file= ~(my_off_t) 0;
403
384
    }
404
385
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
386
  info->type=type;
 
387
  info->error=0;
 
388
  init_functions(info);
408
389
 
409
390
#ifdef HAVE_AIOWAIT
410
391
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
392
      ((uint32_t) info->buffer_length <
 
393
       (uint32_t) (info->end_of_file - seek_offset)))
413
394
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
395
    info->read_length=info->buffer_length/2;
 
396
    info->read_function=_my_b_async_read;
416
397
  }
417
 
  inited= 0;
418
 
#else
419
 
  (void)use_async_io;
 
398
  info->inited=0;
420
399
#endif
421
 
  return 0;
 
400
  return(0);
422
401
} /* reinit_io_cache */
423
402
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
403
 
 
404
 
 
405
/*
 
406
  Read buffered.
 
407
 
 
408
  SYNOPSIS
 
409
    _my_b_read()
 
410
      info                      IO_CACHE pointer
 
411
      Buffer                    Buffer to retrieve count bytes from file
 
412
      Count                     Number of bytes to read into Buffer
 
413
 
 
414
  NOTE
 
415
    This function is only called from the my_b_read() macro when there
 
416
    isn't enough characters in the buffer to satisfy the request.
 
417
 
 
418
  WARNING
 
419
 
 
420
    When changing this function, be careful with handling file offsets
 
421
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
422
    types than my_off_t unless you can be sure that their value fits.
 
423
    Same applies to differences of file offsets.
 
424
 
 
425
    When changing this function, check _my_b_read_r(). It might need the
 
426
    same change.
 
427
 
 
428
  RETURN
 
429
    0      we succeeded in reading all data
 
430
    1      Error: can't read requested characters
 
431
*/
 
432
 
 
433
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
434
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
435
  size_t length,diff_length,left_length, max_length;
 
436
  my_off_t pos_in_file;
448
437
 
449
438
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
439
  {
455
444
  }
456
445
 
457
446
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
447
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
448
 
460
 
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
449
  /* 
 
450
    Whenever a function which operates on IO_CACHE flushes/writes
 
451
    some part of the IO_CACHE to disk it will set the property
463
452
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
453
    on the IO_CACHE.
465
454
  */
466
455
  if (info->seek_not_done)
467
456
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
457
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
458
        != MY_FILEPOS_ERROR))
469
459
    {
470
460
      /* No error, reset seek_not_done flag. */
471
461
      info->seek_not_done= 0;
477
467
        info->file is a pipe or socket or FIFO.  We never should have tried
478
468
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
469
      */
480
 
      assert(errno != ESPIPE);
 
470
      assert(my_errno != ESPIPE);
481
471
      info->error= -1;
482
472
      return(1);
483
473
    }
484
474
  }
485
475
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
476
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
477
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
478
  {                                     /* Fill first intern buffer */
489
479
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
480
    if (info->end_of_file <= pos_in_file)
491
481
    {                                   /* End of file */
492
482
      info->error= (int) left_length;
493
483
      return(1);
494
484
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
485
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
486
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
487
        != length)
497
488
    {
498
489
      info->error= (read_length == (size_t) -1 ? -1 :
499
490
                    (int) (read_length+left_length));
500
491
      return(1);
501
492
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
493
    Count-=length;
 
494
    Buffer+=length;
 
495
    pos_in_file+=length;
 
496
    left_length+=length;
506
497
    diff_length=0;
507
498
  }
508
499
 
509
500
  max_length= info->read_length-diff_length;
510
501
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
502
      max_length > (info->end_of_file - pos_in_file))
 
503
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
504
  if (!max_length)
514
505
  {
515
506
    if (Count)
516
507
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
508
      info->error= left_length;         /* We only got this many char */
518
509
      return(1);
519
510
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
511
    length=0;                           /* Didn't read any chars */
521
512
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
513
  else if ((length= my_read(info->file,info->buffer, max_length,
523
514
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
515
           length == (size_t) -1)
525
516
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
517
    if (length != (size_t) -1)
 
518
      memcpy(Buffer, info->buffer, length);
 
519
    info->pos_in_file= pos_in_file;
 
520
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
521
    info->read_pos=info->read_end=info->buffer;
531
522
    return(1);
532
523
  }
533
524
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
525
  info->read_end=info->buffer+length;
 
526
  info->pos_in_file=pos_in_file;
536
527
  memcpy(Buffer, info->buffer, Count);
537
528
  return(0);
538
529
}
539
530
 
540
531
 
 
532
/*
 
533
  Prepare IO_CACHE for shared use.
 
534
 
 
535
  SYNOPSIS
 
536
    init_io_cache_share()
 
537
      read_cache                A read cache. This will be copied for
 
538
                                every thread after setup.
 
539
      cshare                    The share.
 
540
      write_cache               If non-NULL a write cache that is to be
 
541
                                synchronized with the read caches.
 
542
      num_threads               Number of threads sharing the cache
 
543
                                including the write thread if any.
 
544
 
 
545
  DESCRIPTION
 
546
 
 
547
    The shared cache is used so: One IO_CACHE is initialized with
 
548
    init_io_cache(). This includes the allocation of a buffer. Then a
 
549
    share is allocated and init_io_cache_share() is called with the io
 
550
    cache and the share. Then the io cache is copied for each thread. So
 
551
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
552
    is shared because cache->buffer is the same for all caches.
 
553
 
 
554
    One thread reads data from the file into the buffer. All threads
 
555
    read from the buffer, but every thread maintains its own set of
 
556
    pointers into the buffer. When all threads have used up the buffer
 
557
    contents, one of the threads reads the next block of data into the
 
558
    buffer. To accomplish this, each thread enters the cache lock before
 
559
    accessing the buffer. They wait in lock_io_cache() until all threads
 
560
    joined the lock. The last thread entering the lock is in charge of
 
561
    reading from file to buffer. It wakes all threads when done.
 
562
 
 
563
    Synchronizing a write cache to the read caches works so: Whenever
 
564
    the write buffer needs a flush, the write thread enters the lock and
 
565
    waits for all other threads to enter the lock too. They do this when
 
566
    they have used up the read buffer. When all threads are in the lock,
 
567
    the write thread copies the write buffer to the read buffer and
 
568
    wakes all threads.
 
569
 
 
570
    share->running_threads is the number of threads not being in the
 
571
    cache lock. When entering lock_io_cache() the number is decreased.
 
572
    When the thread that fills the buffer enters unlock_io_cache() the
 
573
    number is reset to the number of threads. The condition
 
574
    running_threads == 0 means that all threads are in the lock. Bumping
 
575
    up the number to the full count is non-intuitive. But increasing the
 
576
    number by one for each thread that leaves the lock could lead to a
 
577
    solo run of one thread. The last thread to join a lock reads from
 
578
    file to buffer, wakes the other threads, processes the data in the
 
579
    cache and enters the lock again. If no other thread left the lock
 
580
    meanwhile, it would think it's the last one again and read the next
 
581
    block...
 
582
 
 
583
    The share has copies of 'error', 'buffer', 'read_end', and
 
584
    'pos_in_file' from the thread that filled the buffer. We may not be
 
585
    able to access this information directly from its cache because the
 
586
    thread may be removed from the share before the variables could be
 
587
    copied by all other threads. Or, if a write buffer is synchronized,
 
588
    it would change its 'pos_in_file' after waking the other threads,
 
589
    possibly before they could copy its value.
 
590
 
 
591
    However, the 'buffer' variable in the share is for a synchronized
 
592
    write cache. It needs to know where to put the data. Otherwise it
 
593
    would need access to the read cache of one of the threads that is
 
594
    not yet removed from the share.
 
595
 
 
596
  RETURN
 
597
    void
 
598
*/
 
599
 
 
600
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
601
                         IO_CACHE *write_cache, uint32_t num_threads)
 
602
{
 
603
  assert(num_threads > 1);
 
604
  assert(read_cache->type == READ_CACHE);
 
605
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
606
 
 
607
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
608
  pthread_cond_init(&cshare->cond, 0);
 
609
  pthread_cond_init(&cshare->cond_writer, 0);
 
610
 
 
611
  cshare->running_threads= num_threads;
 
612
  cshare->total_threads=   num_threads;
 
613
  cshare->error=           0;    /* Initialize. */
 
614
  cshare->buffer=          read_cache->buffer;
 
615
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
616
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
617
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
618
 
 
619
  read_cache->share=         cshare;
 
620
  read_cache->read_function= _my_b_read_r;
 
621
  read_cache->current_pos=   NULL;
 
622
  read_cache->current_end=   NULL;
 
623
 
 
624
  if (write_cache)
 
625
    write_cache->share= cshare;
 
626
 
 
627
  return;
 
628
}
 
629
 
 
630
 
 
631
/*
 
632
  Remove a thread from shared access to IO_CACHE.
 
633
 
 
634
  SYNOPSIS
 
635
    remove_io_thread()
 
636
      cache                     The IO_CACHE to be removed from the share.
 
637
 
 
638
  NOTE
 
639
 
 
640
    Every thread must do that on exit for not to deadlock other threads.
 
641
 
 
642
    The last thread destroys the pthread resources.
 
643
 
 
644
    A writer flushes its cache first.
 
645
 
 
646
  RETURN
 
647
    void
 
648
*/
 
649
 
 
650
void remove_io_thread(IO_CACHE *cache)
 
651
{
 
652
  IO_CACHE_SHARE *cshare= cache->share;
 
653
  uint32_t total;
 
654
 
 
655
  /* If the writer goes, it needs to flush the write cache. */
 
656
  if (cache == cshare->source_cache)
 
657
    flush_io_cache(cache);
 
658
 
 
659
  pthread_mutex_lock(&cshare->mutex);
 
660
 
 
661
  /* Remove from share. */
 
662
  total= --cshare->total_threads;
 
663
 
 
664
  /* Detach from share. */
 
665
  cache->share= NULL;
 
666
 
 
667
  /* If the writer goes, let the readers know. */
 
668
  if (cache == cshare->source_cache)
 
669
  {
 
670
    cshare->source_cache= NULL;
 
671
  }
 
672
 
 
673
  /* If all threads are waiting for me to join the lock, wake them. */
 
674
  if (!--cshare->running_threads)
 
675
  {
 
676
    pthread_cond_signal(&cshare->cond_writer);
 
677
    pthread_cond_broadcast(&cshare->cond);
 
678
  }
 
679
 
 
680
  pthread_mutex_unlock(&cshare->mutex);
 
681
 
 
682
  if (!total)
 
683
  {
 
684
    pthread_cond_destroy (&cshare->cond_writer);
 
685
    pthread_cond_destroy (&cshare->cond);
 
686
    pthread_mutex_destroy(&cshare->mutex);
 
687
  }
 
688
 
 
689
  return;
 
690
}
 
691
 
 
692
 
 
693
/*
 
694
  Lock IO cache and wait for all other threads to join.
 
695
 
 
696
  SYNOPSIS
 
697
    lock_io_cache()
 
698
      cache                     The cache of the thread entering the lock.
 
699
      pos                       File position of the block to read.
 
700
                                Unused for the write thread.
 
701
 
 
702
  DESCRIPTION
 
703
 
 
704
    Wait for all threads to finish with the current buffer. We want
 
705
    all threads to proceed in concert. The last thread to join
 
706
    lock_io_cache() will read the block from file and all threads start
 
707
    to use it. Then they will join again for reading the next block.
 
708
 
 
709
    The waiting threads detect a fresh buffer by comparing
 
710
    cshare->pos_in_file with the position they want to process next.
 
711
    Since the first block may start at position 0, we take
 
712
    cshare->read_end as an additional condition. This variable is
 
713
    initialized to NULL and will be set after a block of data is written
 
714
    to the buffer.
 
715
 
 
716
  RETURN
 
717
    1           OK, lock in place, go ahead and read.
 
718
    0           OK, unlocked, another thread did the read.
 
719
*/
 
720
 
 
721
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
722
{
 
723
  IO_CACHE_SHARE *cshare= cache->share;
 
724
 
 
725
  /* Enter the lock. */
 
726
  pthread_mutex_lock(&cshare->mutex);
 
727
  cshare->running_threads--;
 
728
 
 
729
  if (cshare->source_cache)
 
730
  {
 
731
    /* A write cache is synchronized to the read caches. */
 
732
 
 
733
    if (cache == cshare->source_cache)
 
734
    {
 
735
      /* The writer waits until all readers are here. */
 
736
      while (cshare->running_threads)
 
737
      {
 
738
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
739
      }
 
740
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
741
      return(1);
 
742
    }
 
743
 
 
744
    /* The last thread wakes the writer. */
 
745
    if (!cshare->running_threads)
 
746
    {
 
747
      pthread_cond_signal(&cshare->cond_writer);
 
748
    }
 
749
 
 
750
    /*
 
751
      Readers wait until the data is copied from the writer. Another
 
752
      reason to stop waiting is the removal of the write thread. If this
 
753
      happens, we leave the lock with old data in the buffer.
 
754
    */
 
755
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
756
           cshare->source_cache)
 
757
    {
 
758
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
759
    }
 
760
 
 
761
    /*
 
762
      If the writer was removed from the share while this thread was
 
763
      asleep, we need to simulate an EOF condition. The writer cannot
 
764
      reset the share variables as they might still be in use by readers
 
765
      of the last block. When we awake here then because the last
 
766
      joining thread signalled us. If the writer is not the last, it
 
767
      will not signal. So it is safe to clear the buffer here.
 
768
    */
 
769
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
770
    {
 
771
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
772
      cshare->error= 0; /* EOF is not an error. */
 
773
    }
 
774
  }
 
775
  else
 
776
  {
 
777
    /*
 
778
      There are read caches only. The last thread arriving in
 
779
      lock_io_cache() continues with a locked cache and reads the block.
 
780
    */
 
781
    if (!cshare->running_threads)
 
782
    {
 
783
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
784
      return(1);
 
785
    }
 
786
 
 
787
    /*
 
788
      All other threads wait until the requested block is read by the
 
789
      last thread arriving. Another reason to stop waiting is the
 
790
      removal of a thread. If this leads to all threads being in the
 
791
      lock, we have to continue also. The first of the awaken threads
 
792
      will then do the read.
 
793
    */
 
794
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
795
           cshare->running_threads)
 
796
    {
 
797
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
798
    }
 
799
 
 
800
    /* If the block is not yet read, continue with a locked cache and read. */
 
801
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
802
    {
 
803
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
804
      return(1);
 
805
    }
 
806
 
 
807
    /* Another thread did read the block already. */
 
808
  }
 
809
 
 
810
  /*
 
811
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
812
    filled the buffer did this and marked all threads as running.
 
813
  */
 
814
  pthread_mutex_unlock(&cshare->mutex);
 
815
  return(0);
 
816
}
 
817
 
 
818
 
 
819
/*
 
820
  Unlock IO cache.
 
821
 
 
822
  SYNOPSIS
 
823
    unlock_io_cache()
 
824
      cache                     The cache of the thread leaving the lock.
 
825
 
 
826
  NOTE
 
827
    This is called by the thread that filled the buffer. It marks all
 
828
    threads as running and awakes them. This must not be done by any
 
829
    other thread.
 
830
 
 
831
    Do not signal cond_writer. Either there is no writer or the writer
 
832
    is the only one who can call this function.
 
833
 
 
834
    The reason for resetting running_threads to total_threads before
 
835
    waking all other threads is that it could be possible that this
 
836
    thread is so fast with processing the buffer that it enters the lock
 
837
    before even one other thread has left it. If every awoken thread
 
838
    would increase running_threads by one, this thread could think that
 
839
    he is again the last to join and would not wait for the other
 
840
    threads to process the data.
 
841
 
 
842
  RETURN
 
843
    void
 
844
*/
 
845
 
 
846
static void unlock_io_cache(IO_CACHE *cache)
 
847
{
 
848
  IO_CACHE_SHARE *cshare= cache->share;
 
849
 
 
850
  cshare->running_threads= cshare->total_threads;
 
851
  pthread_cond_broadcast(&cshare->cond);
 
852
  pthread_mutex_unlock(&cshare->mutex);
 
853
  return;
 
854
}
 
855
 
 
856
 
 
857
/*
 
858
  Read from IO_CACHE when it is shared between several threads.
 
859
 
 
860
  SYNOPSIS
 
861
    _my_b_read_r()
 
862
      cache                     IO_CACHE pointer
 
863
      Buffer                    Buffer to retrieve count bytes from file
 
864
      Count                     Number of bytes to read into Buffer
 
865
 
 
866
  NOTE
 
867
    This function is only called from the my_b_read() macro when there
 
868
    isn't enough characters in the buffer to satisfy the request.
 
869
 
 
870
  IMPLEMENTATION
 
871
 
 
872
    It works as follows: when a thread tries to read from a file (that
 
873
    is, after using all the data from the (shared) buffer), it just
 
874
    hangs on lock_io_cache(), waiting for other threads. When the very
 
875
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
876
    does actual IO and unlock_io_cache(), which signals all the waiting
 
877
    threads that data is in the buffer.
 
878
 
 
879
  WARNING
 
880
 
 
881
    When changing this function, be careful with handling file offsets
 
882
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
883
    types than my_off_t unless you can be sure that their value fits.
 
884
    Same applies to differences of file offsets. (Bug #11527)
 
885
 
 
886
    When changing this function, check _my_b_read(). It might need the
 
887
    same change.
 
888
 
 
889
  RETURN
 
890
    0      we succeeded in reading all data
 
891
    1      Error: can't read requested characters
 
892
*/
 
893
 
 
894
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
895
{
 
896
  my_off_t pos_in_file;
 
897
  size_t length, diff_length, left_length;
 
898
  IO_CACHE_SHARE *cshare= cache->share;
 
899
 
 
900
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
901
  {
 
902
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
903
    memcpy(Buffer, cache->read_pos, left_length);
 
904
    Buffer+= left_length;
 
905
    Count-= left_length;
 
906
  }
 
907
  while (Count)
 
908
  {
 
909
    size_t cnt, len;
 
910
 
 
911
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
912
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
913
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
914
    length= ((length <= cache->read_length) ?
 
915
             length + IO_ROUND_DN(cache->read_length - length) :
 
916
             length - IO_ROUND_UP(length - cache->read_length));
 
917
    if (cache->type != READ_FIFO &&
 
918
        (length > (cache->end_of_file - pos_in_file)))
 
919
      length= (size_t) (cache->end_of_file - pos_in_file);
 
920
    if (length == 0)
 
921
    {
 
922
      cache->error= (int) left_length;
 
923
      return(1);
 
924
    }
 
925
    if (lock_io_cache(cache, pos_in_file))
 
926
    {
 
927
      /* With a synchronized write/read cache we won't come here... */
 
928
      assert(!cshare->source_cache);
 
929
      /*
 
930
        ... unless the writer has gone before this thread entered the
 
931
        lock. Simulate EOF in this case. It can be distinguished by
 
932
        cache->file.
 
933
      */
 
934
      if (cache->file < 0)
 
935
        len= 0;
 
936
      else
 
937
      {
 
938
        /*
 
939
          Whenever a function which operates on IO_CACHE flushes/writes
 
940
          some part of the IO_CACHE to disk it will set the property
 
941
          "seek_not_done" to indicate this to other functions operating
 
942
          on the IO_CACHE.
 
943
        */
 
944
        if (cache->seek_not_done)
 
945
        {
 
946
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
947
              == MY_FILEPOS_ERROR)
 
948
          {
 
949
            cache->error= -1;
 
950
            unlock_io_cache(cache);
 
951
            return(1);
 
952
          }
 
953
        }
 
954
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
955
      }
 
956
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
957
      cache->error=       (len == length ? 0 : (int) len);
 
958
      cache->pos_in_file= pos_in_file;
 
959
 
 
960
      /* Copy important values to the share. */
 
961
      cshare->error=       cache->error;
 
962
      cshare->read_end=    cache->read_end;
 
963
      cshare->pos_in_file= pos_in_file;
 
964
 
 
965
      /* Mark all threads as running and wake them. */
 
966
      unlock_io_cache(cache);
 
967
    }
 
968
    else
 
969
    {
 
970
      /*
 
971
        With a synchronized write/read cache readers always come here.
 
972
        Copy important values from the share.
 
973
      */
 
974
      cache->error=       cshare->error;
 
975
      cache->read_end=    cshare->read_end;
 
976
      cache->pos_in_file= cshare->pos_in_file;
 
977
 
 
978
      len= ((cache->error == -1) ? (size_t) -1 :
 
979
            (size_t) (cache->read_end - cache->buffer));
 
980
    }
 
981
    cache->read_pos=      cache->buffer;
 
982
    cache->seek_not_done= 0;
 
983
    if (len == 0 || len == (size_t) -1)
 
984
    {
 
985
      cache->error= (int) left_length;
 
986
      return(1);
 
987
    }
 
988
    cnt= (len > Count) ? Count : len;
 
989
    memcpy(Buffer, cache->read_pos, cnt);
 
990
    Count -= cnt;
 
991
    Buffer+= cnt;
 
992
    left_length+= cnt;
 
993
    cache->read_pos+= cnt;
 
994
  }
 
995
  return(0);
 
996
}
 
997
 
 
998
 
 
999
/*
 
1000
  Copy data from write cache to read cache.
 
1001
 
 
1002
  SYNOPSIS
 
1003
    copy_to_read_buffer()
 
1004
      write_cache               The write cache.
 
1005
      write_buffer              The source of data, mostly the cache buffer.
 
1006
      write_length              The number of bytes to copy.
 
1007
 
 
1008
  NOTE
 
1009
    The write thread will wait for all read threads to join the cache
 
1010
    lock. Then it copies the data over and wakes the read threads.
 
1011
 
 
1012
  RETURN
 
1013
    void
 
1014
*/
 
1015
 
 
1016
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1017
                                const unsigned char *write_buffer, size_t write_length)
 
1018
{
 
1019
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1020
 
 
1021
  assert(cshare->source_cache == write_cache);
 
1022
  /*
 
1023
    write_length is usually less or equal to buffer_length.
 
1024
    It can be bigger if _my_b_write() is called with a big length.
 
1025
  */
 
1026
  while (write_length)
 
1027
  {
 
1028
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1029
    int  __attribute__((unused)) rc;
 
1030
 
 
1031
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1032
    /* The writing thread does always have the lock when it awakes. */
 
1033
    assert(rc);
 
1034
 
 
1035
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1036
 
 
1037
    cshare->error=       0;
 
1038
    cshare->read_end=    cshare->buffer + copy_length;
 
1039
    cshare->pos_in_file= write_cache->pos_in_file;
 
1040
 
 
1041
    /* Mark all threads as running and wake them. */
 
1042
    unlock_io_cache(write_cache);
 
1043
 
 
1044
    write_buffer+= copy_length;
 
1045
    write_length-= copy_length;
 
1046
  }
 
1047
}
 
1048
 
 
1049
 
 
1050
/*
 
1051
  Do sequential read from the SEQ_READ_APPEND cache.
 
1052
  
 
1053
  We do this in three stages:
 
1054
   - first read from info->buffer
 
1055
   - then if there are still data to read, try the file descriptor
 
1056
   - afterwards, if there are still data to read, try append buffer
 
1057
 
 
1058
  RETURNS
 
1059
    0  Success
 
1060
    1  Failed to read
 
1061
*/
 
1062
 
 
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1064
{
 
1065
  size_t length, diff_length, left_length, save_count, max_length;
 
1066
  my_off_t pos_in_file;
 
1067
  save_count=Count;
 
1068
 
 
1069
  /* first, read the regular buffer */
 
1070
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1071
  {
 
1072
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1073
    memcpy(Buffer,info->read_pos, left_length);
 
1074
    Buffer+=left_length;
 
1075
    Count-=left_length;
 
1076
  }
 
1077
  lock_append_buffer(info);
 
1078
 
 
1079
  /* pos_in_file always point on where info->buffer was read */
 
1080
  if ((pos_in_file=info->pos_in_file +
 
1081
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1082
    goto read_append_buffer;
 
1083
 
 
1084
  /*
 
1085
    With read-append cache we must always do a seek before we read,
 
1086
    because the write could have moved the file pointer astray
 
1087
  */
 
1088
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1089
  {
 
1090
   info->error= -1;
 
1091
   unlock_append_buffer(info);
 
1092
   return (1);
 
1093
  }
 
1094
  info->seek_not_done=0;
 
1095
 
 
1096
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1097
 
 
1098
  /* now the second stage begins - read from file descriptor */
 
1099
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1100
  {
 
1101
    /* Fill first intern buffer */
 
1102
    size_t read_length;
 
1103
 
 
1104
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1105
    if ((read_length= my_read(info->file,Buffer, length,
 
1106
                              info->myflags)) == (size_t) -1)
 
1107
    {
 
1108
      info->error= -1;
 
1109
      unlock_append_buffer(info);
 
1110
      return 1;
 
1111
    }
 
1112
    Count-=read_length;
 
1113
    Buffer+=read_length;
 
1114
    pos_in_file+=read_length;
 
1115
 
 
1116
    if (read_length != length)
 
1117
    {
 
1118
      /*
 
1119
        We only got part of data;  Read the rest of the data from the
 
1120
        write buffer
 
1121
      */
 
1122
      goto read_append_buffer;
 
1123
    }
 
1124
    left_length+=length;
 
1125
    diff_length=0;
 
1126
  }
 
1127
 
 
1128
  max_length= info->read_length-diff_length;
 
1129
  if (max_length > (info->end_of_file - pos_in_file))
 
1130
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1131
  if (!max_length)
 
1132
  {
 
1133
    if (Count)
 
1134
      goto read_append_buffer;
 
1135
    length=0;                           /* Didn't read any more chars */
 
1136
  }
 
1137
  else
 
1138
  {
 
1139
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1140
    if (length == (size_t) -1)
 
1141
    {
 
1142
      info->error= -1;
 
1143
      unlock_append_buffer(info);
 
1144
      return 1;
 
1145
    }
 
1146
    if (length < Count)
 
1147
    {
 
1148
      memcpy(Buffer, info->buffer, length);
 
1149
      Count -= length;
 
1150
      Buffer += length;
 
1151
 
 
1152
      /*
 
1153
         added the line below to make
 
1154
         assert(pos_in_file==info->end_of_file) pass.
 
1155
         otherwise this does not appear to be needed
 
1156
      */
 
1157
      pos_in_file += length;
 
1158
      goto read_append_buffer;
 
1159
    }
 
1160
  }
 
1161
  unlock_append_buffer(info);
 
1162
  info->read_pos=info->buffer+Count;
 
1163
  info->read_end=info->buffer+length;
 
1164
  info->pos_in_file=pos_in_file;
 
1165
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1166
  return 0;
 
1167
 
 
1168
read_append_buffer:
 
1169
 
 
1170
  /*
 
1171
     Read data from the current write buffer.
 
1172
     Count should never be == 0 here (The code will work even if count is 0)
 
1173
  */
 
1174
 
 
1175
  {
 
1176
    /* First copy the data to Count */
 
1177
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1178
    size_t copy_len;
 
1179
    size_t transfer_len;
 
1180
 
 
1181
    assert(info->append_read_pos <= info->write_pos);
 
1182
    /*
 
1183
      TODO: figure out if the assert below is needed or correct.
 
1184
    */
 
1185
    assert(pos_in_file == info->end_of_file);
 
1186
    copy_len=cmin(Count, len_in_buff);
 
1187
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1188
    info->append_read_pos += copy_len;
 
1189
    Count -= copy_len;
 
1190
    if (Count)
 
1191
      info->error = save_count - Count;
 
1192
 
 
1193
    /* Fill read buffer with data from write buffer */
 
1194
    memcpy(info->buffer, info->append_read_pos,
 
1195
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1196
    info->read_pos= info->buffer;
 
1197
    info->read_end= info->buffer+transfer_len;
 
1198
    info->append_read_pos=info->write_pos;
 
1199
    info->pos_in_file=pos_in_file+copy_len;
 
1200
    info->end_of_file+=len_in_buff;
 
1201
  }
 
1202
  unlock_append_buffer(info);
 
1203
  return Count ? 1 : 0;
 
1204
}
 
1205
 
 
1206
 
541
1207
#ifdef HAVE_AIOWAIT
542
1208
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1209
/*
 
1210
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1211
  from disk when needed.
 
1212
 
 
1213
  SYNOPSIS
 
1214
    _my_b_async_read()
 
1215
      info                      IO_CACHE pointer
 
1216
      Buffer                    Buffer to retrieve count bytes from file
 
1217
      Count                     Number of bytes to read into Buffer
 
1218
 
 
1219
  RETURN VALUE
 
1220
    -1          An error has occurred; my_errno is set.
 
1221
     0          Success
 
1222
     1          An error has occurred; IO_CACHE to error state.
 
1223
*/
 
1224
 
 
1225
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1226
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1227
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1228
  size_t max_length;
559
1229
  my_off_t next_pos_in_file;
560
1230
  unsigned char *read_buffer;
575
1245
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1246
                 my_filename(info->file),
577
1247
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1248
      my_errno=info->aio_result.result.aio_errno;
579
1249
      info->error= -1;
580
1250
      return(1);
581
1251
    }
582
1252
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1253
        read_length == (size_t) -1)
584
1254
    {
585
 
      errno=0;                          /* For testing */
 
1255
      my_errno=0;                               /* For testing */
586
1256
      info->error= (read_length == (size_t) -1 ? -1 :
587
1257
                    (int) (read_length+left_length));
588
1258
      return(1);
615
1285
      }
616
1286
    }
617
1287
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1288
    length=cmin(Count,read_length);
 
1289
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1290
    Buffer+=length;
 
1291
    Count-=length;
 
1292
    left_length+=length;
623
1293
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1294
    info->read_pos+=length;
625
1295
  }
626
1296
  else
627
1297
    next_pos_in_file=(info->pos_in_file+ (size_t)
635
1305
      info->error=(int) (read_length+left_length);
636
1306
      return 1;
637
1307
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1308
    
 
1309
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1310
        == MY_FILEPOS_ERROR)
640
1311
    {
641
1312
      info->error= -1;
642
1313
      return (1);
648
1319
      if ((read_length=my_read(info->file,info->request_pos,
649
1320
                               read_length, info->myflags)) == (size_t) -1)
650
1321
        return info->error= -1;
651
 
      use_length=min(Count,read_length);
 
1322
      use_length=cmin(Count,read_length);
652
1323
      memcpy(Buffer,info->request_pos,(size_t) use_length);
653
1324
      info->read_pos=info->request_pos+Count;
654
1325
      info->read_end=info->request_pos+read_length;
659
1330
      {                                 /* Didn't find hole block */
660
1331
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1332
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1333
                   my_filename(info->file),my_errno);
663
1334
        info->error=(int) (read_length+left_length);
664
1335
        return 1;
665
1336
      }
692
1363
  {
693
1364
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
694
1365
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1366
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
696
1367
                &info->aio_result.result))
697
1368
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1369
      my_errno=errno;
699
1370
      if (info->request_pos != info->buffer)
700
1371
      {
701
 
        memmove(info->buffer, info->request_pos,
702
 
                (size_t) (info->read_end - info->read_pos));
 
1372
        memcpy(info->buffer, info->request_pos,
 
1373
               (size_t) (info->read_end - info->read_pos));
703
1374
        info->request_pos=info->buffer;
704
1375
        info->read_pos-=info->read_length;
705
1376
        info->read_end-=info->read_length;
715
1386
#endif
716
1387
 
717
1388
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1389
/* Read one byte when buffer is empty */
 
1390
 
 
1391
int _my_b_get(IO_CACHE *info)
723
1392
{
724
1393
  unsigned char buff;
725
1394
  IO_CACHE_CALLBACK pre_read,post_read;
732
1401
  return (int) (unsigned char) buff;
733
1402
}
734
1403
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1404
/* 
 
1405
   Write a byte buffer to IO_CACHE and flush to disk
 
1406
   if IO_CACHE is full.
 
1407
 
 
1408
   RETURN VALUE
 
1409
    1 On error on write
 
1410
    0 On success
 
1411
   -1 On error; my_errno contains error code.
 
1412
*/
 
1413
 
 
1414
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1415
{
745
 
  size_t rest_length,length_local;
 
1416
  size_t rest_length,length;
746
1417
 
747
1418
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1419
  {
749
 
    errno=EFBIG;
 
1420
    my_errno=errno=EFBIG;
750
1421
    return info->error = -1;
751
1422
  }
752
1423
 
760
1431
    return 1;
761
1432
  if (Count >= IO_SIZE)
762
1433
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1434
    length=Count & (size_t) ~(IO_SIZE-1);
764
1435
    if (info->seek_not_done)
765
1436
    {
766
1437
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1438
        Whenever a function which operates on IO_CACHE flushes/writes
 
1439
        some part of the IO_CACHE to disk it will set the property
769
1440
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1441
        on the IO_CACHE.
771
1442
      */
772
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1443
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
773
1444
      {
774
1445
        info->error= -1;
775
1446
        return (1);
776
1447
      }
777
1448
      info->seek_not_done=0;
778
1449
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1450
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1451
      return info->error= -1;
 
1452
 
 
1453
    /*
 
1454
      In case of a shared I/O cache with a writer we normally do direct
 
1455
      write cache to read cache copy. Simulate this here by direct
 
1456
      caller buffer to read cache copy. Do it after the write so that
 
1457
      the cache readers actions on the flushed part can go in parallel
 
1458
      with the write of the extra stuff. copy_to_read_buffer()
 
1459
      synchronizes writer and readers so that after this call the
 
1460
      readers can act on the extra stuff while the writer can go ahead
 
1461
      and prepare the next output. copy_to_read_buffer() relies on
 
1462
      info->pos_in_file.
 
1463
    */
 
1464
    if (info->share)
 
1465
      copy_to_read_buffer(info, Buffer, length);
 
1466
 
 
1467
    Count-=length;
 
1468
    Buffer+=length;
 
1469
    info->pos_in_file+=length;
 
1470
  }
 
1471
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1472
  info->write_pos+=Count;
 
1473
  return 0;
 
1474
}
 
1475
 
 
1476
 
 
1477
/*
 
1478
  Append a block to the write buffer.
 
1479
  This is done with the buffer locked to ensure that we don't read from
 
1480
  the write buffer before we are ready with it.
 
1481
*/
 
1482
 
 
1483
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1484
{
 
1485
  size_t rest_length,length;
 
1486
 
 
1487
  /*
 
1488
    Assert that we cannot come here with a shared cache. If we do one
 
1489
    day, we might need to add a call to copy_to_read_buffer().
 
1490
  */
 
1491
  assert(!info->share);
 
1492
 
 
1493
  lock_append_buffer(info);
 
1494
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1495
  if (Count <= rest_length)
 
1496
    goto end;
 
1497
  memcpy(info->write_pos, Buffer, rest_length);
 
1498
  Buffer+=rest_length;
 
1499
  Count-=rest_length;
 
1500
  info->write_pos+=rest_length;
 
1501
  if (my_b_flush_io_cache(info,0))
 
1502
  {
 
1503
    unlock_append_buffer(info);
 
1504
    return 1;
 
1505
  }
 
1506
  if (Count >= IO_SIZE)
 
1507
  {                                     /* Fill first intern buffer */
 
1508
    length=Count & (size_t) ~(IO_SIZE-1);
 
1509
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1510
    {
 
1511
      unlock_append_buffer(info);
 
1512
      return info->error= -1;
 
1513
    }
 
1514
    Count-=length;
 
1515
    Buffer+=length;
 
1516
    info->end_of_file+=length;
 
1517
  }
 
1518
 
 
1519
end:
 
1520
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1521
  info->write_pos+=Count;
 
1522
  unlock_append_buffer(info);
 
1523
  return 0;
 
1524
}
 
1525
 
 
1526
 
 
1527
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1528
{
 
1529
  /*
 
1530
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1531
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1532
    several layers of ternary operators that evaluated comma(,) operator
 
1533
    expressions inside - I do have a test case if somebody wants it
 
1534
  */
 
1535
  if (info->type == SEQ_READ_APPEND)
 
1536
    return my_b_append(info, Buffer, Count);
 
1537
  return my_b_write(info, Buffer, Count);
 
1538
}
 
1539
 
 
1540
 
 
1541
/*
 
1542
  Write a block to disk where part of the data may be inside the record
 
1543
  buffer.  As all write calls to the data goes through the cache,
 
1544
  we will never get a seek over the end of the buffer
 
1545
*/
 
1546
 
 
1547
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1548
                   my_off_t pos)
799
1549
{
800
 
  size_t length_local;
 
1550
  size_t length;
801
1551
  int error=0;
802
1552
 
 
1553
  /*
 
1554
    Assert that we cannot come here with a shared cache. If we do one
 
1555
    day, we might need to add a call to copy_to_read_buffer().
 
1556
  */
 
1557
  assert(!info->share);
 
1558
 
803
1559
  if (pos < info->pos_in_file)
804
1560
  {
805
1561
    /* Of no overlap, write everything without buffering */
806
1562
    if (pos + Count <= info->pos_in_file)
807
1563
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1564
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1565
    length= (uint) (info->pos_in_file - pos);
 
1566
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1567
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1568
    Buffer+=length;
 
1569
    pos+=  length;
 
1570
    Count-= length;
 
1571
#ifndef HAVE_PREAD
 
1572
    info->seek_not_done=1;
 
1573
#endif
815
1574
  }
816
1575
 
817
1576
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1577
  length= (size_t) (info->write_end - info->buffer);
 
1578
  if (pos < info->pos_in_file + length)
820
1579
  {
821
1580
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1581
    length-=offset;
 
1582
    if (length > Count)
 
1583
      length=Count;
 
1584
    memcpy(info->buffer+offset, Buffer, length);
 
1585
    Buffer+=length;
 
1586
    Count-= length;
 
1587
    /* Fix length of buffer if the new data was larger */
 
1588
    if (info->buffer+length > info->write_pos)
 
1589
      info->write_pos=info->buffer+length;
831
1590
    if (!Count)
832
1591
      return (error);
833
1592
  }
837
1596
  return error;
838
1597
}
839
1598
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1599
 
 
1600
        /* Flush write cache */
 
1601
 
 
1602
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1603
  lock_append_buffer(info);
 
1604
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1605
  unlock_append_buffer(info);
 
1606
 
 
1607
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1608
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1609
  size_t length;
 
1610
  bool append_cache;
 
1611
  my_off_t pos_in_file;
 
1612
 
 
1613
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1614
    need_append_buffer_lock=0;
849
1615
 
850
1616
  if (info->type == WRITE_CACHE || append_cache)
851
1617
  {
852
1618
    if (info->file == -1)
853
1619
    {
854
 
      if (info->real_open_cached_file())
 
1620
      if (real_open_cached_file(info))
855
1621
        return((info->error= -1));
856
1622
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1623
    LOCK_APPEND_BUFFER;
858
1624
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1625
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1626
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1627
      /*
 
1628
        In case of a shared I/O cache with a writer we do direct write
 
1629
        cache to read cache copy. Do it before the write here so that
 
1630
        the readers can work in parallel with the write.
 
1631
        copy_to_read_buffer() relies on info->pos_in_file.
 
1632
      */
 
1633
      if (info->share)
 
1634
        copy_to_read_buffer(info, info->write_buffer, length);
 
1635
 
 
1636
      pos_in_file=info->pos_in_file;
862
1637
      /*
863
1638
        If we have append cache, we always open the file with
864
1639
        O_APPEND which moves the pos to EOF automatically on every write
865
1640
      */
866
1641
      if (!append_cache && info->seek_not_done)
867
1642
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1643
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1644
            MY_FILEPOS_ERROR)
869
1645
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1646
          UNLOCK_APPEND_BUFFER;
871
1647
          return((info->error= -1));
872
1648
        }
873
1649
        if (!append_cache)
874
1650
          info->seek_not_done=0;
875
1651
      }
876
1652
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1653
        info->pos_in_file+=length;
878
1654
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1655
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1656
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1657
      if (my_write(info->file,info->write_buffer,length,
882
1658
                   info->myflags | MY_NABP))
883
1659
        info->error= -1;
884
1660
      else
885
1661
        info->error= 0;
886
1662
      if (!append_cache)
887
1663
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1664
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1665
      }
890
1666
      else
891
1667
      {
892
1668
        info->end_of_file+=(info->write_pos-info->append_read_pos);
893
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
894
 
        assert(info->end_of_file == tell_ret);
 
1669
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
895
1670
      }
896
1671
 
897
1672
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1673
      ++info->disk_writes;
 
1674
      UNLOCK_APPEND_BUFFER;
899
1675
      return(info->error);
900
1676
    }
901
1677
  }
906
1682
    info->inited=0;
907
1683
  }
908
1684
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1685
  UNLOCK_APPEND_BUFFER;
910
1686
  return(0);
911
1687
}
912
1688
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1689
/*
 
1690
  Free an IO_CACHE object
 
1691
 
 
1692
  SYNOPSOS
 
1693
    end_io_cache()
 
1694
    info                IO_CACHE Handle to free
 
1695
 
 
1696
  NOTES
 
1697
    It's currently safe to call this if one has called init_io_cache()
 
1698
    on the 'info' object, even if init_io_cache() failed.
 
1699
    This function is also safe to call twice with the same handle.
 
1700
 
 
1701
  RETURN
 
1702
   0  ok
 
1703
   #  Error
 
1704
*/
 
1705
 
 
1706
int end_io_cache(IO_CACHE *info)
928
1707
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1708
  int error=0;
 
1709
  IO_CACHE_CALLBACK pre_close;
 
1710
 
 
1711
  /*
 
1712
    Every thread must call remove_io_thread(). The last one destroys
 
1713
    the share elements.
 
1714
  */
 
1715
  assert(!info->share || !info->share->total_threads);
 
1716
 
 
1717
  if ((pre_close=info->pre_close))
 
1718
  {
 
1719
    (*pre_close)(info);
 
1720
    info->pre_close= 0;
 
1721
  }
 
1722
  if (info->alloced_buffer)
 
1723
  {
 
1724
    info->alloced_buffer=0;
 
1725
    if (info->file != -1)                       /* File doesn't exist */
 
1726
      error= my_b_flush_io_cache(info,1);
 
1727
    free((unsigned char*) info->buffer);
 
1728
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1729
  }
 
1730
  if (info->type == SEQ_READ_APPEND)
 
1731
  {
 
1732
    /* Destroy allocated mutex */
 
1733
    info->type= TYPE_NOT_SET;
 
1734
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1735
  }
 
1736
  return(error);
948
1737
} /* end_io_cache */
949
1738
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1739
 
 
1740
/**********************************************************************
 
1741
 Testing of MF_IOCACHE
 
1742
**********************************************************************/
 
1743
 
 
1744
#ifdef MAIN
 
1745
 
 
1746
#include <my_dir.h>
 
1747
 
 
1748
void die(const char* fmt, ...)
 
1749
{
 
1750
  va_list va_args;
 
1751
  va_start(va_args,fmt);
 
1752
  fprintf(stderr,"Error:");
 
1753
  vfprintf(stderr, fmt,va_args);
 
1754
  fprintf(stderr,", errno=%d\n", errno);
 
1755
  exit(1);
 
1756
}
 
1757
 
 
1758
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1759
{
 
1760
  int fd;
 
1761
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1762
    die("Could not open %s", fname);
 
1763
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1764
    die("failed in init_io_cache()");
 
1765
  return fd;
 
1766
}
 
1767
 
 
1768
void close_file(IO_CACHE* info)
 
1769
{
 
1770
  end_io_cache(info);
 
1771
  my_close(info->file, MYF(MY_WME));
 
1772
}
 
1773
 
 
1774
int main(int argc, char** argv)
 
1775
{
 
1776
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1777
  MY_STAT status;
 
1778
  const char* fname="/tmp/iocache.test";
 
1779
  int cache_size=16384;
 
1780
  char llstr_buf[22];
 
1781
  int max_block,total_bytes=0;
 
1782
  int i,num_loops=100,error=0;
 
1783
  char *p;
 
1784
  char* block, *block_end;
 
1785
  MY_INIT(argv[0]);
 
1786
  max_block = cache_size*3;
 
1787
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1788
    die("Not enough memory to allocate test block");
 
1789
  block_end = block + max_block;
 
1790
  for (p = block,i=0; p < block_end;i++)
 
1791
  {
 
1792
    *p++ = (char)i;
 
1793
  }
 
1794
  if (my_stat(fname,&status, MYF(0)) &&
 
1795
      my_delete(fname,MYF(MY_WME)))
 
1796
    {
 
1797
      die("Delete of %s failed, aborting", fname);
 
1798
    }
 
1799
  open_file(fname,&sra_cache, cache_size);
 
1800
  for (i = 0; i < num_loops; i++)
 
1801
  {
 
1802
    char buf[4];
 
1803
    int block_size = abs(rand() % max_block);
 
1804
    int4store(buf, block_size);
 
1805
    if (my_b_append(&sra_cache,buf,4) ||
 
1806
        my_b_append(&sra_cache, block, block_size))
 
1807
      die("write failed");
 
1808
    total_bytes += 4+block_size;
 
1809
  }
 
1810
  close_file(&sra_cache);
 
1811
  free(block);
 
1812
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1813
    die("%s failed to stat, but I had just closed it,\
 
1814
 wonder how that happened");
 
1815
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1816
         llstr(status.st_size,llstr_buf),
 
1817
         total_bytes);
 
1818
  my_delete(fname, MYF(MY_WME));
 
1819
  /* check correctness of tests */
 
1820
  if (total_bytes != status.st_size)
 
1821
  {
 
1822
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1823
supposedly written\n");
 
1824
    error=1;
 
1825
  }
 
1826
  exit(error);
 
1827
  return 0;
 
1828
}
 
1829
#endif