/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"
#include "ha_archive.h"
#include <mysql/plugin.h>
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).
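
  A minimal sketch of that write/flush idea (illustrative only, not code
  from this file; it simplifies call shapes but reuses the azwrite_row(),
  azflush() and share->dirty names that appear later in the engine):

      write path:  azwrite_row(&share->archive_write, packed, len);
                   share->dirty= TRUE;
      read path:   if (share->dirty)
                   {
                     azflush(&share->archive_write, Z_SYNC_FLUSH);
                     share->dirty= FALSE;
                   }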

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.
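
  Sketch of the dirty-flag protocol described above (hypothetical helper
  names; in this version of the engine the flag, row count and
  auto_increment actually live in the azio header of the data file):

      open for write:  meta.dirty= TRUE;  write_meta();
      clean close:     meta.rows= rows_recorded; meta.dirty= FALSE; write_meta();
      open for read:   read_meta(); if (meta.dirty) report crash / request REPAIR;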

  Locks are row level, and you will get a consistent read.

  For performance, as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it's a question of how much of the MyISAM file the file
  system caches. With enough free memory MyISAM is faster. It's only when the
  OS doesn't have enough memory to cache the entire table that Archive comes
  out ahead.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ

  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD

  TODO:
   Allow users to set compression level.
   Allow adjustable block size.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add an optional feature so that rows can be flushed at an interval (which will
    cause less compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Option to allow for dirty reads; this would lower the sync calls, which would
    make inserts a lot faster, but would mean highly arbitrary reads.
*/

/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;
static unsigned int global_version;

/* The file extensions */
#define ARZ ".ARZ" // The data file
#define ARN ".ARN" // Files used during an optimize call
#define ARM ".ARM" // Meta file (deprecated)

#define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

/* Static declarations for handlerton */
static handler *archive_create_handler(handlerton *hton,
int archive_discover(handlerton *hton, THD* thd, const char *db,

static my_bool archive_use_aio= FALSE;

/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2

/*
  Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4

static handler *archive_create_handler(handlerton *hton,
  return new (mem_root) ha_archive(hton, table);

/*
  Used for hash table that tracks open tables.
*/
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
                              my_bool not_used __attribute__((unused)))
  *length=share->table_name_length;
  return (uchar*) share->table_name;

/*
  Initialize the archive handler.
*/
int archive_db_init(void *p)
  DBUG_ENTER("archive_db_init");
  handlerton *archive_hton;

  archive_hton= (handlerton *)p;
  archive_hton->state= SHOW_OPTION_YES;
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
  archive_hton->create= archive_create_handler;
  archive_hton->flags= HTON_NO_FLAGS;
  archive_hton->discover= archive_discover;

  /* When the engine starts up set the first version */
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
    VOID(pthread_mutex_destroy(&archive_mutex));

/*
  Release the archive handler.
*/
int archive_db_done(void *p)
  hash_free(&archive_open_tables);
  VOID(pthread_mutex_destroy(&archive_mutex));

ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(my_off_t);
  archive_reader_open= FALSE;

int archive_discover(handlerton *hton, THD* thd, const char *db,
  DBUG_ENTER("archive_discover");
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
  azio_stream frm_stream;
  char az_file[FN_REFLEN];

  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(my_stat(az_file, &file_stat, MYF(0))))
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (frm_stream.frm_length == 0)

  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
  azread_frm(&frm_stream, frm_ptr);
  azclose(&frm_stream);

  *frmlen= frm_stream.frm_length;
  *frmblob= (uchar*) frm_ptr;

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
  DBUG_ENTER("ha_archive::read_data_header");

  if (azread_init(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (file_to_read->version >= 3)

/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
  DBUG_ENTER("ha_archive::get_share");

  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
    azio_stream archive_tmp;

    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                         &share, sizeof(*share),
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;

    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    share->archive_write_open= FALSE;
    fn_format(share->data_file_name, table_name, "",
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    strmov(share->table_name, table_name);
    DBUG_PRINT("ha_archive", ("Data File %s",
                              share->data_file_name));
    /*
      We will use this lock for rows.
    */
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));

    /*
      We read the meta file, but do not mark it dirty. Since we are not
      doing a write we won't mark it dirty (and we won't open it for
      anything but reading... open it for write and we will generate null
      compression writes).
    */
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
      VOID(pthread_mutex_destroy(&share->mutex));
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_CRASHED_ON_REPAIR;

    stats.auto_increment_value= archive_tmp.auto_increment + 1;
    share->rows_recorded= (ha_rows)archive_tmp.rows;
    share->crashed= archive_tmp.dirty;
    if (share->version < global_version)
      share->version_rows= share->rows_recorded;
      share->version= global_version;
    azclose(&archive_tmp);

    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
    thr_lock_init(&share->lock);

  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now",
                            share->table_name_length, share->table_name,
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&archive_mutex);
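
/*
  A condensed sketch of the get-or-create pattern used above (illustrative
  only; allocation details and error handling are omitted):

    pthread_mutex_lock(&archive_mutex);
    share= (ARCHIVE_SHARE*) hash_search(&archive_open_tables, ...);
    if (!share)
    {
      allocate and initialize the share;
      read rows/dirty/auto_increment once from the azio header;
      my_hash_insert(&archive_open_tables, (uchar*) share);
      thr_lock_init(&share->lock);
    }
    share->use_count++;
    pthread_mutex_unlock(&archive_mutex);
*/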

/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
  DBUG_ENTER("ha_archive::free_share");
  DBUG_PRINT("ha_archive",
             ("archive table %.*s has %d open handles on entrance",
              share->table_name_length, share->table_name,

  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
    hash_delete(&archive_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    /*
      We need to make sure we don't reset the crashed state.
      If we open a crashed file, we need to close it as crashed unless
      it has been repaired.
      Since we will close the data down after this, we go on and count
    */
    if (share->archive_write_open == TRUE)
      if (azclose(&(share->archive_write)))
    my_free((uchar*) share, MYF(0));
  pthread_mutex_unlock(&archive_mutex);

int ha_archive::init_archive_writer()
  DBUG_ENTER("ha_archive::init_archive_writer");
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name,
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
    share->crashed= TRUE;
  share->archive_write_open= TRUE;

/*
  No locks are required because it is associated with just one handler instance
*/
int ha_archive::init_archive_reader()
  DBUG_ENTER("ha_archive::init_archive_reader");
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (archive_reader_open == FALSE)
    switch (archive_use_aio)
      method= AZ_METHOD_BLOCK;
      method= AZ_METHOD_AIO;
      method= AZ_METHOD_BLOCK;
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY,
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
      share->crashed= TRUE;
    archive_reader_open= TRUE;

/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {

const char **ha_archive::bas_ext() const
  return ha_archive_exts;

/*
  When opening a file we:
  Create/get our shared structure.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
  DBUG_ENTER("ha_archive::open");

  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
                            (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
  share= get_share(name, &rc);

  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
    /* purecov: begin inspected */
  else if (rc == HA_ERR_OUT_OF_MEM)

  record_buffer= create_record_buffer(table->s->reclength +
                                      ARCHIVE_ROW_HEADER_SIZE);
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  thr_lock_data_init(&share->lock, &lock, NULL);

  DBUG_PRINT("ha_archive", ("archive table was crashed %s",
                            rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)

/*
  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it).
*/
int ha_archive::close(void)
  DBUG_ENTER("ha_archive::close");

  destroy_record_buffer(record_buffer);

  /* First close stream */
  if (archive_reader_open == TRUE)
    if (azclose(&archive))
  /* then also close share */

/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the order
*/
int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
  char name_buff[FN_REFLEN];
  char linkname[FN_REFLEN];
  azio_stream create_stream; /* Archive file we are working with */
  File frm_file; /* File handler for readers */
  MY_STAT file_stat; // Stat information for the data file

  DBUG_ENTER("ha_archive::create");

  stats.auto_increment_value= create_info->auto_increment_value;

  for (uint key= 0; key < table_arg->s->keys; key++)
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part= pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));

  /*
    We reuse name_buff since it is available.
  */
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
    DBUG_PRINT("ha_archive", ("archive will create stream file %s",
                              create_info->data_file_name));

    fn_format(name_buff, create_info->data_file_name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(linkname, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  fn_format(name_buff, name, "", ARZ,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  /*
    There is a chance that the file was "discovered". In this case
    just use whatever file is there.
  */
  if (!(my_stat(name_buff, &file_stat, MYF(0))))
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,

    my_symlink(name_buff, linkname, MYF(0));
    fn_format(name_buff, name, "", ".frm",
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);

    /*
      Here is where we open up the frm and pass it to archive to store
    */
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
      if (!my_fstat(frm_file, &file_stat, MYF(MY_WME)))
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
        my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
        azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
        my_free((uchar*)frm_ptr, MYF(0));
      my_close(frm_file, MYF(0));

    if (create_info->comment.str)
      azwrite_comment(&create_stream, create_info->comment.str,
                      (unsigned int)create_info->comment.length);

    /*
      Yes you need to do this, because the starting value
      for the autoincrement may not be zero.
    */
    create_stream.auto_increment= stats.auto_increment_value ?
      stats.auto_increment_value - 1 : 0;
    if (azclose(&create_stream))

  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));

  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
  unsigned int r_pack_length;
  DBUG_ENTER("ha_archive::real_write_row");

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
  if (written != r_pack_length)
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
                              (uint32)r_pack_length));

  if (!delayed_insert || !bulk_insert)

/*
  Calculate max length needed for row. This includes
  the bytes required for the length in the header.
*/
uint32 ha_archive::max_row_length(const uchar *buf)
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
    length += 2 + ((Field_blob*)table->field[*ptr])->get_length();

unsigned int ha_archive::pack_row(uchar *record)
  DBUG_ENTER("ha_archive::pack_row");

  if (fix_rec_buff(max_row_length(record)))
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */

  memcpy(record_buffer->buffer, record, table->s->null_bytes);
  ptr= record_buffer->buffer + table->s->null_bytes;

  for (Field **field=table->field ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));

  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
                           (ptr - record_buffer->buffer -
                            ARCHIVE_ROW_HEADER_SIZE)));

  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
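
/*
  For orientation, a sketch of the packed row that pack_row() builds and
  azwrite_row() writes out (illustrative shape only, not a normative
  format specification):

    [row length header, ARCHIVE_ROW_HEADER_SIZE bytes, handled by azio]
    [table->s->null_bytes of null-bit flags]
    [each non-null field packed back to back by Field::pack()]
*/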

/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only saving
  from implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::write_row(uchar *buf)
  uchar *read_buf= NULL;
  uchar *record= table->record[0];
  DBUG_ENTER("ha_archive::write_row");

    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (share->archive_write_open == FALSE)
    if (init_archive_writer())
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (table->next_number_field && record == table->record[0])
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. They make the performance
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        mkey->flags & HA_NOSAME)
      rc= HA_ERR_FOUND_DUPP_KEY;

    /*
      Bad news, this will cause a search for the unique value which is very
      expensive since we will have to do a table scan which will lock up
      all other writers during this period. This could perhaps be optimized

      First we create a buffer that we can use for reading rows, and can pass
    */
    if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
      rc= HA_ERR_OUT_OF_MEM;
    /*
      All of the buffer must be written out or we won't see all of the
    */
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    /*
      Set the position of the local read thread to the beginning position.
    */
    if (read_data_header(&archive))
      rc= HA_ERR_CRASHED_ON_USAGE;

    Field *mfield= table->next_number_field;

    while (!(get_row(&archive, read_buf)))
      if (!memcmp(read_buf + mfield->offset(record),
                  table->next_number_field->ptr,
                  mfield->max_display_length()))
        rc= HA_ERR_FOUND_DUPP_KEY;

    if (temp_auto > share->archive_write.auto_increment)
      stats.auto_increment_value=
        (share->archive_write.auto_increment= temp_auto) + 1;

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */
  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
  pthread_mutex_unlock(&share->mutex);

  my_free((uchar*) read_buf, MYF(0));
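
/*
  A condensed sketch of the duplicate check above for a unique
  auto_increment key (illustrative only; the real logic lives in
  write_row()):

    azflush(&(share->archive_write), Z_SYNC_FLUSH);  // make pending rows visible
    read_data_header(&archive);                      // rewind the reader
    while (!(get_row(&archive, read_buf)))
      if (stored value at the key offset == value being inserted)
        return HA_ERR_FOUND_DUPP_KEY;
*/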

void ha_archive::get_auto_increment(uint64_t offset, uint64_t increment,
                                    uint64_t nb_desired_values,
                                    uint64_t *first_value,
                                    uint64_t *nb_reserved_values)
  *nb_reserved_values= ULONGLONG_MAX;
  *first_value= share->archive_write.auto_increment + 1;

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
  DBUG_ENTER("ha_archive::index_init");

/*
  No indexes, so if we get a request for an index search since we tell
  the optimizer that we have unique indexes, we scan
*/
int ha_archive::index_read(uchar *buf, const uchar *key,
                           uint key_len, enum ha_rkey_function find_flag)
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);

int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
                               uint key_len, enum ha_rkey_function find_flag)
  KEY *mkey= &table->s->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key_len= key_len;

  DBUG_ENTER("ha_archive::index_read_idx");

  while (!(get_row(&archive, buf)))
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))

  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);

int ha_archive::index_next(uchar * buf)
  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))

  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE);

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/
int ha_archive::rnd_init(bool scan)
  DBUG_ENTER("ha_archive::rnd_init");

    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  init_archive_reader();

  /* We rewind the file so that we can read from the beginning if scan */
    DBUG_PRINT("info", ("archive will retrieve %llu rows",
                        (unsigned long long) scan_rows));

    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

/*
  This is the method that is used to read a row. It assumes that the row is
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
  DBUG_ENTER("ha_archive::get_row");
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
                            (uchar)file_to_read->version,
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);

  DBUG_PRINT("ha_archive", ("Return %d\n", rc));

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
  DBUG_ENTER("ha_archive::fix_rec_buff");
  DBUG_PRINT("ha_archive", ("Fixing %u for %u",
                            length, record_buffer->length));
  DBUG_ASSERT(record_buffer->buffer);

  if (length > record_buffer->length)
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
                                     MYF(MY_ALLOW_ZERO_PTR))))
    record_buffer->buffer= newptr;
    record_buffer->length= length;

  DBUG_ASSERT(length <= record_buffer->length);

int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
  DBUG_ENTER("ha_archive::unpack_row");

  read= azread_row(file_to_read, &error);
  ptr= (const uchar *)file_to_read->row_ptr;

  if (error || read == 0)

  /* Copy null bits */
  memcpy(record, ptr, table->s->null_bytes);
  ptr+= table->s->null_bytes;
  for (Field **field=table->field ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);

int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
  DBUG_ENTER("ha_archive::get_row_version3");

  int returnable= unpack_row(file_to_read, buf);

  DBUG_RETURN(returnable);

/*
  Called during ORDER BY. Its position is either from being called sequentially
  or by having had ha_archive::rnd_pos() called before it is called.
*/
int ha_archive::rnd_next(uchar *buf)
  DBUG_ENTER("ha_archive::rnd_next");

    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

    DBUG_RETURN(HA_ERR_END_OF_FILE);

  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);

  table->status=rc ? STATUS_NOT_FOUND: 0;

/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/
void ha_archive::position(const uchar *record)
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);

/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/
int ha_archive::rnd_pos(uchar * buf, uchar *pos)
  DBUG_ENTER("ha_archive::rnd_pos");
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  DBUG_RETURN(get_row(&archive, buf));
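
/*
  Sketch of how the server drives the three calls above during an ordered
  scan (standard handler protocol, shown only for orientation):

    rnd_next(buf);        // current_position= aztell(&archive)
    position(buf);        // ref= current_position, saved by the server
    ... the server sorts the rows ...
    rnd_pos(buf, ref);    // azseek() back to the saved offset, then get_row()
*/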

/*
  This method repairs the meta file. It does this by walking the datafile and
  rewriting the meta file. Currently it does this by calling optimize with
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
  DBUG_ENTER("ha_archive::repair");
  check_opt->flags= T_EXTEND;
  int rc= optimize(thd, check_opt);

    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);

  share->crashed= FALSE;

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely.
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
  DBUG_ENTER("ha_archive::optimize");
  char writer_filename[FN_REFLEN];

  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
    azclose(&(share->archive_write));
    share->archive_write_open= FALSE;

  /* Let's create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed; before this
    version it was only done on T_EXTEND.
  */

  DBUG_PRINT("ha_archive", ("archive extended rebuild"));

  /*
    Now we will rewind the archive file so that we are positioned at the
  */
  azflush(&archive, Z_SYNC_FLUSH);
  rc= read_data_header(&archive);

  /*
    On success of writing out the new header, we now fetch each row and
    insert it into the new archive file.
  */
    unsigned long long x;
    unsigned long long rows_restored;
    share->rows_recorded= 0;
    stats.auto_increment_value= 1;
    share->archive_write.auto_increment= 0;
    my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);

    rows_restored= archive.rows;

    for (x= 0; x < rows_restored ; x++)
      rc= get_row(&archive, table->record[0]);

      real_write_row(table->record[0], &writer);
      /*
        Long term it should be possible to optimize this so that
        it is not called on each row.
      */
      if (table->found_next_number_field)
        Field *field= table->found_next_number_field;
        uint64_t auto_value=
          (uint64_t) field->val_int(table->record[0] +
                                    field->offset(table->record[0]));
        if (share->archive_write.auto_increment < auto_value)
          stats.auto_increment_value=
            (share->archive_write.auto_increment= auto_value) + 1;

    dbug_tmp_restore_column_map(table->read_set, org_bitmap);
    share->rows_recorded= (ha_rows)writer.rows;

  DBUG_PRINT("info", ("recovered %llu archive rows",
                      (unsigned long long)share->rows_recorded));

  DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
                            (unsigned long long)share->rows_recorded));

  if (rc && rc != HA_ERR_END_OF_FILE)

  share->dirty= FALSE;

  // make the file we just wrote be our data file
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));

  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));

/*
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       enum thr_lock_type lock_type)
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
      lock_type = TL_READ;

    lock.type=lock_type;

void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
  DBUG_ENTER("ha_archive::update_create_info");

  ha_archive::info(HA_STATUS_AUTO);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
    create_info->auto_increment_value= stats.auto_increment_value;

  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
    create_info->data_file_name= share->real_path;

/*
  Hints for the optimizer; see ha_tina for more information.
*/
int ha_archive::info(uint flag)
  DBUG_ENTER("ha_archive::info");

  /*
    If dirty, we lock, and then reset/flush the data.
    I found that just calling azflush() doesn't always work.
  */
  pthread_mutex_lock(&share->mutex);
  if (share->dirty == TRUE)
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    share->rows_recorded= share->archive_write.rows;
    share->dirty= FALSE;
    if (share->version < global_version)
      share->version_rows= share->rows_recorded;
      share->version= global_version;

  /*
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  stats.records= share->rows_recorded;
  pthread_mutex_unlock(&share->mutex);

  scan_rows= stats.records;

  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
    MY_STAT file_stat; // Stat information for the data file

    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));

    stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
    stats.data_file_length= file_stat.st_size;
    stats.create_time= file_stat.st_ctime;
    stats.update_time= file_stat.st_mtime;
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
  stats.delete_length= 0;
  stats.index_file_length=0;

  if (flag & HA_STATUS_AUTO)
    init_archive_reader();
    pthread_mutex_lock(&share->mutex);
    azflush(&archive, Z_SYNC_FLUSH);
    pthread_mutex_unlock(&share->mutex);
    stats.auto_increment_value= archive.auto_increment + 1;

/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)

/*
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off
  the bulk insert flag, and set the share dirty so that the next select will
  call sync for us.
*/
int ha_archive::end_bulk_insert()
  DBUG_ENTER("ha_archive::end_bulk_insert");

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);

/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
  const char *old_proc_info;
  unsigned long long x;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Now we will rewind the archive file so that we are positioned at the
  */
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (x= 0; x < share->archive_write.rows; x++)
    rc= get_row(&archive, table->record[0]);

  thd_proc_info(thd, old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE))
    share->crashed= FALSE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);

    DBUG_RETURN(HA_ADMIN_OK);

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  DBUG_RETURN(repair(thd, &check_opt));

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
    (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
    DBUG_RETURN(NULL); /* purecov: inspected */
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(r->length,
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(NULL); /* purecov: inspected */

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));

static MYSQL_SYSVAR_BOOL(aio, archive_use_aio,
  PLUGIN_VAR_NOCMDOPT,
  "Whether or not to use asynchronous IO.",

static struct st_mysql_sys_var* archive_system_variables[]= {

struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  archive_db_init, /* Plugin Init */
  archive_db_done, /* Plugin Deinit */
  NULL, /* status variables */
  archive_system_variables, /* system variables */
  NULL /* config options */
mysql_declare_plugin_end;