~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/azio.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  azio is a modified version of gzio. It  makes use of mysys and removes mallocs.
 
3
    -Brian Aker
 
4
*/
 
5
 
 
6
/* gzio.c -- IO on .gz files
 
7
 * Copyright (C) 1995-2005 Jean-loup Gailly.
 
8
 * For conditions of distribution and use, see copyright notice in zlib.h
 
9
 *
 
10
 */
 
11
 
 
12
/* @(#) $Id$ */
 
13
 
 
14
#include "azio.h"
 
15
 
 
16
#include <stdio.h>
 
17
#include <string.h>
 
18
#include <assert.h>
 
19
 
 
20
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
 
21
 
 
22
/* gzip flag uchar */
 
23
#define ASCII_FLAG   0x01 /* bit 0 set: file probably ascii text */
 
24
#define HEAD_CRC     0x02 /* bit 1 set: header CRC present */
 
25
#define EXTRA_FIELD  0x04 /* bit 2 set: extra field present */
 
26
#define ORIG_NAME    0x08 /* bit 3 set: original file name present */
 
27
#define COMMENT      0x10 /* bit 4 set: file comment present */
 
28
#define RESERVED     0xE0 /* bits 5..7: reserved */
 
29
 
 
30
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len);
 
31
static int azrewind (azio_stream *s);
 
32
static unsigned int azio_enable_aio(azio_stream *s);
 
33
static int do_flush(azio_stream *file, int flush);
 
34
static int    get_byte(azio_stream *s);
 
35
static void   check_header(azio_stream *s);
 
36
static void write_header(azio_stream *s);
 
37
static int    destroy(azio_stream *s);
 
38
static void putLong(azio_stream *s, uLong x);
 
39
static uLong  getLong(azio_stream *s);
 
40
static void read_header(azio_stream *s, unsigned char *buffer);
 
41
static void get_block(azio_stream *s);
 
42
#ifdef AZIO_AIO
 
43
static void do_aio_cleanup(azio_stream *s);
 
44
#endif
 
45
 
 
46
static pthread_handler_t run_task(void *p)
 
47
{
 
48
  int fd;
 
49
  char *buffer;
 
50
  size_t offset;
 
51
  azio_stream *s= (azio_stream *)p;  
 
52
 
 
53
  my_thread_init();
 
54
 
 
55
  while (1)
 
56
  {
 
57
    pthread_mutex_lock(&s->container.thresh_mutex);
 
58
    while (s->container.ready == AZ_THREAD_FINISHED)
 
59
    {
 
60
      pthread_cond_wait(&s->container.threshhold, &s->container.thresh_mutex);
 
61
    }
 
62
    offset= s->container.offset;
 
63
    fd= s->container.fd;
 
64
    buffer= s->container.buffer;
 
65
    pthread_mutex_unlock(&s->container.thresh_mutex);
 
66
 
 
67
    if (s->container.ready == AZ_THREAD_DEAD)
 
68
      break;
 
69
 
 
70
    s->container.read_size= my_pread(fd, (uchar *)buffer, AZ_BUFSIZE_READ, 
 
71
                                     offset, MYF(0));
 
72
 
 
73
    pthread_mutex_lock(&s->container.thresh_mutex);
 
74
    s->container.ready= AZ_THREAD_FINISHED; 
 
75
    pthread_mutex_unlock(&s->container.thresh_mutex);
 
76
  }
 
77
 
 
78
  my_thread_end();
 
79
 
 
80
  return 0;
 
81
}
 
82
 
 
83
static void azio_kill(azio_stream *s)
 
84
{
 
85
  pthread_mutex_lock(&s->container.thresh_mutex);
 
86
  s->container.ready= AZ_THREAD_DEAD; 
 
87
  pthread_mutex_unlock(&s->container.thresh_mutex);
 
88
 
 
89
  pthread_cond_signal(&s->container.threshhold);
 
90
  pthread_join(s->container.mainthread, (void *)NULL);
 
91
}
 
92
 
 
93
static size_t azio_return(azio_stream *s)
 
94
{
 
95
  return s->container.read_size;
 
96
}
 
97
 
 
98
/*
 
99
  Worried about spin?
 
100
  Don't be. In tests it never has spun more then 1 times.
 
101
*/
 
102
 
 
103
static az_thread_type azio_ready(azio_stream *s)
 
104
{
 
105
  az_thread_type temp;
 
106
 
 
107
  while (1)
 
108
  {
 
109
    pthread_mutex_lock(&s->container.thresh_mutex);
 
110
    temp= s->container.ready;
 
111
    pthread_mutex_unlock(&s->container.thresh_mutex);
 
112
 
 
113
    if (temp == AZ_THREAD_FINISHED || temp == AZ_THREAD_DEAD)
 
114
      break;
 
115
  }
 
116
 
 
117
  return temp;
 
118
}
 
119
 
 
120
static int azio_start(azio_stream *s)
 
