~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Brian Aker
  • Date: 2008-06-29 20:10:28 UTC
  • mto: This revision was merged to the branch mainline in revision 16.
  • Revision ID: brian@tangent.org-20080629201028-923bdzz0qcjmd6cm
Cleaned up show status.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
  Cashing of files with only does (sequential) read or writes of fixed-
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
54
 
#include "drizzled/drizzled.h"
 
50
#define MAP_TO_USE_RAID
 
51
#include "mysys_priv.h"
 
52
#include <m_string.h>
55
53
#ifdef HAVE_AIOWAIT
56
 
#include "drizzled/error.h"
57
 
#include "drizzled/internal/aio_result.h"
 
54
#include "mysys_err.h"
58
55
static void my_aiowait(my_aio_result *result);
59
56
#endif
60
 
#include "drizzled/internal/iocache.h"
61
57
#include <errno.h>
62
 
#include <drizzled/util/test.h>
63
 
#include <stdlib.h>
64
 
#include <algorithm>
65
 
 
66
 
using namespace std;
67
 
 
68
 
namespace drizzled
69
 
{
70
 
namespace internal
71
 
{
72
 
 
73
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count);
75
 
 
76
 
/**
77
 
 * @brief
78
 
 *   Lock appends for the st_io_cache if required (need_append_buffer_lock)   
79
 
 */
80
 
inline
81
 
static void lock_append_buffer(register st_io_cache *, int )
82
 
{
83
 
}
84
 
 
85
 
/**
86
 
 * @brief
87
 
 *   Unlock appends for the st_io_cache if required (need_append_buffer_lock)   
88
 
 */
89
 
inline
90
 
static void unlock_append_buffer(register st_io_cache *, int )
91
 
{
92
 
}
93
 
 
94
 
/**
95
 
 * @brief
96
 
 *   Round up to the nearest IO_SIZE boundary 
97
 
 */
98
 
inline
99
 
static size_t io_round_up(size_t x)
100
 
{
101
 
  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
102
 
}
103
 
 
104
 
/**
105
 
 * @brief
106
 
 *   Round down to the nearest IO_SIZE boundary   
107
 
 */
108
 
inline
109
 
static size_t io_round_dn(size_t x)
110
 
{
111
 
  return (x & ~(IO_SIZE-1));
112
 
}
113
 
 
114
 
 
115
 
/**
116
 
 * @brief 
117
 
 *   Setup internal pointers inside st_io_cache
118
 
 * 
119
 
 * @details
120
 
 *   This is called on automatically on init or reinit of st_io_cache
121
 
 *   It must be called externally if one moves or copies an st_io_cache object.
122
 
 * 
123
 
 * @param info Cache handler to setup
124
 
 */
125
 
void st_io_cache::setup_io_cache()
 
58
 
 
59
#ifdef THREAD
 
60
#define lock_append_buffer(info) \
 
61
 pthread_mutex_lock(&(info)->append_buffer_lock)
 
62
#define unlock_append_buffer(info) \
 
63
 pthread_mutex_unlock(&(info)->append_buffer_lock)
 
64
#else
 
65
#define lock_append_buffer(info)
 
66
#define unlock_append_buffer(info)
 
67
#endif
 
68
 
 
69
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 
70
#define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
71
 
 
72
/*
 
73
  Setup internal pointers inside IO_CACHE
 
74
 
 
75
  SYNOPSIS
 
76
    setup_io_cache()
 
77
    info                IO_CACHE handler
 
78
 
 
79
  NOTES
 
80
    This is called on automaticly on init or reinit of IO_CACHE
 
81
    It must be called externally if one moves or copies an IO_CACHE
 
82
    object.
 
83
*/
 
84
 
 
85
void setup_io_cache(IO_CACHE* info)
126
86
{
127
87
  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
128
 
  if (type == WRITE_CACHE)
 
88
  if (info->type == WRITE_CACHE)
129
89
  {
130
 
    current_pos= &write_pos;
131
 
    current_end= &write_end;
 
90
    info->current_pos= &info->write_pos;
 
91
    info->current_end= &info->write_end;
132
92
  }
133
93
  else
134
94
  {
135
 
    current_pos= &read_pos;
136
 
    current_end= &read_end;
 
95
    info->current_pos= &info->read_pos;
 
96
    info->current_end= &info->read_end;
137
97
  }
138
98
}
139
99
 
140
100
 
141
 
void st_io_cache::init_functions()
 
101
static void
 
102
init_functions(IO_CACHE* info)
142
103
{
 
104
  enum cache_type type= info->type;
143
105
  switch (type) {
144
106
  case READ_NET:
145
107
    /*
150
112
      as myisamchk
151
113
    */
152
114
    break;
 
115
  case SEQ_READ_APPEND:
 
116
    info->read_function = _my_b_seq_read;
 
117
    info->write_function = 0;                   /* Force a core if used */
 
118
    break;
153
119
  default:
154
 
    read_function = _my_b_read;
155
 
    write_function = _my_b_write;
 
120
    info->read_function =
 
121
#ifdef THREAD
 
122
                          info->share ? _my_b_read_r :
 
123
#endif
 
124
                                        _my_b_read;
 
125
    info->write_function = _my_b_write;
156
126
  }
157
127
 
158
 
  setup_io_cache();
 
128
  setup_io_cache(info);
159
129
}
160
130
 
161
 
/**
162
 
 * @brief
163
 
 *   Initialize an st_io_cache object
164
 
 *
165
 
 * @param file File that should be associated with the handler
166
 
 *                 If == -1 then real_open_cached_file() will be called when it's time to open file.
167
 
 * @param cachesize Size of buffer to allocate for read/write
168
 
 *                      If == 0 then use my_default_record_cache_size
169
 
 * @param type Type of cache
170
 
 * @param seek_offset Where cache should start reading/writing
171
 
 * @param use_async_io Set to 1 if we should use async_io (if avaiable)
172
 
 * @param cache_myflags Bitmap of different flags
173
 
                            MY_WME | MY_FAE | MY_NABP | MY_FNABP | MY_DONT_CHECK_FILESIZE
174
 
 * 
175
 
 * @retval 0 success
176
 
 * @retval # error
177
 
 */
178
 
int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
179
 
                               enum cache_type type_arg, my_off_t seek_offset,
180
 
                               bool use_async_io, myf cache_myflags)
 
131
 
 
132
/*
 
133
  Initialize an IO_CACHE object
 
134
 
 
135
  SYNOPSOS
 
136
    init_io_cache()
 
137
    info                cache handler to initialize
 
138
    file                File that should be associated to to the handler
 
139
                        If == -1 then real_open_cached_file()
 
140
                        will be called when it's time to open file.
 
141
    cachesize           Size of buffer to allocate for read/write
 
142
                        If == 0 then use my_default_record_cache_size
 
143
    type                Type of cache
 
144
    seek_offset         Where cache should start reading/writing
 
145
    use_async_io        Set to 1 of we should use async_io (if avaiable)
 
146
    cache_myflags       Bitmap of differnt flags
 
147
                        MY_WME | MY_FAE | MY_NABP | MY_FNABP |
 
148
                        MY_DONT_CHECK_FILESIZE
 
149
 
 
150
  RETURN
 
151
    0  ok
 
152
    #  error
 
153
*/
 
154
 
 
155
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
 
156
                  enum cache_type type, my_off_t seek_offset,
 
157
                  pbool use_async_io, myf cache_myflags)
