~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.c

  • Committer: Monty Taylor
  • Date: 2008-08-01 22:33:44 UTC
  • mto: (236.1.42 codestyle)
  • mto: This revision was merged to the branch mainline in revision 261.
  • Revision ID: monty@inaugust.com-20080801223344-vzhlflfmtijp1imv
First pass at gettexizing the error messages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "drizzled/internal/mysys_priv.h"
51
 
#include "drizzled/internal/m_string.h"
 
50
#define MAP_TO_USE_RAID
 
51
#include "mysys_priv.h"
 
52
#include <mystrings/m_string.h>
52
53
#ifdef HAVE_AIOWAIT
53
 
#include "drizzled/my_error.h"
54
 
#include "drizzled/internal/aio_result.h"
 
54
#include "mysys_err.h"
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
57
 
#include "drizzled/internal/iocache.h"
58
57
#include <errno.h>
59
 
#include <drizzled/util/test.h>
60
 
#include <stdlib.h>
61
 
#include <algorithm>
62
 
 
63
 
using namespace std;
64
 
 
65
 
extern "C" {
66
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
67
 
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
68
 
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
69
 
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
70
 
}
71
58
 
72
59
#define lock_append_buffer(info) \
73
60
 pthread_mutex_lock(&(info)->append_buffer_lock)
158
145
    #  error
159
146
*/
160
147
 
161
 
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
 
148
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
162
149
                  enum cache_type type, my_off_t seek_offset,
163
150
                  bool use_async_io, myf cache_myflags)
