~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Monty Taylor
  • Date: 2008-08-16 21:06:22 UTC
  • Revision ID: monty@inaugust.com-20080816210622-zpnn13unyinqzn72
Updated po files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
 
50
#define MAP_TO_USE_RAID
50
51
#include "mysys_priv.h"
51
52
#include <mystrings/m_string.h>
52
53
#ifdef HAVE_AIOWAIT
53
54
#include "mysys_err.h"
54
 
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
57
 
#include <mysys/iocache.h>
58
57
#include <errno.h>
59
 
#include <drizzled/util/test.h>
60
 
#include <stdlib.h>
61
58
 
62
59
#define lock_append_buffer(info) \
63
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
153
150
                  bool use_async_io, myf cache_myflags)
154
151
{
155
152
  size_t min_cache;
156
 
  off_t pos;
 
153
  my_off_t pos;
157
154
  my_off_t end_of_file= ~(my_off_t) 0;
158
155
 
159
156
  info->file= file;
167
164
 
168
165
  if (file >= 0)
169
166
  {
170
 
    pos= lseek(file, 0, SEEK_CUR);
171
 
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
 
167
    pos= my_tell(file, MYF(0));
 
168
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
172
169
    {
173
170
      /*
174
171
         This kind of object doesn't support seek() or tell(). Don't set a
183
180
      assert(seek_offset == 0);
184
181
    }
185
182
    else
186
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
183
      info->seek_not_done= test(seek_offset != pos);
187
184
  }
188
185
 
189
186
  info->disk_writes= 0;
197
194
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
198
195
    {
199
196
      /* Calculate end of file to avoid allocating oversized buffers */
200
 
      end_of_file=lseek(file,0L,SEEK_END);
 
197
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
201
198
      /* Need to reset seek_not_done now that we just did a seek. */
202
199
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
203
200
      if (end_of_file < seek_offset)
224
221
      if (type == SEQ_READ_APPEND)
225
222
        buffer_block *= 2;
226
223
      if ((info->buffer=
227
 
           (unsigned char*) malloc(buffer_block)) != 0)
 
224
           (uchar*) my_malloc(buffer_block,
 
225
                             MYF((cache_myflags & ~ MY_WME) |
 
226
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
228
227
      {
229
228
        info->write_buffer=info->buffer;
230
229
        if (type == SEQ_READ_APPEND)
314
313
 
315
314
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
316
315
                        my_off_t seek_offset,
317
 
                        bool use_async_io,
 
316
                        bool use_async_io __attribute__((unused)),
318
317
                        bool clear_cache)
319
318
{
320
319
  /* One can't do reinit with the following types */
328
327
      seek_offset <= my_b_tell(info))
329
328
  {
330
329
    /* Reuse current buffer without flushing it to disk */
331
 
    unsigned char *pos;
 
330
    uchar *pos;
332
331
    if (info->type == WRITE_CACHE && type == READ_CACHE)
333
332
    {
334
333
      info->read_end=info->write_pos;
396
395
    info->read_function=_my_b_async_read;
397
396
  }
398
397
  info->inited=0;
399
 
#else
400
 
  (void)use_async_io;
401
398
#endif
402
399
  return(0);
403
400
} /* reinit_io_cache */
432
429
    1      Error: can't read requested characters
433
430
*/
434
431
 
435
 
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
432
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
436
433
{
437
434
  size_t length,diff_length,left_length, max_length;
438
435
  my_off_t pos_in_file;
448
445
  /* pos_in_file always point on where info->buffer was read */
449
446
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
450
447
 
451
 
  /*
 
448
  /* 
452
449
    Whenever a function which operates on IO_CACHE flushes/writes
453
450
    some part of the IO_CACHE to disk it will set the property
454
451
    "seek_not_done" to indicate this to other functions operating
456
453
  */
457
454
  if (info->seek_not_done)
458
455
  {
459
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
456
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
457
        != MY_FILEPOS_ERROR))
460
458
    {
461
459
      /* No error, reset seek_not_done flag. */
462
460
      info->seek_not_done= 0;
599
597
*/
600
598
 
601
599
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
602
 
                         IO_CACHE *write_cache, uint32_t num_threads)
 
600
                         IO_CACHE *write_cache, uint num_threads)
603
601
{
604
602
  assert(num_threads > 1);
605
603
  assert(read_cache->type == READ_CACHE);
651
649
void remove_io_thread(IO_CACHE *cache)
652
650
{
653
651
  IO_CACHE_SHARE *cshare= cache->share;
654
 
  uint32_t total;
 
652
  uint total;
655
653
 
656
654
  /* If the writer goes, it needs to flush the write cache. */
657
655
  if (cache == cshare->source_cache)
892
890
    1      Error: can't read requested characters
893
891
*/
894
892
 
895
 
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
893
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
896
894
{
897
895
  my_off_t pos_in_file;
898
896
  size_t length, diff_length, left_length;
944
942
        */
945
943
        if (cache->seek_not_done)
946
944
        {
947
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
945
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
946
              == MY_FILEPOS_ERROR)
948
947
          {
949
948
            cache->error= -1;
950
949
            unlock_io_cache(cache);
1014
1013
*/
1015
1014
 
1016
1015
static void copy_to_read_buffer(IO_CACHE *write_cache,
1017
 
                                const unsigned char *write_buffer, size_t write_length)
 
1016
                                const uchar *write_buffer, size_t write_length)
1018
1017
{
1019
1018
  IO_CACHE_SHARE *cshare= write_cache->share;
1020
1019
 
1025
1024
  */
1026
1025
  while (write_length)
1027
1026
  {
1028
 
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
1029
 
    int  rc;
 
1027
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1028
    int  __attribute__((unused)) rc;
1030
1029
 
1031
1030
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1032
1031
    /* The writing thread does always have the lock when it awakes. */
1049
1048
 
1050
1049
/*
1051
1050
  Do sequential read from the SEQ_READ_APPEND cache.
1052
 
 
 
1051
  
1053
1052
  We do this in three stages:
1054
1053
   - first read from info->buffer
1055
1054
   - then if there are still data to read, try the file descriptor
1060
1059
    1  Failed to read
1061
1060
*/
1062
1061
 
1063
 
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1062
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1064
1063
{
1065
1064
  size_t length, diff_length, left_length, save_count, max_length;
1066
1065
  my_off_t pos_in_file;
1085
1084
    With read-append cache we must always do a seek before we read,
1086
1085
    because the write could have moved the file pointer astray
1087
1086
  */
1088
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1087
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1089
1088
  {
1090
1089
   info->error= -1;
1091
1090
   unlock_append_buffer(info);
1183
1182
      TODO: figure out if the assert below is needed or correct.
1184
1183
    */
1185
1184
    assert(pos_in_file == info->end_of_file);
1186
 
    copy_len=cmin(Count, len_in_buff);
 
1185
    copy_len=min(Count, len_in_buff);
1187
1186
    memcpy(Buffer, info->append_read_pos, copy_len);
1188
1187
    info->append_read_pos += copy_len;
1189
1188
    Count -= copy_len;
1222
1221
     1          An error has occurred; IO_CACHE to error state.
1223
1222
*/
1224
1223
 
1225
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1224
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1226
1225
{
1227
1226
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1228
1227
  size_t max_length;
1229
1228
  my_off_t next_pos_in_file;
1230
 
  unsigned char *read_buffer;
 
1229
  uchar *read_buffer;
1231
1230
 
1232
1231
  memcpy(Buffer,info->read_pos,
1233
1232
         (left_length= (size_t) (info->read_end-info->read_pos)));
1285
1284
      }
1286
1285
    }
1287
1286
        /* Copy found bytes to buffer */
1288
 
    length=cmin(Count,read_length);
 
1287
    length=min(Count,read_length);
1289
1288
    memcpy(Buffer,info->read_pos,(size_t) length);
1290
1289
    Buffer+=length;
1291
1290
    Count-=length;
1305
1304
      info->error=(int) (read_length+left_length);
1306
1305
      return 1;
1307
1306
    }
1308
 
 
1309
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1307
    
 
1308
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1309
        == MY_FILEPOS_ERROR)
1310
1310
    {
1311
1311
      info->error= -1;
1312
1312
      return (1);
1318
1318
      if ((read_length=my_read(info->file,info->request_pos,
1319
1319
                               read_length, info->myflags)) == (size_t) -1)
1320
1320
        return info->error= -1;
1321
 
      use_length=cmin(Count,read_length);
 
1321
      use_length=min(Count,read_length);
1322
1322
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1323
1323
      info->read_pos=info->request_pos+Count;
1324
1324
      info->read_end=info->request_pos+read_length;
1362
1362
  {
1363
1363
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1364
1364
    if (aioread(info->file,read_buffer, max_length,
1365
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1365
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
1366
1366
                &info->aio_result.result))
1367
1367
    {                                           /* Skip async io */
1368
1368
      my_errno=errno;
1369
1369
      if (info->request_pos != info->buffer)
1370
1370
      {
1371
 
        memmove(info->buffer, info->request_pos,
1372
 
                (size_t) (info->read_end - info->read_pos));
 
1371
        memcpy(info->buffer, info->request_pos,
 
1372
               (size_t) (info->read_end - info->read_pos));
1373
1373
        info->request_pos=info->buffer;
1374
1374
        info->read_pos-=info->read_length;
1375
1375
        info->read_end-=info->read_length;
1389
1389
 
1390
1390
int _my_b_get(IO_CACHE *info)
1391
1391
{
1392
 
  unsigned char buff;
 
1392
  uchar buff;
1393
1393
  IO_CACHE_CALLBACK pre_read,post_read;
1394
1394
  if ((pre_read = info->pre_read))
1395
1395
    (*pre_read)(info);
1397
1397
    return my_b_EOF;
1398
1398
  if ((post_read = info->post_read))
1399
1399
    (*post_read)(info);
1400
 
  return (int) (unsigned char) buff;
 
1400
  return (int) (uchar) buff;
1401
1401
}
1402
1402
 
1403
 
/*
 
1403
/* 
1404
1404
   Write a byte buffer to IO_CACHE and flush to disk
1405
1405
   if IO_CACHE is full.
1406
1406
 
1410
1410
   -1 On error; my_errno contains error code.
1411
1411
*/
1412
1412
 
1413
 
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1413
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1414
1414
{
1415
1415
  size_t rest_length,length;
1416
1416
 
1439
1439
        "seek_not_done" to indicate this to other functions operating
1440
1440
        on the IO_CACHE.
1441
1441
      */
1442
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1442
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1443
1443
      {
1444
1444
        info->error= -1;
1445
1445
        return (1);
1479
1479
  the write buffer before we are ready with it.
1480
1480
*/
1481
1481
 
1482
 
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1482
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1483
1483
{
1484
1484
  size_t rest_length,length;
1485
1485
 
1523
1523
}
1524
1524
 
1525
1525
 
1526
 
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1526
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1527
1527
{
1528
1528
  /*
1529
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
1530
 
    a possible compiler bug. At least gcc 2.95 cannot deal with
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with 
1531
1531
    several layers of ternary operators that evaluated comma(,) operator
1532
1532
    expressions inside - I do have a test case if somebody wants it
1533
1533
  */
1543
1543
  we will never get a seek over the end of the buffer
1544
1544
*/
1545
1545
 
1546
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
1546
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1547
1547
                   my_off_t pos)
1548
1548
{
1549
1549
  size_t length;
1639
1639
      */
1640
1640
      if (!append_cache && info->seek_not_done)
1641
1641
      {                                 /* File touched, do seek */
1642
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1642
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1643
            MY_FILEPOS_ERROR)
1643
1644
        {
1644
1645
          UNLOCK_APPEND_BUFFER;
1645
1646
          return((info->error= -1));
1664
1665
      else
1665
1666
      {
1666
1667
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1667
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
1668
 
        assert(info->end_of_file == tell_ret);
 
1668
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
1669
1669
      }
1670
1670
 
1671
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
1723
1723
    info->alloced_buffer=0;
1724
1724
    if (info->file != -1)                       /* File doesn't exist */
1725
1725
      error= my_b_flush_io_cache(info,1);
1726
 
    free((unsigned char*) info->buffer);
1727
 
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1726
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1727
    info->buffer=info->read_pos=(uchar*) 0;
1728
1728
  }
1729
1729
  if (info->type == SEQ_READ_APPEND)
1730
1730
  {
1783
1783
  char* block, *block_end;
1784
1784
  MY_INIT(argv[0]);
1785
1785
  max_block = cache_size*3;
1786
 
  if (!(block=(char*)malloc(max_block)))
 
1786
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1787
1787
    die("Not enough memory to allocate test block");
1788
1788
  block_end = block + max_block;
1789
1789
  for (p = block,i=0; p < block_end;i++)
1807
1807
    total_bytes += 4+block_size;
1808
1808
  }
1809
1809
  close_file(&sra_cache);
1810
 
  free(block);
 
1810
  my_free(block,MYF(MY_WME));
1811
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
1812
1812
    die("%s failed to stat, but I had just closed it,\
1813
1813
 wonder how that happened");