~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/archive/ha_archive.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
  This program is free software; you can redistribute it and/or modify
 
4
  it under the terms of the GNU General Public License as published by
 
5
  the Free Software Foundation; version 2 of the License.
 
6
 
 
7
  This program is distributed in the hope that it will be useful,
 
8
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
  GNU General Public License for more details.
 
11
 
 
12
  You should have received a copy of the GNU General Public License
 
13
  along with this program; if not, write to the Free Software
 
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#ifdef USE_PRAGMA_IMPLEMENTATION
 
17
#pragma implementation        // gcc: Class implementation
 
18
#endif
 
19
 
 
20
#include "mysql_priv.h"
 
21
#include <myisam.h>
 
22
 
 
23
#include "ha_archive.h"
 
24
#include <my_dir.h>
 
25
 
 
26
#include <mysql/plugin.h>
 
27
 
 
28
/*
 
29
  First, if you want to understand storage engines you should look at 
 
30
  ha_example.cc and ha_example.h. 
 
31
 
 
32
  This example was written as a test case for a customer who needed
 
33
  a storage engine without indexes that could compress data very well.
 
34
  So, welcome to a completely compressed storage engine. This storage
 
35
  engine only does inserts. No replace, deletes, or updates. All reads are 
 
36
  complete table scans. Compression is done through a combination of packing
 
37
  and making use of the zlib library
 
38
  
 
39
  We keep a file pointer open for each instance of ha_archive for each read
 
40
  but for writes we keep one open file handle just for that. We flush it
 
41
  only if we have a read occur. azip handles compressing lots of records
 
42
  at once much better then doing lots of little records between writes.
 
43
  It is possible to not lock on writes but this would then mean we couldn't
 
44
  handle bulk inserts as well (that is if someone was trying to read at
 
45
  the same time since we would want to flush).
 
46
 
 
47
  A "meta" file is kept alongside the data file. This file serves two purpose.
 
48
  The first purpose is to track the number of rows in the table. The second 
 
49
  purpose is to determine if the table was closed properly or not. When the 
 
50
  meta file is first opened it is marked as dirty. It is opened when the table 
 
51
  itself is opened for writing. When the table is closed the new count for rows 
 
52
  is written to the meta file and the file is marked as clean. If the meta file 
 
53
  is opened and it is marked as dirty, it is assumed that a crash occured. At 
 
54
  this point an error occurs and the user is told to rebuild the file.
 
55
  A rebuild scans the rows and rewrites the meta file. If corruption is found
 
56
  in the data file then the meta file is not repaired.
 
57
 
 
58
  At some point a recovery method for such a drastic case needs to be divised.
 
59
 
 
60
  Locks are row level, and you will get a consistant read. 
 
61
 
 
62
  For performance as far as table scans go it is quite fast. I don't have
 
63
  good numbers but locally it has out performed both Innodb and MyISAM. For
 
64
  Innodb the question will be if the table can be fit into the buffer
 
65
  pool. For MyISAM its a question of how much the file system caches the
 
66
  MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
 
67
  doesn't have enough memory to cache entire table that archive turns out 
 
68
  to be any faster. 
 
69
 
 
70
  Examples between MyISAM (packed) and Archive.
 
71
 
 
72
  Table with 76695844 identical rows:
 
73
  29680807 a_archive.ARZ
 
74
  920350317 a.MYD
 
75
 
 
76
 
 
77
  Table with 8991478 rows (all of Slashdot's comments):
 
78
  1922964506 comment_archive.ARZ
 
79
  2944970297 comment_text.MYD
 
80
 
 
81
 
 
82
  TODO:
 
83
   Allow users to set compression level.
 
84
   Allow adjustable block size.
 
85
   Implement versioning, should be easy.
 
86
   Allow for errors, find a way to mark bad rows.
 
87
   Add optional feature so that rows can be flushed at interval (which will cause less
 
88
     compression but may speed up ordered searches).
 
89
   Checkpoint the meta file to allow for faster rebuilds.
 
90
   Option to allow for dirty reads, this would lower the sync calls, which would make
 
91
     inserts a lot faster, but would mean highly arbitrary reads.
 
92
 
 
93
    -Brian
 
94
*/
 
95
 
 
96
/* Variables for archive share methods */
 
97
pthread_mutex_t archive_mutex;
 
98
static HASH archive_open_tables;
 
99
static unsigned int global_version;
 
100
 
 
101
/* The file extension */
 
102
#define ARZ ".ARZ"               // The data file
 
103
#define ARN ".ARN"               // Files used during an optimize call
 
104
#define ARM ".ARM"               // Meta file (deprecated)
 
105
 
 
106
/*
 
107
  uchar + uchar
 
108
*/
 
109
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
 
110
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
 
111
 
 
112
/* Static declarations for handerton */
 
113
static handler *archive_create_handler(handlerton *hton, 
 
114
                                       TABLE_SHARE *table, 
 
115
                                       MEM_ROOT *mem_root);
 
116
int archive_discover(handlerton *hton, THD* thd, const char *db, 
 
117
                     const char *name,
 
118
                     uchar **frmblob, 
 
119
                     size_t *frmlen);
 
120
 
 
121
static my_bool archive_use_aio= FALSE;
 
122
 
 
123
/*
 
124
  Number of rows that will force a bulk insert.
 
125
*/
 
126
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
 
127
 
 
128
/*
 
129
  Size of header used for row
 
130
*/
 
131
#define ARCHIVE_ROW_HEADER_SIZE 4
 
132
 
 
133
static handler *archive_create_handler(handlerton *hton,
 
134
                                       TABLE_SHARE *table, 
 
135
                                       MEM_ROOT *mem_root)
 
136
{
 
137
  return new (mem_root) ha_archive(hton, table);
 
138
}
 
139
 
 
140
/*
 
141
  Used for hash table that tracks open tables.
 
142
*/
 
143
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
 
144
                             my_bool not_used __attribute__((unused)))
 
145
{
 
146
  *length=share->table_name_length;
 
147
  return (uchar*) share->table_name;
 
148
}
 
149
 
 
150
 
 
151
/*
 
152
  Initialize the archive handler.
 
153
 
 
154
  SYNOPSIS
 
155
    archive_db_init()
 
156
    void *
 
157
 
 
158
  RETURN
 
159
    FALSE       OK
 
160
    TRUE        Error
 
161
*/
 
162
 
 
163
int archive_db_init(void *p)
 
164
{
 
165
  DBUG_ENTER("archive_db_init");
 
166
  handlerton *archive_hton;
 
167
 
 
168
  archive_hton= (handlerton *)p;
 
169
  archive_hton->state= SHOW_OPTION_YES;
 
170
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
 
171
  archive_hton->create= archive_create_handler;
 
172
  archive_hton->flags= HTON_NO_FLAGS;
 
173
  archive_hton->discover= archive_discover;
 
174
 
 
175
  /* When the engine starts up set the first version */
 
176
  global_version= 1;
 
177
 
 
178
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
 
179
    goto error;
 
180
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
 
181
                (hash_get_key) archive_get_key, 0, 0))
 
