~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/azio.c

  • Committer: Monty Taylor
  • Date: 2008-10-23 00:05:28 UTC
  • Revision ID: monty@inaugust.com-20081023000528-grdvrd8c4058nutm
Moved my_handler to myisam, which is where it actually belongs.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
/* @(#) $Id$ */
13
13
 
14
 
#include "config.h"
15
 
 
16
14
#include "azio.h"
17
15
 
18
 
#include <fcntl.h>
 
16
#include <stdio.h>
 
17
#include <string.h>
 
18
#include <assert.h>
19
19
#include <unistd.h>
20
20
 
21
 
#include <cstdio>
22
 
#include <cstring>
23
 
#include <cstdlib>
24
 
#include <cassert>
25
 
 
26
 
using namespace drizzled;
27
 
 
28
21
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
29
22
 
30
23
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
43
36
static void do_aio_cleanup(azio_stream *s);
44
37
#endif
45
38
 
46
 
extern "C" pthread_handler_t run_task(void *p);
47
 
 
48
 
extern "C" pthread_handler_t run_task(void *p)
 
39
static pthread_handler_t run_task(void *p)
49
40
{
50
41
  int fd;
51
42
  char *buffer;
52
43
  size_t offset;
53
 
  azio_stream *s= (azio_stream *)p;
 
44
  azio_stream *s= (azio_stream *)p;  
54
45
 
55
 
  internal::my_thread_init();
 
46
  my_thread_init();
56
47
 
57
48
  while (1)
58
49
  {
63
54
    }
64
55
    offset= s->container.offset;
65
56
    fd= s->container.fd;
66
 
    buffer= (char *)s->container.buffer;
 
57
    buffer= s->container.buffer;
67
58
    pthread_mutex_unlock(&s->container.thresh_mutex);
68
59
 
69
60
    if (s->container.ready == AZ_THREAD_DEAD)
77
68
    pthread_mutex_unlock(&s->container.thresh_mutex);
78
69
  }
79
70
 
80
 
  internal::my_thread_end();
 
71
  my_thread_end();
81
72
 
82
73
  return 0;
83
74
}
85
76
static void azio_kill(azio_stream *s)
86
77
{
87
78
  pthread_mutex_lock(&s->container.thresh_mutex);
88
 
  s->container.ready= AZ_THREAD_DEAD;
 
79
  s->container.ready= AZ_THREAD_DEAD; 
89
80
  pthread_mutex_unlock(&s->container.thresh_mutex);
90
81
 
91
82
  pthread_cond_signal(&s->container.threshhold);
92
 
  pthread_join(s->container.mainthread, NULL);
 
83
  pthread_join(s->container.mainthread, (void *)NULL);
93
84
}
94
85
 
95
86
static size_t azio_return(azio_stream *s)
127
118
  pthread_attr_init(&attr);
128
119
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
129
120
 
130
 
  s->container.ready= AZ_THREAD_FINISHED;
 
121
  s->container.ready= AZ_THREAD_FINISHED; 
131
122
 
132
123
  /* If we don't create a thread, signal the caller */
