~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/mf_iocache.cc

  • Committer: Eric Day
  • Date: 2009-08-27 07:26:22 UTC
  • mto: This revision was merged to the branch mainline in revision 1131.
  • Revision ID: eday@oddments.org-20090827072622-72te13ua0wdlc2ky
Reworked listen interface to not require binding of TCP ports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
  write buffer to the read buffer before we start to reuse it.
48
48
*/
49
49
 
50
 
#define MAP_TO_USE_RAID
51
 
#include "mysys_priv.h"
 
50
#include "mysys/mysys_priv.h"
52
51
#include <mystrings/m_string.h>
53
52
#ifdef HAVE_AIOWAIT
54
 
#include "mysys_err.h"
 
53
#include "mysys/mysys_err.h"
 
54
#include <mysys/aio_result.h>
55
55
static void my_aiowait(my_aio_result *result);
56
56
#endif
 
57
#include <mysys/iocache.h>
57
58
#include <errno.h>
 
59
#include <drizzled/util/test.h>
 
60
#include <stdlib.h>
 
61
#include <algorithm>
 
62
 
 
63
using namespace std;
58
64
 
59
65
#define lock_append_buffer(info) \
60
66
 pthread_mutex_lock(&(info)->append_buffer_lock)
150
156
                  bool use_async_io, myf cache_myflags)
151
157
{
152
158
  size_t min_cache;
153
 
  my_off_t pos;
 
159
  off_t pos;
154
160
  my_off_t end_of_file= ~(my_off_t) 0;
155
161
 
156
162
  info->file= file;
164
170
 
165
171
  if (file >= 0)
166
172
  {
167
 
    pos= my_tell(file, MYF(0));
168
 
    if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
 
173
    pos= lseek(file, 0, SEEK_CUR);
 
174
    if ((pos == MY_FILEPOS_ERROR) && (my_errno == ESPIPE))
169
175
    {
170
176
      /*
171
177
         This kind of object doesn't support seek() or tell(). Don't set a
180
186
      assert(seek_offset == 0);
181
187
    }
182
188
    else
183
 
      info->seek_not_done= test(seek_offset != pos);
 
189
      info->seek_not_done= test(seek_offset != (my_off_t)pos);
184
190
  }
185
191
 
186
 
  info->disk_writes= 0;
187
192
  info->share=0;
188
193
 
189
194
  if (!cachesize && !(cachesize= my_default_record_cache_size))
194
199
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
195
200
    {
196
201
      /* Calculate end of file to avoid allocating oversized buffers */
197
 
      end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
 
202
      end_of_file=lseek(file,0L,SEEK_END);
198
203
      /* Need to reset seek_not_done now that we just did a seek. */
199
204
      info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200
205
      if (end_of_file < seek_offset)
221
226
      if (type == SEQ_READ_APPEND)
222
227
        buffer_block *= 2;
223
228
      if ((info->buffer=
224
 
           (unsigned char*) my_malloc(buffer_block,
225
 
                             MYF((cache_myflags & ~ MY_WME) |
226
 
                                 (cachesize == min_cache ? MY_WME : 0)))) != 0)
 
229
           (unsigned char*) malloc(buffer_block)) != 0)
227
230
      {
228
231
        info->write_buffer=info->buffer;
229
232
        if (type == SEQ_READ_APPEND)
247
250
    info->write_end = info->write_buffer + info->buffer_length;
248
251
    pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
249
252
  }
250
 
#if defined(SAFE_MUTEX)
251
 
  else
252
 
  {
253
 
    /* Clear mutex so that safe_mutex will notice that it's not initialized */
254
 
    memset(&info->append_buffer_lock, 0, sizeof(info));
255
 
  }
256
 
#endif
257
253
 
258
254
  if (type == WRITE_CACHE)
259
255
    info->write_end=
313
309
 
314
310
bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
315
311
                        my_off_t seek_offset,
316
 
                        bool use_async_io __attribute__((unused)),
 
312
                        bool use_async_io,
317
313
                        bool clear_cache)