181
158
{
182
159
  size_t min_cache;
183
 
  off_t pos;
184
 
  my_off_t end_of_file_local= ~(my_off_t) 0;
 
160
  my_off_t pos;
 
161
  my_off_t end_of_file= ~(my_off_t) 0;
 
162
  DBUG_ENTER("init_io_cache");
 
163
  DBUG_PRINT("enter",("cache: 0x%lx  type: %d  pos: %ld",
 
164
                      (ulong) info, (int) type, (ulong) seek_offset));
185
165
 
186
 
  file= file_arg;
187
 
  type= TYPE_NOT_SET;       /* Don't set it until mutex are created */
188
 
  pos_in_file= seek_offset;
189
 
  pre_close = pre_read = post_read = 0;
190
 
  arg = 0;
191
 
  alloced_buffer = 0;
192
 
  buffer=0;
193
 
  seek_not_done= 0;
 
166
  info->file= file;
 
167
  info->type= TYPE_NOT_SET;         /* Don't set it until mutex are created */
 
168
  info->pos_in_file= seek_offset;
 
169
  info->pre_close = info->pre_read = info->post_read = 0;
 
170
  info->arg = 0;
 
171
  info->alloced_buffer = 0;
 
172
  info->buffer=0;
 
173
  info->seek_not_done= 0;
194
174
 
195
175
  if (file >= 0)
196
176
  {
197
 
    pos= lseek(file, 0, SEEK_CUR);
198
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
177
    pos= my_tell(file, MYF(0));
 
178
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
199
179
    {
200
180
      /*
201
181
         This kind of object doesn't support seek() or tell(). Don't set a
202
182
         flag that will make us again try to seek() later and fail.
203
183
      */
204
 
      seek_not_done= 0;
 
184
      info->seek_not_done= 0;
205
185
      /*
206
186
        Additionally, if we're supposed to start somewhere other than the
207
187
        the beginning of whatever this file is, then somebody made a bad
208
188
        assumption.
209
189
      */
210
 
      assert(seek_offset == 0);
 
190
      DBUG_ASSERT(seek_offset == 0);
211
191
    }
212
192
    else
213
 
      seek_not_done= test(seek_offset != (my_off_t)pos);
 
193
      info->seek_not_done= test(seek_offset != pos);
214
194
  }
215
195
 
 
196
  info->disk_writes= 0;
 
197
#ifdef THREAD
 
198
  info->share=0;
 
199
#endif
 
200
 
216
201
  if (!cachesize && !(cachesize= my_default_record_cache_size))
217
 
    return 1;                           /* No cache requested */
 
202
    DBUG_RETURN(1);                             /* No cache requested */
218
203
  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
219
 
  if (type_arg == READ_CACHE)
 
204
  if (type == READ_CACHE || type == SEQ_READ_APPEND)
220
205
  {                                             /* Assume file isn't growing */
221
206
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
222
207
    {
223
208
      /* Calculate end of file to avoid allocating oversized buffers */
224
 
      end_of_file_local=lseek(file,0L,SEEK_END);
 
209
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
225
210
      /* Need to reset seek_not_done now that we just did a seek. */
226
 
      seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
227
 
      if (end_of_file_local < seek_offset)
228
 
        end_of_file_local=seek_offset;
 
211
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
 
212
      if (end_of_file < seek_offset)
 
213
        end_of_file=seek_offset;
229
214
      /* Trim cache size if the file is very small */
230
 
      if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
 
215
      if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
231
216
      {
232
 
        cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
 
217
        cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
233
218
        use_async_io=0;                         /* No need to use async */
234
219
      }
235
220
    }
236
221
  }
237
222
  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
238
 
  if (type_arg != READ_NET && type_arg != WRITE_NET)
 
223
  if (type != READ_NET && type != WRITE_NET)
239
224
  {
240
225
    /* Retry allocating memory in smaller blocks until we get one */
241
226
    cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
245
230
      if (cachesize < min_cache)
246
231
        cachesize = min_cache;
247
232
      buffer_block= cachesize;
248
 
      if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
249
 
      {
250
 
        my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
251
 
        return 2;
252
 
      }
253
 
 
254
 
      if ((buffer=
255
 
           (unsigned char*) malloc(buffer_block)) != 0)
256
 
      {
257
 
        write_buffer=buffer;
258
 
        alloced_buffer= true;
 
233
      if (type == SEQ_READ_APPEND)
 
234
        buffer_block *= 2;
 
235
      if ((info->buffer=
 
236
           (uchar*) my_malloc(buffer_block,
 
237
                             MYF((cache_myflags & ~ MY_WME) |
 
238
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
239
      {
 
240
        info->write_buffer=info->buffer;
 
241
        if (type == SEQ_READ_APPEND)
 
242
          info->write_buffer = info->buffer + cachesize;
 
243
        info->alloced_buffer=1;
259
244
        break;                                  /* Enough memory found */
260
245
      }
261
246
      if (cachesize == min_cache)
262
 
      {
263
 
        if (type_arg == READ_CACHE)
264
 
          global_read_buffer.sub(buffer_block);
265
 
        return 2;                               /* Can't alloc cache */
266
 
      }
 
247
        DBUG_RETURN(2);                         /* Can't alloc cache */
267
248
      /* Try with less memory */
268
 
      if (type_arg == READ_CACHE)
269
 
        global_read_buffer.sub(buffer_block);
270
249
      cachesize= (cachesize*3/4 & ~(min_cache-1));
271
250
    }
272
251
  }
273
252
 
274
 
  read_length=buffer_length=cachesize;
275
 
  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
276
 
  request_pos= read_pos= write_pos = buffer;
 
253
  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
 
254
  info->read_length=info->buffer_length=cachesize;
 
255
  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
 
256
  info->request_pos= info->read_pos= info->write_pos = info->buffer;
 
257
  if (type == SEQ_READ_APPEND)
 
258
  {
 
259
    info->append_read_pos = info->write_pos = info->write_buffer;
 
260
    info->write_end = info->write_buffer + info->buffer_length;
 
261
#ifdef THREAD
 
262
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
 
263
#endif
 
264
  }
 
265
#if defined(SAFE_MUTEX) && defined(THREAD)
 
266
  else
 
267
  {
 
268
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
269
    bzero((char*) &info->append_buffer_lock, sizeof(info));
 
270
  }
 
271
#endif
277
272
 
278
 
  if (type_arg == WRITE_CACHE)
279
 
    write_end=
280
 
      buffer+buffer_length- (seek_offset & (IO_SIZE-1));
 
273
  if (type == WRITE_CACHE)
 
274
    info->write_end=
 
275
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
281
276
  else
282
 
    read_end=buffer;            /* Nothing in cache */
 
277
    info->read_end=info->buffer;                /* Nothing in cache */
283
278
 
284
279
  /* End_of_file may be changed by user later */
285
 
  end_of_file= end_of_file_local;
286
 
  error= 0;
287
 
  type= type_arg;
288
 
  init_functions();
 
280
  info->end_of_file= end_of_file;
 
281
  info->error=0;
 
282
  info->type= type;
 
283
  init_functions(info);
289
284
#ifdef HAVE_AIOWAIT
290
285
  if (use_async_io && ! my_disable_async_io)
291
286
  {
292
 
    read_length/=2;
293
 
    read_function=_my_b_async_read;
 
287
    DBUG_PRINT("info",("Using async io"));
 
288
    info->read_length/=2;
 
289
    info->read_function=_my_b_async_read;
294
290
  }
295
 
  inited= aio_result.pending= 0;
 
291
  info->inited=info->aio_result.pending=0;
296
292
#endif
297
 
  return 0;
 
293
  DBUG_RETURN(0);
298
294
}                                               /* init_io_cache */
299
295
 
300
296
        /* Wait until current request is ready */
311
307
      {
312
308
        if (errno == EINTR)
313
309
          continue;
 
310
        DBUG_PRINT("error",("No aio request, error: %d",errno));
314
311
        result->pending=0;                      /* Assume everythings is ok */
315
312
        break;
316
313
      }
319
316
        break;
320
317
    }
321
318
  }
 
319
  return;
322
320
}
323
321
#endif
324
322
 
325
 
/**
326
 
 * @brief 
327
 
 *   Reset the cache
328
 
 * 
329
 
 * @detail
330
 
 *   Use this to reset cache to re-start reading or to change the type 
331
 
 *   between READ_CACHE <-> WRITE_CACHE
332
 
 *   If we are doing a reinit of a cache where we have the start of the file
333
 
 *   in the cache, we are reusing this memory without flushing it to disk.
334
 
 */
335
 
bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
336
 
                                  my_off_t seek_offset,
337
 
                                  bool use_async_io,
338
 
                                  bool clear_cache)
 
323
 
 
324
/*
 
325
  Use this to reset cache to re-start reading or to change the type
 
326
  between READ_CACHE <-> WRITE_CACHE
 
327
  If we are doing a reinit of a cache where we have the start of the file
 
328
  in the cache, we are reusing this memory without flushing it to disk.
 
329
*/
 
330
 
 
331
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
 
332
                        my_off_t seek_offset,
 
333
                        pbool use_async_io __attribute__((unused)),
 
334
                        pbool clear_cache)
339
335
{
 
336
  DBUG_ENTER("reinit_io_cache");
 
337
  DBUG_PRINT("enter",("cache: 0x%lx type: %d  seek_offset: %lu  clear_cache: %d",
 
338
                      (ulong) info, type, (ulong) seek_offset,
 
339
                      (int) clear_cache));
 
340
 
340
341
  /* One can't do reinit with the following types */
341
 
  assert(type_arg != READ_NET && type != READ_NET &&
342
 
              type_arg != WRITE_NET && type != WRITE_NET);
 
342
  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
 
343
              type != WRITE_NET && info->type != WRITE_NET &&
 
344
              type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
343
345
 
344
346
  /* If the whole file is in memory, avoid flushing to disk */
345
347
  if (! clear_cache &&
346
 
      seek_offset >= pos_in_file &&
347
 
      seek_offset <= my_b_tell(this))
 
348
      seek_offset >= info->pos_in_file &&
 
349
      seek_offset <= my_b_tell(info))
348
350
  {
349
351
    /* Reuse current buffer without flushing it to disk */
350
 
    unsigned char *pos;
351
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
 
352
    uchar *pos;
 
353
    if (info->type == WRITE_CACHE && type == READ_CACHE)
352
354
    {
353
 
      read_end=write_pos;
354
 
      end_of_file=my_b_tell(this);
 
355
      info->read_end=info->write_pos;
 
356
      info->end_of_file=my_b_tell(info);
355
357
      /*
356
358
        Trigger a new seek only if we have a valid
357
359
        file handle.
358
360
      */
359
 
      seek_not_done= (file != -1);
 
361
      info->seek_not_done= (info->file != -1);
360
362
    }
361
 
    else if (type_arg == WRITE_CACHE)
 
363
    else if (type == WRITE_CACHE)
362
364
    {
363
 
      if (type == READ_CACHE)
 
365
      if (info->type == READ_CACHE)
364
366
      {
365
 
        write_end=write_buffer+buffer_length;
366
 
        seek_not_done=1;
 
367
        info->write_end=info->write_buffer+info->buffer_length;
 
368
        info->seek_not_done=1;
367
369
      }
368
 
      end_of_file = ~(my_off_t) 0;
 
370
      info->end_of_file = ~(my_off_t) 0;
369
371
    }
370
 
    pos=request_pos+(seek_offset-pos_in_file);
371
 
    if (type_arg == WRITE_CACHE)
372
 
      write_pos=pos;
 
372
    pos=info->request_pos+(seek_offset-info->pos_in_file);
 
373
    if (type == WRITE_CACHE)
 
374
      info->write_pos=pos;
373
375
    else
374
 
      read_pos= pos;
 
376
      info->read_pos= pos;
375
377
#ifdef HAVE_AIOWAIT
376
 
    my_aiowait(&aio_result);            /* Wait for outstanding req */
 
378
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
377
379
#endif
378
380
  }
379
381
  else
382
384
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
383
385
      after the current positions should be ignored
384
386
    */
385
 
    if (type == WRITE_CACHE && type_arg == READ_CACHE)
386
 
      end_of_file=my_b_tell(this);
 
387
    if (info->type == WRITE_CACHE && type == READ_CACHE)
 
388
      info->end_of_file=my_b_tell(info);
387
389
    /* flush cache if we want to reuse it */
388
 
    if (!clear_cache && my_b_flush_io_cache(this, 1))
389
 
      return 1;
390
 
    pos_in_file=seek_offset;
 
390
    if (!clear_cache && my_b_flush_io_cache(info,1))
 
391
      DBUG_RETURN(1);
 
392
    info->pos_in_file=seek_offset;
391
393
    /* Better to do always do a seek */
392
 
    seek_not_done=1;
393
 
    request_pos=read_pos=write_pos=buffer;
394
 
    if (type_arg == READ_CACHE)
 
394
    info->seek_not_done=1;
 
395
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
 
396
    if (type == READ_CACHE)
395
397
    {
396
 
      read_end=buffer;          /* Nothing in cache */
 
398
      info->read_end=info->buffer;              /* Nothing in cache */
397
399
    }
398
400
    else
399
401
    {
400
 
      write_end=(buffer + buffer_length -
 
402
      info->write_end=(info->buffer + info->buffer_length -
401
403
                       (seek_offset & (IO_SIZE-1)));
402
 
      end_of_file= ~(my_off_t) 0;
 
404
      info->end_of_file= ~(my_off_t) 0;
403
405
    }
404
406
  }
405
 
  type= type_arg;
406
 
  error=0;
407
 
  init_functions();
 
