~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/internal/mf_iocache.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "mysys/mysys_priv.h"
51
 
#include <mystrings/m_string.h>
 
50
#include "config.h"
 
51
 
 
52
#include "drizzled/internal/my_sys.h"
 
53
#include "drizzled/internal/m_string.h"
 
54
#include "drizzled/drizzled.h"
52
55
#ifdef HAVE_AIOWAIT
53
 
#include "mysys/mysys_err.h"
54
 
#include <mysys/aio_result.h>
 
56
#include "drizzled/error.h"
 
57
#include "drizzled/internal/aio_result.h"
55
58
static void my_aiowait(my_aio_result *result);
56
59
#endif
57
 
#include <mysys/iocache.h>
 
60
#include "drizzled/internal/iocache.h"
58
61
#include <errno.h>
59
62
#include <drizzled/util/test.h>
60
63
#include <stdlib.h>
62
65
 
63
66
using namespace std;
64
67
 
65
 
#define lock_append_buffer(info) \
66
 
 pthread_mutex_lock(&(info)->append_buffer_lock)
67
 
#define unlock_append_buffer(info) \
68
 
 pthread_mutex_unlock(&(info)->append_buffer_lock)
69
 
 
70
 
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
71
 
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
72
 
 
73
 
/*
74
 
  Setup internal pointers inside IO_CACHE
75
 
 
76
 
  SYNOPSIS
77
 
    setup_io_cache()
78
 
    info                IO_CACHE handler
79
 
 
80
 
  NOTES
81
 
    This is called on automaticly on init or reinit of IO_CACHE
82
 
    It must be called externally if one moves or copies an IO_CACHE
83
 
    object.
84
 
*/
85
 
 
86
 
void setup_io_cache(IO_CACHE* info)
 
68
namespace drizzled
 
69
{
 
70
namespace internal
 
71
{
 
72
 
 
73
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
 
74
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
 
75
 
 
76
/**
 
77
 * @brief
 
78
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
 
79
 */
 
80
inline
 
81
static void lock_append_buffer(register st_io_cache *, int )
 
82
{
 
83
}
 
84
 
 
85
/**
 
86
 * @brief
 
87
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
 
88
 */
 
89
inline
 
90
static void unlock_append_buffer(register st_io_cache *, int )
 
91
{
 
92
}
 
93
 
 
94
/**
 
95
 * @brief
 
96
 *   Round up to the nearest IO_SIZE boundary 
 
97
 */
 
98
inline
 
99
static size_t io_round_up(size_t x)
 
100
{
 
101
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
 
102
}
 
103
 
 
104
/**
 
105
 * @brief
 
106
 *   Round down to the nearest IO_SIZE boundary   
 
107
 */
 
108
inline
 
109
static size_t io_round_dn(size_t x)
 
110
{
 
111
  return (x & ~(IO_SIZE-1));
 
112
}
 
113
 
 
114
 
 
115
/**
 
116
 * @brief 
 
117
 *   Setup internal pointers inside st_io_cache
 
118
 * 
 
119
 * @details
 
120
 *   This is called on automatically on init or reinit of st_io_cache
 
121
 *   It must be called externally if one moves or copies an st_io_cache object.
 
122
 * 
 
123
 * @param info Cache handler to setup
 
124
 */
 
125
void st_io_cache::setup_io_cache()
87
126
{
88
127
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
89
 
  if (info->type == WRITE_CACHE)
 
128
  if (type == WRITE_CACHE)
90
129
  {
91
 
    info->current_pos= &info->write_pos;
92
 
    info->current_end= &info->write_end;
 
130
    current_pos= &write_pos;
 
131
    current_end= &write_end;
93
132
  }
94
133
  else
95
134
  {
96
 
    info->current_pos= &info->read_pos;
97
 
    info->current_end= &info->read_end;
 
135
    current_pos= &read_pos;
 
136
    current_end= &read_end;
98
137
  }
99
138
}
100
139
 
101
140
 
102
 
static void
103
 
init_functions(IO_CACHE* info)
 
141
void st_io_cache::init_functions()
104
142
{
105
 
  enum cache_type type= info->type;
106
143
  switch (type) {
107
144
  case READ_NET:
108
145
    /*
113
150
      as myisamchk
114
151
    */
115
152
    break;
116
 
  case SEQ_READ_APPEND:
117
 
    info->read_function = _my_b_seq_read;
118
 
    info->write_function = 0;                   /* Force a core if used */
119
 
    break;
120
153
  default:
121
 
    info->read_function =
122
 
                          info->share ? _my_b_read_r :
123
 
                                        _my_b_read;
124
 
    info->write_function = _my_b_write;
 
154
    read_function = _my_b_read;
 
155
    write_function = _my_b_write;
125
156
  }
126
157
 
127
 
  setup_io_cache(info);
 
158
  setup_io_cache();
128
159
}
129
160
 
130
 
 
131
 
/*
132
 
  Initialize an IO_CACHE object
133
 
 
134
 
  SYNOPSOS
135
 
    init_io_cache()
136
 
    info                cache handler to initialize
137
 
    file                File that should be associated to to the handler
138
 
                        If == -1 then real_open_cached_file()
139
 
                        will be called when it's time to open file.
140
 
    cachesize           Size of buffer to allocate for read/write
141
 
                        If == 0 then use my_default_record_cache_size
142
 
    type                Type of cache
143
 
    seek_offset         Where cache should start reading/writing
144
 
    use_async_io        Set to 1 of we should use async_io (if avaiable)
145
 
    cache_myflags       Bitmap of differnt flags
146
 
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
147
 
                        MY_DONT_CHECK_FILESIZE
148
 
 
149
 
  RETURN
150
 
    0  ok
151
 
    #  error
152
 
*/
153
 
 
154
 
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
155
 
                  enum cache_type type, my_off_t seek_offset,
156
 
                  bool use_async_io, myf cache_myflags)
 
161
/**
 
162
 * @brief
 
163
 *   Initialize an st_io_cache object
 
164
 *
 
165
 * @param file File that should be associated with the handler
 
166
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
 
167
 * @param cachesize Size of buffer to allocate for read/write
 
168
 *                      If == 0 then use my_default_record_cache_size
 
169
 * @param type Type of cache
 
170
 * @param seek_offset Where cache should start reading/writing
 
171
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
 
172
 * @param cache_myflags Bitmap of different flags
 
173
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
 
174
 * 
 
175
 * @retval 0 success
 
176
 * @retval # error
 
177
 */
 
178
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
 
179
                               enum cache_type type_arg, my_off_t seek_offset,
 
180
                               bool use_async_io, myf cache_myflags)
