26
20
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
23
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
24
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
25
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
26
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
27
#define COMMENT 0x10 /* bit 4 set: file comment present */
28
#define RESERVED 0xE0 /* bits 5..7: reserved */
28
30
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
29
31
static int azrewind (azio_stream *s);
30
32
static unsigned int azio_enable_aio(azio_stream *s);
31
33
static int do_flush(azio_stream *file, int flush);
32
34
static int get_byte(azio_stream *s);
33
35
static void check_header(azio_stream *s);
34
static int write_header(azio_stream *s);
36
static void write_header(azio_stream *s);
35
37
static int destroy(azio_stream *s);
36
38
static void putLong(azio_stream *s, uLong x);
37
39
static uLong getLong(azio_stream *s);
41
43
static void do_aio_cleanup(azio_stream *s);
44
extern "C" pthread_handler_t run_task(void *p);
46
extern "C" pthread_handler_t run_task(void *p)
46
static pthread_handler_t run_task(void *p)
51
azio_stream *s= (azio_stream *)p;
51
azio_stream *s= (azio_stream *)p;
62
62
offset= s->container.offset;
63
63
fd= s->container.fd;
64
buffer= (char *)s->container.buffer;
64
buffer= s->container.buffer;
65
65
pthread_mutex_unlock(&s->container.thresh_mutex);
67
67
if (s->container.ready == AZ_THREAD_DEAD)
70
s->container.read_size= pread((int)fd, (void *)buffer,
71
(size_t)AZ_BUFSIZE_READ, (off_t)offset);
70
s->container.read_size= my_pread(fd, (uchar *)buffer, AZ_BUFSIZE_READ,
73
73
pthread_mutex_lock(&s->container.thresh_mutex);
74
s->container.ready= AZ_THREAD_FINISHED;
74
s->container.ready= AZ_THREAD_FINISHED;
75
75
pthread_mutex_unlock(&s->container.thresh_mutex);
83
83
static void azio_kill(azio_stream *s)
85
85
pthread_mutex_lock(&s->container.thresh_mutex);
86
s->container.ready= AZ_THREAD_DEAD;
86
s->container.ready= AZ_THREAD_DEAD;
87
87
pthread_mutex_unlock(&s->container.thresh_mutex);
89
89
pthread_cond_signal(&s->container.threshhold);
90
pthread_join(s->container.mainthread, NULL);
90
pthread_join(s->container.mainthread, (void *)NULL);
93
93
static size_t azio_return(azio_stream *s)
125
125
pthread_attr_init(&attr);
126
126
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
128
s->container.ready= AZ_THREAD_FINISHED;
128
s->container.ready= AZ_THREAD_FINISHED;
130
130
/* If we don't create a thread, signal the caller */
131
131
if (pthread_create(&s->container.mainthread, &attr, run_task,
140
140
static int azio_read(azio_stream *s)
142
142
pthread_mutex_lock(&s->container.thresh_mutex);
143
s->container.ready= AZ_THREAD_ACTIVE;
143
s->container.ready= AZ_THREAD_ACTIVE;
144
144
pthread_mutex_unlock(&s->container.thresh_mutex);
145
145
pthread_cond_broadcast(&s->container.threshhold);
186
186
s->method= method;
189
We do our own version of append by nature.
189
We do our own version of append by nature.
190
190
We must always have write access to take care of the header.
192
assert(Flags | O_APPEND);
193
assert(Flags | O_WRONLY);
192
DBUG_ASSERT(Flags | O_APPEND);
193
DBUG_ASSERT(Flags | O_WRONLY);
200
200
err = deflateInit2(&(s->stream), level,
201
201
Z_DEFLATED, -MAX_WBITS, 8, strategy);
252
252
s->frm_length= 0;
253
253
s->dirty= 1; /* We create the file dirty */
254
254
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
257
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
256
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
259
else if (s->mode == 'w')
258
else if (s->mode == 'w')
261
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
263
if(pread(s->file, buffer, read_size, 0) < read_size)
260
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
261
my_pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
265
263
read_header(s, buffer);
266
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
308
306
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
309
307
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
310
308
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
311
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
314
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
315
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
316
314
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
317
315
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
318
int4store(ptr+ AZ_FRM_POS,
316
int4store(ptr+ AZ_FRM_POS,
319
317
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
320
318
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
322
320
/* Always begin at the beginning, and end there as well */
323
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
324
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
321
my_pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
330
325
/* ===========================================================================
333
328
IN assertion: the stream s has been successfully opened for reading.
335
int get_byte(azio_stream *s)
337
333
if (s->z_eof) return EOF;
338
if (s->stream.avail_in == 0)
334
if (s->stream.avail_in == 0)
341
if (s->stream.avail_in == 0)
337
if (s->stream.avail_in == 0)
373
369
if (len) s->inbuf[0] = s->stream.next_in[0];
375
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
371
len = (uInt)my_pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos, MYF(0));
377
373
if (len == (uInt)-1) s->z_err = Z_ERRNO;
378
374
s->stream.avail_in += len;
407
403
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
408
404
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
409
405
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
410
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
411
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
412
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
413
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
406
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
407
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
408
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
409
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
414
410
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
415
411
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
416
412
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
430
426
* Cleanup then free the given azio_stream. Return a zlib error code.
431
427
Try freeing in the reverse order of allocations.
433
int destroy (azio_stream *s)
437
if (s->stream.state != NULL)
434
if (s->stream.state != NULL)
441
438
err = deflateEnd(&(s->stream));
442
439
my_sync(s->file, MYF(0));
444
else if (s->mode == 'r')
441
else if (s->mode == 'r')
445
442
err = inflateEnd(&(s->stream));
448
445
do_aio_cleanup(s);
450
if (s->file > 0 && my_close(s->file, MYF(0)))
447
if (s->file > 0 && my_close(s->file, MYF(0)))
461
458
Reads the given number of uncompressed bytes from the compressed file.
462
459
azread returns the number of bytes actually read (0 for end of file).
465
This function is legacy, do not use.
467
Reads the given number of uncompressed bytes from the compressed file.
468
If the input file was not in gzip format, gzread copies the given number
469
of bytes into the buffer.
470
gzread returns the number of uncompressed bytes actually read (0 for
471
end of file, -1 for error).
473
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
461
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
475
463
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
476
464
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
479
467
if (s->mode != 'r')
481
469
*error= Z_STREAM_ERROR;
485
473
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
487
475
*error= s->z_err;
491
479
if (s->z_err == Z_STREAM_END) /* EOF */
569
557
/* ===========================================================================
570
Experimental Interface. We abstract out a concept of rows
558
Experimental Interface. We abstract out a concept of rows
572
560
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
630
618
s->stream.next_in = (Bytef*)buf;
631
619
s->stream.avail_in = len;
633
while (s->stream.avail_in != 0)
621
while (s->stream.avail_in != 0)
635
if (s->stream.avail_out == 0)
623
if (s->stream.avail_out == 0)
638
626
s->stream.next_out = s->outbuf;
639
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
627
if (my_pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos,
628
MYF(0)) != AZ_BUFSIZE_WRITE)
641
630
s->z_err = Z_ERRNO;
672
661
s->stream.avail_in = 0; /* should be zero already anyway */
676
665
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
680
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
669
if ((uInt)my_pwrite(s->file, (uchar *)s->outbuf, len, s->pos, MYF(0)) != len)
682
671
s->z_err = Z_ERRNO;
710
699
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
712
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
701
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
716
704
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
719
707
static unsigned int azio_enable_aio(azio_stream *s)
721
pthread_cond_init(&s->container.threshhold, NULL);
722
pthread_mutex_init(&s->container.thresh_mutex, NULL);
709
VOID(pthread_cond_init(&s->container.threshhold, NULL));
710
VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
728
static void azio_disable_aio(azio_stream *s)
716
void azio_disable_aio(azio_stream *s)
732
pthread_mutex_destroy(&s->container.thresh_mutex);
733
pthread_cond_destroy(&s->container.threshhold);
720
VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
721
VOID(pthread_cond_destroy(&s->container.threshhold));
735
723
s->method= AZ_METHOD_BLOCK;
738
int ZEXPORT azflush (azio_stream *s,int flush)
726
int ZEXPORT azflush (s, flush)
744
734
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
745
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
746
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
735
my_pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
748
737
read_header(s, buffer); /* skip the .az header */
820
810
SEEK_END is not implemented, returns error.
821
811
In this version of the library, azseek can be extremely slow.
823
size_t azseek (azio_stream *s, size_t offset, int whence)
813
size_t azseek (s, offset, whence)
826
819
if (s == NULL || whence == SEEK_END ||
827
820
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
833
if (whence == SEEK_SET)
826
if (whence == SEEK_SET)
836
829
/* At this point, offset is the number of zero bytes to write. */
837
830
/* There was a zmemzero here if inbuf was null -Brian */
840
833
uInt size = AZ_BUFSIZE_READ;
841
834
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
843
836
size = azwrite(s, s->inbuf, size);
837
if (size == 0) return -1L;
875
867
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
877
869
size = azread_internal(s, s->outbuf, size, &error);
878
if (error < 0) return SIZE_MAX;
870
if (error < 0) return -1L;
886
878
given compressed file. This position represents a number of bytes in the
887
879
uncompressed data stream.
889
size_t ZEXPORT aztell (azio_stream *file)
881
size_t ZEXPORT aztell (file)
891
884
return azseek(file, 0L, SEEK_CUR);
898
891
void putLong (azio_stream *s, uLong x)
901
unsigned char buffer[1];
903
for (n = 0; n < 4; n++)
896
for (n = 0; n < 4; n++)
905
898
buffer[0]= (int)(x & 0xff);
906
size_t ret= pwrite(s->file, buffer, 1, s->pos);
899
my_pwrite(s->file, buffer, 1, s->pos, MYF(0));
973
Though this was added to support MySQL's FRM file, anything can be
965
Though this was added to support MySQL's FRM file, anything can be
974
966
stored in this location.
976
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
968
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
984
976
s->frm_start_pos= (uint) s->start;
985
977
s->frm_length= length;
986
978
s->start+= length;
988
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
980
my_pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos, MYF(0));
992
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
983
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
997
988
int azread_frm(azio_stream *s, char *blob)
999
ssize_t r= pread(s->file, (unsigned char*) blob,
1000
s->frm_length, s->frm_start_pos);
1001
if (r != (ssize_t)s->frm_length)
990
my_pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos, MYF(0));
1009
997
Simple comment field
1011
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
999
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1019
1007
s->comment_start_pos= (uint) s->start;
1020
1008
s->comment_length= length;
1021
1009
s->start+= length;
1023
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1024
s->comment_length, s->comment_start_pos);
1025
if (r != (ssize_t)s->comment_length)
1011
my_pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos,
1028
1014
write_header(s);
1029
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1015
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1034
1020
int azread_comment(azio_stream *s, char *blob)
1036
ssize_t r= pread(s->file, (unsigned char*) blob,
1037
s->comment_length, s->comment_start_pos);
1038
if (r != (ssize_t)s->comment_length)
1022
my_pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos,
1056
1040
Normally all IO goes through azio_read(), but in case of error or non-support
1057
1041
we make use of pread().
1059
1043
static void get_block(azio_stream *s)
1061
1045
#ifdef AZIO_AIO
1062
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1046
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1063
1047
&& s->pos < s->check_point
1064
1048
&& s->aio_inited)
1075
1059
s->pos+= s->stream.avail_in;
1076
1060
s->inbuf= (Byte *)s->container.buffer;
1077
/* We only azio_read when we know there is more data to be read */
1061
/* We only aio_read when we know there is more data to be read */
1078
1062
if (s->pos >= s->check_point)
1080
1064
s->aio_inited= 0;
1090
1074
#ifdef AZIO_AIO
1093
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1094
AZ_BUFSIZE_READ, s->pos);
1077
s->stream.avail_in = (uInt)my_pread(s->file, (uchar *)s->inbuf, AZ_BUFSIZE_READ, s->pos, MYF(0));
1095
1078
s->pos+= s->stream.avail_in;