121
{
 
122
  int rc= 0;
 
123
  pthread_attr_t attr;          /* Thread attributes */
 
124
 
 
125
  pthread_attr_init(&attr);
 
126
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
127
 
 
128
  s->container.ready= AZ_THREAD_FINISHED; 
 
129
 
 
130
  /* If we don't create a thread, signal the caller */
 
131
  if (pthread_create(&s->container.mainthread, &attr, run_task,
 
132
                     (void *)s) != 0)
 
133
    rc= 1;
 
134
 
 
135
  pthread_attr_destroy(&attr);
 
136
 
 
137
  return rc;
 
138
}
 
139
 
 
140
static int azio_read(azio_stream *s)
 
141
{
 
142
  pthread_mutex_lock(&s->container.thresh_mutex);
 
143
  s->container.ready= AZ_THREAD_ACTIVE; 
 
144
  pthread_mutex_unlock(&s->container.thresh_mutex);
 
145
  pthread_cond_broadcast(&s->container.threshhold);
 
146
 
 
147
  return 0;
 
148
}
 
149
 
 
150
/* ===========================================================================
 
151
  Opens a gzip (.gz) file for reading or writing. The mode parameter
 
152
  is as in fopen ("rb" or "wb"). The file is given either by file descriptor
 
153
  or path name (if fd == -1).
 
154
  az_open returns NULL if the file could not be opened or if there was
 
155
  insufficient memory to allocate the (de)compression state; errno
 
156
  can be checked to distinguish the two cases (if errno is zero, the
 
157
  zlib error is Z_MEM_ERROR).
 
158
*/
 
159
int azopen(azio_stream *s, const char *path, int Flags, az_method method)
 
160
{
 
161
  int err;
 
162
  int level = Z_DEFAULT_COMPRESSION ; /* compression level */
 
163
  int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
 
164
  File fd= -1;
 
165
 
 
166
  memset(s, 0, sizeof(azio_stream));
 
167
 
 
168
  s->stream.zalloc = (alloc_func)0;
 
169
  s->stream.zfree = (free_func)0;
 
170
  s->stream.opaque = (voidpf)0;
 
171
 
 
172
 
 
173
  s->container.offset= 0;
 
174
  s->container.buffer= (void *)s->buffer1;
 
175
  s->container.ready= AZ_THREAD_FINISHED;
 
176
 
 
177
  s->inbuf= s->buffer1;
 
178
  s->stream.next_in = s->inbuf;
 
179
  s->stream.next_out = s->outbuf;
 
180
  s->z_err = Z_OK;
 
181
  s->back = EOF;
 
182
  s->crc = crc32(0L, Z_NULL, 0);
 
183
  s->mode = 'r';
 
184
  s->version = (unsigned char)az_magic[1]; /* this needs to be a define to version */
 
185
  s->version = (unsigned char)az_magic[2]; /* minor version */
 
186
  s->method= method;
 
187
 
 
188
  /*
 
189
    We do our own version of append by nature. 
 
190
    We must always have write access to take card of the header.
 
191
  */
 
192
  DBUG_ASSERT(Flags | O_APPEND);
 
193
  DBUG_ASSERT(Flags | O_WRONLY);
 
194
 
 
195
  if (Flags & O_RDWR) 
 
196
    s->mode = 'w';
 
197
 
 
198
  if (s->mode == 'w') 
 
199
  {
 
200
    err = deflateInit2(&(s->stream), level,
 
201
                       Z_DEFLATED, -MAX_WBITS, 8, strategy);
 
202
    /* windowBits is passed < 0 to suppress zlib header */
 
203
 
 
204
    s->stream.next_out = s->outbuf;
 
205
    if (err != Z_OK)
 
206
    {
 
207
      destroy(s);
 
208
      return Z_NULL;
 
209
    }
 
210
  } else {
 
211
    /* Threads are only used when we are running with azio */
 
212
    s->stream.next_in  = s->inbuf;
 
213
 
 
214
    err = inflateInit2(&(s->stream), -MAX_WBITS);
 
215
    /* windowBits is passed < 0 to tell that there is no zlib header.
 
216
     * Note that in this case inflate *requires* an extra "dummy" byte
 
217
     * after the compressed stream in order to complete decompression and
 
218
     * return Z_STREAM_END. Here the gzip CRC32 ensures that 4 bytes are
 
219
     * present after the compressed stream.
 
220
   */
 
221
    if (err != Z_OK)
 
222
    {
 
223
      destroy(s);
 
224
      return Z_NULL;
 
225
    }
 
226
  }
 
227
  s->stream.avail_out = AZ_BUFSIZE_WRITE;
 
228
 
 
229
  errno = 0;
 
230
  s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
 
231
#ifdef AZIO_AIO
 
232
  s->container.fd= s->file;
 
233
#endif
 
234
 
 
235
  if (s->file < 0 ) 
 
236
  {
 
237
    destroy(s);
 
238
    return Z_NULL;
 
239
  }
 
240
 
 
241
  if (Flags & O_CREAT || Flags & O_TRUNC) 
 
242
  {
 
243
    s->rows= 0;
 
244
    s->forced_flushes= 0;
 
245
    s->shortest_row= 0;
 
246
    s->longest_row= 0;
 
247
    s->auto_increment= 0;
 
248
    s->check_point= 0;
 
249
    s->comment_start_pos= 0;
 
250
    s->comment_length= 0;
 
251
    s->frm_start_pos= 0;
 
252
    s->frm_length= 0;
 
253
    s->dirty= 1; /* We create the file dirty */
 
254
    s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
 
255
    write_header(s);
 
256
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
257
  }
 
258
  else if (s->mode == 'w') 
 
259
  {
 
260
    uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
261
    my_pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
 
262
             MYF(0));
 
263
    read_header(s, buffer);
 
264
    s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
265
  }
 