182
  {
 
183
    VOID(pthread_mutex_destroy(&archive_mutex));
 
184
  }
 
185
  else
 
186
  {
 
187
    DBUG_RETURN(FALSE);
 
188
  }
 
189
error:
 
190
  DBUG_RETURN(TRUE);
 
191
}
 
192
 
 
193
/*
 
194
  Release the archive handler.
 
195
 
 
196
  SYNOPSIS
 
197
    archive_db_done()
 
198
    void
 
199
 
 
200
  RETURN
 
201
    FALSE       OK
 
202
*/
 
203
 
 
204
int archive_db_done(void *p)
 
205
{
 
206
  hash_free(&archive_open_tables);
 
207
  VOID(pthread_mutex_destroy(&archive_mutex));
 
208
 
 
209
  return 0;
 
210
}
 
211
 
 
212
 
 
213
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
 
214
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
 
215
{
 
216
  /* Set our original buffer from pre-allocated memory */
 
217
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
 
218
 
 
219
  /* The size of the offset value we will use for position() */
 
220
  ref_length= sizeof(my_off_t);
 
221
  archive_reader_open= FALSE;
 
222
}
 
223
 
 
224
int archive_discover(handlerton *hton, THD* thd, const char *db, 
 
225
                     const char *name,
 
226
                     uchar **frmblob, 
 
227
                     size_t *frmlen)
 
228
{
 
229
  DBUG_ENTER("archive_discover");
 
230
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name)); 
 
231
  azio_stream frm_stream;
 
232
  char az_file[FN_REFLEN];
 
233
  char *frm_ptr;
 
234
  MY_STAT file_stat; 
 
235
 
 
236
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
237
 
 
238
  if (!(my_stat(az_file, &file_stat, MYF(0))))
 
239
    goto err;
 
240
 
 
241
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
 
242
  {
 
243
    if (errno == EROFS || errno == EACCES)
 
244
      DBUG_RETURN(my_errno= errno);
 
245
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
246
  }
 
247
 
 
248
  if (frm_stream.frm_length == 0)
 
249
    goto err;
 
250
 
 
251
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
 
252
  azread_frm(&frm_stream, frm_ptr);
 
253
  azclose(&frm_stream);
 
254
 
 
255
  *frmlen= frm_stream.frm_length;
 
256
  *frmblob= (uchar*) frm_ptr;
 
257
 
 
258
  DBUG_RETURN(0);
 
259
err:
 
260
  my_errno= 0;
 
261
  DBUG_RETURN(1);
 
262
}
 
263
 
 
264
/*
 
265
  This method reads the header of a datafile and returns whether or not it was successful.
 
266
*/
 
267
int ha_archive::read_data_header(azio_stream *file_to_read)
 
268
{
 
269
  DBUG_ENTER("ha_archive::read_data_header");
 
270
 
 
271
  if (azread_init(file_to_read) == -1)
 
272
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
273
 
 
274
  if (file_to_read->version >= 3)
 
275
    DBUG_RETURN(0);
 
276
 
 
277
  DBUG_RETURN(1);
 
278
}
 
279
 
 
280
 
 
281
/*
 
282
  We create the shared memory space that we will use for the open table. 
 
283
  No matter what we try to get or create a share. This is so that a repair
 
284
  table operation can occur. 
 
285
 
 
286
  See ha_example.cc for a longer description.
 
287
*/
 
288
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
 
289
{
 
290
  uint length;
 
291
  DBUG_ENTER("ha_archive::get_share");
 
292
 
 
293
  pthread_mutex_lock(&archive_mutex);
 
294
  length=(uint) strlen(table_name);
 
295
 
 
296
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
 
297
                                           (uchar*) table_name,
 
298
                                           length)))
 
299
  {
 
300
    char *tmp_name;
 
301
    azio_stream archive_tmp;
 
302
 
 
303
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
304
                          &share, sizeof(*share),
 
305
                          &tmp_name, length+1,
 
306
                          NullS)) 
 
307
    {
 
308
      pthread_mutex_unlock(&archive_mutex);
 
309
      *rc= HA_ERR_OUT_OF_MEM;
 
310
      DBUG_RETURN(NULL);
 
311
    }
 
312
 
 
313
    share->use_count= 0;
 
314
    share->table_name_length= length;
 
315
    share->table_name= tmp_name;
 
316
    share->crashed= FALSE;
 
317
    share->archive_write_open= FALSE;
 
318
    fn_format(share->data_file_name, table_name, "",
 
319
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
320
    strmov(share->table_name, table_name);
 
321
    DBUG_PRINT("ha_archive", ("Data File %s", 
 
322
                        share->data_file_name));
 
323
    /*
 
324
      We will use this lock for rows.
 
325
    */
 
326
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
 
327
    
 
328
    /*
 
329
      We read the meta file, but do not mark it dirty. Since we are not
 
330
      doing a write we won't mark it dirty (and we won't open it for
 
331
      anything but reading... open it for write and we will generate null
 
332
      compression writes).
 
333
    */
 
334
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
 
335
                 AZ_METHOD_BLOCK)))
 
336
    {
 
337
      VOID(pthread_mutex_destroy(&share->mutex));
 
338
      free(share);
 
339
      pthread_mutex_unlock(&archive_mutex);
 
340
      *rc= HA_ERR_CRASHED_ON_REPAIR;
 
341
      DBUG_RETURN(NULL);
 
342
    }
 
343
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
 
344
    share->rows_recorded= (ha_rows)archive_tmp.rows;
 
345
    share->crashed= archive_tmp.dirty;
 
346
    if (share->version < global_version)
 
347
    {
 
348
      share->version_rows= share->rows_recorded;
 
349
      share->version= global_version;
 
350
    }
 
351
    azclose(&archive_tmp);
 
352
 
 
353
    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
 
354
    thr_lock_init(&share->lock);
 
355
  }
 
356
  share->use_count++;
 
357
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now", 
 
358
                      share->table_name_length, share->table_name,
 
359
                      share->use_count));
 
360
  if (share->crashed)
 
361
    *rc= HA_ERR_CRASHED_ON_USAGE;
 
362
  pthread_mutex_unlock(&archive_mutex);
 
363
 
 
364
  DBUG_RETURN(share);
 
365
}
 
366
 
 
367
 
 
368
/* 
 
369
  Free the share.
 
370
  See ha_example.cc for a description.
 
371
*/
 
372
int ha_archive::free_share()
 
