You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "plugin/archive/archive_engine.h"

#include <boost/scoped_ptr.hpp>

using namespace drizzled;

/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.

  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  at once much better than doing it one record at a time (which is why we
  do not read and write at the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the
  OS doesn't have enough memory to cache the entire table that Archive turns
  out to be faster.

  Examples between MyISAM (packed) and Archive.
*/
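
/*
  Illustrative sketch (not part of the engine): the dirty-flag protocol
  described above, reduced to a stand-alone example. The MetaState type
  and function names here are hypothetical; the real engine keeps this
  state in the azio file header.

    struct MetaState { uint64_t rows; bool dirty; };

    void meta_open_for_write(MetaState &meta)
    {
      meta.dirty= true;          // mark before the first write happens
    }

    void meta_close_clean(MetaState &meta, uint64_t row_count)
    {
      meta.rows= row_count;      // persist the new row count
      meta.dirty= false;         // only a clean close clears the flag
    }

    bool meta_crashed(const MetaState &meta)
    {
      return meta.dirty;         // dirty at open => writer never closed
    }
*/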

#define ARCHIVE_ROW_HEADER_SIZE 4

ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;

  return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
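
/*
  Usage sketch for the three cache helpers above (hedged; get_share()
  below is the real caller, and it holds the engine mutex around the
  whole sequence):

    ArchiveShare *share= engine->findOpenTable(name);   // reuse if open
    if (share == NULL)
    {
      share= new ArchiveShare(name.c_str());
      engine->addOpenTable(name, share);                // register for others
    }
    // ... the last user eventually calls:
    engine->deleteOpenTable(name);
*/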

int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
{
  string new_path(identifier.getPath());
  new_path+= ARZ;

  int error= unlink(new_path.c_str());

int ArchiveEngine::doGetTableDefinition(Session&,
                                        const TableIdentifier &identifier,
                                        drizzled::message::Table &table_proto)
{
  struct stat stat_info;
  int error= ENOENT;
  string proto_path;
  char *proto_string;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());
  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))
    return errno;
  else
    error= EEXIST;

  boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);

  if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
    return HA_ERR_CRASHED_ON_USAGE;

  proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
  if (proto_string == NULL)
  {
    azclose(proto_stream.get());
    return ENOMEM;
  }

  azread_frm(proto_stream.get(), proto_string);

  if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
    error= HA_ERR_CRASHED_ON_USAGE;

  azclose(proto_stream.get());
  free(proto_string);

  /*
    We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
    we do not rewrite the table proto (as it's wedged in the file header).
  */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());

  return error;
}
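
/*
  Round-trip sketch: doCreateTable() below serializes the table proto into
  the ARZ header with azwrite_frm(), and this function reads it back with
  azread_frm() and ParseFromArray(). A minimal stand-alone version of that
  round trip, using only the protobuf message API:

    drizzled::message::Table in, out;
    std::string wire;
    in.SerializeToString(&wire);                   // CREATE TABLE time
    out.ParseFromArray(wire.data(), wire.size());  // open/discover time
*/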

ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       TableShare &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;
}

ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
}

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
  memset(&archive_write, 0, sizeof(azio_stream));  /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&_mutex, MY_MUTEX_INIT_FAST);
}

ArchiveShare::~ArchiveShare()
{
  pthread_mutex_destroy(&_mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);
}

bool ArchiveShare::prime(uint64_t *auto_increment)
{
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(archive_tmp.get());

  return true;
}
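
/*
  Usage sketch for prime() (hedged; get_share() below is the real caller):
  a freshly constructed share restores its row count, crashed flag and the
  next auto_increment value from the az header alone, without scanning rows.

    ArchiveShare *share= new ArchiveShare("./schema/t1");
    uint64_t next_insert_id;
    if (share->prime(&next_insert_id) == false)
      ;  // header unreadable: caller reports HA_ERR_CRASHED_ON_REPAIR
*/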

/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  share= a_engine->findOpenTable(table_name);

  if (share == NULL)
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_OUT_OF_MEM;
      return NULL;
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return NULL;
    }

    a_engine->addOpenTable(share->table_name, share);
    thr_lock_init(&share->_lock);
  }

  share->use_count++;

  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;

  pthread_mutex_unlock(&a_engine->mutex());

  return share;
}

/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  if (!--share->use_count)
  {
    a_engine->deleteOpenTable(share->table_name);
    delete share;
  }
  pthread_mutex_unlock(&a_engine->mutex());

  return 0;
}

int ha_archive::init_archive_writer()
{
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK)))
  {
    share->crashed= true;
    return(1);
  }

  share->archive_write_open= true;

  return(0);
}

/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the order
  of creation.
*/
int ArchiveEngine::doCreateTable(Session &,
                                 Table& table_arg,
                                 const drizzled::TableIdentifier &identifier,
                                 drizzled::message::Table& proto)
{
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;
  int error= 0;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part= pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
    }
  }

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  if (proto.SerializeToString(&serialized_proto) == false)
  {
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
  {
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      unlink(named_file.c_str());
      return(error ? error : -1);
    }
  }

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream->auto_increment= auto_increment_value ?
    auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
  {
    error= errno;
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  return(0);
}

unsigned int ha_archive::pack_row(unsigned char *record)
{
  unsigned char *ptr;

  if (fix_rec_buff(max_row_length(record)))
    return(HA_ERR_OUT_OF_MEM);

  /* Copy null bits */
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;

  for (Field **field= getTable()->getFields(); *field; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  return((unsigned int) (ptr - &record_buffer[0]));
}
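
/*
  Row-image sketch for pack_row() above (hypothetical two-column table):
  the null-bit block comes first, then each non-null field packed back to
  back, so NULLs and short strings occupy almost no space before the zlib
  pass even runs.

    unsigned char buf[16];
    unsigned char *ptr= buf;
    *ptr++= 0x01;                 // null bits: INT column a is NULL
    *ptr++= 0x02;                 // VARCHAR column b: length byte
    memcpy(ptr, "hi", 2);         // ...followed by the bytes themselves
    ptr+= 2;
    unsigned int packed_length= (unsigned int)(ptr - buf);   // == 4
*/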

/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only saving
  from implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  {
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. They make the performance
      suffer.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
    }

    /*
      Bad news, this will cause a search for the unique value which is very
      expensive since we will have to do a table scan which will lock up
      all other writers during this period. This could perhaps be optimized
      in the future.

      First we create a buffer that we can use for reading rows, and can pass
      to get_row().
    */
    if (!(read_buf= (unsigned char*) my_malloc(table->s->reclength, MYF(MY_WME))))
    {
      rc= HA_ERR_OUT_OF_MEM;
    }

    /*
      All of the buffer must be written out or we won't see all of the
      data.
    */
    azflush(&(share->archive_write), Z_SYNC_FLUSH);

    /*
      Set the position of the local read thread to the beginning position.
    */
    if (read_data_header(&archive))
      rc= HA_ERR_CRASHED_ON_USAGE;

    Field *mfield= table->next_number_field;

    while (!(get_row(&archive, read_buf)))
    {
      if (!memcmp(read_buf + mfield->offset(record),
                  table->next_number_field->ptr,
                  mfield->max_display_length()))
        rc= HA_ERR_FOUND_DUPP_KEY;
    }

    if (temp_auto > share->archive_write.auto_increment)

int ha_archive::optimize()
{
  int rc= 0;
  boost::scoped_ptr<azio_stream> writer(new azio_stream);
  char *proto_string;

  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= false;
  }

  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
    return ENOMEM;

  azread_frm(&archive, proto_string);

  /* Let's create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);

  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
    return(HA_ERR_CRASHED_ON_USAGE);

  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type of rebuild that is performed;
    before this version it was only done on T_EXTEND.
  */
  uint64_t rows_restored;
  rows_restored= archive.rows;

  for (uint64_t x= 0; x < rows_restored ; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;

    real_write_row(getTable()->getInsertRecord(), writer.get());

    /*
      Long term it should be possible to optimize this so that
      it is not called on each row.
    */
    if (getTable()->found_next_number_field)
    {
      Field *field= getTable()->found_next_number_field;

      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();

      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;
    }
  }

  share->rows_recorded= (ha_rows)writer->rows;

  if (rc && rc != HA_ERR_END_OF_FILE)
    goto error;

  azclose(writer.get());
  share->dirty= false;

  azclose(&archive);

  // make the file we just wrote be our data file
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));

  free(proto_string);
  return(rc);

error:
  azclose(writer.get());
  free(proto_string);

  return(rc);
}

/*
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  delayed_insert= false;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE)
        && !session_tablespace_op(session))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */
    if (lock_type == TL_READ_NO_INSERT)
      lock_type = TL_READ;

    lock.type= lock_type;
  }

  *to++= &lock;

  return to;
}
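
/*
  The conversion rules above, restated as a stand-alone sketch (simplified;
  the real code also consults session state via session_tablespace_op()):

    enum thr_lock_type convert(enum thr_lock_type lt, bool tablespace_op)
    {
      if (lt >= TL_WRITE_CONCURRENT_INSERT && lt <= TL_WRITE && !tablespace_op)
        return TL_WRITE_ALLOW_WRITE;   // let writers run concurrently
      if (lt == TL_READ_NO_INSERT)
        return TL_READ;                // don't block inserts during reads
      return lt;
    }
*/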

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return(share->crashed);
}

/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session)
{
  int rc= 0;
  const char *old_proc_info;

  old_proc_info= get_session_proc_info(session);
  set_session_proc_info(session, "Checking table");

  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;
  }

int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
{
  int error= 0;

  for (const char **ext= bas_ext(); *ext ; ext++)
  {
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
    {
      if ((error=errno) != ENOENT)
        break;
      error= 0;
    }
  }

  return error;
}

bool ArchiveEngine::doDoesTableExist(Session&,
                                     const TableIdentifier &identifier)
{
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))
  {
    return false;
  }

  return true;
}

void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::SchemaIdentifier &schema_identifier,
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
{
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
    {  }
    else
    {
      char uname[NAME_LEN + 1];
      uint32_t file_name_len;

      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
      // TODO: Remove need for memory copy here
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
    }
  }
}
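
/*
  Worked example for the loop above (assuming ARZ is the ".ARZ" suffix):
  the entry "t1.ARZ" passes the extension and TMP_FILE_PREFIX checks,
  filename_to_tablename() decodes any encoded characters, and the suffix
  is then cut off in place:

    char uname[]= "t1.ARZ";
    uname[sizeof(uname) - sizeof(".ARZ")]= '\0';   // uname is now "t1"
*/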

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(Session *session)
{
  HA_CHECK_OPT check_opt;

  check_opt.init();

  return(repair(session, &check_opt));
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;

  if (!(r= (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
                                              MYF(MY_WME))))
    return(NULL); /* purecov: inspected */

  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) my_malloc(r->length,
                                              MYF(MY_WME))))
  {
    free((char*) r);
    return(NULL); /* purecov: inspected */
  }

  return(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free((char*) r->buffer);
  free((char*) r);
}
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
1441
PLUGIN_VAR_NOCMDOPT,
1442
"Whether or not to use asynchronous IO.",
1445
static struct st_mysql_sys_var* archive_system_variables[]= {
1446
DRIZZLE_SYSVAR(aio),
1450
mysql_declare_plugin(archive)
1452
DRIZZLE_STORAGE_ENGINE_PLUGIN,
1455
"Brian Aker, MySQL AB",
1456
"Archive storage engine",
1458
archive_db_init, /* Plugin Init */
1459
archive_db_done, /* Plugin Deinit */
1460
NULL, /* status variables */
1461
archive_system_variables, /* system variables */
1462
NULL /* config options */
1464
mysql_declare_plugin_end;