1
/* Copyright (C) 2003 MySQL AB
2
3
  This program is free software; you can redistribute it and/or modify
4
  it under the terms of the GNU General Public License as published by
5
  the Free Software Foundation; version 2 of the License.
6
7
  This program is distributed in the hope that it will be useful,
8
  but WITHOUT ANY WARRANTY; without even the implied warranty of
9
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
  GNU General Public License for more details.
11
12
  You should have received a copy of the GNU General Public License
13
  along with this program; if not, write to the Free Software
14
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
#ifdef USE_PRAGMA_IMPLEMENTATION
17
#pragma implementation        // gcc: Class implementation
18
#endif
19
20
#include "mysql_priv.h"
21
#include <myisam.h>
22
23
#include "ha_archive.h"
24
#include <my_dir.h>
25
26
#include <mysql/plugin.h>
27
28
/*
29
  First, if you want to understand storage engines you should look at 
30
  ha_example.cc and ha_example.h. 
31
32
  This example was written as a test case for a customer who needed
33
  a storage engine without indexes that could compress data very well.
34
  So, welcome to a completely compressed storage engine. This storage
35
  engine only does inserts. No replace, deletes, or updates. All reads are 
36
  complete table scans. Compression is done through a combination of packing
37
  and making use of the zlib library
38
  
39
  We keep a file pointer open for each instance of ha_archive for reads,
40
  but for writes we keep a single shared file handle. We flush it
41
  only when a read occurs. azio handles compressing lots of records
42
  at once much better than doing lots of little records between writes.
43
  It is possible to not lock on writes but this would then mean we couldn't
44
  handle bulk inserts as well (that is if someone was trying to read at
45
  the same time since we would want to flush).
46
47
  A "meta" file is kept alongside the data file. This file serves two purposes.
48
  The first purpose is to track the number of rows in the table. The second 
49
  purpose is to determine if the table was closed properly or not. When the 
50
  meta file is first opened it is marked as dirty. It is opened when the table 
51
  itself is opened for writing. When the table is closed the new count for rows 
52
  is written to the meta file and the file is marked as clean. If the meta file 
53
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
54
  this point an error occurs and the user is told to rebuild the file.
55
  A rebuild scans the rows and rewrites the meta file. If corruption is found
56
  in the data file then the meta file is not repaired.
57
58
  At some point a recovery method for such a drastic case needs to be devised.
59
60
  Locks are row level, and you will get a consistent read.
61
62
  For performance as far as table scans go it is quite fast. I don't have
63
  good numbers, but locally it has outperformed both InnoDB and MyISAM. For
64
  InnoDB the question will be whether the table can fit into the buffer
65
  pool. For MyISAM it is a question of how much of the MyISAM file the file
66
  system caches. With enough free memory MyISAM is faster. It is only when the
67
  OS doesn't have enough memory to cache the entire table that archive turns
68
  out to be any faster.
69
70
  Examples between MyISAM (packed) and Archive.
71
72
  Table with 76695844 identical rows:
73
  29680807 a_archive.ARZ
74
  920350317 a.MYD
75
76
77
  Table with 8991478 rows (all of Slashdot's comments):
78
  1922964506 comment_archive.ARZ
79
  2944970297 comment_text.MYD
80
81
82
  TODO:
83
   Allow users to set compression level.
84
   Allow adjustable block size.
85
   Implement versioning, should be easy.
86
   Allow for errors, find a way to mark bad rows.
87
   Add optional feature so that rows can be flushed at an interval (which will cause less
88
     compression but may speed up ordered searches).
89
   Checkpoint the meta file to allow for faster rebuilds.
90
   Option to allow for dirty reads; this would lower the sync calls, which would make
91
     inserts a lot faster, but would mean highly arbitrary reads.
92
93
    -Brian
94
*/
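
/*
  A minimal usage sketch of the engine described above (illustrative only;
  the table and column names are made up and not part of this file):

    CREATE TABLE t1 (a INT, b BLOB) ENGINE=ARCHIVE;
    INSERT INTO t1 VALUES (1, 'first row');
    SELECT * FROM t1;      -- all reads are complete table scans
    -- REPLACE, UPDATE and DELETE are not supported by this engine.

  OPTIMIZE TABLE t1 recompresses the data file (see ha_archive::optimize()).
*/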
95
96
/* Variables for archive share methods */
97
pthread_mutex_t archive_mutex;
98
static HASH archive_open_tables;
99
static unsigned int global_version;
100
101
/* The file extension */
102
#define ARZ ".ARZ"               // The data file
103
#define ARN ".ARN"               // Files used during an optimize call
104
#define ARM ".ARM"               // Meta file (deprecated)
105
106
/*
107
  uchar + uchar
108
*/
109
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
110
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
111
112
/* Static declarations for handlerton */
113
static handler *archive_create_handler(handlerton *hton, 
114
                                       TABLE_SHARE *table, 
115
                                       MEM_ROOT *mem_root);
116
int archive_discover(handlerton *hton, THD* thd, const char *db, 
117
                     const char *name,
118
                     uchar **frmblob, 
119
                     size_t *frmlen);
120
121
static my_bool archive_use_aio= FALSE;
122
123
/*
124
  Number of rows that will force a bulk insert.
125
*/
126
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
127
128
/*
129
  Size of header used for row
130
*/
131
#define ARCHIVE_ROW_HEADER_SIZE 4
132
133
static handler *archive_create_handler(handlerton *hton,
134
                                       TABLE_SHARE *table, 
135
                                       MEM_ROOT *mem_root)
136
{
137
  return new (mem_root) ha_archive(hton, table);
138
}
139
140
/*
141
  Used for hash table that tracks open tables.
142
*/
143
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
144
                             my_bool not_used __attribute__((unused)))
145
{
146
  *length=share->table_name_length;
147
  return (uchar*) share->table_name;
148
}
149
150
151
/*
152
  Initialize the archive handler.
153
154
  SYNOPSIS
155
    archive_db_init()
156
    void *
157
158
  RETURN
159
    FALSE       OK
160
    TRUE        Error
161
*/
162
163
int archive_db_init(void *p)
164
{
165
  DBUG_ENTER("archive_db_init");
166
  handlerton *archive_hton;
167
168
  archive_hton= (handlerton *)p;
169
  archive_hton->state= SHOW_OPTION_YES;
170
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
171
  archive_hton->create= archive_create_handler;
172
  archive_hton->flags= HTON_NO_FLAGS;
173
  archive_hton->discover= archive_discover;
174
175
  /* When the engine starts up set the first version */
176
  global_version= 1;
177
178
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
179
    goto error;
180
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
181
                (hash_get_key) archive_get_key, 0, 0))
182
  {
183
    VOID(pthread_mutex_destroy(&archive_mutex));
184
  }
185
  else
186
  {
187
    DBUG_RETURN(FALSE);
188
  }
189
error:
190
  DBUG_RETURN(TRUE);
191
}
192
193
/*
194
  Release the archive handler.
195
196
  SYNOPSIS
197
    archive_db_done()
198
    void
199
200
  RETURN
201
    FALSE       OK
202
*/
203
204
int archive_db_done(void *p)
205
{
206
  hash_free(&archive_open_tables);
207
  VOID(pthread_mutex_destroy(&archive_mutex));
208
209
  return 0;
210
}
211
212
213
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
214
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
215
{
216
  /* Set our original buffer from pre-allocated memory */
217
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
218
219
  /* The size of the offset value we will use for position() */
220
  ref_length= sizeof(my_off_t);
221
  archive_reader_open= FALSE;
222
}
223
224
int archive_discover(handlerton *hton, THD* thd, const char *db, 
225
                     const char *name,
226
                     uchar **frmblob, 
227
                     size_t *frmlen)
228
{
229
  DBUG_ENTER("archive_discover");
230
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name)); 
231
  azio_stream frm_stream;
232
  char az_file[FN_REFLEN];
233
  char *frm_ptr;
234
  struct stat file_stat; 
235
236
  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
237
238
  if (stat(az_file, &file_stat))
239
    goto err;
240
241
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
242
  {
243
    if (errno == EROFS || errno == EACCES)
244
      DBUG_RETURN(my_errno= errno);
245
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
246
  }
247
248
  if (frm_stream.frm_length == 0)
249
    goto err;
250
251
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
252
  azread_frm(&frm_stream, frm_ptr);
253
  azclose(&frm_stream);
254
255
  *frmlen= frm_stream.frm_length;
256
  *frmblob= (uchar*) frm_ptr;
257
258
  DBUG_RETURN(0);
259
err:
260
  my_errno= 0;
261
  DBUG_RETURN(1);
262
}
263
264
/*
265
  This method reads the header of a datafile and returns whether or not it was successful.
266
*/
267
int ha_archive::read_data_header(azio_stream *file_to_read)
268
{
269
  DBUG_ENTER("ha_archive::read_data_header");
270
271
  if (azread_init(file_to_read) == -1)
272
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
273
274
  if (file_to_read->version >= 3)
275
    DBUG_RETURN(0);
276
277
  DBUG_RETURN(1);
278
}
279
280
281
/*
282
  We create the shared memory space that we will use for the open table. 
283
  No matter what, we try to get or create a share. This is so that a repair
284
  table operation can occur. 
285
286
  See ha_example.cc for a longer description.
287
*/
288
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
289
{
290
  uint length;
291
  DBUG_ENTER("ha_archive::get_share");
292
293
  pthread_mutex_lock(&archive_mutex);
294
  length=(uint) strlen(table_name);
295
296
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
297
                                           (uchar*) table_name,
298
                                           length)))
299
  {
300
    char *tmp_name;
301
    azio_stream archive_tmp;
302
303
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
304
                          &share, sizeof(*share),
305
                          &tmp_name, length+1,
306
                          NullS)) 
307
    {
308
      pthread_mutex_unlock(&archive_mutex);
309
      *rc= HA_ERR_OUT_OF_MEM;
310
      DBUG_RETURN(NULL);
311
    }
312
313
    share->use_count= 0;
314
    share->table_name_length= length;
315
    share->table_name= tmp_name;
316
    share->crashed= FALSE;
317
    share->archive_write_open= FALSE;
318
    fn_format(share->data_file_name, table_name, "",
319
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
320
    strmov(share->table_name, table_name);
321
    DBUG_PRINT("ha_archive", ("Data File %s", 
322
                        share->data_file_name));
323
    /*
324
      We will use this lock for rows.
325
    */
326
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
327
    
328
    /*
329
      We read the meta file, but do not mark it dirty. Since we are not
330
      doing a write we won't mark it dirty (and we won't open it for
331
      anything but reading... open it for write and we will generate null
332
      compression writes).
333
    */
334
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
335
                 AZ_METHOD_BLOCK)))
336
    {
337
      VOID(pthread_mutex_destroy(&share->mutex));
338
      free(share);
339
      pthread_mutex_unlock(&archive_mutex);
340
      *rc= HA_ERR_CRASHED_ON_REPAIR;
341
      DBUG_RETURN(NULL);
342
    }
343
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
344
    share->rows_recorded= (ha_rows)archive_tmp.rows;
345
    share->crashed= archive_tmp.dirty;
346
    if (share->version < global_version)
347
    {
348
      share->version_rows= share->rows_recorded;
349
      share->version= global_version;
350
    }
351
    azclose(&archive_tmp);
352
353
    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
354
    thr_lock_init(&share->lock);
355
  }
356
  share->use_count++;
357
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now", 
358
                      share->table_name_length, share->table_name,
359
                      share->use_count));
360
  if (share->crashed)
361
    *rc= HA_ERR_CRASHED_ON_USAGE;
362
  pthread_mutex_unlock(&archive_mutex);
363
364
  DBUG_RETURN(share);
365
}
366
367
368
/* 
369
  Free the share.
370
  See ha_example.cc for a description.
371
*/
372
int ha_archive::free_share()
373
{
374
  int rc= 0;
375
  DBUG_ENTER("ha_archive::free_share");
376
  DBUG_PRINT("ha_archive",
377
             ("archive table %.*s has %d open handles on entrance", 
378
              share->table_name_length, share->table_name,
379
              share->use_count));
380
381
  pthread_mutex_lock(&archive_mutex);
382
  if (!--share->use_count)
383
  {
384
    hash_delete(&archive_open_tables, (uchar*) share);
385
    thr_lock_delete(&share->lock);
386
    VOID(pthread_mutex_destroy(&share->mutex));
387
    /* 
388
      We need to make sure we don't reset the crashed state.
389
      If we open a crashed file, we need to close it as crashed unless
390
      it has been repaired.
391
      Since we will close the data down after this, we go on and count
392
      the flush on close.
393
    */
394
    if (share->archive_write_open == TRUE)
395
    {
396
      if (azclose(&(share->archive_write)))
397
        rc= 1;
398
    }
399
    my_free((uchar*) share, MYF(0));
400
  }
401
  pthread_mutex_unlock(&archive_mutex);
402
403
  DBUG_RETURN(rc);
404
}
405
406
int ha_archive::init_archive_writer()
407
{
408
  DBUG_ENTER("ha_archive::init_archive_writer");
409
  /* 
410
    It is expensive to open and close the data files and since you can't have
411
    a gzip file that can be both read and written we keep a writer open
412
    that is shared among all open tables.
413
  */
414
  if (!(azopen(&(share->archive_write), share->data_file_name, 
415
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
416
  {
417
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
418
    share->crashed= TRUE;
419
    DBUG_RETURN(1);
420
  }
421
  share->archive_write_open= TRUE;
422
423
  DBUG_RETURN(0);
424
}
425
426
427
/* 
428
  No locks are required because it is associated with just one handler instance
429
*/
430
int ha_archive::init_archive_reader()
431
{
432
  DBUG_ENTER("ha_archive::init_archive_reader");
433
  /* 
434
    It is expensive to open and close the data files, and since you can't have
435
    a gzip file that is both read and written, we keep a reader open
436
    for this handler for as long as the table stays open.
437
  */
438
  if (archive_reader_open == FALSE)
439
  {
440
    az_method method;
441
442
    switch (archive_use_aio)
443
    {
444
    case FALSE:
445
      method= AZ_METHOD_BLOCK;
446
      break;
447
    case TRUE:
448
      method= AZ_METHOD_AIO;
449
      break;
450
    default:
451
      method= AZ_METHOD_BLOCK;
452
    }
453
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY, 
454
                 method)))
455
    {
456
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
457
      share->crashed= TRUE;
458
      DBUG_RETURN(1);
459
    }
460
    archive_reader_open= TRUE;
461
  }
462
463
  DBUG_RETURN(0);
464
}
465
466
467
/*
468
  We just implement one additional file extension.
469
*/
470
static const char *ha_archive_exts[] = {
471
  ARZ,
472
  NullS
473
};
474
475
const char **ha_archive::bas_ext() const
476
{
477
  return ha_archive_exts;
478
}
479
480
481
/* 
482
  When opening a file we:
483
  Create/get our shared structure.
484
  Init our lock.
485
  We open the file we will read from.
486
*/
487
int ha_archive::open(const char *name, int mode, uint open_options)
488
{
489
  int rc= 0;
490
  DBUG_ENTER("ha_archive::open");
491
492
  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", 
493
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
494
  share= get_share(name, &rc);
495
496
  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
497
  {
498
    /* purecov: begin inspected */
499
    free_share();
500
    DBUG_RETURN(rc);
501
    /* purecov: end */    
502
  }
503
  else if (rc == HA_ERR_OUT_OF_MEM)
504
  {
505
    DBUG_RETURN(rc);
506
  }
507
508
  DBUG_ASSERT(share);
509
510
  record_buffer= create_record_buffer(table->s->reclength + 
511
                                      ARCHIVE_ROW_HEADER_SIZE);
512
513
  if (!record_buffer)
514
  {
515
    free_share();
516
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
517
  }
518
519
  thr_lock_data_init(&share->lock, &lock, NULL);
520
521
  DBUG_PRINT("ha_archive", ("archive table was crashed %s", 
522
                      rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
523
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
524
  {
525
    DBUG_RETURN(0);
526
  }
527
  else
528
    DBUG_RETURN(rc);
529
}
530
531
532
/*
533
  Closes the file.
534
535
  SYNOPSIS
536
    close();
537
  
538
  IMPLEMENTATION:
539
540
  We first close this storage engines file handle to the archive and
541
  then remove our reference count to the table (and possibly free it
542
  as well).
543
544
  RETURN
545
    0  ok
546
    1  Error
547
*/
548
549
int ha_archive::close(void)
550
{
551
  int rc= 0;
552
  DBUG_ENTER("ha_archive::close");
553
554
  destroy_record_buffer(record_buffer);
555
556
  /* First close stream */
557
  if (archive_reader_open == TRUE)
558
  {
559
    if (azclose(&archive))
560
      rc= 1;
561
  }
562
  /* then also close share */
563
  rc|= free_share();
564
565
  DBUG_RETURN(rc);
566
}
567
568
569
/*
570
  We create our data file here. The format is pretty simple. 
571
  You can read about the format of the data file above.
572
  Unlike other storage engines we do not "pack" our data. Since we 
573
  are about to do a general compression, packing would just be a waste of 
574
  CPU time. If the table has blobs they are written after the row in the order 
575
  of creation.
576
*/
577
578
int ha_archive::create(const char *name, TABLE *table_arg,
579
                       HA_CREATE_INFO *create_info)
580
{
581
  char name_buff[FN_REFLEN];
582
  char linkname[FN_REFLEN];
583
  int error;
584
  azio_stream create_stream;            /* Archive file we are working with */
585
  File frm_file;                   /* File handler for readers */
586
  struct stat file_stat;
587
  uchar *frm_ptr;
588
589
  DBUG_ENTER("ha_archive::create");
590
591
  stats.auto_increment_value= create_info->auto_increment_value;
592
593
  for (uint key= 0; key < table_arg->s->keys; key++)
594
  {
595
    KEY *pos= table_arg->key_info+key;
596
    KEY_PART_INFO *key_part=     pos->key_part;
597
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
598
599
    for (; key_part != key_part_end; key_part++)
600
    {
601
      Field *field= key_part->field;
602
603
      if (!(field->flags & AUTO_INCREMENT_FLAG))
604
      {
605
        error= -1;
606
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
607
        goto error;
608
      }
609
    }
610
  }
611
612
  /* 
613
    We reuse name_buff since it is available.
614
  */
615
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
616
  {
617
    DBUG_PRINT("ha_archive", ("archive will create stream file %s", 
618
                        create_info->data_file_name));
619
                        
620
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
621
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
622
    fn_format(linkname, name, "", ARZ,
623
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
624
  }
625
  else
626
  {
627
    fn_format(name_buff, name, "", ARZ,
628
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
629
    linkname[0]= 0;
630
  }
631
632
  /*
633
    There is a chance that the file was "discovered". In this case
634
    just use whatever file is there.
635
  */
636
  if (stat(name_buff, &file_stat))
637
  {
638
    my_errno= 0;
639
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
640
                 AZ_METHOD_BLOCK)))
641
    {
642
      error= errno;
643
      goto error2;
644
    }
645
646
    if (linkname[0])
647
      my_symlink(name_buff, linkname, MYF(0));
648
    fn_format(name_buff, name, "", ".frm",
649
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
650
651
    /*
652
      Here is where we open up the frm and pass it to archive to store 
653
    */
654
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
655
    {
656
      if (!fstat(frm_file, &file_stat))
657
      {
658
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
659
        if (frm_ptr)
660
        {
661
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
662
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
663
          my_free((uchar*)frm_ptr, MYF(0));
664
        }
665
      }
666
      my_close(frm_file, MYF(0));
667
    }
668
669
    if (create_info->comment.str)
670
      azwrite_comment(&create_stream, create_info->comment.str, 
671
                      (unsigned int)create_info->comment.length);
672
673
    /* 
674
      Yes you need to do this, because the starting value 
675
      for the autoincrement may not be zero.
676
    */
677
    create_stream.auto_increment= stats.auto_increment_value ?
678
                                    stats.auto_increment_value - 1 : 0;
679
    if (azclose(&create_stream))
680
    {
681
      error= errno;
682
      goto error2;
683
    }
684
  }
685
  else
686
    my_errno= 0;
687
688
  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
689
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
690
691
692
  DBUG_RETURN(0);
693
694
error2:
695
  delete_table(name);
696
error:
697
  /* Return error number, if we got one */
698
  DBUG_RETURN(error ? error : -1);
699
}
700
701
/*
702
  This is where the actual row is written out.
703
*/
704
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
705
{
706
  my_off_t written;
707
  unsigned int r_pack_length;
708
  DBUG_ENTER("ha_archive::real_write_row");
709
710
  /* We pack the row for writing */
711
  r_pack_length= pack_row(buf);
712
713
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
714
  if (written != r_pack_length)
715
  {
716
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", 
717
                                              (uint32) written, 
718
                                              (uint32)r_pack_length));
719
    DBUG_RETURN(-1);
720
  }
721
722
  if (!delayed_insert || !bulk_insert)
723
    share->dirty= TRUE;
724
725
  DBUG_RETURN(0);
726
}
727
728
729
/* 
730
  Calculate max length needed for row. This includes
731
  the bytes required for the length in the header.
732
*/
733
734
uint32 ha_archive::max_row_length(const uchar *buf)
735
{
736
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
737
  length+= ARCHIVE_ROW_HEADER_SIZE;
738
739
  uint *ptr, *end;
740
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
741
       ptr != end ;
742
       ptr++)
743
  {
744
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
745
  }
746
747
  return length;
748
}
749
750
751
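/*
  Packed row image, produced by pack_row() below and read back by
  unpack_row(): the record's null-bit bytes (table->s->null_bytes) come
  first, followed by each non-NULL field in field order, packed with
  Field::pack(). azwrite_row() then stores the image, along with its
  length, in the compressed stream.
*/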
unsigned int ha_archive::pack_row(uchar *record)
752
{
753
  uchar *ptr;
754
755
  DBUG_ENTER("ha_archive::pack_row");
756
757
758
  if (fix_rec_buff(max_row_length(record)))
759
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
760
761
  /* Copy null bits */
762
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
763
  ptr= record_buffer->buffer + table->s->null_bytes;
764
765
  for (Field **field=table->field ; *field ; field++)
766
  {
767
    if (!((*field)->is_null()))
768
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
769
  }
770
771
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
772
                           (ptr - record_buffer->buffer - 
773
                             ARCHIVE_ROW_HEADER_SIZE)));
774
775
  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
776
}
777
778
779
/* 
780
  Look at ha_archive::open() for an explanation of the row format.
781
  Here we just write out the row.
782
783
  Wondering about start_bulk_insert()? Archive implements it (see below), but
784
  since archive already optimizes for lots of writes, the only gain
785
  from start_bulk_insert() is that we can skip
786
  setting dirty to true on each row.
787
*/
788
int ha_archive::write_row(uchar *buf)
789
{
790
  int rc;
791
  uchar *read_buf= NULL;
792
  uint64_t temp_auto;
793
  uchar *record=  table->record[0];
794
  DBUG_ENTER("ha_archive::write_row");
795
796
  if (share->crashed)
797
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
798
799
  ha_statistic_increment(&SSV::ha_write_count);
800
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
801
    table->timestamp_field->set_time();
802
  pthread_mutex_lock(&share->mutex);
803
804
  if (share->archive_write_open == FALSE)
805
    if (init_archive_writer())
806
    {
      /* release share->mutex via the error path */
      rc= HA_ERR_CRASHED_ON_USAGE;
      goto error;
    }
807
808
809
  if (table->next_number_field && record == table->record[0])
810
  {
811
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
812
    update_auto_increment();
813
    temp_auto= table->next_number_field->val_int();
814
815
    /*
816
      We don't support decrementing auto_increment. It makes the performance
817
      just cry.
818
    */
819
    if (temp_auto <= share->archive_write.auto_increment && 
820
        mkey->flags & HA_NOSAME)
821
    {
822
      rc= HA_ERR_FOUND_DUPP_KEY;
823
      goto error;
824
    }
825
#ifdef DEAD_CODE
826
    /*
827
      Bad news, this will cause a search for the unique value which is very 
828
      expensive since we will have to do a table scan which will lock up 
829
      all other writers during this period. This could perhaps be optimized 
830
      in the future.
831
    */
832
    {
833
      /* 
834
        First we create a buffer that we can use for reading rows, and can pass
835
        to get_row().
836
      */
837
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
838
      {
839
        rc= HA_ERR_OUT_OF_MEM;
840
        goto error;
841
      }
842
       /* 
843
         All of the buffer must be written out or we won't see all of the
844
         data 
845
       */
846
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
847
      /*
848
        Set the position of the local read thread to the beginning position.
849
      */
850
      if (read_data_header(&archive))
851
      {
852
        rc= HA_ERR_CRASHED_ON_USAGE;
853
        goto error;
854
      }
855
856
      Field *mfield= table->next_number_field;
857
858
      while (!(get_row(&archive, read_buf)))
859
      {
860
        if (!memcmp(read_buf + mfield->offset(record),
861
                    table->next_number_field->ptr,
862
                    mfield->max_display_length()))
863
        {
864
          rc= HA_ERR_FOUND_DUPP_KEY;
865
          goto error;
866
        }
867
      }
868
    }
869
#endif
870
    else
871
    {
872
      if (temp_auto > share->archive_write.auto_increment)
873
        stats.auto_increment_value=
874
          (share->archive_write.auto_increment= temp_auto) + 1;
875
    }
876
  }
877
878
  /*
879
    Notice that the global auto_increment has been increased.
880
    In case of a failed row write, we will never try to reuse the value.
881
  */
882
  share->rows_recorded++;
883
  rc= real_write_row(buf,  &(share->archive_write));
884
error:
885
  pthread_mutex_unlock(&share->mutex);
886
  if (read_buf)
887
    my_free((uchar*) read_buf, MYF(0));
888
889
  DBUG_RETURN(rc);
890
}
891
892
893
void ha_archive::get_auto_increment(uint64_t offset, uint64_t increment,
894
                                    uint64_t nb_desired_values,
895
                                    uint64_t *first_value,
896
                                    uint64_t *nb_reserved_values)
897
{
898
  *nb_reserved_values= ULONGLONG_MAX;
899
  *first_value= share->archive_write.auto_increment + 1;
900
}
901
902
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
903
int ha_archive::index_init(uint keynr, bool sorted)
904
{
905
  DBUG_ENTER("ha_archive::index_init");
906
  active_index= keynr;
907
  DBUG_RETURN(0);
908
}
909
910
911
/*
912
  There are no real indexes; since we tell the optimizer that we have
913
  unique indexes, a request for an index search turns into a table scan.
914
*/
915
int ha_archive::index_read(uchar *buf, const uchar *key,
916
                             uint key_len, enum ha_rkey_function find_flag)
917
{
918
  int rc;
919
  DBUG_ENTER("ha_archive::index_read");
920
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
921
  DBUG_RETURN(rc);
922
}
923
924
925
int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
926
                                 uint key_len, enum ha_rkey_function find_flag)
927
{
928
  int rc;
929
  bool found= 0;
930
  KEY *mkey= &table->s->key_info[index];
931
  current_k_offset= mkey->key_part->offset;
932
  current_key= key;
933
  current_key_len= key_len;
934
935
936
  DBUG_ENTER("ha_archive::index_read_idx");
937
938
  rc= rnd_init(TRUE);
939
940
  if (rc)
941
    goto error;
942
943
  while (!(get_row(&archive, buf)))
944
  {
945
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
946
    {
947
      found= 1;
948
      break;
949
    }
950
  }
951
952
  if (found)
953
    DBUG_RETURN(0);
954
955
error:
956
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
957
}
958
959
960
int ha_archive::index_next(uchar * buf) 
961
{ 
962
  bool found= 0;
963
964
  DBUG_ENTER("ha_archive::index_next");
965
966
  while (!(get_row(&archive, buf)))
967
  {
968
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
969
    {
970
      found= 1;
971
      break;
972
    }
973
  }
974
975
  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
976
}
977
978
/*
979
  All calls that need to scan the table start with this method. If we are told
980
  that it is a table scan we rewind the file to the beginning, otherwise
981
  we assume the position will be set.
982
*/
983
984
int ha_archive::rnd_init(bool scan)
985
{
986
  DBUG_ENTER("ha_archive::rnd_init");
987
  
988
  if (share->crashed)
989
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
990
991
  init_archive_reader();
992
993
  /* We rewind the file so that we can read from the beginning if scan */
994
  if (scan)
995
  {
996
    DBUG_PRINT("info", ("archive will retrieve %llu rows", 
997
                        (unsigned long long) scan_rows));
998
999
    if (read_data_header(&archive))
1000
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1001
  }
1002
1003
  DBUG_RETURN(0);
1004
}
1005
1006
1007
/*
1008
  This is the method that is used to read a row. It assumes that the row is 
1009
  positioned where you want it.
1010
*/
1011
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1012
{
1013
  int rc;
1014
  DBUG_ENTER("ha_archive::get_row");
1015
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", 
1016
                            (uchar)file_to_read->version, 
1017
                            ARCHIVE_VERSION));
1018
  if (file_to_read->version == ARCHIVE_VERSION)
1019
    rc= get_row_version3(file_to_read, buf);
1020
  else
1021
    rc= -1;
1022
1023
  DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1024
1025
  DBUG_RETURN(rc);
1026
}
1027
1028
/* Reallocate buffer if needed */
1029
bool ha_archive::fix_rec_buff(unsigned int length)
1030
{
1031
  DBUG_ENTER("ha_archive::fix_rec_buff");
1032
  DBUG_PRINT("ha_archive", ("Fixing %u for %u", 
1033
                            length, record_buffer->length));
1034
  DBUG_ASSERT(record_buffer->buffer);
1035
1036
  if (length > record_buffer->length)
1037
  {
1038
    uchar *newptr;
1039
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
1040
                                    length,
1041
				    MYF(MY_ALLOW_ZERO_PTR))))
1042
      DBUG_RETURN(1);
1043
    record_buffer->buffer= newptr;
1044
    record_buffer->length= length;
1045
  }
1046
1047
  DBUG_ASSERT(length <= record_buffer->length);
1048
1049
  DBUG_RETURN(0);
1050
}
1051
1052
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1053
{
1054
  DBUG_ENTER("ha_archive::unpack_row");
1055
1056
  unsigned int read;
1057
  int error;
1058
  const uchar *ptr;
1059
1060
  read= azread_row(file_to_read, &error);
1061
  ptr= (const uchar *)file_to_read->row_ptr;
1062
1063
  if (error || read == 0)
1064
  {
1065
    DBUG_RETURN(-1);
1066
  }
1067
1068
  /* Copy null bits */
1069
  memcpy(record, ptr, table->s->null_bytes);
1070
  ptr+= table->s->null_bytes;
1071
  for (Field **field=table->field ; *field ; field++)
1072
  {
1073
    if (!((*field)->is_null()))
1074
    {
1075
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1076
    }
1077
  }
1078
  DBUG_RETURN(0);
1079
}
1080
1081
1082
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1083
{
1084
  DBUG_ENTER("ha_archive::get_row_version3");
1085
1086
  int returnable= unpack_row(file_to_read, buf);
1087
1088
  DBUG_RETURN(returnable);
1089
}
1090
1091
1092
/* 
1093
  Called during a scan (for example for ORDER BY). Its position comes either
1094
  from being called sequentially or from a prior call to ha_archive::rnd_pos().
1095
*/
1096
1097
int ha_archive::rnd_next(uchar *buf)
1098
{
1099
  int rc;
1100
  DBUG_ENTER("ha_archive::rnd_next");
1101
1102
  if (share->crashed)
1103
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1104
1105
  if (!scan_rows)
1106
    DBUG_RETURN(HA_ERR_END_OF_FILE);
1107
  scan_rows--;
1108
1109
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1110
  current_position= aztell(&archive);
1111
  rc= get_row(&archive, buf);
1112
1113
  table->status=rc ? STATUS_NOT_FOUND: 0;
1114
1115
  DBUG_RETURN(rc);
1116
}
1117
1118
1119
/*
1120
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1121
  each call to ha_archive::rnd_next() if an ordering of the rows is
1122
  needed.
1123
*/
1124
1125
void ha_archive::position(const uchar *record)
1126
{
1127
  DBUG_ENTER("ha_archive::position");
1128
  my_store_ptr(ref, ref_length, current_position);
1129
  DBUG_VOID_RETURN;
1130
}
1131
1132
1133
/*
1134
  This is called after a table scan for each row if the results of the
1135
  scan need to be ordered. It will take *pos and use it to move the
1136
  cursor in the file so that the next row that is called is the
1137
  correctly ordered row.
1138
*/
1139
1140
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1141
{
1142
  DBUG_ENTER("ha_archive::rnd_pos");
1143
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1144
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
1145
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
1146
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1147
  DBUG_RETURN(get_row(&archive, buf));
1148
}
1149
1150
/*
1151
  This method repairs the meta file. It does this by walking the datafile and 
1152
  rewriting the meta file. Currently it does this by calling optimize with
1153
  the extended flag.
1154
*/
1155
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1156
{
1157
  DBUG_ENTER("ha_archive::repair");
1158
  check_opt->flags= T_EXTEND;
1159
  int rc= optimize(thd, check_opt);
1160
1161
  if (rc)
1162
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1163
1164
  share->crashed= FALSE;
1165
  DBUG_RETURN(0);
1166
}
1167
1168
/*
1169
  The table can become fragmented if data was inserted, read, and then
1170
  inserted again. What we do is open up the file and recompress it completely. 
1171
*/
1172
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1173
{
1174
  DBUG_ENTER("ha_archive::optimize");
1175
  int rc= 0;
1176
  azio_stream writer;
1177
  char writer_filename[FN_REFLEN];
1178
1179
  init_archive_reader();
1180
1181
  // now we close both our writer and our reader for the rename
1182
  if (share->archive_write_open)
1183
  {
1184
    azclose(&(share->archive_write));
1185
    share->archive_write_open= FALSE;
1186
  }
1187
1188
  /* Let's create a file to contain the new data */
1189
  fn_format(writer_filename, share->table_name, "", ARN, 
1190
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1191
1192
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
1193
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 
1194
1195
  /* 
1196
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
1197
    Any dead rows are removed (aka rows that may have been partially recorded). 
1198
1199
    As of Archive format 3, this is the only type of rebuild performed; before this
1200
    version it was only done on T_EXTEND.
1201
  */
1202
  if (1)
1203
  {
1204
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1205
1206
    /*
1207
      Now we will rewind the archive file so that we are positioned at the 
1208
      start of the file.
1209
    */
1210
    azflush(&archive, Z_SYNC_FLUSH);
1211
    rc= read_data_header(&archive);
1212
1213
    /* 
1214
      On success of writing out the new header, we now fetch each row and
1215
      insert it into the new archive file. 
1216
    */
1217
    if (!rc)
1218
    {
1219
      unsigned long long x;
1220
      unsigned long long rows_restored;
1221
      share->rows_recorded= 0;
1222
      stats.auto_increment_value= 1;
1223
      share->archive_write.auto_increment= 0;
1224
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
1225
1226
      rows_restored= archive.rows;
1227
1228
      for (x= 0; x < rows_restored ; x++)
1229
      {
1230
        rc= get_row(&archive, table->record[0]);
1231
1232
        if (rc != 0)
1233
          break;
1234
1235
        real_write_row(table->record[0], &writer);
1236
        /*
1237
          Long term it should be possible to optimize this so that
1238
          it is not called on each row.
1239
        */
1240
        if (table->found_next_number_field)
1241
        {
1242
          Field *field= table->found_next_number_field;
1243
          uint64_t auto_value=
1244
            (uint64_t) field->val_int(table->record[0] +
1245
                                       field->offset(table->record[0]));
1246
          if (share->archive_write.auto_increment < auto_value)
1247
            stats.auto_increment_value=
1248
              (share->archive_write.auto_increment= auto_value) + 1;
1249
        }
1250
      }
1251
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
1252
      share->rows_recorded= (ha_rows)writer.rows;
1253
    }
1254
1255
    DBUG_PRINT("info", ("recovered %llu archive rows", 
1256
                        (unsigned long long)share->rows_recorded));
1257
1258
    DBUG_PRINT("ha_archive", ("recovered %llu archive rows", 
1259
                        (unsigned long long)share->rows_recorded));
1260
1261
    if (rc && rc != HA_ERR_END_OF_FILE)
1262
    {
1263
      goto error;
1264
    }
1265
  } 
1266
1267
  azclose(&writer);
1268
  share->dirty= FALSE;
1269
  
1270
  azclose(&archive);
1271
1272
  // make the file we just wrote be our data file
1273
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));
1274
1275
1276
  DBUG_RETURN(rc);
1277
error:
1278
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1279
  azclose(&writer);
1280
1281
  DBUG_RETURN(rc); 
1282
}
1283
1284
/* 
1285
  Below is an example of how to set up row-level locking.
1286
*/
1287
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1288
                                       THR_LOCK_DATA **to,
1289
                                       enum thr_lock_type lock_type)
1290
{
1291
  if (lock_type == TL_WRITE_DELAYED)
1292
    delayed_insert= TRUE;
1293
  else
1294
    delayed_insert= FALSE;
1295
1296
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
1297
  {
1298
    /* 
1299
      Here is where we get into the guts of a row level lock.
1300
      If TL_UNLOCK is set, and
1301
      if we are not doing a LOCK TABLE or DISCARD/IMPORT
1302
      TABLESPACE, then allow multiple writers.
1303
    */
1304
1305
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1306
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1307
        && !thd_tablespace_op(thd))
1308
      lock_type = TL_WRITE_ALLOW_WRITE;
1309
1310
    /* 
1311
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1312
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1313
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1314
      to t2. Convert the lock to a normal read lock to allow
1315
      concurrent inserts to t2. 
1316
    */
1317
1318
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) 
1319
      lock_type = TL_READ;
1320
1321
    lock.type=lock_type;
1322
  }
1323
1324
  *to++= &lock;
1325
1326
  return to;
1327
}
1328
1329
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1330
{
1331
  DBUG_ENTER("ha_archive::update_create_info");
1332
1333
  ha_archive::info(HA_STATUS_AUTO);
1334
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1335
  {
1336
    create_info->auto_increment_value= stats.auto_increment_value;
1337
  }
1338
1339
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
1340
    create_info->data_file_name= share->real_path;
1341
1342
  DBUG_VOID_RETURN;
1343
}
1344
1345
1346
/*
1347
  Hints for optimizer, see ha_tina for more information
1348
*/
1349
int ha_archive::info(uint flag)
1350
{
1351
  DBUG_ENTER("ha_archive::info");
1352
1353
  /* 
1354
    If dirty, we lock, and then reset/flush the data.
1355
    I found that just calling azflush() doesn't always work.
1356
  */
1357
  pthread_mutex_lock(&share->mutex);
1358
  if (share->dirty == TRUE)
1359
  {
1360
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1361
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
1362
    share->rows_recorded= share->archive_write.rows;
1363
    share->dirty= FALSE;
1364
    if (share->version < global_version)
1365
    {
1366
      share->version_rows= share->rows_recorded;
1367
      share->version= global_version;
1368
    }
1369
1370
  }
1371
1372
  /* 
1373
    This should be an accurate number now, though bulk and delayed inserts can
1374
    cause the number to be inaccurate.
1375
  */
1376
  stats.records= share->rows_recorded;
1377
  pthread_mutex_unlock(&share->mutex);
1378
1379
  scan_rows= stats.records;
1380
  stats.deleted= 0;
1381
1382
  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1383
  /* Costs quite a bit more to get all information */
1384
  if (flag & HA_STATUS_TIME)
1385
  {
1386
    struct stat file_stat;  // Stat information for the data file
1387
1388
    VOID(stat(share->data_file_name, &file_stat));
1389
1390
    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
1391
    stats.data_file_length= file_stat.st_size;
1392
    stats.create_time= file_stat.st_ctime;
1393
    stats.update_time= file_stat.st_mtime;
1394
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1395
  }
1396
  stats.delete_length= 0;
1397
  stats.index_file_length=0;
1398
1399
  if (flag & HA_STATUS_AUTO)
1400
  {
1401
    init_archive_reader();
1402
    pthread_mutex_lock(&share->mutex);
1403
    azflush(&archive, Z_SYNC_FLUSH);
1404
    pthread_mutex_unlock(&share->mutex);
1405
    stats.auto_increment_value= archive.auto_increment + 1;
1406
  }
1407
1408
  DBUG_RETURN(0);
1409
}
1410
1411
1412
/*
1413
  This method tells us that a bulk insert operation is about to occur. We set
1414
  a flag which will keep write_row from saying that its data is dirty. This in
1415
  turn will keep selects from causing a sync to occur.
1416
  Basically, yet another optimization to keep compression working well.
1417
*/
1418
void ha_archive::start_bulk_insert(ha_rows rows)
1419
{
1420
  DBUG_ENTER("ha_archive::start_bulk_insert");
1421
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1422
    bulk_insert= TRUE;
1423
  DBUG_VOID_RETURN;
1424
}
1425
1426
1427
/* 
1428
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off the bulk insert
1429
  flag, and set the share dirty so that the next select will call sync for us.
1430
*/
1431
int ha_archive::end_bulk_insert()
1432
{
1433
  DBUG_ENTER("ha_archive::end_bulk_insert");
1434
  bulk_insert= FALSE;
1435
  share->dirty= TRUE;
1436
  DBUG_RETURN(0);
1437
}
1438
1439
/*
1440
  We cancel a truncate command. The only way to delete an archive table is to drop it.
1441
  This is done for security reasons. In a later version we will enable this by 
1442
  allowing the user to select a different row format.
1443
*/
1444
int ha_archive::delete_all_rows()
1445
{
1446
  DBUG_ENTER("ha_archive::delete_all_rows");
1447
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1448
}
1449
1450
/*
1451
  We just return state if asked.
1452
*/
1453
bool ha_archive::is_crashed() const 
1454
{
1455
  DBUG_ENTER("ha_archive::is_crashed");
1456
  DBUG_RETURN(share->crashed); 
1457
}
1458
1459
/*
1460
  Simple scan of the tables to make sure everything is ok.
1461
*/
1462
1463
int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1464
{
1465
  int rc= 0;
1466
  const char *old_proc_info;
1467
  unsigned long long x;
1468
  DBUG_ENTER("ha_archive::check");
1469
1470
  old_proc_info= thd_proc_info(thd, "Checking table");
1471
  /* Flush any waiting data */
1472
  pthread_mutex_lock(&share->mutex);
1473
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1474
  pthread_mutex_unlock(&share->mutex);
1475
1476
  /*
1477
    Now we will rewind the archive file so that we are positioned at the 
1478
    start of the file.
1479
  */
1480
  init_archive_reader();
1481
  azflush(&archive, Z_SYNC_FLUSH);
1482
  read_data_header(&archive);
1483
  for (x= 0; x < share->archive_write.rows; x++)
1484
  {
1485
    rc= get_row(&archive, table->record[0]);
1486
1487
    if (rc != 0)
1488
      break;
1489
  }
1490
1491
  thd_proc_info(thd, old_proc_info);
1492
1493
  if ((rc && rc != HA_ERR_END_OF_FILE))  
1494
  {
1495
    share->crashed= FALSE;
1496
    DBUG_RETURN(HA_ADMIN_CORRUPT);
1497
  }
1498
  else
1499
  {
1500
    DBUG_RETURN(HA_ADMIN_OK);
1501
  }
1502
}
1503
1504
/*
1505
  Check and repair the table if needed.
1506
*/
1507
bool ha_archive::check_and_repair(THD *thd) 
1508
{
1509
  HA_CHECK_OPT check_opt;
1510
  DBUG_ENTER("ha_archive::check_and_repair");
1511
1512
  check_opt.init();
1513
1514
  DBUG_RETURN(repair(thd, &check_opt));
1515
}
1516
1517
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
1518
{
1519
  DBUG_ENTER("ha_archive::create_record_buffer");
1520
  archive_record_buffer *r;
1521
  if (!(r= 
1522
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1523
                                           MYF(MY_WME))))
1524
  {
1525
    DBUG_RETURN(NULL); /* purecov: inspected */
1526
  }
1527
  r->length= (int)length;
1528
1529
  if (!(r->buffer= (uchar*) my_malloc(r->length,
1530
                                    MYF(MY_WME))))
1531
  {
1532
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
1533
    DBUG_RETURN(NULL); /* purecov: inspected */
1534
  }
1535
1536
  DBUG_RETURN(r);
1537
}
1538
1539
void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
1540
{
1541
  DBUG_ENTER("ha_archive::destroy_record_buffer");
1542
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
1543
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
1544
  DBUG_VOID_RETURN;
1545
}
1546
1547
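/*
  This plugin variable feeds init_archive_reader(): when it is enabled the
  data file is opened with AZ_METHOD_AIO, otherwise AZ_METHOD_BLOCK is used.
*/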
static MYSQL_SYSVAR_BOOL(aio, archive_use_aio,
1548
  PLUGIN_VAR_NOCMDOPT,
1549
  "Whether or not to use asynchronous IO.",
1550
  NULL, NULL, TRUE);
1551
1552
static struct st_mysql_sys_var* archive_system_variables[]= {
1553
  MYSQL_SYSVAR(aio),
1554
  NULL
1555
};
1556
1557
struct st_mysql_storage_engine archive_storage_engine=
1558
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
1559
1560
mysql_declare_plugin(archive)
1561
{
1562
  MYSQL_STORAGE_ENGINE_PLUGIN,
1563
  &archive_storage_engine,
1564
  "ARCHIVE",
1565
  "Brian Aker, MySQL AB",
1566
  "Archive storage engine",
1567
  PLUGIN_LICENSE_GPL,
1568
  archive_db_init, /* Plugin Init */
1569
  archive_db_done, /* Plugin Deinit */
1570
  0x0350 /* 3.0 */,
1571
  NULL,                       /* status variables                */
1572
  archive_system_variables,   /* system variables                */
1573
  NULL                        /* config options                  */
1574
}
1575
mysql_declare_plugin_end;
1576