373
{
 
374
  int rc= 0;
 
375
  DBUG_ENTER("ha_archive::free_share");
 
376
  DBUG_PRINT("ha_archive",
 
377
             ("archive table %.*s has %d open handles on entrance", 
 
378
              share->table_name_length, share->table_name,
 
379
              share->use_count));
 
380
 
 
381
  pthread_mutex_lock(&archive_mutex);
 
382
  if (!--share->use_count)
 
383
  {
 
384
    hash_delete(&archive_open_tables, (uchar*) share);
 
385
    thr_lock_delete(&share->lock);
 
386
    VOID(pthread_mutex_destroy(&share->mutex));
 
387
    /* 
 
388
      We need to make sure we don't reset the crashed state.
 
389
      If we open a crashed file, wee need to close it as crashed unless
 
390
      it has been repaired.
 
391
      Since we will close the data down after this, we go on and count
 
392
      the flush on close;
 
393
    */
 
394
    if (share->archive_write_open == TRUE)
 
395
    {
 
396
      if (azclose(&(share->archive_write)))
 
397
        rc= 1;
 
398
    }
 
399
    my_free((uchar*) share, MYF(0));
 
400
  }
 
401
  pthread_mutex_unlock(&archive_mutex);
 
402
 
 
403
  DBUG_RETURN(rc);
 
404
}
 
405
 
 
406
int ha_archive::init_archive_writer()
 
407
{
 
408
  DBUG_ENTER("ha_archive::init_archive_writer");
 
409
  /* 
 
410
    It is expensive to open and close the data files and since you can't have
 
411
    a gzip file that can be both read and written we keep a writer open
 
412
    that is shared amoung all open tables.
 
413
  */
 
414
  if (!(azopen(&(share->archive_write), share->data_file_name, 
 
415
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
 
416
  {
 
417
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
 
418
    share->crashed= TRUE;
 
419
    DBUG_RETURN(1);
 
420
  }
 
421
  share->archive_write_open= TRUE;
 
422
 
 
423
  DBUG_RETURN(0);
 
424
}
 
425
 
 
426
 
 
427
/* 
 
428
  No locks are required because it is associated with just one handler instance
 
429
*/
 
430
int ha_archive::init_archive_reader()
 
431
{
 
432
  DBUG_ENTER("ha_archive::init_archive_reader");
 
433
  /* 
 
434
    It is expensive to open and close the data files and since you can't have
 
435
    a gzip file that can be both read and written we keep a writer open
 
436
    that is shared amoung all open tables.
 
437
  */
 
438
  if (archive_reader_open == FALSE)
 
439
  {
 
440
    az_method method;
 
441
 
 
442
    switch (archive_use_aio)
 
443
    {
 
444
    case FALSE:
 
445
      method= AZ_METHOD_BLOCK;
 
446
      break;
 
447
    case TRUE:
 
448
      method= AZ_METHOD_AIO;
 
449
      break;
 
450
    default:
 
451
      method= AZ_METHOD_BLOCK;
 
452
    }
 
453
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY, 
 
454
                 method)))
 
455
    {
 
456
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
 
457
      share->crashed= TRUE;
 
458
      DBUG_RETURN(1);
 
459
    }
 
460
    archive_reader_open= TRUE;
 
461
  }
 
462
 
 
463
  DBUG_RETURN(0);
 
464
}
 
465
 
 
466
 
 
467
/*
 
468
  We just implement one additional file extension.
 
469
*/
 
470
static const char *ha_archive_exts[] = {
 
471
  ARZ,
 
472
  NullS
 
473
};
 
474
 
 
475
const char **ha_archive::bas_ext() const
 
476
{
 
477
  return ha_archive_exts;
 
478
}
 
479
 
 
480
 
 
481
/* 
 
482
  When opening a file we:
 
483
  Create/get our shared structure.
 
484
  Init out lock.
 
485
  We open the file we will read from.
 
486
*/
 
487
int ha_archive::open(const char *name, int mode, uint open_options)
 
