~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Jay Pipes
  • Date: 2009-03-13 23:42:05 UTC
  • mto: This revision was merged to the branch mainline in revision 937.
  • Revision ID: jpipes@serialcoder-20090313234205-fmr8bd072nvfa28v
Forgot to add the new files...

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#include "drizzled/internal/mysys_priv.h"
51
 
#include "drizzled/internal/m_string.h"
 
50
#include "mysys_priv.h"
 
51
#include <mystrings/m_string.h>
52
52
#ifdef HAVE_AIOWAIT
53
 
#include "drizzled/my_error.h"
54
 
#include "drizzled/internal/aio_result.h"
 
53
#include "mysys_err.h"
 
54
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
57
 
#include "drizzled/internal/iocache.h"
 
57
#include <mysys/iocache.h>
58
58
#include <errno.h>
59
59
#include <drizzled/util/test.h>
60
60
#include <stdlib.h>
61
 
#include <algorithm>
62
 
 
63
 
using namespace std;
64
 
 
65
 
extern "C" {
66
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
67
 
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
68
 
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
69
 
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
70
 
}
71
61
 
72
62
#define lock_append_buffer(info) \
73
63
 pthread_mutex_lock(&(info)->append_buffer_lock)
158
148
    #  error
159
149
*/
160
150
 
161
 
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
 
151
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
162
152
                  enum cache_type type, my_off_t seek_offset,
163
153
                  bool use_async_io, myf cache_myflags)
164
154
{
178
168
  if (file >= 0)
179
169
  {
180
170
    pos= lseek(file, 0, SEEK_CUR);
181
 
    if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
 
171
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
182
172
    {
183
173
      /*
184
174
         This kind of object doesn't support seek() or tell(). Don't set a
196
186
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
197
187
  }
198
188
 
 
189
  info->disk_writes= 0;
199
190
  info->share=0;
200
191
 
201
192
  if (!cachesize && !(cachesize= my_default_record_cache_size))
257
248
    info->write_end = info->write_buffer + info->buffer_length;
258
249
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
259
250
  }
 
251
#if defined(SAFE_MUTEX)
 
252
  else
 
253
  {
 
254
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
 
255
    memset(&info->append_buffer_lock, 0, sizeof(info));
 
256
  }
 
257
#endif
260
258
 
261
259
  if (type == WRITE_CACHE)
262
260
    info->write_end=
434
432
    1      Error: can't read requested characters
435
433
*/
436
434
 
437
 
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
435
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
438
436
{
439
437
  size_t length,diff_length,left_length, max_length;
440
438
  my_off_t pos_in_file;
470
468
        info->file is a pipe or socket or FIFO.  We never should have tried
471
469
        to seek on that.  See Bugs#25807 and #22828 for more info.
472
470
      */
473
 
      assert(errno != ESPIPE);
 
471
      assert(my_errno != ESPIPE);
474
472
      info->error= -1;
475
473
      return(1);
476
474
    }
508
506
  {
509
507
    if (Count)
510
508
    {
511
 
      info->error= static_cast<int>(left_length);       /* We only got this many char */
 
509
      info->error= left_length;         /* We only got this many char */
512
510
      return(1);
513
511
    }
514
512
    length=0;                           /* Didn't read any chars */
894
892
    1      Error: can't read requested characters
895
893
*/
896
894
 
897
 
extern "C" int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
 
895
int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count)
898
896
{
899
897
  my_off_t pos_in_file;
900
898
  size_t length, diff_length, left_length;
1027
1025
  */
1028
1026
  while (write_length)
1029
1027
  {
1030
 
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1028
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
1031
1029
    int  rc;
1032
1030
 
1033
1031
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1062
1060
    1  Failed to read
1063
1061
*/
1064
1062
 
1065
 
extern "C" int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
 
1063
int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
1066
1064
{
1067
1065
  size_t length, diff_length, left_length, save_count, max_length;
1068
1066
  my_off_t pos_in_file;
1185
1183
      TODO: figure out if the assert below is needed or correct.
1186
1184
    */
1187
1185
    assert(pos_in_file == info->end_of_file);
1188
 
    copy_len=min(Count, len_in_buff);
 
1186
    copy_len=cmin(Count, len_in_buff);
1189
1187
    memcpy(Buffer, info->append_read_pos, copy_len);
1190
1188
    info->append_read_pos += copy_len;
1191
1189
    Count -= copy_len;
1192
1190
    if (Count)
1193
 
      info->error = static_cast<int>(save_count - Count);
 
1191
      info->error = save_count - Count;
1194
1192
 
1195
1193
    /* Fill read buffer with data from write buffer */
1196
1194
    memcpy(info->buffer, info->append_read_pos,
1219
1217
      Count                     Number of bytes to read into Buffer
1220
1218
 
1221
1219
  RETURN VALUE
1222
 
    -1          An error has occurred; errno is set.
 
1220
    -1          An error has occurred; my_errno is set.
1223
1221
     0          Success
1224
1222
     1          An error has occurred; IO_CACHE to error state.
1225
1223
*/
1247
1245
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1248
1246
                 my_filename(info->file),
1249
1247
                 info->aio_result.result.aio_errno);
1250
 
      errno=info->aio_result.result.aio_errno;
 
1248
      my_errno=info->aio_result.result.aio_errno;
1251
1249
      info->error= -1;
1252
1250
      return(1);
1253
1251
    }
1254
1252
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1255
1253
        read_length == (size_t) -1)
1256
1254
    {
1257
 
      errno=0;                          /* For testing */
 
1255
      my_errno=0;                               /* For testing */
1258
1256
      info->error= (read_length == (size_t) -1 ? -1 :
1259
1257
                    (int) (read_length+left_length));
1260
1258
      return(1);
1287
1285
      }
1288
1286
    }
1289
1287
        /* Copy found bytes to buffer */
1290
 
    length=min(Count,read_length);
 
1288
    length=cmin(Count,read_length);
1291
1289
    memcpy(Buffer,info->read_pos,(size_t) length);
1292
1290
    Buffer+=length;
1293
1291
    Count-=length;
1320
1318
      if ((read_length=my_read(info->file,info->request_pos,
1321
1319
                               read_length, info->myflags)) == (size_t) -1)
1322
1320
        return info->error= -1;
1323
 
      use_length=min(Count,read_length);
 
1321
      use_length=cmin(Count,read_length);
1324
1322
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1325
1323
      info->read_pos=info->request_pos+Count;
1326
1324
      info->read_end=info->request_pos+read_length;
1331
1329
      {                                 /* Didn't find hole block */
1332
1330
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1333
1331
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1334
 
                   my_filename(info->file),errno);
 
1332
                   my_filename(info->file),my_errno);
1335
1333
        info->error=(int) (read_length+left_length);
1336
1334
        return 1;
1337
1335
      }
1367
1365
                (my_off_t) next_pos_in_file,SEEK_SET,
1368
1366
                &info->aio_result.result))