407
  info->type=type;
 
408
  info->error=0;
 
409
  init_functions(info);
408
410
 
409
411
#ifdef HAVE_AIOWAIT
410
412
  if (use_async_io && ! my_disable_async_io &&
411
 
      ((uint32_t) buffer_length <
412
 
       (uint32_t) (end_of_file - seek_offset)))
 
413
      ((ulong) info->buffer_length <
 
414
       (ulong) (info->end_of_file - seek_offset)))
413
415
  {
414
 
    read_length=buffer_length/2;
415
 
    read_function=_my_b_async_read;
 
416
    info->read_length=info->buffer_length/2;
 
417
    info->read_function=_my_b_async_read;
416
418
  }
417
 
  inited= 0;
418
 
#else
419
 
  (void)use_async_io;
 
419
  info->inited=0;
420
420
#endif
421
 
  return 0;
 
421
  DBUG_RETURN(0);
422
422
} /* reinit_io_cache */
423
423
 
424
 
/**
425
 
 * @brief 
426
 
 *   Read buffered.
427
 
 * 
428
 
 * @detail
429
 
 *   This function is only called from the my_b_read() macro when there
430
 
 *   aren't enough characters in the buffer to satisfy the request.
431
 
 *
432
 
 * WARNING
433
 
 *   When changing this function, be careful with handling file offsets
434
 
 *   (end-of_file, pos_in_file). Do not cast them to possibly smaller
435
 
 *   types than my_off_t unless you can be sure that their value fits.
436
 
 *   Same applies to differences of file offsets.
437
 
 *
438
 
 * @param info st_io_cache pointer @param Buffer Buffer to retrieve count bytes
439
 
 * from file @param Count Number of bytes to read into Buffer
440
 
 * 
441
 
 * @retval 0 We succeeded in reading all data
442
 
 * @retval 1 Error: can't read requested characters
443
 
 */
444
 
static int _my_b_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
424
 
 
425
 
 
426
/*
 
427
  Read buffered.
 
428
 
 
429
  SYNOPSIS
 
430
    _my_b_read()
 
431
      info                      IO_CACHE pointer
 
432
      Buffer                    Buffer to retrieve count bytes from file
 
433
      Count                     Number of bytes to read into Buffer
 
434
 
 
435
  NOTE
 
436
    This function is only called from the my_b_read() macro when there
 
437
    isn't enough characters in the buffer to satisfy the request.
 
438
 
 
439
  WARNING
 
440
 
 
441
    When changing this function, be careful with handling file offsets
 
442
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
443
    types than my_off_t unless you can be sure that their value fits.
 
444
    Same applies to differences of file offsets.
 
445
 
 
446
    When changing this function, check _my_b_read_r(). It might need the
 
447
    same change.
 
448
 
 
449
  RETURN
 
450
    0      we succeeded in reading all data
 
451
    1      Error: can't read requested characters
 
452
*/
 
453
 
 
454
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
445
455
{
446
 
  size_t length_local,diff_length,left_length, max_length;
447
 
  my_off_t pos_in_file_local;
 
456
  size_t length,diff_length,left_length, max_length;
 
457
  my_off_t pos_in_file;
 
458
  DBUG_ENTER("_my_b_read");
448
459
 
449
460
  if ((left_length= (size_t) (info->read_end-info->read_pos)))
450
461
  {
451
 
    assert(Count >= left_length);       /* User is not using my_b_read() */
 
462
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
452
463
    memcpy(Buffer,info->read_pos, left_length);
453
464
    Buffer+=left_length;
454
465
    Count-=left_length;
455
466
  }
456
467
 
457
468
  /* pos_in_file always point on where info->buffer was read */
458
 
  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
 
469
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
470
 
460
 
  /*
461
 
    Whenever a function which operates on st_io_cache flushes/writes
462
 
    some part of the st_io_cache to disk it will set the property
 
471
  /* 
 
472
    Whenever a function which operates on IO_CACHE flushes/writes
 
473
    some part of the IO_CACHE to disk it will set the property
463
474
    "seek_not_done" to indicate this to other functions operating
464
 
    on the st_io_cache.
 
475
    on the IO_CACHE.
465
476
  */
466
477
  if (info->seek_not_done)
467
478
  {
468
 
    if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
 
479
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
480
        != MY_FILEPOS_ERROR))
469
481
    {
470
482
      /* No error, reset seek_not_done flag. */
471
483
      info->seek_not_done= 0;
477
489
        info->file is a pipe or socket or FIFO.  We never should have tried
478
490
        to seek on that.  See Bugs#25807 and #22828 for more info.
479
491
      */
480
 
      assert(errno != ESPIPE);
 
492
      DBUG_ASSERT(my_errno != ESPIPE);
481
493
      info->error= -1;
482
 
      return(1);
 
494
      DBUG_RETURN(1);
483
495
    }
484
496
  }
485
497
 
486
 
  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
 
498
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
487
499
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
488
500
  {                                     /* Fill first intern buffer */
489
501
    size_t read_length;
490
 
    if (info->end_of_file <= pos_in_file_local)
 
502
    if (info->end_of_file <= pos_in_file)
491
503
    {                                   /* End of file */
492
504
      info->error= (int) left_length;
493
 
      return(1);
 
505
      DBUG_RETURN(1);
494
506
    }
495
 
    length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
496
 
    if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
 
507
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
508
    if ((read_length= my_read(info->file,Buffer, length, info->myflags))
 
509
        != length)
497
510
    {
498
511
      info->error= (read_length == (size_t) -1 ? -1 :
499
512
                    (int) (read_length+left_length));
500
 
      return(1);
 
513
      DBUG_RETURN(1);
501
514
    }
502
 
    Count-= length_local;
503
 
    Buffer+= length_local;
504
 
    pos_in_file_local+= length_local;
505
 
    left_length+= length_local;
 
515
    Count-=length;
 
516
    Buffer+=length;
 
517
    pos_in_file+=length;
 
518
    left_length+=length;
506
519
    diff_length=0;
507
520
  }
508
521
 
509
522
  max_length= info->read_length-diff_length;
510
523
  if (info->type != READ_FIFO &&
511
 
      max_length > (info->end_of_file - pos_in_file_local))
512
 
    max_length= (size_t) (info->end_of_file - pos_in_file_local);
 
524
      max_length > (info->end_of_file - pos_in_file))
 
525
    max_length= (size_t) (info->end_of_file - pos_in_file);
513
526
  if (!max_length)
514
527
  {
515
528
    if (Count)
516
529
    {
517
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
518
 
      return(1);
 
530
      info->error= left_length;         /* We only got this many char */
 
531
      DBUG_RETURN(1);
519
532
    }
520
 
     length_local=0;                            /* Didn't read any chars */
 
533
    length=0;                           /* Didn't read any chars */
521
534
  }
522
 
  else if (( length_local= my_read(info->file,info->buffer, max_length,
 
535
  else if ((length= my_read(info->file,info->buffer, max_length,
523
536
                            info->myflags)) < Count ||
524
 
            length_local == (size_t) -1)
 
537
           length == (size_t) -1)
525
538
  {
526
 
    if ( length_local != (size_t) -1)
527
 
      memcpy(Buffer, info->buffer,  length_local);
528
 
    info->pos_in_file= pos_in_file_local;
529
 
    info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
 
539
    if (length != (size_t) -1)
 
540
      memcpy(Buffer, info->buffer, length);
 
541
    info->pos_in_file= pos_in_file;
 
542
    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
530
543
    info->read_pos=info->read_end=info->buffer;
531
 
    return(1);
 
544
    DBUG_RETURN(1);
532
545
  }
533
546
  info->read_pos=info->buffer+Count;
534
 
  info->read_end=info->buffer+ length_local;
535
 
  info->pos_in_file=pos_in_file_local;
 
547
  info->read_end=info->buffer+length;
 
548
  info->pos_in_file=pos_in_file;
536
549
  memcpy(Buffer, info->buffer, Count);
537
 
  return(0);
 
550
  DBUG_RETURN(0);
 
551
}
 
552
 
 
553
 
 
554
#ifdef THREAD
 
555
/*
 
556
  Prepare IO_CACHE for shared use.
 
557
 
 
558
  SYNOPSIS
 
559
    init_io_cache_share()
 
560
      read_cache                A read cache. This will be copied for
 
561
                                every thread after setup.
 
562
      cshare                    The share.
 
563
      write_cache               If non-NULL a write cache that is to be
 
564
                                synchronized with the read caches.
 
565
      num_threads               Number of threads sharing the cache
 
566
                                including the write thread if any.
 
567
 
 
568
  DESCRIPTION
 
569
 
 
570
    The shared cache is used so: One IO_CACHE is initialized with
 
571
    init_io_cache(). This includes the allocation of a buffer. Then a
 
572
    share is allocated and init_io_cache_share() is called with the io
 
573
    cache and the share. Then the io cache is copied for each thread. So
 
574
    every thread has its own copy of IO_CACHE. But the allocated buffer
 
575
    is shared because cache->buffer is the same for all caches.
 
576
 
 
577
    One thread reads data from the file into the buffer. All threads
 
578
    read from the buffer, but every thread maintains its own set of
 
579
    pointers into the buffer. When all threads have used up the buffer
 
580
    contents, one of the threads reads the next block of data into the
 
581
    buffer. To accomplish this, each thread enters the cache lock before
 
582
    accessing the buffer. They wait in lock_io_cache() until all threads
 
583
    joined the lock. The last thread entering the lock is in charge of
 
584
    reading from file to buffer. It wakes all threads when done.
 
585
 
 
586
    Synchronizing a write cache to the read caches works so: Whenever
 
587
    the write buffer needs a flush, the write thread enters the lock and
 
588
    waits for all other threads to enter the lock too. They do this when
 
589
    they have used up the read buffer. When all threads are in the lock,
 
590
    the write thread copies the write buffer to the read buffer and
 
591
    wakes all threads.
 
592
 
 
593
    share->running_threads is the number of threads not being in the
 
594
    cache lock. When entering lock_io_cache() the number is decreased.
 
595
    When the thread that fills the buffer enters unlock_io_cache() the
 
596
    number is reset to the number of threads. The condition
 
597
    running_threads == 0 means that all threads are in the lock. Bumping
 
598
    up the number to the full count is non-intuitive. But increasing the
 
599
    number by one for each thread that leaves the lock could lead to a
 
600
    solo run of one thread. The last thread to join a lock reads from
 
601
    file to buffer, wakes the other threads, processes the data in the
 
602
    cache and enters the lock again. If no other thread left the lock
 
603
    meanwhile, it would think it's the last one again and read the next
 
604
    block...
 
605
 
 
606
    The share has copies of 'error', 'buffer', 'read_end', and
 
607
    'pos_in_file' from the thread that filled the buffer. We may not be
 
608
    able to access this information directly from its cache because the
 
609
    thread may be removed from the share before the variables could be
 
610
    copied by all other threads. Or, if a write buffer is synchronized,
 
611
    it would change its 'pos_in_file' after waking the other threads,
 
612
    possibly before they could copy its value.
 
613
 
 
614
    However, the 'buffer' variable in the share is for a synchronized
 
615
    write cache. It needs to know where to put the data. Otherwise it
 
616
    would need access to the read cache of one of the threads that is
 
617
    not yet removed from the share.
 
618
 
 
619
  RETURN
 
620
    void
 
621
*/
 
622
 
 
623
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
 
624
                         IO_CACHE *write_cache, uint num_threads)
 