488
{
 
489
  int rc= 0;
 
490
  DBUG_ENTER("ha_archive::open");
 
491
 
 
492
  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", 
 
493
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
 
494
  share= get_share(name, &rc);
 
495
 
 
496
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
 
497
  {
 
498
    /* purecov: begin inspected */
 
499
    free_share();
 
500
    DBUG_RETURN(rc);
 
501
    /* purecov: end */    
 
502
  }
 
503
  else if (rc == HA_ERR_OUT_OF_MEM)
 
504
  {
 
505
    DBUG_RETURN(rc);
 
506
  }
 
507
 
 
508
  DBUG_ASSERT(share);
 
509
 
 
510
  record_buffer= create_record_buffer(table->s->reclength + 
 
511
                                      ARCHIVE_ROW_HEADER_SIZE);
 
512
 
 
513
  if (!record_buffer)
 
514
  {
 
515
    free_share();
 
516
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
517
  }
 
518
 
 
519
  thr_lock_data_init(&share->lock, &lock, NULL);
 
520
 
 
521
  DBUG_PRINT("ha_archive", ("archive table was crashed %s", 
 
522
                      rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
 
523
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
 
524
  {
 
525
    DBUG_RETURN(0);
 
526
  }
 
527
  else
 
528
    DBUG_RETURN(rc);
 
529
}
 
530
 
 
531
 
 
532
/*
 
533
  Closes the file.
 
534
 
 
535
  SYNOPSIS
 
536
    close();
 
537
  
 
538
  IMPLEMENTATION:
 
539
 
 
540
  We first close this storage engines file handle to the archive and
 
541
  then remove our reference count to the table (and possibly free it
 
542
  as well).
 
543
 
 
544
  RETURN
 
545
    0  ok
 
546
    1  Error
 
547
*/
 
548
 
 
549
int ha_archive::close(void)
 
550
{
 
551
  int rc= 0;
 
552
  DBUG_ENTER("ha_archive::close");
 
553
 
 
554
  destroy_record_buffer(record_buffer);
 
555
 
 
556
  /* First close stream */
 
557
  if (archive_reader_open == TRUE)
 
558
  {
 
559
    if (azclose(&archive))
 
560
      rc= 1;
 
561
  }
 
562
  /* then also close share */
 
563
  rc|= free_share();
 
564
 
 
565
  DBUG_RETURN(rc);
 
566
}
 
567
 
 
568
 
 
569
/*
 
570
  We create our data file here. The format is pretty simple. 
 
571
  You can read about the format of the data file above.
 
572
  Unlike other storage engines we do not "pack" our data. Since we 
 
573
  are about to do a general compression, packing would just be a waste of 
 
574
  CPU time. If the table has blobs they are written after the row in the order 
 
575
  of creation.
 
576
*/
 
577
 
 
578
int ha_archive::create(const char *name, TABLE *table_arg,
 
579
                       HA_CREATE_INFO *create_info)
 
580
{
 
581
  char name_buff[FN_REFLEN];
 
582
  char linkname[FN_REFLEN];
 
583
  int error;
 
584
  azio_stream create_stream;            /* Archive file we are working with */
 
585
  File frm_file;                   /* File handler for readers */
 
586
  MY_STAT file_stat;  // Stat information for the data file
 
587
  uchar *frm_ptr;
 
588
 
 
589
  DBUG_ENTER("ha_archive::create");
 
590
 
 
591
  stats.auto_increment_value= create_info->auto_increment_value;
 
592
 
 
593
  for (uint key= 0; key < table_arg->s->keys; key++)
 
594
  {
 
595
    KEY *pos= table_arg->key_info+key;
 
596
    KEY_PART_INFO *key_part=     pos->key_part;
 
597
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
 
598
 
 
599
    for (; key_part != key_part_end; key_part++)
 
600
    {
 
601
      Field *field= key_part->field;
 
602
 
 
603
      if (!(field->flags & AUTO_INCREMENT_FLAG))
 
604
      {
 
605
        error= -1;
 
606
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
 
607
        goto error;
 
608
      }
 
609
    }
 
610
  }
 
611
 
 
612
  /* 
 
613
    We reuse name_buff since it is available.
 
614
  */
 
615
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
 
616
  {
 
617
    DBUG_PRINT("ha_archive", ("archive will create stream file %s", 
 
618
                        create_info->data_file_name));
 
619
                        
 
620
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
 
621
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
622
    fn_format(linkname, name, "", ARZ,
 
623
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
624
  }
 
625
  else
 
626
  {
 
627
    fn_format(name_buff, name, "", ARZ,
 
628
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
629
    linkname[0]= 0;
 
630
  }
 
631
 
 
632
  /*
 
633
    There is a chance that the file was "discovered". In this case
 
634
    just use whatever file is there.
 
635
  */
 
636
  if (!(my_stat(name_buff, &file_stat, MYF(0))))
 
637
  {
 
638
    my_errno= 0;
 
639
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
 
640
                 AZ_METHOD_BLOCK)))
 
641
    {
 
642
      error= errno;
 
643
      goto error2;
 
644
    }
 
645
 
 
646
    if (linkname[0])
 
647
      my_symlink(name_buff, linkname, MYF(0));
 
648
    fn_format(name_buff, name, "", ".frm",
 
649
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
650
 
 
651
    /*
 
652
      Here is where we open up the frm and pass it to archive to store 
 
653
    */
 
654
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
 
655
    {
 
656
      if (!my_fstat(frm_file, &file_stat, MYF(MY_WME)))
 
657
      {
 
658
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
 
659
        if (frm_ptr)
 
660
        {
 
661
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
 
662
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
 
663
          my_free((uchar*)frm_ptr, MYF(0));
 
664
        }
 
665
      }
 
666
      my_close(frm_file, MYF(0));
 
667
    }
 
668
 
 
669
    if (create_info->comment.str)
 
670
      azwrite_comment(&create_stream, create_info->comment.str, 
 
671
                      (unsigned int)create_info->comment.length);
 
672
 
 
673
    /* 
 
674
      Yes you need to do this, because the starting value 
 
675
      for the autoincrement may not be zero.
 
676
    */
 
677
    create_stream.auto_increment= stats.auto_increment_value ?
 
678
                                    stats.auto_increment_value - 1 : 0;
 
679
    if (azclose(&create_stream))
 
680
    {
 
681
      error= errno;
 
682
      goto error2;
 
683
    }
 
684
  }
 
685
  else
 
686
    my_errno= 0;
 
687
 
 
688
  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
 
689
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
 
690
 
 
691
 
 
692
  DBUG_RETURN(0);
 
693
 
 
694
error2:
 
695
  delete_table(name);
 
696
error:
 
697
  /* Return error number, if we got one */
 
698
  DBUG_RETURN(error ? error : -1);
 
699
}
 
700
 
 
701
/*
 
702
  This is where the actual row is written out.
 
703
*/
 
704
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
 
705
{
 
706
  my_off_t written;
 
707
  unsigned int r_pack_length;
 
708
  DBUG_ENTER("ha_archive::real_write_row");
 
709
 
 
710
  /* We pack the row for writing */
 
711
  r_pack_length= pack_row(buf);
 
712
 
 
713
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
 
714
  if (written != r_pack_length)
 
715
  {
 
716
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", 
 
717
                                              (uint32) written, 
 
718
                                              (uint32)r_pack_length));
 
719
    DBUG_RETURN(-1);
 
720
  }
 
721
 
 
722
  if (!delayed_insert || !bulk_insert)
 
723
    share->dirty= TRUE;
 
724
 
 
725
  DBUG_RETURN(0);
 
726
}
 
727
 
 
728
 
 
729
/* 
 
730
  Calculate max length needed for row. This includes
 
731
  the bytes required for the length in the header.
 
732
*/
 
733
 
 
734
uint32 ha_archive::max_row_length(const uchar *buf)
 
735
{
 
736
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
 
737
  length+= ARCHIVE_ROW_HEADER_SIZE;
 
738
 
 
739
  uint *ptr, *end;
 
740
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
 
741
       ptr != end ;
 
742
       ptr++)
 
743
  {
 
744
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
 
745
  }
 
746
 
 
747
  return length;
 
748
}
 
749
 
 
750
 
 
751
unsigned int ha_archive::pack_row(uchar *record)
 
752
{
 
753
  uchar *ptr;
 
754
 
 
755
  DBUG_ENTER("ha_archive::pack_row");
 
756
 
 
757
 
 
758
  if (fix_rec_buff(max_row_length(record)))
 
759
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
 
760
 
 
761
  /* Copy null bits */
 
762
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
 
763
  ptr= record_buffer->buffer + table->s->null_bytes;
 
764
 
 
765
  for (Field **field=table->field ; *field ; field++)
 
766
  {
 
767
    if (!((*field)->is_null()))
 
768
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
 
769
  }
 
770
 
 
771
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
 
772
                           (ptr - record_buffer->buffer - 
 
773
                             ARCHIVE_ROW_HEADER_SIZE)));
 
774
 
 
775
  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
 
776
}
 
777
 
 
778
 
 
779
/* 
 
780
  Look at ha_archive::open() for an explanation of the row format.
 
781
  Here we just write out the row.
 
782
 
 
783
  Wondering about start_bulk_insert()? We don't implement it for
 
784
  archive since it optimizes for lots of writes. The only save
 
785
  for implementing start_bulk_insert() is that we could skip 
 
786
  setting dirty to true each time.
 
787
*/
 
788
int ha_archive::write_row(uchar *buf)
 
