~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/azio.c

  • Committer: Brian Aker
  • Date: 2008-11-04 15:39:09 UTC
  • mfrom: (575.1.2 devel)
  • Revision ID: brian@tangent.org-20081104153909-c72hn65udxs1ccal
Merge of Monty's work

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
22
22
 
23
 
/* gzip flag uchar */
24
 
#define ASCII_FLAG   0x01 /* bit 0 set: file probably ascii text */
25
 
#define HEAD_CRC     0x02 /* bit 1 set: header CRC present */
26
 
#define EXTRA_FIELD  0x04 /* bit 2 set: extra field present */
27
 
#define ORIG_NAME    0x08 /* bit 3 set: original file name present */
28
 
#define COMMENT      0x10 /* bit 4 set: file comment present */
29
 
#define RESERVED     0xE0 /* bits 5..7: reserved */
30
 
 
31
23
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
32
24
static int azrewind (azio_stream *s);
33
25
static unsigned int azio_enable_aio(azio_stream *s);
34
26
static int do_flush(azio_stream *file, int flush);
35
27
static int    get_byte(azio_stream *s);
36
28
static void   check_header(azio_stream *s);
37
 
static void write_header(azio_stream *s);
 
29
static int write_header(azio_stream *s);
38
30
static int    destroy(azio_stream *s);
39
31
static void putLong(azio_stream *s, uLong x);
40
32
static uLong  getLong(azio_stream *s);
253
245
    s->frm_length= 0;
254
246
    s->dirty= 1; /* We create the file dirty */
255
247
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
256
 
    write_header(s);
 
248
    if(write_header(s))
 
249
      return Z_NULL;
257
250
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
258
251
  }
259
252
  else if (s->mode == 'w') 
260
253
  {
261
 
    uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
262
 
    pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
 
254
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
255
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
 
256
    if(pread(s->file, buffer, read_size, 0) < read_size)
 
257
      return Z_NULL;
263
258
    read_header(s, buffer);
264
259
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
265
260
  }
282
277
}
283
278
 
284
279
 
285
 
void write_header(azio_stream *s)
 