157
181
{
158
182
  size_t min_cache;
159
183
  off_t pos;
160
 
  my_off_t end_of_file= ~(my_off_t) 0;
 
184
  my_off_t end_of_file_local= ~(my_off_t) 0;
161
185
 
162
 
  info->file= file;
163
 
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
164
 
  info->pos_in_file= seek_offset;
165
 
  info->pre_close = info->pre_read = info->post_read = 0;
166
 
  info->arg = 0;
167
 
  info->alloced_buffer = 0;
168
 
  info->buffer=0;
169
 
  info->seek_not_done= 0;
 
186
  file= file_arg;
 
187
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
 
188
  pos_in_file= seek_offset;
 
189
  pre_close = pre_read = post_read = 0;
 
190
  arg = 0;
 
191
  alloced_buffer = 0;
 
192
  buffer=0;
 
193
  seek_not_done= 0;
170
194
 
171
195
  if (file >= 0)
172
196
  {
173
197
    pos= lseek(file, 0, SEEK_CUR);
174
 
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
 
198
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
175
199
    {
176
200
      /*
177
201
         This kind of object doesn't support seek() or tell(). Don't set a
178
202
         flag that will make us again try to seek() later and fail.
179
203
      */
180
 
      info->seek_not_done= 0;
 
204
      seek_not_done= 0;
181
205
      /*
182
206
        Additionally, if we're supposed to start somewhere other than the
183
207
        the beginning of whatever this file is, then somebody made a bad
186
210
      assert(seek_offset == 0);
187
211
    }
188
212
    else
189
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
213
      seek_not_done= test(seek_offset != (my_off_t)pos);
190
214
  }
191
215
 
192
 
  info->share=0;
193
 
 
194
216
  if (!cachesize && !(cachesize= my_default_record_cache_size))
195
 
    return(1);                          /* No cache requested */
 
217
    return 1;                           /* No cache requested */
196
218
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
197
 
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
 
219
  if (type_arg == READ_CACHE)
198
220
  {                                             /* Assume file isn't growing */
199
221
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
200
222
    {
201
223
      /* Calculate end of file to avoid allocating oversized buffers */
202
 
      end_of_file=lseek(file,0L,SEEK_END);
 
224
      end_of_file_local=lseek(file,0L,SEEK_END);
203
225
      /* Need to reset seek_not_done now that we just did a seek. */
204
 
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
205
 
      if (end_of_file < seek_offset)
206
 
        end_of_file=seek_offset;
 
226
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
 
227
      if (end_of_file_local < seek_offset)
 
228
        end_of_file_local=seek_offset;
207
229
      /* Trim cache size if the file is very small */
208
 
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
 
230
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
209
231
      {
210
 
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
 
232
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
211
233
        use_async_io=0;                         /* No need to use async */
212
234
      }
213
235
    }
214
236
  }
215
237
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
216
 
  if (type != READ_NET && type != WRITE_NET)
 
238
  if (type_arg != READ_NET && type_arg != WRITE_NET)
217
239
  {
218
240
    /* Retry allocating memory in smaller blocks until we get one */
219
241
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
223
245
      if (cachesize < min_cache)
224
246
        cachesize = min_cache;
225
247
      buffer_block= cachesize;
226
 
      if (type == SEQ_READ_APPEND)
227
 
        buffer_block *= 2;
228
 
      if ((info->buffer=
 
248
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
 
249
      {
 
250
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
 
251
        return 2;
 
252
      }
 
253
 
 
254
      if ((buffer=
229
255
           (unsigned char*) malloc(buffer_block)) != 0)
230
256
      {
231
 
        info->write_buffer=info->buffer;
232
 
        if (type == SEQ_READ_APPEND)
233
 
          info->write_buffer = info->buffer + cachesize;
234
 
        info->alloced_buffer=1;
 
257
        write_buffer=buffer;
 
258
        alloced_buffer= true;
235
259
        break;                                  /* Enough memory found */
236
260
      }
237
261
      if (cachesize == min_cache)
238
 
        return(2);                              /* Can't alloc cache */
 
262
      {
 
263
        if (type_arg == READ_CACHE)
 
264
          global_read_buffer.sub(buffer_block);
 
265
        return 2;                               /* Can't alloc cache */
 
266
      }
239
267
      /* Try with less memory */
 
268
      if (type_arg == READ_CACHE)
 
269
        global_read_buffer.sub(buffer_block);
240
270
      cachesize= (cachesize*3/4 & ~(min_cache-1));
241
271
    }
242
272
  }
243
273
 
244
 
  info->read_length=info->buffer_length=cachesize;
245
 
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
246
 
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
247
 
  if (type == SEQ_READ_APPEND)
248
 
  {
249
 
    info->append_read_pos = info->write_pos = info->write_buffer;
250
 
    info->write_end = info->write_buffer + info->buffer_length;
251
 
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
252
 
  }
 
274
  read_length=buffer_length=cachesize;
 
275
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
276
  request_pos= read_pos= write_pos = buffer;
253
277
 
254
 
  if (type == WRITE_CACHE)
255
 
    info->write_end=
256
 
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
 
278
  if (type_arg == WRITE_CACHE)
 
279
    write_end=
 
280
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
257
281
  else
258
 
    info->read_end=info->buffer;                /* Nothing in cache */
 
282
    read_end=buffer;            /* Nothing in cache */
259
283
 
260
284
  /* End_of_file may be changed by user later */
261
 
  info->end_of_file= end_of_file;
262
 
  info->error=0;
263
 
  info->type= type;
264
 
  init_functions(info);
 
285
  end_of_file= end_of_file_local;
 
286
  error= 0;
 
287
  type= type_arg;
 
288
  init_functions();
265
289
#ifdef HAVE_AIOWAIT
266
290
  if (use_async_io && ! my_disable_async_io)
267
291
  {
268
 
    info->read_length/=2;
269
 
    info->read_function=_my_b_async_read;
 
292
    read_length/=2;
 
293
    read_function=_my_b_async_read;
270
294
  }
271
 
  info->inited=info->aio_result.pending=0;
 
295
  inited= aio_result.pending= 0;
272
296
#endif
273
 
  return(0);
 
297
  return 0;
274
298
}                                               /* init_io_cache */
275
299
 
276
300
        /* Wait until current request is ready */
295
319
        break;
296
320
    }
297
321
  }
298
 
  return;
299
322
}
300
323
#endif
301
324
 
302
 
 
303
 
/*
304
 
  Use this to reset cache to re-start reading or to change the type
305
 
  between READ_CACHE <-> WRITE_CACHE
306
 
  If we are doing a reinit of a cache where we have the start of the file
307
 
  in the cache, we are reusing this memory without flushing it to disk.
308
 
*/
309
 
 
310
 
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
311
 
                        my_off_t seek_offset,
312
 
                        bool use_async_io,
313
 
                        bool clear_cache)
 
325
/**
 
326
 * @brief 
 
327
 *   Reset the cache
 
328
 * 
 
329
 * @detail
 
330
 *   Use this to reset cache to re-start reading or to change the type 
 
331
 *   between READ_CACHE <-> WRITE_CACHE
 
332
 *   If we are doing a reinit of a cache where we have the start of the file
 
333
 *   in the cache, we are reusing this memory without flushing it to disk.
 
334
 */
 
335
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
 
336
                                  my_off_t seek_offset,
 
337
                                  bool use_async_io,
 
338
                                  bool clear_cache)
