~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Jay Pipes
  • Date: 2009-11-16 22:00:02 UTC
  • mto: (1234.1.1 push) (1237.2.10 push)
  • mto: This revision was merged to the branch mainline in revision 1229.
  • Revision ID: jpipes@serialcoder-20091116220002-rdsha64utt41i8w8
Adds INFORMATION_SCHEMA views for the transaction log:

TRANSACTION_LOG
TRANSACTION_LOG_ENTRIES
TRANSACTION_LOG_TRANSACTIONS

Adds a new user-defined function:

PRINT_TRANSACTION_MESSAGE(filename, offset)

Adds tests for all of the above

Implementation notes:

An indexer now runs when transaction messages are applied
to the transaction log.  It creates a simple index of the
transaction log entries.  This index is used when the
information schema views' fillTable() method is called.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
 
50
#include "mysys/mysys_priv.h"
 
51
#include <mystrings/m_string.h>
55
52
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys/mysys_err.h"
 
54
#include <mysys/aio_result.h>
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
61
58
#include <errno.h>
62
59
#include <drizzled/util/test.h>
63
60
#include <stdlib.h>
65
62
 
66
63
using namespace std;
67
64
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
65
extern "C" {
 
66
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
 
67
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
 
68
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
 
69
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
 
70
}
 
71
 
 
72
#define lock_append_buffer(info) \
 
73
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
74
#define unlock_append_buffer(info) \
 
75
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
76
 
 
77
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
78
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
79
 
 
80
/*
 
81
  Setup internal pointers inside IO_CACHE
 
82
 
 
83
  SYNOPSIS
 
84
    setup_io_cache()
 
85
    info                IO_CACHE handler
 
86
 
 
87
  NOTES
 
88
    This is called on automaticly on init or reinit of IO_CACHE
 
89
    It must be called externally if one moves or copies an IO_CACHE
 
90
    object.
 
91
*/
 
92
 
 
93
void setup_io_cache(IO_CACHE* info)
126
94
{
127
95
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
96
  if (info->type == WRITE_CACHE)
129
97
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
98
    info->current_pos= &info->write_pos;
 
99
    info->current_end= &info->write_end;
132
100
  }
133
101
  else
134
102
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
103
    info->current_pos= &info->read_pos;
 
104
    info->current_end= &info->read_end;
137
105
  }
138
106
}
139
107
 
140
108
 
141
 
void st_io_cache::init_functions()
 
109
static void
 
110
init_functions(IO_CACHE* info)
142
111
{
 
112
  enum cache_type type= info->type;
143
113
  switch (type) {
144
114
  case READ_NET:
145
115
    /*
150
120
      as myisamchk
151
121
    */
152
122
    break;
 
123
  case SEQ_READ_APPEND:
 
124
    info->read_function = _my_b_seq_read;
 
125
    info->write_function = 0;                   /* Force a core if used */
 
126
    break;
153
127
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
128
    info->read_function =
 
129
                          info->share ? _my_b_read_r :
 
130
                                        _my_b_read;
 
131
    info->write_function = _my_b_write;
156
132
  }
157
133
 
158
 
  setup_io_cache();
 
134
  setup_io_cache(info);
159
135
}
160
136
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
137
 
 
138
/*
 
139
  Initialize an IO_CACHE object
 
140
 
 
141
  SYNOPSOS
 
142
    init_io_cache()
 
143
    info                cache handler to initialize
 
144
    file                File that should be associated to to the handler
 
145
                        If == -1 then real_open_cached_file()
 
146
                        will be called when it's time to open file.
 
147
    cachesize           Size of buffer to allocate for read/write
 
148
                        If == 0 then use my_default_record_cache_size
 
149
    type                Type of cache
 
150
    seek_offset         Where cache should start reading/writing
 
151
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
152
    cache_myflags       Bitmap of differnt flags
 
153
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
154
                        MY_DONT_CHECK_FILESIZE
 
155
 
 
156
  RETURN
 
157
    0  ok
 
158
    #  error
 
159
*/
 
160
 
 
161
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
162
                  enum cache_type type, my_off_t seek_offset,
 
163
                  bool use_async_io, myf cache_myflags)
181
164
{
182
165
  size_t min_cache;
183
166
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
167
  my_off_t end_of_file= ~(my_off_t) 0;
185
168
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
169
  info->file= file;
 
170
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
171
  info->pos_in_file= seek_offset;
 
172
  info->pre_close = info->pre_read = info->post_read = 0;
 
173
  info->arg = 0;
 
174
  info->alloced_buffer = 0;
 
175
  info->buffer=0;
 
176
  info->seek_not_done= 0;
194
177
 
195
178
  if (file >= 0)
196
179
  {
197
180
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
181
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
199
182
    {
200
183
      /*
201
184
         This kind of object doesn't support seek() or tell(). Don't set a
202
185
         flag that will make us again try to seek() later and fail.
203
186
      */
204
 
      seek_not_done= 0;
 
187
      info->seek_not_done= 0;
205
188
      /*
206
189
        Additionally, if we're supposed to start somewhere other than the
207
190
        the beginning of whatever this file is, then somebody made a bad
210
193
      assert(seek_offset == 0);
211
194
    }
212
195
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
196
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
214
197
  }
215
198
 
 
199
  info->share=0;
 
200
 
216
201
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
202
    return(1);                          /* No cache requested */
218
203
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
204
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
205
  {                                             /* Assume file isn't growing */
221
206
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
207
    {
223
208
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
209
      end_of_file=lseek(file,0L,SEEK_END);
225
210
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
211
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
212
      if (end_of_file < seek_offset)
 
213
        end_of_file=seek_offset;
229
214
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
215
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
216
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
217
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
218
        use_async_io=0;                         /* No need to use async */
234
219
      }
235
220
    }
236
221
  }
237
222
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
223
  if (type != READ_NET && type != WRITE_NET)
239
224
  {
240
225
    /* Retry allocating memory in smaller blocks until we get one */
241
226
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
230
      if (cachesize < min_cache)
246
231
        cachesize = min_cache;
247
232
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
 
233
      if (type == SEQ_READ_APPEND)
 
234
        buffer_block *= 2;
 
235
      if ((info->buffer=
255
236
           (unsigned char*) malloc(buffer_block)) != 0)
256
237
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
238
        info->write_buffer=info->buffer;
 
239
        if (type == SEQ_READ_APPEND)
 
240
          info->write_buffer = info->buffer + cachesize;
 
241
        info->alloced_buffer=1;
259
242
        break;                                  /* Enough memory found */
260
243
      }
261
244
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
245
        return(2);                              /* Can't alloc cache */
267
246
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
247
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
248
    }
272
249
  }
273
250
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
251
  info->read_length=info->buffer_length=cachesize;
 