625
{
 
626
  DBUG_ENTER("init_io_cache_share");
 
627
  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
 
628
                                "write_cache: 0x%lx  threads: %u",
 
629
                                (long) read_cache, (long) cshare,
 
630
                                (long) write_cache, num_threads));
 
631
 
 
632
  DBUG_ASSERT(num_threads > 1);
 
633
  DBUG_ASSERT(read_cache->type == READ_CACHE);
 
634
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
 
635
 
 
636
  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
 
637
  pthread_cond_init(&cshare->cond, 0);
 
638
  pthread_cond_init(&cshare->cond_writer, 0);
 
639
 
 
640
  cshare->running_threads= num_threads;
 
641
  cshare->total_threads=   num_threads;
 
642
  cshare->error=           0;    /* Initialize. */
 
643
  cshare->buffer=          read_cache->buffer;
 
644
  cshare->read_end=        NULL; /* See function comment of lock_io_cache(). */
 
645
  cshare->pos_in_file=     0;    /* See function comment of lock_io_cache(). */
 
646
  cshare->source_cache=    write_cache; /* Can be NULL. */
 
647
 
 
648
  read_cache->share=         cshare;
 
649
  read_cache->read_function= _my_b_read_r;
 
650
  read_cache->current_pos=   NULL;
 
651
  read_cache->current_end=   NULL;
 
652
 
 
653
  if (write_cache)
 
654
    write_cache->share= cshare;
 
655
 
 
656
  DBUG_VOID_RETURN;
 
657
}
 
658
 
 
659
 
 
660
/*
 
661
  Remove a thread from shared access to IO_CACHE.
 
662
 
 
663
  SYNOPSIS
 
664
    remove_io_thread()
 
665
      cache                     The IO_CACHE to be removed from the share.
 
666
 
 
667
  NOTE
 
668
 
 
669
    Every thread must do that on exit for not to deadlock other threads.
 
670
 
 
671
    The last thread destroys the pthread resources.
 
672
 
 
673
    A writer flushes its cache first.
 
674
 
 
675
  RETURN
 
676
    void
 
677
*/
 
678
 
 
679
void remove_io_thread(IO_CACHE *cache)
 
680
{
 
681
  IO_CACHE_SHARE *cshare= cache->share;
 
682
  uint total;
 
683
  DBUG_ENTER("remove_io_thread");
 
684
 
 
685
  /* If the writer goes, it needs to flush the write cache. */
 
686
  if (cache == cshare->source_cache)
 
687
    flush_io_cache(cache);
 
688
 
 
689
  pthread_mutex_lock(&cshare->mutex);
 
690
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
 
691
                                (cache == cshare->source_cache) ?
 
692
                                "writer" : "reader", (long) cache));
 
693
 
 
694
  /* Remove from share. */
 
695
  total= --cshare->total_threads;
 
696
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
 
697
 
 
698
  /* Detach from share. */
 
699
  cache->share= NULL;
 
700
 
 
701
  /* If the writer goes, let the readers know. */
 
702
  if (cache == cshare->source_cache)
 
703
  {
 
704
    DBUG_PRINT("io_cache_share", ("writer leaves"));
 
705
    cshare->source_cache= NULL;
 
706
  }
 
707
 
 
708
  /* If all threads are waiting for me to join the lock, wake them. */
 
709
  if (!--cshare->running_threads)
 
710
  {
 
711
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
 
712
    pthread_cond_signal(&cshare->cond_writer);
 
713
    pthread_cond_broadcast(&cshare->cond);
 
714
  }
 
715
 
 
716
  pthread_mutex_unlock(&cshare->mutex);
 
717
 
 
718
  if (!total)
 
719
  {
 
720
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
 
721
    pthread_cond_destroy (&cshare->cond_writer);
 
722
    pthread_cond_destroy (&cshare->cond);
 
723
    pthread_mutex_destroy(&cshare->mutex);
 
724
  }
 
725
 
 
726
  DBUG_VOID_RETURN;
 
727
}
 
728
 
 
729
 
 
730
/*
 
731
  Lock IO cache and wait for all other threads to join.
 
732
 
 
733
  SYNOPSIS
 
734
    lock_io_cache()
 
735
      cache                     The cache of the thread entering the lock.
 
736
      pos                       File position of the block to read.
 
737
                                Unused for the write thread.
 
738
 
 
739
  DESCRIPTION
 
740
 
 
741
    Wait for all threads to finish with the current buffer. We want
 
742
    all threads to proceed in concert. The last thread to join
 
743
    lock_io_cache() will read the block from file and all threads start
 
744
    to use it. Then they will join again for reading the next block.
 
745
 
 
746
    The waiting threads detect a fresh buffer by comparing
 
747
    cshare->pos_in_file with the position they want to process next.
 
748
    Since the first block may start at position 0, we take
 
749
    cshare->read_end as an additional condition. This variable is
 
750
    initialized to NULL and will be set after a block of data is written
 
751
    to the buffer.
 
752
 
 
753
  RETURN
 
754
    1           OK, lock in place, go ahead and read.
 
755
    0           OK, unlocked, another thread did the read.
 
756
*/
 
757
 
 
758
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 
759
{
 
760
  IO_CACHE_SHARE *cshare= cache->share;
 
761
  DBUG_ENTER("lock_io_cache");
 
762
 
 
763
  /* Enter the lock. */
 
764
  pthread_mutex_lock(&cshare->mutex);
 
765
  cshare->running_threads--;
 
766
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
767
                                (cache == cshare->source_cache) ?
 
768
                                "writer" : "reader", (long) cache, (ulong) pos,
 
769
                                cshare->running_threads));
 
770
 
 
771
  if (cshare->source_cache)
 
772
  {
 
773
    /* A write cache is synchronized to the read caches. */
 
774
 
 
775
    if (cache == cshare->source_cache)
 
776
    {
 
777
      /* The writer waits until all readers are here. */
 
778
      while (cshare->running_threads)
 
779
      {
 
780
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
 
781
        pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
 
782
      }
 
783
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
 
784
 
 
785
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
786
      DBUG_RETURN(1);
 
787
    }
 
788
 
 
789
    /* The last thread wakes the writer. */
 
790
    if (!cshare->running_threads)
 
791
    {
 
792
      DBUG_PRINT("io_cache_share", ("waking writer"));
 
793
      pthread_cond_signal(&cshare->cond_writer);
 
794
    }
 
795
 
 
796
    /*
 
797
      Readers wait until the data is copied from the writer. Another
 
798
      reason to stop waiting is the removal of the write thread. If this
 
799
      happens, we leave the lock with old data in the buffer.
 
800
    */
 
801
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
802
           cshare->source_cache)
 
803
    {
 
804
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
805
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
806
    }
 
807
 
 
808
    /*
 
809
      If the writer was removed from the share while this thread was
 
810
      asleep, we need to simulate an EOF condition. The writer cannot
 
811
      reset the share variables as they might still be in use by readers
 
812
      of the last block. When we awake here then because the last
 
813
      joining thread signalled us. If the writer is not the last, it
 
814
      will not signal. So it is safe to clear the buffer here.
 
815
    */
 
816
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
817
    {
 
818
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
 
819
      cshare->read_end= cshare->buffer; /* Empty buffer. */
 
820
      cshare->error= 0; /* EOF is not an error. */
 
821
    }
 
822
  }
 
823
  else
 
824
  {
 
825
    /*
 
826
      There are read caches only. The last thread arriving in
 
827
      lock_io_cache() continues with a locked cache and reads the block.
 
828
    */
 
829
    if (!cshare->running_threads)
 
830
    {
 
831
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
 
832
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
833
      DBUG_RETURN(1);
 
834
    }
 
835
 
 
836
    /*
 
837
      All other threads wait until the requested block is read by the
 
838
      last thread arriving. Another reason to stop waiting is the
 
839
      removal of a thread. If this leads to all threads being in the
 
840
      lock, we have to continue also. The first of the awaken threads
 
841
      will then do the read.
 
842
    */
 
843
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
 
844
           cshare->running_threads)
 
845
    {
 
846
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
 
847
      pthread_cond_wait(&cshare->cond, &cshare->mutex);
 
848
    }
 
849
 
 
850
    /* If the block is not yet read, continue with a locked cache and read. */
 
851
    if (!cshare->read_end || (cshare->pos_in_file < pos))
 
852
    {
 
853
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
 
854
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
 
855
      DBUG_RETURN(1);
 
856
    }
 
857
 
 
858
    /* Another thread did read the block already. */
 
859
  }
 
860
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
 
861
                                (uint) (cshare->read_end ? (size_t)
 
862
                                        (cshare->read_end - cshare->buffer) :
 
863
                                        0)));
 
864
 
 
865
  /*
 
866
    Leave the lock. Do not call unlock_io_cache() later. The thread that
 
867
    filled the buffer did this and marked all threads as running.
 
868
  */
 
869
  pthread_mutex_unlock(&cshare->mutex);
 
870
  DBUG_RETURN(0);
 
871
}
 
872
 
 
873
 
 
874
/*
 
875
  Unlock IO cache.
 
876
 
 
877
  SYNOPSIS
 
878
    unlock_io_cache()
 
879
      cache                     The cache of the thread leaving the lock.
 
880
 
 
881
  NOTE
 
882
    This is called by the thread that filled the buffer. It marks all
 
883
    threads as running and awakes them. This must not be done by any
 
884
    other thread.
 
885
 
 
886
    Do not signal cond_writer. Either there is no writer or the writer
 
887
    is the only one who can call this function.
 
888
 
 
889
    The reason for resetting running_threads to total_threads before
 
890
    waking all other threads is that it could be possible that this
 
891
    thread is so fast with processing the buffer that it enters the lock
 
892
    before even one other thread has left it. If every awoken thread
 
893
    would increase running_threads by one, this thread could think that
 
894
    he is again the last to join and would not wait for the other
 
895
    threads to process the data.
 
896
 
 
897
  RETURN
 
898
    void
 
899
*/
 