164
151
{
165
152
  size_t min_cache;
166
 
  off_t pos;
 
153
  my_off_t pos;
167
154
  my_off_t end_of_file= ~(my_off_t) 0;
168
155
 
169
156
  info->file= file;
177
164
 
178
165
  if (file >= 0)
179
166
  {
180
 
    pos= lseek(file, 0, SEEK_CUR);
181
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
167
    pos= my_tell(file, MYF(0));
 
168
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
182
169
    {
183
170
      /*
184
171
         This kind of object doesn't support seek() or tell(). Don't set a
193
180
      assert(seek_offset == 0);
194
181
    }
195
182
    else
196
 
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
 
183
      info->seek_not_done= test(seek_offset != pos);
197
184
  }
198
185
 
 
186
  info->disk_writes= 0;
199
187
  info->share=0;
200
188
 
201
189
  if (!cachesize && !(cachesize= my_default_record_cache_size))
206
194
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
207
195
    {
208
196
      /* Calculate end of file to avoid allocating oversized buffers */
209
 
      end_of_file=lseek(file,0L,SEEK_END);
 
197
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
210
198
      /* Need to reset seek_not_done now that we just did a seek. */
211
199
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
212
200
      if (end_of_file < seek_offset)
233
221
      if (type == SEQ_READ_APPEND)
234
222
        buffer_block *= 2;
235
223
      if ((info->buffer=
236
 
           (unsigned char*) malloc(buffer_block)) != 0)
 
224
           (uchar*) my_malloc(buffer_block,
 
225
                             MYF((cache_myflags & ~ MY_WME) |
 
226
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
237
227
      {
238
228
        info->write_buffer=info->buffer;
239
229
        if (type == SEQ_READ_APPEND)
257
247
    info->write_end = info->write_buffer + info->buffer_length;
258
248
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
259
249
  }
 
250
#if defined(SAFE_MUTEX)
 
251
  else
 
252
  {
 
253
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
254
    memset((char*) &info->append_buffer_lock, 0, sizeof(info));
 
255
  }
 
256
#endif
260
257
 
261
258
  if (type == WRITE_CACHE)
262
259
    info->write_end=
316
313
 
317
314
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
318
315
                        my_off_t seek_offset,
319
 
                        bool use_async_io,
 
316
                        bool use_async_io __attribute__((unused)),
320
317
                        bool clear_cache)
321
318
{
322
319
  /* One can't do reinit with the following types */
330
327
      seek_offset <= my_b_tell(info))
331
328
  {
332
329
    /* Reuse current buffer without flushing it to disk */
333
 
    unsigned char *pos;
 
330
    uchar *pos;
334
331
    if (info->type == WRITE_CACHE && type == READ_CACHE)
335
332
    {
336
333
      info->read_end=info->write_pos;
391
388
 
392
389
#ifdef HAVE_AIOWAIT
393
390
  if (use_async_io && ! my_disable_async_io &&
394
 
      ((uint32_t) info->buffer_length <
395
 
       (uint32_t) (info->end_of_file - seek_offset)))
 
391
      ((ulong) info->buffer_length <
 
392
       (ulong) (info->end_of_file - seek_offset)))
396
393
  {
397
394
    info->read_length=info->buffer_length/2;
398
395
    info->read_function=_my_b_async_read;
399
396
  }
400
397
  info->inited=0;
401
 
#else
402
 
  (void)use_async_io;
403
398
#endif
404
399
  return(0);
405
400
} /* reinit_io_cache */
434
429
    1      Error: can't read requested characters
435
430
*/
436
431
 
437
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
432
int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
438
433
{
439
434
  size_t length,diff_length,left_length, max_length;
440
435
  my_off_t pos_in_file;
450
445
  /* pos_in_file always point on where info->buffer was read */
451
446
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
452
447
 
453
 
  /*
 
448
  /* 
454
449
    Whenever a function which operates on IO_CACHE flushes/writes
455
450
    some part of the IO_CACHE to disk it will set the property
456
451
    "seek_not_done" to indicate this to other functions operating
458
453
  */
459
454
  if (info->seek_not_done)
460
455
  {
461
 
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
 
456
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
 
457
        != MY_FILEPOS_ERROR))
462
458
    {
463
459
      /* No error, reset seek_not_done flag. */
464
460
      info->seek_not_done= 0;
470
466
        info->file is a pipe or socket or FIFO.  We never should have tried
471
467
        to seek on that.  See Bugs#25807 and #22828 for more info.
472
468
      */
473
 
      assert(errno != ESPIPE);
 
469
      assert(my_errno != ESPIPE);
474
470
      info->error= -1;
475
471
      return(1);
476
472
    }
508
504
  {
509
505
    if (Count)
510
506
    {
511
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
507
      info->error= left_length;         /* We only got this many char */
512
508
      return(1);
513
509
    }
514
510
    length=0;                           /* Didn't read any chars */
601
597
*/
602
598
 
603
599
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
604
 
                         IO_CACHE *write_cache, uint32_t num_threads)
 
600
                         IO_CACHE *write_cache, uint num_threads)
605
601
{
606
602
  assert(num_threads > 1);
607
603
  assert(read_cache->type == READ_CACHE);
653
649
void remove_io_thread(IO_CACHE *cache)
654
650
{
655
651
  IO_CACHE_SHARE *cshare= cache->share;
656
 
  uint32_t total;
 
652
  uint total;
657
653
 
658
654
  /* If the writer goes, it needs to flush the write cache. */
659
655
  if (cache == cshare->source_cache)
894
890
    1      Error: can't read requested characters
895
891
*/
896
892
 
897
 
extern "C" int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
893
int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
898
894
{
899
895
  my_off_t pos_in_file;
900
896
  size_t length, diff_length, left_length;
946
942
        */
947
943
        if (cache->seek_not_done)
948
944
        {
949
 
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
 
945
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
 
946
              == MY_FILEPOS_ERROR)
950
947
          {
951
948
            cache->error= -1;
952
949
            unlock_io_cache(cache);
1016
1013
*/
1017
1014
 
1018
1015
static void copy_to_read_buffer(IO_CACHE *write_cache,
1019
 
                                const unsigned char *write_buffer, size_t write_length)
 
1016
                                const uchar *write_buffer, size_t write_length)
1020
1017
{
1021
1018
  IO_CACHE_SHARE *cshare= write_cache->share;
1022
1019
 
1028
1025
  while (write_length)
1029
1026
  {
1030
1027
    size_t copy_length= min(write_length, write_cache->buffer_length);
1031
 
    int  rc;
 
1028
    int  __attribute__((unused)) rc;
1032
1029
 
1033
1030
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1034
1031
    /* The writing thread does always have the lock when it awakes. */
1051
1048
 
1052
1049
/*
1053
1050
  Do sequential read from the SEQ_READ_APPEND cache.
1054
 
 
 
1051
  
1055
1052
  We do this in three stages:
1056
1053
   - first read from info->buffer
1057
1054
   - then if there are still data to read, try the file descriptor
1062
1059
    1  Failed to read
1063
1060
*/
1064
1061
 
1065
 
extern "C" int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1062
int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1066
1063
{
1067
1064
  size_t length, diff_length, left_length, save_count, max_length;
1068
1065
  my_off_t pos_in_file;
1087
1084
    With read-append cache we must always do a seek before we read,
1088
1085
    because the write could have moved the file pointer astray
1089
1086
  */
1090
 
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1087
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1091
1088
  {
1092
1089
   info->error= -1;
1093
1090
   unlock_append_buffer(info);
1190
1187
    info->append_read_pos += copy_len;
1191
1188
    Count -= copy_len;
1192
1189
    if (Count)
1193
 
      info->error = static_cast<int>(save_count - Count);
 
1190
      info->error = save_count - Count;
1194
1191
 
1195
1192
    /* Fill read buffer with data from write buffer */
1196
1193
    memcpy(info->buffer, info->append_read_pos,
1219
1216
      Count                     Number of bytes to read into Buffer
1220
1217
 
1221
1218
  RETURN VALUE
1222
 
    -1          An error has occurred; errno is set.
 
1219
    -1          An error has occurred; my_errno is set.
1223
1220
     0          Success
1224
1221
     1          An error has occurred; IO_CACHE to error state.
1225
1222
*/
1226
1223
 
1227
 
int _my_b_async_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1224
int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1228
1225
{
1229
1226
  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1230
1227
  size_t max_length;
1231
1228
  my_off_t next_pos_in_file;
1232
 
  unsigned char *read_buffer;
 
1229
  uchar *read_buffer;
1233
1230
 
1234
1231
  memcpy(Buffer,info->read_pos,
1235
1232
         (left_length= (size_t) (info->read_end-info->read_pos)));
1247
1244
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1248
1245
                 my_filename(info->file),
1249
1246
                 info->aio_result.result.aio_errno);
1250
 
      errno=info->aio_result.result.aio_errno;
 
1247
      my_errno=info->aio_result.result.aio_errno;
1251
1248
      info->error= -1;
1252
1249
      return(1);
1253
1250
    }
1254
1251
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1255
1252
        read_length == (size_t) -1)
1256
1253
    {
1257
 
      errno=0;                          /* For testing */
 
1254
      my_errno=0;                               /* For testing */
1258
1255
      info->error= (read_length == (size_t) -1 ? -1 :
1259
1256
                    (int) (read_length+left_length));
1260
1257
      return(1);
1307
1304
      info->error=(int) (read_length+left_length);
1308
1305
      return 1;
1309
1306
    }
1310
 
 
1311
 
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1307
    
 
1308
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
 
1309
        == MY_FILEPOS_ERROR)
1312
1310
    {
1313
1311
      info->error= -1;
1314
1312
      return (1);
1331
1329
      {                                 /* Didn't find hole block */
1332
1330
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1333
1331
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1334
 
                   my_filename(info->file),errno);
 
1332
                   my_filename(info->file),my_errno);
1335
1333
        info->error=(int) (read_length+left_length);
1336
1334
        return 1;
1337
1335
      }
1364
1362
  {
1365
1363
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1366
1364
    if (aioread(info->file,read_buffer, max_length,
1367
 
                (my_off_t) next_pos_in_file,SEEK_SET,
 
1365
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
1368
1366
                &info->aio_result.result))
1369
1367
    {                                           /* Skip async io */
1370
 
      errno=errno;
 
1368
      my_errno=errno;
1371
1369
      if (info->request_pos != info->buffer)
1372
1370
      {
1373
 
        memmove(info->buffer, info->request_pos,
1374
 
                (size_t) (info->read_end - info->read_pos));
 
1371
        memcpy(info->buffer, info->request_pos,
 
1372
               (size_t) (info->read_end - info->read_pos));
1375
1373
        info->request_pos=info->buffer;
1376
1374
        info->read_pos-=info->read_length;
1377
1375
        info->read_end-=info->read_length;
1391
1389
 
1392
1390
int _my_b_get(IO_CACHE *info)
1393
1391
{
1394
 
  unsigned char buff;
 
1392
  uchar buff;
1395
1393
  IO_CACHE_CALLBACK pre_read,post_read;
1396
1394
  if ((pre_read = info->pre_read))
1397
1395
    (*pre_read)(info);
1399
1397
    return my_b_EOF;
1400
1398
  if ((post_read = info->post_read))
1401
1399
    (*post_read)(info);
1402
 
  return (int) (unsigned char) buff;
 
1400
  return (int) (uchar) buff;
1403
1401
}
1404
1402
 
1405
 
/*
 
1403
/* 
1406
1404
   Write a byte buffer to IO_CACHE and flush to disk
1407
1405
   if IO_CACHE is full.
1408
1406
 
1409
1407
   RETURN VALUE
1410
1408
    1 On error on write
1411
1409
    0 On success
1412
 
   -1 On error; errno contains error code.
 
1410
   -1 On error; my_errno contains error code.
1413
1411
*/
1414
1412
 
1415
 
extern "C" int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1413
int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1416
1414
{
1417
1415
  size_t rest_length,length;
1418
1416
 
1419
1417
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1420
1418
  {
1421
 
    errno=errno=EFBIG;
 
1419
    my_errno=errno=EFBIG;
1422
1420
    return info->error = -1;
1423
1421
  }
1424
1422
 
1441
1439
        "seek_not_done" to indicate this to other functions operating
1442
1440
        on the IO_CACHE.
1443
1441
      */
1444
 
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
 
1442
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1445
1443
      {
1446
1444
        info->error= -1;
1447
1445
        return (1);
1474
1472
  return 0;
1475
1473
}
1476
1474
 
 
1475
 
 
1476
/*
 
1477
  Append a block to the write buffer.
 
1478
  This is done with the buffer locked to ensure that we don't read from
 
1479
  the write buffer before we are ready with it.
 
1480
*/
 
1481
 
 
1482
int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1483
{
 
1484
  size_t rest_length,length;
 
1485
 
 
1486
  /*
 
1487
    Assert that we cannot come here with a shared cache. If we do one
 
1488
    day, we might need to add a call to copy_to_read_buffer().
 
1489
  */
 
1490
  assert(!info->share);
 
1491
 
 
1492
  lock_append_buffer(info);
 
1493
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1494
  if (Count <= rest_length)
 
1495
    goto end;
 
1496
  memcpy(info->write_pos, Buffer, rest_length);
 
1497
  Buffer+=rest_length;
 
1498
  Count-=rest_length;
 
1499
  info->write_pos+=rest_length;
 
1500
  if (my_b_flush_io_cache(info,0))
 
1501
  {
 
1502
    unlock_append_buffer(info);
 
1503
    return 1;
 
1504
  }
 
1505
  if (Count >= IO_SIZE)
 
1506
  {                                     /* Fill first intern buffer */
 
1507
    length=Count & (size_t) ~(IO_SIZE-1);
 
1508
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1509
    {
 
1510
      unlock_append_buffer(info);
 
1511
      return info->error= -1;
 
1512
    }
 
1513
    Count-=length;
 
1514
    Buffer+=length;
 
1515
    info->end_of_file+=length;
 
1516
  }
 
1517
 
 
1518
end:
 
1519
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1520
  info->write_pos+=Count;
 
1521
  unlock_append_buffer(info);
 
1522
  return 0;
 
1523
}
 
1524
 
 
1525
 
 
1526
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
 
1527
{
 
1528
  /*
 
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1531
    several layers of ternary operators that evaluated comma(,) operator
 
1532
    expressions inside - I do have a test case if somebody wants it
 
1533
  */
 
1534
  if (info->type == SEQ_READ_APPEND)
 
1535
    return my_b_append(info, Buffer, Count);
 
1536
  return my_b_write(info, Buffer, Count);
 
1537
}
 
1538
 
 
1539
 
1477
1540
/*
1478
1541
  Write a block to disk where part of the data may be inside the record
1479
1542
  buffer.  As all write calls to the data goes through the cache,
1480
1543
  we will never get a seek over the end of the buffer
1481
1544
*/
1482
1545
 
1483
 
int my_block_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count,
 
1546
int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1484
1547
                   my_off_t pos)
1485
1548
{
1486
1549
  size_t length;
1498
1561
    if (pos + Count <= info->pos_in_file)
1499
1562
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1500
1563
    /* Write the part of the block that is before buffer */
1501
 
    length= (uint32_t) (info->pos_in_file - pos);
 
1564
    length= (uint) (info->pos_in_file - pos);
1502
1565
    if (pwrite(info->file, Buffer, length, pos) == 0)
1503
1566
      info->error= error= -1;
1504
1567
    Buffer+=length;
1505
1568
    pos+=  length;
1506
1569
    Count-= length;
 
1570
#ifndef HAVE_PREAD
 
1571
    info->seek_not_done=1;
 
1572
#endif
1507
1573
  }
1508
1574
 
1509
1575
  /* Check if we want to write inside the used part of the buffer.*/
1573
1639
      */
1574
1640
      if (!append_cache && info->seek_not_done)
1575
1641
      {                                 /* File touched, do seek */
1576
 
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
 
1642
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
 
1643
            MY_FILEPOS_ERROR)
1577
1644
        {
1578
1645
          UNLOCK_APPEND_BUFFER;
1579
1646
          return((info->error= -1));
1598
1665
      else
1599
1666
      {
1600
1667
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1601
 
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
1602
 
        assert(info->end_of_file == tell_ret);
 
1668
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
1603
1669
      }
1604
1670
 
1605
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
 
1672
      ++info->disk_writes;
1606
1673
      UNLOCK_APPEND_BUFFER;
1607
1674
      return(info->error);
1608
1675
    }
1656
1723
    info->alloced_buffer=0;
1657
1724
    if (info->file != -1)                       /* File doesn't exist */
1658
1725
      error= my_b_flush_io_cache(info,1);
1659
 
    free((unsigned char*) info->buffer);
1660
 
    info->buffer=info->read_pos=(unsigned char*) 0;
 
1726
    my_free((uchar*) info->buffer,MYF(MY_WME));
 
1727
    info->buffer=info->read_pos=(uchar*) 0;
1661
1728
  }
1662
1729
  if (info->type == SEQ_READ_APPEND)
1663
1730
  {
1716
1783
  char* block, *block_end;
1717
1784
  MY_INIT(argv[0]);
1718
1785
  max_block = cache_size*3;
1719
 
  if (!(block=(char*)malloc(max_block)))
 
1786
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1720
1787
    die("Not enough memory to allocate test block");
1721
1788
  block_end = block + max_block;
1722
1789
  for (p = block,i=0; p < block_end;i++)
1740
1807
    total_bytes += 4+block_size;
1741
1808
  }
1742
1809
  close_file(&sra_cache);
1743
 
  free(block);
 
1810
  my_free(block,MYF(MY_WME));
1744
1811
  if (!my_stat(fname,&status,MYF(MY_WME)))
1745
1812
    die("%s failed to stat, but I had just closed it,\
1746
1813
 wonder how that happened");