252
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
253
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
254
  if (type == SEQ_READ_APPEND)
 
255
  {
 
256
    info->append_read_pos = info->write_pos = info->write_buffer;
 
257
    info->write_end = info->write_buffer + info->buffer_length;
 
258
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
259
  }
277
260
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
261
  if (type == WRITE_CACHE)
 
262
    info->write_end=
 
263
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
264
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
265
    info->read_end=info->buffer;                /* Nothing in cache */
283
266
 
284
267
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
268
  info->end_of_file= end_of_file;
 
269
  info->error=0;
 
270
  info->type= type;
 
271
  init_functions(info);
289
272
#ifdef HAVE_AIOWAIT
290
273
  if (use_async_io && ! my_disable_async_io)
291
274
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
275
    info->read_length/=2;
 
276
    info->read_function=_my_b_async_read;
294
277
  }
295
 
  inited= aio_result.pending= 0;
 
278
  info->inited=info->aio_result.pending=0;
296
279
#endif
297
 
  return 0;
 
280
  return(0);
298
281
}                                               /* init_io_cache */
299
282
 
300
283
        /* Wait until current request is ready */
319
302
        break;
320
303
    }
321
304
  }
 
305
  return;
322
306
}
323
307
#endif
324
308
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
309
 
 
310
/*
 
311
  Use this to reset cache to re-start reading or to change the type
 
312
  between READ_CACHE <-> WRITE_CACHE
 
313
  If we are doing a reinit of a cache where we have the start of the file
 
314
  in the cache, we are reusing this memory without flushing it to disk.
 
315
*/
 
316
 
 
317
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
318
                        my_off_t seek_offset,
 
319
                        bool use_async_io,
 
320
                        bool clear_cache)
339
321
{
340
322
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
323
  assert(type != READ_NET && info->type != READ_NET &&
 
324
              type != WRITE_NET && info->type != WRITE_NET &&
 
325
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
326
 
344
327
  /* If the whole file is in memory, avoid flushing to disk */
345
328
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
329
      seek_offset >= info->pos_in_file &&
 
330
      seek_offset <= my_b_tell(info))
348
331
  {
349
332
    /* Reuse current buffer without flushing it to disk */
350
333
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
334
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
335
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
336
      info->read_end=info->write_pos;
 
337
      info->end_of_file=my_b_tell(info);
355
338
      /*
356
339
        Trigger a new seek only if we have a valid
357
340
        file handle.
358
341
      */
359
 
      seek_not_done= (file != -1);
 
342
      info->seek_not_done= (info->file != -1);
360
343
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
344
    else if (type == WRITE_CACHE)
362
345
    {
363
 
      if (type == READ_CACHE)
 
346
      if (info->type == READ_CACHE)
364
347
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
348
        info->write_end=info->write_buffer+info->buffer_length;
 
349
        info->seek_not_done=1;
367
350
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
351
      info->end_of_file = ~(my_off_t) 0;
369
352
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
353
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
354
    if (type == WRITE_CACHE)
 
355
      info->write_pos=pos;
373
356
    else
374
 
      read_pos= pos;
 
357
      info->read_pos= pos;
375
358
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
359
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
360
#endif
378
361
  }
379
362
  else
382
365
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
366
      after the current positions should be ignored
384
367
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
368
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
369
      info->end_of_file=my_b_tell(info);
387
370
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
371
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
372
      return(1);
 
373
    info->pos_in_file=seek_offset;
391
374
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
375
    info->seek_not_done=1;
 
376
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
377
    if (type == READ_CACHE)
395
378
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
379
      info->read_end=info->buffer;              /* Nothing in cache */
397
380
    }
398
381
    else
399
382
    {
400
 
      write_end=(buffer + buffer_length -
 
383
      info->write_end=(info->buffer + info->buffer_length -
401
384
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
385
      info->end_of_file= ~(my_off_t) 0;
403
386
    }
404
387
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
388
  info->type=type;
 
389
  info->error=0;
 
390
  init_functions(info);
408
391
 
409
392
#ifdef HAVE_AIOWAIT
410
393
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
394
      ((uint32_t) info->buffer_length <
 
395
       (uint32_t) (info->end_of_file - seek_offset)))
413
396
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
397
    info->read_length=info->buffer_length/2;
 
398
    info->read_function=_my_b_async_read;
416
399
  }
417
 
  inited= 0;
 
400
  info->inited=0;
418
401
#else
419
402
  (void)use_async_io;
420
403
#endif
421
 
  return 0;
 
404
  return(0);
422
405
} /* reinit_io_cache */
423
406
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
407
 
 
408
 
 
409
/*
 
410
  Read buffered.
 
411
 
 
412
  SYNOPSIS
 
413
    _my_b_read()
 
414
      info                      IO_CACHE pointer
 
415
      Buffer                    Buffer to retrieve count bytes from file
 
416
      Count                     Number of bytes to read into Buffer
 
417
 
 
418
  NOTE
 
419
    This function is only called from the my_b_read() macro when there
 
420
    isn't enough characters in the buffer to satisfy the request.
 
421
 
 
422
  WARNING
 
423
 
 
424
    When changing this function, be careful with handling file offsets
 
425
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
426
    types than my_off_t unless you can be sure that their value fits.
 
427
    Same applies to differences of file offsets.
 
428
 
 
429
    When changing this function, check _my_b_read_r(). It might need the
 
430
    same change.
 
431
 
 
432
  RETURN
 
433
    0      we succeeded in reading all data
 
434
    1      Error: can't read requested characters
 
435
*/
 
436
 
 
437
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
445
438
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
439
  size_t length,diff_length,left_length, max_length;
 
440
  my_off_t pos_in_file;
448
441
 
449
442
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
443
  {
455
448
  }
456
449
 
457
450
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
451
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
452
 
460
453
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
454
    Whenever a function which operates on IO_CACHE flushes/writes
 
455
    some part of the IO_CACHE to disk it will set the property
463
456
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
457
    on the IO_CACHE.
465
458
  */
466
459
  if (info->seek_not_done)
467
460
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
461
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
469
462
    {
470
463
      /* No error, reset seek_not_done flag. */
471
464
      info->seek_not_done= 0;
477
470
        info->file is a pipe or socket or FIFO.  We never should have tried
478
471
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
472
      */
480
 
      assert(errno != ESPIPE);
 
473
      assert(my_errno != ESPIPE);
481
474
      info->error= -1;
482
475
      return(1);
483
476
    }
484
477
  }
