#define ARCHIVE_ROW_HEADER_SIZE 4

ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);
  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
  archive_open_tables[table_name]= share;

void ArchiveEngine::deleteOpenTable(const string &table_name)
  archive_open_tables.erase(table_name);
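The three helpers above amount to a map from table name to its ArchiveShare. A minimal, stand-alone sketch of that bookkeeping with standard C++ containers; Share and OpenTableRegistry are illustrative stand-ins rather than the engine's ArchiveShare/ArchiveMap types, and the sketch locks internally while the real engine takes its mutex in the caller (see get_share() below).

#include <map>
#include <string>
#include <mutex>

// Illustrative stand-in for ArchiveShare.
struct Share { int use_count; Share() : use_count(0) {} };
typedef std::map<std::string, Share*> ShareMap;

class OpenTableRegistry
{
public:
  // Return the share for a table, or nullptr if it is not open.
  Share *find(const std::string &table_name)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    ShareMap::iterator it= open_tables_.find(table_name);
    return it != open_tables_.end() ? it->second : nullptr;
  }

  // Register a newly created share under its table name.
  void add(const std::string &table_name, Share *share)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    open_tables_[table_name]= share;
  }

  // Forget a share once its last user has closed the table.
  void remove(const std::string &table_name)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    open_tables_.erase(table_name);
  }

private:
  std::mutex mutex_;
  ShareMap open_tables_;
};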
134
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
136
string new_path(identifier.getPath());
140
int error= unlink(new_path.c_str());
150
int ArchiveEngine::doGetTableDefinition(Session&,
151
const identifier::Table &identifier,
152
drizzled::message::Table &table_proto)
154
struct stat stat_info;
158
proto_path.reserve(FN_REFLEN);
159
proto_path.assign(identifier.getPath());
161
proto_path.append(ARZ);
163
if (stat(proto_path.c_str(),&stat_info))
169
boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
171
if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
172
return HA_ERR_CRASHED_ON_USAGE;
174
proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
175
if (proto_string == NULL)
177
azclose(proto_stream.get());
181
azread_frm(proto_stream.get(), proto_string);
183
if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
184
error= HA_ERR_CRASHED_ON_USAGE;
186
azclose(proto_stream.get());
190
/* We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
191
we do not rewrite the table proto (as it's wedged in the file header)
193
table_proto.set_schema(identifier.getSchemaName());
194
table_proto.set_name(identifier.getTableName());
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)

static handler *archive_create_handler(handlerton *hton,
  return new (mem_root) ha_archive(hton, table);

Used for hash table that tracks open tables.
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
                              my_bool not_used __attribute__((unused)))
  *length=share->table_name_length;
  return (uchar*) share->table_name;

Initialize the archive handler.
int archive_db_init(void *p)
  DBUG_ENTER("archive_db_init");
  handlerton *archive_hton;
  archive_hton= (handlerton *)p;
  archive_hton->state= SHOW_OPTION_YES;
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
  archive_hton->create= archive_create_handler;
  archive_hton->flags= HTON_NO_FLAGS;
  archive_hton->discover= archive_discover;
  /* When the engine starts up set the first version */
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
    VOID(pthread_mutex_destroy(&archive_mutex));

Release the archive handler.
int archive_db_done(void *p __attribute__((__unused__)))
  hash_free(&archive_open_tables);
  VOID(pthread_mutex_destroy(&archive_mutex));

ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  ref_length= sizeof(my_off_t);

  archive_reader_open= false;

int archive_discover(handlerton *hton __attribute__((__unused__)),
                     THD* thd __attribute__((__unused__)),
  DBUG_ENTER("archive_discover");
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
  azio_stream frm_stream;
  char az_file[FN_REFLEN];
  struct stat file_stat;

  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  if (stat(az_file, &file_stat))
  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY, AZ_METHOD_BLOCK)))
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  if (frm_stream.frm_length == 0)
  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
  azread_frm(&frm_stream, frm_ptr);
  azclose(&frm_stream);
  *frmlen= frm_stream.frm_length;
  *frmblob= (uchar*) frm_ptr;
267
This method reads the header of a datafile and returns whether or not it was successful.
215
269
int ha_archive::read_data_header(azio_stream *file_to_read)
271
DBUG_ENTER("ha_archive::read_data_header");
217
273
if (azread_init(file_to_read) == -1)
218
return(HA_ERR_CRASHED_ON_USAGE);
274
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
220
276
if (file_to_read->version >= 3)
ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
  memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
  We will use this lock for rows.
  pthread_mutex_init(&_mutex,MY_MUTEX_INIT_FAST);

ArchiveShare::~ArchiveShare()
  pthread_mutex_destroy(&_mutex);
  We need to make sure we don't reset the crashed state.
  If we open a crashed file, we need to close it as crashed unless
  it has been repaired.
  Since we will close the data down after this, we go on and count
  if (archive_write_open == true)
    (void)azclose(&archive_write);

bool ArchiveShare::prime(uint64_t *auto_increment)
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);
  We read the meta file, but do not mark it dirty. Since we are not
  doing a write we won't mark it dirty (and we won't open it for
  anything but reading... open it for write and we will generate null
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
    version_rows= rows_recorded;
    version= global_version;
  azclose(archive_tmp.get());
We create the shared memory space that we will use for the open table.
No matter what we try to get or create a share. This is so that a repair
table operation can occur.
See ha_example.cc for a longer description.
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
  pthread_mutex_lock(&a_engine->mutex());
  share= a_engine->findOpenTable(table_name);
  DBUG_ENTER("ha_archive::get_share");
  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);
  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
  share= new ArchiveShare(table_name);
  azio_stream archive_tmp;
  if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                       &share, sizeof(*share),
    pthread_mutex_unlock(&a_engine->mutex());
    pthread_mutex_unlock(&archive_mutex);
    *rc= HA_ERR_OUT_OF_MEM;
  if (share->prime(&stats.auto_increment_value) == false)
  share->table_name_length= length;
  share->table_name= tmp_name;
  share->crashed= false;
  share->archive_write_open= false;
  fn_format(share->data_file_name, table_name, "",
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  strmov(share->table_name, table_name);
  DBUG_PRINT("ha_archive", ("Data File %s",
             share->data_file_name));
  We will use this lock for rows.
  VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
  We read the meta file, but do not mark it dirty. Since we are not
  doing a write we won't mark it dirty (and we won't open it for
  anything but reading... open it for write and we will generate null
  if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY,
    pthread_mutex_unlock(&a_engine->mutex());
    VOID(pthread_mutex_destroy(&share->mutex));
    pthread_mutex_unlock(&archive_mutex);
    *rc= HA_ERR_CRASHED_ON_REPAIR;
  a_engine->addOpenTable(share->table_name, share);
  thr_lock_init(&share->_lock);
  stats.auto_increment_value= archive_tmp.auto_increment + 1;
  share->rows_recorded= (ha_rows)archive_tmp.rows;
  share->crashed= archive_tmp.dirty;
  if (share->version < global_version)
    share->version_rows= share->rows_recorded;
    share->version= global_version;
  azclose(&archive_tmp);
  VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
  thr_lock_init(&share->lock);
  share->use_count++;
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now",
             share->table_name_length, share->table_name,
  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&a_engine->mutex());
  pthread_mutex_unlock(&archive_mutex);
See ha_example.cc for a description.
int ha_archive::free_share()
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());
  DBUG_ENTER("ha_archive::free_share");
  DBUG_PRINT("ha_archive",
             ("archive table %.*s has %d open handles on entrance",
             share->table_name_length, share->table_name,
  pthread_mutex_lock(&a_engine->mutex());
  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
    a_engine->deleteOpenTable(share->table_name);
    hash_delete(&archive_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    if (share->archive_write_open == true)
      if (azclose(&(share->archive_write)))
    my_free((uchar*) share, MYF(0));
  pthread_mutex_unlock(&a_engine->mutex());
  pthread_mutex_unlock(&archive_mutex);
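Taken together, get_share() and free_share() are a mutex-protected reference count on the per-table share: the first opener creates it, later openers bump use_count, and the last closer tears it down. A rough stand-alone sketch of that pattern with standard C++ types; acquire_share/release_share and the global map are illustrative, not the engine's actual API.

#include <map>
#include <string>
#include <mutex>

struct Share
{
  int use_count;
  bool crashed;
  Share() : use_count(0), crashed(false) {}
};

static std::mutex engine_mutex;                    // plays the role of archive_mutex
static std::map<std::string, Share*> open_tables;  // plays the role of archive_open_tables

// Look up or create the share for a table and take a reference on it.
Share *acquire_share(const std::string &name)
{
  std::lock_guard<std::mutex> guard(engine_mutex);
  Share *&slot= open_tables[name];
  if (slot == nullptr)
    slot= new Share();          // first opener builds the shared state
  slot->use_count++;
  return slot;
}

// Drop one reference; the last closer unregisters and frees the share.
void release_share(const std::string &name, Share *share)
{
  std::lock_guard<std::mutex> guard(engine_mutex);
  if (--share->use_count == 0)
  {
    open_tables.erase(name);
    delete share;
  }
}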
int ha_archive::init_archive_writer()
  DBUG_ENTER("ha_archive::init_archive_writer");
  It is expensive to open and close the data files and since you can't have
  a gzip file that can be both read and written we keep a writer open
  that is shared among all open tables.
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK)))
  if (!(azopen(&(share->archive_write), share->data_file_name,
               O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
    share->crashed= true;
  share->archive_write_open= true;

No locks are required because it is associated with just one Cursor instance
No locks are required because it is associated with just one handler instance
int ha_archive::init_archive_reader()
  DBUG_ENTER("ha_archive::init_archive_reader");
  It is expensive to open and close the data files and since you can't have
  a gzip file that can be both read and written we keep a writer open
  that is shared among all open tables.
  if (!(field->flags & AUTO_INCREMENT_FLAG))
    DBUG_PRINT("ha_archive", ("Index error in creating archive table"));

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
    unlink(named_file.c_str());
    return(error ? error : -1);

  proto.SerializeToString(&serialized_proto);
    unlink(named_file.c_str());
    return(error ? error : -1);

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
    unlink(named_file.c_str());
    return(error ? error : -1);

  if (proto.options().has_comment())
    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());
    if (write_length < 0)
      unlink(named_file.c_str());
      return(error ? error : -1);
  We reuse name_buff since it is available.
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
    DBUG_PRINT("ha_archive", ("archive will create stream file %s",
               create_info->data_file_name));
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(linkname, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(name_buff, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  Yes you need to do this, because the starting value
  for the autoincrement may not be zero.
  There is a chance that the file was "discovered". In this case
  just use whatever file is there.
  create_stream->auto_increment= auto_increment_value ?
                                 auto_increment_value - 1 : 0;
  if (azclose(create_stream.get()))
  if (!stat(name_buff, &file_stat))
    unlink(named_file.c_str());
    return(error ? error : -1);
  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY,
    my_symlink(name_buff, linkname, MYF(0));
    fn_format(name_buff, name, "", ".frm",
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  Here is where we open up the frm and pass it to archive to store
  if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
    if (fstat(frm_file, &file_stat))
      frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
      my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
      azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
      my_free((uchar*)frm_ptr, MYF(0));
    my_close(frm_file, MYF(0));

  if (create_info->comment.str)
    azwrite_comment(&create_stream, create_info->comment.str,
                    (unsigned int)create_info->comment.length);

  Yes you need to do this, because the starting value
  for the autoincrement may not be zero.
  create_stream.auto_increment= stats.auto_increment_value ?
                                stats.auto_increment_value - 1 : 0;
  if (azclose(&create_stream))

  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));

  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
This is where the actual row is written out.
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
  unsigned int r_pack_length;
  DBUG_ENTER("ha_archive::real_write_row");

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
  written= azwrite_row(writer, record_buffer->buffer, r_pack_length);
  if (written != r_pack_length)
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
               (uint32)r_pack_length));

  if (!delayed_insert || !bulk_insert)
    share->dirty= true;
Calculate max length needed for row. This includes
the bytes required for the length in the header.
uint32_t ha_archive::max_row_length(const unsigned char *)
uint32 ha_archive::max_row_length(const uchar *buf __attribute__((__unused__)))
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
    length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
    length += 2 + ((Field_blob*)table->field[*ptr])->get_length();

unsigned int ha_archive::pack_row(unsigned char *record)
unsigned int ha_archive::pack_row(uchar *record)
  DBUG_ENTER("ha_archive::pack_row");

  if (fix_rec_buff(max_row_length(record)))
    return(HA_ERR_OUT_OF_MEM);
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */

  /* Copy null bits */
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;
  memcpy(record_buffer->buffer, record, table->s->null_bytes);
  ptr= record_buffer->buffer + table->s->null_bytes;

  for (Field **field=getTable()->getFields() ; *field ; field++)
  for (Field **field=table->field ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));

  return((unsigned int) (ptr - &record_buffer[0]));
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
             (ptr - record_buffer->buffer -
              ARCHIVE_ROW_HEADER_SIZE)));
  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
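As the code above shows, a packed Archive row is simply the table's null-bit bytes followed by each non-NULL field's packed bytes, and the returned length is what gets handed to azwrite_row(). A stand-alone illustration of that layout, assuming a caller that already has the raw field byte ranges (the real code uses Field::pack() and the handler's record buffer):

#include <cstddef>
#include <utility>
#include <vector>

// Illustrative only: lay a row out as <null bytes><field bytes...>,
// mirroring the buffer pack_row() builds before azwrite_row().
size_t pack_row_sketch(const unsigned char *record, size_t null_bytes,
                       const std::vector<std::pair<const unsigned char*, size_t> > &fields,
                       std::vector<unsigned char> &out)
{
  out.clear();
  out.insert(out.end(), record, record + null_bytes);        // null bits come first
  for (size_t i= 0; i < fields.size(); i++)                  // then each field, back to back
    out.insert(out.end(), fields[i].first, fields[i].first + fields[i].second);
  return out.size();                                         // length handed to the writer
}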
Look at ha_archive::open() for an explanation of the row format.
Here we just write out the row.

Wondering about start_bulk_insert()? We don't implement it for
archive since it optimizes for lots of writes. The only saving
from implementing start_bulk_insert() is that we could skip
setting dirty to true each time.
int ha_archive::doInsertRecord(unsigned char *buf)
int ha_archive::write_row(uchar *buf)
  unsigned char *read_buf= NULL;
  uchar *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();
  uchar *record= table->record[0];
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());
  ha_statistic_increment(&SSV::ha_write_count);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  if (table->next_number_field && record == table->record[0])
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();
    temp_auto= table->next_number_field->val_int();

    We don't support decrementing auto_increment. They make the performance
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
    if (temp_auto <= share->archive_write.auto_increment &&
        mkey->flags & HA_NOSAME)
      rc= HA_ERR_FOUND_DUPP_KEY;

      Bad news, this will cause a search for the unique value which is very
      expensive since we will have to do a table scan which will lock up
      all other writers during this period. This could perhaps be optimized
      First we create a buffer that we can use for reading rows, and can pass
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
        rc= HA_ERR_OUT_OF_MEM;
      All of the buffer must be written out or we won't see all of the
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      Set the position of the local read thread to the beginning position.
      if (read_data_header(&archive))
        rc= HA_ERR_CRASHED_ON_USAGE;
      Field *mfield= table->next_number_field;
      while (!(get_row(&archive, read_buf)))
        if (!memcmp(read_buf + mfield->offset(record),
                    table->next_number_field->ptr,
                    mfield->max_display_length()))
          rc= HA_ERR_FOUND_DUPP_KEY;

    if (temp_auto > share->archive_write.auto_increment)
we assume the position will be set.
int ha_archive::doStartTableScan(bool scan)
int ha_archive::rnd_init(bool scan)
  DBUG_ENTER("ha_archive::rnd_init");

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  init_archive_reader();

  /* We rewind the file so that we can read from the beginning if scan */
  DBUG_PRINT("info", ("archive will retrieve %llu rows",
             (uint64_t) scan_rows));

  if (read_data_header(&archive))
    return(HA_ERR_CRASHED_ON_USAGE);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
This is the method that is used to read a row. It assumes that the row is
positioned where you want it.
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
  DBUG_ENTER("ha_archive::get_row");
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
             (uchar)file_to_read->version,
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);
  DBUG_PRINT("ha_archive", ("Return %d\n", rc));

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
  record_buffer.resize(length);
  DBUG_ENTER("ha_archive::fix_rec_buff");
  DBUG_PRINT("ha_archive", ("Fixing %u for %u",
             length, record_buffer->length));
  DBUG_ASSERT(record_buffer->buffer);

  if (length > record_buffer->length)
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
                                     MYF(MY_ALLOW_ZERO_PTR))))
    record_buffer->buffer= newptr;
    record_buffer->length= length;

  DBUG_ASSERT(length <= record_buffer->length);
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
  DBUG_ENTER("ha_archive::unpack_row");
  unsigned int read;
  const unsigned char *ptr;

  read= azread_row(file_to_read, &error);
  ptr= (const unsigned char *)file_to_read->row_ptr;
  ptr= (const uchar *)file_to_read->row_ptr;

  if (error || read == 0)

  /* Copy null bits */
  memcpy(record, ptr, getTable()->getNullBytes());
  ptr+= getTable()->getNullBytes();
  for (Field **field= getTable()->getFields() ; *field ; field++)
  memcpy(record, ptr, table->s->null_bytes);
  ptr+= table->s->null_bytes;
  for (Field **field=table->field ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);

int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
  DBUG_ENTER("ha_archive::get_row_version3");
  int returnable= unpack_row(file_to_read, buf);
  DBUG_RETURN(returnable);
Called during ORDER BY. Its position is either from being called sequentially
or by having had ha_archive::rnd_pos() called before it is called.
int ha_archive::rnd_next(unsigned char *buf)
int ha_archive::rnd_next(uchar *buf)
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
    return(HA_ERR_END_OF_FILE);
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);
  getTable()->status=rc ? STATUS_NOT_FOUND: 0;
  table->status=rc ? STATUS_NOT_FOUND: 0;

Thanks to the table bool is_ordered this will be called after
Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
each call to ha_archive::rnd_next() if an ordering of the rows is
void ha_archive::position(const unsigned char *)
void ha_archive::position(const uchar *record __attribute__((__unused__)))
  internal::my_store_ptr(ref, ref_length, current_position);
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  share->archive_write_open= false;

  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
  azread_frm(&archive, proto_string);

  /* Let's create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);
  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
    return(HA_ERR_CRASHED_ON_USAGE);
  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  An extended rebuild is a lot more effort. We open up each row and re-record it.
  Any dead rows are removed (aka rows that may have been partially recorded).
  fn_format(writer_filename, share->table_name, "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY, AZ_METHOD_BLOCK)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  As of Archive format 3, this is the only type that is performed, before this
  version it was just done on T_EXTEND
  DBUG_PRINT("ha_archive", ("archive extended rebuild"));

  Now we will rewind the archive file so that we are positioned at the
  start of the file.
  azflush(&archive, Z_SYNC_FLUSH);
  rc= read_data_header(&archive);

  On success of writing out the new header, we now fetch each row and
  insert it into the new archive file.
  uint64_t rows_restored;
  share->rows_recorded= 0;
  stats.auto_increment_value= 1;
  share->archive_write.auto_increment= 0;
  my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);

  rows_restored= archive.rows;
  for (uint64_t x= 0; x < rows_restored ; x++)
  for (x= 0; x < rows_restored ; x++)
    rc= get_row(&archive, getTable()->getInsertRecord());
    rc= get_row(&archive, table->record[0]);
    real_write_row(getTable()->getInsertRecord(), writer.get());
    real_write_row(table->record[0], &writer);
    Long term it should be possible to optimize this so that
    it is not called on each row.
    if (getTable()->found_next_number_field)
    if (table->found_next_number_field)
      Field *field= getTable()->found_next_number_field;
      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();
      Field *field= table->found_next_number_field;
      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
        (uint64_t) field->val_int(table->record[0] +
                                  field->offset(table->record[0]));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;

  share->rows_recorded= (ha_rows)writer->rows;
  dbug_tmp_restore_column_map(table->read_set, org_bitmap);
  share->rows_recorded= (ha_rows)writer.rows;

  DBUG_PRINT("info", ("recovered %llu archive rows",
             (uint64_t)share->rows_recorded));
  DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
             (uint64_t)share->rows_recorded));

  if (rc && rc != HA_ERR_END_OF_FILE)

  azclose(writer.get());
  share->dirty= false;
  azclose(&archive);

  // make the file we just wrote be our data file
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));

  azclose(writer.get());
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
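The rebuild above is a copy-and-swap: every live row is streamed into a fresh .ARN file and, on success, that file is renamed over the .ARZ data file. A much-simplified sketch of the same shape at the filesystem level, ignoring the az compression layer and per-row repacking (the file names and helper are illustrative):

#include <cstdio>
#include <string>

// Illustrative copy-and-swap: copy src into a temporary file, then rename
// the temporary over src, the way the .ARN file replaces the .ARZ file above.
bool rebuild_by_copy(const std::string &src)
{
  std::string tmp= src + ".rebuild";
  FILE *in= fopen(src.c_str(), "rb");
  if (in == nullptr)
    return false;
  FILE *out= fopen(tmp.c_str(), "wb");
  if (out == nullptr) { fclose(in); return false; }

  char buf[4096];
  size_t n;
  while ((n= fread(buf, 1, sizeof(buf), in)) > 0)   // stands in for get_row()
    fwrite(buf, 1, n, out);                         // stands in for real_write_row()

  fclose(in);
  fclose(out);
  return rename(tmp.c_str(), src.c_str()) == 0;     // swap the rebuilt file into place
}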
Below is an example of how to set up row level locking.
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
  delayed_insert= false;
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= true;
    delayed_insert= false;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
    Here is where we get into the guts of a row level lock.
    If we are not doing a LOCK TABLE or DISCARD/IMPORT
    TABLESPACE, then allow multiple writers
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE)
        && ! session->doing_tablespace_operation())
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
    MySQL would use the lock TL_READ_NO_INSERT on t2, and that
    would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
    to t2. Convert the lock to a normal read lock to allow
    concurrent inserts to t2.
    if (lock_type == TL_READ_NO_INSERT)
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
      lock_type = TL_READ;

    lock.type=lock_type;
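Stripped of the handler plumbing, the logic above makes two downgrades: plain writes become TL_WRITE_ALLOW_WRITE so concurrent inserters are not serialized, and TL_READ_NO_INSERT becomes TL_READ so INSERT ... SELECT does not block inserts into the source table. A condensed sketch of just that decision; the enum here is a local stand-in for the server's thr_lock_type, and the two boolean flags stand in for the session checks (thd_in_lock_tables()/doing_tablespace_operation()):

// Local stand-in for thr_lock_type; the order matters for the range test below.
enum lock_type_t { TL_READ, TL_READ_NO_INSERT, TL_WRITE_ALLOW_WRITE,
                   TL_WRITE_CONCURRENT_INSERT, TL_WRITE };

lock_type_t adjust_archive_lock(lock_type_t requested,
                                bool in_lock_tables,
                                bool tablespace_op)
{
  // Outside LOCK TABLES / tablespace operations, let writers share the table.
  if (requested >= TL_WRITE_CONCURRENT_INSERT && requested <= TL_WRITE
      && !in_lock_tables && !tablespace_op)
    return TL_WRITE_ALLOW_WRITE;

  // INSERT ... SELECT takes TL_READ_NO_INSERT on the source table, which
  // conflicts with TL_WRITE_ALLOW_WRITE; downgrade it to a plain read lock.
  if (requested == TL_READ_NO_INSERT && !in_lock_tables)
    return TL_READ;

  return requested;
}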
  if (flag & HA_STATUS_AUTO)
    init_archive_reader();
    pthread_mutex_lock(&share->mutex());
    pthread_mutex_lock(&share->mutex);
    azflush(&archive, Z_SYNC_FLUSH);
    pthread_mutex_unlock(&share->mutex());
    pthread_mutex_unlock(&share->mutex);
    stats.auto_increment_value= archive.auto_increment + 1;

This method tells us that a bulk insert operation is about to occur. We set
a flag which will keep doInsertRecord from saying that its data is dirty. This in
a flag which will keep write_row from saying that its data is dirty. This in
turn will keep selects from causing a sync to occur.
Basically, yet another optimization to keep compression working well.
void ha_archive::start_bulk_insert(ha_rows rows)
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= true;

The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off the bulk insert
flag, and set the share dirty so that the next select will call sync for us.
int ha_archive::end_bulk_insert()
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= false;
  share->dirty= true;
We cancel a truncate command. The only way to delete an archive table is to drop it.
This is done for security reasons. In a later version we will enable this by
allowing the user to select a different row format.
int ha_archive::delete_all_rows()
  return(HA_ERR_WRONG_COMMAND);
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);

We just return state if asked.
bool ha_archive::is_crashed() const
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);

Simple scan of the tables to make sure everything is ok.
int ha_archive::check(Session* session)
int ha_archive::check(THD* thd,
                      HA_CHECK_OPT* check_opt __attribute__((__unused__)))
  const char *old_proc_info;
  DBUG_ENTER("ha_archive::check");
  old_proc_info= session->get_proc_info();
  session->set_proc_info("Checking table");
  old_proc_info= thd_proc_info(thd, "Checking table");

  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());
  pthread_mutex_unlock(&share->mutex);

  Now we will rewind the archive file so that we are positioned at the
  start of the file.
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
  for (x= 0; x < share->archive_write.rows; x++)
    rc= get_row(&archive, getTable()->getInsertRecord());
    rc= get_row(&archive, table->record[0]);

  session->set_proc_info(old_proc_info);
  thd_proc_info(thd, old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE))
    share->crashed= false;
    return(HA_ADMIN_CORRUPT);
    DBUG_RETURN(HA_ADMIN_CORRUPT);

  return(HA_ADMIN_OK);
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
  for (const char **ext= bas_ext(); *ext ; ext++)
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
      if ((error=errno) != ENOENT)

bool ArchiveEngine::doDoesTableExist(Session&,
                                     const identifier::Table &identifier)
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);
  if (access(proto_path.c_str(), F_OK))

void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::identifier::Schema &schema_identifier,
                                          drizzled::identifier::Table::vector &set_of_identifiers)
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;

    file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
    // TODO: Remove need for memory copy here
    uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
    set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));

DBUG_RETURN(HA_ADMIN_OK);
Check and repair the table if needed.
bool ha_archive::check_and_repair(THD *thd)
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");
  DBUG_RETURN(repair(thd, &check_opt));

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
    DBUG_RETURN(NULL); /* purecov: inspected */
  r->length= (int)length;
  if (!(r->buffer= (uchar*) my_malloc(r->length,
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(NULL); /* purecov: inspected */

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
static MYSQL_SYSVAR_BOOL(aio, archive_use_aio,
                         PLUGIN_VAR_NOCMDOPT,
                         "Whether or not to use asynchronous IO.",

static struct st_mysql_sys_var* archive_system_variables[]= {

struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  archive_db_init,            /* Plugin Init */
  archive_db_done,            /* Plugin Deinit */
  NULL,                       /* status variables */
  archive_system_variables,   /* system variables */
  NULL                        /* config options */
mysql_declare_plugin_end;