266
  else
 
267
  {
 
268
    check_header(s); /* skip the .az header */
 
269
  }
 
270
 
 
271
  switch (s->method)
 
272
  {
 
273
  case AZ_METHOD_AIO:
 
274
    azio_enable_aio(s);
 
275
    break;
 
276
  case AZ_METHOD_BLOCK:
 
277
  case AZ_METHOD_MAX:
 
278
    break;
 
279
  }
 
280
 
 
281
  return 1;
 
282
}
 
283
 
 
284
 
 
285
void write_header(azio_stream *s)
 
286
{
 
287
  char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
288
  char *ptr= buffer;
 
289
 
 
290
  s->block_size= AZ_BUFSIZE_WRITE;
 
291
  s->version = (unsigned char)az_magic[1];
 
292
  s->minor_version = (unsigned char)az_magic[2];
 
293
 
 
294
 
 
295
  /* Write a very simple .az header: */
 
296
  memset(buffer, 0, AZHEADER_SIZE + AZMETA_BUFFER_SIZE);
 
297
  *(ptr + AZ_MAGIC_POS)= az_magic[0];
 
298
  *(ptr + AZ_VERSION_POS)= (unsigned char)s->version;
 
299
  *(ptr + AZ_MINOR_VERSION_POS)= (unsigned char)s->minor_version;
 
300
  *(ptr + AZ_BLOCK_POS)= (unsigned char)(s->block_size/1024); /* Reserved for block size */
 
301
  *(ptr + AZ_STRATEGY_POS)= (unsigned char)Z_DEFAULT_STRATEGY; /* Compression Type */
 
302
 
 
303
  int4store(ptr + AZ_FRM_POS, s->frm_start_pos); /* FRM Block */
 
304
  int4store(ptr + AZ_FRM_LENGTH_POS, s->frm_length); /* FRM Block */
 
305
  int4store(ptr + AZ_COMMENT_POS, s->comment_start_pos); /* COMMENT Block */
 
306
  int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */
 
307
  int4store(ptr + AZ_META_POS, 0); /* Meta Block */
 
308
  int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */
 
309
  int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
 
310
  int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
 
311
  int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
 
312
  int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
 
313
  int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */
 
314
  int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */
 
315
  int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */
 
316
  int4store(ptr+ AZ_FRM_POS, 
 
317
            AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */
 
318
  *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */
 
319
 
 
320
  /* Always begin at the begining, and end there as well */
 
321
  my_pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
 
322
            MYF(0));
 
323
}
 
324
 
 
325
/* ===========================================================================
 
326
  Read a byte from a azio_stream; update next_in and avail_in. Return EOF
 
327
  for end of file.
 
328
  IN assertion: the stream s has been sucessfully opened for reading.
 
329
*/
 
330
int get_byte(s)
 
331
  azio_stream *s;
 
332
{
 
333
  if (s->z_eof) return EOF;
 
334
  if (s->stream.avail_in == 0) 
 
335
  {
 
336
    errno = 0;
 
337
    if (s->stream.avail_in == 0) 
 
338
    {
 
339
      s->z_eof = 1;
 
340
      return EOF;
 
341
    }
 
342
    else if (s->stream.avail_in == (uInt) -1)
 
343
    {
 
344
      s->z_eof= 1;
 
345
      s->z_err= Z_ERRNO;
 
346
      return EOF;
 
347
    }
 
348
    s->stream.next_in = s->inbuf;
 
349
  }
 
350
  s->stream.avail_in--;
 
351
  return *(s->stream.next_in)++;
 
352
}
 
353
 
 
354
/* ===========================================================================
 
355
  Check the gzip header of a azio_stream opened for reading.
 
356
  IN assertion: the stream s has already been created sucessfully;
 
357
  s->stream.avail_in is zero for the first time, but may be non-zero
 
358
  for concatenated .gz files.
 
359
*/
 
360
void check_header(azio_stream *s)
 
361
{
 
362
  uInt len;
 
363
 
 
364
  /* Assure two bytes in the buffer so we can peek ahead -- handle case
 
365
    where first byte of header is at the end of the buffer after the last
 
366
    gzip segment */
 
367
  len = s->stream.avail_in;
 
368
  if (len < 2) {
 
369
    if (len) s->inbuf[0] = s->stream.next_in[0];
 
370
    errno = 0;
 
371
    len = (uInt)my_pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos, MYF(0));
 
372
    s->pos+= len;
 
373
    if (len == (uInt)-1) s->z_err = Z_ERRNO;
 
374
    s->stream.avail_in += len;
 
375
    s->stream.next_in = s->inbuf;
 
376
  }
 
377
 
 
378
  /* Now we check the actual header */
 
379
  if ( s->stream.next_in[0] == az_magic[0]  && s->stream.next_in[1] == az_magic[1])
 
