47
47
write buffer to the read buffer before we start to reuse it.
52
#include "drizzled/internal/my_sys.h"
53
#include "drizzled/internal/m_string.h"
50
#include "mysys_priv.h"
51
#include <mystrings/m_string.h>
54
52
#ifdef HAVE_AIOWAIT
55
#include "drizzled/error.h"
56
#include "drizzled/internal/aio_result.h"
53
#include "mysys_err.h"
54
#include <mysys/aio_result.h>
57
55
static void my_aiowait(my_aio_result *result);
59
#include "drizzled/internal/iocache.h"
57
#include <mysys/iocache.h>
61
59
#include <drizzled/util/test.h>
72
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
73
static int _my_b_read_r(register IO_CACHE *cache, unsigned char *Buffer, size_t Count);
74
static int _my_b_seq_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count);
75
static int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count);
77
60
#define lock_append_buffer(info) \
78
61
pthread_mutex_lock(&(info)->append_buffer_lock)
79
62
#define unlock_append_buffer(info) \
166
int init_io_cache(IO_CACHE *info, int file, size_t cachesize,
149
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
167
150
enum cache_type type, my_off_t seek_offset,
168
151
bool use_async_io, myf cache_myflags)
170
153
size_t min_cache;
172
155
my_off_t end_of_file= ~(my_off_t) 0;
174
157
info->file= file;
185
pos= lseek(file, 0, SEEK_CUR);
186
if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
168
pos= my_tell(file, MYF(0));
169
if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
189
172
This kind of object doesn't support seek() or tell(). Don't set a
211
195
if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
213
197
/* Calculate end of file to avoid allocating oversized buffers */
214
end_of_file=lseek(file,0L,SEEK_END);
198
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
215
199
/* Need to reset seek_not_done now that we just did a seek. */
216
200
info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
217
201
if (end_of_file < seek_offset)
238
222
if (type == SEQ_READ_APPEND)
239
223
buffer_block *= 2;
240
224
if ((info->buffer=
241
(unsigned char*) malloc(buffer_block)) != 0)
225
(unsigned char*) my_malloc(buffer_block,
226
MYF((cache_myflags & ~ MY_WME) |
227
(cachesize == min_cache ? MY_WME : 0)))) != 0)
243
229
info->write_buffer=info->buffer;
244
230
if (type == SEQ_READ_APPEND)
262
248
info->write_end = info->write_buffer + info->buffer_length;
263
249
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
251
#if defined(SAFE_MUTEX)
254
/* Clear mutex so that safe_mutex will notice that it's not initialized */
255
memset(&info->append_buffer_lock, 0, sizeof(info));
266
259
if (type == WRITE_CACHE)
439
430
1 Error: can't read requested characters
442
static int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
433
int _my_b_read(register IO_CACHE *info, unsigned char *Buffer, size_t Count)
444
435
size_t length,diff_length,left_length, max_length;
445
436
my_off_t pos_in_file;
455
446
/* pos_in_file always point on where info->buffer was read */
456
447
pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
459
450
Whenever a function which operates on IO_CACHE flushes/writes
460
451
some part of the IO_CACHE to disk it will set the property
461
452
"seek_not_done" to indicate this to other functions operating
464
455
if (info->seek_not_done)
466
if ((lseek(info->file,pos_in_file,SEEK_SET) != MY_FILEPOS_ERROR))
457
if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))
458
!= MY_FILEPOS_ERROR))
468
460
/* No error, reset seek_not_done flag. */
469
461
info->seek_not_done= 0;
952
944
if (cache->seek_not_done)
954
if (lseek(cache->file, pos_in_file, SEEK_SET) == MY_FILEPOS_ERROR)
946
if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
956
949
cache->error= -1;
957
950
unlock_io_cache(cache);
1033
1026
while (write_length)
1035
size_t copy_length= min(write_length, write_cache->buffer_length);
1028
size_t copy_length= cmin(write_length, write_cache->buffer_length);
1029
int __attribute__((unused)) rc;
1038
1031
rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1039
1032
/* The writing thread does always have the lock when it awakes. */
1092
1085
With read-append cache we must always do a seek before we read,
1093
1086
because the write could have moved the file pointer astray
1095
if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1088
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1097
1090
info->error= -1;
1098
1091
unlock_append_buffer(info);
1190
1183
TODO: figure out if the assert below is needed or correct.
1192
1185
assert(pos_in_file == info->end_of_file);
1193
copy_len=min(Count, len_in_buff);
1186
copy_len=cmin(Count, len_in_buff);
1194
1187
memcpy(Buffer, info->append_read_pos, copy_len);
1195
1188
info->append_read_pos += copy_len;
1196
1189
Count -= copy_len;
1198
info->error = static_cast<int>(save_count - Count);
1191
info->error = save_count - Count;
1200
1193
/* Fill read buffer with data from write buffer */
1201
1194
memcpy(info->buffer, info->append_read_pos,
1252
1245
my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1253
1246
my_filename(info->file),
1254
1247
info->aio_result.result.aio_errno);
1255
errno=info->aio_result.result.aio_errno;
1248
my_errno=info->aio_result.result.aio_errno;
1256
1249
info->error= -1;
1259
1252
if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1260
1253
read_length == (size_t) -1)
1262
errno=0; /* For testing */
1255
my_errno=0; /* For testing */
1263
1256
info->error= (read_length == (size_t) -1 ? -1 :
1264
1257
(int) (read_length+left_length));
1294
1287
/* Copy found bytes to buffer */
1295
length=min(Count,read_length);
1288
length=cmin(Count,read_length);
1296
1289
memcpy(Buffer,info->read_pos,(size_t) length);
1297
1290
Buffer+=length;
1312
1305
info->error=(int) (read_length+left_length);
1316
if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1309
if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1310
== MY_FILEPOS_ERROR)
1318
1312
info->error= -1;
1325
1319
if ((read_length=my_read(info->file,info->request_pos,
1326
1320
read_length, info->myflags)) == (size_t) -1)
1327
1321
return info->error= -1;
1328
use_length=min(Count,read_length);
1322
use_length=cmin(Count,read_length);
1329
1323
memcpy(Buffer,info->request_pos,(size_t) use_length);
1330
1324
info->read_pos=info->request_pos+Count;
1331
1325
info->read_end=info->request_pos+read_length;
1336
1330
{ /* Didn't find hole block */
1337
1331
if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1338
1332
my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1339
my_filename(info->file),errno);
1333
my_filename(info->file),my_errno);
1340
1334
info->error=(int) (read_length+left_length);
1370
1364
info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
1371
1365
if (aioread(info->file,read_buffer, max_length,
1372
(my_off_t) next_pos_in_file,SEEK_SET,
1366
(my_off_t) next_pos_in_file,MY_SEEK_SET,
1373
1367
&info->aio_result.result))
1374
1368
{ /* Skip async io */
1376
1370
if (info->request_pos != info->buffer)
1378
memmove(info->buffer, info->request_pos,
1379
(size_t) (info->read_end - info->read_pos));
1372
memcpy(info->buffer, info->request_pos,
1373
(size_t) (info->read_end - info->read_pos));
1380
1374
info->request_pos=info->buffer;
1381
1375
info->read_pos-=info->read_length;
1382
1376
info->read_end-=info->read_length;
1407
1401
return (int) (unsigned char) buff;
1411
1405
Write a byte buffer to IO_CACHE and flush to disk
1412
1406
if IO_CACHE is full.
1415
1409
1 On error on write
1417
-1 On error; errno contains error code.
1411
-1 On error; my_errno contains error code.
1420
1414
int _my_b_write(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1446
1440
"seek_not_done" to indicate this to other functions operating
1447
1441
on the IO_CACHE.
1449
if (lseek(info->file,info->pos_in_file,SEEK_SET))
1443
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1451
1445
info->error= -1;
1478
Append a block to the write buffer.
1479
This is done with the buffer locked to ensure that we don't read from
1480
the write buffer before we are ready with it.
1483
int my_b_append(register IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1485
size_t rest_length,length;
1488
Assert that we cannot come here with a shared cache. If we do one
1489
day, we might need to add a call to copy_to_read_buffer().
1491
assert(!info->share);
1493
lock_append_buffer(info);
1494
rest_length= (size_t) (info->write_end - info->write_pos);
1495
if (Count <= rest_length)
1497
memcpy(info->write_pos, Buffer, rest_length);
1498
Buffer+=rest_length;
1500
info->write_pos+=rest_length;
1501
if (my_b_flush_io_cache(info,0))
1503
unlock_append_buffer(info);
1506
if (Count >= IO_SIZE)
1507
{ /* Fill first intern buffer */
1508
length=Count & (size_t) ~(IO_SIZE-1);
1509
if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1511
unlock_append_buffer(info);
1512
return info->error= -1;
1516
info->end_of_file+=length;
1520
memcpy(info->write_pos,Buffer,(size_t) Count);
1521
info->write_pos+=Count;
1522
unlock_append_buffer(info);
1527
int my_b_safe_write(IO_CACHE *info, const unsigned char *Buffer, size_t Count)
1530
Sasha: We are not writing this with the ? operator to avoid hitting
1531
a possible compiler bug. At least gcc 2.95 cannot deal with
1532
several layers of ternary operators that evaluated comma(,) operator
1533
expressions inside - I do have a test case if somebody wants it
1535
if (info->type == SEQ_READ_APPEND)
1536
return my_b_append(info, Buffer, Count);
1537
return my_b_write(info, Buffer, Count);
1483
1542
Write a block to disk where part of the data may be inside the record
1484
1543
buffer. As all write calls to the data goes through the cache,
1503
1562
if (pos + Count <= info->pos_in_file)
1504
1563
return (pwrite(info->file, Buffer, Count, pos) == 0);
1505
1564
/* Write the part of the block that is before buffer */
1506
length= (uint32_t) (info->pos_in_file - pos);
1565
length= (uint) (info->pos_in_file - pos);
1507
1566
if (pwrite(info->file, Buffer, length, pos) == 0)
1508
1567
info->error= error= -1;
1509
1568
Buffer+=length;
1511
1570
Count-= length;
1572
info->seek_not_done=1;
1514
1576
/* Check if we want to write inside the used part of the buffer.*/
1579
1641
if (!append_cache && info->seek_not_done)
1580
1642
{ /* File touched, do seek */
1581
if (lseek(info->file,pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
1643
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1583
1646
UNLOCK_APPEND_BUFFER;
1584
1647
return((info->error= -1));
1605
1668
info->end_of_file+=(info->write_pos-info->append_read_pos);
1606
my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
1607
assert(info->end_of_file == tell_ret);
1669
assert(info->end_of_file == my_tell(info->file,MYF(0)));
1610
1672
info->append_read_pos=info->write_pos=info->write_buffer;
1673
++info->disk_writes;
1611
1674
UNLOCK_APPEND_BUFFER;
1612
1675
return(info->error);
1721
1784
char* block, *block_end;
1722
1785
MY_INIT(argv[0]);
1723
1786
max_block = cache_size*3;
1724
if (!(block=(char*)malloc(max_block)))
1787
if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1725
1788
die("Not enough memory to allocate test block");
1726
1789
block_end = block + max_block;
1727
1790
for (p = block,i=0; p < block_end;i++)