17
17
#include <string.h>
18
19
#include <assert.h>
19
20
#include <unistd.h>
21
22
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
23
/* gzip flag byte */
24
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
25
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
26
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
27
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
28
#define COMMENT 0x10 /* bit 4 set: file comment present */
29
#define RESERVED 0xE0 /* bits 5..7: reserved */
31
24
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
32
25
static int azrewind (azio_stream *s);
33
26
static unsigned int azio_enable_aio(azio_stream *s);
34
27
static int do_flush(azio_stream *file, int flush);
35
28
static int get_byte(azio_stream *s);
36
29
static void check_header(azio_stream *s);
37
static void write_header(azio_stream *s);
30
static int write_header(azio_stream *s);
38
31
static int destroy(azio_stream *s);
39
32
static void putLong(azio_stream *s, uLong x);
40
33
static uLong getLong(azio_stream *s);
44
37
static void do_aio_cleanup(azio_stream *s);
47
static pthread_handler_t run_task(void *p)
41
pthread_handler_t run_task(void *p)
52
azio_stream *s= (azio_stream *)p;
46
azio_stream *s= (azio_stream *)p;
63
57
offset= s->container.offset;
64
58
fd= s->container.fd;
65
buffer= s->container.buffer;
59
buffer= (char *)s->container.buffer;
66
60
pthread_mutex_unlock(&s->container.thresh_mutex);
68
62
if (s->container.ready == AZ_THREAD_DEAD)
84
78
static void azio_kill(azio_stream *s)
86
80
pthread_mutex_lock(&s->container.thresh_mutex);
87
s->container.ready= AZ_THREAD_DEAD;
81
s->container.ready= AZ_THREAD_DEAD;
88
82
pthread_mutex_unlock(&s->container.thresh_mutex);
90
84
pthread_cond_signal(&s->container.threshhold);
91
pthread_join(s->container.mainthread, (void *)NULL);
85
pthread_join(s->container.mainthread, NULL);
94
88
static size_t azio_return(azio_stream *s)
126
120
pthread_attr_init(&attr);
127
121
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
129
s->container.ready= AZ_THREAD_FINISHED;
123
s->container.ready= AZ_THREAD_FINISHED;
131
125
/* If we don't create a thread, signal the caller */
132
126
if (pthread_create(&s->container.mainthread, &attr, run_task,
141
135
static int azio_read(azio_stream *s)
143
137
pthread_mutex_lock(&s->container.thresh_mutex);
144
s->container.ready= AZ_THREAD_ACTIVE;
138
s->container.ready= AZ_THREAD_ACTIVE;
145
139
pthread_mutex_unlock(&s->container.thresh_mutex);
146
140
pthread_cond_broadcast(&s->container.threshhold);
187
181
s->method= method;
190
We do our own version of append by nature.
184
We do our own version of append by nature.
191
185
We must always have write access to take care of the header.
193
187
assert(Flags | O_APPEND);
194
188
assert(Flags | O_WRONLY);
201
195
err = deflateInit2(&(s->stream), level,
202
196
Z_DEFLATED, -MAX_WBITS, 8, strategy);
253
247
s->frm_length= 0;
254
248
s->dirty= 1; /* We create the file dirty */
255
249
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
257
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
252
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
259
else if (s->mode == 'w')
254
else if (s->mode == 'w')
261
256
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
257
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
258
if(pread(s->file, buffer, read_size, 0) < read_size)
263
260
read_header(s, buffer);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
261
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
306
303
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
304
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
305
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
306
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
307
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
308
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
309
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
314
311
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
312
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
int4store(ptr+ AZ_FRM_POS,
313
int4store(ptr+ AZ_FRM_POS,
317
314
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
318
315
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
320
317
/* Always begin at the beginning, and end there as well */
321
pwrite(s->file, (unsigned char*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
318
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
319
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
324
325
/* ===========================================================================
327
328
IN assertion: the stream s has been successfully opened for reading.
330
int get_byte(azio_stream *s)
332
332
if (s->z_eof) return EOF;
333
if (s->stream.avail_in == 0)
333
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
402
402
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
403
403
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
404
404
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
405
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
405
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
409
409
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
410
410
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
411
411
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
425
425
* Cleanup then free the given azio_stream. Return a zlib error code.
426
426
Try freeing in the reverse order of allocations.
428
int destroy (azio_stream *s)
433
if (s->stream.state != NULL)
432
if (s->stream.state != NULL)
437
436
err = deflateEnd(&(s->stream));
438
437
my_sync(s->file, MYF(0));
440
else if (s->mode == 'r')
439
else if (s->mode == 'r')
441
440
err = inflateEnd(&(s->stream));
444
443
do_aio_cleanup(s);
446
if (s->file > 0 && my_close(s->file, MYF(0)))
445
if (s->file > 0 && my_close(s->file, MYF(0)))
556
555
/* ===========================================================================
557
Experimental Interface. We abstract out a concept of rows
556
Experimental Interface. We abstract out a concept of rows
559
558
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
617
616
s->stream.next_in = (Bytef*)buf;
618
617
s->stream.avail_in = len;
620
while (s->stream.avail_in != 0)
619
while (s->stream.avail_in != 0)
622
if (s->stream.avail_out == 0)
621
if (s->stream.avail_out == 0)
625
624
s->stream.next_out = s->outbuf;
626
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
625
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
628
627
s->z_err = Z_ERRNO;
659
658
s->stream.avail_in = 0; /* should be zero already anyway */
663
662
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
667
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
666
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
669
668
s->z_err = Z_ERRNO;
697
696
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
699
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
698
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
702
702
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
721
721
s->method= AZ_METHOD_BLOCK;
724
int ZEXPORT azflush (s, flush)
724
int ZEXPORT azflush (azio_stream *s,int flush)
732
730
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
733
pread(s->file, (unsigned char*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
731
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
732
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
734
734
read_header(s, buffer); /* skip the .az header */
807
806
SEEK_END is not implemented, returns error.
808
807
In this version of the library, azseek can be extremely slow.
810
size_t azseek (s, offset, whence)
809
size_t azseek (azio_stream *s, size_t offset, int whence)
816
812
if (s == NULL || whence == SEEK_END ||
817
813
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
823
if (whence == SEEK_SET)
819
if (whence == SEEK_SET)
826
822
/* At this point, offset is the number of zero bytes to write. */
827
823
/* There was a zmemzero here if inbuf was null -Brian */
830
826
uInt size = AZ_BUFSIZE_READ;
831
827
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
833
829
size = azwrite(s, s->inbuf, size);
834
if (size == 0) return -1L;
864
861
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
866
863
size = azread_internal(s, s->outbuf, size, &error);
867
if (error < 0) return -1L;
864
if (error < 0) return SIZE_MAX;
875
872
given compressed file. This position represents a number of bytes in the
876
873
uncompressed data stream.
878
size_t ZEXPORT aztell (file)
875
size_t ZEXPORT aztell (azio_stream *file)
881
877
return azseek(file, 0L, SEEK_CUR);
891
887
unsigned char buffer[1];
893
for (n = 0; n < 4; n++)
889
for (n = 0; n < 4; n++)
895
891
buffer[0]= (int)(x & 0xff);
896
pwrite(s->file, buffer, 1, s->pos);
892
assert(pwrite(s->file, buffer, 1, s->pos)==1);
962
Though this was added to support MySQL's FRM file, anything can be
958
Though this was added to support MySQL's FRM file, anything can be
963
959
stored in this location.
965
961
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
973
969
s->frm_start_pos= (uint) s->start;
974
970
s->frm_length= length;
975
971
s->start+= length;
977
pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos);
973
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
980
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
977
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
985
982
int azread_frm(azio_stream *s, char *blob)
987
pread(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos);
984
ssize_t r= pread(s->file, (unsigned char*) blob,
985
s->frm_length, s->frm_start_pos);
986
if (r != (ssize_t)s->frm_length)
996
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1004
1004
s->comment_start_pos= (uint) s->start;
1005
1005
s->comment_length= length;
1006
1006
s->start+= length;
1008
pwrite(s->file, (unsigned char*) blob, s->comment_length, s->comment_start_pos);
1008
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1009
s->comment_length, s->comment_start_pos);
1010
if (r != (ssize_t)s->comment_length)
1010
1013
write_header(s);
1011
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1014
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1016
1019
int azread_comment(azio_stream *s, char *blob)
1018
pread(s->file, (unsigned char*) blob, s->comment_length, s->comment_start_pos);
1021
ssize_t r= pread(s->file, (unsigned char*) blob,
1022
s->comment_length, s->comment_start_pos);
1023
if (r != (ssize_t)s->comment_length)
1035
1041
Normally all IO goes through azio_read(), but in case of error or non-support
1036
1042
we make use of pread().
1038
1044
static void get_block(azio_stream *s)
1040
1046
#ifdef AZIO_AIO
1041
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1047
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1042
1048
&& s->pos < s->check_point
1043
1049
&& s->aio_inited)