900
 
 
901
static void unlock_io_cache(IO_CACHE *cache)
 
902
{
 
903
  IO_CACHE_SHARE *cshare= cache->share;
 
904
  DBUG_ENTER("unlock_io_cache");
 
905
  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
 
906
                                (cache == cshare->source_cache) ?
 
907
                                "writer" : "reader",
 
908
                                (long) cache, (ulong) cshare->pos_in_file,
 
909
                                cshare->total_threads));
 
910
 
 
911
  cshare->running_threads= cshare->total_threads;
 
912
  pthread_cond_broadcast(&cshare->cond);
 
913
  pthread_mutex_unlock(&cshare->mutex);
 
914
  DBUG_VOID_RETURN;
 
915
}
 
916
 
 
917
 
 
918
/*
 
919
  Read from IO_CACHE when it is shared between several threads.
 
920
 
 
921
  SYNOPSIS
 
922
    _my_b_read_r()
 
923
      cache                     IO_CACHE pointer
 
924
      Buffer                    Buffer to retrieve count bytes from file
 
925
      Count                     Number of bytes to read into Buffer
 
926
 
 
927
  NOTE
 
928
    This function is only called from the my_b_read() macro when there
 
929
    isn't enough characters in the buffer to satisfy the request.
 
930
 
 
931
  IMPLEMENTATION
 
932
 
 
933
    It works as follows: when a thread tries to read from a file (that
 
934
    is, after using all the data from the (shared) buffer), it just
 
935
    hangs on lock_io_cache(), waiting for other threads. When the very
 
936
    last thread attempts a read, lock_io_cache() returns 1, the thread
 
937
    does actual IO and unlock_io_cache(), which signals all the waiting
 
938
    threads that data is in the buffer.
 
939
 
 
940
  WARNING
 
941
 
 
942
    When changing this function, be careful with handling file offsets
 
943
    (end-of_file, pos_in_file). Do not cast them to possibly smaller
 
944
    types than my_off_t unless you can be sure that their value fits.
 
945
    Same applies to differences of file offsets. (Bug #11527)
 
946
 
 
947
    When changing this function, check _my_b_read(). It might need the
 
948
    same change.
 
949
 
 
950
  RETURN
 
951
    0      we succeeded in reading all data
 
952
    1      Error: can't read requested characters
 
953
*/
 
954
 
 
955
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
956
{
 
957
  my_off_t pos_in_file;
 
958
  size_t length, diff_length, left_length;
 
959
  IO_CACHE_SHARE *cshare= cache->share;
 
960
  DBUG_ENTER("_my_b_read_r");
 
961
 
 
962
  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
 
963
  {
 
964
    DBUG_ASSERT(Count >= left_length);  /* User is not using my_b_read() */
 
965
    memcpy(Buffer, cache->read_pos, left_length);
 
966
    Buffer+= left_length;
 
967
    Count-= left_length;
 
968
  }
 
969
  while (Count)
 
970
  {
 
971
    size_t cnt, len;
 
972
 
 
973
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
 
974
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
975
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
 
976
    length= ((length <= cache->read_length) ?
 
977
             length + IO_ROUND_DN(cache->read_length - length) :
 
978
             length - IO_ROUND_UP(length - cache->read_length));
 
979
    if (cache->type != READ_FIFO &&
 
980
        (length > (cache->end_of_file - pos_in_file)))
 
981
      length= (size_t) (cache->end_of_file - pos_in_file);
 
982
    if (length == 0)
 
983
    {
 
984
      cache->error= (int) left_length;
 
985
      DBUG_RETURN(1);
 
986
    }
 
987
    if (lock_io_cache(cache, pos_in_file))
 
988
    {
 
989
      /* With a synchronized write/read cache we won't come here... */
 
990
      DBUG_ASSERT(!cshare->source_cache);
 
991
      /*
 
992
        ... unless the writer has gone before this thread entered the
 
993
        lock. Simulate EOF in this case. It can be distinguished by
 
994
        cache->file.
 
995
      */
 
996
      if (cache->file < 0)
 
997
        len= 0;
 
998
      else
 
999
      {
 
1000
        /*
 
1001
          Whenever a function which operates on IO_CACHE flushes/writes
 
1002
          some part of the IO_CACHE to disk it will set the property
 
1003
          "seek_not_done" to indicate this to other functions operating
 
1004
          on the IO_CACHE.
 
1005
        */
 
1006
        if (cache->seek_not_done)
 
1007
        {
 
1008
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
1009
              == MY_FILEPOS_ERROR)
 
1010
          {
 
1011
            cache->error= -1;
 
1012
            unlock_io_cache(cache);
 
1013
            DBUG_RETURN(1);
 
1014
          }
 
1015
        }
 
1016
        len= my_read(cache->file, cache->buffer, length, cache->myflags);
 
1017
      }
 
1018
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
 
1019
 
 
1020
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
 
1021
      cache->error=       (len == length ? 0 : (int) len);
 
1022
      cache->pos_in_file= pos_in_file;
 
1023
 
 
1024
      /* Copy important values to the share. */
 
1025
      cshare->error=       cache->error;
 
1026
      cshare->read_end=    cache->read_end;
 
1027
      cshare->pos_in_file= pos_in_file;
 
1028
 
 
1029
      /* Mark all threads as running and wake them. */
 
1030
      unlock_io_cache(cache);
 
1031
    }
 
1032
    else
 
1033
    {
 
1034
      /*
 
1035
        With a synchronized write/read cache readers always come here.
 
1036
        Copy important values from the share.
 
1037
      */
 
1038
      cache->error=       cshare->error;
 
1039
      cache->read_end=    cshare->read_end;
 
1040
      cache->pos_in_file= cshare->pos_in_file;
 
1041
 
 
1042
      len= ((cache->error == -1) ? (size_t) -1 :
 
1043
            (size_t) (cache->read_end - cache->buffer));
 
1044
    }
 
1045
    cache->read_pos=      cache->buffer;
 
1046
    cache->seek_not_done= 0;
 
1047
    if (len == 0 || len == (size_t) -1)
 
1048
    {
 
1049
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
 
1050
                                    (ulong) len, (ulong) left_length));
 
1051
      cache->error= (int) left_length;
 
1052
      DBUG_RETURN(1);
 
1053
    }
 
1054
    cnt= (len > Count) ? Count : len;
 
1055
    memcpy(Buffer, cache->read_pos, cnt);
 
1056
    Count -= cnt;
 
1057
    Buffer+= cnt;
 
1058
    left_length+= cnt;
 
1059
    cache->read_pos+= cnt;
 
1060
  }
 
1061
  DBUG_RETURN(0);
 
1062
}
 
1063
 
 
1064
 
 
1065
/*
 
1066
  Copy data from write cache to read cache.
 
1067
 
 
1068
  SYNOPSIS
 
1069
    copy_to_read_buffer()
 
1070
      write_cache               The write cache.
 
1071
      write_buffer              The source of data, mostly the cache buffer.
 
1072
      write_length              The number of bytes to copy.
 
1073
 
 
1074
  NOTE
 
1075
    The write thread will wait for all read threads to join the cache
 
1076
    lock. Then it copies the data over and wakes the read threads.
 
1077
 
 
1078
  RETURN
 
1079
    void
 
1080
*/
 
1081
 
 
1082
static void copy_to_read_buffer(IO_CACHE *write_cache,
 
1083
                                const uchar *write_buffer, size_t write_length)
 
1084
{
 
1085
  IO_CACHE_SHARE *cshare= write_cache->share;
 
1086
 
 
1087
  DBUG_ASSERT(cshare->source_cache == write_cache);
 
1088
  /*
 
1089
    write_length is usually less or equal to buffer_length.
 
1090
    It can be bigger if _my_b_write() is called with a big length.
 
1091
  */
 
1092
  while (write_length)
 
1093
  {
 
1094
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1095
    int  __attribute__((unused)) rc;
 
1096
 
 
1097
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
 
1098
    /* The writing thread does always have the lock when it awakes. */
 
1099
    DBUG_ASSERT(rc);
 
1100
 
 
1101
    memcpy(cshare->buffer, write_buffer, copy_length);
 
1102
 
 
1103
    cshare->error=       0;
 
1104
    cshare->read_end=    cshare->buffer + copy_length;
 
1105
    cshare->pos_in_file= write_cache->pos_in_file;
 
1106
 
 
1107
    /* Mark all threads as running and wake them. */
 
1108
    unlock_io_cache(write_cache);
 
1109
 
 
1110
    write_buffer+= copy_length;
 
1111
    write_length-= copy_length;
 
1112
  }
 
1113
}
 
1114
#endif /*THREAD*/
 
1115
 
 
1116
 
 
1117
/*
 
1118
  Do sequential read from the SEQ_READ_APPEND cache.
 
1119
  
 
1120
  We do this in three stages:
 
1121
   - first read from info->buffer
 
1122
   - then if there are still data to read, try the file descriptor
 
1123
   - afterwards, if there are still data to read, try append buffer
 
1124
 
 
1125
  RETURNS
 
1126
    0  Success
 
1127
    1  Failed to read
 
1128
*/
 
