  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "plugin/archive/archive_engine.h"

#include <boost/scoped_ptr.hpp>
#include "drizzled/server_includes.h"
#include "drizzled/field.h"
#include "drizzled/field/blob.h"
#include "drizzled/field/timestamp.h"
#include "plugin/myisam/myisam.h"
#include "drizzled/table.h"
#include "drizzled/session.h"
#include <mysys/my_dir.h>

#include "ha_archive.h"

using namespace std;
using namespace drizzled;

static const string engine_name("ARCHIVE");
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.
*/

#define ARCHIVE_ROW_HEADER_SIZE 4
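/*
  The engine keeps a map of open ArchiveShare objects keyed by table name
  (archive_open_tables); the three helpers below look up, register and
  unregister entries in that map.
*/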
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;
  else
    return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
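/*
  Dropping an archive table means removing its single data file; the table
  definition is stored in that file's header, so there is no separate
  definition file to delete.
*/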
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
{
  string new_path(identifier.getPath());

  new_path+= ARZ;

  int error= unlink(new_path.c_str());

  if (error != 0)
    error= errno;

  return error;
}
/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  NULL
};
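/*
  ArchiveTableNameIterator walks the schema directory and yields the name
  of every table whose file carries the ARZ extension, skipping temporary
  files.
*/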
137
class ArchiveTableNameIterator: public drizzled::plugin::TableNameIteratorImplementation
141
uint32_t current_entry;
144
ArchiveTableNameIterator(const std::string &database)
145
: drizzled::plugin::TableNameIteratorImplementation(database), dirp(NULL), current_entry(-1)
148
~ArchiveTableNameIterator();
150
int next(std::string *name);
ArchiveTableNameIterator::~ArchiveTableNameIterator()
{
  if (dirp)
    my_dirend(dirp);
}

int ArchiveTableNameIterator::next(string *name)
{
  char uname[NAME_LEN + 1];
  FILEINFO *file;
  char *ext;
  uint32_t file_name_len;
  const char *wild= NULL;

  if (dirp == NULL)
  {
    bool dir= false;
    char path[FN_REFLEN];

    build_table_filename(path, sizeof(path), db.c_str(), "", false);
    dirp= my_dir(path, MYF(dir ? MY_WANT_STAT : 0));
    if (dirp == NULL)
    {
      if (my_errno == ENOENT)
        my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
      else
        my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), path, my_errno);
      return ENOENT;
    }
    current_entry= -1;
  }

  while (true)
  {
    current_entry++;

    if (current_entry == dirp->number_off_files)
      return -1;

    file= dirp->dir_entry + current_entry;

    if (my_strcasecmp(system_charset_info, ext= strchr(file->name, '.'), ARZ) ||
        is_prefix(file->name, TMP_FILE_PREFIX))
      continue;

    file_name_len= filename_to_tablename(file->name, uname, sizeof(uname));
    uname[file_name_len]= '\0';

    if (wild && wild_compare(uname, wild, 0))
      continue;

    if (name)
      name->assign(uname);

    return 0;
  }
}
class ArchiveEngine : public drizzled::plugin::StorageEngine
{
public:
  ArchiveEngine(const string &name_arg)
    : drizzled::plugin::StorageEngine(name_arg,
                                      HTON_FILE_BASED
                                      | HTON_HAS_DATA_DICTIONARY) {}

  virtual handler *create(TableShare *table, MEM_ROOT *mem_root)
  {
    return new (mem_root) ha_archive(this, table);
  }

  const char **bas_ext() const {
    return ha_archive_exts;
  }

  int createTableImplementation(Session *session, const char *table_name,
                                Table *table_arg, HA_CREATE_INFO *create_info,
                                drizzled::message::Table* proto);

  int getTableProtoImplementation(const char* path,
                                  drizzled::message::Table *table_proto);

  drizzled::plugin::TableNameIteratorImplementation* tableNameIterator(const std::string &database)
  {
    return new ArchiveTableNameIterator(database);
  }
};
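/*
  Read the table definition back out of the ARZ header: the serialized
  proto written at CREATE TABLE time is loaded with azread_frm() and
  parsed into table_proto.
*/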
int ArchiveEngine::doGetTableDefinition(Session&,
                                        const TableIdentifier &identifier,
                                        drizzled::message::Table &table_proto)
{
  struct stat stat_info;
  int error= ENOENT;
  string proto_path;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());

  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))
    return errno;
  else
    error= EEXIST;

  {
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
    char* proto_string;
    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
      return HA_ERR_CRASHED_ON_USAGE;

    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
    if (proto_string == NULL)
    {
      azclose(proto_stream.get());
      return ENOMEM;
    }

    azread_frm(proto_stream.get(), proto_string);

    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
      error= HA_ERR_CRASHED_ON_USAGE;

    azclose(proto_stream.get());
    free(proto_string);
  }

  /* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
     we do not rewrite the table proto (as it's wedged in the file header)
  */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());

  return error;
}
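/*
  Plugin lifecycle: archive_db_init() registers a single ArchiveEngine
  instance with the plugin registry and archive_db_done() tears it down
  again, together with the global archive mutex.
*/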
static ArchiveEngine *archive_engine= NULL;

/*
  Initialize the archive handler.
*/
static int archive_db_init(drizzled::plugin::Registry &registry)
{
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
  archive_engine= new ArchiveEngine(engine_name);
  registry.add(archive_engine);

  /* When the engine starts up set the first version */
  global_version= 1;

  return false;
}

/*
  Release the archive handler.
*/
static int archive_db_done(drizzled::plugin::Registry &registry)
{
  registry.remove(archive_engine);
  delete archive_engine;

  pthread_mutex_destroy(&archive_mutex);

  return 0;
}
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       TableShare &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;
}
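/*
  CREATE TABLE for ARCHIVE: check that any declared key is an
  auto_increment key, create the ARZ file, write the serialized table
  proto into its header and store the optional table comment.
*/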
int ArchiveEngine::doCreateTable(Session &,
                                 Table& table_arg,
                                 const drizzled::TableIdentifier &identifier,
                                 drizzled::message::Table& proto)
{
  int error= 0;
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part=     pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      /* ARCHIVE only supports an auto_increment key. */
      if (!(field->flags & AUTO_INCREMENT_FLAG))
        return -1;
    }
  }

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  try {
    proto.SerializeToString(&serialized_proto);
  }
  catch (...)
  {
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
  {
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      unlink(named_file.c_str());
      return(error ? error : -1);
    }
  }

  /* Seed the auto_increment value in the header and close out the new file. */
  create_stream->auto_increment= auto_increment_value ?
    auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
  {
    error= errno;
    unlink(named_file.c_str());
    return(error ? error : -1);
  }

  return(0);
}
/*
  The idea for implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  {
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment; it would make the
      performance too poor.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
      rc= HA_ERR_FOUND_DUPP_KEY;
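/*
  Rebuild path: every stored row is read back with get_row() and written
  out again through the new writer, and the largest auto_increment value
  seen is preserved for the rebuilt file.
*/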
  rows_restored= archive.rows;

  for (uint64_t x= 0; x < rows_restored ; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;

    real_write_row(getTable()->getInsertRecord(), writer.get());

    /*
      Long term it should be possible to optimize this so that
      it is not called on each row.
    */
    if (getTable()->found_next_number_field)
    {
      Field *field= getTable()->found_next_number_field;

      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();

      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;
    }
  }

  share->rows_recorded= (ha_rows)writer->rows;

  if (rc && rc != HA_ERR_END_OF_FILE)
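/*
  RENAME TABLE just renames every file extension the engine reports via
  bas_ext(); the proto embedded in the ARZ header is not rewritten (see
  doGetTableDefinition(), which re-stamps schema and table name on read).
*/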
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
{
  int error= 0;

  for (const char **ext= bas_ext(); *ext ; ext++)
  {
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
    {
      if ((error=errno) != ENOENT)
        break;
      error= 0;
    }
  }

  return error;
}
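/*
  A table exists exactly when its ARZ data file is present on disk.
*/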
bool ArchiveEngine::doDoesTableExist(Session&,
                                     const TableIdentifier &identifier)
{
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))
  {
    return false;
  }

  return true;
}
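/*
  doGetTableIdentifiers() scans the cached schema directory listing and
  turns every non-temporary *.ARZ entry into a TableIdentifier.
*/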
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::SchemaIdentifier &schema_identifier,
                                          drizzled::TableIdentifier::vector &set_of_identifiers)
{
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
    { }
    else
    {
      char uname[NAME_LEN + 1];
      uint32_t file_name_len;

      file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
      // TODO: Remove need for memory copy here
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL

      set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
    }
  }
}
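/*
  Helpers that allocate and free the archive_record_buffer: one malloc'd
  struct plus a raw byte buffer of the requested length.
*/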
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;
  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
  {
    return(NULL);
  }
  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) malloc(r->length)))
  {
    free((char*) r);
    return(NULL);
  }

  return(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free((char*) r->buffer);
  free((char*) r);
  return;
}
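/*
  A single engine option is exposed, controlling whether azio uses
  asynchronous IO.
*/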
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
  PLUGIN_VAR_NOCMDOPT,
  "Whether or not to use asynchronous IO.",
  NULL, /* check func */
  NULL, /* update func */
  true /* default */);

static struct st_mysql_sys_var* archive_system_variables[]= {
  DRIZZLE_SYSVAR(aio),
  NULL
};
drizzle_declare_plugin(archive)
{
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  archive_db_init,            /* Plugin Init */
  archive_db_done,            /* Plugin Deinit */
  NULL,                       /* status variables */
  archive_system_variables,   /* system variables */
  NULL                        /* config options */
}
drizzle_declare_plugin_end;