318
314
{
319
315
  /* One can't do reinit with the following types */
395
391
    info->read_function=_my_b_async_read;
396
392
  }
397
393
  info->inited=0;
 
394
#else
 
395
  (void)use_async_io;
398
396
#endif
399
397
  return(0);
400
398
} /* reinit_io_cache */
445
443
  /* pos_in_file always points to where info->buffer was read from */
446
444
  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
447
445
 
448
 
  /* 
 
446
  /*
449
447
    Whenever a function which operates on IO_CACHE flushes/writes
450
448
    some part of the IO_CACHE to disk it will set the property
451
449
    "seek_not_done" to indicate this to other functions operating
453
451
  */
454
452
  if (info->seek_not_done)
455
453
  {
456
 
    if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) 
457
 
        != MY_FILEPOS_ERROR))
 
454
    if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
458
455
    {
459
456
      /* No error, reset seek_not_done flag. */
460
457
      info->seek_not_done= 0;
942
939
        */
943
940
        if (cache->seek_not_done)
944
941
        {
945
 
          if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
946
 
              == MY_FILEPOS_ERROR)
 
942
          if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
947
943
          {
948
944
            cache->error= -1;
949
945
            unlock_io_cache(cache);
1024
1020
  */
1025
1021
  while (write_length)
1026
1022
  {
1027
 
    size_t copy_length= cmin(write_length, write_cache->buffer_length);
1028
 
    int  __attribute__((unused)) rc;
 
1023
    size_t copy_length= min(write_length, write_cache->buffer_length);
 
1024
    int  rc;
1029
1025
 
1030
1026
    rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1031
1027
    /* The writing thread always holds the lock when it wakes up. */
1048
1044
 
1049
1045
/*
1050
1046
  Do sequential read from the SEQ_READ_APPEND cache.
1051
 
  
 
1047
 
1052
1048
  We do this in three stages:
1053
1049
   - first read from info->buffer
1054
1050
   - then, if there is still data to read, try the file descriptor
1084
1080
    With read-append cache we must always do a seek before we read,
1085
1081
    because the write could have moved the file pointer astray
1086
1082
  */
1087
 
  if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
 
1083
  if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1088
1084
  {
1089
1085
   info->error= -1;
1090
1086
   unlock_append_buffer(info);
1182
1178
      TODO: figure out if the assert below is needed or correct.
1183
1179
    */
1184
1180
    assert(pos_in_file == info->end_of_file);
1185
 
    copy_len=cmin(Count, len_in_buff);
 
1181
    copy_len=min(Count, len_in_buff);
1186
1182
    memcpy(Buffer, info->append_read_pos, copy_len);
1187
1183
    info->append_read_pos += copy_len;
1188
1184
    Count -= copy_len;
1284
1280
      }
1285
1281
    }
1286
1282
        /* Copy found bytes to buffer */
1287
 
    length=cmin(Count,read_length);
 
1283
    length=min(Count,read_length);
1288
1284
    memcpy(Buffer,info->read_pos,(size_t) length);
1289
1285
    Buffer+=length;
1290
1286
    Count-=length;
1304
1300
      info->error=(int) (read_length+left_length);
1305
1301
      return 1;
1306
1302
    }
1307
 
    
1308
 
    if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1309
 
        == MY_FILEPOS_ERROR)
 
1303
 
 
1304
    if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1310