485
478
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
479
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
480
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
481
  {                                     /* Fill first intern buffer */
489
482
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
483
    if (info->end_of_file <= pos_in_file)
491
484
    {                                   /* End of file */
492
485
      info->error= (int) left_length;
493
486
      return(1);
494
487
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
488
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
489
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
490
        != length)
497
491
    {
498
492
      info->error= (read_length == (size_t) -1 ? -1 :
499
493
                    (int) (read_length+left_length));
500
494
      return(1);
501
495
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
496
    Count-=length;
 
497
    Buffer+=length;
 
498
    pos_in_file+=length;
 
499
    left_length+=length;
506
500
    diff_length=0;
507
501
  }
508
502
 
509
503
  max_length= info->read_length-diff_length;
510
504
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
505
      max_length > (info->end_of_file - pos_in_file))
 
506
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
507
  if (!max_length)
514
508
  {
515
509
    if (Count)
517
511
      info->error= static_cast<int>(left_length);       /* We only got this many char */
518
512
      return(1);
519
513
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
514
    length=0;                           /* Didn't read any chars */
521
515
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
516
  else if ((length= my_read(info->file,info->buffer, max_length,
523
517
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
518
           length == (size_t) -1)
525
519
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
520
    if (length != (size_t) -1)
 
521
      memcpy(Buffer, info->buffer, length);
 
522
    info->pos_in_file= pos_in_file;
 
523
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
524
    info->read_pos=info->read_end=info->buffer;
531
525
    return(1);
532
526
  }
533
527
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
528
  info->read_end=info->buffer+length;
 
529
  info->pos_in_file=pos_in_file;
536
530
  memcpy(Buffer, info->buffer, Count);
537
531
  return(0);
538
532
}
539
533
 
540
534
 
 
535
/*
 
536
  Prepare IO_CACHE for shared use.
 
537
 
 
538
  SYNOPSIS
 
539
    init_io_cache_share()
 
540
      read_cache                A read cache. This will be copied for
 
541
                                every thread after setup.
 
542
      cshare                    The share.
 
543
      write_cache               If non-NULL a write cache that is to be
 
544
                                synchronized with the read caches.
 
545
      num_threads               Number of threads sharing the cache
 
546
                                including the write thread if any.
 
547
 
 
548
  DESCRIPTION
 
549
 
 
550
    The shared cache is used so: One IO_CACHE is initialized with
 
551
    init_io_cache(). This includes the allocation of a buffer. Then a
 
552
    share is allocated and init_io_cache_share() is called with the io
 
553
    cache and the share. Then the io cache is copied for each thread. So
 
554
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
555
    is shared because cache->buffer is the same for all caches.
 
556
 
 
557
    One thread reads data from the file into the buffer. All threads
 
558
    read from the buffer, but every thread maintains its own set of
 
559
    pointers into the buffer. When all threads have used up the buffer
 
560
    contents, one of the threads reads the next block of data into the
 
561
    buffer. To accomplish this, each thread enters the cache lock before
 
562
    accessing the buffer. They wait in lock_io_cache() until all threads
 
563
    joined the lock. The last thread entering the lock is in charge of
 
564
    reading from file to buffer. It wakes all threads when done.
 
565
 
 
566
    Synchronizing a write cache to the read caches works so: Whenever
 
567
    the write buffer needs a flush, the write thread enters the lock and
 
568
    waits for all other threads to enter the lock too. They do this when
 
569
    they have used up the read buffer. When all threads are in the lock,
 
570
    the write thread copies the write buffer to the read buffer and
 
571
    wakes all threads.
 
572
 
 
573
    share->running_threads is the number of threads not being in the
 
574
    cache lock. When entering lock_io_cache() the number is decreased.
 
575
    When the thread that fills the buffer enters unlock_io_cache() the
 
576
    number is reset to the number of threads. The condition
 
577
    running_threads == 0 means that all threads are in the lock. Bumping
 
578
    up the number to the full count is non-intuitive. But increasing the
 
579
    number by one for each thread that leaves the lock could lead to a
 
580
    solo run of one thread. The last thread to join a lock reads from
 
581
    file to buffer, wakes the other threads, processes the data in the
 
582
    cache and enters the lock again. If no other thread left the lock
 
583
    meanwhile, it would think it's the last one again and read the next
 
584
    block...
 
585
 
 
586
    The share has copies of 'error', 'buffer', 'read_end', and
 
587
    'pos_in_file' from the thread that filled the buffer. We may not be
 
588
    able to access this information directly from its cache because the
 
589
    thread may be removed from the share before the variables could be
 
590
    copied by all other threads. Or, if a write buffer is synchronized,
 
591
    it would change its 'pos_in_file' after waking the other threads,
 
592
    possibly before they could copy its value.
 
593
 
 
594
    However, the 'buffer' variable in the share is for a synchronized
 
595
    write cache. It needs to know where to put the data. Otherwise it
 
596
    would need access to the read cache of one of the threads that is
 
597
    not yet removed from the share.
 
598
 
 
599
  RETURN
 
600
    void
 
601
*/
 
602
 
 
603
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
604
                         IO_CACHE *write_cache, uint32_t num_threads)
 
605
{
 
606
  assert(num_threads > 1);
 
607
  assert(read_cache->type == READ_CACHE);
 
608
  assert(!write_cache || (write_cache->type == WRITE_CACHE));
 
609
 
 
610
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
611
  pthread_cond_init(&cshare->cond, 0);
 
612
  pthread_cond_init(&cshare->cond_writer, 0);
 
613
 
 
614
  cshare->running_threads= num_threads;
 
615
  cshare->total_threads=   num_threads;
 
616
  cshare->error=           0;    /* Initialize. */
 
617
  cshare->buffer=          read_cache->buffer;
 
618
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
619
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
620
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
621
 
 
622
  read_cache->share=         cshare;
 
623
  read_cache->read_function= _my_b_read_r;
 
624
  read_cache->current_pos=   NULL;
 
625
  read_cache->current_end=   NULL;
 
626
 
 
627
  if (write_cache)
 
628
    write_cache->share= cshare;
 
629
 
 
630
  return;
 
631
}
 
632
 
 
633
 
 
634
/*
 
635
  Remove a thread from shared access to IO_CACHE.
 
636
 
 
637
  SYNOPSIS
 
638
    remove_io_thread()
 
639
      cache                     The IO_CACHE to be removed from the share.
 
640
 
 
641
  NOTE
 
642
 
 
643
    Every thread must do that on exit for not to deadlock other threads.
 
644
 
 
645
    The last thread destroys the pthread resources.
 
646
 
 
647
    A writer flushes its cache first.
 
648
 
 
649
  RETURN
 
650
    void
 
651
*/
 
652
 
 
653
void remove_io_thread(IO_CACHE *cache)
 