1369
1367
    {                                           /* Skip async io */
1370
 
      errno=errno;
 
1368
      my_errno=errno;
1371
1369
      if (info->request_pos != info->buffer)
1372
1370
      {
1373
1371
        memmove(info->buffer, info->request_pos,
1409
1407
   RETURN VALUE
1410
1408
    1 On error on write
1411
1409
    0 On success
1412
 
   -1 On error; errno contains error code.
 
1410
   -1 On error; my_errno contains error code.
1413
1411
*/
1414
1412
 
1415
 
extern "C" int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1413
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1416
1414
{
1417
1415
  size_t rest_length,length;
1418
1416
 
1419
1417
  if (info->pos_in_file+info->buffer_length > info->end_of_file)
1420
1418
  {
1421
 
    errno=errno=EFBIG;
 
1419
    my_errno=errno=EFBIG;
1422
1420
    return info->error = -1;
1423
1421
  }
1424
1422
 
1474
1472
  return 0;
1475
1473
}
1476
1474
 
 
1475
 
 
1476
/*
 
1477
  Append a block to the write buffer.
 
1478
  This is done with the buffer locked to ensure that we don't read from
 
1479
  the write buffer before we are ready with it.
 
1480
*/
 
1481
 
 
1482
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1483
{
 
1484
  size_t rest_length,length;
 
1485
 
 
1486
  /*
 
1487
    Assert that we cannot come here with a shared cache. If we do one
 
1488
    day, we might need to add a call to copy_to_read_buffer().
 
1489
  */
 
1490
  assert(!info->share);
 
1491
 
 
1492
  lock_append_buffer(info);
 
1493
  rest_length= (size_t) (info->write_end - info->write_pos);
 
1494
  if (Count <= rest_length)
 
1495
    goto end;
 
1496
  memcpy(info->write_pos, Buffer, rest_length);
 
1497
  Buffer+=rest_length;
 
1498
  Count-=rest_length;
 
1499
  info->write_pos+=rest_length;
 
1500
  if (my_b_flush_io_cache(info,0))
 
1501
  {
 
1502
    unlock_append_buffer(info);
 
1503
    return 1;
 
1504
  }
 
1505
  if (Count >= IO_SIZE)
 
1506
  {                                     /* Fill first intern buffer */
 
1507
    length=Count & (size_t) ~(IO_SIZE-1);
 
1508
    if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
 
1509
    {
 
1510
      unlock_append_buffer(info);
 
1511
      return info->error= -1;
 
1512
    }
 
1513
    Count-=length;
 
1514
    Buffer+=length;
 
1515
    info->end_of_file+=length;
 
1516
  }
 
1517
 
 
1518
end:
 
1519
  memcpy(info->write_pos,Buffer,(size_t) Count);
 
1520
  info->write_pos+=Count;
 
1521
  unlock_append_buffer(info);
 
1522
  return 0;
 
1523
}
 
1524
 
 
1525
 
 
1526
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
 
1527
{
 
1528
  /*
 
1529
    Sasha: We are not writing this with the ? operator to avoid hitting
 
1530
    a possible compiler bug. At least gcc 2.95 cannot deal with
 
1531
    several layers of ternary operators that evaluated comma(,) operator
 
1532
    expressions inside - I do have a test case if somebody wants it
 
1533
  */
 
1534
  if (info->type == SEQ_READ_APPEND)
 
1535
    return my_b_append(info, Buffer, Count);
 
1536
  return my_b_write(info, Buffer, Count);
 
1537
}
 
1538
 
 
1539
 
1477
1540
/*
1478
1541
  Write a block to disk where part of the data may be inside the record
1479
1542
  buffer.  As all write calls to the data goes through the cache,
1504
1567
    Buffer+=length;
1505
1568
    pos+=  length;
1506
1569
    Count-= length;
 
1570
#ifndef HAVE_PREAD
 
1571
    info->seek_not_done=1;
 
1572
#endif
1507
1573
  }
1508
1574
 
1509
1575
  /* Check if we want to write inside the used part of the buffer.*/
1603
1669
      }
1604
1670
 
1605
1671
      info->append_read_pos=info->write_pos=info->write_buffer;
 
1672
      ++info->disk_writes;
1606
1673
      UNLOCK_APPEND_BUFFER;
1607
1674
      return(info->error);
1608
1675
    }