133
124
  if (pthread_create(&s->container.mainthread, &attr, run_task,
142
133
static int azio_read(azio_stream *s)
143
134
{
144
135
  pthread_mutex_lock(&s->container.thresh_mutex);
145
 
  s->container.ready= AZ_THREAD_ACTIVE;
 
136
  s->container.ready= AZ_THREAD_ACTIVE; 
146
137
  pthread_mutex_unlock(&s->container.thresh_mutex);
147
138
  pthread_cond_broadcast(&s->container.threshhold);
148
139
 
163
154
  int err;
164
155
  int level = Z_DEFAULT_COMPRESSION ; /* compression level */
165
156
  int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
166
 
  int fd= -1;
 
157
  File fd= -1;
167
158
 
168
159
  memset(s, 0, sizeof(azio_stream));
169
160
 
188
179
  s->method= method;
189
180
 
190
181
  /*
191
 
    We do our own version of append by nature.
 
182
    We do our own version of append by nature. 
192
183
    We must always have write access to take card of the header.
193
184
  */
194
185
  assert(Flags | O_APPEND);
195
186
  assert(Flags | O_WRONLY);
196
187
 
197
 
  if (Flags & O_RDWR)
 
188
  if (Flags & O_RDWR) 
198
189
    s->mode = 'w';
199
190
 
200
 
  if (s->mode == 'w')
 
191
  if (s->mode == 'w') 
201
192
  {
202
193
    err = deflateInit2(&(s->stream), level,
203
194
                       Z_DEFLATED, -MAX_WBITS, 8, strategy);
229
220
  s->stream.avail_out = AZ_BUFSIZE_WRITE;
230
221
 
231
222
  errno = 0;
232
 
  s->file = fd < 0 ? internal::my_open(path, Flags, MYF(0)) : fd;
 
223
  s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
233
224
#ifdef AZIO_AIO
234
225
  s->container.fd= s->file;
235
226
#endif
236
227
 
237
 
  if (s->file < 0 )
 
228
  if (s->file < 0 ) 
238
229
  {
239
230
    destroy(s);
240
231
    return Z_NULL;
241
232
  }
242
233
 
243
 
  if (Flags & O_CREAT || Flags & O_TRUNC)
 
234
  if (Flags & O_CREAT || Flags & O_TRUNC) 
244
235
  {
245
236
    s->rows= 0;
246
237
    s->forced_flushes= 0;
256
247
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
257
248
    if(write_header(s))
258
249
      return Z_NULL;
259
 
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
 
250
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
260
251
  }
261
 
  else if (s->mode == 'w')
 
252
  else if (s->mode == 'w') 
262
253
  {
263
254
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
264
255
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
265
256
    if(pread(s->file, buffer, read_size, 0) < read_size)
266
257
      return Z_NULL;
267
258
    read_header(s, buffer);
268
 
    s->pos= (size_t)lseek(s->file, 0, SEEK_END);
 
259
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
269
260
  }
270
261
  else
271
262
  {
317
308
  int8store(ptr + AZ_AUTOINCREMENT_POS, (uint64_t)s->auto_increment); /* Start of Data Block Index Block */
318
309
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
319
310
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
320
 
  int4store(ptr+ AZ_FRM_POS,
 
311
  int4store(ptr+ AZ_FRM_POS, 
321
312
            AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
322
313
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
323
314
 
334
325
  for end of file.
335
326
  IN assertion: the stream s has been sucessfully opened for reading.
336
327
*/
337
 
int get_byte(azio_stream *s)
 
328
int get_byte(s)
 
329
  azio_stream *s;
338
330
{
339
331
  if (s->z_eof) return EOF;
340
 
  if (s->stream.avail_in == 0)
 
332
  if (s->stream.avail_in == 0) 
341
333
  {
342
334
    errno = 0;
343
 
    if (s->stream.avail_in == 0)
 
335
    if (s->stream.avail_in == 0) 
344
336
    {
345
337
      s->z_eof = 1;
346
338
      return EOF;
432
424
 * Cleanup then free the given azio_stream. Return a zlib error code.
433
425
 Try freeing in the reverse order of allocations.
434
426
 */
435
 
int destroy (azio_stream *s)
 
427
int destroy (s)
 
428
  azio_stream *s;
436
429
{
437
430
  int err = Z_OK;
438
431
 
439
 
  if (s->stream.state != NULL)
 
432
  if (s->stream.state != NULL) 
440
433
  {
441
 
    if (s->mode == 'w')
 
434
    if (s->mode == 'w') 
442
435
    {
443
436
      err = deflateEnd(&(s->stream));
444
 
      internal::my_sync(s->file, MYF(0));
 
437
      my_sync(s->file, MYF(0));
445
438
    }
446
 
    else if (s->mode == 'r')
 
439
    else if (s->mode == 'r') 
447
440
      err = inflateEnd(&(s->stream));
448
441
  }
449
442
 
450
443
  do_aio_cleanup(s);
451
444
 
452
 
  if (s->file > 0 && internal::my_close(s->file, MYF(0)))
 
445
  if (s->file > 0 && my_close(s->file, MYF(0))) 
453
446
      err = Z_ERRNO;
454
447
 
455
448
  s->file= -1;
463
456
  Reads the given number of uncompressed bytes from the compressed file.
464
457
  azread returns the number of bytes actually read (0 for end of file).
465
458
*/
466
 
/*
467
 
   This function is legacy, do not use.
468
 
 
469
 
     Reads the given number of uncompressed bytes from the compressed file.
470
 
   If the input file was not in gzip format, gzread copies the given number
471
 
   of bytes into the buffer.
472
 
     gzread returns the number of uncompressed bytes actually read (0 for
473
 
   end of file, -1 for error).
474
 
*/
475
 
static unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
 
459
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
476
460
{
477
461
  Bytef *start = (Bytef*)buf; /* starting point for crc computation */
478
462
  Byte  *next_out; /* == stream.next_out but not forced far (for MSDOS) */
479
463
  *error= 0;
480
464
 
481
465
  if (s->mode != 'r')
482
 
  {
 
466
  { 
483
467
    *error= Z_STREAM_ERROR;
484
468
    return 0;
485
469
  }
486
470
 
487
471
  if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
488
 
  {
 
472
  { 
489
473
    *error= s->z_err;
490
474
    return 0;
491
475
  }
492
476
 
493
477
  if (s->z_err == Z_STREAM_END)  /* EOF */
494
 
  {
 
478
  { 
495
479
    return 0;
496
480
  }
497
481
 
508
492
    start++;
509
493
    if (s->last) {
510
494
      s->z_err = Z_STREAM_END;
511
 
      {
 
495
      { 
512
496
        return 1;
513
497
      }
514
498
    }
520
504
 
521
505
      errno = 0;
522
506
      get_block(s);
523
 
      if (s->stream.avail_in == 0)
 
507
      if (s->stream.avail_in == 0) 
524
508
      {
525
509
        s->z_eof = 1;
526
510
      }
546
530
         * Check for such files:
547
531
       */
548
532
        check_header(s);
549
 
        if (s->z_err == Z_OK)
 
533
        if (s->z_err == Z_OK) 
550
534
        {
551
535
          inflateReset(&(s->stream));
552
536
          s->crc = crc32(0L, Z_NULL, 0);
569
553
}
570
554
 
571
555
/* ===========================================================================
572
 
  Experimental Interface. We abstract out a concecpt of rows
 
556
  Experimental Interface. We abstract out a concecpt of rows 
573
557
*/
574
558
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
575
559
{
603
587
  size_t read;
604
588
 
605
589
  read= azread_internal(s, buffer, sizeof(unsigned int), error);
606
 
 
 
590
  
607
591
  /* On error the return value will be zero as well */
608
592
  if (read == 0)
609
593
    return read;
612
596
  new_ptr= (char *)realloc(s->row_ptr, (sizeof(char) * row_length));
613
597
 
614
598
  if (!new_ptr)
615
 
    return SIZE_MAX;
 
599
    return -1;
616
600
 
617
601
  s->row_ptr= new_ptr;
618
602
 
632
616
  s->stream.next_in = (Bytef*)buf;
633
617
  s->stream.avail_in = len;
634
618
 
635
 
  while (s->stream.avail_in != 0)
 
619
  while (s->stream.avail_in != 0) 
636
620
  {
637
 
    if (s->stream.avail_out == 0)
 
621
    if (s->stream.avail_out == 0) 
638
622
    {
639
623
 
640
624
      s->stream.next_out = s->outbuf;
641
 
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE)
 
625
      if (pwrite(s->file, (unsigned char *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos) != AZ_BUFSIZE_WRITE) 
642
626
      {
643
627
        s->z_err = Z_ERRNO;
644
628
        break;
673
657
 
674
658
  s->stream.avail_in = 0; /* should be zero already anyway */
675
659
 
676
 
  for (;;)
 
660
  for (;;) 
677
661
  {
678
662
    len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
679
663
 
680
 
    if (len != 0)
 
664
    if (len != 0) 
681
665
    {
682
 
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len)
 
666
      if ((uInt)pwrite(s->file, (unsigned char *)s->outbuf, len, s->pos) != len) 
683
667
      {
684
668
        s->z_err = Z_ERRNO;
685
669
        assert(0);
711
695
  else
712
696
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
713
697
 
714
 
  afterwrite_pos= (size_t)lseek(s->file, 0, SEEK_CUR);
 
698
  afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
715
699
  if(write_header(s))
716
700
    return Z_ERRNO;
717
701
 
737
721
  s->method= AZ_METHOD_BLOCK;
738
722
}
739
723
 
740
 
int ZEXPORT azflush (azio_stream *s,int flush)
 
724
int ZEXPORT azflush (s, flush)
 
725
  azio_stream *s;
 
726
  int flush;
741
727
{
742
728
  int err;
743
729
 
744
 
  if (s->mode == 'r')
 
730
  if (s->mode == 'r') 
745
731
  {
746
732
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
747
733
    const ssize_t read_size= AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
758
744
    err= do_flush(s, flush);
759
745
 
760
746
    if (err) return err;
761
 
    internal::my_sync(s->file, MYF(0));
 
747
    my_sync(s->file, MYF(0));
762
748
    return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
763
749
  }
764
750
}
793
779
/* ===========================================================================
794
780
  Rewinds input file.
795
781
*/
796
 
int azrewind (azio_stream *s)
 
782
int azrewind (s)
 
783
  azio_stream *s;
797
784
{
798
785
  if (s == NULL || s->mode != 'r') return -1;
799
786
 
822
809
  SEEK_END is not implemented, returns error.
823
810
  In this version of the library, azseek can be extremely slow.
824
811
*/
825
 
size_t azseek (azio_stream *s, size_t offset, int whence)
 
812
size_t azseek (s, offset, whence)
 
813
  azio_stream *s;
 
814
  size_t offset;
 
815
  int whence;
826
816
{
827
817
 
828
818
  if (s == NULL || whence == SEEK_END ||
829
819
      s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
830
 
    return SIZE_MAX;
 
820
    return -1L;
831
821
  }
832
822
 
833
 
  if (s->mode == 'w')
 
823
  if (s->mode == 'w') 
834
824
  {
835
 
    if (whence == SEEK_SET)
 
825
    if (whence == SEEK_SET) 
836
826
      offset -= s->in;
837
827
 
838
828
    /* At this point, offset is the number of zero bytes to write. */
839
829
    /* There was a zmemzero here if inbuf was null -Brian */
840
 
    while (offset > 0)
 
830
    while (offset > 0)  
841
831
    {
842
832
      uInt size = AZ_BUFSIZE_READ;
843
833
      if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
844
834
 
845
835
      size = azwrite(s, s->inbuf, size);
846
 
      if (size == 0)
847
 
        return SIZE_MAX;
 
836
      if (size == 0) return -1L;
848
837
 
849
838
      offset -= size;
850
839
    }
861
850
  if (offset >= s->out) {
862
851
    offset -= s->out;
863
852
  } else if (azrewind(s)) {
864
 
    return SIZE_MAX;
 
853
    return -1L;
865
854
  }
866
855
  /* offset is now the number of bytes to skip. */
867
856
 
877
866
    if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
878
867
 
879
868
    size = azread_internal(s, s->outbuf, size, &error);
880
 
    if (error < 0) return SIZE_MAX;
 
869
    if (error < 0) return -1L;
881
870
    offset -= size;
882
871
  }
883
872
  return s->out;
888
877
  given compressed file. This position represents a number of bytes in the
889
878
  uncompressed data stream.
890
879
*/
891
 
size_t ZEXPORT aztell (azio_stream *file)
 
880
size_t ZEXPORT aztell (file)
 
881
  azio_stream *file;
892
882
{
893
883
  return azseek(file, 0L, SEEK_CUR);
894
884
}
902
892
  int n;
903
893
  unsigned char buffer[1];
904
894
 
905
 
  for (n = 0; n < 4; n++)
 
895
  for (n = 0; n < 4; n++) 
906
896
  {
907
897
    buffer[0]= (int)(x & 0xff);
908
 
    size_t ret= pwrite(s->file, buffer, 1, s->pos);
909
 
    assert(ret == 1);
 
898
    assert(pwrite(s->file, buffer, 1, s->pos)==1);
910
899
    s->pos++;
911
900
    x >>= 8;
912
901
  }
938
927
  int returnable;
939
928
 
940
929
  if (s == NULL) return Z_STREAM_ERROR;
941
 
 
 
930
  
942
931
  if (s->file < 1) return Z_OK;
943
932
 
944
 
  if (s->mode == 'w')
 
933
  if (s->mode == 'w') 
945
934
  {
946
935
    if (do_flush(s, Z_FINISH) != Z_OK)
947
936
      return destroy(s);
972
961
}
973
962
 
974
963
/*
975
 
  Though this was added to support MySQL's FRM file, anything can be
 
964
  Though this was added to support MySQL's FRM file, anything can be 
976
965
  stored in this location.
977
966
*/
978
 
int azwrite_frm(azio_stream *s, const char *blob, unsigned int length)
 
967
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
979
968
{
980
 
  if (s->mode == 'r')
 
969
  if (s->mode == 'r') 
981
970
    return 1;
982
971
 
983
 
  if (s->rows > 0)
 
972
  if (s->rows > 0) 
984
973
    return 1;
985
974
 
986
975
  s->frm_start_pos= (uint) s->start;
991
980
    return 1;
992
981
 
993
982
  write_header(s);
994
 
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
 
983
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
995
984
 
996
985
  return 0;
997
986
}
1010
999
/*
1011
1000
  Simple comment field
1012
1001
*/
1013
 
int azwrite_comment(azio_stream *s, const char *blob, unsigned int length)
 
1002
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
1014
1003
{
1015
 
  if (s->mode == 'r')
1016
 
    return -1;
 
1004
  if (s->mode == 'r') 
 
1005
    return 1;
1017
1006
 
1018
 
  if (s->rows > 0)
1019
 
    return -1;
 
1007
  if (s->rows > 0) 
 
1008
    return 1;
1020
1009
 
1021
1010
  s->comment_start_pos= (uint) s->start;
1022
1011
  s->comment_length= length;
1025
1014
  ssize_t r= pwrite(s->file, (unsigned char*) blob,
1026
1015
                    s->comment_length, s->comment_start_pos);
1027
1016
  if (r != (ssize_t)s->comment_length)
1028
 
    return -1;
 
1017
    return r;
1029
1018
 
1030
1019
  write_header(s);
1031
 
  s->pos= (size_t)lseek(s->file, 0, SEEK_END);
 
1020
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
1032
1021
 
1033
1022
  return 0;
1034
1023
}
1054
1043
}
1055
1044
#endif
1056
1045
 
1057
 
/*
 
1046
/* 
1058
1047
  Normally all IO goes through azio_read(), but in case of error or non-support
1059
1048
  we make use of pread().
1060
1049
*/
1061
1050
static void get_block(azio_stream *s)
1062
1051
{
1063
1052
#ifdef AZIO_AIO
1064
 
  if (s->method == AZ_METHOD_AIO && s->mode == 'r'
 
1053
  if (s->method == AZ_METHOD_AIO && s->mode == 'r' 
1065
1054
      && s->pos < s->check_point
1066
1055
      && s->aio_inited)
1067
1056
  {
1076
1065
    }
1077
1066
    s->pos+= s->stream.avail_in;
1078
1067
    s->inbuf= (Byte *)s->container.buffer;
1079
 
    /* We only azio_read when we know there is more data to be read */
 
1068
    /* We only aio_read when we know there is more data to be read */
1080
1069
    if (s->pos >= s->check_point)
1081
1070
    {
1082
1071
      s->aio_inited= 0;