654
{
 
655
  IO_CACHE_SHARE *cshare= cache->share;
 
656
  uint32_t total;
 
657
 
 
658
  /* If the writer goes, it needs to flush the write cache. */
 
659
  if (cache == cshare->source_cache)
 
660
    flush_io_cache(cache);
 
661
 
 
662
  pthread_mutex_lock(&cshare->mutex);
 
663
 
 
664
  /* Remove from share. */
 
665
  total= --cshare->total_threads;
 
666
 
 
667
  /* Detach from share. */
 
668
  cache->share= NULL;
 
669
 
 
670
  /* If the writer goes, let the readers know. */
 
671
  if (cache == cshare->source_cache)
 
672
  {
 
673
    cshare->source_cache= NULL;
 
674
  }
 
675
 
 
676
  /* If all threads are waiting for me to join the lock, wake them. */
 
677
  if (!--cshare->running_threads)
 
678
  {
 
679
    pthread_cond_signal(&cshare->cond_writer);
 
680
    pthread_cond_broadcast(&cshare->cond);
 
681
  }
 
682
 
 
683
  pthread_mutex_unlock(&cshare->mutex);
 
684
 
 
685
  if (!total)
 
686
  {
 
687
    pthread_cond_destroy (&cshare->cond_writer);
 
688
    pthread_cond_destroy (&cshare->cond);
 
689
    pthread_mutex_destroy(&cshare->mutex);
 
690
  }
 
691
 
 
692
  return;
 
693
}
 
694
 
 
695
 
 
696
/*
 
697
  Lock IO cache and wait for all other threads to join.
 
698
 
 
699
  SYNOPSIS
 
700
    lock_io_cache()
 
701
      cache                     The cache of the thread entering the lock.
 
702
      pos                       File position of the block to read.
 
703
                                Unused for the write thread.
 
704
 
 
705
  DESCRIPTION
 
706
 
 
707
    Wait for all threads to finish with the current buffer. We want
 
708
    all threads to proceed in concert. The last thread to join
 
709
    lock_io_cache() will read the block from file and all threads start
 
710
    to use it. Then they will join again for reading the next block.
 
711
 
 
712
    The waiting threads detect a fresh buffer by comparing
 
713
    cshare->pos_in_file with the position they want to process next.
 
714
    Since the first block may start at position 0, we take
 
715
    cshare->read_end as an additional condition. This variable is
 
716
    initialized to NULL and will be set after a block of data is written
 
717
    to the buffer.
 
718
 
 
719
  RETURN
 
720
    1           OK, lock in place, go ahead and read.
 
721
    0           OK, unlocked, another thread did the read.
 
722
*/
 
723
 
 
724
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
725
{
 
726
  IO_CACHE_SHARE *cshare= cache->share;
 
727
 
 
728
  /* Enter the lock. */
 
729
  pthread_mutex_lock(&cshare->mutex);
 
730
  cshare->running_threads--;
 
731
 
 
732
  if (cshare->source_cache)
 
733
  {
 
734
    /* A write cache is synchronized to the read caches. */
 
735
 
 
736
    if (cache == cshare->source_cache)
 
737
    {
 
738
      /* The writer waits until all readers are here. */
 
739
      while (cshare->running_threads)
 
740
      {
 
741
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
742
      }
 
743
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
744
      return(1);
 
745
    }
 
746
 
 
747
    /* The last thread wakes the writer. */
 
748
    if (!cshare->running_threads)
 
749
    {
 
750
      pthread_cond_signal(&cshare->cond_writer);
 
751
    }
 
752
 
 
753
    /*
 
754
      Readers wait until the data is copied from the writer. Another
 
755
      reason to stop waiting is the removal of the write thread. If this
 
756
      happens, we leave the lock with old data in the buffer.
 
757
    */
 
758
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
759
           cshare->source_cache)
 
760
    {
 
761
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
762
    }
 
763
 
 
764
    /*
 
765
      If the writer was removed from the share while this thread was
 
766
      asleep, we need to simulate an EOF condition. The writer cannot
 
767
      reset the share variables as they might still be in use by readers
 
768
      of the last block. When we awake here then because the last
 
769
      joining thread signalled us. If the writer is not the last, it
 
770
      will not signal. So it is safe to clear the buffer here.
 
771
    */
 
772
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
773
    {
 
774
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
775
      cshare->error= 0; /* EOF is not an error. */
 
776
    }
 
777
  }
 
778
  else
 
779
  {
 
780
    /*
 
781
      There are read caches only. The last thread arriving in
 
782
      lock_io_cache() continues with a locked cache and reads the block.
 
783
    */
 
784
    if (!cshare->running_threads)
 
785
    {
 
786
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
787
      return(1);
 
788
    }
 
789
 
 
790
    /*
 
791
      All other threads wait until the requested block is read by the
 
792
      last thread arriving. Another reason to stop waiting is the
 
793
      removal of a thread. If this leads to all threads being in the
 
794
      lock, we have to continue also. The first of the awaken threads
 
795
      will then do the read.
 
796
    */
 
797
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
798
           cshare->running_threads)
 
799
    {
 
800
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
801
    }
 
802
 
 
803
    /* If the block is not yet read, continue with a locked cache and read. */
 
804
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
805
    {
 
806
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
807
      return(1);
 
808
    }
 
809
 
 
810
    /* Another thread did read the block already. */
 
811
  }
 
812
 
 
813
  /*
 
814
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
815
    filled the buffer did this and marked all threads as running.
 
816
  */
 
817
  pthread_mutex_unlock(&cshare->mutex);
 
818
  return(0);
 
819
}
 
820
 
 
821
 
 
822
/*
 
823
  Unlock IO cache.
 
824
 
 
825
  SYNOPSIS
 
826
    unlock_io_cache()
 
827
      cache                     The cache of the thread leaving the lock.
 
828
 
 
829
  NOTE
 
830
    This is called by the thread that filled the buffer. It marks all
 
831
    threads as running and awakes them. This must not be done by any
 
832
    other thread.
 
833
 
 
834
    Do not signal cond_writer. Either there is no writer or the writer
 
835
    is the only one who can call this function.
 
836
 
 
837
    The reason for resetting running_threads to total_threads before
 
838
    waking all other threads is that it could be possible that this
 
839
    thread is so fast with processing the buffer that it enters the lock
 
840
    before even one other thread has left it. If every awoken thread
 
841
    would increase running_threads by one, this thread could think that
 
842
    he is again the last to join and would not wait for the other
 
843
    threads to process the data.
 
844
 
 
845
  RETURN
 
846
    void
 
847
*/
 
848
 
 
849
static void unlock_io_cache(IO_CACHE *cache)
 
850
{
 
851
  IO_CACHE_SHARE *cshare= cache->share;
 
852
 
 
853
  cshare->running_threads= cshare->total_threads;
 
854
  pthread_cond_broadcast(&cshare->cond);
 
855
  pthread_mutex_unlock(&cshare->mutex);
 
856
  return;
 
857
}
 
