94
95
/* Variables for archive share methods */
95
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;
97
/* When the engine starts up set the first version */
98
static uint64_t global_version= 1;
100
// We use this to find out the state of the archive aio option.
101
extern bool archive_aio_state(void);
96
pthread_mutex_t archive_mutex;
97
static HASH archive_open_tables;
98
static unsigned int global_version;
100
/* The file extension */
101
#define ARZ ".ARZ" // The data file
102
#define ARN ".ARN" // Files used during an optimize call
105
/* Static declarations for handerton */
106
static handler *archive_create_handler(handlerton *hton,
110
static bool archive_use_aio= false;
104
113
Number of rows that will force a bulk insert.
111
120
#define ARCHIVE_ROW_HEADER_SIZE 4
113
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
115
ArchiveMap::iterator find_iter=
116
archive_open_tables.find(table_name);
118
if (find_iter != archive_open_tables.end())
119
return (*find_iter).second;
124
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
126
archive_open_tables[table_name]= share;
129
void ArchiveEngine::deleteOpenTable(const string &table_name)
131
archive_open_tables.erase(table_name);
135
void ArchiveEngine::doGetTableNames(drizzled::CachedDirectory &directory,
137
set<string>& set_of_names)
139
drizzled::CachedDirectory::Entries entries= directory.getEntries();
141
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
142
entry_iter != entries.end(); ++entry_iter)
144
drizzled::CachedDirectory::Entry *entry= *entry_iter;
145
const string *filename= &entry->filename;
147
assert(filename->size());
149
const char *ext= strchr(filename->c_str(), '.');
151
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
152
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
156
char uname[NAME_LEN + 1];
157
uint32_t file_name_len;
159
file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
160
// TODO: Remove need for memory copy here
161
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
162
set_of_names.insert(uname);
168
int ArchiveEngine::doDropTable(Session&, TableIdentifier &identifier)
170
string new_path(identifier.getPath());
174
int error= unlink(new_path.c_str());
184
int ArchiveEngine::doGetTableDefinition(Session&,
185
TableIdentifier &identifier,
186
drizzled::message::Table &table_proto)
188
struct stat stat_info;
192
proto_path.reserve(FN_REFLEN);
193
proto_path.assign(identifier.getPath());
195
proto_path.append(ARZ);
197
if (stat(proto_path.c_str(),&stat_info))
203
azio_stream proto_stream;
205
if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
206
return HA_ERR_CRASHED_ON_USAGE;
208
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
209
if (proto_string == NULL)
211
azclose(&proto_stream);
215
azread_frm(&proto_stream, proto_string);
217
if (table_proto.ParseFromArray(proto_string, proto_stream.frm_length) == false)
218
error= HA_ERR_CRASHED_ON_USAGE;
220
azclose(&proto_stream);
228
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
229
TableShare &table_arg)
230
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
122
static handler *archive_create_handler(handlerton *hton,
126
return new (mem_root) ha_archive(hton, table);
130
Used for hash table that tracks open tables.
132
static unsigned char* archive_get_key(ARCHIVE_SHARE *share, size_t *length, bool)
134
*length=share->table_name_length;
135
return (unsigned char*) share->table_name;
140
Initialize the archive handler.
151
int archive_db_init(void *p)
153
handlerton *archive_hton;
155
archive_hton= (handlerton *)p;
156
archive_hton->state= SHOW_OPTION_YES;
157
archive_hton->create= archive_create_handler;
158
archive_hton->flags= HTON_NO_FLAGS;
160
/* When the engine starts up set the first version */
163
if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
165
if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
166
(hash_get_key) archive_get_key, 0, 0))
168
pthread_mutex_destroy(&archive_mutex);
179
Release the archive handler.
189
int archive_db_done(void *)
191
hash_free(&archive_open_tables);
192
pthread_mutex_destroy(&archive_mutex);
198
ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
199
:handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
232
201
/* Set our original buffer from pre-allocated memory */
233
202
buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
235
204
/* The size of the offset value we will use for position() */
236
ref_length= sizeof(internal::my_off_t);
205
ref_length= sizeof(my_off_t);
237
206
archive_reader_open= false;
254
ArchiveShare::ArchiveShare():
255
use_count(0), archive_write_open(false), dirty(false), crashed(false),
256
mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
261
ArchiveShare::ArchiveShare(const char *name):
262
use_count(0), archive_write_open(false), dirty(false), crashed(false),
263
mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
265
memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
266
table_name.append(name);
267
internal::fn_format(data_file_name, table_name.c_str(), "",
268
ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
270
We will use this lock for rows.
272
pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
275
ArchiveShare::~ArchiveShare()
277
thr_lock_delete(&lock);
278
pthread_mutex_destroy(&mutex);
280
We need to make sure we don't reset the crashed state.
281
If we open a crashed file, wee need to close it as crashed unless
282
it has been repaired.
283
Since we will close the data down after this, we go on and count
286
if (archive_write_open == true)
287
(void)azclose(&archive_write);
288
pthread_mutex_destroy(&archive_mutex);
291
bool ArchiveShare::prime(uint64_t *auto_increment)
293
azio_stream archive_tmp;
296
We read the meta file, but do not mark it dirty. Since we are not
297
doing a write we won't mark it dirty (and we won't open it for
298
anything but reading... open it for write and we will generate null
301
if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
305
*auto_increment= archive_tmp.auto_increment + 1;
306
rows_recorded= (ha_rows)archive_tmp.rows;
307
crashed= archive_tmp.dirty;
308
if (version < global_version)
310
version_rows= rows_recorded;
311
version= global_version;
313
azclose(&archive_tmp);
320
225
We create the shared memory space that we will use for the open table.
324
229
See ha_example.cc for a longer description.
326
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
231
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
328
235
pthread_mutex_lock(&archive_mutex);
330
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
331
share= a_engine->findOpenTable(table_name);
236
length=(uint) strlen(table_name);
238
if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
239
(unsigned char*) table_name,
335
share= new ArchiveShare(table_name);
243
azio_stream archive_tmp;
245
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
246
&share, sizeof(*share),
339
250
pthread_mutex_unlock(&archive_mutex);
340
251
*rc= HA_ERR_OUT_OF_MEM;
344
if (share->prime(&stats.auto_increment_value) == false)
256
share->table_name_length= length;
257
share->table_name= tmp_name;
258
share->crashed= false;
259
share->archive_write_open= false;
260
fn_format(share->data_file_name, table_name, "",
261
ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
262
strcpy(share->table_name, table_name);
264
We will use this lock for rows.
266
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
269
We read the meta file, but do not mark it dirty. Since we are not
270
doing a write we won't mark it dirty (and we won't open it for
271
anything but reading... open it for write and we will generate null
274
if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY,
277
pthread_mutex_destroy(&share->mutex);
346
279
pthread_mutex_unlock(&archive_mutex);
347
280
*rc= HA_ERR_CRASHED_ON_REPAIR;
353
a_engine->addOpenTable(share->table_name, share);
283
stats.auto_increment_value= archive_tmp.auto_increment + 1;
284
share->rows_recorded= (ha_rows)archive_tmp.rows;
285
share->crashed= archive_tmp.dirty;
286
if (share->version < global_version)
288
share->version_rows= share->rows_recorded;
289
share->version= global_version;
291
azclose(&archive_tmp);
293
my_hash_insert(&archive_open_tables, (unsigned char*) share);
354
294
thr_lock_init(&share->lock);
356
296
share->use_count++;
358
297
if (share->crashed)
359
298
*rc= HA_ERR_CRASHED_ON_USAGE;
360
299
pthread_mutex_unlock(&archive_mutex);
370
309
int ha_archive::free_share()
372
313
pthread_mutex_lock(&archive_mutex);
373
314
if (!--share->use_count)
375
ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
376
a_engine->deleteOpenTable(share->table_name);
316
hash_delete(&archive_open_tables, (unsigned char*) share);
317
thr_lock_delete(&share->lock);
318
pthread_mutex_destroy(&share->mutex);
320
We need to make sure we don't reset the crashed state.
321
If we open a crashed file, wee need to close it as crashed unless
322
it has been repaired.
323
Since we will close the data down after this, we go on and count
326
if (share->archive_write_open == true)
328
if (azclose(&(share->archive_write)))
331
free((unsigned char*) share);
379
333
pthread_mutex_unlock(&archive_mutex);
384
338
int ha_archive::init_archive_writer()
396
We just implement one additional file extension.
398
static const char *ha_archive_exts[] = {
403
const char **ha_archive::bas_ext() const
405
return ha_archive_exts;
441
410
When opening a file we:
442
411
Create/get our shared structure.
444
413
We open the file we will read from.
446
int ha_archive::open(const char *name, int, uint32_t)
415
int ha_archive::open(const char *name, int, uint32_t open_options)
449
418
share= get_share(name, &rc);
452
We either fix it ourselves, or we just take it offline
454
@todo Create some documentation in the recovery tools shipped with the engine.
456
if (rc == HA_ERR_CRASHED_ON_USAGE)
420
if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
422
/* purecov: begin inspected */
463
427
else if (rc == HA_ERR_OUT_OF_MEM)
530
int ArchiveEngine::doCreateTable(Session &,
532
drizzled::TableIdentifier &identifier,
533
drizzled::message::Table& proto)
499
int ha_archive::create(const char *name, Table *table_arg,
500
HA_CREATE_INFO *create_info)
535
502
char name_buff[FN_REFLEN];
503
char linkname[FN_REFLEN];
537
505
azio_stream create_stream; /* Archive file we are working with */
538
uint64_t auto_increment_value;
539
string serialized_proto;
541
auto_increment_value= proto.options().auto_increment_value();
543
for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
506
File frm_file; /* File handler for readers */
507
struct stat file_stat;
508
unsigned char *frm_ptr;
511
stats.auto_increment_value= create_info->auto_increment_value;
513
for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
545
KEY *pos= table_arg.key_info+key;
515
KEY *pos= table_arg->key_info+key;
546
516
KEY_PART_INFO *key_part= pos->key_part;
547
517
KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
562
532
We reuse name_buff since it is available.
564
internal::fn_format(name_buff, identifier.getPath().c_str(), "", ARZ,
565
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
568
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
569
AZ_METHOD_BLOCK) == 0)
534
if (create_info->data_file_name && create_info->data_file_name[0] != '#')
536
fn_format(name_buff, create_info->data_file_name, "", ARZ,
537
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
538
fn_format(linkname, name, "", ARZ,
539
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
543
fn_format(name_buff, name, "", ARZ,
544
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
549
There is a chance that the file was "discovered". In this case
550
just use whatever file is there.
552
r= stat(name_buff, &file_stat);
553
if (r == -1 && errno!=ENOENT)
558
return HA_ERR_TABLE_EXIST;
561
if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
575
proto.SerializeToString(&serialized_proto);
577
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
578
serialized_proto.length()))
581
if (proto.options().has_comment())
569
my_symlink(name_buff, linkname, MYF(0));
570
fn_format(name_buff, name, "", ".frm",
571
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
574
Here is where we open up the frm and pass it to archive to store
576
if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
585
write_length= azwrite_comment(&create_stream,
586
proto.options().comment().c_str(),
587
proto.options().comment().length());
589
if (write_length < 0)
578
if (fstat(frm_file, &file_stat))
580
frm_ptr= (unsigned char *)malloc(sizeof(unsigned char) *
584
my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
585
azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
586
free((unsigned char*)frm_ptr);
589
my_close(frm_file, MYF(0));
592
if (create_info->comment.str)
593
azwrite_comment(&create_stream, create_info->comment.str,
594
(unsigned int)create_info->comment.length);
597
597
Yes you need to do this, because the starting value
598
598
for the autoincrement may not be zero.
600
create_stream.auto_increment= auto_increment_value ?
601
auto_increment_value - 1 : 0;
600
create_stream.auto_increment= stats.auto_increment_value ?
601
stats.auto_increment_value - 1 : 0;
603
602
if (azclose(&create_stream))
726
726
rc= HA_ERR_FOUND_DUPP_KEY;
731
Bad news, this will cause a search for the unique value which is very
732
expensive since we will have to do a table scan which will lock up
733
all other writers during this period. This could perhaps be optimized
738
First we create a buffer that we can use for reading rows, and can pass
741
if (!(read_buf= (unsigned char*) malloc(table->s->reclength)))
743
rc= HA_ERR_OUT_OF_MEM;
747
All of the buffer must be written out or we won't see all of the
750
azflush(&(share->archive_write), Z_SYNC_FLUSH);
752
Set the position of the local read thread to the beginning postion.
754
if (read_data_header(&archive))
756
rc= HA_ERR_CRASHED_ON_USAGE;
760
Field *mfield= table->next_number_field;
762
while (!(get_row(&archive, read_buf)))
764
if (!memcmp(read_buf + mfield->offset(record),
765
table->next_number_field->ptr,
766
mfield->max_display_length()))
768
rc= HA_ERR_FOUND_DUPP_KEY;
731
776
if (temp_auto > share->archive_write.auto_increment)
1010
1056
share->archive_write_open= false;
1014
proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
1015
if (proto_string == NULL)
1019
azread_frm(&archive, proto_string);
1021
1059
/* Lets create a file to contain the new data */
1022
internal::fn_format(writer_filename, share->table_name.c_str(), "", ARN,
1060
fn_format(writer_filename, share->table_name, "", ARN,
1023
1061
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1025
1063
if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
1028
1064
return(HA_ERR_CRASHED_ON_USAGE);
1031
azwrite_frm(&writer, proto_string, archive.frm_length);
1034
1067
An extended rebuild is a lot more effort. We open up each row and re-record it.
1325
int ArchiveEngine::doRenameTable(Session&, TableIdentifier &from, TableIdentifier &to)
1329
for (const char **ext= bas_ext(); *ext ; ext++)
1331
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1333
if ((error=errno) != ENOENT)
1342
bool ArchiveEngine::doDoesTableExist(Session&,
1343
TableIdentifier &identifier)
1345
string proto_path(identifier.getPath());
1346
proto_path.append(ARZ);
1348
if (access(proto_path.c_str(), F_OK))
1356
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1357
drizzled::SchemaIdentifier &schema_identifier,
1358
drizzled::TableIdentifiers &set_of_identifiers)
1360
drizzled::CachedDirectory::Entries entries= directory.getEntries();
1362
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
1363
entry_iter != entries.end(); ++entry_iter)
1365
drizzled::CachedDirectory::Entry *entry= *entry_iter;
1366
const string *filename= &entry->filename;
1368
assert(filename->size());
1370
const char *ext= strchr(filename->c_str(), '.');
1372
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1373
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1377
char uname[NAME_LEN + 1];
1378
uint32_t file_name_len;
1380
file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1381
// TODO: Remove need for memory copy here
1382
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
1384
set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1391
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
1392
PLUGIN_VAR_NOCMDOPT,
1393
"Whether or not to use asynchronous IO.",
1396
static struct st_mysql_sys_var* archive_system_variables[]= {
1397
DRIZZLE_SYSVAR(aio),
1401
mysql_declare_plugin(archive)
1403
DRIZZLE_STORAGE_ENGINE_PLUGIN,
1406
"Brian Aker, MySQL AB",
1407
"Archive storage engine",
1409
archive_db_init, /* Plugin Init */
1410
archive_db_done, /* Plugin Deinit */
1411
NULL, /* status variables */
1412
archive_system_variables, /* system variables */
1413
NULL /* config options */
1415
mysql_declare_plugin_end;