12
13
You should have received a copy of the GNU General Public License
13
14
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include "drizzled/server_includes.h"
18
#include "drizzled/field.h"
19
#include "drizzled/field/blob.h"
20
#include "drizzled/field/timestamp.h"
21
#include "plugin/myisam/myisam.h"
22
#include "drizzled/table.h"
23
#include "drizzled/session.h"
24
#include <mysys/my_dir.h>
26
#include "ha_archive.h"
15
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
20
#include "plugin/archive/archive_engine.h"
22
#include <boost/scoped_ptr.hpp>
32
24
using namespace std;
25
using namespace drizzled;
34
static const string engine_name("ARCHIVE");
37
29
First, if you want to understand storage engines you should look at
127
110
#define ARCHIVE_ROW_HEADER_SIZE 4
130
We just implement one additional file extension.
132
static const char *ha_archive_exts[] = {
137
class ArchiveTableNameIterator: public TableNameIteratorImplementation
141
uint32_t current_entry;
144
ArchiveTableNameIterator(const std::string &database)
145
: TableNameIteratorImplementation(database), dirp(NULL), current_entry(-1)
148
~ArchiveTableNameIterator();
150
int next(std::string *name);
154
ArchiveTableNameIterator::~ArchiveTableNameIterator()
160
int ArchiveTableNameIterator::next(string *name)
162
char uname[NAME_LEN + 1];
165
uint32_t file_name_len;
166
const char *wild= NULL;
171
char path[FN_REFLEN];
173
build_table_filename(path, sizeof(path), db.c_str(), "", false);
174
dirp = my_dir(path,MYF(dir ? MY_WANT_STAT : 0));
177
if (my_errno == ENOENT)
178
my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
180
my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), path, my_errno);
190
if (current_entry == dirp->number_off_files)
197
file= dirp->dir_entry + current_entry;
199
if (my_strcasecmp(system_charset_info, ext=strchr(file->name,'.'), ARZ) ||
200
is_prefix(file->name, TMP_FILE_PREFIX))
204
file_name_len= filename_to_tablename(file->name, uname, sizeof(uname));
206
uname[file_name_len]= '\0';
208
if (wild && wild_compare(uname, wild, 0))
217
class ArchiveEngine : public StorageEngine
220
ArchiveEngine(const string &name_arg) : StorageEngine(name_arg,
222
| HTON_HAS_DATA_DICTIONARY
225
virtual handler *create(TableShare *table,
228
return new (mem_root) ha_archive(this, table);
231
const char **bas_ext() const {
232
return ha_archive_exts;
235
int createTableImplementation(Session *session, const char *table_name,
236
Table *table_arg, HA_CREATE_INFO *create_info,
237
drizzled::message::Table* proto);
239
int getTableProtoImplementation(const char* path,
240
drizzled::message::Table *table_proto);
242
TableNameIteratorImplementation* tableNameIterator(const std::string &database)
244
return new ArchiveTableNameIterator(database);
248
int ArchiveEngine::getTableProtoImplementation(const char* path,
249
drizzled::message::Table *table_proto)
112
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
114
ArchiveMap::iterator find_iter=
115
archive_open_tables.find(table_name);
117
if (find_iter != archive_open_tables.end())
118
return (*find_iter).second;
123
void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
125
archive_open_tables[table_name]= share;
128
void ArchiveEngine::deleteOpenTable(const string &table_name)
130
archive_open_tables.erase(table_name);
134
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
136
string new_path(identifier.getPath());
140
int error= unlink(new_path.c_str());
150
int ArchiveEngine::doGetTableDefinition(Session&,
151
const identifier::Table &identifier,
152
drizzled::message::Table &table_proto)
251
154
struct stat stat_info;
253
156
string proto_path;
255
158
proto_path.reserve(FN_REFLEN);
256
proto_path.assign(path);
159
proto_path.assign(identifier.getPath());
258
161
proto_path.append(ARZ);
260
163
if (stat(proto_path.c_str(),&stat_info))
265
azio_stream proto_stream;
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
266
170
char* proto_string;
267
if(azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
268
172
return HA_ERR_CRASHED_ON_USAGE;
270
proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
271
175
if (proto_string == NULL)
273
azclose(&proto_stream);
177
azclose(proto_stream.get());
277
azread_frm(&proto_stream, proto_string);
181
azread_frm(proto_stream.get(), proto_string);
279
if(table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
280
184
error= HA_ERR_CRASHED_ON_USAGE;
282
azclose(&proto_stream);
186
azclose(proto_stream.get());
283
187
free(proto_string);
289
static ArchiveEngine *archive_engine= NULL;
292
Initialize the archive handler.
303
static int archive_db_init(drizzled::plugin::Registry &registry)
306
pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
307
archive_engine= new ArchiveEngine(engine_name);
308
registry.add(archive_engine);
310
/* When the engine starts up set the first version */
317
Release the archive handler.
327
static int archive_db_done(drizzled::plugin::Registry &registry)
329
registry.remove(archive_engine);
330
delete archive_engine;
332
pthread_mutex_destroy(&archive_mutex);
338
ha_archive::ha_archive(StorageEngine *engine_arg, TableShare *table_arg)
339
:handler(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
190
/* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
we do not rewrite the table proto (as it's wedged in the file header)
193
table_proto.set_schema(identifier.getSchemaName());
194
table_proto.set_name(identifier.getTableName());
200
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
202
:Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
341
204
/* Set our original buffer from pre-allocated memory */
342
205
buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
344
207
/* The size of the offset value we will use for position() */
345
ref_length= sizeof(my_off_t);
208
ref_length= sizeof(internal::my_off_t);
346
209
archive_reader_open= false;
644
int ArchiveEngine::createTableImplementation(Session *session,
645
const char *table_name,
647
HA_CREATE_INFO *create_info,
648
drizzled::message::Table *proto)
500
int ArchiveEngine::doCreateTable(Session &,
502
const drizzled::identifier::Table &identifier,
503
drizzled::message::Table& proto)
650
char name_buff[FN_REFLEN];
651
char linkname[FN_REFLEN];
653
azio_stream create_stream; /* Archive file we are working with */
506
boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
654
507
uint64_t auto_increment_value;
655
508
string serialized_proto;
657
auto_increment_value= create_info->auto_increment_value;
510
auto_increment_value= proto.options().auto_increment_value();
659
for (uint32_t key= 0; key < table_arg->sizeKeys(); key++)
512
for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
661
KEY *pos= table_arg->key_info+key;
662
KEY_PART_INFO *key_part= pos->key_part;
663
KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
514
KeyInfo *pos= &table_arg.key_info[key];
515
KeyPartInfo *key_part= pos->key_part;
516
KeyPartInfo *key_part_end= key_part + pos->key_parts;
665
518
for (; key_part != key_part_end; key_part++)
669
522
if (!(field->flags & AUTO_INCREMENT_FLAG))
678
We reuse name_buff since it is available.
680
if (create_info->data_file_name && create_info->data_file_name[0] != '#')
682
fn_format(name_buff, create_info->data_file_name, "", ARZ,
683
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
684
fn_format(linkname, table_name, "", ARZ,
685
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
689
fn_format(name_buff, table_name, "", ARZ,
690
MY_REPLACE_EXT | MY_UNPACK_FILENAME);
529
std::string named_file= identifier.getPath();
530
named_file.append(ARZ);
695
if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
533
if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
696
534
AZ_METHOD_BLOCK) == 0)
703
if(symlink(name_buff, linkname) != 0)
706
proto->SerializeToString(&serialized_proto);
708
if (azwrite_frm(&create_stream, serialized_proto.c_str(),
537
unlink(named_file.c_str());
539
return(error ? error : -1);
543
proto.SerializeToString(&serialized_proto);
547
unlink(named_file.c_str());
549
return(error ? error : -1);
552
if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
709
553
serialized_proto.length()))
712
if (create_info->comment.str)
716
write_length= azwrite_comment(&create_stream, create_info->comment.str,
717
(unsigned int)create_info->comment.length);
719
if (write_length == (size_t)create_info->comment.length)
555
unlink(named_file.c_str());
557
return(error ? error : -1);
560
if (proto.options().has_comment())
564
write_length= azwrite_comment(create_stream.get(),
565
proto.options().comment().c_str(),
566
proto.options().comment().length());
568
if (write_length < 0)
571
unlink(named_file.c_str());
573
return(error ? error : -1);
724
578
Yes you need to do this, because the starting value
725
579
for the autoincrement may not be zero.
727
create_stream.auto_increment= auto_increment_value ?
581
create_stream->auto_increment= auto_increment_value ?
728
582
auto_increment_value - 1 : 0;
730
if (azclose(&create_stream))
584
if (azclose(create_stream.get()))
587
unlink(named_file.c_str());
589
return(error ? error : -1);
739
deleteTable(session, table_name);
741
/* Return error number, if we got one */
742
return(error ? error : -1);
818
668
for implementing start_bulk_insert() is that we could skip
819
669
setting dirty to true each time.
821
int ha_archive::write_row(unsigned char *buf)
671
int ha_archive::doInsertRecord(unsigned char *buf)
824
674
unsigned char *read_buf= NULL;
825
675
uint64_t temp_auto;
826
unsigned char *record= table->record[0];
676
unsigned char *record= getTable()->getInsertRecord();
828
678
if (share->crashed)
829
679
return(HA_ERR_CRASHED_ON_USAGE);
831
ha_statistic_increment(&SSV::ha_write_count);
832
pthread_mutex_lock(&share->mutex);
681
pthread_mutex_lock(&share->mutex());
834
683
if (share->archive_write_open == false)
835
684
if (init_archive_writer())
836
685
return(HA_ERR_CRASHED_ON_USAGE);
839
if (table->next_number_field && record == table->record[0])
688
if (getTable()->next_number_field && record == getTable()->getInsertRecord())
841
KEY *mkey= &table->s->key_info[0]; // We only support one key right now
842
690
update_auto_increment();
843
temp_auto= table->next_number_field->val_int();
691
temp_auto= getTable()->next_number_field->val_int();
846
694
We don't support decrementing auto_increment. They make the performance
849
697
if (temp_auto <= share->archive_write.auto_increment &&
850
mkey->flags & HA_NOSAME)
698
getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
852
700
rc= HA_ERR_FOUND_DUPP_KEY;
1188
1012
rows_restored= archive.rows;
1190
for (x= 0; x < rows_restored ; x++)
1014
for (uint64_t x= 0; x < rows_restored ; x++)
1192
rc= get_row(&archive, table->record[0]);
1016
rc= get_row(&archive, getTable()->getInsertRecord());
1197
real_write_row(table->record[0], &writer);
1021
real_write_row(getTable()->getInsertRecord(), writer.get());
1199
1023
Long term it should be possible to optimize this so that
1200
1024
it is not called on each row.
1202
if (table->found_next_number_field)
1026
if (getTable()->found_next_number_field)
1204
Field *field= table->found_next_number_field;
1028
Field *field= getTable()->found_next_number_field;
1206
1030
/* Since we will need to use field to translate, we need to flip its read bit */
1207
1031
field->setReadSet();
1209
1033
uint64_t auto_value=
1210
(uint64_t) field->val_int(table->record[0] +
1211
field->offset(table->record[0]));
1034
(uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
1035
field->offset(getTable()->getInsertRecord()));
1212
1036
if (share->archive_write.auto_increment < auto_value)
1213
1037
stats.auto_increment_value=
1214
1038
(share->archive_write.auto_increment= auto_value) + 1;
1217
share->rows_recorded= (ha_rows)writer.rows;
1041
share->rows_recorded= (ha_rows)writer->rows;
1220
1044
if (rc && rc != HA_ERR_END_OF_FILE)
1453
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1455
archive_record_buffer *r;
1456
if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
1458
return(NULL); /* purecov: inspected */
1460
r->length= (int)length;
1462
if (!(r->buffer= (unsigned char*) malloc(r->length)))
1465
return(NULL); /* purecov: inspected */
1471
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1473
free((char*) r->buffer);
1478
static DRIZZLE_SYSVAR_BOOL(aio, archive_use_aio,
1479
PLUGIN_VAR_NOCMDOPT,
1480
"Whether or not to use asynchronous IO.",
1483
static struct st_mysql_sys_var* archive_system_variables[]= {
1484
DRIZZLE_SYSVAR(aio),
1488
drizzle_declare_plugin(archive)
1492
"Brian Aker, MySQL AB",
1493
"Archive storage engine",
1495
archive_db_init, /* Plugin Init */
1496
archive_db_done, /* Plugin Deinit */
1497
NULL, /* status variables */
1498
archive_system_variables, /* system variables */
1499
NULL /* config options */
1501
drizzle_declare_plugin_end;
1250
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
1254
for (const char **ext= bas_ext(); *ext ; ext++)
1256
if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
1258
if ((error=errno) != ENOENT)
1267
bool ArchiveEngine::doDoesTableExist(Session&,
1268
const identifier::Table &identifier)
1270
string proto_path(identifier.getPath());
1271
proto_path.append(ARZ);
1273
if (access(proto_path.c_str(), F_OK))
1281
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
1282
const drizzled::identifier::Schema &schema_identifier,
1283
drizzled::identifier::Table::vector &set_of_identifiers)
1285
drizzled::CachedDirectory::Entries entries= directory.getEntries();
1287
for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
1288
entry_iter != entries.end(); ++entry_iter)
1290
drizzled::CachedDirectory::Entry *entry= *entry_iter;
1291
const string *filename= &entry->filename;
1293
assert(filename->size());
1295
const char *ext= strchr(filename->c_str(), '.');
1297
if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
1298
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
1302
char uname[NAME_LEN + 1];
1303
uint32_t file_name_len;
1305
file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
1306
// TODO: Remove need for memory copy here
1307
uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
1309
set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));