314
339
{
315
340
  /* One can't do reinit with the following types */
316
 
  assert(type != READ_NET && info->type != READ_NET &&
317
 
              type != WRITE_NET && info->type != WRITE_NET &&
318
 
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
 
341
  assert(type_arg != READ_NET && type != READ_NET &&
 
342
              type_arg != WRITE_NET && type != WRITE_NET);
319
343
 
320
344
  /* If the whole file is in memory, avoid flushing to disk */
321
345
  if (! clear_cache &&
322
 
      seek_offset >= info->pos_in_file &&
323
 
      seek_offset <= my_b_tell(info))
 
346
      seek_offset >= pos_in_file &&
 
347
      seek_offset <= my_b_tell(this))
324
348
  {
325
349
    /* Reuse current buffer without flushing it to disk */
326
350
    unsigned char *pos;
327
 
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
351
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
328
352
    {
329
 
      info->read_end=info->write_pos;
330
 
      info->end_of_file=my_b_tell(info);
 
353
      read_end=write_pos;
 
354
      end_of_file=my_b_tell(this);
331
355
      /*
332
356
        Trigger a new seek only if we have a valid
333
357
        file handle.
334
358
      */
335
 
      info->seek_not_done= (info->file != -1);
 
359
      seek_not_done= (file != -1);
336
360
    }
337
 
    else if (type == WRITE_CACHE)
 
361
    else if (type_arg == WRITE_CACHE)
338
362
    {
339
 
      if (info->type == READ_CACHE)
 
363
      if (type == READ_CACHE)
340
364
      {
341
 
        info->write_end=info->write_buffer+info->buffer_length;
342
 
        info->seek_not_done=1;
 
365
        write_end=write_buffer+buffer_length;
 
366
        seek_not_done=1;
343
367
      }
344
 
      info->end_of_file = ~(my_off_t) 0;
 
368
      end_of_file = ~(my_off_t) 0;
345
369
    }
346
 
    pos=info->request_pos+(seek_offset-info->pos_in_file);
347
 
    if (type == WRITE_CACHE)
348
 
      info->write_pos=pos;
 
370
    pos=request_pos+(seek_offset-pos_in_file);
 
371
    if (type_arg == WRITE_CACHE)
 
372
      write_pos=pos;
349
373
    else
350
 
      info->read_pos= pos;
 
374
      read_pos= pos;
351
375
#ifdef HAVE_AIOWAIT
352
 
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
 
376
    my_aiowait(&aio_result);            /* Wait for outstanding req */
353
377
#endif
354
378
  }
355
379
  else
358
382
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
359
383
      after the current positions should be ignored
360
384
    */
361
 
    if (info->type == WRITE_CACHE && type == READ_CACHE)
362
 
      info->end_of_file=my_b_tell(info);
 
385
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
386
      end_of_file=my_b_tell(this);
363
387
    /* flush cache if we want to reuse it */
364
 
    if (!clear_cache && my_b_flush_io_cache(info,1))
365
 
      return(1);
366
 
    info->pos_in_file=seek_offset;
 
388
    if (!clear_cache && my_b_flush_io_cache(this, 1))
 
389
      return 1;
 
390
    pos_in_file=seek_offset;
367
391
    /* Better to do always do a seek */
368
 
    info->seek_not_done=1;
369
 
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
370
 
    if (type == READ_CACHE)
 
392
    seek_not_done=1;
 
393
    request_pos=read_pos=write_pos=buffer;
 
394
    if (type_arg == READ_CACHE)
371
395
    {
372
 
      info->read_end=info->buffer;              /* Nothing in cache */
 
396
      read_end=buffer;          /* Nothing in cache */
373
397
    }
374
398
    else
375
399
    {
376
 
      info->write_end=(info->buffer + info->buffer_length -
 
400
      write_end=(buffer + buffer_length -
377
401
                       (seek_offset & (IO_SIZE-1)));
378
 
      info->end_of_file= ~(my_off_t) 0;
 
402
      end_of_file= ~(my_off_t) 0;
379
403
    }
380
404
  }
381
 
  info->type=type;
382
 
  info->error=0;
383
 
  init_functions(info);
 
405
  type= type_arg;
 
406
  error=0;
 
407
  init_functions();
384
408
 
385
409
#ifdef HAVE_AIOWAIT
386
410
  if (use_async_io && ! my_disable_async_io &&
387
 
      ((uint32_t) info->buffer_length <
388
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
411
      ((uint32_t) buffer_length <
 
412
       (uint32_t) (end_of_file - seek_offset)))
389
413
  {
390
 
    info->read_length=info->buffer_length/2;
391
 
    info->read_function=_my_b_async_read;
 
414
    read_length=buffer_length/2;
 
415
    read_function=_my_b_async_read;
392
416
  }
393
 
  info->inited=0;
 
417
  inited= 0;
394
418
#else
395
419
  (void)use_async_io;
396
420
#endif
397
 
  return(0);
 
421
  return 0;
398
422
} /* reinit_io_cache */
399
423
 
400
 
 
401
 
 
402
 
/*
403
 
  Read buffered.
404
 
 
405
 
  SYNOPSIS
406
 
    _my_b_read()
407
 
      info                      IO_CACHE pointer
408
 
      Buffer                    Buffer to retrieve count bytes from file
409
 
      Count                     Number of bytes to read into Buffer
410
 
 
411
 
  NOTE
412
 
    This function is only called from the my_b_read() macro when there
413
 
    isn't enough characters in the buffer to satisfy the request.
414
 
 
415
 
  WARNING
416
 
 
417
 
    When changing this function, be careful with handling file offsets
418
 
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
419
 
    types than my_off_t unless you can be sure that their value fits.
420
 
    Same applies to differences of file offsets.
421
 
 
422
 
    When changing this function, check _my_b_read_r(). It might need the
423
 
    same change.
424
 
 
425
 
  RETURN
426
 
    0      we succeeded in reading all data
427
 
    1      Error: can't read requested characters
428
 
*/
429
 
 
430
 
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
424
/**
 
425
 * @brief 
 
426
 *   Read buffered.
 
427
 * 
 
428
 * @detail
 
429
 *   This function is only called from the my_b_read() macro when there
 
430
 *   aren't enough characters in the buffer to satisfy the request.
 
431
 *
 
432
 * WARNING
 
433
 *   When changing this function, be careful with handling file offsets
 
434
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
435
 *   types than my_off_t unless you can be sure that their value fits.
 
436
 *   Same applies to differences of file offsets.
 
437
 *
 
438
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
 
439
 * from file @param Count Number of bytes to read into Buffer
 
440
 * 
 
441
 * @retval 0 We succeeded in reading all data
 
442
 * @retval 1 Error: can't read requested characters
 
443
 */
 
444
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
431
445
{
432
 
  size_t length,diff_length,left_length, max_length;
433
 
  my_off_t pos_in_file;
 
446
  size_t length_local,diff_length,left_length, max_length;
 
447
  my_off_t pos_in_file_local;
434
448
 
435
449
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
436
450
  {
441
455
  }
442
456
 
443
457
  /* pos_in_file always point on where info->buffer was read */
444
 
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
458
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
445
459
 
446
460
  /*
447
 
    Whenever a function which operates on IO_CACHE flushes/writes
448
 
    some part of the IO_CACHE to disk it will set the property
 
461
    Whenever a function which operates on st_io_cache flushes/writes
 
462
    some part of the st_io_cache to disk it will set the property
449
463
    "seek_not_done" to indicate this to other functions operating
450
 
    on the IO_CACHE.
 
464
    on the st_io_cache.
451
465
  */
452
466
  if (info->seek_not_done)
453
467
  {
454
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
468
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
455
469
    {
456
470
      /* No error, reset seek_not_done flag. */
457
471
      info->seek_not_done= 0;
463
477
        info->file is a pipe or socket or FIFO.  We never should have tried
464
478
        to seek on that.  See Bugs#25807 and #22828 for more info.
465
479
      */
466
 
      assert(my_errno != ESPIPE);
 
480
      assert(errno != ESPIPE);
467
481
      info->error= -1;
468
482
      return(1);
469
483
    }
470
484
  }
471
485
 
472
 
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
486
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
473
487
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
474
488
  {                                     /* Fill first intern buffer */
475
489
    size_t read_length;
476
 
    if (info->end_of_file <= pos_in_file)
 
490
    if (info->end_of_file <= pos_in_file_local)
477
491
    {                                   /* End of file */
478
492
      info->error= (int) left_length;
479
493
      return(1);
480
494
    }
481
 
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
482
 
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
483
 
        != length)
 
495
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
496
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
484
497
    {
485
498
      info->error= (read_length == (size_t) -1 ? -1 :
486
499
                    (int) (read_length+left_length));
487
500
      return(1);
488
501
    }
489
 
    Count-=length;
490
 
    Buffer+=length;
491
 
    pos_in_file+=length;
492
 
    left_length+=length;
 
502
    Count-= length_local;
 
503
    Buffer+= length_local;
 
504
    pos_in_file_local+= length_local;
 
505
    left_length+= length_local;
493
506
    diff_length=0;
494
507
  }
495
508
 
496
509
  max_length= info->read_length-diff_length;
497
510
  if (info->type != READ_FIFO &&
498
 
      max_length > (info->end_of_file - pos_in_file))
499
 
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
511
      max_length > (info->end_of_file - pos_in_file_local))
 
512
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
500
513
  if (!max_length)
501
514
  {
502
515
    if (Count)
503
516
    {
504
 
      info->error= left_length;         /* We only got this many char */
 
517
      info->error= static_cast<int>(left_length);       /* We only got this many char */
505
518
      return(1);
506
519
    }
507
 
    length=0;                           /* Didn't read any chars */
 
520
     length_local=0;                            /* Didn't read any chars */
508
521
  }
509
 
  else if ((length= my_read(info->file,info->buffer, max_length,
 
522
  else if (( length_local= my_read(info->file,info->buffer, max_length,
510
523
                            info->myflags)) < Count ||
511
 
           length == (size_t) -1)
 
524
            length_local == (size_t) -1)
512
525
  {
513
 
    if (length != (size_t) -1)
514
 
      memcpy(Buffer, info->buffer, length);
515
 
    info->pos_in_file= pos_in_file;
516
 
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
 
526
    if ( length_local != (size_t) -1)
 
527
      memcpy(Buffer, info->buffer,  length_local);
 
528
    info->pos_in_file= pos_in_file_local;
 
529
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
517
530
    info->read_pos=info->read_end=info->buffer;
518
531
    return(1);
519
532
  }
520
533
  info->read_pos=info->buffer+Count;
521
 
  info->read_end=info->buffer+length;
522
 
  info->pos_in_file=pos_in_file;
 
534
  info->read_end=info->buffer+ length_local;
 
535
  info->pos_in_file=pos_in_file_local;
523
536
  memcpy(Buffer, info->buffer, Count);
524
537
  return(0);
525
538
}
526
539
 
527
540
 
528
 
/*
529
 
  Prepare IO_CACHE for shared use.
530
 
 
531
 
  SYNOPSIS
532
 
    init_io_cache_share()
533
 
      read_cache                A read cache. This will be copied for
534
 
                                every thread after setup.
535
 
      cshare                    The share.
536
 
      write_cache               If non-NULL a write cache that is to be
537
 
                                synchronized with the read caches.
538
 
      num_threads               Number of threads sharing the cache
539
 
                                including the write thread if any.
540
 
 
541
 
  DESCRIPTION
542
 
 
543
 
    The shared cache is used so: One IO_CACHE is initialized with
544
 
    init_io_cache(). This includes the allocation of a buffer. Then a
545
 
    share is allocated and init_io_cache_share() is called with the io
546
 
    cache and the share. Then the io cache is copied for each thread. So
547
 
    every thread has its own copy of IO_CACHE. But the allocated buffer
548
 
    is shared because cache->buffer is the same for all caches.
549
 
 
550
 
    One thread reads data from the file into the buffer. All threads
551
 
    read from the buffer, but every thread maintains its own set of
552
 
    pointers into the buffer. When all threads have used up the buffer
553
 
    contents, one of the threads reads the next block of data into the
554
 
    buffer. To accomplish this, each thread enters the cache lock before
555
 
    accessing the buffer. They wait in lock_io_cache() until all threads
556
 
    joined the lock. The last thread entering the lock is in charge of
557
 
    reading from file to buffer. It wakes all threads when done.
558
 
 
559
 
    Synchronizing a write cache to the read caches works so: Whenever
560
 
    the write buffer needs a flush, the write thread enters the lock and
561
 
    waits for all other threads to enter the lock too. They do this when
562
 
    they have used up the read buffer. When all threads are in the lock,
563
 
    the write thread copies the write buffer to the read buffer and
564
 
    wakes all threads.
565
 
 
566
 
    share->running_threads is the number of threads not being in the
567
 
    cache lock. When entering lock_io_cache() the number is decreased.
568
 
    When the thread that fills the buffer enters unlock_io_cache() the
569
 
    number is reset to the number of threads. The condition
570
 
    running_threads == 0 means that all threads are in the lock. Bumping
571
 
    up the number to the full count is non-intuitive. But increasing the
572
 
    number by one for each thread that leaves the lock could lead to a
573
 
    solo run of one thread. The last thread to join a lock reads from
574
 
    file to buffer, wakes the other threads, processes the data in the
575
 
    cache and enters the lock again. If no other thread left the lock
576
 
    meanwhile, it would think it's the last one again and read the next
577
 
    block...
578
 
 
579
 
    The share has copies of 'error', 'buffer', 'read_end', and
580
 
    'pos_in_file' from the thread that filled the buffer. We may not be
581
 
    able to access this information directly from its cache because the
582
 
    thread may be removed from the share before the variables could be
583
 
    copied by all other threads. Or, if a write buffer is synchronized,
584
 
    it would change its 'pos_in_file' after waking the other threads,
585
 
    possibly before they could copy its value.
586
 
 
587
 
    However, the 'buffer' variable in the share is for a synchronized
588
 
    write cache. It needs to know where to put the data. Otherwise it
589
 
    would need access to the read cache of one of the threads that is
590
 
    not yet removed from the share.
591
 
 
592
 
  RETURN
593
 
    void
594
 
*/
595
 
 
596
 
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
597
 
                         IO_CACHE *write_cache, uint32_t num_threads)