280
int write_header(azio_stream *s)
286
281
{
287
282
  char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
288
283
  char *ptr= buffer;
306
301
  int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
307
302
  int4store(ptr + AZ_META_POS, 0); /* Meta Block */
308
303
  int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
309
 
  int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
310
 
  int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
311
 
  int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
312
 
  int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
313
 
  int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
 
304
  int8store(ptr + AZ_START_POS, (uint64_t)s->start); /* Start of Data Block Index Block */
 
305
  int8store(ptr + AZ_ROW_POS, (uint64_t)s->rows); /* Start of Data Block Index Block */
 
306
  int8store(ptr + AZ_FLUSH_POS, (uint64_t)s->forced_flushes); /* Start of Data Block Index Block */
 
307
  int8store(ptr + AZ_CHECK_POS, (uint64_t)s->check_point); /* Start of Data Block Index Block */
 
308
  int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
314
309
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
315
310
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
316
311
  int4store(ptr+ AZ_FRM_POS, 
318
313
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
319
314
 
320
315
  /* Always begin at the begining, and end there as well */
321
 
  pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
 
316
  const ssize_t write_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
 
317
  if(pwrite(s->file, (unsigned char*) buffer, write_size, 0)!=write_size)
 
318
    return -1;
 
319
 
 
320
  return 0;
322
321
}
323
322
 
324
323
/* ===========================================================================
367
366
  if (len < 2) {
368
367
    if (len) s->inbuf[0] = s->stream.next_in[0];
369
368
    errno = 0;
370
 
    len = (uInt)pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
 
369
    len = (uInt)pread(s->file, (unsigned char *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos);
371
370
    s->pos+= len;
372
371
    if (len == (uInt)-1) s->z_err = Z_ERRNO;
373
372
    s->stream.avail_in += len;
402
401
    s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
403
402
    s->block_size= 1024 * buffer[AZ_BLOCK_POS];
404
403
    s->start= (size_t)uint8korr(buffer + AZ_START_POS);
405
 
    s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
406
 
    s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
407
 
    s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
408
 
    s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
 
404
    s->rows= (uint64_t)uint8korr(buffer + AZ_ROW_POS);
 
405
    s->check_point= (uint64_t)uint8korr(buffer + AZ_CHECK_POS);
 
406
    s->forced_flushes= (uint64_t)uint8korr(buffer + AZ_FLUSH_POS);
 
407
    s->auto_increment= (uint64_t)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
409
408
    s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
410
409
    s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
411
410
    s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
623
622
    {
624
623
 
625
624
      s->stream.next_out = s->outbuf;
626
 
      if (pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
 
625
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
627
626
      {
628
627
        s->z_err = Z_ERRNO;
629
628
        break;
664
663
 
665
664
    if (len != 0) 
666
665
    {
667
 
      if ((uInt)pwrite(s->file, (uchar *)s->outbuf, len, s->pos) != len) 
 
666
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len) 
668
667
      {
669
668
        s->z_err = Z_ERRNO;
670
669
        assert(0);
697
696
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
698
697
 
699
698
  afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
700
 
  write_header(s);
 
699
  if(write_header(s))
 
700
    return Z_ERRNO;
701
701
 
702
702
  return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
703
703
}
704
704
 
705
705
static unsigned int azio_enable_aio(azio_stream *s)
706
706
{
707
 
  VOID(pthread_cond_init(&s->container.threshhold, NULL));
708
 
  VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
 
707
  pthread_cond_init(&s->container.threshhold, NULL);
 
708
  pthread_mutex_init(&s->container.thresh_mutex, NULL);
709
709
  azio_start(s);
710
710
 
711
711
  return 0;
715
715
{
716
716
  azio_kill(s);
717
717
 
718
 
  VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
719
 
  VOID(pthread_cond_destroy(&s->container.threshhold));
 
718
  pthread_mutex_destroy(&s->container.thresh_mutex);
 
719
  pthread_cond_destroy(&s->container.threshhold);
720
720
 
721
721
  s->method= AZ_METHOD_BLOCK;
722
722
}
730
730
  if (s->mode == 'r') 
731
731
  {
732
732
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
733
 
    pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0);
 
733
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
 
734
    if(pread(s->file, (unsigned char*) buffer, read_size, 0)!=read_size)
 
735
      return Z_ERRNO;
734
736
    read_header(s, buffer); /* skip the .az header */
735
737
    azrewind(s);
736
738
 
888
890
void putLong (azio_stream *s, uLong x)
889
891
{
890
892
  int n;
891
 
  uchar buffer[1];
 
893
  unsigned char buffer[1];
892
894
 
893
895
  for (n = 0; n < 4; n++) 
894
896
  {
895
897
    buffer[0]= (int)(x & 0xff);
896
 
    pwrite(s->file, buffer, 1, s->pos);
 
898
    assert(pwrite(s->file, buffer, 1, s->pos)==1);
897
899
    s->pos++;
898
900
    x >>= 8;
899
901
  }
974
976
  s->frm_length= length;
975
977
  s->start+= length;
976
978
 
977
 
  pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
 
979
  if (pwrite(s->file, (unsigned char*) blob, s->frm_length, s->frm_start_pos) != (ssize_t)s->frm_length)
 
980
    return 1;
978
981
 
979
982
  write_header(s);
980
983
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
984
987
 
985
988
int azread_frm(azio_stream *s, char *blob)
986
989
{
987
 
  pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos);
 
990
  ssize_t r= pread(s->file, (unsigned char*) blob,
 
991
                   s->frm_length, s->frm_start_pos);
 
992
  if (r != (ssize_t)s->frm_length)
 
993
    return r;
988
994
 
989
995
  return 0;
990
996
}
1005
1011
  s->comment_length= length;
1006
1012
  s->start+= length;
1007
1013
 
1008
 
  pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
 
1014
  ssize_t r= pwrite(s->file, (unsigned char*) blob,
 
1015
                    s->comment_length, s->comment_start_pos);
 
1016
  if (r != (ssize_t)s->comment_length)
 
1017
    return r;
1009
1018
 
1010
1019
  write_header(s);
1011
1020
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1015
1024
 
1016
1025
int azread_comment(azio_stream *s, char *blob)
1017
1026
{
1018
 
  pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos);
 
1027
  ssize_t r= pread(s->file, (unsigned char*) blob,
 
1028
                   s->comment_length, s->comment_start_pos);
 
1029
  if (r != (ssize_t)s->comment_length)
 
1030
    return r;
1019
1031
 
1020
1032
  return 0;
1021
1033
}
1069
1081
#ifdef AZIO_AIO
1070
1082
use_pread:
1071
1083
#endif
1072
 
    s->stream.avail_in = (uInt)pread(s->file, (uchar *)s->inbuf,
 
1084
    s->stream.avail_in = (uInt)pread(s->file, (unsigned char *)s->inbuf,
1073
1085
                                     AZ_BUFSIZE_READ, s->pos);
1074
1086
    s->pos+= s->stream.avail_in;
1075
1087
  }