~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/azio.c

  • Committer: Brian Aker
  • Date: 2008-08-18 20:57:01 UTC
  • mto: This revision was merged to the branch mainline in revision 352.
  • Revision ID: brian@tangent.org-20080818205701-rzeyd4qw4xj7wx7l
Refactoring of class Table

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <stdio.h>
17
17
#include <string.h>
18
 
#include <stdlib.h>
19
18
#include <assert.h>
20
19
#include <unistd.h>
21
20
 
22
21
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
23
22
 
 
23
/* gzip flag uchar */
 
24
#define ASCII_FLAG   0x01 /* bit 0 set: file probably ascii text */
 
25
#define HEAD_CRC     0x02 /* bit 1 set: header CRC present */
 
26
#define EXTRA_FIELD  0x04 /* bit 2 set: extra field present */
 
27
#define ORIG_NAME    0x08 /* bit 3 set: original file name present */
 
28
#define COMMENT      0x10 /* bit 4 set: file comment present */
 
29
#define RESERVED     0xE0 /* bits 5..7: reserved */
 
30
 
24
31
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
25
32
static int azrewind (azio_stream *s);
26
33
static unsigned int azio_enable_aio(azio_stream *s);
27
34
static int do_flush(azio_stream *file, int flush);
28
35
static int    get_byte(azio_stream *s);
29
36
static void   check_header(azio_stream *s);
30
 
static int write_header(azio_stream *s);
 
37
static void write_header(azio_stream *s);
31
38
static int    destroy(azio_stream *s);
32
39
static void putLong(azio_stream *s, uLong x);
33
40
static uLong  getLong(azio_stream *s);
37
44
static void do_aio_cleanup(azio_stream *s);
38
45
#endif
39
46
 
40
 
extern "C"
41
 
pthread_handler_t run_task(void *p)
 
47
static pthread_handler_t run_task(void *p)
42
48
{
43
49
  int fd;
44
50
  char *buffer;
56
62
    }
57
63
    offset= s->container.offset;
58
64
    fd= s->container.fd;
59
 
    buffer= (char *)s->container.buffer;
 
65
    buffer= s->container.buffer;
60
66
    pthread_mutex_unlock(&s->container.thresh_mutex);
61
67
 
62
68
    if (s->container.ready == AZ_THREAD_DEAD)
82
88
  pthread_mutex_unlock(&s->container.thresh_mutex);
83
89
 
84
90
  pthread_cond_signal(&s->container.threshhold);
85
 
  pthread_join(s->container.mainthread, NULL);
 
91
  pthread_join(s->container.mainthread, (void *)NULL);
86
92
}
87
93
 
88
94
static size_t azio_return(azio_stream *s)
247
253
    s->frm_length= 0;
248
254
    s->dirty= 1; /* We create the file dirty */
249
255
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
250
 
    if(write_header(s))
251
 
      return Z_NULL;
 
256
    write_header(s);
252
257
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
253
258
  }
254
259
  else if (s->mode == 'w') 
255
260
  {
256
 
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
257
 
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
258
 
    if(pread(s->file, buffer, read_size, 0) < read_size)
259
 
      return Z_NULL;
 
261
    uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
262
    pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
260
263
    read_header(s, buffer);
261
264
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
262
265
  }
279
282
}
280
283
 
281
284
 
282
 
int write_header(azio_stream *s)
 