380
  {
 
381
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
382
 
 
383
    for (len = 0; len < (AZHEADER_SIZE + AZMETA_BUFFER_SIZE); len++)
 
384
      buffer[len]= get_byte(s);
 
385
    s->z_err = s->z_eof ? Z_DATA_ERROR : Z_OK;
 
386
    read_header(s, buffer);
 
387
    for (; len < s->start; len++)
 
388
      get_byte(s);
 
389
  }
 
390
  else
 
391
  {
 
392
    s->z_err = Z_OK;
 
393
 
 
394
    return;
 
395
  }
 
396
}
 
397
 
 
398
void read_header(azio_stream *s, unsigned char *buffer)
 
399
{
 
400
  if (buffer[0] == az_magic[0]  && buffer[1] == az_magic[1])
 
401
  {
 
402
    s->version= (unsigned int)buffer[AZ_VERSION_POS];
 
403
    s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS];
 
404
    s->block_size= 1024 * buffer[AZ_BLOCK_POS];
 
405
    s->start= (size_t)uint8korr(buffer + AZ_START_POS);
 
406
    s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS);
 
407
    s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS);
 
408
    s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS);
 
409
    s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS);
 
410
    s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS);
 
411
    s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS);
 
412
    s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS);
 
413
    s->frm_length= (unsigned int)uint4korr(buffer + AZ_FRM_LENGTH_POS);
 
414
    s->comment_start_pos= (unsigned int)uint4korr(buffer + AZ_COMMENT_POS);
 
415
    s->comment_length= (unsigned int)uint4korr(buffer + AZ_COMMENT_LENGTH_POS);
 
416
    s->dirty= (unsigned int)buffer[AZ_DIRTY_POS];
 
417
  }
 
418
  else
 
419
  {
 
420
    s->z_err = Z_OK;
 
421
    return;
 
422
  }
 
423
}
 
424
 
 
425
/* ===========================================================================
 
426
 * Cleanup then free the given azio_stream. Return a zlib error code.
 
427
 Try freeing in the reverse order of allocations.
 
428
 */
 
429
int destroy (s)
 
430
  azio_stream *s;
 
431
{
 
432
  int err = Z_OK;
 
433
 
 
434
  if (s->stream.state != NULL) 
 
435
  {
 
436
    if (s->mode == 'w') 
 
437
    {
 
438
      err = deflateEnd(&(s->stream));
 
439
      my_sync(s->file, MYF(0));
 
440
    }
 
441
    else if (s->mode == 'r') 
 
442
      err = inflateEnd(&(s->stream));
 
443
  }
 
444
 
 
445
  do_aio_cleanup(s);
 
446
 
 
447
  if (s->file > 0 && my_close(s->file, MYF(0))) 
 
448
      err = Z_ERRNO;
 
449
 
 
450
  s->file= -1;
 
451
 
 
452
  if (s->z_err < 0) err = s->z_err;
 
453
 
 
454
  return err;
 
455
}
 
456
 
 
457
/* ===========================================================================
 
458
  Reads the given number of uncompressed bytes from the compressed file.
 
459
  azread returns the number of bytes actually read (0 for end of file).
 
460
*/
 
461
unsigned int azread_internal( azio_stream *s, voidp buf, unsigned int len, int *error)
 
462
{
 
463
  Bytef *start = (Bytef*)buf; /* starting point for crc computation */
 
464
  Byte  *next_out; /* == stream.next_out but not forced far (for MSDOS) */
 
465
  *error= 0;
 
466
 
 
467
  if (s->mode != 'r')
 
468
  { 
 
469
    *error= Z_STREAM_ERROR;
 
470
    return 0;
 
471
  }
 
472
 
 
473
  if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
 
474
  { 
 
475
    *error= s->z_err;
 
476
    return 0;
 
477
  }
 
478
 
 
479
  if (s->z_err == Z_STREAM_END)  /* EOF */
 
480
  { 
 
481
    return 0;
 
482
  }
 
483
 
 
484
  next_out = (Byte*)buf;
 
485
  s->stream.next_out = (Bytef*)buf;
 
486
  s->stream.avail_out = len;
 
487
 
 
488
  if (s->stream.avail_out && s->back != EOF) {
 
489
    *next_out++ = s->back;
 
490
    s->stream.next_out++;
 
491
    s->stream.avail_out--;
 
492
    s->back = EOF;
 
493
    s->out++;
 
494
    start++;
 
495
    if (s->last) {
 
496
      s->z_err = Z_STREAM_END;
 
497
      { 
 
498
        return 1;
 
499
      }
 
500
    }
 
501
  }
 
502
 
 
503
  while (s->stream.avail_out != 0) {
 
504
 
 
505
    if (s->stream.avail_in == 0 && !s->z_eof) {
 
506
 
 
507
      errno = 0;
 
508
      get_block(s);
 
509
      if (s->stream.avail_in == 0) 
 
510
      {
 
511
        s->z_eof = 1;
 
512
      }
 
513
      s->stream.next_in = (Bytef *)s->inbuf;
 
514
    }
 
515
    s->in += s->stream.avail_in;
 
516
    s->out += s->stream.avail_out;
 
517
    s->z_err = inflate(&(s->stream), Z_NO_FLUSH);
 
518
    s->in -= s->stream.avail_in;
 
519
    s->out -= s->stream.avail_out;
 
520
 
 
521
    if (s->z_err == Z_STREAM_END) {
 
522
      /* Check CRC and original size */
 
523
      s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start));
 
524
      start = s->stream.next_out;
 
525
 
 
526
      if (getLong(s) != s->crc) {
 
527
        s->z_err = Z_DATA_ERROR;
 
528
      } else {
 
529
        (void)getLong(s);
 
530
        /* The uncompressed length returned by above getlong() may be
 
531
         * different from s->out in case of concatenated .gz files.
 
532
         * Check for such files:
 
533
       */
 
534
        check_header(s);
 
535
        if (s->z_err == Z_OK) 
 
536
        {
 
537
          inflateReset(&(s->stream));
 
538
          s->crc = crc32(0L, Z_NULL, 0);
 
539
        }
 
540
      }
 
541
    }
 
