19
19
#include <unistd.h>
26
using namespace drizzled;
28
21
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
24
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
25
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
26
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
27
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
28
#define COMMENT 0x10 /* bit 4 set: file comment present */
29
#define RESERVED 0xE0 /* bits 5..7: reserved */
30
31
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
31
32
static int azrewind (azio_stream *s);
32
33
static unsigned int azio_enable_aio(azio_stream *s);
33
34
static int do_flush(azio_stream *file, int flush);
34
35
static int get_byte(azio_stream *s);
35
36
static void check_header(azio_stream *s);
36
static int write_header(azio_stream *s);
37
static void write_header(azio_stream *s);
37
38
static int destroy(azio_stream *s);
38
39
static void putLong(azio_stream *s, uLong x);
39
40
static uLong getLong(azio_stream *s);
43
44
static void do_aio_cleanup(azio_stream *s);
46
extern "C" pthread_handler_t run_task(void *p);
48
extern "C" pthread_handler_t run_task(void *p)
47
static pthread_handler_t run_task(void *p)
53
azio_stream *s= (azio_stream *)p;
52
azio_stream *s= (azio_stream *)p;
55
internal::my_thread_init();
64
63
offset= s->container.offset;
65
64
fd= s->container.fd;
66
buffer= (char *)s->container.buffer;
65
buffer= s->container.buffer;
67
66
pthread_mutex_unlock(&s->container.thresh_mutex);
69
68
if (s->container.ready == AZ_THREAD_DEAD)
85
84
static void azio_kill(azio_stream *s)
87
86
pthread_mutex_lock(&s->container.thresh_mutex);
88
s->container.ready= AZ_THREAD_DEAD;
87
s->container.ready= AZ_THREAD_DEAD;
89
88
pthread_mutex_unlock(&s->container.thresh_mutex);
91
90
pthread_cond_signal(&s->container.threshhold);
92
pthread_join(s->container.mainthread, NULL);
91
pthread_join(s->container.mainthread, (void *)NULL);
95
94
static size_t azio_return(azio_stream *s)
127
126
pthread_attr_init(&attr);
128
127
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
130
s->container.ready= AZ_THREAD_FINISHED;
129
s->container.ready= AZ_THREAD_FINISHED;
132
131
/* If we don't create a thread, signal the caller */
133
132
if (pthread_create(&s->container.mainthread, &attr, run_task,
142
141
static int azio_read(azio_stream *s)
144
143
pthread_mutex_lock(&s->container.thresh_mutex);
145
s->container.ready= AZ_THREAD_ACTIVE;
144
s->container.ready= AZ_THREAD_ACTIVE;
146
145
pthread_mutex_unlock(&s->container.thresh_mutex);
147
146
pthread_cond_broadcast(&s->container.threshhold);
188
187
s->method= method;
191
We do our own version of append by nature.
190
We do our own version of append by nature.
192
191
We must always have write access to take card of the header.
194
193
assert(Flags | O_APPEND);
195
194
assert(Flags | O_WRONLY);
202
201
err = deflateInit2(&(s->stream), level,
203
202
Z_DEFLATED, -MAX_WBITS, 8, strategy);
229
228
s->stream.avail_out = AZ_BUFSIZE_WRITE;
232
s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
231
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
234
233
s->container.fd= s->file;
243
if (Flags & O_CREAT || Flags & O_TRUNC)
242
if (Flags & O_CREAT || Flags & O_TRUNC)
246
245
s->forced_flushes= 0;
254
253
s->frm_length= 0;
255
254
s->dirty= 1; /* We create the file dirty */
256
255
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
259
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
257
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
261
else if (s->mode == 'w')
259
else if (s->mode == 'w')
263
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
264
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
265
if(pread(s->file, buffer, read_size, 0) < read_size)
261
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
267
263
read_header(s, buffer);
268
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
310
306
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
311
307
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
312
308
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
313
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
314
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
315
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
316
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
317
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
318
314
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
319
315
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
320
int4store(ptr+ AZ_FRM_POS,
316
int4store(ptr+ AZ_FRM_POS,
321
317
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
322
318
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
324
320
/* Always begin at the begining, and end there as well */
325
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
326
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
321
pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
332
324
/* ===========================================================================
335
327
IN assertion: the stream s has been sucessfully opened for reading.
337
int get_byte(azio_stream *s)
339
332
if (s->z_eof) return EOF;
340
if (s->stream.avail_in == 0)
333
if (s->stream.avail_in == 0)
343
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
375
368
if (len) s->inbuf[0] = s->stream.next_in[0];
377
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
370
len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
379
372
if (len == (uInt)-1) s->z_err = Z_ERRNO;
380
373
s->stream.avail_in += len;
409
402
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
410
403
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
411
404
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
412
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
413
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
414
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
415
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
405
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
416
409
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
417
410
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
418
411
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
432
425
* Cleanup then free the given azio_stream. Return a zlib error code.
433
426
Try freeing in the reverse order of allocations.
435
int destroy (azio_stream *s)
439
if (s->stream.state != NULL)
433
if (s->stream.state != NULL)
443
437
err = deflateEnd(&(s->stream));
444
internal::my_sync(s->file, MYF(0));
438
my_sync(s->file, MYF(0));
446
else if (s->mode == 'r')
440
else if (s->mode == 'r')
447
441
err = inflateEnd(&(s->stream));
450
444
do_aio_cleanup(s);
452
if (s->file > 0 && internal::my_close(s->file, MYF(0)))
446
if (s->file > 0 && my_close(s->file, MYF(0)))
463
457
Reads the given number of uncompressed bytes from the compressed file.
464
458
azread returns the number of bytes actually read (0 for end of file).
467
This function is legacy, do not use.
469
Reads the given number of uncompressed bytes from the compressed file.
470
If the input file was not in gzip format, gzread copies the given number
471
of bytes into the buffer.
472
gzread returns the number of uncompressed bytes actually read (0 for
473
end of file, -1 for error).
475
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
460
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
477
462
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
478
463
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
481
466
if (s->mode != 'r')
483
468
*error= Z_STREAM_ERROR;
487
472
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
489
474
*error= s->z_err;
493
478
if (s->z_err == Z_STREAM_END) /* EOF */
571
556
/* ===========================================================================
572
Experimental Interface. We abstract out a concecpt of rows
557
Experimental Interface. We abstract out a concecpt of rows
574
559
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
632
617
s->stream.next_in = (Bytef*)buf;
633
618
s->stream.avail_in = len;
635
while (s->stream.avail_in != 0)
620
while (s->stream.avail_in != 0)
637
if (s->stream.avail_out == 0)
622
if (s->stream.avail_out == 0)
640
625
s->stream.next_out = s->outbuf;
641
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
626
if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
643
628
s->z_err = Z_ERRNO;
674
659
s->stream.avail_in = 0; /* should be zero already anyway */
678
663
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
682
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
667
if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len)
684
669
s->z_err = Z_ERRNO;
712
697
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
714
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
699
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
718
702
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
721
705
static unsigned int azio_enable_aio(azio_stream *s)
723
pthread_cond_init(&s->container.threshhold, NULL);
724
pthread_mutex_init(&s->container.thresh_mutex, NULL);
707
VOID(pthread_cond_init(&s->container.threshhold, NULL));
708
VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
734
pthread_mutex_destroy(&s->container.thresh_mutex);
735
pthread_cond_destroy(&s->container.threshhold);
718
VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
719
VOID(pthread_cond_destroy(&s->container.threshhold));
737
721
s->method= AZ_METHOD_BLOCK;
740
int ZEXPORT azflush (azio_stream *s,int flush)
724
int ZEXPORT azflush (s, flush)
746
732
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
747
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
748
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
733
pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
750
734
read_header(s, buffer); /* skip the .az header */
822
807
SEEK_END is not implemented, returns error.
823
808
In this version of the library, azseek can be extremely slow.
825
size_t azseek (azio_stream *s, size_t offset, int whence)
810
size_t azseek (s, offset, whence)
828
816
if (s == NULL || whence == SEEK_END ||
829
817
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
835
if (whence == SEEK_SET)
823
if (whence == SEEK_SET)
838
826
/* At this point, offset is the number of zero bytes to write. */
839
827
/* There was a zmemzero here if inbuf was null -Brian */
842
830
uInt size = AZ_BUFSIZE_READ;
843
831
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
845
833
size = azwrite(s, s->inbuf, size);
834
if (size == 0) return -1L;
877
864
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
879
866
size = azread_internal(s, s->outbuf, size, &error);
880
if (error < 0) return SIZE_MAX;
867
if (error < 0) return -1L;
888
875
given compressed file. This position represents a number of bytes in the
889
876
uncompressed data stream.
891
size_t ZEXPORT aztell (azio_stream *file)
878
size_t ZEXPORT aztell (file)
893
881
return azseek(file, 0L, SEEK_CUR);
900
888
void putLong (azio_stream *s, uLong x)
903
unsigned char buffer[1];
905
for (n = 0; n < 4; n++)
893
for (n = 0; n < 4; n++)
907
895
buffer[0]= (int)(x & 0xff);
908
size_t ret= pwrite(s->file, buffer, 1, s->pos);
896
pwrite(s->file, buffer, 1, s->pos);
975
Though this was added to support MySQL's FRM file, anything can be
962
Though this was added to support MySQL's FRM file, anything can be
976
963
stored in this location.
978
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
965
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
986
973
s->frm_start_pos= (uint) s->start;
987
974
s->frm_length= length;
988
975
s->start+= length;
990
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
977
pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
994
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
980
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
999
985
int azread_frm(azio_stream *s, char *blob)
1001
ssize_t r= pread(s->file, (unsigned char*) blob,
1002
s->frm_length, s->frm_start_pos);
1003
if (r != (ssize_t)s->frm_length)
987
pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
1011
994
Simple comment field
1013
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1021
1004
s->comment_start_pos= (uint) s->start;
1022
1005
s->comment_length= length;
1023
1006
s->start+= length;
1025
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1026
s->comment_length, s->comment_start_pos);
1027
if (r != (ssize_t)s->comment_length)
1008
pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1030
1010
write_header(s);
1031
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1011
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1036
1016
int azread_comment(azio_stream *s, char *blob)
1038
ssize_t r= pread(s->file, (unsigned char*) blob,
1039
s->comment_length, s->comment_start_pos);
1040
if (r != (ssize_t)s->comment_length)
1018
pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1058
1035
Normally all IO goes through azio_read(), but in case of error or non-support
1059
1036
we make use of pread().
1061
1038
static void get_block(azio_stream *s)
1063
1040
#ifdef AZIO_AIO
1064
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1041
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1065
1042
&& s->pos < s->check_point
1066
1043
&& s->aio_inited)
1077
1054
s->pos+= s->stream.avail_in;
1078
1055
s->inbuf= (Byte *)s->container.buffer;
1079
/* We only azio_read when we know there is more data to be read */
1056
/* We only aio_read when we know there is more data to be read */
1080
1057
if (s->pos >= s->check_point)
1082
1059
s->aio_inited= 0;
1092
1069
#ifdef AZIO_AIO
1095
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1072
s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1096
1073
AZ_BUFSIZE_READ, s->pos);
1097
1074
s->pos+= s->stream.avail_in;