789
{
 
790
  int rc;
 
791
  uchar *read_buf= NULL;
 
792
  uint64_t temp_auto;
 
793
  uchar *record=  table->record[0];
 
794
  DBUG_ENTER("ha_archive::write_row");
 
795
 
 
796
  if (share->crashed)
 
797
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
798
 
 
799
  ha_statistic_increment(&SSV::ha_write_count);
 
800
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
 
801
    table->timestamp_field->set_time();
 
802
  pthread_mutex_lock(&share->mutex);
 
803
 
 
804
  if (share->archive_write_open == FALSE)
 
805
    if (init_archive_writer())
 
806
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
807
 
 
808
 
 
809
  if (table->next_number_field && record == table->record[0])
 
810
  {
 
811
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
 
812
    update_auto_increment();
 
813
    temp_auto= table->next_number_field->val_int();
 
814
 
 
815
    /*
 
816
      We don't support decremening auto_increment. They make the performance
 
817
      just cry.
 
818
    */
 
819
    if (temp_auto <= share->archive_write.auto_increment && 
 
820
        mkey->flags & HA_NOSAME)
 
821
    {
 
822
      rc= HA_ERR_FOUND_DUPP_KEY;
 
823
      goto error;
 
824
    }
 
825
#ifdef DEAD_CODE
 
826
    /*
 
827
      Bad news, this will cause a search for the unique value which is very 
 
828
      expensive since we will have to do a table scan which will lock up 
 
829
      all other writers during this period. This could perhaps be optimized 
 
830
      in the future.
 
831
    */
 
832
    {
 
833
      /* 
 
834
        First we create a buffer that we can use for reading rows, and can pass
 
835
        to get_row().
 
836
      */
 
837
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
 
838
      {
 
839
        rc= HA_ERR_OUT_OF_MEM;
 
840
        goto error;
 
841
      }
 
842
       /* 
 
843
         All of the buffer must be written out or we won't see all of the
 
844
         data 
 
845
       */
 
846
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
847
      /*
 
848
        Set the position of the local read thread to the beginning postion.
 
849
      */
 
850
      if (read_data_header(&archive))
 
851
      {
 
852
        rc= HA_ERR_CRASHED_ON_USAGE;
 
853
        goto error;
 
854
      }
 
855
 
 
856
      Field *mfield= table->next_number_field;
 
857
 
 
858
      while (!(get_row(&archive, read_buf)))
 
859
      {
 
860
        if (!memcmp(read_buf + mfield->offset(record),
 
861
                    table->next_number_field->ptr,
 
862
                    mfield->max_display_length()))
 
863
        {
 
864
          rc= HA_ERR_FOUND_DUPP_KEY;
 
865
          goto error;
 
866
        }
 
867
      }
 
868
    }
 
869
#endif
 
870
    else
 
871
    {
 
872
      if (temp_auto > share->archive_write.auto_increment)
 
873
        stats.auto_increment_value=
 
874
          (share->archive_write.auto_increment= temp_auto) + 1;
 
875
    }
 
876
  }
 
877
 
 
878
  /*
 
879
    Notice that the global auto_increment has been increased.
 
880
    In case of a failed row write, we will never try to reuse the value.
 
881
  */
 
882
  share->rows_recorded++;
 
883
  rc= real_write_row(buf,  &(share->archive_write));
 
884
error:
 
885
  pthread_mutex_unlock(&share->mutex);
 
886
  if (read_buf)
 
887
    my_free((uchar*) read_buf, MYF(0));
 
888
 
 
889
  DBUG_RETURN(rc);
 
890
}
 
891
 
 
892
 
 
893
void ha_archive::get_auto_increment(uint64_t offset, uint64_t increment,
 
894
                                    uint64_t nb_desired_values,
 
895
                                    uint64_t *first_value,
 
896
                                    uint64_t *nb_reserved_values)
 
897
{
 
898
  *nb_reserved_values= ULONGLONG_MAX;
 
899
  *first_value= share->archive_write.auto_increment + 1;
 
900
}
 
901
 
 
902
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
 
903
int ha_archive::index_init(uint keynr, bool sorted)
 
904
{
 
905
  DBUG_ENTER("ha_archive::index_init");
 
906
  active_index= keynr;
 
907
  DBUG_RETURN(0);
 
908
}
 
909
 
 
910
 
 
911
/*
 
912
  No indexes, so if we get a request for an index search since we tell
 
913
  the optimizer that we have unique indexes, we scan
 
914
*/
 
915
int ha_archive::index_read(uchar *buf, const uchar *key,
 
916
                             uint key_len, enum ha_rkey_function find_flag)
 
917
{
 
918
  int rc;
 
919
  DBUG_ENTER("ha_archive::index_read");
 
920
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
 
921
  DBUG_RETURN(rc);
 
922
}
 
923
 
 
924
 
 
925
int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
 
926
                                 uint key_len, enum ha_rkey_function find_flag)
 
927
{
 
928
  int rc;
 
929
  bool found= 0;
 
930
  KEY *mkey= &table->s->key_info[index];
 
931
  current_k_offset= mkey->key_part->offset;
 
932
  current_key= key;
 
933
  current_key_len= key_len;
 
934
 
 
935
 
 
936
  DBUG_ENTER("ha_archive::index_read_idx");
 
937
 
 
938
  rc= rnd_init(TRUE);
 
939
 
 
940
  if (rc)
 
941
    goto error;
 
942
 
 
943
  while (!(get_row(&archive, buf)))
 
944
  {
 
945
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
 
946
    {
 
947
      found= 1;
 
948
      break;
 
949
    }
 
950
  }
 
951
 
 
952
  if (found)
 
953
    DBUG_RETURN(0);
 
954
 
 
955
error:
 
956
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
 
957
}
 
958
 
 
959
 
 
960
int ha_archive::index_next(uchar * buf) 
 
961
 
962
  bool found= 0;
 
963
 
 
964
  DBUG_ENTER("ha_archive::index_next");
 
965
 
 
966
  while (!(get_row(&archive, buf)))
 
967
  {
 
968
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
 
969
    {
 
970
      found= 1;
 
971
      break;
 
972
    }
 
973
  }
 
974
 
 
975
  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
 
976
}
 
977
 
 
978
/*
 
979
  All calls that need to scan the table start with this method. If we are told
 
980
  that it is a table scan we rewind the file to the beginning, otherwise
 
981
  we assume the position will be set.
 
982
*/
 
983
 
 
984
int ha_archive::rnd_init(bool scan)
 
985
{
 
986
  DBUG_ENTER("ha_archive::rnd_init");
 
987
  
 
988
  if (share->crashed)
 
989
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
990
 
 
991
  init_archive_reader();
 
992
 
 
993
  /* We rewind the file so that we can read from the beginning if scan */
 
994
  if (scan)
 
995
  {
 
996
    DBUG_PRINT("info", ("archive will retrieve %llu rows", 
 
997
                        (unsigned long long) scan_rows));
 
998
 
 
999
    if (read_data_header(&archive))
 
1000
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1001
  }
 
1002
 
 
1003
  DBUG_RETURN(0);
 
1004
}
 
1005
 
 
1006
 
 
1007
/*
 
1008
  This is the method that is used to read a row. It assumes that the row is 
 
1009
  positioned where you want it.
 
1010
*/
 
1011
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
 