542
    if (s->z_err != Z_OK || s->z_eof) break;
 
543
  }
 
544
  s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start));
 
545
 
 
546
  if (len == s->stream.avail_out &&
 
547
      (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO))
 
548
  {
 
549
    *error= s->z_err;
 
550
 
 
551
    return 0;
 
552
  }
 
553
 
 
554
  return (len - s->stream.avail_out);
 
555
}
 
556
 
 
557
/* ===========================================================================
 
558
  Experimental Interface. We abstract out a concecpt of rows 
 
559
*/
 
560
size_t azwrite_row(azio_stream *s, void *buf, unsigned int len)
 
561
{
 
562
  size_t length;
 
563
  /* First we write length */
 
564
  length= azwrite(s, &len, sizeof(unsigned int));
 
565
 
 
566
  if (length != sizeof(unsigned int))
 
567
    return length;
 
568
 
 
569
  /* Now we write the actual data */
 
570
  length= (size_t)azwrite(s, buf, len);
 
571
 
 
572
  if (length > 0)
 
573
    s->rows++;
 
574
 
 
575
  if (len > s->longest_row)
 
576
    s->longest_row= len;
 
577
 
 
578
  if (len < s->shortest_row || !(s->shortest_row))
 
579
    s->shortest_row= len;
 
580
 
 
581
  return length;
 
582
}
 
583
 
 
584
size_t azread_row(azio_stream *s, int *error)
 
585
{
 
586
  unsigned int row_length; /* Currently we are limited to this size for rows */
 
587
  char buffer[sizeof(unsigned int)];
 
588
  char *new_ptr;
 
589
  size_t read;
 
590
 
 
591
  read= azread_internal(s, buffer, sizeof(unsigned int), error);
 
592
  
 
593
  /* On error the return value will be zero as well */
 
594
  if (read == 0)
 
595
    return read;
 
596
  memcpy(&row_length, buffer, sizeof(unsigned int));
 
597
 
 
598
  new_ptr= (char *)realloc(s->row_ptr, (sizeof(char) * row_length));
 
599
 
 
600
  if (!new_ptr)
 
601
    return -1;
 
602
 
 
603
  s->row_ptr= new_ptr;
 
604
 
 
605
  /* TODO We should now adjust the length... */
 
606
  read= azread_internal(s, (voidp)s->row_ptr, row_length, error);
 
607
 
 
608
  return read;
 
609
}
 
610
 
 
611
 
 
612
/* ===========================================================================
 
613
  Writes the given number of uncompressed bytes into the compressed file.
 
614
  azwrite returns the number of bytes actually written (0 in case of error).
 
615
*/
 
616
static unsigned int azwrite(azio_stream *s, void *buf, unsigned int len)
 
617
{
 
618
  s->stream.next_in = (Bytef*)buf;
 
619
  s->stream.avail_in = len;
 
620
 
 
621
  while (s->stream.avail_in != 0) 
 
622
  {
 
623
    if (s->stream.avail_out == 0) 
 
624
    {
 
625
 
 
626
      s->stream.next_out = s->outbuf;
 
627
      if (my_pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos, 
 
628
                   MYF(0)) != AZ_BUFSIZE_WRITE) 
 
629
      {
 
630
        s->z_err = Z_ERRNO;
 
631
        break;
 
632
      }
 
633
      s->pos+= AZ_BUFSIZE_WRITE;
 
634
      s->stream.avail_out = AZ_BUFSIZE_WRITE;
 
635
    }
 
636
    s->in += s->stream.avail_in;
 
637
    s->out += s->stream.avail_out;
 
638
    s->z_err = deflate(&(s->stream), Z_NO_FLUSH);
 
639
    s->in -= s->stream.avail_in;
 
640
    s->out -= s->stream.avail_out;
 
641
    if (s->z_err != Z_OK) break;
 
642
  }
 
643
  s->crc = crc32(s->crc, (const Bytef *)buf, len);
 
644
 
 
645
  return (unsigned int)(len - s->stream.avail_in);
 
646
}
 
647
 
 
648
 
 
649
/* ===========================================================================
 
650
  Flushes all pending output into the compressed file. The parameter
 
651
  flush is as in the deflate() function.
 
652
*/
 
