~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Monty Taylor
  • Date: 2008-11-16 23:47:43 UTC
  • mto: (584.1.10 devel)
  • mto: This revision was merged to the branch mainline in revision 589.
  • Revision ID: monty@inaugust.com-20081116234743-c38gmv0pa2kdefaj
Broke out cached_item.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "config.h"
51
 
 
52
 
#include "drizzled/internal/my_sys.h"
53
 
#include "drizzled/internal/m_string.h"
 
50
#include "mysys_priv.h"
 
51
#include <mystrings/m_string.h>
54
52
#ifdef HAVE_AIOWAIT
55
 
#include "drizzled/error.h"
56
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
57
55
static void my_aiowait(my_aio_result *result);
58
56
#endif
59
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
60
58
#include <errno.h>
61
59
#include <drizzled/util/test.h>
62
 
#include <stdlib.h>
63
 
#include <algorithm>
64
 
 
65
 
using namespace std;
66
 
 
67
 
namespace drizzled
68
 
{
69
 
namespace internal
70
 
{
71
 
 
72
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
73
 
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
74
 
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
75
 
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
76
 
 
77
60
#define lock_append_buffer(info) \
78
61
 pthread_mutex_lock(&(info)->append_buffer_lock)
79
62
#define unlock_append_buffer(info) \
163
146
    #  error
164
147
*/
165
148
 
166
 
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
 
149
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
167
150
                  enum cache_type type, my_off_t seek_offset,
168
151
                  bool use_async_io, myf cache_myflags)
169
152
{
170
153
  size_t min_cache;
171
 
  off_t pos;
 
154
  my_off_t pos;
172
155
  my_off_t end_of_file= ~(my_off_t) 0;
173
156
 
174
157
  info->file= file;
182
165
 
183
166
  if (file >= 0)
184
167
  {
185
 
    pos= lseek(file, 0, SEEK_CUR);
186
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
168
    pos= my_tell(file, MYF(0));
 
169
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
187
170
    {
188
171
      /*
189
172
         This kind of object doesn't support seek() or tell(). Don't set a
198
181
      assert(seek_offset == 0);
199
182
    }
200
183
    else
201
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
184
      info->seek_not_done= test(seek_offset != pos);
202
185
  }
203
186
 
 
187
  info->disk_writes= 0;
204
188
  info->share=0;
205
189
 
206
190
  if (!cachesize && !(cachesize= my_default_record_cache_size))
211
195
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
212
196
    {
213
197
      /* Calculate end of file to avoid allocating oversized buffers */
214
 
      end_of_file=lseek(file,0L,SEEK_END);
 
198
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
215
199
      /* Need to reset seek_not_done now that we just did a seek. */
216
200
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
217
201
      if (end_of_file < seek_offset)
238
222
      if (type == SEQ_READ_APPEND)
239
223
        buffer_block *= 2;
240
224
      if ((info->buffer=
241
 
           (unsigned char*) malloc(buffer_block)) != 0)
 
225
           (unsigned char*) my_malloc(buffer_block,
 
226
                             MYF((cache_myflags & ~ MY_WME) |
 
227
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
242
228
      {
243
229
        info->write_buffer=info->buffer;
244
230
        if (type == SEQ_READ_APPEND)
262
248
    info->write_end = info->write_buffer + info->buffer_length;
263
249
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
264
250
  }
 
251
#if defined(SAFE_MUTEX)
 
252
  else
 
253
  {
 
254
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
255
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
256
  }
 
257
#endif
265
258
 
266
259
  if (type == WRITE_CACHE)
267
260
    info->write_end=
321
314
 
322
315
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
323
316
                        my_off_t seek_offset,
324
 
                        bool use_async_io,
 
317
                        bool use_async_io __attribute__((unused)),
325
318
                        bool clear_cache)
326
319
{
327
320
  /* One can't do reinit with the following types */
403
396
    info->read_function=_my_b_async_read;
404
397
  }
405
398
  info->inited=0;
406
 
#else
407
 
  (void)use_async_io;
408
399
#endif
409
400
  return(0);
410
401
} /* reinit_io_cache */
439
430
    1      Error: can't read requested characters
440
431
*/
441
432
 
442
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
433
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
443
434
{
444
435
  size_t length,diff_length,left_length, max_length;
445
436
  my_off_t pos_in_file;
455
446
  /* pos_in_file always point on where info->buffer was read */
456
447
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
457
448
 
458
 
  /*
 
449
  /* 
459
450
    Whenever a function which operates on IO_CACHE flushes/writes
460
451
    some part of the IO_CACHE to disk it will set the property
461
452
    "seek_not_done" to indicate this to other functions operating
463
454
  */
464
455
  if (info->seek_not_done)
465
456
  {
466
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
457
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
458
        != MY_FILEPOS_ERROR))
467
459
    {
468
460
      /* No error, reset seek_not_done flag. */
469
461
      info->seek_not_done= 0;
475
467
        info->file is a pipe or socket or FIFO.  We never should have tried
476
468
        to seek on that.  See Bugs#25807 and #22828 for more info.
477
469
      */
478
 
      assert(errno != ESPIPE);
 
470
      assert(my_errno != ESPIPE);
479
471
      info->error= -1;
480
472
      return(1);
481
473
    }
513
505
  {
514
506
    if (Count)
515
507
    {
516
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
508
      info->error= left_length;         /* We only got this many char */
517
509
      return(1);
518
510
    }
519
511
    length=0;                           /* Didn't read any chars */
951
943
        */
952
944
        if (cache->seek_not_done)
953
945
        {
954
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
946
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
947
              == MY_FILEPOS_ERROR)
955
948
          {
956
949
            cache->error= -1;
957
950
            unlock_io_cache(cache);
1032
1025
  */
1033
1026
  while (write_length)
1034
1027
  {
1035
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
1036
 
    int  rc;
 
1028
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
 
1029
    int  __attribute__((unused)) rc;
1037
1030
 
1038
1031
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1039
1032
    /* The writing thread does always have the lock when it awakes. */
1056
1049
 
1057
1050
/*
1058
1051
  Do sequential read from the SEQ_READ_APPEND cache.
1059
 
 
 
1052
  
1060
1053
  We do this in three stages:
1061
1054
   - first read from info->buffer
1062
1055
   - then if there are still data to read, try the file descriptor
1092
1085
    With read-append cache we must always do a seek before we read,
1093
1086
    because the write could have moved the file pointer astray
1094
1087
  */
1095
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1088
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1096
1089
  {
1097
1090
   info->error= -1;
1098
1091
   unlock_append_buffer(info);
1190
1183
      TODO: figure out if the assert below is needed or correct.
1191
1184
    */
1192
1185
    assert(pos_in_file == info->end_of_file);
1193
 
    copy_len=min(Count, len_in_buff);
 
1186
    copy_len=cmin(Count, len_in_buff);
1194
1187
    memcpy(Buffer, info->append_read_pos, copy_len);
1195
1188
    info->append_read_pos += copy_len;
1196
1189
    Count -= copy_len;
1197
1190
    if (Count)
1198
 
      info->error = static_cast<int>(save_count - Count);
 
1191
      info->error = save_count - Count;
1199
1192
 
1200
1193
    /* Fill read buffer with data from write buffer */
1201
1194
    memcpy(info->buffer, info->append_read_pos,
1224
1217
      Count                     Number of bytes to read into Buffer
1225
1218
 
1226
1219
  RETURN VALUE
1227
 
    -1          An error has occurred; errno is set.
 
1220
    -1          An error has occurred; my_errno is set.
1228
1221
     0          Success
1229
1222
     1          An error has occurred; IO_CACHE to error state.
1230
1223
*/
1252
1245
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1253
1246
                 my_filename(info->file),
1254
1247
                 info->aio_result.result.aio_errno);
1255
 
      errno=info->aio_result.result.aio_errno;
 
1248
      my_errno=info->aio_result.result.aio_errno;
1256
1249
      info->error= -1;
1257
1250
      return(1);
1258
1251
    }
1259
1252
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1260
1253
        read_length == (size_t) -1)
1261
1254
    {
1262
 
      errno=0;                          /* For testing */
 
1255
      my_errno=0;                               /* For testing */
1263
1256
      info->error= (read_length == (size_t) -1 ? -1 :
1264
1257
                    (int) (read_length+left_length));
1265
1258
      return(1);
1292
1285
      }
1293
1286
    }
1294
1287
        /* Copy found bytes to buffer */
1295
 
    length=min(Count,read_length);
 
1288
    length=cmin(Count,read_length);
1296
1289
    memcpy(Buffer,info->read_pos,(size_t) length);
1297
1290
    Buffer+=length;
1298
1291
    Count-=length;
1312
1305
      info->error=(int) (read_length+left_length);
1313
1306
      return 1;
1314
1307
    }
1315
 
 
1316
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1308
    
 
1309
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1310
        == MY_FILEPOS_ERROR)
1317
1311
    {
1318
1312
      info->error= -1;
1319
1313
      return (1);
1325
1319
      if ((read_length=my_read(info->file,info->request_pos,
1326
1320
                               read_length, info->myflags)) == (size_t) -1)
1327
1321
        return info->error= -1;
1328
 
      use_length=min(Count,read_length);
 
1322
      use_length=cmin(Count,read_length);
1329
1323
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1330
1324
      info->read_pos=info->request_pos+Count;
1331
1325
      info->read_end=info->request_pos+read_length;
1336
1330
      {                                 /* Didn't find hole block */
1337
1331
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1338
1332
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1339
 
                   my_filename(info->file),errno);
 
1333
                   my_filename(info->file),my_errno);
1340
1334
        info->error=(int) (read_length+left_length);
1341
1335
        return 1;
1342
1336
      }
1369
1363
  {
1370
1364
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1371
1365
    if (aioread(info->file,read_buffer, max_length,
1372
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1366
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
1373
1367
                &info->aio_result.result))
1374
1368
    {                                           /* Skip async io */
1375
 
      errno=errno;
 
1369
      my_errno=errno;
1376
1370
      if (info->request_pos != info->buffer)
1377
1371
      {
1378
 
        memmove(info->buffer, info->request_pos,
1379
 
                (size_t) (info->read_end - info->read_pos));
 
1372
        memcpy(info->buffer, info->request_pos,
 
1373
               (size_t) (info->read_end - info->read_pos));
1380
1374
        info->request_pos=info->buffer;
1381
1375
        info->read_pos-=info->read_length;
1382
1376
        info->read_end-=info->read_length;
1407
1401
  return (int) (unsigned char) buff;
1408
1402
}
1409
1403
 
1410
 
/*
 
1404
/* 
1411
1405
   Write a byte buffer to IO_CACHE and flush to disk
1412
1406
   if IO_CACHE is full.
1413
1407
 
1414
1408
   RETURN VALUE
1415
1409
    1 On error on write
1416
1410
    0 On success
1417
 
   -1 On error; errno contains error code.
 
1411
   -1 On error; my_errno contains error code.
1418
1412
*/
1419
1413
 
1420
1414
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1423
1417
 
1424
1418
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1425
1419
  {
1426
 
    errno=errno=EFBIG;
 
1420
    my_errno=errno=EFBIG;
1427
1421
    return info->error = -1;
1428
1422
  }
1429
1423
 
1446
1440
        "seek_not_done" to indicate this to other functions operating
1447
1441
        on the IO_CACHE.
1448
1442
      */
1449
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1443
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1450
1444
      {
1451
1445
        info->error= -1;
1452
1446
        return (1);
1479
1473
  return 0;
1480
1474
}
1481
1475
 
 
1476
 
 
1477
/*
 
1478
  Append a block to the write buffer.
 
1479
  This is done with the buffer locked to ensure that we don't read from
 
1480
  the write buffer before we are ready with it.
 
1481
*/
 
1482
 
 
1483
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1484
{
 
1485
  size_t rest_length,length;
 
1486
 
 
1487
  /*
 
1488
    Assert that we cannot come here with a shared cache. If we do one
 
1489
    day, we might need to add a call to copy_to_read_buffer().
 
1490
  */
 
1491
  assert(!info->share);
 
1492
 
 
1493
  lock_append_buffer(info);
 
1494
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1495
  if (Count <= rest_length)
 
1496
    goto end;
 
1497
  memcpy(info->write_pos, Buffer, rest_length);
 
1498
  Buffer+=rest_length;
 
1499
  Count-=rest_length;
 
1500
  info->write_pos+=rest_length;
 
1501
  if (my_b_flush_io_cache(info,0))
 
1502
  {
 
1503
    unlock_append_buffer(info);
 
1504
    return 1;
 
1505
  }
 
1506
  if (Count >= IO_SIZE)
 
1507
  {                                     /* Fill first intern buffer */
 
1508
    length=Count & (size_t) ~(IO_SIZE-1);
 
1509
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1510
    {
 
1511
      unlock_append_buffer(info);
 
1512
      return info->error= -1;
 
1513
    }
 
1514
    Count-=length;
 
1515
    Buffer+=length;
 
1516
    info->end_of_file+=length;
 
1517
  }
 
1518
 
 
1519
end:
 
1520
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1521
  info->write_pos+=Count;
 
1522
  unlock_append_buffer(info);
 
1523
  return 0;
 
1524
}
 
1525
 
 
1526
 
 
1527
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1528
{
 
1529
  /*
 
1530
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1531
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1532
    several layers of ternary operators that evaluated comma(,) operator
 
1533
    expressions inside - I do have a test case if somebody wants it
 
1534
  */
 
1535
  if (info->type == SEQ_READ_APPEND)
 
1536
    return my_b_append(info, Buffer, Count);
 
1537
  return my_b_write(info, Buffer, Count);
 
1538
}
 
1539
 
 
1540
 
1482
1541
/*
1483
1542
  Write a block to disk where part of the data may be inside the record
1484
1543
  buffer.  As all write calls to the data goes through the cache,
1503
1562
    if (pos + Count <= info->pos_in_file)
1504
1563
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1505
1564
    /* Write the part of the block that is before buffer */
1506
 
    length= (uint32_t) (info->pos_in_file - pos);
 
1565
    length= (uint) (info->pos_in_file - pos);
1507
1566
    if (pwrite(info->file, Buffer, length, pos) == 0)
1508
1567
      info->error= error= -1;
1509
1568
    Buffer+=length;
1510
1569
    pos+=  length;
1511
1570
    Count-= length;
 
1571
#ifndef HAVE_PREAD
 
1572
    info->seek_not_done=1;
 
1573
#endif
1512
1574
  }
1513
1575
 
1514
1576
  /* Check if we want to write inside the used part of the buffer.*/
1578
1640
      */
1579
1641
      if (!append_cache && info->seek_not_done)
1580
1642
      {                                 /* File touched, do seek */
1581
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1643
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1644
            MY_FILEPOS_ERROR)
1582
1645
        {
1583
1646
          UNLOCK_APPEND_BUFFER;
1584
1647
          return((info->error= -1));
1603
1666
      else
1604
1667
      {
1605
1668
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1606
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
1607
 
        assert(info->end_of_file == tell_ret);
 
1669
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
1608
1670
      }
1609
1671
 
1610
1672
      info->append_read_pos=info->write_pos=info->write_buffer;
 
1673
      ++info->disk_writes;
1611
1674
      UNLOCK_APPEND_BUFFER;
1612
1675
      return(info->error);
1613
1676
    }
1673
1736
  return(error);
1674
1737
} /* end_io_cache */
1675
1738
 
1676
 
} /* namespace internal */
1677
 
} /* namespace drizzled */
1678
1739
 
1679
1740
/**********************************************************************
1680
1741
 Testing of MF_IOCACHE
1682
1743
 
1683
1744
#ifdef MAIN
1684
1745
 
 
1746
#include <my_dir.h>
 
1747
 
1685
1748
void die(const char* fmt, ...)
1686
1749
{
1687
1750
  va_list va_args;
1721
1784
  char* block, *block_end;
1722
1785
  MY_INIT(argv[0]);
1723
1786
  max_block = cache_size*3;
1724
 
  if (!(block=(char*)malloc(max_block)))
 
1787
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1725
1788
    die("Not enough memory to allocate test block");
1726
1789
  block_end = block + max_block;
1727
1790
  for (p = block,i=0; p < block_end;i++)