1129
 
 
1130
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1131
{
 
1132
  size_t length, diff_length, left_length, save_count, max_length;
 
1133
  my_off_t pos_in_file;
 
1134
  save_count=Count;
 
1135
 
 
1136
  /* first, read the regular buffer */
 
1137
  if ((left_length=(size_t) (info->read_end-info->read_pos)))
 
1138
  {
 
1139
    DBUG_ASSERT(Count > left_length);   /* User is not using my_b_read() */
 
1140
    memcpy(Buffer,info->read_pos, left_length);
 
1141
    Buffer+=left_length;
 
1142
    Count-=left_length;
 
1143
  }
 
1144
  lock_append_buffer(info);
 
1145
 
 
1146
  /* pos_in_file always point on where info->buffer was read */
 
1147
  if ((pos_in_file=info->pos_in_file +
 
1148
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
 
1149
    goto read_append_buffer;
 
1150
 
 
1151
  /*
 
1152
    With read-append cache we must always do a seek before we read,
 
1153
    because the write could have moved the file pointer astray
 
1154
  */
 
1155
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1156
  {
 
1157
   info->error= -1;
 
1158
   unlock_append_buffer(info);
 
1159
   return (1);
 
1160
  }
 
1161
  info->seek_not_done=0;
 
1162
 
 
1163
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
 
1164
 
 
1165
  /* now the second stage begins - read from file descriptor */
 
1166
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
 
1167
  {
 
1168
    /* Fill first intern buffer */
 
1169
    size_t read_length;
 
1170
 
 
1171
    length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
 
1172
    if ((read_length= my_read(info->file,Buffer, length,
 
1173
                              info->myflags)) == (size_t) -1)
 
1174
    {
 
1175
      info->error= -1;
 
1176
      unlock_append_buffer(info);
 
1177
      return 1;
 
1178
    }
 
1179
    Count-=read_length;
 
1180
    Buffer+=read_length;
 
1181
    pos_in_file+=read_length;
 
1182
 
 
1183
    if (read_length != length)
 
1184
    {
 
1185
      /*
 
1186
        We only got part of data;  Read the rest of the data from the
 
1187
        write buffer
 
1188
      */
 
1189
      goto read_append_buffer;
 
1190
    }
 
1191
    left_length+=length;
 
1192
    diff_length=0;
 
1193
  }
 
1194
 
 
1195
  max_length= info->read_length-diff_length;
 
1196
  if (max_length > (info->end_of_file - pos_in_file))
 
1197
    max_length= (size_t) (info->end_of_file - pos_in_file);
 
1198
  if (!max_length)
 
1199
  {
 
1200
    if (Count)
 
1201
      goto read_append_buffer;
 
1202
    length=0;                           /* Didn't read any more chars */
 
1203
  }
 
1204
  else
 
1205
  {
 
1206
    length= my_read(info->file,info->buffer, max_length, info->myflags);
 
1207
    if (length == (size_t) -1)
 
1208
    {
 
1209
      info->error= -1;
 
1210
      unlock_append_buffer(info);
 
1211
      return 1;
 
1212
    }
 
1213
    if (length < Count)
 
1214
    {
 
1215
      memcpy(Buffer, info->buffer, length);
 
1216
      Count -= length;
 
1217
      Buffer += length;
 
1218
 
 
1219
      /*
 
1220
         added the line below to make
 
1221
         DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
 
1222
         otherwise this does not appear to be needed
 
1223
      */
 
1224
      pos_in_file += length;
 
1225
      goto read_append_buffer;
 
1226
    }
 
1227
  }
 
1228
  unlock_append_buffer(info);
 
1229
  info->read_pos=info->buffer+Count;
 
1230
  info->read_end=info->buffer+length;
 
1231
  info->pos_in_file=pos_in_file;
 
1232
  memcpy(Buffer,info->buffer,(size_t) Count);
 
1233
  return 0;
 
1234
 
 
1235
read_append_buffer:
 
1236
 
 
1237
  /*
 
1238
     Read data from the current write buffer.
 
1239
     Count should never be == 0 here (The code will work even if count is 0)
 
1240
  */
 
1241
 
 
1242
  {
 
1243
    /* First copy the data to Count */
 
1244
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
 
1245
    size_t copy_len;
 
1246
    size_t transfer_len;
 
1247
 
 
1248
    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
 
1249
    /*
 
1250
      TODO: figure out if the assert below is needed or correct.
 
1251
    */
 
1252
    DBUG_ASSERT(pos_in_file == info->end_of_file);
 
1253
    copy_len=min(Count, len_in_buff);
 
1254
    memcpy(Buffer, info->append_read_pos, copy_len);
 
1255
    info->append_read_pos += copy_len;
 
1256
    Count -= copy_len;
 
1257
    if (Count)
 
1258
      info->error = save_count - Count;
 
1259
 
 
1260
    /* Fill read buffer with data from write buffer */
 
1261
    memcpy(info->buffer, info->append_read_pos,
 
1262
           (size_t) (transfer_len=len_in_buff - copy_len));
 
1263
    info->read_pos= info->buffer;
 
1264
    info->read_end= info->buffer+transfer_len;
 
1265
    info->append_read_pos=info->write_pos;
 
1266
    info->pos_in_file=pos_in_file+copy_len;
 
1267
    info->end_of_file+=len_in_buff;
 
1268
  }
 
1269
  unlock_append_buffer(info);
 
1270
  return Count ? 1 : 0;
538
1271
}
539
1272
 
540
1273
 
541
1274
#ifdef HAVE_AIOWAIT
542
1275
 
543
 
/**
544
 
 * @brief
545
 
 *   Read from the st_io_cache into a buffer and feed asynchronously from disk when needed.
546
 
 *
547
 
 * @param info st_io_cache pointer
548
 
 * @param Buffer Buffer to retrieve count bytes from file
549
 
 * @param Count Number of bytes to read into Buffer
550
 
 * 
551
 
 * @retval -1 An error has occurred; errno is set.
552
 
 * @retval 0 Success
553
 
 * @retval 1 An error has occurred; st_io_cache to error state.
554
 
 */
555
 
int _my_b_async_read(register st_io_cache *info, unsigned char *Buffer, size_t Count)
 
1276
/*
 
1277
  Read from the IO_CACHE into a buffer and feed asynchronously
 
1278
  from disk when needed.
 
1279
 
 
1280
  SYNOPSIS
 
1281
    _my_b_async_read()
 
1282
      info                      IO_CACHE pointer
 
1283
      Buffer                    Buffer to retrieve count bytes from file
 
1284
      Count                     Number of bytes to read into Buffer
 
1285
 
 
1286
  RETURN VALUE
 
1287
    -1          An error has occurred; my_errno is set.
 
1288
     0          Success
 
1289
     1          An error has occurred; IO_CACHE to error state.
 
1290
*/
 
1291
 
 
1292
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
556
1293
{
557
 
  size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
 
1294
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
558
1295
  size_t max_length;
559
1296
  my_off_t next_pos_in_file;
560
 
  unsigned char *read_buffer;
 
1297
  uchar *read_buffer;
561
1298
 
562
1299
  memcpy(Buffer,info->read_pos,
563
1300
         (left_length= (size_t) (info->read_end-info->read_pos)));
575
1312
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
576
1313
                 my_filename(info->file),
577
1314
                 info->aio_result.result.aio_errno);
578
 
      errno=info->aio_result.result.aio_errno;
 
1315
      my_errno=info->aio_result.result.aio_errno;
579
1316
      info->error= -1;
580
1317
      return(1);
581
1318
    }
582
1319
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
583
1320
        read_length == (size_t) -1)
584
1321
    {
585
 
      errno=0;                          /* For testing */
 
1322
      my_errno=0;                               /* For testing */
586
1323
      info->error= (read_length == (size_t) -1 ? -1 :
587
1324
                    (int) (read_length+left_length));
588
1325
      return(1);
614
1351
        read_length-=offset;                    /* Bytes left from read_pos */
615
1352
      }
616
1353
    }
 
1354
#ifndef DBUG_OFF
 
1355
    if (info->aio_read_pos > info->pos_in_file)
 
1356
    {
 
1357
      my_errno=EINVAL;
 
1358
      return(info->read_length= (size_t) -1);
 
1359
    }
 
1360
#endif
617
1361
        /* Copy found bytes to buffer */
618
 
    length_local=min(Count,read_length);
619
 
    memcpy(Buffer,info->read_pos,(size_t) length_local);
620
 
    Buffer+=length_local;
621
 
    Count-=length_local;
622
 
    left_length+=length_local;
 
1362
    length=min(Count,read_length);
 
1363
    memcpy(Buffer,info->read_pos,(size_t) length);
 
1364
    Buffer+=length;
 
1365
    Count-=length;
 
1366
    left_length+=length;
623
1367
    info->read_end=info->rc_pos+read_length;
624
 
    info->read_pos+=length_local;
 
1368
    info->read_pos+=length;
625
1369
  }
626
1370
  else
627
1371
    next_pos_in_file=(info->pos_in_file+ (size_t)
635
1379
      info->error=(int) (read_length+left_length);
636
1380
      return 1;
637
1381
    }
638
 
 
639
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1382
    
 
1383
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1384
        == MY_FILEPOS_ERROR)
640
1385
    {
641
1386
      info->error= -1;
642
1387
      return (1);
659
1404
      {                                 /* Didn't find hole block */
660
1405
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
661
1406
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
662
 
                   my_filename(info->file),errno);
 
1407
                   my_filename(info->file),my_errno);
663
1408
        info->error=(int) (read_length+left_length);
664
1409
        return 1;
665
1410
      }
691
1436
  if (max_length)
692
1437
  {
693
1438
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
 
1439
    DBUG_PRINT("aioread",("filepos: %ld  length: %lu",
 
1440
                          (ulong) next_pos_in_file, (ulong) max_length));
694
1441
    if (aioread(info->file,read_buffer, max_length,
695
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1442
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
696
1443
                &info->aio_result.result))
697
1444
    {                                           /* Skip async io */
698
 
      errno=errno;
 
1445
      my_errno=errno;
 
1446
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
 
1447
                          errno, info->aio_result.result.aio_errno));
699
1448
      if (info->request_pos != info->buffer)
700
1449
      {
701
 
        memmove(info->buffer, info->request_pos,
702
 
                (size_t) (info->read_end - info->read_pos));
 
1450
        bmove(info->buffer,info->request_pos,
 
1451
              (size_t) (info->read_end - info->read_pos));
703
1452
        info->request_pos=info->buffer;
704
1453
        info->read_pos-=info->read_length;
705
1454
        info->read_end-=info->read_length;
715
1464
#endif
716
1465
 
717
1466
 
718
 
/**
719
 
 * @brief
720
 
 *   Read one byte when buffer is empty
721
 
 */
722
 
int _my_b_get(st_io_cache *info)
 
1467
/* Read one byte when buffer is empty */
 
1468
 
 
1469
int _my_b_get(IO_CACHE *info)
723
1470
{
724
 
  unsigned char buff;
 
1471
  uchar buff;
725
1472
  IO_CACHE_CALLBACK pre_read,post_read;
726
1473
  if ((pre_read = info->pre_read))
727
1474
    (*pre_read)(info);
729
1476
    return my_b_EOF;
730
1477
  if ((post_read = info->post_read))
731
1478
    (*post_read)(info);
732
 
  return (int) (unsigned char) buff;
 
1479
  return (int) (uchar) buff;
733
1480
}
734
1481
 
735
 
/**
736
 
 * @brief
737
 
 *   Write a byte buffer to st_io_cache and flush to disk if st_io_cache is full.
738
 
 *
739
 
 * @retval -1 On error; errno contains error code.
740
 
 * @retval 0 On success
741
 
 * @retval 1 On error on write
742
 
 */
743
 
int _my_b_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count)
 