1305
    {
1311
1306
      info->error= -1;
1312
1307
      return (1);
1318
1313
      if ((read_length=my_read(info->file,info->request_pos,
1319
1314
                               read_length, info->myflags)) == (size_t) -1)
1320
1315
        return info->error= -1;
1321
 
      use_length=cmin(Count,read_length);
 
1316
      use_length=min(Count,read_length);
1322
1317
      memcpy(Buffer,info->request_pos,(size_t) use_length);
1323
1318
      info->read_pos=info->request_pos+Count;
1324
1319
      info->read_end=info->request_pos+read_length;
1362
1357
  {
1363
1358
    info->aio_result.result.aio_errno=AIO_INPROGRESS;   /* Marker for test */
1364
1359
    if (aioread(info->file,read_buffer, max_length,
1365
 
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
 
1360
                (my_off_t) next_pos_in_file,SEEK_SET,
1366
1361
                &info->aio_result.result))
1367
1362
    {                                           /* Skip async io */
1368
1363
      my_errno=errno;
1369
1364
      if (info->request_pos != info->buffer)
1370
1365
      {
1371
 
        memcpy(info->buffer, info->request_pos,
1372
 
               (size_t) (info->read_end - info->read_pos));
 
1366
        memmove(info->buffer, info->request_pos,
 
1367
                (size_t) (info->read_end - info->read_pos));
1373
1368
        info->request_pos=info->buffer;
1374
1369
        info->read_pos-=info->read_length;
1375
1370
        info->read_end-=info->read_length;
1400
1395
  return (int) (unsigned char) buff;
1401
1396
}
1402
1397
 
1403
 
/* 
 
1398
/*
1404
1399
   Write a byte buffer to IO_CACHE and flush to disk
1405
1400
   if IO_CACHE is full.
1406
1401
 
1439
1434
        "seek_not_done" to indicate this to other functions operating
1440
1435
        on the IO_CACHE.
1441
1436
      */
1442
 
      if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
 
1437
      if (lseek(info->file,info->pos_in_file,SEEK_SET))
1443
1438
      {
1444
1439
        info->error= -1;
1445
1440
        return (1);
1527
1522
{
1528
1523
  /*
1529
1524
    Sasha: We are not writing this with the ? operator to avoid hitting
1530
 
    a possible compiler bug. At least gcc 2.95 cannot deal with 
 
1525
    a possible compiler bug. At least gcc 2.95 cannot deal with
1531
1526
    several layers of ternary operators that evaluate comma (,) operator
1532
1527
    expressions inside - I do have a test case if somebody wants it
1533
1528
  */
1561
1556
    if (pos + Count <= info->pos_in_file)
1562
1557
      return (pwrite(info->file, Buffer, Count, pos) == 0);
1563
1558
    /* Write the part of the block that is before buffer */
1564
 
    length= (uint) (info->pos_in_file - pos);
 
1559
    length= (uint32_t) (info->pos_in_file - pos);
1565
1560
    if (pwrite(info->file, Buffer, length, pos) == 0)
1566
1561
      info->error= error= -1;
1567
1562
    Buffer+=length;
1639
1634
      */
1640
1635
      if (!append_cache && info->seek_not_done)
1641
1636
      {                                 /* File touched, do seek */
1642
 
        if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1643
 
            MY_FILEPOS_ERROR)
 
1637
        if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1644
1638
        {
1645
1639
          UNLOCK_APPEND_BUFFER;
1646
1640
          return((info->error= -1));
1665
1659
      else
1666
1660
      {
1667
1661
        info->end_of_file+=(info->write_pos-info->append_read_pos);
1668
 
        assert(info->end_of_file == my_tell(info->file,MYF(0)));
 
1662
        my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
 
1663
        assert(info->end_of_file == tell_ret);
1669
1664
      }
1670
1665
 
1671
1666
      info->append_read_pos=info->write_pos=info->write_buffer;
1672
 
      ++info->disk_writes;
1673
1667
      UNLOCK_APPEND_BUFFER;
1674
1668
      return(info->error);
1675
1669
    }
1783
1777
  char* block, *block_end;
1784
1778
  MY_INIT(argv[0]);
1785
1779
  max_block = cache_size*3;
1786
 
  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
 
1780
  if (!(block=(char*)malloc(max_block)))
1787
1781
    die("Not enough memory to allocate test block");
1788
1782
  block_end = block + max_block;
1789
1783
  for (p = block,i=0; p < block_end;i++)