/* Copyright (C) 2003 MySQL AB
   Copyright (C) 2010 Brian Aker

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "plugin/archive/archive_engine.h"

#include <boost/scoped_ptr.hpp>

using namespace drizzled;
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.

  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that archive comes out
  ahead.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
    29680807 a_archive.ARZ

  Table with 8991478 rows (all of Slashdot's comments):
    1922964506 comment_archive.ARZ
    2944970297 comment_text.MYD

  TODO:
    Allow users to set compression level.
    Allow adjustable block size.
    Implement versioning, should be easy.
    Allow for errors, find a way to mark bad rows.
    Add optional feature so that rows can be flushed at interval (which will cause less
      compression but may speed up ordered searches).
    Checkpoint the meta file to allow for faster rebuilds.
    Option to allow for dirty reads, this would lower the sync calls, which would make
      inserts a lot faster, but would mean highly arbitrary reads.
*/
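/*
  Illustrative sketch only (not part of the engine): the dirty-flag protocol
  described above, pictured with plain stdio. The names ExampleMeta,
  meta_open_for_write() and meta_close() are hypothetical; the real engine
  keeps this bookkeeping in the azio stream rather than in a hand-rolled file.
*/
#include <cstdio>
#include <cstdint>

struct ExampleMeta
{
  uint8_t dirty;       /* 1 while a writer has the table open */
  uint64_t row_count;  /* only trusted when dirty == 0 */
};

/* Open for writing: report a prior crash, then mark the file dirty on disk. */
static bool meta_open_for_write(const char *path, ExampleMeta *meta)
{
  FILE *f= fopen(path, "r+b");
  if (f == NULL || fread(meta, sizeof(*meta), 1, f) != 1)
  {
    if (f != NULL)
      fclose(f);
    return false;
  }

  bool was_dirty= meta->dirty != 0;   /* dirty on open => we crashed last time */
  meta->dirty= 1;
  rewind(f);
  fwrite(meta, sizeof(*meta), 1, f);
  fflush(f);
  fclose(f);

  return !was_dirty;                  /* false => caller should ask for a rebuild */
}

/* Clean close: record the final row count and clear the dirty flag. */
static void meta_close(const char *path, ExampleMeta *meta, uint64_t rows)
{
  FILE *f= fopen(path, "r+b");
  if (f == NULL)
    return;

  meta->row_count= rows;
  meta->dirty= 0;
  fwrite(meta, sizeof(*meta), 1, f);
  fclose(f);
}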
/* When the engine starts up set the first version */
static uint64_t global_version= 1;

// We use this to find out the state of the archive aio option.
extern bool archive_aio_state(void);

/* Number of rows that will force a bulk insert. */
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2

/* Size of header used for row */
#define ARCHIVE_ROW_HEADER_SIZE 4
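/*
  Point of reference only: the engine never calls zlib directly (rows go
  through the azio layer, e.g. azwrite_row()), but underneath it is ordinary
  zlib deflate applied to the packed row bytes. example_compress_row() and its
  buffers are made up for this sketch.
*/
#include <zlib.h>
#include <vector>

static bool example_compress_row(const unsigned char *packed, size_t packed_length,
                                 std::vector<unsigned char> &out)
{
  uLongf compressed_length= compressBound(packed_length);
  out.resize(compressed_length);

  /* One-shot deflate at the default level; azio streams many rows per block instead. */
  if (compress2(&out[0], &compressed_length, packed, packed_length,
                Z_DEFAULT_COMPRESSION) != Z_OK)
    return false;

  out.resize(compressed_length);  /* shrink to the actual compressed size */
  return true;
}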
ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;

  return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}
int ArchiveEngine::doDropTable(Session&, const identifier::Table &identifier)
  string new_path(identifier.getPath());

  int error= unlink(new_path.c_str());

int ArchiveEngine::doGetTableDefinition(Session&,
                                        const identifier::Table &identifier,
                                        drizzled::message::Table &table_proto)
  struct stat stat_info;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(identifier.getPath());
  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))

  boost::scoped_ptr<azio_stream> proto_stream(new azio_stream);

  if (azopen(proto_stream.get(), proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
    return HA_ERR_CRASHED_ON_USAGE;

  proto_string= (char*)malloc(sizeof(char) * proto_stream->frm_length);
  if (proto_string == NULL)
    azclose(proto_stream.get());

  azread_frm(proto_stream.get(), proto_string);

  if (table_proto.ParseFromArray(proto_string, proto_stream->frm_length) == false)
    error= HA_ERR_CRASHED_ON_USAGE;

  azclose(proto_stream.get());

  /* We set the name from what we've asked for, as in RENAME TABLE for ARCHIVE
     we do not rewrite the table proto (as it's wedged in the file header). */
  table_proto.set_schema(identifier.getSchemaName());
  table_proto.set_name(identifier.getTableName());
ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       Table &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(internal::my_off_t);
  archive_reader_open= false;

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
  if (azread_init(file_to_read) == -1)
    return(HA_ERR_CRASHED_ON_USAGE);

  if (file_to_read->version >= 3)

ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
  memset(&archive_write, 0, sizeof(azio_stream)); /* Archive file we are working with */
  table_name.append(name);
  data_file_name.assign(table_name);
  data_file_name.append(ARZ);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&_mutex, MY_MUTEX_INIT_FAST);
ArchiveShare::~ArchiveShare()
  pthread_mutex_destroy(&_mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);

bool ArchiveShare::prime(uint64_t *auto_increment)
  boost::scoped_ptr<azio_stream> archive_tmp(new azio_stream);

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(archive_tmp.get(), data_file_name.c_str(), O_RDONLY,

  *auto_increment= archive_tmp->auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp->rows;
  crashed= archive_tmp->dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(archive_tmp.get());
/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());

  share= a_engine->findOpenTable(table_name);

  share= new ArchiveShare(table_name);

  pthread_mutex_unlock(&a_engine->mutex());
  *rc= HA_ERR_OUT_OF_MEM;

  if (share->prime(&stats.auto_increment_value) == false)
    pthread_mutex_unlock(&a_engine->mutex());
    *rc= HA_ERR_CRASHED_ON_REPAIR;

  a_engine->addOpenTable(share->table_name, share);
  thr_lock_init(&share->_lock);

  *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&a_engine->mutex());
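/*
  Minimal sketch of the get-or-create pattern that get_share() uses above,
  with hypothetical types (ExampleShare, ExampleRegistry); the real code goes
  through ArchiveEngine::findOpenTable()/addOpenTable() while holding
  a_engine->mutex().
*/
#include <map>
#include <string>
#include <pthread.h>

struct ExampleShare
{
  int use_count;
  ExampleShare() : use_count(0) {}
};

class ExampleRegistry
{
  std::map<std::string, ExampleShare *> open_tables;
  pthread_mutex_t mutex;

public:
  ExampleRegistry() { pthread_mutex_init(&mutex, NULL); }
  ~ExampleRegistry() { pthread_mutex_destroy(&mutex); }

  ExampleShare *get(const std::string &name)
  {
    pthread_mutex_lock(&mutex);

    std::map<std::string, ExampleShare *>::iterator it= open_tables.find(name);
    ExampleShare *share;
    if (it != open_tables.end())
      share= it->second;            /* reuse the share that is already open */
    else
    {
      share= new ExampleShare();    /* first opener creates it */
      open_tables[name]= share;
    }
    share->use_count++;

    pthread_mutex_unlock(&mutex);
    return share;
  }
};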
/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(getEngine());

  pthread_mutex_lock(&a_engine->mutex());
  if (!--share->use_count)
    a_engine->deleteOpenTable(share->table_name);
  pthread_mutex_unlock(&a_engine->mutex());

int ha_archive::init_archive_writer()
  /*
    It is expensive to open and close the data files, and since you can't have
    a gzip file that can be both read and written, we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name.c_str(),
               O_RDWR, AZ_METHOD_BLOCK)))
    share->crashed= true;

  share->archive_write_open= true;

/*
  No locks are required because it is associated with just one Cursor instance.
*/
int ha_archive::init_archive_reader()
  /*
    It is expensive to open and close the data files, and since you can't have
    a gzip file that can be both read and written, we keep a reader open
    here for the lifetime of this Cursor.
  */
  if (archive_reader_open == false)
    if (archive_aio_state())
      method= AZ_METHOD_AIO;
    else
      method= AZ_METHOD_BLOCK;

    if (!(azopen(&archive, share->data_file_name.c_str(), O_RDONLY,

      share->crashed= true;

    archive_reader_open= true;
/*
  When opening a file we:
    Create/get our shared structure.
    We open the file we will read from.
*/
int ha_archive::doOpen(const identifier::Table &identifier, int , uint32_t )
  share= get_share(identifier.getPath().c_str(), &rc);

  /*
    We either fix it ourselves, or we just take it offline.

    @todo Create some documentation in the recovery tools shipped with the engine.
  */
  if (rc == HA_ERR_CRASHED_ON_USAGE)

  else if (rc == HA_ERR_OUT_OF_MEM)

  record_buffer.resize(getTable()->getShare()->getRecordLength() + ARCHIVE_ROW_HEADER_SIZE);

  lock.init(&share->_lock);

// Should never be called
int ha_archive::open(const char *, int, uint32_t)

/*
  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it).
*/
int ha_archive::close(void)
  record_buffer.clear();

  /* First close stream */
  if (archive_reader_open == true)
    if (azclose(&archive))

  /* then also close share */
/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the order
  of creation.
*/
int ArchiveEngine::doCreateTable(Session &,
                                 Table &table_arg,
                                 const drizzled::identifier::Table &identifier,
                                 drizzled::message::Table& proto)
  boost::scoped_ptr<azio_stream> create_stream(new azio_stream);
  uint64_t auto_increment_value;
  string serialized_proto;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
    KeyInfo *pos= &table_arg.key_info[key];
    KeyPartInfo *key_part= pos->key_part;
    KeyPartInfo *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))

  std::string named_file= identifier.getPath();
  named_file.append(ARZ);

  if (azopen(create_stream.get(), named_file.c_str(), O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
    unlink(named_file.c_str());
    return(error ? error : -1);

  proto.SerializeToString(&serialized_proto);

    unlink(named_file.c_str());
    return(error ? error : -1);

  if (azwrite_frm(create_stream.get(), serialized_proto.c_str(),
                  serialized_proto.length()))
    unlink(named_file.c_str());
    return(error ? error : -1);

  if (proto.options().has_comment())
    write_length= azwrite_comment(create_stream.get(),
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
      unlink(named_file.c_str());
      return(error ? error : -1);

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream->auto_increment= auto_increment_value ?
                                 auto_increment_value - 1 : 0;

  if (azclose(create_stream.get()))
    unlink(named_file.c_str());
    return(error ? error : -1);
/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(unsigned char *buf, azio_stream *writer)
  unsigned int r_pack_length;

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite_row(writer, &record_buffer[0], r_pack_length);
  if (written != r_pack_length)

  if (!delayed_insert || !bulk_insert)

/*
  Calculate the max length needed for a row. This includes
  the bytes required for the length in the header.
*/
uint32_t ha_archive::max_row_length(const unsigned char *)
  uint32_t length= (uint32_t)(getTable()->getRecordLength() + getTable()->sizeFields()*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  for (ptr= getTable()->getBlobField(), end= ptr + getTable()->sizeBlobFields();

    length += 2 + ((Field_blob*)getTable()->getField(*ptr))->get_length();
unsigned int ha_archive::pack_row(unsigned char *record)
  if (fix_rec_buff(max_row_length(record)))
    return(HA_ERR_OUT_OF_MEM);

  memcpy(&record_buffer[0], record, getTable()->getShare()->null_bytes);
  ptr= &record_buffer[0] + getTable()->getShare()->null_bytes;

  for (Field **field= getTable()->getFields() ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));

  return((unsigned int) (ptr - &record_buffer[0]));
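/*
  Conceptual sketch only: what the packed image produced by pack_row() looks
  like for a hypothetical table with one nullable uint32_t column and one
  nullable uint64_t column. The real packing is done by Field::pack() above,
  and the length header is added by azwrite_row().
*/
#include <cstdint>
#include <cstring>

static unsigned int example_pack_row(unsigned char *out,
                                     uint8_t null_bits,   /* one bit per nullable column */
                                     uint32_t col_a, uint64_t col_b)
{
  unsigned char *ptr= out;

  *ptr++= null_bits;                 /* the null bitmap comes first */

  if (!(null_bits & 0x01))           /* col_a is not NULL */
  {
    memcpy(ptr, &col_a, sizeof(col_a));
    ptr+= sizeof(col_a);
  }
  if (!(null_bits & 0x02))           /* col_b is not NULL */
  {
    memcpy(ptr, &col_b, sizeof(col_b));
    ptr+= sizeof(col_b);
  }

  return (unsigned int)(ptr - out);  /* same return convention as pack_row() */
}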
/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only gain from
  implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::doInsertRecord(unsigned char *buf)
  unsigned char *read_buf= NULL;
  unsigned char *record= getTable()->getInsertRecord();

    return(HA_ERR_CRASHED_ON_USAGE);

  pthread_mutex_lock(&share->mutex());

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  if (getTable()->next_number_field && record == getTable()->getInsertRecord())
    update_auto_increment();
    temp_auto= getTable()->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. It hurts performance.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        getTable()->getShare()->getKeyInfo(0).flags & HA_NOSAME)
      rc= HA_ERR_FOUND_DUPP_KEY;

    if (temp_auto > share->archive_write.auto_increment)
      stats.auto_increment_value=
        (share->archive_write.auto_increment= temp_auto) + 1;

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */
  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));

  pthread_mutex_unlock(&share->mutex());

  free((unsigned char*) read_buf);
void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
  *nb_reserved_values= UINT64_MAX;
  *first_value= share->archive_write.auto_increment + 1;

/* Initialized at each key walk (called multiple times unlike doStartTableScan()) */
int ha_archive::doStartIndexScan(uint32_t keynr, bool)

/*
  No indexes, so if we get a request for an index search (since we tell
  the optimizer that we have unique indexes), we scan the whole table.
*/
int ha_archive::index_read(unsigned char *buf, const unsigned char *key,
                           uint32_t key_len, enum ha_rkey_function)
  current_k_offset= getTable()->getShare()->getKeyInfo(0).key_part->offset;

  current_key_len= key_len;

  rc= doStartTableScan(true);

  while (!(get_row(&archive, buf)))
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))

  return(rc ? rc : HA_ERR_END_OF_FILE);

int ha_archive::index_next(unsigned char * buf)
  while (!(get_row(&archive, buf)))
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))

  return(found ? 0 : HA_ERR_END_OF_FILE);
/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/
int ha_archive::doStartTableScan(bool scan)
    return(HA_ERR_CRASHED_ON_USAGE);

  init_archive_reader();

  /* We rewind the file so that we can read from the beginning if scan */
    if (read_data_header(&archive))
      return(HA_ERR_CRASHED_ON_USAGE);

/*
  This is the method that is used to read a row. It assumes that the row is
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, unsigned char *buf)
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
  record_buffer.resize(length);
int ha_archive::unpack_row(azio_stream *file_to_read, unsigned char *record)
  const unsigned char *ptr;

  read= azread_row(file_to_read, &error);
  ptr= (const unsigned char *)file_to_read->row_ptr;

  if (error || read == 0)

  memcpy(record, ptr, getTable()->getNullBytes());
  ptr+= getTable()->getNullBytes();
  for (Field **field= getTable()->getFields() ; *field ; field++)
    if (!((*field)->is_null()))
      ptr= (*field)->unpack(record + (*field)->offset(getTable()->getInsertRecord()), ptr);

int ha_archive::get_row_version3(azio_stream *file_to_read, unsigned char *buf)
  int returnable= unpack_row(file_to_read, buf);

/*
  Called during ORDER BY. Its position is either from being called sequentially
  or by having had ha_archive::rnd_pos() called before it is called.
*/
int ha_archive::rnd_next(unsigned char *buf)
    return(HA_ERR_CRASHED_ON_USAGE);

    return(HA_ERR_END_OF_FILE);

  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);

  getTable()->status= rc ? STATUS_NOT_FOUND : 0;
/*
  Thanks to the table bool is_ordered this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/
void ha_archive::position(const unsigned char *)
  internal::my_store_ptr(ref, ref_length, current_position);

/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/
int ha_archive::rnd_pos(unsigned char * buf, unsigned char *pos)
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
  current_position= (internal::my_off_t)internal::my_get_ptr(pos, ref_length);
  if (azseek(&archive, (size_t)current_position, SEEK_SET) == (size_t)(-1L))
    return(HA_ERR_CRASHED_ON_USAGE);
  return(get_row(&archive, buf));
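/*
  The same save-and-seek idea shown with plain stdio, purely as illustration:
  position() remembers aztell() in ref via my_store_ptr(), and rnd_pos() later
  azseek()s back to it. example_reread() is hypothetical.
*/
#include <cstdio>

static bool example_reread(FILE *f, unsigned char *row, size_t row_length)
{
  long saved_position= ftell(f);           /* position(): remember where the row starts */
  if (saved_position < 0)
    return false;

  /* ... an ordered result set may read many other rows in between ... */

  if (fseek(f, saved_position, SEEK_SET))  /* rnd_pos(): seek back to the saved offset */
    return false;

  return fread(row, 1, row_length, f) == row_length;  /* re-read that exact row */
}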
/*
  This method repairs the meta file. It does this by walking the datafile and
  rewriting the meta file. Currently it does this by calling optimize().
*/
int ha_archive::repair()
    return(HA_ERR_CRASHED_ON_REPAIR);

  share->crashed= false;
/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely.
*/
int ha_archive::optimize()
  boost::scoped_ptr<azio_stream> writer(new azio_stream);

  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
    azclose(&(share->archive_write));
    share->archive_write_open= false;

  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)

  azread_frm(&archive, proto_string);

  /* Lets create a file to contain the new data */
  std::string writer_filename= share->table_name;
  writer_filename.append(ARN);

  if (!(azopen(writer.get(), writer_filename.c_str(), O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
    return(HA_ERR_CRASHED_ON_USAGE);

  azwrite_frm(writer.get(), proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed; before this
    version it was just done on T_EXTEND.
  */

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  azflush(&archive, Z_SYNC_FLUSH);
  rc= read_data_header(&archive);

  /*
    On success of writing out the new header, we now fetch each row and
    insert it into the new archive file.
  */
  uint64_t rows_restored;
  share->rows_recorded= 0;
  stats.auto_increment_value= 1;
  share->archive_write.auto_increment= 0;

  rows_restored= archive.rows;

  for (uint64_t x= 0; x < rows_restored ; x++)
    rc= get_row(&archive, getTable()->getInsertRecord());

    real_write_row(getTable()->getInsertRecord(), writer.get());
    /*
      Long term it should be possible to optimize this so that
      it is not called on each row.
    */
    if (getTable()->found_next_number_field)
      Field *field= getTable()->found_next_number_field;

      /* Since we will need to use field to translate, we need to flip its read bit */
      field->setReadSet();

      uint64_t auto_value=
        (uint64_t) field->val_int_internal(getTable()->getInsertRecord() +
                                           field->offset(getTable()->getInsertRecord()));
      if (share->archive_write.auto_increment < auto_value)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= auto_value) + 1;

  share->rows_recorded= (ha_rows)writer->rows;

  if (rc && rc != HA_ERR_END_OF_FILE)

  azclose(writer.get());
  share->dirty= false;

  // make the file we just wrote be our data file
  rc= internal::my_rename(writer_filename.c_str(), share->data_file_name.c_str(), MYF(0));

  azclose(writer.get());
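/*
  General shape of the rebuild optimize() performs, sketched with stdio: write
  the surviving rows into a scratch file, then rename it over the original so
  readers only ever see a complete data file. copy_live_rows() is hypothetical;
  the real code re-records rows through real_write_row() into the ".ARN" file
  and renames it with internal::my_rename().
*/
#include <cstdio>

static bool copy_live_rows(FILE *in, FILE *out)
{
  /* Hypothetical stand-in: copy everything readable. The engine instead
     re-reads each row and skips anything that fails to unpack. */
  int c;
  while ((c= fgetc(in)) != EOF)
    if (fputc(c, out) == EOF)
      return false;
  return true;
}

static bool example_rebuild(const char *data_file, const char *scratch_file)
{
  FILE *in= fopen(data_file, "rb");
  FILE *out= fopen(scratch_file, "wb");
  bool ok= (in != NULL && out != NULL) && copy_live_rows(in, out);

  if (in != NULL)
    fclose(in);
  if (out != NULL)
    fclose(out);

  /* Only once the scratch file is complete does it replace the data file. */
  return ok && rename(scratch_file, data_file) == 0;
}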
/*
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
                                       enum thr_lock_type lock_type)
  delayed_insert= false;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE)
        && ! session->doing_tablespace_operation())
      lock_type= TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */
    if (lock_type == TL_READ_NO_INSERT)
      lock_type= TL_READ;

    lock.type= lock_type;
/*
  Hints for optimizer, see ha_tina for more information
*/
int ha_archive::info(uint32_t flag)
  /*
    If dirty, we lock, and then reset/flush the data.
    I found that just calling azflush() doesn't always work.
  */
  pthread_mutex_lock(&share->mutex());
  if (share->dirty == true)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    share->rows_recorded= share->archive_write.rows;
    share->dirty= false;
    if (share->version < global_version)
      share->version_rows= share->rows_recorded;
      share->version= global_version;

  /*
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  stats.records= share->rows_recorded;
  pthread_mutex_unlock(&share->mutex());

  scan_rows= stats.records;

  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
    struct stat file_stat;  // Stat information for the data file

    stat(share->data_file_name.c_str(), &file_stat);

    stats.mean_rec_length= getTable()->getRecordLength() + buffer.alloced_length();
    stats.data_file_length= file_stat.st_size;
    stats.create_time= file_stat.st_ctime;
    stats.update_time= file_stat.st_mtime;
    stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;

  stats.delete_length= 0;
  stats.index_file_length= 0;

  if (flag & HA_STATUS_AUTO)
    init_archive_reader();
    pthread_mutex_lock(&share->mutex());
    azflush(&archive, Z_SYNC_FLUSH);
    pthread_mutex_unlock(&share->mutex());
    stats.auto_increment_value= archive.auto_increment + 1;
/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep doInsertRecord from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)

/*
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off the bulk insert
  flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
  return(HA_ERR_WRONG_COMMAND);
/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session)
  const char *old_proc_info;

  old_proc_info= session->get_proc_info();
  session->set_proc_info("Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex());
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex());

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();
  azflush(&archive, Z_SYNC_FLUSH);
  read_data_header(&archive);
  for (uint64_t x= 0; x < share->archive_write.rows; x++)
    rc= get_row(&archive, getTable()->getInsertRecord());

  session->set_proc_info(old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE))
    share->crashed= false;
    return(HA_ADMIN_CORRUPT);

  return(HA_ADMIN_OK);
int ArchiveEngine::doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to)
  for (const char **ext= bas_ext(); *ext ; ext++)
    if (rename_file_ext(from.getPath().c_str(), to.getPath().c_str(), *ext))
      if ((error= errno) != ENOENT)

bool ArchiveEngine::doDoesTableExist(Session&,
                                     const identifier::Table &identifier)
  string proto_path(identifier.getPath());
  proto_path.append(ARZ);

  if (access(proto_path.c_str(), F_OK))

void ArchiveEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                          const drizzled::identifier::Schema &schema_identifier,
                                          drizzled::identifier::Table::vector &set_of_identifiers)
  drizzled::CachedDirectory::Entries entries= directory.getEntries();

  for (drizzled::CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
    drizzled::CachedDirectory::Entry *entry= *entry_iter;
    const string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;

    file_name_len= identifier::Table::filename_to_tablename(filename->c_str(), uname, sizeof(uname));
    // TODO: Remove need for memory copy here
    uname[file_name_len - sizeof(ARZ) + 1]= '\0';  // Subtract ending, place NULL
    set_of_identifiers.push_back(identifier::Table(schema_identifier, uname));