1482
/* 
 
1483
   Write a byte buffer to IO_CACHE and flush to disk
 
1484
   if IO_CACHE is full.
 
1485
 
 
1486
   RETURN VALUE
 
1487
    1 On error on write
 
1488
    0 On success
 
1489
   -1 On error; my_errno contains error code.
 
1490
*/
 
1491
 
 
1492
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
744
1493
{
745
 
  size_t rest_length,length_local;
 
1494
  size_t rest_length,length;
746
1495
 
747
1496
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
748
1497
  {
749
 
    errno=EFBIG;
 
1498
    my_errno=errno=EFBIG;
750
1499
    return info->error = -1;
751
1500
  }
752
1501
 
760
1509
    return 1;
761
1510
  if (Count >= IO_SIZE)
762
1511
  {                                     /* Fill first intern buffer */
763
 
    length_local=Count & (size_t) ~(IO_SIZE-1);
 
1512
    length=Count & (size_t) ~(IO_SIZE-1);
764
1513
    if (info->seek_not_done)
765
1514
    {
766
1515
      /*
767
 
        Whenever a function which operates on st_io_cache flushes/writes
768
 
        some part of the st_io_cache to disk it will set the property
 
1516
        Whenever a function which operates on IO_CACHE flushes/writes
 
1517
        some part of the IO_CACHE to disk it will set the property
769
1518
        "seek_not_done" to indicate this to other functions operating
770
 
        on the st_io_cache.
 
1519
        on the IO_CACHE.
771
1520
      */
772
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1521
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
773
1522
      {
774
1523
        info->error= -1;
775
1524
        return (1);
776
1525
      }
777
1526
      info->seek_not_done=0;
778
1527
    }
779
 
    if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
780
 
      return info->error= -1;
781
 
 
782
 
    Count-=length_local;
783
 
    Buffer+=length_local;
784
 
    info->pos_in_file+=length_local;
785
 
  }
786
 
  memcpy(info->write_pos,Buffer,(size_t) Count);
787
 
  info->write_pos+=Count;
788
 
  return 0;
789
 
}
790
 
 
791
 
/**
792
 
 * @brief
793
 
 *   Write a block to disk where part of the data may be inside the record buffer.
794
 
 *   As all write calls to the data goes through the cache,
795
 
 *   we will never get a seek over the end of the buffer.
796
 
 */
797
 
int my_block_write(register st_io_cache *info, const unsigned char *Buffer, size_t Count,
 
1528
    if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
 
1529
      return info->error= -1;
 
1530
 
 
1531
#ifdef THREAD
 
1532
    /*
 
1533
      In case of a shared I/O cache with a writer we normally do direct
 
1534
      write cache to read cache copy. Simulate this here by direct
 
1535
      caller buffer to read cache copy. Do it after the write so that
 
1536
      the cache readers actions on the flushed part can go in parallel
 
1537
      with the write of the extra stuff. copy_to_read_buffer()
 
1538
      synchronizes writer and readers so that after this call the
 
1539
      readers can act on the extra stuff while the writer can go ahead
 
1540
      and prepare the next output. copy_to_read_buffer() relies on
 
1541
      info->pos_in_file.
 
1542
    */
 
1543
    if (info->share)
 
1544
      copy_to_read_buffer(info, Buffer, length);
 
1545
#endif
 
1546
 
 
1547
    Count-=length;
 
1548
    Buffer+=length;
 
1549
    info->pos_in_file+=length;
 
1550
  }
 
1551
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1552
  info->write_pos+=Count;
 
1553
  return 0;
 
1554
}
 
1555
 
 
1556
 
 
1557
/*
 
1558
  Append a block to the write buffer.
 
1559
  This is done with the buffer locked to ensure that we don't read from
 
1560
  the write buffer before we are ready with it.
 
1561
*/
 
1562
 
 
1563
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1564
{
 
1565
  size_t rest_length,length;
 
1566
 
 
1567
#ifdef THREAD
 
1568
  /*
 
1569
    Assert that we cannot come here with a shared cache. If we do one
 
1570
    day, we might need to add a call to copy_to_read_buffer().
 
1571
  */
 
1572
  DBUG_ASSERT(!info->share);
 
1573
#endif
 
1574
 
 
1575
  lock_append_buffer(info);
 
1576
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1577
  if (Count <= rest_length)
 
1578
    goto end;
 
1579
  memcpy(info->write_pos, Buffer, rest_length);
 
1580
  Buffer+=rest_length;
 
1581
  Count-=rest_length;
 
1582
  info->write_pos+=rest_length;
 
1583
  if (my_b_flush_io_cache(info,0))
 
1584
  {
 
1585
    unlock_append_buffer(info);
 
1586
    return 1;
 
1587
  }
 
1588
  if (Count >= IO_SIZE)
 
1589
  {                                     /* Fill first intern buffer */
 
1590
    length=Count & (size_t) ~(IO_SIZE-1);
 
1591
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1592
    {
 
1593
      unlock_append_buffer(info);
 
1594
      return info->error= -1;
 
1595
    }
 
1596
    Count-=length;
 
1597
    Buffer+=length;
 
1598
    info->end_of_file+=length;
 
1599
  }
 
1600
 
 
1601
end:
 
1602
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1603
  info->write_pos+=Count;
 
1604
  unlock_append_buffer(info);
 
1605
  return 0;
 
1606
}
 
1607
 
 
1608
 
 
1609
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1610
{
 
1611
  /*
 
1612
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1613
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1614
    several layers of ternary operators that evaluated comma(,) operator
 
1615
    expressions inside - I do have a test case if somebody wants it
 
1616
  */
 
1617
  if (info->type == SEQ_READ_APPEND)
 
1618
    return my_b_append(info, Buffer, Count);
 
1619
  return my_b_write(info, Buffer, Count);
 
1620
}
 
1621
 
 
1622
 
 
1623
/*
 
1624
  Write a block to disk where part of the data may be inside the record
 
1625
  buffer.  As all write calls to the data goes through the cache,
 
1626
  we will never get a seek over the end of the buffer
 
1627
*/
 
1628
 
 
1629
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
798
1630
                   my_off_t pos)
799
1631
{
800
 
  size_t length_local;
 
1632
  size_t length;
801
1633
  int error=0;
802
1634
 
 
1635
#ifdef THREAD
 
1636
  /*
 
1637
    Assert that we cannot come here with a shared cache. If we do one
 
1638
    day, we might need to add a call to copy_to_read_buffer().
 
1639
  */
 
1640
  DBUG_ASSERT(!info->share);
 
1641
#endif
 
1642
 
803
1643
  if (pos < info->pos_in_file)
804
1644
  {
805
1645
    /* Of no overlap, write everything without buffering */
806
1646
    if (pos + Count <= info->pos_in_file)
807
 
      return (pwrite(info->file, Buffer, Count, pos) == 0);
 
1647
      return my_pwrite(info->file, Buffer, Count, pos,
 
1648
                       info->myflags | MY_NABP);
808
1649
    /* Write the part of the block that is before buffer */
809
 
    length_local= (uint32_t) (info->pos_in_file - pos);
810
 
    if (pwrite(info->file, Buffer, length_local, pos) == 0)
 
1650
    length= (uint) (info->pos_in_file - pos);
 
1651
    if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
811
1652
      info->error= error= -1;
812
 
    Buffer+=length_local;
813
 
    pos+=  length_local;
814
 
    Count-= length_local;
 
1653
    Buffer+=length;
 
1654
    pos+=  length;
 
1655
    Count-= length;
 
1656
#ifndef HAVE_PREAD
 
1657
    info->seek_not_done=1;
 
1658
#endif
815
1659
  }
816
1660
 
817
1661
  /* Check if we want to write inside the used part of the buffer.*/
818
 
  length_local= (size_t) (info->write_end - info->buffer);
819
 
  if (pos < info->pos_in_file + length_local)
 
1662
  length= (size_t) (info->write_end - info->buffer);
 
1663
  if (pos < info->pos_in_file + length)
820
1664
  {
821
1665
    size_t offset= (size_t) (pos - info->pos_in_file);
822
 
    length_local-=offset;
823
 
    if (length_local > Count)
824
 
      length_local=Count;
825
 
    memcpy(info->buffer+offset, Buffer, length_local);
826
 
    Buffer+=length_local;
827
 
    Count-= length_local;
828
 
    /* Fix length_local of buffer if the new data was larger */
829
 
    if (info->buffer+length_local > info->write_pos)
830
 
      info->write_pos=info->buffer+length_local;
 
1666
    length-=offset;
 
1667
    if (length > Count)
 
1668
      length=Count;
 
1669
    memcpy(info->buffer+offset, Buffer, length);
 
1670
    Buffer+=length;
 
1671
    Count-= length;
 
1672
    /* Fix length of buffer if the new data was larger */
 
1673
    if (info->buffer+length > info->write_pos)
 
1674
      info->write_pos=info->buffer+length;
831
1675
    if (!Count)
832
1676
      return (error);
833
1677
  }
837
1681
  return error;
838
1682
}
839
1683
 
840
 
/**
841
 
 * @brief
842
 
 *   Flush write cache 
843
 
 */
844
 
int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
 
1684
 
 
1685
        /* Flush write cache */
 
1686
 
 
1687
#ifdef THREAD
 
1688
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1689
  lock_append_buffer(info);
 
1690
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 
1691
  unlock_append_buffer(info);
 
1692
#else
 
1693
#define LOCK_APPEND_BUFFER
 
1694
#define UNLOCK_APPEND_BUFFER
 
1695
#endif
 