653
int do_flush (azio_stream *s, int flush)
 
654
{
 
655
  uInt len;
 
656
  int done = 0;
 
657
  size_t afterwrite_pos;
 
658
 
 
659
  if (s == NULL || s->mode != 'w') return Z_STREAM_ERROR;
 
660
 
 
661
  s->stream.avail_in = 0; /* should be zero already anyway */
 
662
 
 
663
  for (;;) 
 
664
  {
 
665
    len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
 
666
 
 
667
    if (len != 0) 
 
668
    {
 
669
      if ((uInt)my_pwrite(s->file, (uchar *)s->outbuf, len, s->pos, MYF(0)) != len) 
 
670
      {
 
671
        s->z_err = Z_ERRNO;
 
672
        assert(0);
 
673
        return Z_ERRNO;
 
674
      }
 
675
      s->pos+= len;
 
676
      s->check_point= s->pos;
 
677
      s->stream.next_out = s->outbuf;
 
678
      s->stream.avail_out = AZ_BUFSIZE_WRITE;
 
679
    }
 
680
    if (done) break;
 
681
    s->out += s->stream.avail_out;
 
682
    s->z_err = deflate(&(s->stream), flush);
 
683
    s->out -= s->stream.avail_out;
 
684
 
 
685
    /* Ignore the second of two consecutive flushes: */
 
686
    if (len == 0 && s->z_err == Z_BUF_ERROR) s->z_err = Z_OK;
 
687
 
 
688
    /* deflate has finished flushing only when it hasn't used up
 
689
     * all the available space in the output buffer:
 
690
   */
 
691
    done = (s->stream.avail_out != 0 || s->z_err == Z_STREAM_END);
 
692
 
 
693
    if (s->z_err != Z_OK && s->z_err != Z_STREAM_END) break;
 
694
  }
 
695
 
 
696
  if (flush == Z_FINISH)
 
697
    s->dirty= AZ_STATE_CLEAN; /* Mark it clean, we should be good now */
 
698
  else
 
699
    s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
 
700
 
 
701
  afterwrite_pos= (size_t)my_tell(s->file, MYF(0));
 
702
  write_header(s);
 
703
 
 
704
  return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
 
705
}
 
706
 
 
707
static unsigned int azio_enable_aio(azio_stream *s)
 
708
{
 
709
  VOID(pthread_cond_init(&s->container.threshhold, NULL));
 
710
  VOID(pthread_mutex_init(&s->container.thresh_mutex, NULL));
 
711
  azio_start(s);
 
712
 
 
713
  return 0;
 
714
}
 
715
 
 
716
void azio_disable_aio(azio_stream *s)
 
717
{
 
718
  azio_kill(s);
 
719
 
 
720
  VOID(pthread_mutex_destroy(&s->container.thresh_mutex));
 
721
  VOID(pthread_cond_destroy(&s->container.threshhold));
 
722
 
 
723
  s->method= AZ_METHOD_BLOCK;
 
724
}
 
725
 
 
726
int ZEXPORT azflush (s, flush)
 
727
  azio_stream *s;
 
728
  int flush;
 
729
{
 
730
  int err;
 
731
 
 
732
  if (s->mode == 'r') 
 
733
  {
 
734
    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
 
735
    my_pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
 
736
             MYF(0));
 
737
    read_header(s, buffer); /* skip the .az header */
 
738
    azrewind(s);
 
739
 
 
740
    return Z_OK;
 
741
  }
 
742
  else
 
743
  {
 
744
    s->forced_flushes++;
 
745
    err= do_flush(s, flush);
 
746
 
 
747
    if (err) return err;
 
748
    my_sync(s->file, MYF(0));
 
749
    return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
 
750
  }
 
751
}
 
752
 
 
753
/* ===========================================================================
 
754
  Initiazliaze for reading
 
755
*/
 
756
int azread_init(azio_stream *s)
 
757
{
 
758
  int returnable;
 
759
 
 
760
  /* This will reset any aio reads */
 
761
  returnable= azrewind(s);
 
762
 
 
763
  if (returnable == -1)
 
764
    return returnable;
 
765
 
 
766
  /* Put one in the chamber */
 
767
  if (s->method != AZ_METHOD_BLOCK)
 
768
  {
 
769
    do_aio_cleanup(s);
 
770
    s->container.offset= s->pos;
 
771
    s->container.buffer= (unsigned char *)s->buffer1;
 
772
    azio_read(s);
 
773
    s->aio_inited= 1;
 
774
  }
 
775
 
 
776
 
 
777
  return returnable;
 
778
}
 
779
 
 
780
/* ===========================================================================
 
781
  Rewinds input file.
 
782
*/
 
783
int azrewind (s)
 
784
  azio_stream *s;
 
785
{
 
786
  if (s == NULL || s->mode != 'r') return -1;
 
787
 
 
788
#ifdef AZIO_AIO
 
789
  do_aio_cleanup(s);
 
790
#endif
 
791
  s->z_err = Z_OK;
 
792
  s->z_eof = 0;
 
793
  s->back = EOF;
 
794
  s->stream.avail_in = 0;
 
795
  s->stream.next_in = (Bytef *)s->inbuf;
 
796
  s->crc = crc32(0L, Z_NULL, 0);
 
797
  (void)inflateReset(&s->stream);
 
798
  s->in = 0;
 
799
  s->out = 0;
 
800
  s->aio_inited= 0; /* Reset the AIO reader */
 
801
  s->pos= s->start;
 
802
  return 0;
 
803
}
 
