~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Brian Aker
  • Date: 2009-01-21 05:53:36 UTC
  • mto: This revision was merged to the branch mainline in revision 801.
  • Revision ID: brian@tangent.org-20090121055336-fxoz6wfzreo8gi9x
Removed purge

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#define MAP_TO_USE_RAID
51
50
#include "mysys_priv.h"
52
51
#include <mystrings/m_string.h>
53
52
#ifdef HAVE_AIOWAIT
54
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
 
57
#include <mysys/iocache.h>
57
58
#include <errno.h>
 
59
#include <drizzled/util/test.h>
 
60
#include <stdlib.h>
58
61
 
59
62
#define lock_append_buffer(info) \
60
63
 pthread_mutex_lock(&(info)->append_buffer_lock)
150
153
                  bool use_async_io, myf cache_myflags)
151
154
{
152
155
  size_t min_cache;
153
 
  my_off_t pos;
 
156
  off_t pos;
154
157
  my_off_t end_of_file= ~(my_off_t) 0;
155
158
 
156
159
  info->file= file;
164
167
 
165
168
  if (file >= 0)
166
169
  {
167
 
    pos= my_tell(file, MYF(0));
168
 
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
 
170
    pos= lseek(file, 0, SEEK_CUR);
 
171
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
169
172
    {
170
173
      /*
171
174
         This kind of object doesn't support seek() or tell(). Don't set a
180
183
      assert(seek_offset == 0);
181
184
    }
182
185
    else
183
 
      info->seek_not_done= test(seek_offset != pos);
 
186
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
184
187
  }
185
188
 
186
189
  info->disk_writes= 0;
194
197
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
195
198
    {
196
199
      /* Calculate end of file to avoid allocating oversized buffers */
197
 
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
 
200
      end_of_file=lseek(file,0L,SEEK_END);
198
201
      /* Need to reset seek_not_done now that we just did a seek. */
199
202
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200
203
      if (end_of_file < seek_offset)
221
224
      if (type == SEQ_READ_APPEND)
222
225
        buffer_block *= 2;
223
226
      if ((info->buffer=
224
 
           (uchar*) my_malloc(buffer_block,
225
 
                             MYF((cache_myflags & ~ MY_WME) |
226
 
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
227
           (unsigned char*) malloc(buffer_block)) != 0)
227
228
      {
228
229
        info->write_buffer=info->buffer;
229
230
        if (type == SEQ_READ_APPEND)
313
314
 
314
315
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
315
316
                        my_off_t seek_offset,
316
 
                        bool use_async_io __attribute__((unused)),
 
317
                        bool use_async_io,
317
318
                        bool clear_cache)
318
319
{
319
320
  /* One can't do reinit with the following types */
327
328
      seek_offset <= my_b_tell(info))
328
329
  {
329
330
    /* Reuse current buffer without flushing it to disk */
330
 
    uchar *pos;
 
331
    unsigned char *pos;
331
332
    if (info->type == WRITE_CACHE && type == READ_CACHE)
332
333
    {
333
334
      info->read_end=info->write_pos;
395
396
    info->read_function=_my_b_async_read;
396
397
  }
397
398
  info->inited=0;
 
399
#else
 
400
  (void)use_async_io;
398
401
#endif
399
402
  return(0);
400
403
} /* reinit_io_cache */
429
432
    1      Error: can't read requested characters
430
433
*/
431
434
 
432
 
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
435
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
433
436
{
434
437
  size_t length,diff_length,left_length, max_length;
435
438
  my_off_t pos_in_file;
445
448
  /* pos_in_file always point on where info->buffer was read */
446
449
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
447
450
 
448
 
  /* 
 
451
  /*
449
452
    Whenever a function which operates on IO_CACHE flushes/writes
450
453
    some part of the IO_CACHE to disk it will set the property
451
454
    "seek_not_done" to indicate this to other functions operating
453
456
  */
454
457
  if (info->seek_not_done)
455
458
  {
456
 
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
457
 
        != MY_FILEPOS_ERROR))
 
459
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
458
460
    {
459
461
      /* No error, reset seek_not_done flag. */
460
462
      info->seek_not_done= 0;
597
599
*/
598
600
 
599
601
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
600
 
                         IO_CACHE *write_cache, uint num_threads)
 
602
                         IO_CACHE *write_cache, uint32_t num_threads)
601
603
{
602
604
  assert(num_threads > 1);
603
605
  assert(read_cache->type == READ_CACHE);
649
651
void remove_io_thread(IO_CACHE *cache)
650
652
{
651
653
  IO_CACHE_SHARE *cshare= cache->share;
652
 
  uint total;
 
654
  uint32_t total;
653
655
 
654
656
  /* If the writer goes, it needs to flush the write cache. */
655
657
  if (cache == cshare->source_cache)
890
892
    1      Error: can't read requested characters
891
893
*/
892
894
 
893
 
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
895
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
894
896
{
895
897
  my_off_t pos_in_file;
896
898
  size_t length, diff_length, left_length;
942
944
        */
943
945
        if (cache->seek_not_done)
944
946
        {
945
 
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
946
 
              == MY_FILEPOS_ERROR)
 
947
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
947
948
          {
948
949
            cache->error= -1;
949
950
            unlock_io_cache(cache);
1013
1014
*/
1014
1015
 
1015
1016
static void copy_to_read_buffer(IO_CACHE *write_cache,
1016
 
                                const uchar *write_buffer, size_t write_length)
 
1017
                                const unsigned char *write_buffer, size_t write_length)
1017
1018
{
1018
1019
  IO_CACHE_SHARE *cshare= write_cache->share;
1019
1020
 
1024
1025
  */
1025
1026
  while (write_length)
1026
1027
  {
1027
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
1028
 
    int  __attribute__((unused)) rc;
 
1028
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1029
    int  rc;
1029
1030
 
1030
1031
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1031
1032
    /* The writing thread does always have the lock when it awakes. */
1048
1049
 
1049
1050
/*
1050
1051
  Do sequential read from the SEQ_READ_APPEND cache.
1051
 
  
 
1052
 
1052
1053
  We do this in three stages:
1053
1054
   - first read from info->buffer
1054
1055
   - then if there are still data to read, try the file descriptor
1059
1060
    1  Failed to read
1060
1061
*/
1061
1062
 
1062
 
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1063
1064
{
1064
1065
  size_t length, diff_length, left_length, save_count, max_length;
1065
1066
  my_off_t pos_in_file;
1084
1085
    With read-append cache we must always do a seek before we read,
1085
1086
    because the write could have moved the file pointer astray
1086
1087
  */
1087
 
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1088
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1088
1089
  {
1089
1090
   info->error= -1;
1090
1091
   unlock_append_buffer(info);
1182
1183
      TODO: figure out if the assert below is needed or correct.
1183
1184
    */
1184
1185
    assert(pos_in_file == info->end_of_file);
1185
 
    copy_len=min(Count, len_in_buff);
 
1186
    copy_len=cmin(Count, len_in_buff);
1186
1187
    memcpy(Buffer, info->append_read_pos, copy_len);
1187
1188
    info->append_read_pos += copy_len;
1188
1189
    Count -= copy_len;
1221
1222
     1          An error has occurred; IO_CACHE to error state.
1222
1223
*/
1223
1224
 
1224
 
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1225
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1225
1226
{
1226
1227
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1227
1228
  size_t max_length;
1228
1229
  my_off_t next_pos_in_file;
1229
 
  uchar *read_buffer;
 
1230
  unsigned char *read_buffer;
1230
1231
 
1231
1232
  memcpy(Buffer,info->read_pos,
1232
1233
         (left_length= (size_t) (info->read_end-info->read_pos)));
1284
1285
      }
1285
1286
    }
1286
1287
        /* Copy found bytes to buffer */
1287
 
    length=min(Count,read_length);
 
1288
    length=cmin(Count,read_length);
1288
1289
    memcpy(Buffer,info->read_pos,(size_t) length);
1289
1290
    Buffer+=length;
1290
1291
    Count-=length;
1304
1305
      info->error=(int) (read_length+left_length);
1305
1306
      return 1;
1306
1307
    }
1307
 
    
1308
 
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1309
 
        == MY_FILEPOS_ERROR)
 
1308
 
 
1309
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1310
1310
    {
1311
1311
      info->error= -1;
1312
1312
      return (1);
1318
1318
      if ((read_length=my_read(info->file,info->request_pos,
1319
1319
                               read_length, info->myflags)) == (size_t) -1)
1320
1320
        return info->error= -1;
1321
 
      use_length=min(Count,read_length);
 
1321
      use_length=cmin(Count,read_length);
1322
1322
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1323
1323
      info->read_pos=info->request_pos+Count;
1324
1324
      info->read_end=info->request_pos+read_length;
1362
1362
  {
1363
1363
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1364
1364
    if (aioread(info->file,read_buffer, max_length,
1365
 
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
 
1365
                (my_off_t) next_pos_in_file,SEEK_SET,
1366
1366
                &info->aio_result.result))
1367
1367
    {                                           /* Skip async io */
1368
1368
      my_errno=errno;
1369
1369
      if (info->request_pos != info->buffer)
1370
1370
      {
1371
 
        memcpy(info->buffer, info->request_pos,
1372
 
               (size_t) (info->read_end - info->read_pos));
 
1371
        memmove(info->buffer, info->request_pos,
 
1372
                (size_t) (info->read_end - info->read_pos));
1373
1373
        info->request_pos=info->buffer;
1374
1374
        info->read_pos-=info->read_length;
1375
1375
        info->read_end-=info->read_length;
1389
1389
 
1390
1390
int _my_b_get(IO_CACHE *info)
1391
1391
{
1392
 
  uchar buff;
 
1392
  unsigned char buff;
1393
1393
  IO_CACHE_CALLBACK pre_read,post_read;
1394
1394
  if ((pre_read = info->pre_read))
1395
1395
    (*pre_read)(info);
1397
1397
    return my_b_EOF;
1398
1398
  if ((post_read = info->post_read))
1399
1399
    (*post_read)(info);
1400
 
  return (int) (uchar) buff;
 
1400
  return (int) (unsigned char) buff;
1401
1401
}
1402
1402
 
1403
 
/* 
 
1403
/*
1404
1404
   Write a byte buffer to IO_CACHE and flush to disk
1405
1405
   if IO_CACHE is full.
1406
1406
 
1410
1410
   -1 On error; my_errno contains error code.
1411
1411
*/
1412
1412
 
1413
 
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1413
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1414
1414
{
1415
1415
  size_t rest_length,length;
1416
1416
 
1439
1439
        "seek_not_done" to indicate this to other functions operating
1440
1440
        on the IO_CACHE.
1441
1441
      */
1442
 
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
 
1442
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
1443
1443
      {
1444
1444
        info->error= -1;
1445
1445
        return (1);
1479
1479
  the write buffer before we are ready with it.
1480
1480
*/
1481
1481
 
1482
 
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1482
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1483
1483
{
1484
1484
  size_t rest_length,length;
1485
1485
 
1523
1523
}
1524
1524
 
1525
1525
 
1526
 
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1526
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1527
1527
{
1528
1528
  /*
1529
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
1530
 
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with
1531
1531
    several layers of ternary operators that evaluated comma(,) operator
1532
1532
    expressions inside - I do have a test case if somebody wants it
1533
1533
  */
1543
1543
  we will never get a seek over the end of the buffer
1544
1544
*/
1545
1545
 
1546
 
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
 
1546
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1547
1547
                   my_off_t pos)
1548
1548
{
1549
1549
  size_t length;
1639
1639
      */
1640
1640
      if (!append_cache && info->seek_not_done)
1641
1641
      {                                 /* File touched, do seek */
1642
 
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1643
 
            MY_FILEPOS_ERROR)
 
1642
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1644
1643
        {
1645
1644
          UNLOCK_APPEND_BUFFER;
1646
1645
          return((info->error= -1));
1665
1664
      else
1666
1665
      {
1667
1666
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1668
 
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
 
1667
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
 
1668
        assert(info->end_of_file == tell_ret);
1669
1669
      }
1670
1670
 
1671
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
1723
1723
    info->alloced_buffer=0;
1724
1724
    if (info->file != -1)                       /* File doesn't exist */
1725
1725
      error= my_b_flush_io_cache(info,1);
1726
 
    my_free((uchar*) info->buffer,MYF(MY_WME));
1727
 
    info->buffer=info->read_pos=(uchar*) 0;
 
1726
    free((unsigned char*) info->buffer);
 
1727
    info->buffer=info->read_pos=(unsigned char*) 0;
1728
1728
  }
1729
1729
  if (info->type == SEQ_READ_APPEND)
1730
1730
  {
1783
1783
  char* block, *block_end;
1784
1784
  MY_INIT(argv[0]);
1785
1785
  max_block = cache_size*3;
1786
 
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1786
  if (!(block=(char*)malloc(max_block)))
1787
1787
    die("Not enough memory to allocate test block");
1788
1788
  block_end = block + max_block;
1789
1789
  for (p = block,i=0; p < block_end;i++)
1807
1807
    total_bytes += 4+block_size;
1808
1808
  }
1809
1809
  close_file(&sra_cache);
1810
 
  my_free(block,MYF(MY_WME));
 
1810
  free(block);
1811
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
1812
1812
    die("%s failed to stat, but I had just closed it,\
1813
1813
 wonder how that happened");