26
using namespace drizzled;
20
/* TODO: For some reason these aren't showing up cleanly in the
24
extern ssize_t pread (int __fd, void *__buf, size_t __nbytes,
28
extern ssize_t pwrite (int __fd, __const void *__buf, size_t __n,
28
32
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
35
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
36
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
37
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
38
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
39
#define COMMENT 0x10 /* bit 4 set: file comment present */
40
#define RESERVED 0xE0 /* bits 5..7: reserved */
30
42
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
31
43
static int azrewind (azio_stream *s);
32
44
static unsigned int azio_enable_aio(azio_stream *s);
33
45
static int do_flush(azio_stream *file, int flush);
34
46
static int get_byte(azio_stream *s);
35
47
static void check_header(azio_stream *s);
36
static int write_header(azio_stream *s);
48
static void write_header(azio_stream *s);
37
49
static int destroy(azio_stream *s);
38
50
static void putLong(azio_stream *s, uLong x);
39
51
static uLong getLong(azio_stream *s);
43
55
static void do_aio_cleanup(azio_stream *s);
46
extern "C" pthread_handler_t run_task(void *p);
48
extern "C" pthread_handler_t run_task(void *p)
58
static pthread_handler_t run_task(void *p)
53
azio_stream *s= (azio_stream *)p;
63
azio_stream *s= (azio_stream *)p;
55
internal::my_thread_init();
64
74
offset= s->container.offset;
65
75
fd= s->container.fd;
66
buffer= (char *)s->container.buffer;
76
buffer= s->container.buffer;
67
77
pthread_mutex_unlock(&s->container.thresh_mutex);
69
79
if (s->container.ready == AZ_THREAD_DEAD)
85
95
static void azio_kill(azio_stream *s)
87
97
pthread_mutex_lock(&s->container.thresh_mutex);
88
s->container.ready= AZ_THREAD_DEAD;
98
s->container.ready= AZ_THREAD_DEAD;
89
99
pthread_mutex_unlock(&s->container.thresh_mutex);
91
101
pthread_cond_signal(&s->container.threshhold);
92
pthread_join(s->container.mainthread, NULL);
102
pthread_join(s->container.mainthread, (void *)NULL);
95
105
static size_t azio_return(azio_stream *s)
127
137
pthread_attr_init(&attr);
128
138
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
130
s->container.ready= AZ_THREAD_FINISHED;
140
s->container.ready= AZ_THREAD_FINISHED;
132
142
/* If we don't create a thread, signal the caller */
133
143
if (pthread_create(&s->container.mainthread, &attr, run_task,
142
152
static int azio_read(azio_stream *s)
144
154
pthread_mutex_lock(&s->container.thresh_mutex);
145
s->container.ready= AZ_THREAD_ACTIVE;
155
s->container.ready= AZ_THREAD_ACTIVE;
146
156
pthread_mutex_unlock(&s->container.thresh_mutex);
147
157
pthread_cond_broadcast(&s->container.threshhold);
188
198
s->method= method;
191
We do our own version of append by nature.
201
We do our own version of append by nature.
192
202
We must always have write access to take care of the header.
194
assert(Flags | O_APPEND);
195
assert(Flags | O_WRONLY);
204
DBUG_ASSERT(Flags | O_APPEND);
205
DBUG_ASSERT(Flags | O_WRONLY);
202
212
err = deflateInit2(&(s->stream), level,
203
213
Z_DEFLATED, -MAX_WBITS, 8, strategy);
229
239
s->stream.avail_out = AZ_BUFSIZE_WRITE;
232
s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
242
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
234
244
s->container.fd= s->file;
243
if (Flags & O_CREAT || Flags & O_TRUNC)
253
if (Flags & O_CREAT || Flags & O_TRUNC)
246
256
s->forced_flushes= 0;
254
264
s->frm_length= 0;
255
265
s->dirty= 1; /* We create the file dirty */
256
266
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
259
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
268
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
261
else if (s->mode == 'w')
270
else if (s->mode == 'w')
263
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
264
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
265
if(pread(s->file, buffer, read_size, 0) < read_size)
272
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
273
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
267
274
read_header(s, buffer);
268
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
275
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
310
317
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
311
318
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
312
319
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
313
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
314
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
315
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
316
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
317
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
320
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
321
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
322
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
323
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
324
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
318
325
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
319
326
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
320
int4store(ptr+ AZ_FRM_POS,
327
int4store(ptr+ AZ_FRM_POS,
321
328
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
322
329
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
324
331
/* Always begin at the beginning, and end there as well */
325
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
326
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
332
pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
332
335
/* ===========================================================================
335
338
IN assertion: the stream s has been successfully opened for reading.
337
int get_byte(azio_stream *s)
339
343
if (s->z_eof) return EOF;
340
if (s->stream.avail_in == 0)
344
if (s->stream.avail_in == 0)
343
if (s->stream.avail_in == 0)
347
if (s->stream.avail_in == 0)
375
379
if (len) s->inbuf[0] = s->stream.next_in[0];
377
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
381
len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
379
383
if (len == (uInt)-1) s->z_err = Z_ERRNO;
380
384
s->stream.avail_in += len;
409
413
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
410
414
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
411
415
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
412
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
413
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
414
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
415
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
416
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
417
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
418
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
419
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
416
420
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
417
421
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
418
422
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
432
436
* Cleanup then free the given azio_stream. Return a zlib error code.
433
437
Try freeing in the reverse order of allocations.
435
int destroy (azio_stream *s)
439
if (s->stream.state != NULL)
444
if (s->stream.state != NULL)
443
448
err = deflateEnd(&(s->stream));
444
internal::my_sync(s->file, MYF(0));
449
my_sync(s->file, MYF(0));
446
else if (s->mode == 'r')
451
else if (s->mode == 'r')
447
452
err = inflateEnd(&(s->stream));
450
455
do_aio_cleanup(s);
452
if (s->file > 0 && internal::my_close(s->file, MYF(0)))
457
if (s->file > 0 && my_close(s->file, MYF(0)))
463
468
Reads the given number of uncompressed bytes from the compressed file.
464
469
azread returns the number of bytes actually read (0 for end of file).
467
This function is legacy, do not use.
469
Reads the given number of uncompressed bytes from the compressed file.
470
If the input file was not in gzip format, gzread copies the given number
471
of bytes into the buffer.
472
gzread returns the number of uncompressed bytes actually read (0 for
473
end of file, -1 for error).
475
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
471
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
477
473
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
478
474
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
481
477
if (s->mode != 'r')
483
479
*error= Z_STREAM_ERROR;
487
483
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
489
485
*error= s->z_err;
493
489
if (s->z_err == Z_STREAM_END) /* EOF */
571
567
/* ===========================================================================
572
Experimental Interface. We abstract out a concept of rows
568
Experimental Interface. We abstract out a concept of rows
574
570
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
632
628
s->stream.next_in = (Bytef*)buf;
633
629
s->stream.avail_in = len;
635
while (s->stream.avail_in != 0)
631
while (s->stream.avail_in != 0)
637
if (s->stream.avail_out == 0)
633
if (s->stream.avail_out == 0)
640
636
s->stream.next_out = s->outbuf;
641
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
637
if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
643
639
s->z_err = Z_ERRNO;
674
670
s->stream.avail_in = 0; /* should be zero already anyway */
678
674
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
682
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
678
if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len)
684
680
s->z_err = Z_ERRNO;
712
708
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
714
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
710
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
718
713
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
721
716
static unsigned int azio_enable_aio(azio_stream *s)
723
pthread_cond_init(&s->container.threshhold, NULL);
724
pthread_mutex_init(&s->container.thresh_mutex, NULL);
718
VOID(pthread_cond_init(&s->container.threshhold, NULL));
719
VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
734
pthread_mutex_destroy(&s->container.thresh_mutex);
735
pthread_cond_destroy(&s->container.threshhold);
729
VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
730
VOID(pthread_cond_destroy(&s->container.threshhold));
737
732
s->method= AZ_METHOD_BLOCK;
740
int ZEXPORT azflush (azio_stream *s,int flush)
735
int ZEXPORT azflush (s, flush)
746
743
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
747
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
748
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
744
pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
750
745
read_header(s, buffer); /* skip the .az header */
822
818
SEEK_END is not implemented, returns error.
823
819
In this version of the library, azseek can be extremely slow.
825
size_t azseek (azio_stream *s, size_t offset, int whence)
821
size_t azseek (s, offset, whence)
828
827
if (s == NULL || whence == SEEK_END ||
829
828
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
835
if (whence == SEEK_SET)
834
if (whence == SEEK_SET)
838
837
/* At this point, offset is the number of zero bytes to write. */
839
838
/* There was a zmemzero here if inbuf was null -Brian */
842
841
uInt size = AZ_BUFSIZE_READ;
843
842
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
845
844
size = azwrite(s, s->inbuf, size);
845
if (size == 0) return -1L;
877
875
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
879
877
size = azread_internal(s, s->outbuf, size, &error);
880
if (error < 0) return SIZE_MAX;
878
if (error < 0) return -1L;
888
886
given compressed file. This position represents a number of bytes in the
889
887
uncompressed data stream.
891
size_t ZEXPORT aztell (azio_stream *file)
889
size_t ZEXPORT aztell (file)
893
892
return azseek(file, 0L, SEEK_CUR);
900
899
void putLong (azio_stream *s, uLong x)
903
unsigned char buffer[1];
905
for (n = 0; n < 4; n++)
904
for (n = 0; n < 4; n++)
907
906
buffer[0]= (int)(x & 0xff);
908
size_t ret= pwrite(s->file, buffer, 1, s->pos);
907
pwrite(s->file, buffer, 1, s->pos);
975
Though this was added to support MySQL's FRM file, anything can be
973
Though this was added to support MySQL's FRM file, anything can be
976
974
stored in this location.
978
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
976
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
986
984
s->frm_start_pos= (uint) s->start;
987
985
s->frm_length= length;
988
986
s->start+= length;
990
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
988
pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
994
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
991
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
999
996
int azread_frm(azio_stream *s, char *blob)
1001
ssize_t r= pread(s->file, (unsigned char*) blob,
1002
s->frm_length, s->frm_start_pos);
1003
if (r != (ssize_t)s->frm_length)
998
pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
1011
1005
Simple comment field
1013
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
1007
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1021
1015
s->comment_start_pos= (uint) s->start;
1022
1016
s->comment_length= length;
1023
1017
s->start+= length;
1025
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1026
s->comment_length, s->comment_start_pos);
1027
if (r != (ssize_t)s->comment_length)
1019
pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1030
1021
write_header(s);
1031
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1022
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1036
1027
int azread_comment(azio_stream *s, char *blob)
1038
ssize_t r= pread(s->file, (unsigned char*) blob,
1039
s->comment_length, s->comment_start_pos);
1040
if (r != (ssize_t)s->comment_length)
1029
pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1058
1046
Normally all IO goes through azio_read(), but in case of error or non-support
1059
1047
we make use of pread().
1061
1049
static void get_block(azio_stream *s)
1063
1051
#ifdef AZIO_AIO
1064
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1052
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1065
1053
&& s->pos < s->check_point
1066
1054
&& s->aio_inited)
1077
1065
s->pos+= s->stream.avail_in;
1078
1066
s->inbuf= (Byte *)s->container.buffer;
1079
/* We only azio_read when we know there is more data to be read */
1067
/* We only aio_read when we know there is more data to be read */
1080
1068
if (s->pos >= s->check_point)
1082
1070
s->aio_inited= 0;
1092
1080
#ifdef AZIO_AIO
1095
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1083
s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1096
1084
AZ_BUFSIZE_READ, s->pos);
1097
1085
s->pos+= s->stream.avail_in;