598
 
{
599
 
  assert(num_threads > 1);
600
 
  assert(read_cache->type == READ_CACHE);
601
 
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
602
 
 
603
 
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
604
 
  pthread_cond_init(&cshare->cond, 0);
605
 
  pthread_cond_init(&cshare->cond_writer, 0);
606
 
 
607
 
  cshare->running_threads= num_threads;
608
 
  cshare->total_threads=   num_threads;
609
 
  cshare->error=           0;    /* Initialize. */
610
 
  cshare->buffer=          read_cache->buffer;
611
 
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
612
 
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
613
 
  cshare->source_cache=    write_cache; /* Can be NULL. */
614
 
 
615
 
  read_cache->share=         cshare;
616
 
  read_cache->read_function= _my_b_read_r;
617
 
  read_cache->current_pos=   NULL;
618
 
  read_cache->current_end=   NULL;
619
 
 
620
 
  if (write_cache)
621
 
    write_cache->share= cshare;
622
 
 
623
 
  return;
624
 
}
625
 
 
626
 
 
627
 
/*
628
 
  Remove a thread from shared access to IO_CACHE.
629
 
 
630
 
  SYNOPSIS
631
 
    remove_io_thread()
632
 
      cache                     The IO_CACHE to be removed from the share.
633
 
 
634
 
  NOTE
635
 
 
636
 
    Every thread must do that on exit for not to deadlock other threads.
637
 
 
638
 
    The last thread destroys the pthread resources.
639
 
 
640
 
    A writer flushes its cache first.
641
 
 
642
 
  RETURN
643
 
    void
644
 
*/
645
 
 
646
 
void remove_io_thread(IO_CACHE *cache)
647
 
{
648
 
  IO_CACHE_SHARE *cshare= cache->share;
649
 
  uint32_t total;
650
 
 
651
 
  /* If the writer goes, it needs to flush the write cache. */
652
 
  if (cache == cshare->source_cache)
653
 
    flush_io_cache(cache);
654
 
 
655
 
  pthread_mutex_lock(&cshare->mutex);
656
 
 
657
 
  /* Remove from share. */
658
 
  total= --cshare->total_threads;
659
 
 
660
 
  /* Detach from share. */
661
 
  cache->share= NULL;
662
 
 
663
 
  /* If the writer goes, let the readers know. */
664
 
  if (cache == cshare->source_cache)
665
 
  {
666
 
    cshare->source_cache= NULL;
667
 
  }
668
 
 
669
 
  /* If all threads are waiting for me to join the lock, wake them. */
670
 
  if (!--cshare->running_threads)
671
 
  {
672
 
    pthread_cond_signal(&cshare->cond_writer);
673
 
    pthread_cond_broadcast(&cshare->cond);
674
 
  }
675
 
 
676
 
  pthread_mutex_unlock(&cshare->mutex);
677
 
 
678
 
  if (!total)
679
 
  {
680
 
    pthread_cond_destroy (&cshare->cond_writer);
681
 
    pthread_cond_destroy (&cshare->cond);
682
 
    pthread_mutex_destroy(&cshare->mutex);
683
 
  }
684
 
 
685
 
  return;
686
 
}
687
 
 
688
 
 
689
 
/*
690
 
  Lock IO cache and wait for all other threads to join.
691
 
 
692
 
  SYNOPSIS
693
 
    lock_io_cache()
694
 
      cache                     The cache of the thread entering the lock.
695
 
      pos                       File position of the block to read.
696
 
                                Unused for the write thread.
697
 
 
698
 
  DESCRIPTION
699
 
 
700
 
    Wait for all threads to finish with the current buffer. We want
701
 
    all threads to proceed in concert. The last thread to join
702
 
    lock_io_cache() will read the block from file and all threads start
703
 
    to use it. Then they will join again for reading the next block.
704
 
 
705
 
    The waiting threads detect a fresh buffer by comparing
706
 
    cshare->pos_in_file with the position they want to process next.
707
 
    Since the first block may start at position 0, we take
708
 
    cshare->read_end as an additional condition. This variable is
709
 
    initialized to NULL and will be set after a block of data is written
710
 
    to the buffer.
711
 
 
712
 
  RETURN
713
 
    1           OK, lock in place, go ahead and read.
714
 
    0           OK, unlocked, another thread did the read.
715
 
*/
716
 
 
717
 
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
718
 
{
719
 
  IO_CACHE_SHARE *cshare= cache->share;
720
 
 
721
 
  /* Enter the lock. */
722
 
  pthread_mutex_lock(&cshare->mutex);
723
 
  cshare->running_threads--;
724
 
 
725
 
  if (cshare->source_cache)
726
 
  {
727
 
    /* A write cache is synchronized to the read caches. */
728
 
 
729
 
    if (cache == cshare->source_cache)
730
 
    {
731
 
      /* The writer waits until all readers are here. */
732
 
      while (cshare->running_threads)
733
 
      {
734
 
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
735
 
      }
736
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
737
 
      return(1);
738
 
    }
739
 
 
740
 
    /* The last thread wakes the writer. */
741
 
    if (!cshare->running_threads)
742
 
    {
743
 
      pthread_cond_signal(&cshare->cond_writer);
744
 
    }
745
 
 
746
 
    /*
747
 
      Readers wait until the data is copied from the writer. Another
748
 
      reason to stop waiting is the removal of the write thread. If this
749
 
      happens, we leave the lock with old data in the buffer.
750
 
    */
751
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
752
 
           cshare->source_cache)
753
 
    {
754
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
755
 
    }
756
 
 
757
 
    /*
758
 
      If the writer was removed from the share while this thread was
759
 
      asleep, we need to simulate an EOF condition. The writer cannot
760
 
      reset the share variables as they might still be in use by readers
761
 
      of the last block. When we awake here then because the last
762
 
      joining thread signalled us. If the writer is not the last, it
763
 
      will not signal. So it is safe to clear the buffer here.
764
 
    */
765
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
766
 
    {
767
 
      cshare->read_end= cshare->buffer; /* Empty buffer. */
768
 
      cshare->error= 0; /* EOF is not an error. */
769
 
    }
770
 
  }
771
 
  else
772
 
  {
773
 
    /*
774
 
      There are read caches only. The last thread arriving in
775
 
      lock_io_cache() continues with a locked cache and reads the block.
776
 
    */
777
 
    if (!cshare->running_threads)
778
 
    {
779
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
780
 
      return(1);
781
 
    }
782
 
 
783
 
    /*
784
 
      All other threads wait until the requested block is read by the
785
 
      last thread arriving. Another reason to stop waiting is the
786
 
      removal of a thread. If this leads to all threads being in the
787
 
      lock, we have to continue also. The first of the awaken threads
788
 
      will then do the read.
789
 
    */
790
 
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
791
 
           cshare->running_threads)
792
 
    {
793
 
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
794
 
    }
795
 
 
796
 
    /* If the block is not yet read, continue with a locked cache and read. */
797
 
    if (!cshare->read_end || (cshare->pos_in_file < pos))
798
 
    {
799
 
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
800
 
      return(1);
801
 
    }
802
 
 
803
 
    /* Another thread did read the block already. */
804
 
  }
805
 
 
806
 
  /*
807
 
    Leave the lock. Do not call unlock_io_cache() later. The thread that
808
 
    filled the buffer did this and marked all threads as running.
809
 
  */
810
 
  pthread_mutex_unlock(&cshare->mutex);
811
 
  return(0);
812
 
}
813
 
 
814
 
 
815
 
/*
816
 
  Unlock IO cache.
817
 
 
818
 
  SYNOPSIS
819
 
    unlock_io_cache()
820
 
      cache                     The cache of the thread leaving the lock.
821
 
 
822
 
  NOTE
823
 
    This is called by the thread that filled the buffer. It marks all
824
 
    threads as running and awakes them. This must not be done by any
825
 
    other thread.
826
 
 
827
 
    Do not signal cond_writer. Either there is no writer or the writer
828
 
    is the only one who can call this function.
829
 
 
830
 
    The reason for resetting running_threads to total_threads before
831
 
    waking all other threads is that it could be possible that this
832
 
    thread is so fast with processing the buffer that it enters the lock
833
 
    before even one other thread has left it. If every awoken thread
834
 
    would increase running_threads by one, this thread could think that
835
 
    he is again the last to join and would not wait for the other
836
 
    threads to process the data.
837
 
 
838
 
  RETURN
839
 
    void
840
 
*/
841
 
 
842
 
