19
19
#include <unistd.h>
26
using namespace drizzled;
28
21
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
24
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
25
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
26
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
27
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
28
#define COMMENT 0x10 /* bit 4 set: file comment present */
29
#define RESERVED 0xE0 /* bits 5..7: reserved */
30
31
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
31
32
static int azrewind (azio_stream *s);
32
33
static unsigned int azio_enable_aio(azio_stream *s);
33
34
static int do_flush(azio_stream *file, int flush);
34
35
static int get_byte(azio_stream *s);
35
36
static void check_header(azio_stream *s);
36
static int write_header(azio_stream *s);
37
static void write_header(azio_stream *s);
37
38
static int destroy(azio_stream *s);
38
39
static void putLong(azio_stream *s, uLong x);
39
40
static uLong getLong(azio_stream *s);
43
44
static void do_aio_cleanup(azio_stream *s);
46
extern "C" pthread_handler_t run_task(void *p);
48
extern "C" pthread_handler_t run_task(void *p)
47
static pthread_handler_t run_task(void *p)
53
azio_stream *s= (azio_stream *)p;
52
azio_stream *s= (azio_stream *)p;
62
63
offset= s->container.offset;
63
64
fd= s->container.fd;
64
buffer= (char *)s->container.buffer;
65
buffer= s->container.buffer;
65
66
pthread_mutex_unlock(&s->container.thresh_mutex);
67
68
if (s->container.ready == AZ_THREAD_DEAD)
75
76
pthread_mutex_unlock(&s->container.thresh_mutex);
81
84
static void azio_kill(azio_stream *s)
83
86
pthread_mutex_lock(&s->container.thresh_mutex);
84
s->container.ready= AZ_THREAD_DEAD;
87
s->container.ready= AZ_THREAD_DEAD;
85
88
pthread_mutex_unlock(&s->container.thresh_mutex);
87
90
pthread_cond_signal(&s->container.threshhold);
88
pthread_join(s->container.mainthread, NULL);
91
pthread_join(s->container.mainthread, (void *)NULL);
91
94
static size_t azio_return(azio_stream *s)
123
126
pthread_attr_init(&attr);
124
127
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
126
s->container.ready= AZ_THREAD_FINISHED;
129
s->container.ready= AZ_THREAD_FINISHED;
128
131
/* If we don't create a thread, signal the caller */
129
132
if (pthread_create(&s->container.mainthread, &attr, run_task,
138
141
static int azio_read(azio_stream *s)
140
143
pthread_mutex_lock(&s->container.thresh_mutex);
141
s->container.ready= AZ_THREAD_ACTIVE;
144
s->container.ready= AZ_THREAD_ACTIVE;
142
145
pthread_mutex_unlock(&s->container.thresh_mutex);
143
146
pthread_cond_broadcast(&s->container.threshhold);
184
187
s->method= method;
187
We do our own version of append by nature.
190
We do our own version of append by nature.
188
191
We must always have write access to take care of the header.
190
193
assert(Flags | O_APPEND);
191
194
assert(Flags | O_WRONLY);
198
201
err = deflateInit2(&(s->stream), level,
199
202
Z_DEFLATED, -MAX_WBITS, 8, strategy);
225
228
s->stream.avail_out = AZ_BUFSIZE_WRITE;
228
s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
231
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
230
233
s->container.fd= s->file;
239
if (Flags & O_CREAT || Flags & O_TRUNC)
242
if (Flags & O_CREAT || Flags & O_TRUNC)
242
245
s->forced_flushes= 0;
250
253
s->frm_length= 0;
251
254
s->dirty= 1; /* We create the file dirty */
252
255
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
255
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
257
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
257
else if (s->mode == 'w')
259
else if (s->mode == 'w')
259
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
260
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
261
if(pread(s->file, buffer, read_size, 0) < read_size)
261
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
263
263
read_header(s, buffer);
264
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
306
306
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
307
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
308
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
314
314
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
315
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
int4store(ptr+ AZ_FRM_POS,
316
int4store(ptr+ AZ_FRM_POS,
317
317
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
318
318
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
320
320
/* Always begin at the beginning, and end there as well */
321
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
322
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
321
pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
328
324
/* ===========================================================================
331
327
IN assertion: the stream s has been successfully opened for reading.
333
int get_byte(azio_stream *s)
335
332
if (s->z_eof) return EOF;
336
if (s->stream.avail_in == 0)
333
if (s->stream.avail_in == 0)
339
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
371
368
if (len) s->inbuf[0] = s->stream.next_in[0];
373
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
370
len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
375
372
if (len == (uInt)-1) s->z_err = Z_ERRNO;
376
373
s->stream.avail_in += len;
405
402
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
406
403
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
407
404
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
408
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
409
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
410
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
411
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
405
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
412
409
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
413
410
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
414
411
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
428
425
* Cleanup then free the given azio_stream. Return a zlib error code.
429
426
Try freeing in the reverse order of allocations.
431
int destroy (azio_stream *s)
435
if (s->stream.state != NULL)
433
if (s->stream.state != NULL)
439
437
err = deflateEnd(&(s->stream));
440
internal::my_sync(s->file, MYF(0));
438
my_sync(s->file, MYF(0));
442
else if (s->mode == 'r')
440
else if (s->mode == 'r')
443
441
err = inflateEnd(&(s->stream));
446
444
do_aio_cleanup(s);
448
if (s->file > 0 && internal::my_close(s->file, MYF(0)))
446
if (s->file > 0 && my_close(s->file, MYF(0)))
459
457
Reads the given number of uncompressed bytes from the compressed file.
460
458
azread returns the number of bytes actually read (0 for end of file).
463
This function is legacy, do not use.
465
Reads the given number of uncompressed bytes from the compressed file.
466
If the input file was not in gzip format, gzread copies the given number
467
of bytes into the buffer.
468
gzread returns the number of uncompressed bytes actually read (0 for
469
end of file, -1 for error).
471
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
460
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
473
462
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
474
463
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
477
466
if (s->mode != 'r')
479
468
*error= Z_STREAM_ERROR;
483
472
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
485
474
*error= s->z_err;
489
478
if (s->z_err == Z_STREAM_END) /* EOF */
567
556
/* ===========================================================================
568
Experimental Interface. We abstract out a concept of rows
557
Experimental Interface. We abstract out a concept of rows
570
559
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
628
617
s->stream.next_in = (Bytef*)buf;
629
618
s->stream.avail_in = len;
631
while (s->stream.avail_in != 0)
620
while (s->stream.avail_in != 0)
633
if (s->stream.avail_out == 0)
622
if (s->stream.avail_out == 0)
636
625
s->stream.next_out = s->outbuf;
637
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
626
if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
639
628
s->z_err = Z_ERRNO;
670
659
s->stream.avail_in = 0; /* should be zero already anyway */
674
663
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
678
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
667
if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len)
680
669
s->z_err = Z_ERRNO;
708
697
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
710
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
699
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
714
702
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
733
721
s->method= AZ_METHOD_BLOCK;
736
int ZEXPORT azflush (azio_stream *s,int flush)
724
int ZEXPORT azflush (s, flush)
742
732
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
743
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
744
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
733
pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
746
734
read_header(s, buffer); /* skip the .az header */
818
807
SEEK_END is not implemented, returns error.
819
808
In this version of the library, azseek can be extremely slow.
821
size_t azseek (azio_stream *s, size_t offset, int whence)
810
size_t azseek (s, offset, whence)
824
816
if (s == NULL || whence == SEEK_END ||
825
817
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
831
if (whence == SEEK_SET)
823
if (whence == SEEK_SET)
834
826
/* At this point, offset is the number of zero bytes to write. */
835
827
/* There was a zmemzero here if inbuf was null -Brian */
838
830
uInt size = AZ_BUFSIZE_READ;
839
831
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
841
833
size = azwrite(s, s->inbuf, size);
834
if (size == 0) return -1L;
873
864
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
875
866
size = azread_internal(s, s->outbuf, size, &error);
876
if (error < 0) return SIZE_MAX;
867
if (error < 0) return -1L;
884
875
given compressed file. This position represents a number of bytes in the
885
876
uncompressed data stream.
887
size_t ZEXPORT aztell (azio_stream *file)
878
size_t ZEXPORT aztell (file)
889
881
return azseek(file, 0L, SEEK_CUR);
896
888
void putLong (azio_stream *s, uLong x)
899
unsigned char buffer[1];
901
for (n = 0; n < 4; n++)
893
for (n = 0; n < 4; n++)
903
895
buffer[0]= (int)(x & 0xff);
904
size_t ret= pwrite(s->file, buffer, 1, s->pos);
896
pwrite(s->file, buffer, 1, s->pos);
971
Though this was added to support MySQL's FRM file, anything can be
962
Though this was added to support MySQL's FRM file, anything can be
972
963
stored in this location.
974
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
965
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
982
973
s->frm_start_pos= (uint) s->start;
983
974
s->frm_length= length;
984
975
s->start+= length;
986
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
977
pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
990
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
980
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
995
985
int azread_frm(azio_stream *s, char *blob)
997
ssize_t r= pread(s->file, (unsigned char*) blob,
998
s->frm_length, s->frm_start_pos);
999
if (r != (ssize_t)s->frm_length)
987
pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
1007
994
Simple comment field
1009
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1017
1004
s->comment_start_pos= (uint) s->start;
1018
1005
s->comment_length= length;
1019
1006
s->start+= length;
1021
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1022
s->comment_length, s->comment_start_pos);
1023
if (r != (ssize_t)s->comment_length)
1008
pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1026
1010
write_header(s);
1027
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1011
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1032
1016
int azread_comment(azio_stream *s, char *blob)
1034
ssize_t r= pread(s->file, (unsigned char*) blob,
1035
s->comment_length, s->comment_start_pos);
1036
if (r != (ssize_t)s->comment_length)
1018
pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1054
1035
Normally all IO goes through azio_read(), but in case of error or non-support
1055
1036
we make use of pread().
1057
1038
static void get_block(azio_stream *s)
1059
1040
#ifdef AZIO_AIO
1060
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1041
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1061
1042
&& s->pos < s->check_point
1062
1043
&& s->aio_inited)
1073
1054
s->pos+= s->stream.avail_in;
1074
1055
s->inbuf= (Byte *)s->container.buffer;
1075
/* We only azio_read when we know there is more data to be read */
1056
/* We only aio_read when we know there is more data to be read */
1076
1057
if (s->pos >= s->check_point)
1078
1059
s->aio_inited= 0;
1088
1069
#ifdef AZIO_AIO
1091
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1072
s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1092
1073
AZ_BUFSIZE_READ, s->pos);
1093
1074
s->pos+= s->stream.avail_in;