   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include <plugin/archive/archive_engine.h>
#include <boost/scoped_ptr.hpp>

using namespace drizzled;

/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.

  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it is a question of how much of the MyISAM file the file
  system caches. With enough free memory MyISAM is faster. It is only when the
  OS doesn't have enough memory to cache the entire table that Archive turns
  out to be faster.

  Examples between MyISAM (packed) and Archive.
*/
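
/*
  Illustrative sketch only (not this engine's real meta-file code or on-disk
  layout): the crash-detection scheme described above reduces to writing a
  dirty flag before any rows are written and clearing it, together with the
  final row count, only on a clean close. The struct and helper names below
  are hypothetical.
*/
#include <cstdint>
#include <cstdio>

struct example_meta
{
  uint64_t rows;  /* row count recorded at the last clean close */
  uint8_t dirty;  /* non-zero while the table is open for writing */
};

static void example_meta_mark_dirty(FILE *fp, example_meta *meta)
{
  /* Flag the file dirty *before* any row is written, so a crash leaves
     the flag set and the next open can demand a rebuild. */
  meta->dirty= 1;
  rewind(fp);
  fwrite(meta, sizeof(*meta), 1, fp);
  fflush(fp);
}

static void example_meta_close_clean(FILE *fp, example_meta *meta,
                                     uint64_t row_count)
{
  /* Record the final count, then clear the flag: only a close that got
     this far leaves the file marked clean. */
  meta->rows= row_count;
  meta->dirty= 0;
  rewind(fp);
  fwrite(meta, sizeof(*meta), 1, fp);
  fflush(fp);
}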
#define ARCHIVE_ROW_HEADER_SIZE 4
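
/*
  Illustrative sketch only: ARCHIVE_ROW_HEADER_SIZE accounts for the per-row
  length header counted by max_row_length() below. A hypothetical framing
  helper makes the layout concrete: a 4-byte (host-order) length, then the
  packed row bytes. The real framing happens inside the azio layer and may
  differ in detail.
*/
#include <cstdint>
#include <cstring>

static size_t example_frame_row(unsigned char *dst,
                                const unsigned char *packed_row,
                                uint32_t row_length)
{
  memcpy(dst, &row_length, ARCHIVE_ROW_HEADER_SIZE);              /* length header */
  memcpy(dst + ARCHIVE_ROW_HEADER_SIZE, packed_row, row_length);  /* payload */
  return ARCHIVE_ROW_HEADER_SIZE + row_length;
}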
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;

  return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
{
  string new_path(identifier.getPath());
  new_path+= ARZ;

  int error= unlink(new_path.c_str());

  if (error != 0)
    error= errno;

  return error;
}
int ArchiveEngine::doGetTableDefinition(Session&,
                                        const identifier::Table &identifier,
                                        drizzled::message::Table &table_proto)
{
  struct stat stat_info;
  int error= ENOENT;
  string proto_path;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());

  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))
    return errno;
  else
    error= EEXIST;

  {
    boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);
    char* proto_string;

    if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
      return HA_ERR_CRASHED_ON_USAGE;

    proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
    if (proto_string == NULL)
    {
      azclose(proto_stream.get());
      return ENOMEM;
    }

    azread_frm(proto_stream.get(), proto_string);

    if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
      error= HA_ERR_CRASHED_ON_USAGE;

    azclose(proto_stream.get());
    free(proto_string);
  }

  /*
    We set the name from what we've been asked for, as in RENAME TABLE for
    ARCHIVE we do not rewrite the table proto (it is wedged in the file header).
  */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());

  return error;
}
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       Table &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;
}
/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
}

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&_mutex, MY_MUTEX_INIT_FAST);
}
ArchiveShare::~ArchiveShare()
{
  pthread_mutex_destroy(&_mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);
}
bool ArchiveShare::prime(uint64_t *auto_increment)
{
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(archive_tmp.get());

  return true;
}
/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  share= a_engine->findOpenTable(table_name);

  if (share == NULL)
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_OUT_OF_MEM;
      return NULL;
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&a_engine->mutex());
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return NULL;
    }

    a_engine->addOpenTable(share->table_name, share);
    thr_lock_init(&share->_lock);
  }
  share->use_count++;

  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&a_engine->mutex());

  return share;
}
/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());
  if (!--share->use_count)
  {
    a_engine->deleteOpenTable(share->table_name);
    delete share;
  }
  pthread_mutex_unlock(&a_engine->mutex());

  return 0;
}
int ha_archive::init_archive_writer()
{
  /*
    It is expensive to open and close the data files, and since you can't
    have a gzip file that can be both read and written, we keep a writer
    open that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK)))
  {
    share->crashed= true;
    return 1;
  }
  share->archive_write_open= true;

  return 0;
}
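
/*
  Illustrative sketch only: the "one shared writer" rule above is the usual
  pattern for append-only compressed files. The helper name below is
  hypothetical, but it mirrors how doInsertRecord() uses share->mutex(),
  share->archive_write_open and azwrite_row().
*/
static int example_append_row(ArchiveShare *s, unsigned char *row,
                              unsigned int length)
{
  int rc= 0;

  pthread_mutex_lock(&s->mutex());   /* one mutex serializes every writer */

  /* Lazily open the single shared write handle, exactly once. */
  if (s->archive_write_open == false)
  {
    if (azopen(&s->archive_write, s->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK) == 0)
    {
      pthread_mutex_unlock(&s->mutex());
      return -1;
    }
    s->archive_write_open= true;
  }

  if (azwrite_row(&s->archive_write, row, length) != length)
    rc= -1;

  pthread_mutex_unlock(&s->mutex());
  return rc;
}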
/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the
  order of creation.
*/
int ArchiveEngine::doCreateTable(Session &,
                                 Table& table_arg,
                                 const drizzled::identifier::Table &identifier,
                                 drizzled::message::Table& proto)
{
  int error= 0;
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part=     pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
        return -1;
    }
  }

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  errno= 0;
  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  try
  {
    proto.SerializeToString(&serialized_proto);
  }
  catch (...)
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
  {
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      unlink(named_file.c_str());

      return(error ? error : -1);
    }
  }

  /*
    Yes, you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream->auto_increment= auto_increment_value ?
    auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
  {
    error= errno;
    unlink(named_file.c_str());

    return(error ? error : -1);
  }

  return 0;
}
/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
{
  unsigned int written;
  unsigned int r_pack_length;

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite_row(writer, &record_buffer[0], r_pack_length);

  if (written != r_pack_length)
    return -1;

  return 0;
}
/*
  Calculate the maximum length needed for a row. This includes
  the bytes required for the length in the header.
*/
uint32_t ha_archive::max_row_length(const unsigned char *)
{
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  uint32_t *ptr, *end;
  for (ptr= getTable()->getBlobField(), end= ptr + getTable()->sizeBlobFields();
       ptr != end;
       ptr++)
  {
    length+= 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
  }

  return length;
}
unsigned int ha_archive::pack_row(unsigned char *record)
{
  unsigned char *ptr;

  if (fix_rec_buff(max_row_length(record)))
    return HA_ERR_OUT_OF_MEM;

  /* Copy null bits */
  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;

  for (Field **field= getTable()->getFields(); *field; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  return((unsigned int) (ptr - &record_buffer[0]));
}
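
/*
  Worked example (hypothetical table, for illustration): for

    CREATE TABLE t (a INT NOT NULL, b VARCHAR(8)) ENGINE=ARCHIVE;

  pack_row() produces roughly

    [null bytes][a packed][b length prefix + only the b bytes actually used]

  so a row (1, 'hi') occupies on the order of 1 + 4 + (1 + 2) bytes rather
  than the full in-memory record length. Variable-length fields shrink here,
  and the general zlib pass then compresses the packed bytes further.
*/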
/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only gain
  from implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= getTable()->getInsertRecord();

  if (share->crashed)
    return HA_ERR_CRASHED_ON_USAGE;

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
    {
      rc= HA_ERR_CRASHED_ON_USAGE;
      goto error;
    }

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
  {
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. It makes the
      performance just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
#ifdef DEAD_CODE
    /*
      Bad news, this will cause a search for the unique value which is very
      expensive since we will have to do a table scan which will lock up
      all other writers during this period. This could perhaps be optimized
      in the future.
    */
    /*
      First we create a buffer that we can use for reading rows, and can pass
      to get_row().
    */
    if (!(read_buf= (unsigned char*) malloc(getTable()->getRecordLength())))
    {
      rc= HA_ERR_OUT_OF_MEM;
      goto error;
    }
    /*
      All of the buffer must be written out or we won't see all of the
      data.
    */
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    /*
      Set the position of the local read thread to the beginning position.
    */
    if (read_data_header(&archive))
    {
      rc= HA_ERR_CRASHED_ON_USAGE;
      goto error;
    }

    Field *mfield= getTable()->next_number_field;

    while (!(get_row(&archive, read_buf)))
    {
      if (!memcmp(read_buf + mfield->offset(record),
                  getTable()->next_number_field->ptr,
                  mfield->max_display_length()))
      {
        rc= HA_ERR_FOUND_DUPP_KEY;
        goto error;
      }
    }
#endif
    else
    {
      if (temp_auto > share->archive_write.auto_increment)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= temp_auto) + 1;
    }
  }

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex());
  if (read_buf)
    free(read_buf);

  return rc;
}
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
{
  *nb_reserved_values= UINT64_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}
/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
int ha_archive::doStartIndexScan(uint32_t keynr, bool)
{
  active_index= keynr;
  return 0;
}
int ha_archive::optimize()
{
  int rc= 0;
  boost::scoped_ptr<azio_stream> writer(new azio_stream);

  init_archive_reader();

  /* Close our writer so that the rename below sees a quiesced data file. */
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= false;
  }

  char* proto_string;
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
  {
    return HA_ERR_OUT_OF_MEM;
  }
  azread_frm(&archive, proto_string);

  /* Lets create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);

  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
  {
    free(proto_string);
    return HA_ERR_CRASHED_ON_USAGE;
  }

  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type of rebuild that is performed;
    before this version it was just done on T_EXTEND.
  */
  {
    uint64_t rows_restored;

    rows_restored= archive.rows;

    for (uint64_t x= 0; x < rows_restored; x++)
    {
      rc= get_row(&archive, getTable()->getInsertRecord());

      if (rc != 0)
        break;

      real_write_row(getTable()->getInsertRecord(), writer.get());
      /*
        Long term it should be possible to optimize this so that
        it is not called on each row.
      */
      if (getTable()->found_next_number_field)
      {
        Field *field= getTable()->found_next_number_field;

        /* Since we will need to use field to translate, we need to flip its read bit */
        field->setReadSet();

        uint64_t auto_value=
          (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                             field->offset(getTable()->getInsertRecord()));
        if (share->archive_write.auto_increment < auto_value)
          stats.auto_increment_value=
            (share->archive_write.auto_increment= auto_value) + 1;
      }
    }

    share->rows_recorded= (ha_rows)writer->rows;
  }

  if (rc && rc != HA_ERR_END_OF_FILE)
    goto error;

  azclose(writer.get());
  share->dirty= false;

  azclose(&archive);

  // make the file we just wrote be our data file
  rc= internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));

  free(proto_string);
  return rc;

error:
  azclose(writer.get());
  free(proto_string);

  return rc;
}
1199
Below is an example of how to setup row level locking.
1070
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
1201
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1071
1202
THR_LOCK_DATA **to,
1072
1203
enum thr_lock_type lock_type)
1074
delayed_insert= false;
1205
if (lock_type == TL_WRITE_DELAYED)
1206
delayed_insert= true;
1208
delayed_insert= false;
1076
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1210
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1079
1213
Here is where we get into the guts of a row level lock.
1081
1215
If we are not doing a LOCK Table or DISCARD/IMPORT
1082
TABLESPACE, then allow multiple writers
1216
TABLESPACE, then allow multiple writers
1085
1219
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1086
lock_type <= TL_WRITE)
1087
&& ! session->doing_tablespace_operation())
1220
lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1221
&& !thd_tablespace_op(thd))
1088
1222
lock_type = TL_WRITE_ALLOW_WRITE;
1091
1225
In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1092
1226
MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1093
1227
would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1094
1228
to t2. Convert the lock to a normal read lock to allow
1095
concurrent inserts to t2.
1229
concurrent inserts to t2.
1098
if (lock_type == TL_READ_NO_INSERT)
1232
if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1099
1233
lock_type = TL_READ;
1101
1235
lock.type=lock_type;
/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return share->crashed;
}
/*
  Simple scan of the table to make sure everything is ok.
*/
int ha_archive::check(Session* session)
{
  int rc= 0;
  const char *old_proc_info;

  old_proc_info= session->get_proc_info();
  session->set_proc_info("Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
  {
    rc= get_row(&archive, getTable()->getInsertRecord());

    if (rc != 0)
      break;
  }

  session->set_proc_info(old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE))
  {
    share->crashed= false;
    return HA_ADMIN_CORRUPT;
  }

  return HA_ADMIN_OK;
}
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
{
  int error= 0;

  for (const char **ext= bas_ext(); *ext; ext++)
  {
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
    {
      if ((error= errno) != ENOENT)
        break;
      error= 0;
    }
  }

  return error;
}
bool ArchiveEngine::doDoesTableExist(Session&,
                                     const identifier::Table &identifier)
{
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))
    return false;

  return true;
}
void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::identifier::Schema &schema_identifier,
                                          drizzled::identifier::Table::vector &set_of_identifiers)
{
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    /* Skip anything that is not an ARZ data file, and skip temporary files. */
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
      continue;

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;

    file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
    // TODO: Remove need for memory copy here
    uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Strip the ARZ suffix (sizeof includes the trailing NUL)
    set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));
  }
}
/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(Session *session)
{
  HA_CHECK_OPT check_opt;

  check_opt.init();

  return repair(session, &check_opt);
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;

  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
    return NULL;

  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) malloc(r->length)))
  {
    free(r);
    return NULL;
  }

  return r;
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free(r->buffer);
  free(r);
}