1012
{
 
1013
  int rc;
 
1014
  DBUG_ENTER("ha_archive::get_row");
 
1015
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", 
 
1016
                            (uchar)file_to_read->version, 
 
1017
                            ARCHIVE_VERSION));
 
1018
  if (file_to_read->version == ARCHIVE_VERSION)
 
1019
    rc= get_row_version3(file_to_read, buf);
 
1020
  else
 
1021
    rc= -1;
 
1022
 
 
1023
  DBUG_PRINT("ha_archive", ("Return %d\n", rc));
 
1024
 
 
1025
  DBUG_RETURN(rc);
 
1026
}
 
1027
 
 
1028
/* Reallocate buffer if needed */
 
1029
bool ha_archive::fix_rec_buff(unsigned int length)
 
1030
{
 
1031
  DBUG_ENTER("ha_archive::fix_rec_buff");
 
1032
  DBUG_PRINT("ha_archive", ("Fixing %u for %u", 
 
1033
                            length, record_buffer->length));
 
1034
  DBUG_ASSERT(record_buffer->buffer);
 
1035
 
 
1036
  if (length > record_buffer->length)
 
1037
  {
 
1038
    uchar *newptr;
 
1039
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
 
1040
                                    length,
 
1041
                                    MYF(MY_ALLOW_ZERO_PTR))))
 
1042
      DBUG_RETURN(1);
 
1043
    record_buffer->buffer= newptr;
 
1044
    record_buffer->length= length;
 
1045
  }
 
1046
 
 
1047
  DBUG_ASSERT(length <= record_buffer->length);
 
1048
 
 
1049
  DBUG_RETURN(0);
 
1050
}
 
1051
 
 
1052
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
 
1053
{
 
1054
  DBUG_ENTER("ha_archive::unpack_row");
 
1055
 
 
1056
  unsigned int read;
 
1057
  int error;
 
1058
  const uchar *ptr;
 
1059
 
 
1060
  read= azread_row(file_to_read, &error);
 
1061
  ptr= (const uchar *)file_to_read->row_ptr;
 
1062
 
 
1063
  if (error || read == 0)
 
1064
  {
 
1065
    DBUG_RETURN(-1);
 
1066
  }
 
1067
 
 
1068
  /* Copy null bits */
 
1069
  memcpy(record, ptr, table->s->null_bytes);
 
1070
  ptr+= table->s->null_bytes;
 
1071
  for (Field **field=table->field ; *field ; field++)
 
1072
  {
 
1073
    if (!((*field)->is_null()))
 
1074
    {
 
1075
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
 
1076
    }
 
1077
  }
 
1078
  DBUG_RETURN(0);
 
1079
}
 
1080
 
 
1081
 
 
1082
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
 
1083
{
 
1084
  DBUG_ENTER("ha_archive::get_row_version3");
 
1085
 
 
1086
  int returnable= unpack_row(file_to_read, buf);
 
1087
 
 
1088
  DBUG_RETURN(returnable);
 
1089
}
 
1090
 
 
1091
 
 
1092
/* 
 
1093
  Called during ORDER BY. Its position is either from being called sequentially
 
1094
  or by having had ha_archive::rnd_pos() called before it is called.
 
1095
*/
 
1096
 
 
1097
int ha_archive::rnd_next(uchar *buf)
 
1098
{
 
1099
  int rc;
 
1100
  DBUG_ENTER("ha_archive::rnd_next");
 
1101
 
 
1102
  if (share->crashed)
 
1103
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1104
 
 
1105
  if (!scan_rows)
 
1106
    DBUG_RETURN(HA_ERR_END_OF_FILE);
 
1107
  scan_rows--;
 
1108
 
 
1109
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1110
  current_position= aztell(&archive);
 
1111
  rc= get_row(&archive, buf);
 
1112
 
 
1113
  table->status=rc ? STATUS_NOT_FOUND: 0;
 
1114
 
 
1115
  DBUG_RETURN(rc);
 
1116
}
 
1117
 
 
1118
 
 
1119
/*
 
1120
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
 
1121
  each call to ha_archive::rnd_next() if an ordering of the rows is
 
1122
  needed.
 
1123
*/
 
1124
 
 
1125
void ha_archive::position(const uchar *record)
 
1126
{
 
1127
  DBUG_ENTER("ha_archive::position");
 
1128
  my_store_ptr(ref, ref_length, current_position);
 
1129
  DBUG_VOID_RETURN;
 
1130
}
 
1131
 
 
1132
 
 
1133
/*
 
1134
  This is called after a table scan for each row if the results of the
 
1135
  scan need to be ordered. It will take *pos and use it to move the
 
1136
  cursor in the file so that the next row that is called is the
 
1137
  correctly ordered row.
 
1138
*/
 
1139
 
 
1140
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
 
1141
{
 
1142
  DBUG_ENTER("ha_archive::rnd_pos");
 
1143
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
 
1144
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
 
1145
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
 
1146
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
1147
  DBUG_RETURN(get_row(&archive, buf));
 
1148
}
 
1149
 
 
1150
/*
 
1151
  This method repairs the meta file. It does this by walking the datafile and 
 
1152
  rewriting the meta file. Currently it does this by calling optimize with
 
1153
  the extended flag.
 
1154
*/
 
1155
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
 
1156
{
 
1157
  DBUG_ENTER("ha_archive::repair");
 
1158
  check_opt->flags= T_EXTEND;
 
1159
  int rc= optimize(thd, check_opt);
 
1160
 
 
1161
  if (rc)
 
1162
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
 
1163
 
 
1164
  share->crashed= FALSE;
 
1165
  DBUG_RETURN(0);
 
1166
}
 
1167
 
 
1168
/*
 
1169
  The table can become fragmented if data was inserted, read, and then
 
1170
  inserted again. What we do is open up the file and recompress it completely. 
 
1171
*/
 
1172
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
 
