You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "plugin/archive/archive_engine.h"

#include <boost/scoped_ptr.hpp>

using namespace drizzled;
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.

  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  the same time since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can be fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that Archive turns out
  to be faster.

  Examples between MyISAM (packed) and Archive.
*/
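/*
  A sketch of the dirty-flag lifecycle described above (illustrative only;
  it strings together azio calls that appear later in this file):

    azopen(&writer, "t1.ARZ", O_RDWR, AZ_METHOD_BLOCK); // header goes dirty
    azwrite_row(&writer, packed_row, packed_length);    // inserts accumulate
    azclose(&writer);                                   // row count saved,
                                                        // header marked clean

  If the process dies before azclose(), the next azopen() observes the dirty
  flag, the share is flagged as crashed, and statements fail with
  HA_ERR_CRASHED_ON_USAGE until a rebuild rewrites the file.
*/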
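/* Bytes reserved per row for the length header that precedes each packed
   row image; see max_row_length() below (my reading of how the constant
   is used, not a documented format guarantee). */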
#define ARCHIVE_ROW_HEADER_SIZE 4

ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;
  else
    return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
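
/*
  How the three cache helpers above fit together (sketch only; the real
  sequence, including error handling, is ha_archive::get_share() below):

    pthread_mutex_lock(&a_engine->mutex());
    ArchiveShare *share= a_engine->findOpenTable(table_name);
    if (share == NULL)
    {
      share= new ArchiveShare(table_name);
      a_engine->addOpenTable(share->table_name, share);
    }
    share->use_count++;
    pthread_mutex_unlock(&a_engine->mutex());
*/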

int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
{
  string new_path(identifier.getPath());

  new_path+= ARZ;

  int error= unlink(new_path.c_str());

  return error;
}

int ArchiveEngine::doGetTableDefinition(Session&,
                                        const identifier::Table &identifier,
                                        drizzled::message::Table &table_proto)
{
  struct stat stat_info;
  int error= ENOENT;
  string proto_path;
  char *proto_string;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());

  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))
    return error;
  else
    error= EEXIST;

  boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);

  if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
    return HA_ERR_CRASHED_ON_USAGE;

  proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
  if (proto_string == NULL)
  {
    azclose(proto_stream.get());
    return ENOMEM;
  }

  azread_frm(proto_stream.get(), proto_string);

  if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
    error= HA_ERR_CRASHED_ON_USAGE;

  azclose(proto_stream.get());
  free(proto_string);

  /*
    We set the name from what we've asked for as in RENAME TABLE for ARCHIVE
    we do not rewrite the table proto (as it's wedged in the file header)
  */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());

  return error;
}

ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       Table &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;
}

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/

ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
}

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
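  /* For example, a table created as "t1" keeps its compressed rows in
     "t1.ARZ"; ARN, used by optimize/repair further down, names the
     temporary rebuild file that later replaces it. */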
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&_mutex, MY_MUTEX_INIT_FAST);
}

ArchiveShare::~ArchiveShare()
{
  pthread_mutex_destroy(&_mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);
}

bool ArchiveShare::prime(uint64_t *auto_increment)
{
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(archive_tmp.get());

  return true;
}
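
/* prime() runs once when a share is first created: the azio file header
   already stores the auto_increment high-water mark, the row count and
   the dirty flag, so recovering them needs no table scan. */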

/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  ArchiveShare *share;
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  share= a_engine->findOpenTable(table_name);

  if (share == NULL)
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_OUT_OF_MEM;
      return NULL;
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return NULL;
    }

    a_engine->addOpenTable(share->table_name, share);
    thr_lock_init(&share->_lock);
  }
  share->use_count++;

  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&a_engine->mutex());

  return share;
}
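
/* Every successful get_share() must be balanced by a free_share() below;
   the share (and its row lock) is only torn down when use_count reaches
   zero. */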

/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());
  if (!--share->use_count)
  {
    a_engine->deleteOpenTable(share->table_name);
    delete share;
  }
  pthread_mutex_unlock(&a_engine->mutex());

  return 0;
}

int ha_archive::init_archive_writer()
{
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK)))
  {
    share->crashed= true;
    return 1;
  }

  share->archive_write_open= true;

  return 0;
}

/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the order
  of creation.
*/

int ArchiveEngine::doCreateTable(Session &,
                                 Table &table_arg,
                                 const drizzled::identifier::Table &identifier,
                                 drizzled::message::Table& proto)
{
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;
  int error= 0;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part= pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        return -1;
      }
    }
  }

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  errno= 0;
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (proto.SerializeToString(&serialized_proto) == false)
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      unlink(named_file.c_str());

      return(error ? error : -1);
    }
  }

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream->auto_increment= auto_increment_value ?
                                   auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  return(0);
}

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
{
  unsigned int written;
  unsigned int r_pack_length;

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
  if (written != r_pack_length)
  {
    return(-1);
  }

  return(0);
}

/*
  Calculate max length needed for row. This includes
  the bytes required for the length in the header.
*/
uint32_t ha_archive::max_row_length(const unsigned char *)
{
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  uint32_t *ptr, *end;
  for (ptr= getTable()->getBlobField(), end=ptr + getTable()->sizeBlobFields();
       ptr != end;
       ptr++)
  {
    length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
  }

  return length;
}

unsigned int ha_archive::pack_row(unsigned char *record)
{
  unsigned char *ptr;

  if (fix_rec_buff(max_row_length(record)))
    return(HA_ERR_OUT_OF_MEM);

  /* Copy null bits */
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;

  for (Field **field=getTable()->getFields() ; *field ; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  return((unsigned int) (ptr - &record_buffer[0]));
}
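
/*
  Sketch of the packed image pack_row() builds (my reading of the code
  above, not a formal on-disk specification):

    [null-bit bytes][field 0 packed][field 1 packed]...[blob data]

  NULL fields contribute only their null bit, so sparse rows shrink before
  compression; azwrite_row() then records the image with its length.
*/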

/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only saving
  from implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  {
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. They make the performance
      just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
    else
    {
      if (temp_auto > share->archive_write.auto_increment)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= temp_auto) + 1;
    }
  }

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));

error:
  pthread_mutex_unlock(&share->mutex());
  if (read_buf)
    free((unsigned char*) read_buf);

  return(rc);
}

void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
{
  *nb_reserved_values= UINT64_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}
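
/* Reserving UINT64_MAX values tells the upper layer not to ask again
   during a multi-row insert; Archive just hands out values above the
   high-water mark kept in the writer's azio header. */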

/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
{
  active_index= keynr;
  return(0);
}
  share->archive_write_open= false;

  char *proto_string;
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
  {
    return ENOMEM;
  }

  azread_frm(&archive, proto_string);

  /* Let's create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);

  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
  {
    free(proto_string);
    return(HA_ERR_CRASHED_ON_USAGE);
  }

  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed, before this
    version it was just done on T_EXTEND.
  */
  rows_restored= archive.rows;

  for (uint64_t x= 0; x < rows_restored ; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;

    real_write_row(getTable()->getInsertRecord(), writer.get());
    /*
      Long term it should be possible to optimize this so that
      it is not called on each row.
    */
    if (getTable()->found_next_number_field)
    {
      Field *field= getTable()->found_next_number_field;

      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();

      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;
    }
  }
  share->rows_recorded= (ha_rows)writer->rows;

  if (rc && rc != HA_ERR_END_OF_FILE)
    goto error;

  azclose(writer.get());
  share->dirty= false;

  azclose(&archive);

  // make the file we just wrote be our data file
  rc = internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));

  free(proto_string);
  return(rc);

error:
  azclose(writer.get());
  free(proto_string);

  return(rc);
}
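
/* The rebuild above doubles as crash recovery: every readable row is
   streamed into the temporary .ARN file, and the rename makes it the new
   .ARZ data file, clearing the dirty state a crash left behind. */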

/*
  Below is an example of how to setup row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  delayed_insert= false;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK Table or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE)
        && ! session->doing_tablespace_operation())
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */

    if (lock_type == TL_READ_NO_INSERT)
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}
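
/*
  Net effect of the conversions above (summary only, no extra logic):

    TL_WRITE_CONCURRENT_INSERT .. TL_WRITE  ->  TL_WRITE_ALLOW_WRITE
    TL_READ_NO_INSERT                       ->  TL_READ

  so concurrent inserts and scans avoid serializing on a single table
  lock, except during LOCK TABLE or tablespace operations.
*/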

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return(share->crashed);
}

/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session)
{
  int rc= 0;
  const char *old_proc_info;

  old_proc_info= session->get_proc_info();
  session->set_proc_info("Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;
  }

  session->set_proc_info(old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE))
  {
    share->crashed= false;
    return(HA_ADMIN_CORRUPT);
  }

  return(HA_ADMIN_OK);
}

int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
{
  int error= 0;

  for (const char **ext= bas_ext(); *ext ; ext++)
  {
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
    {
      if ((error=errno) != ENOENT)
        break;
      error= 0;
    }
  }

  return error;
}

bool ArchiveEngine::doDoesTableExist(Session&,
                                     const identifier::Table &identifier)
{
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))
  {
    return false;
  }

  return true;
}

void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::identifier::Schema &schema_identifier,
                                          drizzled::identifier::Table::vector &set_of_identifiers)
{
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
    { }
    else
    {
      char uname[NAME_LEN + 1];
      uint32_t file_name_len;

      file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
      // TODO: Remove need for memory copy here
      uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
      set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
    }
  }
}