static void unlock_io_cache(IO_CACHE *cache)
843
 
{
844
 
  IO_CACHE_SHARE *cshare= cache->share;
845
 
 
846
 
  cshare->running_threads= cshare->total_threads;
847
 
  pthread_cond_broadcast(&cshare->cond);
848
 
  pthread_mutex_unlock(&cshare->mutex);
849
 
  return;
850
 
}
851
 
 
852
 
 
853
 
/*
854
 
  Read from IO_CACHE when it is shared between several threads.
855
 
 
856
 
  SYNOPSIS
857
 
    _my_b_read_r()
858
 
      cache                     IO_CACHE pointer
859
 
      Buffer                    Buffer to retrieve count bytes from file
860
 
      Count                     Number of bytes to read into Buffer
861
 
 
862
 
  NOTE
863
 
    This function is only called from the my_b_read() macro when there
864
 
    isn't enough characters in the buffer to satisfy the request.
865
 
 
866
 
  IMPLEMENTATION
867
 
 
868
 
    It works as follows: when a thread tries to read from a file (that
869
 
    is, after using all the data from the (shared) buffer), it just
870
 
    hangs on lock_io_cache(), waiting for other threads. When the very
871
 
    last thread attempts a read, lock_io_cache() returns 1, the thread
872
 
    does actual IO and unlock_io_cache(), which signals all the waiting
873
 
    threads that data is in the buffer.
874
 
 
875
 
  WARNING
876
 
 
877
 
    When changing this function, be careful with handling file offsets
878
 
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
879
 
    types than my_off_t unless you can be sure that their value fits.
880
 
    Same applies to differences of file offsets. (Bug #11527)
881
 
 
882
 
    When changing this function, check _my_b_read(). It might need the
883
 
    same change.
884
 
 
885
 
  RETURN
886
 
    0      we succeeded in reading all data
887
 
    1      Error: can't read requested characters
888
 
*/
889
 
 
890
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
891
 
{
892
 
  my_off_t pos_in_file;
893
 
  size_t length, diff_length, left_length;
894
 
  IO_CACHE_SHARE *cshare= cache->share;
895
 
 
896
 
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
897
 
  {
898
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
899
 
    memcpy(Buffer, cache->read_pos, left_length);
900
 
    Buffer+= left_length;
901
 
    Count-= left_length;
902
 
  }
903
 
  while (Count)
904
 
  {
905
 
    size_t cnt, len;
906
 
 
907
 
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
908
 
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
909
 
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
910
 
    length= ((length <= cache->read_length) ?
911
 
             length + IO_ROUND_DN(cache->read_length - length) :
912
 
             length - IO_ROUND_UP(length - cache->read_length));
913
 
    if (cache->type != READ_FIFO &&
914
 
        (length > (cache->end_of_file - pos_in_file)))
915
 
      length= (size_t) (cache->end_of_file - pos_in_file);
916
 
    if (length == 0)
917
 
    {
918
 
      cache->error= (int) left_length;
919
 
      return(1);
920
 
    }
921
 
    if (lock_io_cache(cache, pos_in_file))
922
 
    {
923
 
      /* With a synchronized write/read cache we won't come here... */
924
 
      assert(!cshare->source_cache);
925
 
      /*
926
 
        ... unless the writer has gone before this thread entered the
927
 
        lock. Simulate EOF in this case. It can be distinguished by
928
 
        cache->file.
929
 
      */
930
 
      if (cache->file < 0)
931
 
        len= 0;
932
 
      else
933
 
      {
934
 
        /*
935
 
          Whenever a function which operates on IO_CACHE flushes/writes
936
 
          some part of the IO_CACHE to disk it will set the property
937
 
          "seek_not_done" to indicate this to other functions operating
938
 
          on the IO_CACHE.
939
 
        */
940
 
        if (cache->seek_not_done)
941
 
        {
942
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
943
 
          {
944
 
            cache->error= -1;
945
 
            unlock_io_cache(cache);
946
 
            return(1);
947
 
          }
948
 
        }
949
 
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
950
 
      }
951
 
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
952
 
      cache->error=       (len == length ? 0 : (int) len);
953
 
      cache->pos_in_file= pos_in_file;
954
 
 
955
 
      /* Copy important values to the share. */
956
 
      cshare->error=       cache->error;
957
 
      cshare->read_end=    cache->read_end;
958
 
      cshare->pos_in_file= pos_in_file;
959
 
 
960
 
      /* Mark all threads as running and wake them. */
961
 
      unlock_io_cache(cache);
962
 
    }
963
 
    else
964
 
    {
965
 
      /*
966
 
        With a synchronized write/read cache readers always come here.
967
 
        Copy important values from the share.
968
 
      */
969
 
      cache->error=       cshare->error;
970
 
      cache->read_end=    cshare->read_end;
971
 
      cache->pos_in_file= cshare->pos_in_file;
972
 
 
973
 
      len= ((cache->error == -1) ? (size_t) -1 :
974
 
            (size_t) (cache->read_end - cache->buffer));
975
 
    }
976
 
    cache->read_pos=      cache->buffer;
977
 
    cache->seek_not_done= 0;
978
 
    if (len == 0 || len == (size_t) -1)
979
 
    {
980
 
      cache->error= (int) left_length;
981
 
      return(1);
982
 
    }
983
 
    cnt= (len > Count) ? Count : len;
984
 
    memcpy(Buffer, cache->read_pos, cnt);
985
 
    Count -= cnt;
986
 
    Buffer+= cnt;
987
 
    left_length+= cnt;
988
 
    cache->read_pos+= cnt;
989
 
  }
990
 
  return(0);
991
 
}
992
 
 
993
 
 
994
 
/*
995
 
  Copy data from write cache to read cache.
996
 
 
997
 
  SYNOPSIS
998
 
    copy_to_read_buffer()
999
 
      write_cache               The write cache.
1000
 
      write_buffer              The source of data, mostly the cache buffer.
1001
 
      write_length              The number of bytes to copy.
1002
 
 
1003
 
  NOTE
1004
 
    The write thread will wait for all read threads to join the cache
1005
 
    lock. Then it copies the data over and wakes the read threads.
1006
 
 
1007
 
  RETURN
1008
 
    void
1009
 
*/
1010
 
 
1011
 
static void copy_to_read_buffer(IO_CACHE *write_cache,
1012
 
                                const unsigned char *write_buffer, size_t write_length)
1013
 
{
1014
 
  IO_CACHE_SHARE *cshare= write_cache->share;
1015
 
 
1016
 
  assert(cshare->source_cache == write_cache);
1017
 
  /*
1018
 
    write_length is usually less or equal to buffer_length.
1019
 
    It can be bigger if _my_b_write() is called with a big length.
1020
 
  */
1021
 
  while (write_length)
1022
 
  {
1023
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
1024
 
    int  rc;
1025
 
 
1026
 
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1027
 
    /* The writing thread does always have the lock when it awakes. */
1028
 
    assert(rc);
1029
 
 
1030
 
    memcpy(cshare->buffer, write_buffer, copy_length);
1031
 
 
1032
 
    cshare->error=       0;
1033
 
    cshare->read_end=    cshare->buffer + copy_length;
1034
 
    cshare->pos_in_file= write_cache->pos_in_file;
1035
 
 
1036
 
    /* Mark all threads as running and wake them. */
1037
 
    unlock_io_cache(write_cache);
1038
 
 
1039
 
    write_buffer+= copy_length;
1040
 
    write_length-= copy_length;
1041
 
  }
1042
 
}
1043
 
 
1044
 
 
1045
 
/*
1046
 
  Do sequential read from the SEQ_READ_APPEND cache.
1047
 
 
1048
 
  We do this in three stages:
1049
 
   - first read from info->buffer
1050
 
   - then if there are still data to read, try the file descriptor
1051
 
   - afterwards, if there are still data to read, try append buffer
1052
 
 
1053
 
  RETURNS
1054
 
    0  Success
1055
 
    1  Failed to read
1056
 
*/
1057
 
 
1058
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1059
 