1173
{
 
1174
  DBUG_ENTER("ha_archive::optimize");
 
1175
  int rc= 0;
 
1176
  azio_stream writer;
 
1177
  char writer_filename[FN_REFLEN];
 
1178
 
 
1179
  init_archive_reader();
 
1180
 
 
1181
  // now we close both our writer and our reader for the rename
 
1182
  if (share->archive_write_open)
 
1183
  {
 
1184
    azclose(&(share->archive_write));
 
1185
    share->archive_write_open= FALSE;
 
1186
  }
 
1187
 
 
1188
  /* Lets create a file to contain the new data */
 
1189
  fn_format(writer_filename, share->table_name, "", ARN, 
 
1190
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
 
1191
 
 
1192
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
 
1193
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 
 
1194
 
 
1195
  /* 
 
1196
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
 
1197
    Any dead rows are removed (aka rows that may have been partially recorded). 
 
1198
 
 
1199
    As of Archive format 3, this is the only type that is performed, before this
 
1200
    version it was just done on T_EXTEND
 
1201
  */
 
1202
  if (1)
 
1203
  {
 
1204
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));
 
1205
 
 
1206
    /*
 
1207
      Now we will rewind the archive file so that we are positioned at the 
 
1208
      start of the file.
 
1209
    */
 
1210
    azflush(&archive, Z_SYNC_FLUSH);
 
1211
    rc= read_data_header(&archive);
 
1212
 
 
1213
    /* 
 
1214
      On success of writing out the new header, we now fetch each row and
 
1215
      insert it into the new archive file. 
 
1216
    */
 
1217
    if (!rc)
 
1218
    {
 
1219
      unsigned long long x;
 
1220
      unsigned long long rows_restored;
 
1221
      share->rows_recorded= 0;
 
1222
      stats.auto_increment_value= 1;
 
1223
      share->archive_write.auto_increment= 0;
 
1224
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
 
1225
 
 
1226
      rows_restored= archive.rows;
 
1227
 
 
1228
      for (x= 0; x < rows_restored ; x++)
 
1229
      {
 
1230
        rc= get_row(&archive, table->record[0]);
 
1231
 
 
1232
        if (rc != 0)
 
1233
          break;
 
1234
 
 
1235
        real_write_row(table->record[0], &writer);
 
1236
        /*
 
1237
          Long term it should be possible to optimize this so that
 
1238
          it is not called on each row.
 
1239
        */
 
1240
        if (table->found_next_number_field)
 
1241
        {
 
1242
          Field *field= table->found_next_number_field;
 
1243
          uint64_t auto_value=
 
1244
            (uint64_t) field->val_int(table->record[0] +
 
1245
                                       field->offset(table->record[0]));
 
1246
          if (share->archive_write.auto_increment < auto_value)
 
1247
            stats.auto_increment_value=
 
1248
              (share->archive_write.auto_increment= auto_value) + 1;
 
1249
        }
 
1250
      }
 
1251
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
 
1252
      share->rows_recorded= (ha_rows)writer.rows;
 
1253
    }
 
1254
 
 
1255
    DBUG_PRINT("info", ("recovered %llu archive rows", 
 
1256
                        (unsigned long long)share->rows_recorded));
 
1257
 
 
1258
    DBUG_PRINT("ha_archive", ("recovered %llu archive rows", 
 
1259
                        (unsigned long long)share->rows_recorded));
 
1260
 
 
1261
    if (rc && rc != HA_ERR_END_OF_FILE)
 
1262
    {
 
1263
      goto error;
 
1264
    }
 
1265
  } 
 
1266
 
 
1267
  azclose(&writer);
 
1268
  share->dirty= FALSE;
 
1269
  
 
1270
  azclose(&archive);
 
1271
 
 
1272
  // make the file we just wrote be our data file
 
1273
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
 
1274
 
 
1275
 
 
1276
  DBUG_RETURN(rc);
 
1277
error:
 
1278
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
 
1279
  azclose(&writer);
 
1280
 
 
1281
  DBUG_RETURN(rc); 
 
1282
}
 
1283
 
 
1284
/* 
 
1285
  Below is an example of how to setup row level locking.
 
1286
*/
 
1287
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
 
1288
                                       THR_LOCK_DATA **to,
 
1289
                                       enum thr_lock_type lock_type)
 
1290
{
 
1291
  if (lock_type == TL_WRITE_DELAYED)
 
1292
    delayed_insert= TRUE;
 
1293
  else
 
1294
    delayed_insert= FALSE;
 
1295
 
 
1296
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
 
1297
  {
 
1298
    /* 
 
1299
      Here is where we get into the guts of a row level lock.
 
1300
      If TL_UNLOCK is set 
 
1301
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
 
1302
      TABLESPACE, then allow multiple writers 
 
1303
    */
 
1304
 
 
1305
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
 
1306
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
 
1307
        && !thd_tablespace_op(thd))
 
1308
      lock_type = TL_WRITE_ALLOW_WRITE;
 
1309
 
 
1310
    /* 
 
1311
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
 
1312
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
 
1313
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
 
1314
      to t2. Convert the lock to a normal read lock to allow
 
1315
      concurrent inserts to t2. 
 
1316
    */
 
1317
 
 
1318
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
 
1319
      lock_type = TL_READ;
 
1320
 
 
1321
    lock.type=lock_type;
 
1322
  }
 
1323
 
 
1324
  *to++= &lock;
 
1325
 
 
1326
  return to;
 
1327
}
 
1328
 
 
1329
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 
1330
{
 
1331
  DBUG_ENTER("ha_archive::update_create_info");
 
1332
 
 
1333
  ha_archive::info(HA_STATUS_AUTO);
 
1334
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
 
1335
  {
 
1336
    create_info->auto_increment_value= stats.auto_increment_value;
 
1337
  }
 
1338
 
 
1339
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
 
1340
    create_info->data_file_name= share->real_path;
 
1341
 
 
1342
  DBUG_VOID_RETURN;
 
1343
}
 
1344
 
 
1345
 
 
1346
/*
 
1347
  Hints for optimizer, see ha_tina for more information
 
1348
*/
 
1349
int ha_archive::info(uint flag)
 
1350
{
 
1351
  DBUG_ENTER("ha_archive::info");
 
1352
 
 
1353
  /* 
 
1354
    If dirty, we lock, and then reset/flush the data.
 
1355
    I found that just calling azflush() doesn't always work.
 
1356
  */
 
1357
  pthread_mutex_lock(&share->mutex);
 
1358
  if (share->dirty == TRUE)
 
1359
  {
 
1360
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
 
1361
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
1362
    share->rows_recorded= share->archive_write.rows;
 
1363
    share->dirty= FALSE;
 
1364
    if (share->version < global_version)
 
1365
    {
 
1366
      share->version_rows= share->rows_recorded;
 
1367
      share->version= global_version;
 
1368
    }
 
1369
 
 
1370
  }
 
1371
 
 
1372
  /* 
 
1373
    This should be an accurate number now, though bulk and delayed inserts can
 
1374
    cause the number to be inaccurate.
 
1375
  */
 
1376
  stats.records= share->rows_recorded;
 
1377
  pthread_mutex_unlock(&share->mutex);
 
1378
 
 
1379
  scan_rows= stats.records;
 
1380
  stats.deleted= 0;
 
1381
 
 
1382
  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
 
1383
  /* Costs quite a bit more to get all information */
 
1384
  if (flag & HA_STATUS_TIME)
 
1385
  {
 
1386
    MY_STAT file_stat;  // Stat information for the data file
 
1387
 
 
1388
    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));
 
1389
 
 
1390
    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
 
1391
    stats.data_file_length= file_stat.st_size;
 
1392
    stats.create_time= file_stat.st_ctime;
 
1393
    stats.update_time= file_stat.st_mtime;
 
1394
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
 
1395
  }
 
1396
  stats.delete_length= 0;
 
1397
  stats.index_file_length=0;
 
1398
 
 
1399
  if (flag & HA_STATUS_AUTO)
 