804
 
 
805
/* ===========================================================================
 
806
  Sets the starting position for the next azread or azwrite on the given
 
807
  compressed file. The offset represents a number of bytes in the
 
808
  azseek returns the resulting offset location as measured in bytes from
 
809
  the beginning of the uncompressed stream, or -1 in case of error.
 
810
  SEEK_END is not implemented, returns error.
 
811
  In this version of the library, azseek can be extremely slow.
 
812
*/
 
813
size_t azseek (s, offset, whence)
 
814
  azio_stream *s;
 
815
  size_t offset;
 
816
  int whence;
 
817
{
 
818
 
 
819
  if (s == NULL || whence == SEEK_END ||
 
820
      s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) {
 
821
    return -1L;
 
822
  }
 
823
 
 
824
  if (s->mode == 'w') 
 
825
  {
 
826
    if (whence == SEEK_SET) 
 
827
      offset -= s->in;
 
828
 
 
829
    /* At this point, offset is the number of zero bytes to write. */
 
830
    /* There was a zmemzero here if inbuf was null -Brian */
 
831
    while (offset > 0)  
 
832
    {
 
833
      uInt size = AZ_BUFSIZE_READ;
 
834
      if (offset < AZ_BUFSIZE_READ) size = (uInt)offset;
 
835
 
 
836
      size = azwrite(s, s->inbuf, size);
 
837
      if (size == 0) return -1L;
 
838
 
 
839
      offset -= size;
 
840
    }
 
841
    return s->in;
 
842
  }
 
843
  /* Rest of function is for reading only */
 
844
 
 
845
  /* compute absolute position */
 
846
  if (whence == SEEK_CUR) {
 
847
    offset += s->out;
 
848
  }
 
849
 
 
850
  /* For a negative seek, rewind and use positive seek */
 
851
  if (offset >= s->out) {
 
852
    offset -= s->out;
 
853
  } else if (azrewind(s)) {
 
854
    return -1L;
 
855
  }
 
856
  /* offset is now the number of bytes to skip. */
 
857
 
 
858
  if (offset && s->back != EOF) {
 
859
    s->back = EOF;
 
860
    s->out++;
 
861
    offset--;
 
862
    if (s->last) s->z_err = Z_STREAM_END;
 
863
  }
 
864
  while (offset > 0)  {
 
865
    int error;
 
866
    unsigned int size = AZ_BUFSIZE_WRITE;
 
867
    if (offset < AZ_BUFSIZE_WRITE) size = (int)offset;
 
868
 
 
869
    size = azread_internal(s, s->outbuf, size, &error);
 
870
    if (error < 0) return -1L;
 
871
    offset -= size;
 
872
  }
 
873
  return s->out;
 
874
}
 
875
 
 
876
/* ===========================================================================
 
877
  Returns the starting position for the next azread or azwrite on the
 
878
  given compressed file. This position represents a number of bytes in the
 
879
  uncompressed data stream.
 
880
*/
 
881
size_t ZEXPORT aztell (file)
 
882
  azio_stream *file;
 
883
{
 
884
  return azseek(file, 0L, SEEK_CUR);
 
885
}
 
886
 
 
887
 
 
888
/* ===========================================================================
 
889
  Outputs a long in LSB order to the given file
 
890
*/
 
891
void putLong (azio_stream *s, uLong x)
 
892
{
 
893
  int n;
 
894
  uchar buffer[1];
 
895
 
 
896
  for (n = 0; n < 4; n++) 
 
897
  {
 
898
    buffer[0]= (int)(x & 0xff);
 
899
    my_pwrite(s->file, buffer, 1, s->pos, MYF(0));
 
900
    s->pos++;
 
901
    x >>= 8;
 
902
  }
 
903
}
 
904
 
 
905
/* ===========================================================================
 
906
  Reads a long in LSB order from the given azio_stream. Sets z_err in case
 
907
  of error.
 
908
*/
 
909
uLong getLong (azio_stream *s)
 
910
{
 
911
  uLong x = (uLong)get_byte(s);
 
912
  int c;
 
913
 
 
914
  x += ((uLong)get_byte(s))<<8;
 
915
  x += ((uLong)get_byte(s))<<16;
 
916
  c = get_byte(s);
 
917
  if (c == EOF) s->z_err = Z_DATA_ERROR;
 
918
  x += ((uLong)c)<<24;
 
919
  return x;
 
920
}
 
921
 
 
922
/* ===========================================================================
 
923
  Flushes all pending output if necessary, closes the compressed file
 
924
  and deallocates all the (de)compression state.
 
925
*/
 
926
int azclose (azio_stream *s)
 
