  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif
#include "drizzled/server_includes.h"
#include "drizzled/field.h"
#include "drizzled/field/blob.h"
#include "drizzled/field/timestamp.h"
#include "plugin/myisam/myisam.h"
#include "drizzled/table.h"
#include "drizzled/session.h"

#include "ha_archive.h"

#include <string>
#include <map>

using namespace std;

static const string engine_name("ARCHIVE");
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  at once much better than doing lots of little records between writes;
  a read and a write never happen at the same time since we would want
  to flush first.

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can fit into the buffer pool.
  For MyISAM it is a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It is only when the
  OS doesn't have enough memory to cache the entire table that Archive turns
  out to be any faster.

  Examples between MyISAM (packed) and Archive.
*/
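/*
  The dirty-flag protocol described above is easy to get wrong, so here is a
  minimal, self-contained sketch of the idea. It is illustrative only: the
  struct layout and the example_* names are invented for this sketch and are
  not Archive's actual on-disk meta format.
*/
#include <cstdint>
#include <cstdio>

struct ExampleMeta                  /* hypothetical meta-file payload */
{
  uint8_t  dirty;                   /* 1 while a writer has the table open */
  uint64_t rows;                    /* row count at the last clean close */
};

/* Open for writing: detect a previous crash, then mark the meta dirty. */
static bool example_open_for_write(const char *path, ExampleMeta *meta)
{
  FILE *f= fopen(path, "r+b");
  if (f == NULL || fread(meta, sizeof(*meta), 1, f) != 1)
  {
    if (f) fclose(f);
    return false;
  }
  bool was_crashed= (meta->dirty != 0); /* dirty at open => unclean shutdown */
  meta->dirty= 1;
  rewind(f);
  fwrite(meta, sizeof(*meta), 1, f);
  fclose(f);                          /* a real engine would also fsync */
  return !was_crashed;                /* false => caller must rebuild */
}

/* Clean close: persist the new row count and clear the dirty flag. */
static void example_clean_close(const char *path, uint64_t rows)
{
  ExampleMeta meta= { 0, rows };
  FILE *f= fopen(path, "r+b");
  if (f == NULL)
    return;
  fwrite(&meta, sizeof(meta), 1, f);
  fclose(f);
}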
/* Variables for archive share methods */
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;

std::map<const char *, ArchiveShare *> archive_open_tables;

static unsigned int global_version;

/* The file extension */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file (deprecated)

/*
  unsigned char + unsigned char
*/
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
/* Static declarations for handlerton */
int archive_discover(handlerton *hton, THD* thd, const char *db,
                     const char *name,
                     unsigned char **frmblob,
                     size_t *frmlen);

static bool archive_use_aio= false;

/*
  Size of the header written in front of each row.
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
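/*
  For illustration, here is roughly what "packing plus zlib" amounts to for
  one row. This is a hedged sketch, not azio's real layout: azio compresses
  the whole stream rather than one row at a time, and the example_pack_row()
  name and framing are invented for this example.
*/
#include <zlib.h>
#include <vector>

static std::vector<unsigned char> example_pack_row(const unsigned char *row,
                                                   uint32_t row_len)
{
  uLongf packed_len= compressBound(row_len);
  std::vector<unsigned char> out(ARCHIVE_ROW_HEADER_SIZE + packed_len);

  /*
    A 4-byte little-endian header holding the uncompressed row length,
    so a reader knows how much to inflate (cf. ARCHIVE_ROW_HEADER_SIZE).
  */
  out[0]= (unsigned char)(row_len & 0xff);
  out[1]= (unsigned char)((row_len >> 8) & 0xff);
  out[2]= (unsigned char)((row_len >> 16) & 0xff);
  out[3]= (unsigned char)((row_len >> 24) & 0xff);

  if (compress(&out[ARCHIVE_ROW_HEADER_SIZE], &packed_len,
               row, row_len) != Z_OK)
    out.clear();                       /* empty result signals failure */
  else
    out.resize(ARCHIVE_ROW_HEADER_SIZE + packed_len);

  return out;
}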
class ArchiveEngine : public StorageEngine
{
public:
  ArchiveEngine(const string &name_arg) : StorageEngine(name_arg) {}

  virtual handler *create(TableShare *table,
                          MEM_ROOT *mem_root)
  {
    return new (mem_root) ha_archive(this, table);
  }
};

static ArchiveEngine *archive_engine= NULL;
/*
  Initialize the archive handler.
*/
int archive_db_init(PluginRegistry &registry)
{
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
  archive_engine= new ArchiveEngine(engine_name);
  registry.add(archive_engine);

  /* When the engine starts up set the first version */
  global_version= 1;

  return false;
}

/*
  Shut down the archive handler.
*/
int archive_db_done(PluginRegistry &registry)
{
  registry.remove(archive_engine);
  delete archive_engine;

  pthread_mutex_destroy(&archive_mutex);

  return 0;
}
ha_archive::ha_archive(StorageEngine *engine_arg, TableShare *table_arg)
  :handler(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  archive_reader_open= false;
}
/*
  Used by the discovery mechanism: extracts the table definition (frm)
  that create() embedded inside the ARZ data file.
*/
int archive_discover(handlerton *hton __attribute__((unused)),
                     THD* thd __attribute__((unused)),
                     const char *db,
                     const char *name,
                     unsigned char **frmblob,
                     size_t *frmlen)
{
  azio_stream frm_stream;
  char az_file[FN_REFLEN];
  char *frm_ptr;
  struct stat file_stat;

  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (stat(az_file, &file_stat))
    goto err;

  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
  {
    if (errno == EROFS || errno == EACCES)
      return(my_errno= errno);
    return(HA_ERR_CRASHED_ON_USAGE);
  }

  if (frm_stream.frm_length == 0)
    goto err;

  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
  azread_frm(&frm_stream, frm_ptr);
  azclose(&frm_stream);

  *frmlen= frm_stream.frm_length;
  *frmblob= (unsigned char*) frm_ptr;

  return(0);

err:
  my_errno= 0;
  return(1);
}
/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
  table_name.append(name);
  fn_format(data_file_name, table_name.c_str(), "",
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
}

ArchiveShare::~ArchiveShare()
{
  thr_lock_delete(&lock);
  pthread_mutex_destroy(&mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);
}
bool ArchiveShare::prime(uint64_t *auto_increment)
{
  azio_stream archive_tmp;

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp.auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp.rows;
  crashed= archive_tmp.dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(&archive_tmp);

  return true;
}
/*
  We create the shared memory space that we will use for the open table.
  No matter what we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  map<const char *, ArchiveShare *>::iterator find_iter;

  pthread_mutex_lock(&archive_mutex);

  find_iter= archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    share= (*find_iter).second;
  else
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;
      return(NULL);
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return(NULL);
    }

    archive_open_tables[share->table_name.c_str()]= share;
    thr_lock_init(&share->lock);
  }
  share->use_count++;
  pthread_mutex_unlock(&archive_mutex);

  return(share);
}
/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    archive_open_tables.erase(share->table_name.c_str());
    delete share;
  }
  pthread_mutex_unlock(&archive_mutex);

  return 0;
}
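/*
  get_share()/free_share() above implement a classic refcounted registry:
  a process-wide map keyed by table name, guarded by one mutex, where the
  last closer tears the share down. A condensed, standalone sketch of the
  same pattern follows (the example_* names are invented). One caveat worth
  noting: a std::map keyed on const char * compares pointer values, not
  string contents, so keying on std::string as below is the safer
  general-purpose form.
*/
#include <map>
#include <string>
#include <pthread.h>

struct ExampleShare
{
  int use_count;
  ExampleShare() : use_count(0) {}
};

static std::map<std::string, ExampleShare *> example_open_tables;
static pthread_mutex_t example_mutex= PTHREAD_MUTEX_INITIALIZER;

/* Look up (or create) the share for a table, bumping its refcount. */
static ExampleShare *example_get_share(const char *name)
{
  pthread_mutex_lock(&example_mutex);
  ExampleShare *&slot= example_open_tables[name];
  if (slot == NULL)
    slot= new ExampleShare();
  slot->use_count++;
  pthread_mutex_unlock(&example_mutex);
  return slot;
}

/* Drop one reference; the last closer deletes the share and its entry. */
static void example_free_share(const char *name)
{
  pthread_mutex_lock(&example_mutex);
  std::map<std::string, ExampleShare *>::iterator it=
    example_open_tables.find(name);
  if (it != example_open_tables.end() && --it->second->use_count == 0)
  {
    delete it->second;
    example_open_tables.erase(it);
  }
  pthread_mutex_unlock(&example_mutex);
}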
int ha_archive::init_archive_writer()
{
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name,
               O_RDWR, AZ_METHOD_BLOCK)))
  {
    share->crashed= true;
    return(1);
  }
  share->archive_write_open= true;

  return(0);
}
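/*
  The single-shared-writer rule above comes straight from how gzip streams
  behave: one handle cannot serve reads and writes at once, and buffered
  compressed data only becomes visible to readers after a sync flush. A
  hedged sketch of the same discipline using stock zlib follows (gzopen,
  gzwrite and gzflush are real zlib calls; azio is Archive's extended fork
  of this API, and the example_* wrappers are invented).
*/
#include <zlib.h>

static gzFile example_writer;       /* one writer shared by all openers */

static bool example_writer_open(const char *path)
{
  example_writer= gzopen(path, "ab");  /* append-only compressed stream */
  return example_writer != NULL;
}

static void example_write_row(const void *buf, unsigned len)
{
  gzwrite(example_writer, buf, len);   /* compressed lazily, in batches */
}

/*
  Called before a scan starts (cf. the azflush(..., Z_SYNC_FLUSH) calls in
  this file): Z_SYNC_FLUSH pushes pending compressed bytes out to the file
  without closing the stream, at a small cost in compression ratio.
*/
static void example_flush_for_read(void)
{
  gzflush(example_writer, Z_SYNC_FLUSH);
}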
  /*
    There is a chance that the file was "discovered". In this case
    just use whatever file is there.
  */
  r= stat(name_buff, &file_stat);
  if (r == -1 && errno != ENOENT)
  {
    return errno;
  }
  if (!r)
    return HA_ERR_TABLE_EXIST;

  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
               AZ_METHOD_BLOCK)))
  {
    error= errno;
    goto error2;
  }

  my_symlink(name_buff, linkname, MYF(0));
  fn_format(name_buff, name, "", ".frm",
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  /*
    Here is where we open up the frm and pass it to archive to store
  */
  if ((frm_file= fopen(name_buff, "r")) != NULL)
  {
    if (!fstat(fileno(frm_file), &file_stat))
    {
      if ((uint64_t)file_stat.st_size > SIZE_MAX)
      {
        error= HA_ERR_OUT_OF_MEM;
        goto error2;
      }
      frm_ptr= (unsigned char *)malloc((size_t)file_stat.st_size);
      if (frm_ptr != NULL)
      {
        length_io= read(fileno(frm_file), frm_ptr, (size_t)file_stat.st_size);

        if (length_io != (size_t)file_stat.st_size)
        {
          free(frm_ptr);
          goto error2;
        }

        length_io= azwrite_frm(&create_stream, (char *)frm_ptr,
                               (size_t)file_stat.st_size);

        if (length_io != (size_t)file_stat.st_size)
        {
          free(frm_ptr);
          goto error2;
        }

        free(frm_ptr);
      }
    }
    fclose(frm_file);
  }

  if (create_info->comment.str)
  {
    size_t write_length;

    write_length= azwrite_comment(&create_stream, create_info->comment.str,
                                  (unsigned int)create_info->comment.length);

    if (write_length != (size_t)create_info->comment.length)
    {
      error= errno;
      goto error2;
    }
  }

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream.auto_increment= stats.auto_increment_value ?
                                  stats.auto_increment_value - 1 : 0;

  if (azclose(&create_stream))
  {
    error= errno;
    goto error2;
  }
  /*
    We don't support decrementing auto_increment. They make the performance
    just cry.
  */
  if (temp_auto <= share->archive_write.auto_increment &&
      mkey->flags & HA_NOSAME)
  {
    rc= HA_ERR_FOUND_DUPP_KEY;
    goto error;
  }
  /*
    Bad news, this will cause a search for the unique value which is very
    expensive since we will have to do a table scan which will lock up
    all other writers during this period. This could perhaps be optimized
    in the future.
  */
  {
    /*
      First we create a buffer that we can use for reading rows, and can pass
      to get_row().
    */
    if (!(read_buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
    {
      rc= HA_ERR_OUT_OF_MEM;
      goto error;
    }
    /*
      All of the buffer must be written out or we won't see all of the
      data.
    */
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    /*
      Set the position of the local read thread to the beginning position.
    */
    if (read_data_header(&archive))
    {
      rc= HA_ERR_CRASHED_ON_USAGE;
      goto error;
    }

    Field *mfield= table->next_number_field;

    while (!(get_row(&archive, read_buf)))
    {
      if (!memcmp(read_buf + mfield->offset(record),
                  table->next_number_field->ptr,
                  mfield->max_display_length()))
      {
        rc= HA_ERR_FOUND_DUPP_KEY;
        goto error;
      }
    }
  }

  if (temp_auto > share->archive_write.auto_increment)
    stats.auto_increment_value=
      (share->archive_write.auto_increment= temp_auto) + 1;
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
                                    uint64_t *first_value,
                                    uint64_t *nb_reserved_values)
{
  *nb_reserved_values= UINT64_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}
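/*
  Worked example: if the most recent row went in with auto_increment 41,
  share->archive_write.auto_increment is 41 and the next caller gets
  first_value 42. Because all inserts funnel through the one shared writer,
  the engine can hand back UINT64_MAX reserved values -- i.e. "everything
  from first_value upward" -- rather than reserving bounded ranges.
*/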
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint32_t keynr, bool)
{
  active_index= keynr;
  return(0);
}
  /* Lets create a file to contain the new data */
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
    return(HA_ERR_CRASHED_ON_USAGE);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed; before this
    version it was just done on T_EXTEND.
  */
  delayed_insert= false;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !session_in_lock_tables(session)
        && !session_tablespace_op(session))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */

    if (lock_type == TL_READ_NO_INSERT && !session_in_lock_tables(session))
      lock_type = TL_READ;

    lock.type=lock_type;
  }
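/*
  Concretely: "INSERT INTO t1 SELECT ... FROM t2" asks for TL_READ_NO_INSERT
  on t2. Left as-is, that lock would block every concurrent INSERT into t2
  for the duration of the scan; downgrading it to TL_READ keeps t2
  insertable, which suits Archive's append-only design. The write-side
  conversion mirrors this: ordinary INSERTs taken outside LOCK TABLES are
  downgraded to TL_WRITE_ALLOW_WRITE so multiple writers can proceed.
*/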
/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return(share->crashed);
}
/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session, HA_CHECK_OPT *)
{
  const char *old_proc_info;

  old_proc_info= get_session_proc_info(session);
  set_session_proc_info(session, "Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();
/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(Session *session)
{
  HA_CHECK_OPT check_opt;

  check_opt.init();

  return(repair(session, &check_opt));
}
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;

  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
  {
    return(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) malloc(r->length)))
  {
    free((char*) r);
    return(NULL); /* purecov: inspected */
  }

  return(r);
}
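/*
  For completeness, the matching cleanup path. Assuming the destroy
  counterpart simply mirrors the two allocations above (as the upstream
  engine's destroy_record_buffer() does), it amounts to:
*/
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free((char*) r->buffer);
  free((char*) r);
  return;
}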