858
 
 
859
 
 
860
/*
 
861
  Read from IO_CACHE when it is shared between several threads.
 
862
 
 
863
  SYNOPSIS
 
864
    _my_b_read_r()
 
865
      cache                     IO_CACHE pointer
 
866
      Buffer                    Buffer to retrieve count bytes from file
 
867
      Count                     Number of bytes to read into Buffer
 
868
 
 
869
  NOTE
 
870
    This function is only called from the my_b_read() macro when there
 
871
    isn't enough characters in the buffer to satisfy the request.
 
872
 
 
873
  IMPLEMENTATION
 
874
 
 
875
    It works as follows: when a thread tries to read from a file (that
 
876
    is, after using all the data from the (shared) buffer), it just
 
877
    hangs on lock_io_cache(), waiting for other threads. When the very
 
878
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
879
    does actual IO and unlock_io_cache(), which signals all the waiting
 
880
    threads that data is in the buffer.
 
881
 
 
882
  WARNING
 
883
 
 
884
    When changing this function, be careful with handling file offsets
 
885
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
886
    types than my_off_t unless you can be sure that their value fits.
 
887
    Same applies to differences of file offsets. (Bug #11527)
 
888
 
 
889
    When changing this function, check _my_b_read(). It might need the
 
890
    same change.
 
891
 
 
892
  RETURN
 
893
    0      we succeeded in reading all data
 
894
    1      Error: can't read requested characters
 
895
*/
 
896
 
 
897
extern "C" int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
898
{
 
899
  my_off_t pos_in_file;
 
900
  size_t length, diff_length, left_length;
 
901
  IO_CACHE_SHARE *cshare= cache->share;
 
902
 
 
903
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
904
  {
 
905
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
906
    memcpy(Buffer, cache->read_pos, left_length);
 
907
    Buffer+= left_length;
 
908
    Count-= left_length;
 
909
  }
 
910
  while (Count)
 
911
  {
 
912
    size_t cnt, len;
 
913
 
 
914
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
915
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
916
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
917
    length= ((length <= cache->read_length) ?
 
918
             length + IO_ROUND_DN(cache->read_length - length) :
 
919
             length - IO_ROUND_UP(length - cache->read_length));
 
920
    if (cache->type != READ_FIFO &&
 
921
        (length > (cache->end_of_file - pos_in_file)))
 
922
      length= (size_t) (cache->end_of_file - pos_in_file);
 
923
    if (length == 0)
 
924
    {
 
925
      cache->error= (int) left_length;
 
926
      return(1);
 
927
    }
 
928
    if (lock_io_cache(cache, pos_in_file))
 
929
    {
 
930
      /* With a synchronized write/read cache we won't come here... */
 
931
      assert(!cshare->source_cache);
 
932
      /*
 
933
        ... unless the writer has gone before this thread entered the
 
934
        lock. Simulate EOF in this case. It can be distinguished by
 
935
        cache->file.
 
936
      */
 
937
      if (cache->file < 0)
 
938
        len= 0;
 
939
      else
 
940
      {
 
941
        /*
 
942
          Whenever a function which operates on IO_CACHE flushes/writes
 
943
          some part of the IO_CACHE to disk it will set the property
 
944
          "seek_not_done" to indicate this to other functions operating
 
945
          on the IO_CACHE.
 
946
        */
 
947
        if (cache->seek_not_done)
 
948
        {
 
949
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
950
          {
 
951
            cache->error= -1;
 
952
            unlock_io_cache(cache);
 
953
            return(1);
 
954
          }
 
955
        }
 
956
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
957
      }
 
958
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
959
      cache->error=       (len == length ? 0 : (int) len);
 
960
      cache->pos_in_file= pos_in_file;
 
961
 
 
962
      /* Copy important values to the share. */
 
963
      cshare->error=       cache->error;
 
964
      cshare->read_end=    cache->read_end;
 
965
      cshare->pos_in_file= pos_in_file;
 
966
 
 
967
      /* Mark all threads as running and wake them. */
 
968
      unlock_io_cache(cache);
 
969
    }
 
970
    else
 
971
    {
 
972
      /*
 
973
        With a synchronized write/read cache readers always come here.
 
974
        Copy important values from the share.
 
975
      */
 
976
      cache->error=       cshare->error;
 
977
      cache->read_end=    cshare->read_end;
 
978
      cache->pos_in_file= cshare->pos_in_file;
 
979
 
 
980
      len= ((cache->error == -1) ? (size_t) -1 :
 
981
            (size_t) (cache->read_end - cache->buffer));
 
982
    }
 
983
    cache->read_pos=      cache->buffer;
 
984
    cache->seek_not_done= 0;
 
985
    if (len == 0 || len == (size_t) -1)
 
986
    {
 
987
      cache->error= (int) left_length;
 
988
      return(1);
 
989
    }
 
990
    cnt= (len > Count) ? Count : len;
 
991
    memcpy(Buffer, cache->read_pos, cnt);
 
992
    Count -= cnt;
 
993
    Buffer+= cnt;
 
994
    left_length+= cnt;
 
995
    cache->read_pos+= cnt;
 
996
  }
 
997
  return(0);
 
998
}
 
999
 
 
1000
 
 
1001
/*
 
1002
  Copy data from write cache to read cache.
 
1003
 
 
1004
  SYNOPSIS
 
1005
    copy_to_read_buffer()
 
1006
      write_cache               The write cache.
 
1007
      write_buffer              The source of data, mostly the cache buffer.
 
1008
      write_length              The number of bytes to copy.
 
1009
 
 
1010
  NOTE
 
1011
    The write thread will wait for all read threads to join the cache
 
1012
    lock. Then it copies the data over and wakes the read threads.
 
1013
 
 
1014
  RETURN
 
1015
    void
 
1016
*/
 
1017
 
 
1018
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1019
                                const unsigned char *write_buffer, size_t write_length)
 
1020
{
 
1021
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1022
 
 
1023
  assert(cshare->source_cache == write_cache);
 
1024
  /*
 
1025
    write_length is usually less or equal to buffer_length.
 
1026
    It can be bigger if _my_b_write() is called with a big length.
 
1027
  */
 
1028
  while (write_length)
 
1029
  {
 
1030
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1031
    int  rc;
 
1032
 
 
1033
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1034
    /* The writing thread does always have the lock when it awakes. */
 
1035
    assert(rc);
 
1036
 
 
1037
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1038
 
 
1039
    cshare->error=       0;
 
1040
    cshare->read_end=    cshare->buffer + copy_length;
 
1041
    cshare->pos_in_file= write_cache->pos_in_file;
 
1042
 
 
1043
    /* Mark all threads as running and wake them. */
 
1044
    unlock_io_cache(write_cache);
 
1045
 
 
1046
    write_buffer+= copy_length;
 
1047
    write_length-= copy_length;
 
1048
  }
 
1049
}
 