1400
  {
 
1401
    init_archive_reader();
 
1402
    pthread_mutex_lock(&share->mutex);
 
1403
    azflush(&archive, Z_SYNC_FLUSH);
 
1404
    pthread_mutex_unlock(&share->mutex);
 
1405
    stats.auto_increment_value= archive.auto_increment + 1;
 
1406
  }
 
1407
 
 
1408
  DBUG_RETURN(0);
 
1409
}
 
1410
 
 
1411
 
 
1412
/*
 
1413
  This method tells us that a bulk insert operation is about to occur. We set
 
1414
  a flag which will keep write_row from saying that its data is dirty. This in
 
1415
  turn will keep selects from causing a sync to occur.
 
1416
  Basically, yet another optimizations to keep compression working well.
 
1417
*/
 
1418
void ha_archive::start_bulk_insert(ha_rows rows)
 
1419
{
 
1420
  DBUG_ENTER("ha_archive::start_bulk_insert");
 
1421
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
 
1422
    bulk_insert= TRUE;
 
1423
  DBUG_VOID_RETURN;
 
1424
}
 
1425
 
 
1426
 
 
1427
/* 
 
1428
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
 
1429
  flag, and set the share dirty so that the next select will call sync for us.
 
1430
*/
 
1431
int ha_archive::end_bulk_insert()
 
1432
{
 
1433
  DBUG_ENTER("ha_archive::end_bulk_insert");
 
1434
  bulk_insert= FALSE;
 
1435
  share->dirty= TRUE;
 
1436
  DBUG_RETURN(0);
 
1437
}
 
1438
 
 
1439
/*
 
1440
  We cancel a truncate command. The only way to delete an archive table is to drop it.
 
1441
  This is done for security reasons. In a later version we will enable this by 
 
1442
  allowing the user to select a different row format.
 
1443
*/
 
1444
int ha_archive::delete_all_rows()
 
1445
{
 
1446
  DBUG_ENTER("ha_archive::delete_all_rows");
 
1447
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
1448
}
 
1449
 
 
1450
/*
 
1451
  We just return state if asked.
 
1452
*/
 
1453
bool ha_archive::is_crashed() const 
 
1454
{
 
1455
  DBUG_ENTER("ha_archive::is_crashed");
 
1456
  DBUG_RETURN(share->crashed); 
 
1457
}
 
1458
 
 
1459
/*
 
1460
  Simple scan of the tables to make sure everything is ok.
 
1461
*/
 
1462
 
 
1463
int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
 
1464
{
 
1465
  int rc= 0;
 
1466
  const char *old_proc_info;
 
1467
  unsigned long long x;
 
1468
  DBUG_ENTER("ha_archive::check");
 
1469
 
 
1470
  old_proc_info= thd_proc_info(thd, "Checking table");
 
1471
  /* Flush any waiting data */
 
1472
  pthread_mutex_lock(&share->mutex);
 
1473
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
 
1474
  pthread_mutex_unlock(&share->mutex);
 
1475
 
 
1476
  /*
 
1477
    Now we will rewind the archive file so that we are positioned at the 
 
1478
    start of the file.
 
1479
  */
 
1480
  init_archive_reader();
 
1481
  azflush(&archive, Z_SYNC_FLUSH);
 
1482
  read_data_header(&archive);
 
1483
  for (x= 0; x < share->archive_write.rows; x++)
 
1484
  {
 
1485
    rc= get_row(&archive, table->record[0]);
 
1486
 
 
1487
    if (rc != 0)
 
1488
      break;
 
1489
  }
 
1490
 
 
1491
  thd_proc_info(thd, old_proc_info);
 
1492
 
 
1493
  if ((rc && rc != HA_ERR_END_OF_FILE))  
 
1494
  {
 
1495
    share->crashed= FALSE;
 
1496
    DBUG_RETURN(HA_ADMIN_CORRUPT);
 
1497
  }
 
1498
  else
 
1499
  {
 
1500
    DBUG_RETURN(HA_ADMIN_OK);
 
1501
  }
 
1502
}
 
1503
 
 
1504
/*
 
1505
  Check and repair the table if needed.
 
1506
*/
 
1507
bool ha_archive::check_and_repair(THD *thd) 
 
1508
{
 
1509
  HA_CHECK_OPT check_opt;
 
1510
  DBUG_ENTER("ha_archive::check_and_repair");
 
1511
 
 
1512
  check_opt.init();
 
1513
 
 
1514
  DBUG_RETURN(repair(thd, &check_opt));
 
1515
}
 
1516
 
 
1517
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
 
1518
{
 
1519
  DBUG_ENTER("ha_archive::create_record_buffer");
 
1520
  archive_record_buffer *r;
 
1521
  if (!(r= 
 
1522
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
 
1523
                                           MYF(MY_WME))))
 
1524
  {
 
1525
    DBUG_RETURN(NULL); /* purecov: inspected */
 
1526
  }
 
1527
  r->length= (int)length;
 
1528
 
 
1529
  if (!(r->buffer= (uchar*) my_malloc(r->length,
 
1530
                                    MYF(MY_WME))))
 
1531
  {
 
1532
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1533
    DBUG_RETURN(NULL); /* purecov: inspected */
 
1534
  }
 
1535
 
 
1536
  DBUG_RETURN(r);
 
1537
}
 
1538
 
 
1539
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
 
1540
{
 
1541
  DBUG_ENTER("ha_archive::destroy_record_buffer");
 
1542
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
 
1543
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
 
1544
  DBUG_VOID_RETURN;
 
1545
}
 
1546
 
 
1547
static MYSQL_SYSVAR_BOOL(aio, archive_use_aio,
 
1548
  PLUGIN_VAR_NOCMDOPT,
 
1549
  "Whether or not to use asynchronous IO.",
 
1550
  NULL, NULL, TRUE);
 
1551
 
 
1552
static struct st_mysql_sys_var* archive_system_variables[]= {
 
1553
  MYSQL_SYSVAR(aio),
 
1554
  NULL
 
1555
};
 
1556
 
 
1557
struct st_mysql_storage_engine archive_storage_engine=
 
1558
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
 
1559
 
 
1560
mysql_declare_plugin(archive)
 
1561
{
 
1562
  MYSQL_STORAGE_ENGINE_PLUGIN,
 
1563
  &archive_storage_engine,
 
1564
  "ARCHIVE",
 
1565
  "Brian Aker, MySQL AB",
 
1566
  "Archive storage engine",
 
1567
  PLUGIN_LICENSE_GPL,
 
1568
  archive_db_init, /* Plugin Init */
 
1569
  archive_db_done, /* Plugin Deinit */
 
1570
  0x0350 /* 3.0 */,
 
1571
  NULL,                       /* status variables                */
 
1572
  archive_system_variables,   /* system variables                */
 
1573
  NULL                        /* config options                  */
 
1574
}
 
1575
mysql_declare_plugin_end;
 
1576