19
19
#include <unistd.h>
26
using namespace drizzled;
21
28
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
24
#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */
25
#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */
26
#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */
27
#define ORIG_NAME 0x08 /* bit 3 set: original file name present */
28
#define COMMENT 0x10 /* bit 4 set: file comment present */
29
#define RESERVED 0xE0 /* bits 5..7: reserved */
31
30
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
32
31
static int azrewind (azio_stream *s);
33
32
static unsigned int azio_enable_aio(azio_stream *s);
34
33
static int do_flush(azio_stream *file, int flush);
35
34
static int get_byte(azio_stream *s);
36
35
static void check_header(azio_stream *s);
37
static void write_header(azio_stream *s);
36
static int write_header(azio_stream *s);
38
37
static int destroy(azio_stream *s);
39
38
static void putLong(azio_stream *s, uLong x);
40
39
static uLong getLong(azio_stream *s);
44
43
static void do_aio_cleanup(azio_stream *s);
47
static pthread_handler_t run_task(void *p)
46
extern "C" pthread_handler_t run_task(void *p);
48
extern "C" pthread_handler_t run_task(void *p)
52
azio_stream *s= (azio_stream *)p;
53
azio_stream *s= (azio_stream *)p;
55
internal::my_thread_init();
63
64
offset= s->container.offset;
64
65
fd= s->container.fd;
65
buffer= s->container.buffer;
66
buffer= (char *)s->container.buffer;
66
67
pthread_mutex_unlock(&s->container.thresh_mutex);
68
69
if (s->container.ready == AZ_THREAD_DEAD)
84
85
static void azio_kill(azio_stream *s)
86
87
pthread_mutex_lock(&s->container.thresh_mutex);
87
s->container.ready= AZ_THREAD_DEAD;
88
s->container.ready= AZ_THREAD_DEAD;
88
89
pthread_mutex_unlock(&s->container.thresh_mutex);
90
91
pthread_cond_signal(&s->container.threshhold);
91
pthread_join(s->container.mainthread, (void *)NULL);
92
pthread_join(s->container.mainthread, NULL);
94
95
static size_t azio_return(azio_stream *s)
126
127
pthread_attr_init(&attr);
127
128
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
129
s->container.ready= AZ_THREAD_FINISHED;
130
s->container.ready= AZ_THREAD_FINISHED;
131
132
/* If we don't create a thread, signal the caller */
132
133
if (pthread_create(&s->container.mainthread, &attr, run_task,
141
142
static int azio_read(azio_stream *s)
143
144
pthread_mutex_lock(&s->container.thresh_mutex);
144
s->container.ready= AZ_THREAD_ACTIVE;
145
s->container.ready= AZ_THREAD_ACTIVE;
145
146
pthread_mutex_unlock(&s->container.thresh_mutex);
146
147
pthread_cond_broadcast(&s->container.threshhold);
187
188
s->method= method;
190
We do our own version of append by nature.
191
We do our own version of append by nature.
191
192
We must always have write access to take card of the header.
193
194
assert(Flags | O_APPEND);
194
195
assert(Flags | O_WRONLY);
201
202
err = deflateInit2(&(s->stream), level,
202
203
Z_DEFLATED, -MAX_WBITS, 8, strategy);
228
229
s->stream.avail_out = AZ_BUFSIZE_WRITE;
231
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
232
s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
233
234
s->container.fd= s->file;
242
if (Flags & O_CREAT || Flags & O_TRUNC)
243
if (Flags & O_CREAT || Flags & O_TRUNC)
245
246
s->forced_flushes= 0;
253
254
s->frm_length= 0;
254
255
s->dirty= 1; /* We create the file dirty */
255
256
s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
257
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
259
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
259
else if (s->mode == 'w')
261
else if (s->mode == 'w')
261
uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
263
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
264
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
265
if(pread(s->file, buffer, read_size, 0) < read_size)
263
267
read_header(s, buffer);
264
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
268
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
306
310
int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
311
int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
312
int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
313
int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
314
int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
315
int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
316
int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
317
int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
314
318
int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
319
int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
int4store(ptr+ AZ_FRM_POS,
320
int4store(ptr+ AZ_FRM_POS,
317
321
AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
318
322
*(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
320
324
/* Always begin at the begining, and end there as well */
321
pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
325
const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
326
if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
324
332
/* ===========================================================================
327
335
IN assertion: the stream s has been sucessfully opened for reading.
337
int get_byte(azio_stream *s)
332
339
if (s->z_eof) return EOF;
333
if (s->stream.avail_in == 0)
340
if (s->stream.avail_in == 0)
336
if (s->stream.avail_in == 0)
343
if (s->stream.avail_in == 0)
368
375
if (len) s->inbuf[0] = s->stream.next_in[0];
370
len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
377
len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
372
379
if (len == (uInt)-1) s->z_err = Z_ERRNO;
373
380
s->stream.avail_in += len;
402
409
s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
403
410
s->block_size= 1024 * buffer[AZ_BLOCK_POS];
404
411
s->start= (size_t)uint8korr(buffer + AZ_START_POS);
405
s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
412
s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
413
s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
414
s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
415
s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
409
416
s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
410
417
s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
411
418
s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
425
432
* Cleanup then free the given azio_stream. Return a zlib error code.
426
433
Try freeing in the reverse order of allocations.
435
int destroy (azio_stream *s)
433
if (s->stream.state != NULL)
439
if (s->stream.state != NULL)
437
443
err = deflateEnd(&(s->stream));
438
my_sync(s->file, MYF(0));
444
internal::my_sync(s->file, MYF(0));
440
else if (s->mode == 'r')
446
else if (s->mode == 'r')
441
447
err = inflateEnd(&(s->stream));
444
450
do_aio_cleanup(s);
446
if (s->file > 0 && my_close(s->file, MYF(0)))
452
if (s->file > 0 && internal::my_close(s->file, MYF(0)))
457
463
Reads the given number of uncompressed bytes from the compressed file.
458
464
azread returns the number of bytes actually read (0 for end of file).
460
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
467
This function is legacy, do not use.
469
Reads the given number of uncompressed bytes from the compressed file.
470
If the input file was not in gzip format, gzread copies the given number
471
of bytes into the buffer.
472
gzread returns the number of uncompressed bytes actually read (0 for
473
end of file, -1 for error).
475
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
462
477
Bytef *start = (Bytef*)buf; /* starting point for crc computation */
463
478
Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */
466
481
if (s->mode != 'r')
468
483
*error= Z_STREAM_ERROR;
472
487
if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
474
489
*error= s->z_err;
478
493
if (s->z_err == Z_STREAM_END) /* EOF */
556
571
/* ===========================================================================
557
Experimental Interface. We abstract out a concecpt of rows
572
Experimental Interface. We abstract out a concecpt of rows
559
574
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
617
632
s->stream.next_in = (Bytef*)buf;
618
633
s->stream.avail_in = len;
620
while (s->stream.avail_in != 0)
635
while (s->stream.avail_in != 0)
622
if (s->stream.avail_out == 0)
637
if (s->stream.avail_out == 0)
625
640
s->stream.next_out = s->outbuf;
626
if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
641
if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
628
643
s->z_err = Z_ERRNO;
659
674
s->stream.avail_in = 0; /* should be zero already anyway */
663
678
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
667
if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len)
682
if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
669
684
s->z_err = Z_ERRNO;
697
712
s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
699
afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
714
afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
702
718
return s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
705
721
static unsigned int azio_enable_aio(azio_stream *s)
707
VOID(pthread_cond_init(&s->container.threshhold, NULL));
708
VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
723
pthread_cond_init(&s->container.threshhold, NULL);
724
pthread_mutex_init(&s->container.thresh_mutex, NULL);
718
VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
719
VOID(pthread_cond_destroy(&s->container.threshhold));
734
pthread_mutex_destroy(&s->container.thresh_mutex);
735
pthread_cond_destroy(&s->container.threshhold);
721
737
s->method= AZ_METHOD_BLOCK;
724
int ZEXPORT azflush (s, flush)
740
int ZEXPORT azflush (azio_stream *s,int flush)
732
746
unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
733
pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
747
const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
748
if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
734
750
read_header(s, buffer); /* skip the .az header */
807
822
SEEK_END is not implemented, returns error.
808
823
In this version of the library, azseek can be extremely slow.
810
size_t azseek (s, offset, whence)
825
size_t azseek (azio_stream *s, size_t offset, int whence)
816
828
if (s == NULL || whence == SEEK_END ||
817
829
s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
823
if (whence == SEEK_SET)
835
if (whence == SEEK_SET)
826
838
/* At this point, offset is the number of zero bytes to write. */
827
839
/* There was a zmemzero here if inbuf was null -Brian */
830
842
uInt size = AZ_BUFSIZE_READ;
831
843
if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
833
845
size = azwrite(s, s->inbuf, size);
834
if (size == 0) return -1L;
864
877
if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
866
879
size = azread_internal(s, s->outbuf, size, &error);
867
if (error < 0) return -1L;
880
if (error < 0) return SIZE_MAX;
875
888
given compressed file. This position represents a number of bytes in the
876
889
uncompressed data stream.
878
size_t ZEXPORT aztell (file)
891
size_t ZEXPORT aztell (azio_stream *file)
881
893
return azseek(file, 0L, SEEK_CUR);
888
900
void putLong (azio_stream *s, uLong x)
903
unsigned char buffer[1];
893
for (n = 0; n < 4; n++)
905
for (n = 0; n < 4; n++)
895
907
buffer[0]= (int)(x & 0xff);
896
pwrite(s->file, buffer, 1, s->pos);
908
size_t ret= pwrite(s->file, buffer, 1, s->pos);
962
Though this was added to support MySQL's FRM file, anything can be
975
Though this was added to support MySQL's FRM file, anything can be
963
976
stored in this location.
965
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
978
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
973
986
s->frm_start_pos= (uint) s->start;
974
987
s->frm_length= length;
975
988
s->start+= length;
977
pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
990
if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
980
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
994
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
985
999
int azread_frm(azio_stream *s, char *blob)
987
pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
1001
ssize_t r= pread(s->file, (unsigned char*) blob,
1002
s->frm_length, s->frm_start_pos);
1003
if (r != (ssize_t)s->frm_length)
994
1011
Simple comment field
996
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1013
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
1004
1021
s->comment_start_pos= (uint) s->start;
1005
1022
s->comment_length= length;
1006
1023
s->start+= length;
1008
pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1025
ssize_t r= pwrite(s->file, (unsigned char*) blob,
1026
s->comment_length, s->comment_start_pos);
1027
if (r != (ssize_t)s->comment_length)
1010
1030
write_header(s);
1011
s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1031
s->pos= (size_t)lseek(s->file, 0, SEEK_END);
1016
1036
int azread_comment(azio_stream *s, char *blob)
1018
pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1038
ssize_t r= pread(s->file, (unsigned char*) blob,
1039
s->comment_length, s->comment_start_pos);
1040
if (r != (ssize_t)s->comment_length)
1035
1058
Normally all IO goes through azio_read(), but in case of error or non-support
1036
1059
we make use of pread().
1038
1061
static void get_block(azio_stream *s)
1040
1063
#ifdef AZIO_AIO
1041
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1064
if (s->method == AZ_METHOD_AIO && s->mode == 'r'
1042
1065
&& s->pos < s->check_point
1043
1066
&& s->aio_inited)
1054
1077
s->pos+= s->stream.avail_in;
1055
1078
s->inbuf= (Byte *)s->container.buffer;
1056
/* We only aio_read when we know there is more data to be read */
1079
/* We only azio_read when we know there is more data to be read */
1057
1080
if (s->pos >= s->check_point)
1059
1082
s->aio_inited= 0;
1069
1092
#ifdef AZIO_AIO
1072
s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1095
s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1073
1096
AZ_BUFSIZE_READ, s->pos);
1074
1097
s->pos+= s->stream.avail_in;