19
19
#include <unistd.h>
26
using namespace drizzled;
21
28
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
24
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
25
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
26
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
27
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
28
#define COMMENT 0x10 /* bit 4 set: file comment present */
29
#define RESERVED 0xE0 /* bits 5..7: reserved */
31
30
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
32
31
static int azrewind (azio_stream *s);
33
32
static unsigned int azio_enable_aio(azio_stream *s);
34
33
static int do_flush(azio_stream *file, int flush);
35
34
static int get_byte(azio_stream *s);
36
35
static void check_header(azio_stream *s);
37
static void write_header(azio_stream *s);
36
static int write_header(azio_stream *s);
38
37
static int destroy(azio_stream *s);
39
38
static void putLong(azio_stream *s, uLong x);
40
39
static uLong getLong(azio_stream *s);
44
43
static void do_aio_cleanup(azio_stream *s);
47
static pthread_handler_t run_task(void *p)
46
extern "C" pthread_handler_t run_task(void *p);
48
extern "C" pthread_handler_t run_task(void *p)
52
azio_stream *s= (azio_stream *)p;
53
azio_stream *s= (azio_stream *)p;
63
62
offset= s->container.offset;
64
63
fd= s->container.fd;
65
buffer= s->container.buffer;
64
buffer= (char *)s->container.buffer;
66
65
pthread_mutex_unlock(&s->container.thresh_mutex);
68
67
if (s->container.ready == AZ_THREAD_DEAD)
76
75
pthread_mutex_unlock(&s->container.thresh_mutex);
84
81
static void azio_kill(azio_stream *s)
86
83
pthread_mutex_lock(&s->container.thresh_mutex);
87
s->container.ready= AZ_THREAD_DEAD;
84
s->container.ready= AZ_THREAD_DEAD;
88
85
pthread_mutex_unlock(&s->container.thresh_mutex);
90
87
pthread_cond_signal(&s->container.threshhold);
91
pthread_join(s->container.mainthread, (void *)NULL);
88
pthread_join(s->container.mainthread, NULL);
94
91
static size_t azio_return(azio_stream *s)
126
123
pthread_attr_init(&attr);
127
124
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
129
s->container.ready= AZ_THREAD_FINISHED;
126
s->container.ready= AZ_THREAD_FINISHED;
131
128
/* If we don't create a thread, signal the caller */
132
129
if (pthread_create(&s->container.mainthread, &attr, run_task,
141
138
static int azio_read(azio_stream *s)
143
140
pthread_mutex_lock(&s->container.thresh_mutex);
144
s->container.ready= AZ_THREAD_ACTIVE;
141
s->container.ready= AZ_THREAD_ACTIVE;
145
142
pthread_mutex_unlock(&s->container.thresh_mutex);
146
143
pthread_cond_broadcast(&s->container.threshhold);
187
184
s->method= method;
190
We do our own version of append by nature.
187
We do our own version of append by nature.
191
188
We must always have write access to take care of the header.
193
190
assert(Flags | O_APPEND);
194
191
assert(Flags | O_WRONLY);
201
198
err = deflateInit2(&(s->stream), level,
202
199
Z_DEFLATED, -MAX_WBITS, 8, strategy);
228
225
s->stream.avail_out = AZ_BUFSIZE_WRITE;
231
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
228
s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
233
230
s->container.fd= s->file;
242
if (Flags & O_CREAT || Flags & O_TRUNC)
239
if (Flags & O_CREAT || Flags & O_TRUNC)
245
242
s->forced_flushes= 0;
253
250
s->frm_length= 0;
254
251
s->dirty= 1; /* We create the file dirty */
255
252
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
257
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
255
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
259
else if (s->mode == 'w')
257
else if (s->mode == 'w')
261
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
259
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
260
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
261
if(pread(s->file, buffer, read_size, 0) < read_size)
263
263
read_header(s, buffer);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
264
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
306
306
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
307
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
308
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
309
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
314
314
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
315
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
int4store(ptr+ AZ_FRM_POS,
316
int4store(ptr+ AZ_FRM_POS,
317
317
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
318
318
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
320
320
/* Always begin at the beginning, and end there as well */
321
pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
321
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
322
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
324
328
/* ===========================================================================
327
331
IN assertion: the stream s has been successfully opened for reading.
333
int get_byte(azio_stream *s)
332
335
if (s->z_eof) return EOF;
333
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
339
if (s->stream.avail_in == 0)
368
371
if (len) s->inbuf[0] = s->stream.next_in[0];
370
len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
373
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
372
375
if (len == (uInt)-1) s->z_err = Z_ERRNO;
373
376
s->stream.avail_in += len;
402
405
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
403
406
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
404
407
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
405
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
408
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
409
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
410
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
411
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
409
412
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
410
413
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
411
414
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
425
428
* Cleanup then free the given azio_stream. Return a zlib error code.
426
429
Try freeing in the reverse order of allocations.
431
int destroy (azio_stream *s)
433
if (s->stream.state != NULL)
435
if (s->stream.state != NULL)
437
439
err = deflateEnd(&(s->stream));
438
my_sync(s->file, MYF(0));
440
internal::my_sync(s->file, MYF(0));
440
else if (s->mode == 'r')
442
else if (s->mode == 'r')
441
443
err = inflateEnd(&(s->stream));
444
446
do_aio_cleanup(s);
446
if (s->file > 0 && my_close(s->file, MYF(0)))
448
if (s->file > 0 && internal::my_close(s->file, MYF(0)))
457
459
Reads the given number of uncompressed bytes from the compressed file.
458
460
azread returns the number of bytes actually read (0 for end of file).
460
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
463
This function is legacy, do not use.
465
Reads the given number of uncompressed bytes from the compressed file.
466
If the input file was not in gzip format, gzread copies the given number
467
of bytes into the buffer.
468
gzread returns the number of uncompressed bytes actually read (0 for
469
end of file, -1 for error).
471
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
462
473
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
463
474
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
466
477
if (s->mode != 'r')
468
479
*error= Z_STREAM_ERROR;
472
483
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
474
485
*error= s->z_err;
478
489
if (s->z_err == Z_STREAM_END) /* EOF */
556
567
/* ===========================================================================
557
Experimental Interface. We abstract out a concept of rows
568
Experimental Interface. We abstract out a concept of rows
559
570
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
617
628
s->stream.next_in = (Bytef*)buf;
618
629
s->stream.avail_in = len;
620
while (s->stream.avail_in != 0)
631
while (s->stream.avail_in != 0)
622
if (s->stream.avail_out == 0)
633
if (s->stream.avail_out == 0)
625
636
s->stream.next_out = s->outbuf;
626
if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
637
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
628
639
s->z_err = Z_ERRNO;
659
670
s->stream.avail_in = 0; /* should be zero already anyway */
663
674
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
667
if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len)
678
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
669
680
s->z_err = Z_ERRNO;
697
708
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
699
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
710
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
702
714
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
705
717
static unsigned int azio_enable_aio(azio_stream *s)
707
VOID(pthread_cond_init(&s->container.threshhold, NULL));
708
VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
719
pthread_cond_init(&s->container.threshhold, NULL);
720
pthread_mutex_init(&s->container.thresh_mutex, NULL);
718
VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
719
VOID(pthread_cond_destroy(&s->container.threshhold));
730
pthread_mutex_destroy(&s->container.thresh_mutex);
731
pthread_cond_destroy(&s->container.threshhold);
721
733
s->method= AZ_METHOD_BLOCK;
724
int ZEXPORT azflush (s, flush)
736
int ZEXPORT azflush (azio_stream *s,int flush)
732
742
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
733
pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
743
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
744
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
734
746
read_header(s, buffer); /* skip the .az header */
807
818
SEEK_END is not implemented, returns error.
808
819
In this version of the library, azseek can be extremely slow.
810
size_t azseek (s, offset, whence)
821
size_t azseek (azio_stream *s, size_t offset, int whence)
816
824
if (s == NULL || whence == SEEK_END ||
817
825
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
823
if (whence == SEEK_SET)
831
if (whence == SEEK_SET)
826
834
/* At this point, offset is the number of zero bytes to write. */
827
835
/* There was a zmemzero here if inbuf was null -Brian */
830
838
uInt size = AZ_BUFSIZE_READ;
831
839
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
833
841
size = azwrite(s, s->inbuf, size);
834
if (size == 0) return -1L;
864
873
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
866
875
size = azread_internal(s, s->outbuf, size, &error);
867
if (error < 0) return -1L;
876
if (error < 0) return SIZE_MAX;
875
884
given compressed file. This position represents a number of bytes in the
876
885
uncompressed data stream.
878
size_t ZEXPORT aztell (file)
887
size_t ZEXPORT aztell (azio_stream *file)
881
889
return azseek(file, 0L, SEEK_CUR);
888
896
void putLong (azio_stream *s, uLong x)
899
unsigned char buffer[1];
893
for (n = 0; n < 4; n++)
901
for (n = 0; n < 4; n++)
895
903
buffer[0]= (int)(x & 0xff);
896
pwrite(s->file, buffer, 1, s->pos);
904
size_t ret= pwrite(s->file, buffer, 1, s->pos);
962
Though this was added to support MySQL's FRM file, anything can be
971
Though this was added to support MySQL's FRM file, anything can be
963
972
stored in this location.
965
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
974
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
973
982
s->frm_start_pos= (uint) s->start;
974
983
s->frm_length= length;
975
984
s->start+= length;
977
pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
986
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
980
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
990
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
985
995
int azread_frm(azio_stream *s, char *blob)
987
pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
997
ssize_t r= pread(s->file, (unsigned char*) blob,
998
s->frm_length, s->frm_start_pos);
999
if (r != (ssize_t)s->frm_length)
994
1007
Simple comment field
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1009
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
1004
1017
s->comment_start_pos= (uint) s->start;
1005
1018
s->comment_length= length;
1006
1019
s->start+= length;
1008
pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1021
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1022
s->comment_length, s->comment_start_pos);
1023
if (r != (ssize_t)s->comment_length)
1010
1026
write_header(s);
1011
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1027
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1016
1032
int azread_comment(azio_stream *s, char *blob)
1018
pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1034
ssize_t r= pread(s->file, (unsigned char*) blob,
1035
s->comment_length, s->comment_start_pos);
1036
if (r != (ssize_t)s->comment_length)
1035
1054
Normally all IO goes through azio_read(), but in case of error or non-support
1036
1055
we make use of pread().
1038
1057
static void get_block(azio_stream *s)
1040
1059
#ifdef AZIO_AIO
1041
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1060
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1042
1061
&& s->pos < s->check_point
1043
1062
&& s->aio_inited)
1054
1073
s->pos+= s->stream.avail_in;
1055
1074
s->inbuf= (Byte *)s->container.buffer;
1056
/* We only aio_read when we know there is more data to be read */
1075
/* We only azio_read when we know there is more data to be read */
1057
1076
if (s->pos >= s->check_point)
1059
1078
s->aio_inited= 0;
1069
1088
#ifdef AZIO_AIO
1072
s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1091
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1073
1092
AZ_BUFSIZE_READ, s->pos);
1074
1093
s->pos+= s->stream.avail_in;