  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "plugin/archive/archive_engine.h"

#include <boost/scoped_ptr.hpp>

#include "drizzled/field.h"
#include "drizzled/field/blob.h"
#include "drizzled/field/timestamp.h"
#include "drizzled/table.h"
#include "drizzled/session.h"
#include "plugin/myisam/myisam.h"

using namespace std;
using namespace drizzled;

static const string engine_name("ARCHIVE");
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.
*/
/* When the engine starts up set the first version */
static uint64_t global_version= 1;

// We use this to find out the state of the archive aio option.
extern bool archive_aio_state(void);

/* The file extensions */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call

static bool archive_use_aio= false;

/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2

/*
  Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;
  else
    return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
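
/*
  Usage sketch (illustrative only, not part of the engine): the cache
  above is always consulted under the engine mutex with a find-or-create
  pattern. The names below mirror get_share() further down; "engine" is a
  hypothetical ArchiveEngine pointer.

    pthread_mutex_lock(&engine->mutex());
    ArchiveShare *share= engine->findOpenTable(path);
    if (share == NULL)
    {
      share= new ArchiveShare(path);              // first open of this table
      engine->addOpenTable(share->table_name, share);
    }
    share->use_count++;                           // one reference per open cursor
    pthread_mutex_unlock(&engine->mutex());
*/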
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
{
  string new_path(identifier.getPath());

  new_path+= ARZ;

  int error= unlink(new_path.c_str());

  if (error != 0)
    error= errno;

  return error;
}
int ArchiveEngine::doGetTableDefinition(Session&,
                                        const TableIdentifier &identifier,
                                        drizzled::message::Table &table_proto)
{
  struct stat stat_info;
  int error= EEXIST;
  string proto_path;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());

  proto_path.append(ARZ);

  if (stat(proto_path.c_str(),&stat_info))
    return errno;

  {
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
    char *proto_string;

    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
      return HA_ERR_CRASHED_ON_USAGE;

    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
    if (proto_string == NULL)
    {
      azclose(proto_stream.get());
      return ENOMEM;
    }

    azread_frm(proto_stream.get(), proto_string);

    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
      error= HA_ERR_CRASHED_ON_USAGE;

    azclose(proto_stream.get());
    free(proto_string);
  }

  /*
    We set the name from what we've asked for, as in RENAME TABLE for ARCHIVE
    we do not rewrite the table proto (as it's wedged in the file header).
  */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());

  return error;
}
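
/*
  Note: the table definition round-trips through the ARZ header rather
  than a separate definition file. doCreateTable() serializes the
  message::Table proto and stores it with azwrite_frm(); the code above
  reads it back with azread_frm() and parses it. A hedged sketch of the
  write side (error handling elided; "stream" is a hypothetical
  azio_stream*):

    string serialized_proto;
    proto.SerializeToString(&serialized_proto);
    azwrite_frm(stream, serialized_proto.c_str(), serialized_proto.length());
*/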
/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  NULL
};

const char **ArchiveEngine::bas_ext() const
{
  return ha_archive_exts;
}

static ArchiveEngine *archive_engine= NULL;
/*
  Initialize the archive handler.
*/
int archive_db_init(PluginRegistry &registry)
{
  archive_engine= new ArchiveEngine(engine_name);
  registry.add(archive_engine);

  return 0;
}

/*
  Release the archive handler.
*/
int archive_db_done(PluginRegistry &registry)
{
  registry.remove(archive_engine);
  delete archive_engine;

  return 0;
}
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       TableShare &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;
}
ArchiveShare::ArchiveShare(const char *name)
{
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);
}
ArchiveShare::~ArchiveShare()
{
  thr_lock_delete(&_lock);
  pthread_mutex_destroy(&_mutex);
}
/*
  We need to make sure we don't reset the crashed state.
  If we open a crashed file, we need to close it as crashed unless
  it has been repaired.
*/
bool ArchiveShare::prime(uint64_t *auto_increment)
{
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(archive_tmp.get());

  return true;
}
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  ArchiveShare *share= a_engine->findOpenTable(table_name);

  if (share == NULL)
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_OUT_OF_MEM;
      return NULL;
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return NULL;
    }

    a_engine->addOpenTable(share->table_name, share);
    thr_lock_init(&share->_lock);
  }

  share->use_count++;

  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&a_engine->mutex());

  return share;
}
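
/*
  Illustrative counterpart (a sketch, not the engine's code as written):
  every successful get_share() is expected to be balanced by a release
  that decrements use_count under the same engine mutex and tears the
  share down on the last reference, roughly:

    pthread_mutex_lock(&a_engine->mutex());
    if (--share->use_count == 0)
    {
      a_engine->deleteOpenTable(share->table_name);
      delete share;
    }
    pthread_mutex_unlock(&a_engine->mutex());
*/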
/*
  We open the file we will read from.
*/
int ha_archive::doOpen(const TableIdentifier &identifier, int , uint32_t )
{
  int rc= 0;

  share= get_share(identifier.getPath().c_str(), &rc);

  /**
    We either fix it ourselves, or we just take it offline.

    @todo Create some documentation in the recovery tools shipped with the engine.
  */
  if (rc == HA_ERR_CRASHED_ON_USAGE)
  {
    free_share();
    rc= 0;
  }
  else if (rc == HA_ERR_OUT_OF_MEM)
  {
    return(rc);
  }

  record_buffer= create_record_buffer(getTable()->getShare()->getRecordLength() +
                                      ARCHIVE_ROW_HEADER_SIZE);

  lock.init(&share->_lock);

  return(rc);
}
int ArchiveEngine::doCreateTable(Session &,
                                 Table& table_arg,
                                 const drizzled::TableIdentifier &identifier,
                                 drizzled::message::Table& proto)
{
  int error= 0;
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part=     pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
        return -1;
    }
  }

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  errno= 0;
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (proto.SerializeToString(&serialized_proto) == false)
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      unlink(named_file.c_str());

      return(error ? error : -1);
    }
  }

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream->auto_increment= auto_increment_value ?
    auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  return(0);
}
unsigned int ha_archive::pack_row(unsigned char *record)
{
  unsigned char *ptr;

  if (fix_rec_buff(max_row_length(record)))
    return(HA_ERR_OUT_OF_MEM); /* purecov: inspected */

  /* Copy null bits */
  memcpy(record_buffer->buffer, record, getTable()->getShare()->null_bytes);
  ptr= record_buffer->buffer + getTable()->getShare()->null_bytes;

  for (Field **field=getTable()->getFields() ; *field ; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  return((unsigned int) (ptr - record_buffer->buffer));
}
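
/*
  Layout sketch: pack_row() produces the null-bit block followed by the
  packed values of the non-null fields; the writer prefixes each packed
  row with a fixed ARCHIVE_ROW_HEADER_SIZE length header. Assuming a
  table with null_bytes == 1, a stored row looks roughly like:

    [4-byte row header][1 byte null bits][field 0][field 2]...

  Fields whose null bit is set are simply omitted from the packed body.
*/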
/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only saving
  for implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  {
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. They make the performance
      just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
/*
  Because we tell the optimizer that we have unique indexes, we scan
  the data file until we find a matching key when a read is requested.
*/
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
                           uint32_t key_len, enum ha_rkey_function)
{
  int rc;

  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;
  current_key= key;
  current_key_len= key_len;

  rc= doStartTableScan(true);
    return(HA_ERR_END_OF_FILE);

  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);

  getTable()->status=rc ? STATUS_NOT_FOUND: 0;

  return(rc);
}
/*
  Thanks to the table bool is_ordered this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/
void ha_archive::position(const unsigned char *)
{
  internal::my_store_ptr(ref, ref_length, current_position);
}
  share->archive_write_open= false;

  char *proto_string;
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
  {
    return HA_ERR_OUT_OF_MEM;
  }
  azread_frm(&archive, proto_string);

  /* Lets create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);

  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
  {
    free(proto_string);
    return(HA_ERR_CRASHED_ON_USAGE);
  }

  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
  */
  uint64_t rows_restored= archive.rows;

  for (uint64_t x= 0; x < rows_restored ; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;

    real_write_row(getTable()->getInsertRecord(), writer.get());

    /*
      Long term it should be possible to optimize this so that
      it is not called on each row.
    */
    if (getTable()->found_next_number_field)
    {
      Field *field= getTable()->found_next_number_field;

      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();

      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;
    }
  }
  share->rows_recorded= (ha_rows)writer->rows;

  if (rc && rc != HA_ERR_END_OF_FILE)
    goto error;
/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return(share->crashed);
}
/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session)
{
  int rc= 0;
  const char *old_proc_info;

  old_proc_info= get_session_proc_info(session);
  set_session_proc_info(session, "Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
{
  int error= 0;

  for (const char **ext= bas_ext(); *ext ; ext++)
  {
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
    {
      if ((error=errno) != ENOENT)
        break;
      error= 0;
    }
  }

  return error;
}
bool ArchiveEngine::doDoesTableExist(Session&,
                                     const TableIdentifier &identifier)
{
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))
  {
    return false;
  }

  return true;
}
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::SchemaIdentifier &schema_identifier,
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
{
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
    { }
    else
    {
      char uname[NAME_LEN + 1];
      uint32_t file_name_len;

      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
      // TODO: Remove need for memory copy here
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL

      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
    }
  }
}
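
/*
  Discovery maps data files back to table names, e.g. "t1.ARZ" yields
  "t1". A hedged sketch of the conversion step above (the literal
  filename here is hypothetical):

    char uname[NAME_LEN + 1];
    uint32_t len= TableIdentifier::filename_to_tablename("t1.ARZ", uname, sizeof(uname));
    uname[len - sizeof(ARZ) + 1]= '\0';   // strip the ".ARZ" suffix -> "t1"
*/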
/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(Session *session)
{
  HA_CHECK_OPT check_opt;

  check_opt.init();

  return(repair(session, &check_opt));
}
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
  {
    return(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) malloc(r->length)))
  {
    free((char*) r);
    return(NULL); /* purecov: inspected */
  }

  return(r);
}
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free((char*) r->buffer);
  free((char*) r);
}
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
  PLUGIN_VAR_NOCMDOPT,
  "Whether or not to use asynchronous IO.",
  NULL, NULL, true);

static struct st_mysql_sys_var* archive_system_variables[]= {
  DRIZZLE_SYSVAR(aio),
  NULL
};
drizzle_declare_plugin(archive)
{
  "ARCHIVE",
  "3.5",
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,
  archive_db_init,            /* Plugin Init */
  archive_db_done,            /* Plugin Deinit */
  NULL,                       /* status variables */
  archive_system_variables,   /* system variables */
  NULL                        /* config options */
}
drizzle_declare_plugin_end;