1050
 
 
1051
 
 
1052
/*
 
1053
  Do sequential read from the SEQ_READ_APPEND cache.
 
1054
 
 
1055
  We do this in three stages:
 
1056
   - first read from info->buffer
 
1057
   - then if there are still data to read, try the file descriptor
 
1058
   - afterwards, if there are still data to read, try append buffer
 
1059
 
 
1060
  RETURNS
 
1061
    0  Success
 
1062
    1  Failed to read
 
1063
*/
 
1064
 
 
1065
extern "C" int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1066
{
 
1067
  size_t length, diff_length, left_length, save_count, max_length;
 
1068
  my_off_t pos_in_file;
 
1069
  save_count=Count;
 
1070
 
 
1071
  /* first, read the regular buffer */
 
1072
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1073
  {
 
1074
    assert(Count > left_length);        /* User is not using my_b_read() */
 
1075
    memcpy(Buffer,info->read_pos, left_length);
 
1076
    Buffer+=left_length;
 
1077
    Count-=left_length;
 
1078
  }
 
1079
  lock_append_buffer(info);
 
1080
 
 
1081
  /* pos_in_file always point on where info->buffer was read */
 
1082
  if ((pos_in_file=info->pos_in_file +
 
1083
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1084
    goto read_append_buffer;
 
1085
 
 
1086
  /*
 
1087
    With read-append cache we must always do a seek before we read,
 
1088
    because the write could have moved the file pointer astray
 
1089
  */
 
1090
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1091
  {
 
1092
   info->error= -1;
 
1093
   unlock_append_buffer(info);
 
1094
   return (1);
 
1095
  }
 
1096
  info->seek_not_done=0;
 
1097
 
 
1098
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1099
 
 
1100
  /* now the second stage begins - read from file descriptor */
 
1101
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1102
  {
 
1103
    /* Fill first intern buffer */
 
1104
    size_t read_length;
 
1105
 
 
1106
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1107
    if ((read_length= my_read(info->file,Buffer, length,
 
1108
                              info->myflags)) == (size_t) -1)
 
1109
    {
 
1110
      info->error= -1;
 
1111
      unlock_append_buffer(info);
 
1112
      return 1;
 
1113
    }
 
1114
    Count-=read_length;
 
1115
    Buffer+=read_length;
 
1116
    pos_in_file+=read_length;
 
1117
 
 
1118
    if (read_length != length)
 
1119
    {
 
1120
      /*
 
1121
        We only got part of data;  Read the rest of the data from the
 
1122
        write buffer
 
1123
      */
 
1124
      goto read_append_buffer;
 
1125
    }
 
1126
    left_length+=length;
 
1127
    diff_length=0;
 
1128
  }
 
1129
 
 
1130
  max_length= info->read_length-diff_length;
 
1131
  if (max_length > (info->end_of_file - pos_in_file))
 
1132
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1133
  if (!max_length)
 
1134
  {
 
1135
    if (Count)
 
1136
      goto read_append_buffer;
 
1137
    length=0;                           /* Didn't read any more chars */
 
1138
  }
 
1139
  else
 
1140
  {
 
1141
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1142
    if (length == (size_t) -1)
 
1143
    {
 
1144
      info->error= -1;
 
1145
      unlock_append_buffer(info);
 
1146
      return 1;
 
1147
    }
 
1148
    if (length < Count)
 
1149
    {
 
1150
      memcpy(Buffer, info->buffer, length);
 
1151
      Count -= length;
 
1152
      Buffer += length;
 
1153
 
 
1154
      /*
 
1155
         added the line below to make
 
1156
         assert(pos_in_file==info->end_of_file) pass.
 
1157
         otherwise this does not appear to be needed
 
1158
      */
 
1159
      pos_in_file += length;
 
1160
      goto read_append_buffer;
 
1161
    }
 
1162
  }
 
1163
  unlock_append_buffer(info);
 
1164
  info->read_pos=info->buffer+Count;
 
1165
  info->read_end=info->buffer+length;
 
1166
  info->pos_in_file=pos_in_file;
 
1167
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1168
  return 0;
 
1169
 
 
1170
read_append_buffer:
 
1171
 
 
1172
  /*
 
1173
     Read data from the current write buffer.
 
1174
     Count should never be == 0 here (The code will work even if count is 0)
 
1175
  */
 
1176
 
 
1177
  {
 
1178
    /* First copy the data to Count */
 
1179
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1180
    size_t copy_len;
 
1181
    size_t transfer_len;
 
1182
 
 
1183
    assert(info->append_read_pos <= info->write_pos);
 
1184
    /*
 
1185
      TODO: figure out if the assert below is needed or correct.
 
1186
    */
 
1187
    assert(pos_in_file == info->end_of_file);
 
1188
    copy_len=min(Count, len_in_buff);
 
1189
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1190
    info->append_read_pos += copy_len;
 
1191
    Count -= copy_len;
 
1192
    if (Count)
 
1193
      info->error = static_cast<int>(save_count - Count);
 
1194
 
 
1195
    /* Fill read buffer with data from write buffer */
 
1196
    memcpy(info->buffer, info->append_read_pos,
 
1197
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1198
    info->read_pos= info->buffer;
 
1199
    info->read_end= info->buffer+transfer_len;
 
1200
    info->append_read_pos=info->write_pos;
 
1201
    info->pos_in_file=pos_in_file+copy_len;
 
1202
    info->end_of_file+=len_in_buff;
 
1203
  }
 
1204
  unlock_append_buffer(info);
 
1205
  return Count ? 1 : 0;
 
1206
}
 
1207
 
 
1208
 
541
1209
#ifdef HAVE_AIOWAIT
542
1210
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1211
/*
 
1212
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1213
  from disk when needed.
 
1214
 
 
1215
  SYNOPSIS
 
1216
    _my_b_async_read()
 
1217
      info                      IO_CACHE pointer
 
1218
      Buffer                    Buffer to retrieve count bytes from file
 
1219
      Count                     Number of bytes to read into Buffer
 
1220
 
 
1221
  RETURN VALUE
 
1222
    -1          An error has occurred; my_errno is set.
 
1223
     0          Success
 
1224
     1          An error has occurred; IO_CACHE to error state.
 
1225
*/
 
1226
 
 
1227
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
556
1228
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1229
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1230
  size_t max_length;
559
1231
  my_off_t next_pos_in_file;
560
1232
  unsigned char *read_buffer;
575
1247
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1248
                 my_filename(info->file),
577
1249
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1250
      my_errno=info->aio_result.result.aio_errno;
579
1251
      info->error= -1;
580
1252
      return(1);
581
1253
    }
582
1254
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1255
        read_length == (size_t) -1)
584
1256
    {
585
 
      errno=0;                          /* For testing */
 
1257
      my_errno=0;                               /* For testing */
586
1258
      info->error= (read_length == (size_t) -1 ? -1 :
587
1259
                    (int) (read_length+left_length));
588
1260
      return(1);
615
1287
      }
616
1288
    }