{
1060
 
  size_t length, diff_length, left_length, save_count, max_length;
1061
 
  my_off_t pos_in_file;
1062
 
  save_count=Count;
1063
 
 
1064
 
  /* first, read the regular buffer */
1065
 
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1066
 
  {
1067
 
    assert(Count > left_length);        /* User is not using my_b_read() */
1068
 
    memcpy(Buffer,info->read_pos, left_length);
1069
 
    Buffer+=left_length;
1070
 
    Count-=left_length;
1071
 
  }
1072
 
  lock_append_buffer(info);
1073
 
 
1074
 
  /* pos_in_file always point on where info->buffer was read */
1075
 
  if ((pos_in_file=info->pos_in_file +
1076
 
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1077
 
    goto read_append_buffer;
1078
 
 
1079
 
  /*
1080
 
    With read-append cache we must always do a seek before we read,
1081
 
    because the write could have moved the file pointer astray
1082
 
  */
1083
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1084
 
  {
1085
 
   info->error= -1;
1086
 
   unlock_append_buffer(info);
1087
 
   return (1);
1088
 
  }
1089
 
  info->seek_not_done=0;
1090
 
 
1091
 
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1092
 
 
1093
 
  /* now the second stage begins - read from file descriptor */
1094
 
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1095
 
  {
1096
 
    /* Fill first intern buffer */
1097
 
    size_t read_length;
1098
 
 
1099
 
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1100
 
    if ((read_length= my_read(info->file,Buffer, length,
1101
 
                              info->myflags)) == (size_t) -1)
1102
 
    {
1103
 
      info->error= -1;
1104
 
      unlock_append_buffer(info);
1105
 
      return 1;
1106
 
    }
1107
 
    Count-=read_length;
1108
 
    Buffer+=read_length;
1109
 
    pos_in_file+=read_length;
1110
 
 
1111
 
    if (read_length != length)
1112
 
    {
1113
 
      /*
1114
 
        We only got part of data;  Read the rest of the data from the
1115
 
        write buffer
1116
 
      */
1117
 
      goto read_append_buffer;
1118
 
    }
1119
 
    left_length+=length;
1120
 
    diff_length=0;
1121
 
  }
1122
 
 
1123
 
  max_length= info->read_length-diff_length;
1124
 
  if (max_length > (info->end_of_file - pos_in_file))
1125
 
    max_length= (size_t) (info->end_of_file - pos_in_file);
1126
 
  if (!max_length)
1127
 
  {
1128
 
    if (Count)
1129
 
      goto read_append_buffer;
1130
 
    length=0;                           /* Didn't read any more chars */
1131
 
  }
1132
 
  else
1133
 
  {
1134
 
    length= my_read(info->file,info->buffer, max_length, info->myflags);
1135
 
    if (length == (size_t) -1)
1136
 
    {
1137
 
      info->error= -1;
1138
 
      unlock_append_buffer(info);
1139
 
      return 1;
1140
 
    }
1141
 
    if (length < Count)
1142
 
    {
1143
 
      memcpy(Buffer, info->buffer, length);
1144
 
      Count -= length;
1145
 
      Buffer += length;
1146
 
 
1147
 
      /*
1148
 
         added the line below to make
1149
 
         assert(pos_in_file==info->end_of_file) pass.
1150
 
         otherwise this does not appear to be needed
1151
 
      */
1152
 
      pos_in_file += length;
1153
 
      goto read_append_buffer;
1154
 
    }
1155
 
  }
1156
 
  unlock_append_buffer(info);
1157
 
  info->read_pos=info->buffer+Count;
1158
 
  info->read_end=info->buffer+length;
1159
 
  info->pos_in_file=pos_in_file;
1160
 
  memcpy(Buffer,info->buffer,(size_t) Count);
1161
 
  return 0;
1162
 
 
1163
 
read_append_buffer:
1164
 
 
1165
 
  /*
1166
 
     Read data from the current write buffer.
1167
 
     Count should never be == 0 here (The code will work even if count is 0)
1168
 
  */
1169
 
 
1170
 
  {
1171
 
    /* First copy the data to Count */
1172
 
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1173
 
    size_t copy_len;
1174
 
    size_t transfer_len;
1175
 
 
1176
 
    assert(info->append_read_pos <= info->write_pos);
1177
 
    /*
1178
 
      TODO: figure out if the assert below is needed or correct.
1179
 
    */
1180
 
    assert(pos_in_file == info->end_of_file);
1181
 
    copy_len=min(Count, len_in_buff);
1182
 
    memcpy(Buffer, info->append_read_pos, copy_len);
1183
 
    info->append_read_pos += copy_len;
1184
 
    Count -= copy_len;
1185
 
    if (Count)
1186
 
      info->error = save_count - Count;
1187
 
 
1188
 
    /* Fill read buffer with data from write buffer */
1189
 
    memcpy(info->buffer, info->append_read_pos,
1190
 
           (size_t) (transfer_len=len_in_buff - copy_len));
1191
 
    info->read_pos= info->buffer;
1192
 
    info->read_end= info->buffer+transfer_len;
1193
 
    info->append_read_pos=info->write_pos;
1194
 
    info->pos_in_file=pos_in_file+copy_len;
1195
 
    info->end_of_file+=len_in_buff;
1196
 
  }
1197
 
  unlock_append_buffer(info);
1198
 
  return Count ? 1 : 0;
1199
 
}
1200
 
 
1201
 
 
1202
541
#ifdef HAVE_AIOWAIT
1203
542
 
1204
 
/*
1205
 
  Read from the IO_CACHE into a buffer and feed asynchronously
1206
 
  from disk when needed.
1207
 
 
1208
 
  SYNOPSIS
1209
 
    _my_b_async_read()
1210
 
      info                      IO_CACHE pointer
1211
 
      Buffer                    Buffer to retrieve count bytes from file
1212
 
      Count                     Number of bytes to read into Buffer
1213
 
 
1214
 
  RETURN VALUE
1215
 
    -1          An error has occurred; my_errno is set.
1216
 
     0          Success
1217
 
     1          An error has occurred; IO_CACHE to error state.
1218
 
*/
1219
 
 
1220
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
543
/**
 
544
 * @brief
 
545
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
 
546
 *
 
547
 * @param info st_io_cache pointer
 
548
 * @param Buffer Buffer to retrieve count bytes from file
 
549
 * @param Count Number of bytes to read into Buffer
 
550
 * 
 
551
 * @retval -1 An error has occurred; errno is set.
 
552
 * @retval 0 Success
 
553
 * @retval 1 An error has occurred; st_io_cache to error state.
 
554
 */
 
555
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
1221
556
{
1222
 
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
 
557
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
1223
558
  size_t max_length;
1224
559
  my_off_t next_pos_in_file;
1225
560
  unsigned char *read_buffer;
1240
575
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1241
576
                 my_filename(info->file),
1242
577
                 info->aio_result.result.aio_errno);
1243
 
      my_errno=info->aio_result.result.aio_errno;
 
578
      errno=info->aio_result.result.aio_errno;
1244
579
      info->error= -1;
1245
580
      return(1);
1246
581
    }
1247
582
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1248
583
        read_length == (size_t) -1)
1249
584
    {
1250
 
      my_errno=0;                               /* For testing */
 
585
      errno=0;                          /* For testing */
1251
586
      info->error= (read_length == (size_t) -1 ? -1 :
1252
587
                    (int) (read_length+left_length));
1253
588
      return(1);
1280
615
      }
1281
616
    }
1282
617
        /* Copy found bytes to buffer */
1283
 
    length=min(Count,read_length);
1284
 
    memcpy(Buffer,info->read_pos,(size_t) length);
1285
 
    Buffer+=length;
1286
 
    Count-=length;
1287
 
    left_length+=length;
 
618
    length_local=min(Count,read_length);
 
619
    memcpy(Buffer,info->read_pos,(size_t) length_local);
 
620
    Buffer+=length_local;
 
621
    Count-=length_local;
 
622
    left_length+=length_local;
1288
623
    info->read_end=info->rc_pos+read_length;
1289
 
    info->read_pos+=length;
 
624
    info->read_pos+=length_local;
1290
625
  }
1291
626
  else
1292
627
    next_pos_in_file=(info->pos_in_file+ (size_t)
1324
659
      {                                 /* Didn't find hole block */
1325
660
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1326
661
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1327
 
                   my_filename(info->file),my_errno);
 
662
                   my_filename(info->file),errno);
1328
663
        info->error=(int) (read_length+left_length);
1329
664
        return 1;
1330
665
      }
1360
695
                (my_off_t) next_pos_in_file,SEEK_SET,
1361
696
                &info->aio_result.result))
