13
12
You should have received a copy of the GNU General Public License
14
13
along with this program; if not, write to the Free Software
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
20
#include "plugin/archive/archive_engine.h"
22
#include <boost/scoped_ptr.hpp>
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include "drizzled/server_includes.h"
18
#include "drizzled/field.h"
19
#include "drizzled/field/blob.h"
20
#include "drizzled/field/timestamp.h"
21
#include "plugin/myisam/myisam.h"
22
#include "drizzled/table.h"
23
#include "drizzled/session.h"
24
#include <mysys/my_dir.h>
26
#include "ha_archive.h"
24
32
using namespace std;
25
using namespace drizzled;
34
static const string engine_name("ARCHIVE");
29
37
First, if you want to understand storage engines you should look at
110
127
#define ARCHIVE_ROW_HEADER_SIZE 4
112
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
114
ArchiveMap::iterator find_iter=
115
archive_open_tables.find(table_name);
117
if (find_iter != archive_open_tables.end())
118
return (*find_iter).second;
123
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
125
archive_open_tables[table_name]= share;
128
void ArchiveEngine::deleteOpenTable(const string &table_name)
130
archive_open_tables.erase(table_name);
134
int ArchiveEngine::doDropTable(Session&, const TableIdentifier &identifier)
136
string new_path(identifier.getPath());
140
int error= unlink(new_path.c_str());
150
int ArchiveEngine::doGetTableDefinition(Session&,
151
const TableIdentifier &identifier,
152
drizzled::message::Table &table_proto)
130
We just implement one additional file extension.
132
static const char *ha_archive_exts[] = {
137
class ArchiveTableNameIterator: public TableNameIteratorImplementation
141
uint32_t current_entry;
144
ArchiveTableNameIterator(const std::string &database)
145
: TableNameIteratorImplementation(database), dirp(NULL), current_entry(-1)
148
~ArchiveTableNameIterator();
150
int next(std::string *name);
154
ArchiveTableNameIterator::~ArchiveTableNameIterator()
160
int ArchiveTableNameIterator::next(string *name)
162
char uname[NAME_LEN + 1];
165
uint32_t file_name_len;
166
const char *wild= NULL;
171
char path[FN_REFLEN];
173
build_table_filename(path, sizeof(path), db.c_str(), "", false);
174
dirp = my_dir(path,MYF(dir ? MY_WANT_STAT : 0));
177
if (my_errno == ENOENT)
178
my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
180
my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), path, my_errno);
190
if (current_entry == dirp->number_off_files)
197
file= dirp->dir_entry + current_entry;
199
if (my_strcasecmp(system_charset_info, ext=strchr(file->name,'.'), ARZ) ||
200
is_prefix(file->name, TMP_FILE_PREFIX))
204
file_name_len= filename_to_tablename(file->name, uname, sizeof(uname));
206
uname[file_name_len]= '\0';
208
if (wild && wild_compare(uname, wild, 0))
217
class ArchiveEngine : public StorageEngine
220
ArchiveEngine(const string &name_arg) : StorageEngine(name_arg,
222
| HTON_HAS_DATA_DICTIONARY) {}
224
virtual handler *create(TableShare *table,
227
return new (mem_root) ha_archive(this, table);
230
const char **bas_ext() const {
231
return ha_archive_exts;
234
int createTableImplementation(Session *session, const char *table_name,
235
Table *table_arg, HA_CREATE_INFO *create_info,
236
drizzled::message::Table* proto);
238
int getTableProtoImplementation(const char* path,
239
drizzled::message::Table *table_proto);
241
TableNameIteratorImplementation* tableNameIterator(const std::string &database)
243
return new ArchiveTableNameIterator(database);
247
int ArchiveEngine::getTableProtoImplementation(const char* path,
248
drizzled::message::Table *table_proto)
154
250
struct stat stat_info;
156
252
string proto_path;
158
254
proto_path.reserve(FN_REFLEN);
159
proto_path.assign(identifier.getPath());
255
proto_path.assign(path);
161
257
proto_path.append(ARZ);
163
259
if (stat(proto_path.c_str(),&stat_info))
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
264
azio_stream proto_stream;
170
265
char* proto_string;
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
266
if(azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
267
return HA_ERR_CRASHED_ON_USAGE;
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
269
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
175
270
if (proto_string == NULL)
177
azclose(proto_stream.get());
272
azclose(&proto_stream);
181
azread_frm(proto_stream.get(), proto_string);
276
azread_frm(&proto_stream, proto_string);
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
278
if(table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
184
279
error= HA_ERR_CRASHED_ON_USAGE;
186
azclose(proto_stream.get());
281
azclose(&proto_stream);
187
282
free(proto_string);
190
/* We set the name from what we've asked for, because on RENAME TABLE for ARCHIVE
191
we do not rewrite the table proto (as it's wedged in the file header)
193
table_proto.set_schema(identifier.getSchemaName());
194
table_proto.set_name(identifier.getTableName());
200
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
202
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
288
static ArchiveEngine *archive_engine= NULL;
291
Initialize the archive handler.
302
static int archive_db_init(drizzled::plugin::Registry &registry)
305
pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
306
archive_engine= new ArchiveEngine(engine_name);
307
registry.add(archive_engine);
309
/* When the engine starts up set the first version */
316
Release the archive handler.
326
static int archive_db_done(drizzled::plugin::Registry &registry)
328
registry.remove(archive_engine);
329
delete archive_engine;
331
pthread_mutex_destroy(&archive_mutex);
337
ha_archive::ha_archive(StorageEngine *engine_arg, TableShare *table_arg)
338
:handler(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
204
340
/* Set our original buffer from pre-allocated memory */
205
341
buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
207
343
/* The size of the offset value we will use for position() */
208
ref_length= sizeof(internal::my_off_t);
344
ref_length= sizeof(my_off_t);
209
345
archive_reader_open= false;
500
int ArchiveEngine::doCreateTable(Session &,
502
const drizzled::TableIdentifier &identifier,
503
drizzled::message::Table& proto)
641
int ArchiveEngine::createTableImplementation(Session *session,
642
const char *table_name,
644
HA_CREATE_INFO *create_info,
645
drizzled::message::Table *proto)
647
char name_buff[FN_REFLEN];
506
boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
649
azio_stream create_stream; /* Archive file we are working with */
507
650
uint64_t auto_increment_value;
508
651
string serialized_proto;
510
auto_increment_value= proto.options().auto_increment_value();
653
auto_increment_value= create_info->auto_increment_value;
512
for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
655
for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
514
KeyInfo *pos= &table_arg.key_info[key];
515
KeyPartInfo *key_part= pos->key_part;
516
KeyPartInfo *key_part_end= key_part + pos->key_parts;
657
KEY *pos= table_arg->key_info+key;
658
KEY_PART_INFO *key_part= pos->key_part;
659
KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
518
661
for (; key_part != key_part_end; key_part++)
522
665
if (!(field->flags & AUTO_INCREMENT_FLAG))
529
std::string named_file= identifier.getPath();
530
named_file.append(ARZ);
674
We reuse name_buff since it is available.
676
fn_format(name_buff, table_name, "", ARZ,
677
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
533
if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
680
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
534
681
AZ_METHOD_BLOCK) == 0)
537
unlink(named_file.c_str());
539
return(error ? error : -1);
543
proto.SerializeToString(&serialized_proto);
547
unlink(named_file.c_str());
549
return(error ? error : -1);
552
if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
687
proto->SerializeToString(&serialized_proto);
689
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
553
690
serialized_proto.length()))
555
unlink(named_file.c_str());
557
return(error ? error : -1);
560
if (proto.options().has_comment())
693
if (proto->options().has_comment())
562
695
int write_length;
564
write_length= azwrite_comment(create_stream.get(),
565
proto.options().comment().c_str(),
566
proto.options().comment().length());
697
write_length= azwrite_comment(&create_stream,
698
proto->options().comment().c_str(),
699
proto->options().comment().length());
568
701
if (write_length < 0)
571
unlink(named_file.c_str());
573
return(error ? error : -1);
668
803
for implementing start_bulk_insert() is that we could skip
669
804
setting dirty to true each time.
671
int ha_archive::doInsertRecord(unsigned char *buf)
806
int ha_archive::write_row(unsigned char *buf)
674
809
unsigned char *read_buf= NULL;
675
810
uint64_t temp_auto;
676
unsigned char *record= getTable()->getInsertRecord();
811
unsigned char *record= table->record[0];
678
813
if (share->crashed)
679
814
return(HA_ERR_CRASHED_ON_USAGE);
681
pthread_mutex_lock(&share->mutex());
816
ha_statistic_increment(&SSV::ha_write_count);
817
pthread_mutex_lock(&share->mutex);
683
819
if (share->archive_write_open == false)
684
820
if (init_archive_writer())
685
821
return(HA_ERR_CRASHED_ON_USAGE);
688
if (getTable()->next_number_field && record == getTable()->getInsertRecord())
824
if (table->next_number_field && record == table->record[0])
826
KEY *mkey= &table->s->key_info[0]; // We only support one key right now
690
827
update_auto_increment();
691
temp_auto= getTable()->next_number_field->val_int();
828
temp_auto= table->next_number_field->val_int();
694
831
We don't support decrementing auto_increment. They make the performance
697
834
if (temp_auto <= share->archive_write.auto_increment &&
698
getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
835
mkey->flags & HA_NOSAME)
700
837
rc= HA_ERR_FOUND_DUPP_KEY;
1012
1173
rows_restored= archive.rows;
1014
for (uint64_t x= 0; x < rows_restored ; x++)
1175
for (x= 0; x < rows_restored ; x++)
1016
rc= get_row(&archive, getTable()->getInsertRecord());
1177
rc= get_row(&archive, table->record[0]);
1021
real_write_row(getTable()->getInsertRecord(), writer.get());
1182
real_write_row(table->record[0], &writer);
1023
1184
Long term it should be possible to optimize this so that
1024
1185
it is not called on each row.
1026
if (getTable()->found_next_number_field)
1187
if (table->found_next_number_field)
1028
Field *field= getTable()->found_next_number_field;
1189
Field *field= table->found_next_number_field;
1030
1191
/* Since we will need to use field to translate, we need to flip its read bit */
1031
1192
field->setReadSet();
1033
1194
uint64_t auto_value=
1034
(uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
field->offset(getTable()->getInsertRecord()));
1195
(uint64_t) field->val_int(table->record[0] +
1196
field->offset(table->record[0]));
1036
1197
if (share->archive_write.auto_increment < auto_value)
1037
1198
stats.auto_increment_value=
1038
1199
(share->archive_write.auto_increment= auto_value) + 1;
1041
share->rows_recorded= (ha_rows)writer->rows;
1202
share->rows_recorded= (ha_rows)writer.rows;
1044
1205
if (rc && rc != HA_ERR_END_OF_FILE)
1250
int ArchiveEngine::doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to)
1254
for (const char **ext= bas_ext(); *ext ; ext++)
1256
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1258
if ((error=errno) != ENOENT)
1267
bool ArchiveEngine::doDoesTableExist(Session&,
1268
const TableIdentifier &identifier)
1270
string proto_path(identifier.getPath());
1271
proto_path.append(ARZ);
1273
if (access(proto_path.c_str(), F_OK))
1281
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
const drizzled::SchemaIdentifier &schema_identifier,
1283
drizzled::TableIdentifier::vector &set_of_identifiers)
1285
drizzled::CachedDirectory::Entries entries= directory.getEntries();
1287
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
1288
entry_iter != entries.end(); ++entry_iter)
1290
drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
const string *filename= &entry->filename;
1293
assert(filename->size());
1295
const char *ext= strchr(filename->c_str(), '.');
1297
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1302
char uname[NAME_LEN + 1];
1303
uint32_t file_name_len;
1305
file_name_len= TableIdentifier::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
// TODO: Remove need for memory copy here
1307
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
1309
set_of_identifiers.push_back(TableIdentifier(schema_identifier, uname));
1420
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1422
archive_record_buffer *r;
1423
if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1427
r->length= (int)length;
1429
if (!(r->buffer= (unsigned char*) malloc(r->length)))
1438
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1440
free((char*) r->buffer);
1445
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
1446
PLUGIN_VAR_NOCMDOPT,
1447
"Whether or not to use asynchronous IO.",
1450
static struct st_mysql_sys_var* archive_system_variables[]= {
1451
DRIZZLE_SYSVAR(aio),
1455
drizzle_declare_plugin(archive)
1459
"Brian Aker, MySQL AB",
1460
"Archive storage engine",
1462
archive_db_init, /* Plugin Init */
1463
archive_db_done, /* Plugin Deinit */
1464
NULL, /* status variables */
1465
archive_system_variables, /* system variables */
1466
NULL /* config options */
1468
drizzle_declare_plugin_end;