1696
 
 
1697
 
 
1698
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
845
1699
{
846
 
  size_t length_local;
847
 
  bool append_cache= false;
848
 
  my_off_t pos_in_file_local;
 
1700
  size_t length;
 
1701
  my_bool append_cache;
 
1702
  my_off_t pos_in_file;
 
1703
  DBUG_ENTER("my_b_flush_io_cache");
 
1704
  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
 
1705
 
 
1706
  if (!(append_cache = (info->type == SEQ_READ_APPEND)))
 
1707
    need_append_buffer_lock=0;
849
1708
 
850
1709
  if (info->type == WRITE_CACHE || append_cache)
851
1710
  {
852
1711
    if (info->file == -1)
853
1712
    {
854
 
      if (info->real_open_cached_file())
855
 
        return((info->error= -1));
 
1713
      if (real_open_cached_file(info))
 
1714
        DBUG_RETURN((info->error= -1));
856
1715
    }
857
 
    lock_append_buffer(info, need_append_buffer_lock);
 
1716
    LOCK_APPEND_BUFFER;
858
1717
 
859
 
    if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
 
1718
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
860
1719
    {
861
 
      pos_in_file_local=info->pos_in_file;
 
1720
#ifdef THREAD
 
1721
      /*
 
1722
        In case of a shared I/O cache with a writer we do direct write
 
1723
        cache to read cache copy. Do it before the write here so that
 
1724
        the readers can work in parallel with the write.
 
1725
        copy_to_read_buffer() relies on info->pos_in_file.
 
1726
      */
 
1727
      if (info->share)
 
1728
        copy_to_read_buffer(info, info->write_buffer, length);
 
1729
#endif
 
1730
 
 
1731
      pos_in_file=info->pos_in_file;
862
1732
      /*
863
1733
        If we have append cache, we always open the file with
864
1734
        O_APPEND which moves the pos to EOF automatically on every write
865
1735
      */
866
1736
      if (!append_cache && info->seek_not_done)
867
1737
      {                                 /* File touched, do seek */
868
 
        if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
 
1738
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1739
            MY_FILEPOS_ERROR)
869
1740
        {
870
 
          unlock_append_buffer(info, need_append_buffer_lock);
871
 
          return((info->error= -1));
 
1741
          UNLOCK_APPEND_BUFFER;
 
1742
          DBUG_RETURN((info->error= -1));
872
1743
        }
873
1744
        if (!append_cache)
874
1745
          info->seek_not_done=0;
875
1746
      }
876
1747
      if (!append_cache)
877
 
        info->pos_in_file+=length_local;
 
1748
        info->pos_in_file+=length;
878
1749
      info->write_end= (info->write_buffer+info->buffer_length-
879
 
                        ((pos_in_file_local+length_local) & (IO_SIZE-1)));
 
1750
                        ((pos_in_file+length) & (IO_SIZE-1)));
880
1751
 
881
 
      if (my_write(info->file,info->write_buffer,length_local,
 
1752
      if (my_write(info->file,info->write_buffer,length,
882
1753
                   info->myflags | MY_NABP))
883
1754
        info->error= -1;
884
1755
      else
885
1756
        info->error= 0;
886
1757
      if (!append_cache)
887
1758
      {
888
 
        set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
 
1759
        set_if_bigger(info->end_of_file,(pos_in_file+length));
889
1760
      }
890
1761
      else
891
1762
      {
892
1763
        info->end_of_file+=(info->write_pos-info->append_read_pos);
893
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
894
 
        assert(info->end_of_file == tell_ret);
 
1764
        DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
895
1765
      }
896
1766
 
897
1767
      info->append_read_pos=info->write_pos=info->write_buffer;
898
 
      unlock_append_buffer(info, need_append_buffer_lock);
899
 
      return(info->error);
 
1768
      ++info->disk_writes;
 
1769
      UNLOCK_APPEND_BUFFER;
 
1770
      DBUG_RETURN(info->error);
900
1771
    }
901
1772
  }
902
1773
#ifdef HAVE_AIOWAIT
906
1777
    info->inited=0;
907
1778
  }
908
1779
#endif
909
 
  unlock_append_buffer(info, need_append_buffer_lock);
910
 
  return(0);
 
1780
  UNLOCK_APPEND_BUFFER;
 
1781
  DBUG_RETURN(0);
911
1782
}
912
1783
 
913
 
/**
914
 
 * @brief
915
 
 *   Free an st_io_cache object
916
 
 * 
917
 
 * @detail
918
 
 *   It's currently safe to call this if one has called init_io_cache()
919
 
 *   on the 'info' object, even if init_io_cache() failed.
920
 
 *   This function is also safe to call twice with the same handle.
921
 
 * 
922
 
 * @param info st_io_cache Handle to free
923
 
 * 
924
 
 * @retval 0 ok
925
 
 * @retval # Error
926
 
 */
927
 
int st_io_cache::end_io_cache()
 
1784
/*
 
1785
  Free an IO_CACHE object
 
1786
 
 
1787
  SYNOPSOS
 
1788
    end_io_cache()
 
1789
    info                IO_CACHE Handle to free
 
1790
 
 
1791
  NOTES
 
1792
    It's currently safe to call this if one has called init_io_cache()
 
1793
    on the 'info' object, even if init_io_cache() failed.
 
1794
    This function is also safe to call twice with the same handle.
 
1795
 
 
1796
  RETURN
 
1797
   0  ok
 
1798
   #  Error
 
1799
*/
 
1800
 
 
1801
int end_io_cache(IO_CACHE *info)
928
1802
{
929
 
  int _error=0;
930
 
 
931
 
  if (pre_close)
932
 
  {
933
 
    (*pre_close)(this);
934
 
    pre_close= 0;
935
 
  }
936
 
  if (alloced_buffer)
937
 
  {
938
 
    if (type == READ_CACHE)
939
 
      global_read_buffer.sub(buffer_length);
940
 
    alloced_buffer=0;
941
 
    if (file != -1)                     /* File doesn't exist */
942
 
      _error= my_b_flush_io_cache(this, 1);
943
 
    free((unsigned char*) buffer);
944
 
    buffer=read_pos=(unsigned char*) 0;
945
 
  }
946
 
 
947
 
  return _error;
 
1803
  int error=0;
 
1804
  IO_CACHE_CALLBACK pre_close;
 
1805
  DBUG_ENTER("end_io_cache");
 
1806
  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
 
1807
 
 
1808
#ifdef THREAD
 
1809
  /*
 
1810
    Every thread must call remove_io_thread(). The last one destroys
 
1811
    the share elements.
 
1812
  */
 
1813
  DBUG_ASSERT(!info->share || !info->share->total_threads);
 
1814
#endif
 
1815
 
 
1816
  if ((pre_close=info->pre_close))
 
1817
  {
 
1818
    (*pre_close)(info);
 
1819
    info->pre_close= 0;
 
1820
  }
 
1821
  if (info->alloced_buffer)
 
1822
  {
 
1823
    info->alloced_buffer=0;
 
1824
    if (info->file != -1)                       /* File doesn't exist */
 
1825
      error= my_b_flush_io_cache(info,1);
 
1826
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1827
    info->buffer=info->read_pos=(uchar*) 0;
 
1828
  }
 
1829
  if (info->type == SEQ_READ_APPEND)
 
1830
  {
 
1831
    /* Destroy allocated mutex */
 
1832
    info->type= TYPE_NOT_SET;
 
1833
#ifdef THREAD
 
1834
    pthread_mutex_destroy(&info->append_buffer_lock);
 
1835
#endif
 
1836
  }
 
1837
  DBUG_RETURN(error);
948
1838
} /* end_io_cache */
949
1839
 
950
 
} /* namespace internal */
951
 
} /* namespace drizzled */
 
1840
 
 
1841
/**********************************************************************
 
1842
 Testing of MF_IOCACHE
 
1843
**********************************************************************/
 
1844
 
 
1845
#ifdef MAIN
 
1846
 
 
1847
#include <my_dir.h>
 
1848
 
 
1849
void die(const char* fmt, ...)
 
1850
{
 
1851
  va_list va_args;
 
1852
  va_start(va_args,fmt);
 
1853
  fprintf(stderr,"Error:");
 
1854
  vfprintf(stderr, fmt,va_args);
 
1855
  fprintf(stderr,", errno=%d\n", errno);
 
1856
  exit(1);
 
1857
}
 
1858
 
 
1859
int open_file(const char* fname, IO_CACHE* info, int cache_size)
 
1860
{
 
1861
  int fd;
 
1862
  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
 
1863
    die("Could not open %s", fname);
 
1864
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
 
1865
    die("failed in init_io_cache()");
 
1866
  return fd;
 
1867
}
 
1868
 
 
1869
void close_file(IO_CACHE* info)
 
1870
{
 
1871
  end_io_cache(info);
 
1872
  my_close(info->file, MYF(MY_WME));
 
1873
}
 
1874
 
 
1875
int main(int argc, char** argv)
 
1876
{
 
1877
  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
 
1878
  MY_STAT status;
 
1879
  const char* fname="/tmp/iocache.test";
 
1880
  int cache_size=16384;
 
1881
  char llstr_buf[22];
 
1882
  int max_block,total_bytes=0;
 
1883
  int i,num_loops=100,error=0;
 
1884
  char *p;
 
1885
  char* block, *block_end;
 
1886
  MY_INIT(argv[0]);
 
1887
  max_block = cache_size*3;
 
1888
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1889
    die("Not enough memory to allocate test block");
 
1890
  block_end = block + max_block;
 
1891
  for (p = block,i=0; p < block_end;i++)
 
1892
  {
 
1893
    *p++ = (char)i;
 
1894
  }
 
1895
  if (my_stat(fname,&status, MYF(0)) &&
 
1896
      my_delete(fname,MYF(MY_WME)))
 
1897
    {
 
1898
      die("Delete of %s failed, aborting", fname);
 
1899
    }
 
1900
  open_file(fname,&sra_cache, cache_size);
 
1901
  for (i = 0; i < num_loops; i++)
 
1902
  {
 
1903
    char buf[4];
 
1904
    int block_size = abs(rand() % max_block);
 
1905
    int4store(buf, block_size);
 
1906
    if (my_b_append(&sra_cache,buf,4) ||
 
1907
        my_b_append(&sra_cache, block, block_size))
 
1908
      die("write failed");
 
1909
    total_bytes += 4+block_size;
 
1910
  }
 
1911
  close_file(&sra_cache);
 
1912
  my_free(block,MYF(MY_WME));
 
1913
  if (!my_stat(fname,&status,MYF(MY_WME)))
 
1914
    die("%s failed to stat, but I had just closed it,\
 
1915
 wonder how that happened");
 
1916
  printf("Final size of %s is %s, wrote %d bytes\n",fname,
 
1917
         llstr(status.st_size,llstr_buf),
 
1918
         total_bytes);
 
1919
  my_delete(fname, MYF(MY_WME));
 
1920
  /* check correctness of tests */
 
1921
  if (total_bytes != status.st_size)
 
1922
  {
 
1923
    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
 
1924
supposedly written\n");
 
1925
    error=1;
 
1926
  }
 
1927
  exit(error);
 
1928
  return 0;
 
1929
}
 
1930
#endif