927
{
 
928
  int returnable;
 
929
 
 
930
  if (s == NULL) return Z_STREAM_ERROR;
 
931
  
 
932
  if (s->file < 1) return Z_OK;
 
933
 
 
934
  if (s->mode == 'w') 
 
935
  {
 
936
    if (do_flush(s, Z_FINISH) != Z_OK)
 
937
      return destroy(s);
 
938
 
 
939
    putLong(s, s->crc);
 
940
    putLong(s, (uLong)(s->in & 0xffffffff));
 
941
    s->dirty= AZ_STATE_CLEAN;
 
942
    write_header(s);
 
943
  }
 
944
 
 
945
  returnable= destroy(s);
 
946
 
 
947
  switch (s->method)
 
948
  {
 
949
  case AZ_METHOD_AIO:
 
950
    azio_disable_aio(s);
 
951
    break;
 
952
  case AZ_METHOD_BLOCK:
 
953
  case AZ_METHOD_MAX:
 
954
    break;
 
955
  }
 
956
 
 
957
  /* If we allocated memory for row reading, now free it */
 
958
  if (s->row_ptr)
 
959
    free(s->row_ptr);
 
960
 
 
961
  return returnable;
 
962
}
 
963
 
 
964
/*
 
965
  Though this was added to support MySQL's FRM file, anything can be 
 
966
  stored in this location.
 
967
*/
 
968
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
 
969
{
 
970
  if (s->mode == 'r') 
 
971
    return 1;
 
972
 
 
973
  if (s->rows > 0) 
 
974
    return 1;
 
975
 
 
976
  s->frm_start_pos= (uint) s->start;
 
977
  s->frm_length= length;
 
978
  s->start+= length;
 
979
 
 
980
  my_pwrite(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos, MYF(0));
 
981
 
 
982
  write_header(s);
 
983
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
984
 
 
985
  return 0;
 
986
}
 
987
 
 
988
int azread_frm(azio_stream *s, char *blob)
 
989
{
 
990
  my_pread(s->file, (uchar*) blob, s->frm_length, s->frm_start_pos, MYF(0));
 
991
 
 
992
  return 0;
 
993
}
 
994
 
 
995
 
 
996
/*
 
997
  Simple comment field
 
998
*/
 
999
int azwrite_comment(azio_stream *s, char *blob, unsigned int length)
 
1000
{
 
1001
  if (s->mode == 'r') 
 
1002
    return 1;
 
1003
 
 
1004
  if (s->rows > 0) 
 
1005
    return 1;
 
1006
 
 
1007
  s->comment_start_pos= (uint) s->start;
 
1008
  s->comment_length= length;
 
1009
  s->start+= length;
 
1010
 
 
1011
  my_pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos,
 
1012
            MYF(0));
 
1013
 
 
1014
  write_header(s);
 
1015
  s->pos= (size_t)my_seek(s->file, 0, MY_SEEK_END, MYF(0));
 
1016
 
 
1017
  return 0;
 
1018
}
 
1019
 
 
1020
int azread_comment(azio_stream *s, char *blob)
 
1021
{
 
1022
  my_pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos,
 
1023
           MYF(0));
 
1024
 
 
1025
  return 0;
 
1026
}
 
1027
 
 
1028
#ifdef AZIO_AIO
 
1029
static void do_aio_cleanup(azio_stream *s)
 
1030
{
 
1031
  if (s->method == AZ_METHOD_BLOCK)
 
1032
    return;
 
1033
 
 
1034
  azio_ready(s);
 
1035
 
 
1036
}
 
1037
#endif
 
1038
 
 
1039
/* 
 
1040
  Normally all IO goes through azio_read(), but in case of error or non-support
 
1041
  we make use of pread().
 
1042
*/
 
1043
static void get_block(azio_stream *s)
 
1044
{
 
1045
#ifdef AZIO_AIO
 
1046
  if (s->method == AZ_METHOD_AIO && s->mode == 'r' 
 
1047
      && s->pos < s->check_point
 
1048
      && s->aio_inited)
 
1049
  {
 
1050
    azio_ready(s);
 
1051
    s->stream.avail_in= (unsigned int)azio_return(s);
 
1052
    if ((int)(s->stream.avail_in) == -1)
 
1053
      goto use_pread;
 
1054
    else if ((int)(s->stream.avail_in) == 0)
 
1055
    {
 
1056
      s->aio_inited= 0;
 
1057
      return;
 
1058
    }
 
1059
    s->pos+= s->stream.avail_in;
 
1060
    s->inbuf= (Byte *)s->container.buffer;
 
1061
    /* We only aio_read when we know there is more data to be read */
 
1062
    if (s->pos >= s->check_point)
 
1063
    {
 
1064
      s->aio_inited= 0;
 
1065
      return;
 
1066
    }
 
1067
    s->container.buffer= (s->container.buffer == s->buffer2) ? s->buffer1 : s->buffer2;
 
1068
    s->container.offset= s->pos;
 
1069
    azio_read(s);
 
1070
  }
 
1071
  else
 
1072
#endif
 
1073
  {
 
1074
#ifdef AZIO_AIO
 
1075
use_pread:
 
1076
#endif
 
1077
    s->stream.avail_in = (uInt)my_pread(s->file, (uchar *)s->inbuf, AZ_BUFSIZE_READ, s->pos, MYF(0));
 
1078
    s->pos+= s->stream.avail_in;
 
1079
  }
 
1080
}