285
void write_header(azio_stream *s)
283
286
{
284
287
  char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
285
288
  char *ptr= buffer;
303
306
  int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
304
307
  int4store(ptr + AZ_META_POS, 0); /* Meta Block */
305
308
  int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
306
 
  int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
307
 
  int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
308
 
  int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
309
 
  int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
310
 
  int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
 
309
  int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
 
310
  int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
 
311
  int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
 
312
  int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
 
313
  int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
311
314
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
312
315
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
313
316
  int4store(ptr+ AZ_FRM_POS, 
315
318
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
316
319
 
317
320
  /* Always begin at the begining, and end there as well */
318
 
  const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
319
 
  if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
320
 
    return -1;
321
 
 
322
 
  return 0;
 
321
  pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
323
322
}
324
323
 
325
324
/* ===========================================================================
327
326
  for end of file.
328
327
  IN assertion: the stream s has been sucessfully opened for reading.
329
328
*/
330
 
int get_byte(azio_stream *s)
 
329
int get_byte(s)
 
330
  azio_stream *s;
331
331
{
332
332
  if (s->z_eof) return EOF;
333
333
  if (s->stream.avail_in == 0) 
367
367
  if (len < 2) {
368
368
    if (len) s->inbuf[0] = s->stream.next_in[0];
369
369
    errno = 0;
370
 
    len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
 
370
    len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
371
371
    s->pos+= len;
372
372
    if (len == (uInt)-1) s->z_err = Z_ERRNO;
373
373
    s->stream.avail_in += len;
402
402
    s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
403
403
    s->block_size= 1024 * buffer[AZ_BLOCK_POS];
404
404
    s->start= (size_t)uint8korr(buffer + AZ_START_POS);
405
 
    s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
406
 
    s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
407
 
    s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
408
 
    s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
 
405
    s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
 
406
    s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
 
407
    s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
 
408
    s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
409
409
    s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
410
410
    s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
411
411
    s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
425
425
 * Cleanup then free the given azio_stream. Return a zlib error code.
426
426
 Try freeing in the reverse order of allocations.
427
427
 */
428
 
int destroy (azio_stream *s)
 
428
int destroy (s)
 
429
  azio_stream *s;
429
430
{
430
431
  int err = Z_OK;
431
432
 
596
597
  new_ptr= (char *)realloc(s->row_ptr, (sizeof(char) * row_length));
597
598
 
598
599
  if (!new_ptr)
599
 
    return SIZE_MAX;
 
600
    return -1;
600
601
 
601
602
  s->row_ptr= new_ptr;
602
603
 
622
623
    {
623
624
 
624
625
      s->stream.next_out = s->outbuf;
625
 
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
 
626
      if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
626
627
      {
627
628
        s->z_err = Z_ERRNO;
628
629
        break;
663
664
 
664
665
    if (len != 0) 
665
666
    {
666
 
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len) 
 
667
      if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len) 
667
668
      {
668
669
        s->z_err = Z_ERRNO;
669
670
        assert(0);
696
697
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
697
698
 
698
699
  afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
699
 
  if(write_header(s))
700
 
    return Z_ERRNO;
 
700
  write_header(s);
701
701
 
702
702
  return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
703
703
}
704
704
 
705
705
static unsigned int azio_enable_aio(azio_stream *s)
706
706
{
707
 
  pthread_cond_init(&s->container.threshhold, NULL);
708
 
  pthread_mutex_init(&s->container.thresh_mutex, NULL);
 
707
  VOID(pthread_cond_init(&s->container.threshhold, NULL));
 
708
  VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
709
709
  azio_start(s);
710
710
 
711
711
  return 0;
715
715
{
716
716
  azio_kill(s);
717
717
 
718
 
  pthread_mutex_destroy(&s->container.thresh_mutex);
719
 
  pthread_cond_destroy(&s->container.threshhold);
 
718
  VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
 
719
  VOID(pthread_cond_destroy(&s->container.threshhold));
720
720
 
721
721
  s->method= AZ_METHOD_BLOCK;
722
722
}
723
723
 
724
 
int ZEXPORT azflush (azio_stream *s,int flush)
 
724
int ZEXPORT azflush (s, flush)
 
725
  azio_stream *s;
 
726
  int flush;
725
727
{
726
728
  int err;
727
729
 
728
730
  if (s->mode == 'r') 
729
731
  {
730
732
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
731
 
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
732
 
    if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
733
 
      return Z_ERRNO;
 
733
    pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
734
734
    read_header(s, buffer); /* skip the .az header */
735
735
    azrewind(s);
736
736
 
777
777
/* ===========================================================================
778
778
  Rewinds input file.
779
779
*/
780
 
int azrewind (azio_stream *s)
 
780
int azrewind (s)
 
781
  azio_stream *s;
781
782
{
782
783
  if (s == NULL || s->mode != 'r') return -1;
783
784
 
806
807
  SEEK_END is not implemented, returns error.
807
808
  In this version of the library, azseek can be extremely slow.
808
809
*/
809
 
size_t azseek (azio_stream *s, size_t offset, int whence)
 
810
size_t azseek (s, offset, whence)
 
811
  azio_stream *s;
 
812
  size_t offset;
 
813
  int whence;
810
814
{
811
815
 
812
816
  if (s == NULL || whence == SEEK_END ||
813
817
      s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
814
 
    return SIZE_MAX;
 
818
    return -1L;
815
819
  }
816
820
 
817
821
  if (s->mode == 'w') 
827
831
      if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
828
832
 
829
833
      size = azwrite(s, s->inbuf, size);
830
 
      if (size == 0)
831
 
        return SIZE_MAX;
 
834
      if (size == 0) return -1L;
832
835
 
833
836
      offset -= size;
834
837
    }
845
848
  if (offset >= s->out) {
846
849
    offset -= s->out;
847
850
  } else if (azrewind(s)) {
848
 
    return SIZE_MAX;
 
851
    return -1L;
849
852
  }
850
853
  /* offset is now the number of bytes to skip. */
851
854
 
861
864
    if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
862
865
 
863
866
    size = azread_internal(s, s->outbuf, size, &error);
864
 
    if (error < 0) return SIZE_MAX;
 
867
    if (error < 0) return -1L;
865
868
    offset -= size;
866
869
  }
867
870
  return s->out;
872
875
  given compressed file. This position represents a number of bytes in the
873
876
  uncompressed data stream.
874
877
*/
875
 
size_t ZEXPORT aztell (azio_stream *file)
 
878
size_t ZEXPORT aztell (file)
 
879
  azio_stream *file;
876
880
{
877
881
  return azseek(file, 0L, SEEK_CUR);
878
882
}
884
888
void putLong (azio_stream *s, uLong x)
885
889
{
886
890
  int n;
887
 
  unsigned char buffer[1];
 
891
  uchar buffer[1];
888
892
 
889
893
  for (n = 0; n < 4; n++) 
890
894
  {
891
895
    buffer[0]= (int)(x & 0xff);
892
 
    assert(pwrite(s->file, buffer, 1, s->pos)==1);
 
896
    pwrite(s->file, buffer, 1, s->pos);
893
897
    s->pos++;
894
898
    x >>= 8;
895
899
  }
970
974
  s->frm_length= length;
971
975
  s->start+= length;
972
976
 
973
 
  if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
974
 
    return 1;
 
977
  pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
975
978
 
976
979
  write_header(s);
977
980
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
981
984
 
982
985
int azread_frm(azio_stream *s, char *blob)
983
986
{
984
 
  ssize_t r= pread(s->file, (unsigned char*) blob,
985
 
                   s->frm_length, s->frm_start_pos);
986
 
  if (r != (ssize_t)s->frm_length)
987
 
    return r;
 
987
  pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
988
988
 
989
989
  return 0;
990
990
}
1005
1005
  s->comment_length= length;
1006
1006
  s->start+= length;
1007
1007
 
1008
 
  ssize_t r= pwrite(s->file, (unsigned char*) blob,
1009
 
                    s->comment_length, s->comment_start_pos);
1010
 
  if (r != (ssize_t)s->comment_length)
1011
 
    return r;
 
1008
  pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1012
1009
 
1013
1010
  write_header(s);
1014
1011
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1018
1015
 
1019
1016
int azread_comment(azio_stream *s, char *blob)
1020
1017
{
1021
 
  ssize_t r= pread(s->file, (unsigned char*) blob,
1022
 
                   s->comment_length, s->comment_start_pos);
1023
 
  if (r != (ssize_t)s->comment_length)
1024
 
    return r;
 
1018
  pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
1025
1019
 
1026
1020
  return 0;
1027
1021
}
1075
1069
#ifdef AZIO_AIO
1076
1070
use_pread:
1077
1071
#endif
1078
 
    s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
 
1072
    s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
1079
1073
                                     AZ_BUFSIZE_READ, s->pos);
1080
1074
    s->pos+= s->stream.avail_in;
1081
1075
  }