617
1289
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1290
    length=min(Count,read_length);
 
1291
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1292
    Buffer+=length;
 
1293
    Count-=length;
 
1294
    left_length+=length;
623
1295
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1296
    info->read_pos+=length;
625
1297
  }
626
1298
  else
627
1299
    next_pos_in_file=(info->pos_in_file+ (size_t)
659
1331
      {                                 /* Didn't find hole block */
660
1332
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1333
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1334
                   my_filename(info->file),my_errno);
663
1335
        info->error=(int) (read_length+left_length);
664
1336
        return 1;
665
1337
      }
695
1367
                (my_off_t) next_pos_in_file,SEEK_SET,
696
1368
                &info->aio_result.result))
697
1369
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1370
      my_errno=errno;
699
1371
      if (info->request_pos != info->buffer)
700
1372
      {
701
1373
        memmove(info->buffer, info->request_pos,
715
1387
#endif
716
1388
 
717
1389
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1390
/* Read one byte when buffer is empty */
 
1391
 
 
1392
int _my_b_get(IO_CACHE *info)
723
1393
{
724
1394
  unsigned char buff;
725
1395
  IO_CACHE_CALLBACK pre_read,post_read;
732
1402
  return (int) (unsigned char) buff;
733
1403
}
734
1404
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1405
/*
 
1406
   Write a byte buffer to IO_CACHE and flush to disk
 
1407
   if IO_CACHE is full.
 
1408
 
 
1409
   RETURN VALUE
 
1410
    1 On error on write
 
1411
    0 On success
 
1412
   -1 On error; my_errno contains error code.
 
1413
*/
 
1414
 
 
1415
extern "C" int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
744
1416
{
745
 
  size_t rest_length,length_local;
 
1417
  size_t rest_length,length;
746
1418
 
747
1419
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1420
  {
749
 
    errno=EFBIG;
 
1421
    my_errno=errno=EFBIG;
750
1422
    return info->error = -1;
751
1423
  }
752
1424
 
760
1432
    return 1;
761
1433
  if (Count >= IO_SIZE)
762
1434
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1435
    length=Count & (size_t) ~(IO_SIZE-1);
764
1436
    if (info->seek_not_done)
765
1437
    {
766
1438
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1439
        Whenever a function which operates on IO_CACHE flushes/writes
 
1440
        some part of the IO_CACHE to disk it will set the property
769
1441
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1442
        on the IO_CACHE.
771
1443
      */
772
1444
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
773
1445
      {
776
1448
      }
777
1449
      info->seek_not_done=0;
778
1450
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
 
1451
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
780
1452
      return info->error= -1;
781
1453
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
 
1454
    /*
 
1455
      In case of a shared I/O cache with a writer we normally do direct
 
1456
      write cache to read cache copy. Simulate this here by direct
 
1457
      caller buffer to read cache copy. Do it after the write so that
 
1458
      the cache readers actions on the flushed part can go in parallel
 
1459
      with the write of the extra stuff. copy_to_read_buffer()
 
1460
      synchronizes writer and readers so that after this call the
 
1461
      readers can act on the extra stuff while the writer can go ahead
 
1462
      and prepare the next output. copy_to_read_buffer() relies on
 
1463
      info->pos_in_file.
 
1464
    */
 
1465
    if (info->share)
 
1466
      copy_to_read_buffer(info, Buffer, length);
 
1467
 
 
1468
    Count-=length;
 
1469
    Buffer+=length;
 
1470
    info->pos_in_file+=length;
785
1471
  }
786
1472
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
1473
  info->write_pos+=Count;
788
1474
  return 0;
789
1475
}
790
1476
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1477
/*
 
1478
  Write a block to disk where part of the data may be inside the record
 
1479
  buffer.  As all write calls to the data goes through the cache,
 
1480
  we will never get a seek over the end of the buffer
 
1481
*/
 
1482
 
 
1483
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
798
1484
                   my_off_t pos)
799
1485
{
800
 
  size_t length_local;
 
1486
  size_t length;
801
1487
  int error=0;
802
1488
 
 
1489
  /*
 
1490
    Assert that we cannot come here with a shared cache. If we do one
 
1491
    day, we might need to add a call to copy_to_read_buffer().
 
1492
  */
 
1493
  assert(!info->share);
 
1494
 
803
1495
  if (pos < info->pos_in_file)
804
1496
  {
805
1497
    /* Of no overlap, write everything without buffering */
806
1498
    if (pos + Count <= info->pos_in_file)
807
1499
      return (pwrite(info->file, Buffer, Count, pos) == 0);
808
1500
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1501
    length= (uint32_t) (info->pos_in_file - pos);
 
1502
    if (pwrite(info->file, Buffer, length, pos) == 0)
811
1503
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1504
    Buffer+=length;
 
1505
    pos+=  length;
 
1506
    Count-= length;
 
1507
#ifndef HAVE_PREAD
 
1508
    info->seek_not_done=1;
 
1509
#endif
815
1510
  }
816
1511
 
817
1512
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1513
  length= (size_t) (info->write_end - info->buffer);
 
1514
  if (pos < info->pos_in_file + length)
820
1515
  {
821
1516
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1517
    length-=offset;
 
1518
    if (length > Count)
 
1519
      length=Count;
 
1520
    memcpy(info->buffer+offset, Buffer, length);
 
1521
    Buffer+=length;
 
1522
    Count-= length;
 
1523
    /* Fix length of buffer if the new data was larger */
 
1524
    if (info->buffer+length > info->write_pos)
 
1525
      info->write_pos=info->buffer+length;
831
1526
    if (!Count)
832
1527
      return (error);
833
1528
  }
837
1532
  return error;
838
1533
}
839
1534
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1535
 
 
1536
        /* Flush write cache */
 
1537
 
 
1538
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1539
  lock_append_buffer(info);
 
1540
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1541
  unlock_append_buffer(info);
 
1542
 
 
1543
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1544
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1545
  size_t length;
 
1546
  bool append_cache;
 
1547
  my_off_t pos_in_file;
 
1548
 
 
1549
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1550
    need_append_buffer_lock=0;
849
1551
 
850
1552
  if (info->type == WRITE_CACHE || append_cache)
851
1553
  {
852
1554
    if (info->file == -1)
853
1555
    {
854
 
      if (info->real_open_cached_file())
 
1556
      if (real_open_cached_file(info))
855
1557
        return((info->error= -1));
856
1558
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1559
    LOCK_APPEND_BUFFER;
858
1560
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1561
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1562
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1563
      /*
 
1564
        In case of a shared I/O cache with a writer we do direct write
 
1565
        cache to read cache copy. Do it before the write here so that
 
1566
        the readers can work in parallel with the write.
 
1567
        copy_to_read_buffer() relies on info->pos_in_file.
 
1568
      */
 
1569
      if (info->share)
 
1570
        copy_to_read_buffer(info, info->write_buffer, length);
 
1571
 
 
1572
      pos_in_file=info->pos_in_file;
862
1573
      /*
863
1574
        If we have append cache, we always open the file with
864
1575
        O_APPEND which moves the pos to EOF automatically on every write
865
1576
      */
866
1577
      if (!append_cache && info->seek_not_done)
867
1578
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1579
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
869
1580
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
 
1581
          UNLOCK_APPEND_BUFFER;
871
1582
          return((info->error= -1));
872
1583
        }
873
1584
        if (!append_cache)
874
1585
          info->seek_not_done=0;
875
1586
      }
876
1587
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1588
        info->pos_in_file+=length;
878
1589
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1590
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1591
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1592
      if (my_write(info->file,info->write_buffer,length,
882
1593
                   info->myflags | MY_NABP))
883
1594
        info->error= -1;
884
1595
      else
885
1596
        info->error= 0;
886
1597
      if (!append_cache)
887
1598
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1599
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1600
      }