1362
697
    {                                           /* Skip async io */
1363
 
      my_errno=errno;
 
698
      errno=errno;
1364
699
      if (info->request_pos != info->buffer)
1365
700
      {
1366
701
        memmove(info->buffer, info->request_pos,
1380
715
#endif
1381
716
 
1382
717
 
1383
 
/* Read one byte when buffer is empty */
1384
 
 
1385
 
int _my_b_get(IO_CACHE *info)
 
718
/**
 
719
 * @brief
 
720
 *   Read one byte when buffer is empty
 
721
 */
 
722
int _my_b_get(st_io_cache *info)
1386
723
{
1387
724
  unsigned char buff;
1388
725
  IO_CACHE_CALLBACK pre_read,post_read;
1395
732
  return (int) (unsigned char) buff;
1396
733
}
1397
734
 
1398
 
/*
1399
 
   Write a byte buffer to IO_CACHE and flush to disk
1400
 
   if IO_CACHE is full.
1401
 
 
1402
 
   RETURN VALUE
1403
 
    1 On error on write
1404
 
    0 On success
1405
 
   -1 On error; my_errno contains error code.
1406
 
*/
1407
 
 
1408
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
735
/**
 
736
 * @brief
 
737
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
 
738
 *
 
739
 * @retval -1 On error; errno contains error code.
 
740
 * @retval 0 On success
 
741
 * @retval 1 On error on write
 
742
 */
 
743
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
1409
744
{
1410
 
  size_t rest_length,length;
 
745
  size_t rest_length,length_local;
1411
746
 
1412
747
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1413
748
  {
1414
 
    my_errno=errno=EFBIG;
 
749
    errno=EFBIG;
1415
750
    return info->error = -1;
1416
751
  }
1417
752
 
1425
760
    return 1;
1426
761
  if (Count >= IO_SIZE)
1427
762
  {                                     /* Fill first intern buffer */
1428
 
    length=Count & (size_t) ~(IO_SIZE-1);
 
763
    length_local=Count & (size_t) ~(IO_SIZE-1);
1429
764
    if (info->seek_not_done)
1430
765
    {
1431
766
      /*
1432
 
        Whenever a function which operates on IO_CACHE flushes/writes
1433
 
        some part of the IO_CACHE to disk it will set the property
 
767
        Whenever a function which operates on st_io_cache flushes/writes
 
768
        some part of the st_io_cache to disk it will set the property
1434
769
        "seek_not_done" to indicate this to other functions operating
1435
 
        on the IO_CACHE.
 
770
        on the st_io_cache.
1436
771
      */
1437
772
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
1438
773
      {
1441
776
      }
1442
777
      info->seek_not_done=0;
1443
778
    }
1444
 
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1445
 
      return info->error= -1;
1446
 
 
1447
 
    /*
1448
 
      In case of a shared I/O cache with a writer we normally do direct
1449
 
      write cache to read cache copy. Simulate this here by direct
1450
 
      caller buffer to read cache copy. Do it after the write so that
1451
 
      the cache readers actions on the flushed part can go in parallel
1452
 
      with the write of the extra stuff. copy_to_read_buffer()
1453
 
      synchronizes writer and readers so that after this call the
1454
 
      readers can act on the extra stuff while the writer can go ahead
1455
 
      and prepare the next output. copy_to_read_buffer() relies on
1456
 
      info->pos_in_file.
1457
 
    */
1458
 
    if (info->share)
1459
 
      copy_to_read_buffer(info, Buffer, length);
1460
 
 
1461
 
    Count-=length;
1462
 
    Buffer+=length;
1463
 
    info->pos_in_file+=length;
1464
 
  }
1465
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
1466
 
  info->write_pos+=Count;
1467
 
  return 0;
1468
 
}
1469
 
 
1470
 
 
1471
 
/*
1472
 
  Append a block to the write buffer.
1473
 
  This is done with the buffer locked to ensure that we don't read from
1474
 
  the write buffer before we are ready with it.
1475
 
*/
1476
 
 
1477
 
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1478
 
{
1479
 
  size_t rest_length,length;
1480
 
 
1481
 
  /*
1482
 
    Assert that we cannot come here with a shared cache. If we do one
1483
 
    day, we might need to add a call to copy_to_read_buffer().
1484
 
  */
1485
 
  assert(!info->share);
1486
 
 
1487
 
  lock_append_buffer(info);
1488
 
  rest_length= (size_t) (info->write_end - info->write_pos);
1489
 
  if (Count <= rest_length)
1490
 
    goto end;
1491
 
  memcpy(info->write_pos, Buffer, rest_length);
1492
 
  Buffer+=rest_length;
1493
 
  Count-=rest_length;
1494
 
  info->write_pos+=rest_length;
1495
 
  if (my_b_flush_io_cache(info,0))
1496
 
  {
1497
 
    unlock_append_buffer(info);
1498
 
    return 1;
1499
 
  }
1500
 
  if (Count >= IO_SIZE)
1501
 
  {                                     /* Fill first intern buffer */
1502
 
    length=Count & (size_t) ~(IO_SIZE-1);
1503
 
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1504
 
    {
1505
 
      unlock_append_buffer(info);
1506
 
      return info->error= -1;
1507
 
    }
1508
 
    Count-=length;
1509
 
    Buffer+=length;
1510
 
    info->end_of_file+=length;
1511
 
  }
1512
 
 
1513
 
end:
1514
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
1515
 
  info->write_pos+=Count;
1516
 
  unlock_append_buffer(info);
1517
 
  return 0;
1518
 
}
1519
 
 
1520
 
 
1521
 
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1522
 
{
1523
 
  /*
1524
 
    Sasha: We are not writing this with the ? operator to avoid hitting
1525
 
    a possible compiler bug. At least gcc 2.95 cannot deal with
1526
 
    several layers of ternary operators that evaluated comma(,) operator
1527
 
    expressions inside - I do have a test case if somebody wants it
1528
 
  */
1529
 
  if (info->type == SEQ_READ_APPEND)
1530
 
    return my_b_append(info, Buffer, Count);
1531
 
  return my_b_write(info, Buffer, Count);
1532
 
}
1533
 
 
1534
 
 
1535
 
/*
1536
 
  Write a block to disk where part of the data may be inside the record
1537
 
  buffer.  As all write calls to the data goes through the cache,
1538
 
  we will never get a seek over the end of the buffer
1539
 
*/
1540
 
 
1541
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
779
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
 
780
      return info->error= -1;
 
781
 
 
782
    Count-=length_local;
 
783
    Buffer+=length_local;
 
784
    info->pos_in_file+=length_local;
 
785
  }
 
786
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
787
  info->write_pos+=Count;
 
788
  return 0;
 
789
}
 
790
 
 
791
/**
 
792
 * @brief
 
793
 *   Write a block to disk where part of the data may be inside the record buffer.
 
794
 *   As all write calls to the data goes through the cache,
 
795
 *   we will never get a seek over the end of the buffer.
 
796
 */
 
797
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
1542
798
                   my_off_t pos)
1543
799
{
1544
 
  size_t length;
 
800
  size_t length_local;
1545
801
  int error=0;
1546
802
 
1547
 
  /*
1548
 
    Assert that we cannot come here with a shared cache. If we do one
1549
 
    day, we might need to add a call to copy_to_read_buffer().
1550
 
  */
1551
 
  assert(!info->share);
1552
 
 
1553
803
  if (pos < info->pos_in_file)
1554
804
  {
1555
805
    /* Of no overlap, write everything without buffering */
1556
806
    if (pos + Count <= info->pos_in_file)
1557
807
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1558
808
    /* Write the part of the block that is before buffer */
1559
 
    length= (uint32_t) (info->pos_in_file - pos);
1560
 
    if (pwrite(info->file, Buffer, length, pos) == 0)
 
809
    length_local= (uint32_t) (info->pos_in_file - pos);
 
810
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
1561
811
      info->error= error= -1;
1562
 
    Buffer+=length;
1563
 
    pos+=  length;
1564
 
    Count-= length;
1565
 
#ifndef HAVE_PREAD
1566
 
    info->seek_not_done=1;
1567
 
#endif
 
812
    Buffer+=length_local;
 
813
    pos+=  length_local;
 
814
    Count-= length_local;
1568
815
  }
1569
816
 
1570
817
  /* Check if we want to write inside the used part of the buffer.*/
1571
 
  length= (size_t) (info->write_end - info->buffer);
1572
 
  if (pos < info->pos_in_file + length)
 
818
  length_local= (size_t) (info->write_end - info->buffer);
 
819
  if (pos < info->pos_in_file + length_local)
1573
820
  {
1574
821
    size_t offset= (size_t) (pos - info->pos_in_file);
1575
 
    length-=offset;
1576
 
    if (length > Count)
1577
 
      length=Count;
1578
 
    memcpy(info->buffer+offset, Buffer, length);
1579
 
    Buffer+=length;
1580
 
    Count-= length;
1581
 
    /* Fix length of buffer if the new data was larger */
1582
 
    if (info->buffer+length > info->write_pos)
1583
 
      info->write_pos=info->buffer+length;
 
822
    length_local-=offset;
 
823
    if (length_local > Count)
 
824
      length_local=Count;
 
825
    memcpy(info->buffer+offset, Buffer, length_local);
 
826
    Buffer+=length_local;
 
827
    Count-= length_local;
 
828
    /* Fix length_local of buffer if the new data was larger */
 
829
    if (info->buffer+length_local > info->write_pos)
 
830
      info->write_pos=info->buffer+length_local;
1584
831
    if (!Count)
1585
832
      return (error);
1586
833
  }
1590
837
  return error;
1591
838
}
1592
839
 
1593
 
 
1594
 
        /* Flush write cache */
1595
 
 
1596
 
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1597
 
  lock_append_buffer(info);
1598
 
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1599
 
  unlock_append_buffer(info);
1600
 
 
1601
 
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
 
840
/**
 
841
 * @brief
 
842
 *   Flush write cache 
 
843
 */
 
