~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Brian Aker
  • Date: 2009-07-11 19:23:04 UTC
  • mfrom: (1089.1.14 merge)
  • Revision ID: brian@gaz-20090711192304-ootijyl5yf9jq9kd
Merge Brian

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#define MAP_TO_USE_RAID
51
 
#include "mysys_priv.h"
 
50
#include "mysys/mysys_priv.h"
52
51
#include <mystrings/m_string.h>
53
52
#ifdef HAVE_AIOWAIT
54
 
#include "mysys_err.h"
 
53
#include "mysys/mysys_err.h"
 
54
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
 
57
#include <mysys/iocache.h>
57
58
#include <errno.h>
 
59
#include <drizzled/util/test.h>
 
60
#include <stdlib.h>
 
61
#include <algorithm>
 
62
 
 
63
using namespace std;
58
64
 
59
65
#define lock_append_buffer(info) \
60
66
 pthread_mutex_lock(&(info)->append_buffer_lock)
150
156
                  bool use_async_io, myf cache_myflags)
151
157
{
152
158
  size_t min_cache;
153
 
  my_off_t pos;
 
159
  off_t pos;
154
160
  my_off_t end_of_file= ~(my_off_t) 0;
155
161
 
156
162
  info->file= file;
164
170
 
165
171
  if (file >= 0)
166
172
  {
167
 
    pos= my_tell(file, MYF(0));
168
 
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
 
173
    pos= lseek(file, 0, SEEK_CUR);
 
174
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
169
175
    {
170
176
      /*
171
177
         This kind of object doesn't support seek() or tell(). Don't set a
180
186
      assert(seek_offset == 0);
181
187
    }
182
188
    else
183
 
      info->seek_not_done= test(seek_offset != pos);
 
189
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
184
190
  }
185
191
 
186
 
  info->disk_writes= 0;
187
192
  info->share=0;
188
193
 
189
194
  if (!cachesize && !(cachesize= my_default_record_cache_size))
194
199
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
195
200
    {
196
201
      /* Calculate end of file to avoid allocating oversized buffers */
197
 
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
 
202
      end_of_file=lseek(file,0L,SEEK_END);
198
203
      /* Need to reset seek_not_done now that we just did a seek. */
199
204
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200
205
      if (end_of_file < seek_offset)
221
226
      if (type == SEQ_READ_APPEND)
222
227
        buffer_block *= 2;
223
228
      if ((info->buffer=
224
 
           (uchar*) my_malloc(buffer_block,
225
 
                             MYF((cache_myflags & ~ MY_WME) |
226
 
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
229
           (unsigned char*) malloc(buffer_block)) != 0)
227
230
      {
228
231
        info->write_buffer=info->buffer;
229
232
        if (type == SEQ_READ_APPEND)
247
250
    info->write_end = info->write_buffer + info->buffer_length;
248
251
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
249
252
  }
250
 
#if defined(SAFE_MUTEX)
251
 
  else
252
 
  {
253
 
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
254
 
    memset(&info->append_buffer_lock, 0, sizeof(info));
255
 
  }
256
 
#endif
257
253
 
258
254
  if (type == WRITE_CACHE)
259
255
    info->write_end=
313
309
 
314
310
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
315
311
                        my_off_t seek_offset,
316
 
                        bool use_async_io __attribute__((unused)),
 
312
                        bool use_async_io,
317
313
                        bool clear_cache)
318
314
{
319
315
  /* One can't do reinit with the following types */
327
323
      seek_offset <= my_b_tell(info))
328
324
  {
329
325
    /* Reuse current buffer without flushing it to disk */
330
 
    uchar *pos;
 
326
    unsigned char *pos;
331
327
    if (info->type == WRITE_CACHE && type == READ_CACHE)
332
328
    {
333
329
      info->read_end=info->write_pos;
395
391
    info->read_function=_my_b_async_read;
396
392
  }
397
393
  info->inited=0;
 
394
#else
 
395
  (void)use_async_io;
398
396
#endif
399
397
  return(0);
400
398
} /* reinit_io_cache */
429
427
    1      Error: can't read requested characters
430
428
*/
431
429
 
432
 
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
430
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
433
431
{
434
432
  size_t length,diff_length,left_length, max_length;
435
433
  my_off_t pos_in_file;
445
443
  /* pos_in_file always point on where info->buffer was read */
446
444
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
447
445
 
448
 
  /* 
 
446
  /*
449
447
    Whenever a function which operates on IO_CACHE flushes/writes
450
448
    some part of the IO_CACHE to disk it will set the property
451
449
    "seek_not_done" to indicate this to other functions operating
453
451
  */
454
452
  if (info->seek_not_done)
455
453
  {
456
 
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
457
 
        != MY_FILEPOS_ERROR))
 
454
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
458
455
    {
459
456
      /* No error, reset seek_not_done flag. */
460
457
      info->seek_not_done= 0;
597
594
*/
598
595
 
599
596
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
600
 
                         IO_CACHE *write_cache, uint num_threads)
 
597
                         IO_CACHE *write_cache, uint32_t num_threads)
601
598
{
602
599
  assert(num_threads > 1);
603
600
  assert(read_cache->type == READ_CACHE);
649
646
void remove_io_thread(IO_CACHE *cache)
650
647
{
651
648
  IO_CACHE_SHARE *cshare= cache->share;
652
 
  uint total;
 
649
  uint32_t total;
653
650
 
654
651
  /* If the writer goes, it needs to flush the write cache. */
655
652
  if (cache == cshare->source_cache)
890
887
    1      Error: can't read requested characters
891
888
*/
892
889
 
893
 
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
 
890
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
894
891
{
895
892
  my_off_t pos_in_file;
896
893
  size_t length, diff_length, left_length;
942
939
        */
943
940
        if (cache->seek_not_done)
944
941
        {
945
 
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
946
 
              == MY_FILEPOS_ERROR)
 
942
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
947
943
          {
948
944
            cache->error= -1;
949
945
            unlock_io_cache(cache);
1013
1009
*/
1014
1010
 
1015
1011
static void copy_to_read_buffer(IO_CACHE *write_cache,
1016
 
                                const uchar *write_buffer, size_t write_length)
 
1012
                                const unsigned char *write_buffer, size_t write_length)
1017
1013
{
1018
1014
  IO_CACHE_SHARE *cshare= write_cache->share;
1019
1015
 
1025
1021
  while (write_length)
1026
1022
  {
1027
1023
    size_t copy_length= min(write_length, write_cache->buffer_length);
1028
 
    int  __attribute__((unused)) rc;
 
1024
    int  rc;
1029
1025
 
1030
1026
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1031
1027
    /* The writing thread does always have the lock when it awakes. */
1048
1044
 
1049
1045
/*
1050
1046
  Do sequential read from the SEQ_READ_APPEND cache.
1051
 
  
 
1047
 
1052
1048
  We do this in three stages:
1053
1049
   - first read from info->buffer
1054
1050
   - then if there are still data to read, try the file descriptor
1059
1055
    1  Failed to read
1060
1056
*/
1061
1057
 
1062
 
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1058
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1063
1059
{
1064
1060
  size_t length, diff_length, left_length, save_count, max_length;
1065
1061
  my_off_t pos_in_file;
1084
1080
    With read-append cache we must always do a seek before we read,
1085
1081
    because the write could have moved the file pointer astray
1086
1082
  */
1087
 
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1083
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1088
1084
  {
1089
1085
   info->error= -1;
1090
1086
   unlock_append_buffer(info);
1221
1217
     1          An error has occurred; IO_CACHE to error state.
1222
1218
*/
1223
1219
 
1224
 
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
 
1220
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1225
1221
{
1226
1222
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1227
1223
  size_t max_length;
1228
1224
  my_off_t next_pos_in_file;
1229
 
  uchar *read_buffer;
 
1225
  unsigned char *read_buffer;
1230
1226
 
1231
1227
  memcpy(Buffer,info->read_pos,
1232
1228
         (left_length= (size_t) (info->read_end-info->read_pos)));
1304
1300
      info->error=(int) (read_length+left_length);
1305
1301
      return 1;
1306
1302
    }
1307
 
    
1308
 
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1309
 
        == MY_FILEPOS_ERROR)
 
1303
 
 
1304
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1310
1305
    {
1311
1306
      info->error= -1;
1312
1307
      return (1);
1362
1357
  {
1363
1358
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1364
1359
    if (aioread(info->file,read_buffer, max_length,
1365
 
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
 
1360
                (my_off_t) next_pos_in_file,SEEK_SET,
1366
1361
                &info->aio_result.result))
1367
1362
    {                                           /* Skip async io */
1368
1363
      my_errno=errno;
1369
1364
      if (info->request_pos != info->buffer)
1370
1365
      {
1371
 
        memcpy(info->buffer, info->request_pos,
1372
 
               (size_t) (info->read_end - info->read_pos));
 
1366
        memmove(info->buffer, info->request_pos,
 
1367
                (size_t) (info->read_end - info->read_pos));
1373
1368
        info->request_pos=info->buffer;
1374
1369
        info->read_pos-=info->read_length;
1375
1370
        info->read_end-=info->read_length;
1389
1384
 
1390
1385
int _my_b_get(IO_CACHE *info)
1391
1386
{
1392
 
  uchar buff;
 
1387
  unsigned char buff;
1393
1388
  IO_CACHE_CALLBACK pre_read,post_read;
1394
1389
  if ((pre_read = info->pre_read))
1395
1390
    (*pre_read)(info);
1397
1392
    return my_b_EOF;
1398
1393
  if ((post_read = info->post_read))
1399
1394
    (*post_read)(info);
1400
 
  return (int) (uchar) buff;
 
1395
  return (int) (unsigned char) buff;
1401
1396
}
1402
1397
 
1403
 
/* 
 
1398
/*
1404
1399
   Write a byte buffer to IO_CACHE and flush to disk
1405
1400
   if IO_CACHE is full.
1406
1401
 
1410
1405
   -1 On error; my_errno contains error code.
1411
1406
*/
1412
1407
 
1413
 
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1408
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1414
1409
{
1415
1410
  size_t rest_length,length;
1416
1411
 
1439
1434
        "seek_not_done" to indicate this to other functions operating
1440
1435
        on the IO_CACHE.
1441
1436
      */
1442
 
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
 
1437
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
1443
1438
      {
1444
1439
        info->error= -1;
1445
1440
        return (1);
1479
1474
  the write buffer before we are ready with it.
1480
1475
*/
1481
1476
 
1482
 
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1477
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1483
1478
{
1484
1479
  size_t rest_length,length;
1485
1480
 
1523
1518
}
1524
1519
 
1525
1520
 
1526
 
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1521
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1527
1522
{
1528
1523
  /*
1529
1524
    Sasha: We are not writing this with the ? operator to avoid hitting
1530
 
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1525
    a possible compiler bug. At least gcc 2.95 cannot deal with
1531
1526
    several layers of ternary operators that evaluated comma(,) operator
1532
1527
    expressions inside - I do have a test case if somebody wants it
1533
1528
  */
1543
1538
  we will never get a seek over the end of the buffer
1544
1539
*/
1545
1540
 
1546
 
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
 
1541
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
1547
1542
                   my_off_t pos)
1548
1543
{
1549
1544
  size_t length;
1561
1556
    if (pos + Count <= info->pos_in_file)
1562
1557
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1563
1558
    /* Write the part of the block that is before buffer */
1564
 
    length= (uint) (info->pos_in_file - pos);
 
1559
    length= (uint32_t) (info->pos_in_file - pos);
1565
1560
    if (pwrite(info->file, Buffer, length, pos) == 0)
1566
1561
      info->error= error= -1;
1567
1562
    Buffer+=length;
1639
1634
      */
1640
1635
      if (!append_cache && info->seek_not_done)
1641
1636
      {                                 /* File touched, do seek */
1642
 
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1643
 
            MY_FILEPOS_ERROR)
 
1637
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1644
1638
        {
1645
1639
          UNLOCK_APPEND_BUFFER;
1646
1640
          return((info->error= -1));
1665
1659
      else
1666
1660
      {
1667
1661
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1668
 
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
 
1662
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
 
1663
        assert(info->end_of_file == tell_ret);
1669
1664
      }
1670
1665
 
1671
1666
      info->append_read_pos=info->write_pos=info->write_buffer;
1672
 
      ++info->disk_writes;
1673
1667
      UNLOCK_APPEND_BUFFER;
1674
1668
      return(info->error);
1675
1669
    }
1723
1717
    info->alloced_buffer=0;
1724
1718
    if (info->file != -1)                       /* File doesn't exist */
1725
1719
      error= my_b_flush_io_cache(info,1);
1726
 
    my_free((uchar*) info->buffer,MYF(MY_WME));
1727
 
    info->buffer=info->read_pos=(uchar*) 0;
 
1720
    free((unsigned char*) info->buffer);
 
1721
    info->buffer=info->read_pos=(unsigned char*) 0;
1728
1722
  }
1729
1723
  if (info->type == SEQ_READ_APPEND)
1730
1724
  {
1783
1777
  char* block, *block_end;
1784
1778
  MY_INIT(argv[0]);
1785
1779
  max_block = cache_size*3;
1786
 
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1780
  if (!(block=(char*)malloc(max_block)))
1787
1781
    die("Not enough memory to allocate test block");
1788
1782
  block_end = block + max_block;
1789
1783
  for (p = block,i=0; p < block_end;i++)
1807
1801
    total_bytes += 4+block_size;
1808
1802
  }
1809
1803
  close_file(&sra_cache);
1810
 
  my_free(block,MYF(MY_WME));
 
1804
  free(block);
1811
1805
  if (!my_stat(fname,&status,MYF(MY_WME)))
1812
1806
    die("%s failed to stat, but I had just closed it,\
1813
1807
 wonder how that happened");