890
1601
      else
891
1602
      {
895
1606
      }
896
1607
 
897
1608
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
 
1609
      UNLOCK_APPEND_BUFFER;
899
1610
      return(info->error);
900
1611
    }
901
1612
  }
906
1617
    info->inited=0;
907
1618
  }
908
1619
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
 
1620
  UNLOCK_APPEND_BUFFER;
910
1621
  return(0);
911
1622
}
912
1623
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1624
/*
 
1625
  Free an IO_CACHE object
 
1626
 
 
1627
  SYNOPSOS
 
1628
    end_io_cache()
 
1629
    info                IO_CACHE Handle to free
 
1630
 
 
1631
  NOTES
 
1632
    It's currently safe to call this if one has called init_io_cache()
 
1633
    on the 'info' object, even if init_io_cache() failed.
 
1634
    This function is also safe to call twice with the same handle.
 
1635
 
 
1636
  RETURN
 
1637
   0  ok
 
1638
   #  Error
 
1639
*/
 
1640
 
 
1641
int end_io_cache(IO_CACHE *info)
928
1642
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1643
  int error=0;
 
1644
  IO_CACHE_CALLBACK pre_close;
 
1645
 
 
1646
  /*
 
1647
    Every thread must call remove_io_thread(). The last one destroys
 
1648
    the share elements.
 
1649
  */
 
1650
  assert(!info->share || !info->share->total_threads);
 
1651
 
 
1652
  if ((pre_close=info->pre_close))
 
1653
  {
 
1654
    (*pre_close)(info);
 
1655
    info->pre_close= 0;
 
1656
  }
 
1657
  if (info->alloced_buffer)
 
1658
  {
 
1659
    info->alloced_buffer=0;
 
1660
    if (info->file != -1)                       /* File doesn't exist */
 
1661
      error= my_b_flush_io_cache(info,1);
 
1662
    free((unsigned char*) info->buffer);
 
1663
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1664
  }
 
1665
  if (info->type == SEQ_READ_APPEND)
 
1666
  {
 
1667
    /* Destroy allocated mutex */
 
1668
    info->type= TYPE_NOT_SET;
 
1669
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1670
  }
 
1671
  return(error);
948
1672
} /* end_io_cache */
949
1673
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1674
 
 
1675
/**********************************************************************
 
1676
 Testing of MF_IOCACHE
 
1677
**********************************************************************/
 
1678
 
 
1679
#ifdef MAIN
 
1680
 
 
1681
#include <my_dir.h>
 
1682
 
 
1683
void die(const char* fmt, ...)
 
1684
{
 
1685
  va_list va_args;
 
1686
  va_start(va_args,fmt);
 
1687
  fprintf(stderr,"Error:");
 
1688
  vfprintf(stderr, fmt,va_args);
 
1689
  fprintf(stderr,", errno=%d\n", errno);
 
1690
  exit(1);
 
1691
}
 
1692
 
 
1693
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1694
{
 
1695
  int fd;
 
1696
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1697
    die("Could not open %s", fname);
 
1698
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1699
    die("failed in init_io_cache()");
 
1700
  return fd;
 
1701
}
 
1702
 
 
1703
void close_file(IO_CACHE* info)
 
1704
{
 
1705
  end_io_cache(info);
 
1706
  my_close(info->file, MYF(MY_WME));
 
1707
}
 
1708
 
 
1709
int main(int argc, char** argv)
 
1710
{
 
1711
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1712
  MY_STAT status;
 
1713
  const char* fname="/tmp/iocache.test";
 
1714
  int cache_size=16384;
 
1715
  char llstr_buf[22];
 
1716
  int max_block,total_bytes=0;
 
1717
  int i,num_loops=100,error=0;
 
1718
  char *p;
 
1719
  char* block, *block_end;
 
1720
  MY_INIT(argv[0]);
 
1721
  max_block = cache_size*3;
 
1722
  if (!(block=(char*)malloc(max_block)))
 
1723
    die("Not enough memory to allocate test block");
 
1724
  block_end = block + max_block;
 
1725
  for (p = block,i=0; p < block_end;i++)
 
1726
  {
 
1727
    *p++ = (char)i;
 
1728
  }
 
1729
  if (my_stat(fname,&status, MYF(0)) &&
 
1730
      my_delete(fname,MYF(MY_WME)))
 
1731
    {
 
1732
      die("Delete of %s failed, aborting", fname);
 
1733
    }
 
1734
  open_file(fname,&sra_cache, cache_size);
 
1735
  for (i = 0; i < num_loops; i++)
 
1736
  {
 
1737
    char buf[4];
 
1738
    int block_size = abs(rand() % max_block);
 
1739
    int4store(buf, block_size);
 
1740
    if (my_b_append(&sra_cache,buf,4) ||
 
1741
        my_b_append(&sra_cache, block, block_size))
 
1742
      die("write failed");
 
1743
    total_bytes += 4+block_size;
 
1744
  }
 
1745
  close_file(&sra_cache);
 
1746
  free(block);
 
1747
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1748
    die("%s failed to stat, but I had just closed it,\
 
1749
 wonder how that happened");
 
1750
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1751
         llstr(status.st_size,llstr_buf),
 
1752
         total_bytes);
 
1753
  my_delete(fname, MYF(MY_WME));
 
1754
  /* check correctness of tests */
 
1755
  if (total_bytes != status.st_size)
 
1756
  {
 
1757
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1758
supposedly written\n");
 
1759
    error=1;
 
1760
  }
 
1761
  exit(error);
 
1762
  return 0;
 
1763
}
 
1764
#endif