844
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
1602
845
{
1603
 
  size_t length;
1604
 
  bool append_cache;
1605
 
  my_off_t pos_in_file;
1606
 
 
1607
 
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
1608
 
    need_append_buffer_lock=0;
 
846
  size_t length_local;
 
847
  bool append_cache= false;
 
848
  my_off_t pos_in_file_local;
1609
849
 
1610
850
  if (info->type == WRITE_CACHE || append_cache)
1611
851
  {
1612
852
    if (info->file == -1)
1613
853
    {
1614
 
      if (real_open_cached_file(info))
 
854
      if (info->real_open_cached_file())
1615
855
        return((info->error= -1));
1616
856
    }
1617
 
    LOCK_APPEND_BUFFER;
 
857
    lock_append_buffer(info, need_append_buffer_lock);
1618
858
 
1619
 
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
 
859
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
1620
860
    {
1621
 
      /*
1622
 
        In case of a shared I/O cache with a writer we do direct write
1623
 
        cache to read cache copy. Do it before the write here so that
1624
 
        the readers can work in parallel with the write.
1625
 
        copy_to_read_buffer() relies on info->pos_in_file.
1626
 
      */
1627
 
      if (info->share)
1628
 
        copy_to_read_buffer(info, info->write_buffer, length);
1629
 
 
1630
 
      pos_in_file=info->pos_in_file;
 
861
      pos_in_file_local=info->pos_in_file;
1631
862
      /*
1632
863
        If we have append cache, we always open the file with
1633
864
        O_APPEND which moves the pos to EOF automatically on every write
1634
865
      */
1635
866
      if (!append_cache && info->seek_not_done)
1636
867
      {                                 /* File touched, do seek */
1637
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
868
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
1638
869
        {
1639
 
          UNLOCK_APPEND_BUFFER;
 
870
          unlock_append_buffer(info, need_append_buffer_lock);
1640
871
          return((info->error= -1));
1641
872
        }
1642
873
        if (!append_cache)
1643
874
          info->seek_not_done=0;
1644
875
      }
1645
876
      if (!append_cache)
1646
 
        info->pos_in_file+=length;
 
877
        info->pos_in_file+=length_local;
1647
878
      info->write_end= (info->write_buffer+info->buffer_length-
1648
 
                        ((pos_in_file+length) & (IO_SIZE-1)));
 
879
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
1649
880
 
1650
 
      if (my_write(info->file,info->write_buffer,length,
 
881
      if (my_write(info->file,info->write_buffer,length_local,
1651
882
                   info->myflags | MY_NABP))
1652
883
        info->error= -1;
1653
884
      else
1654
885
        info->error= 0;
1655
886
      if (!append_cache)
1656
887
      {
1657
 
        set_if_bigger(info->end_of_file,(pos_in_file+length));
 
888
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
1658
889
      }
1659
890
      else
1660
891
      {
1664
895
      }
1665
896
 
1666
897
      info->append_read_pos=info->write_pos=info->write_buffer;
1667
 
      UNLOCK_APPEND_BUFFER;
 
898
      unlock_append_buffer(info, need_append_buffer_lock);
1668
899
      return(info->error);
1669
900
    }
1670
901
  }
1675
906
    info->inited=0;
1676
907
  }
1677
908
#endif
1678
 
  UNLOCK_APPEND_BUFFER;
 
909
  unlock_append_buffer(info, need_append_buffer_lock);
1679
910
  return(0);
1680
911
}
1681
912
 
1682
 
/*
1683
 
  Free an IO_CACHE object
1684
 
 
1685
 
  SYNOPSOS
1686
 
    end_io_cache()
1687
 
    info                IO_CACHE Handle to free
1688
 
 
1689
 
  NOTES
1690
 
    It's currently safe to call this if one has called init_io_cache()
1691
 
    on the 'info' object, even if init_io_cache() failed.
1692
 
    This function is also safe to call twice with the same handle.
1693
 
 
1694
 
  RETURN
1695
 
   0  ok
1696
 
   #  Error
1697
 
*/
1698
 
 
1699
 
int end_io_cache(IO_CACHE *info)
 
913
/**
 
914
 * @brief
 
915
 *   Free an st_io_cache object
 
916
 * 
 
917
 * @detail
 
918
 *   It's currently safe to call this if one has called init_io_cache()
 
919
 *   on the 'info' object, even if init_io_cache() failed.
 
920
 *   This function is also safe to call twice with the same handle.
 
921
 * 
 
922
 * @param info st_io_cache Handle to free
 
923
 * 
 
924
 * @retval 0 ok
 
925
 * @retval # Error
 
926
 */
 
927
int st_io_cache::end_io_cache()
1700
928
{
1701
 
  int error=0;
1702
 
  IO_CACHE_CALLBACK pre_close;
1703
 
 
1704
 
  /*
1705
 
    Every thread must call remove_io_thread(). The last one destroys
1706
 
    the share elements.
1707
 
  */
1708
 
  assert(!info->share || !info->share->total_threads);
1709
 
 
1710
 
  if ((pre_close=info->pre_close))
1711
 
  {
1712
 
    (*pre_close)(info);
1713
 
    info->pre_close= 0;
1714
 
  }
1715
 
  if (info->alloced_buffer)
1716
 
  {
1717
 
    info->alloced_buffer=0;
1718
 
    if (info->file != -1)                       /* File doesn't exist */
1719
 
      error= my_b_flush_io_cache(info,1);
1720
 
    free((unsigned char*) info->buffer);
1721
 
    info->buffer=info->read_pos=(unsigned char*) 0;
1722
 
  }
1723
 
  if (info->type == SEQ_READ_APPEND)
1724
 
  {
1725
 
    /* Destroy allocated mutex */
1726
 
    info->type= TYPE_NOT_SET;
1727
 
    pthread_mutex_destroy(&info->append_buffer_lock);
1728
 
  }
1729
 
  return(error);
 
929
  int _error=0;
 
930
 
 
931
  if (pre_close)
 
932
  {
 
933
    (*pre_close)(this);
 
934
    pre_close= 0;
 
935
  }
 
936
  if (alloced_buffer)
 
937
  {
 
938
    if (type == READ_CACHE)
 
939
      global_read_buffer.sub(buffer_length);
 
940
    alloced_buffer=0;
 
941
    if (file != -1)                     /* File doesn't exist */
 
942
      _error= my_b_flush_io_cache(this, 1);
 
943
    free((unsigned char*) buffer);
 
944
    buffer=read_pos=(unsigned char*) 0;
 
945
  }
 
946
 
 
947
  return _error;
1730
948
} /* end_io_cache */
1731
949
 
1732
 
 
1733
 
/**********************************************************************
1734
 
 Testing of MF_IOCACHE
1735
 
**********************************************************************/
1736
 
 
1737
 
#ifdef MAIN
1738
 
 
1739
 
#include <my_dir.h>
1740
 
 
1741
 
void die(const char* fmt, ...)
1742
 
{
1743
 
  va_list va_args;
1744
 
  va_start(va_args,fmt);
1745
 
  fprintf(stderr,"Error:");
1746
 
  vfprintf(stderr, fmt,va_args);
1747
 
  fprintf(stderr,", errno=%d\n", errno);
1748
 
  exit(1);
1749
 
}
1750
 
 
1751
 
int open_file(const char* fname, IO_CACHE* info, int cache_size)
1752
 
{
1753
 
  int fd;
1754
 
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1755
 
    die("Could not open %s", fname);
1756
 
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1757
 
    die("failed in init_io_cache()");
1758
 
  return fd;
1759
 
}
1760
 
 
1761
 
void close_file(IO_CACHE* info)
1762
 
{
1763
 
  end_io_cache(info);
1764
 
  my_close(info->file, MYF(MY_WME));
1765
 
}
1766
 
 
1767
 
int main(int argc, char** argv)
1768
 
{
1769
 
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1770
 
  MY_STAT status;
1771
 
  const char* fname="/tmp/iocache.test";
1772
 
  int cache_size=16384;
1773
 
  char llstr_buf[22];
1774
 
  int max_block,total_bytes=0;
1775
 
  int i,num_loops=100,error=0;
1776
 
  char *p;
1777
 
  char* block, *block_end;
1778
 
  MY_INIT(argv[0]);
1779
 
  max_block = cache_size*3;
1780
 
  if (!(block=(char*)malloc(max_block)))
1781
 
    die("Not enough memory to allocate test block");
1782
 
  block_end = block + max_block;
1783
 
  for (p = block,i=0; p < block_end;i++)
1784
 
  {
1785
 
    *p++ = (char)i;
1786
 
  }
1787
 
  if (my_stat(fname,&status, MYF(0)) &&
1788
 
      my_delete(fname,MYF(MY_WME)))
1789
 
    {
1790
 
      die("Delete of %s failed, aborting", fname);
1791
 
    }
1792
 
  open_file(fname,&sra_cache, cache_size);
1793
 
  for (i = 0; i < num_loops; i++)
1794
 
  {
1795
 
    char buf[4];
1796
 
    int block_size = abs(rand() % max_block);
1797
 
    int4store(buf, block_size);
1798
 
    if (my_b_append(&sra_cache,buf,4) ||
1799
 
        my_b_append(&sra_cache, block, block_size))
1800
 
      die("write failed");
1801
 
    total_bytes += 4+block_size;
1802
 
  }
1803
 
  close_file(&sra_cache);
1804
 
  free(block);
1805
 
  if (!my_stat(fname,&status,MYF(MY_WME)))
1806
 
    die("%s failed to stat, but I had just closed it,\
1807
 
 wonder how that happened");
1808
 
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1809
 
         llstr(status.st_size,llstr_buf),
1810
 
         total_bytes);
1811
 
  my_delete(fname, MYF(MY_WME));
1812
 
  /* check correctness of tests */
1813
 
  if (total_bytes != status.st_size)
1814
 
  {
1815
 
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
1816
 
supposedly written\n");
1817
 
    error=1;
1818
 
  }
1819
 
  exit(error);
1820
 
  return 0;
1821
 
}
1822
 
#endif
 
950
} /* namespace internal */
 
951
} /* namespace drizzled */