   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "drizzled/server_includes.h"
#include "drizzled/field.h"
#include "drizzled/field/blob.h"
#include "drizzled/field/timestamp.h"
#include "plugin/myisam/myisam.h"
#include "drizzled/table.h"
#include "drizzled/session.h"
#include <mysys/my_dir.h>

#include "ha_archive.h"

static const string engine_name("ARCHIVE");

/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replaces, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  ... the same time since we would want to flush).
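
  As a hedged outline (the same calls this file makes in check() below),
  the read path flushes the shared writer before it scans, so a reader
  sees every row written so far:

    pthread_mutex_lock(&share->mutex);
    azflush(&(share->archive_write), Z_SYNC_FLUSH);  // push buffered rows out
    pthread_mutex_unlock(&share->mutex);
    init_archive_reader();                           // reopen/rewind our reader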

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.
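
  In outline, the dirty-flag protocol looks like this (hedged pseudo-code;
  the real fields live in the azio stream header, as read back by
  ArchiveShare::prime() below):

    meta.dirty= true;           // set as soon as the table is opened for write
    ...                         // rows are appended
    meta.rows= rows_recorded;   // clean close: persist the new row count
    meta.dirty= false;
    // next open: if (meta.dirty) assume a crash and demand a rebuild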

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that Archive turns out
  to be faster.

  Examples between MyISAM (packed) and Archive.
*/

/* Variables for archive share methods */
pthread_mutex_t archive_mutex= PTHREAD_MUTEX_INITIALIZER;

static unsigned int global_version;

/* The file extensions */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file (deprecated)

#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

static bool archive_use_aio= false;

#define ARCHIVE_ROW_HEADER_SIZE 4

/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  NULL
};

class ArchiveEngine : public drizzled::plugin::StorageEngine
{
  /* Used for hash table that tracks open tables */
  typedef std::map<string, ArchiveShare*> ArchiveMap;
  ArchiveMap archive_open_tables;

public:
  ArchiveEngine(const string &name_arg)
   : drizzled::plugin::StorageEngine(name_arg,
                                     HTON_STATS_RECORDS_IS_EXACT |
                                     HTON_HAS_DATA_DICTIONARY)
  {
    table_definition_ext= ARZ;
  }

  virtual Cursor *create(TableShare &table, MEM_ROOT *mem_root)
  {
    return new (mem_root) ha_archive(*this, table);
  }

  const char **bas_ext() const {
    return ha_archive_exts;
  }

  int doCreateTable(Session *session, const char *table_name,
                    Table& table_arg,
                    drizzled::message::Table& proto);

  int doGetTableDefinition(Session& session,
                           const char* path,
                           const char *table_name,
                           drizzled::message::Table *table_proto);

  void doGetTableNames(CachedDirectory &directory, string& , set<string>& set_of_names);

  int doDropTable(Session&, const string table_path);
  ArchiveShare *findOpenTable(const string table_name);
  void addOpenTable(const string &table_name, ArchiveShare *);
  void deleteOpenTable(const string &table_name);

  uint32_t max_supported_keys()            const { return 1; }
  uint32_t max_supported_key_length()      const { return sizeof(uint64_t); }
  uint32_t max_supported_key_part_length() const { return sizeof(uint64_t); }
};

ArchiveShare *ArchiveEngine::findOpenTable(const string table_name)
{
  ArchiveMap::iterator find_iter=
    archive_open_tables.find(table_name);

  if (find_iter != archive_open_tables.end())
    return (*find_iter).second;

  return NULL;
}

void ArchiveEngine::addOpenTable(const string &table_name, ArchiveShare *share)
{
  archive_open_tables[table_name]= share;
}

void ArchiveEngine::deleteOpenTable(const string &table_name)
{
  archive_open_tables.erase(table_name);
}

void ArchiveEngine::doGetTableNames(CachedDirectory &directory,
                                    string&,
                                    set<string>& set_of_names)
{
  CachedDirectory::Entries entries= directory.getEntries();

  for (CachedDirectory::Entries::iterator entry_iter= entries.begin();
       entry_iter != entries.end(); ++entry_iter)
  {
    CachedDirectory::Entry *entry= *entry_iter;
    string *filename= &entry->filename;

    assert(filename->size());

    const char *ext= strchr(filename->c_str(), '.');

    if (ext == NULL || my_strcasecmp(system_charset_info, ext, ARZ) ||
        is_prefix(filename->c_str(), TMP_FILE_PREFIX))
      continue;

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;

    file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
    // TODO: Remove need for memory copy here
    uname[file_name_len - sizeof(ARZ) + 1]= '\0'; // Subtract ending, place NULL
    set_of_names.insert(uname);
  }
}

int ArchiveEngine::doDropTable(Session&,
                               const string table_path)
{
  string new_path(table_path);

  new_path+= ARZ;

  int error= unlink(new_path.c_str());

  if (error != 0)
    error= my_errno= errno;

  return error;
}

int ArchiveEngine::doGetTableDefinition(Session&,
                                        const char* path,
                                        const char *,
                                        drizzled::message::Table *table_proto)
{
  struct stat stat_info;
  int error= 0;
  string proto_path;
  char* proto_string;
  azio_stream proto_stream;

  proto_path.reserve(FN_REFLEN);
  proto_path.assign(path);

  proto_path.append(ARZ);

  if (stat(proto_path.c_str(), &stat_info))
    return errno;

  if (azopen(&proto_stream, proto_path.c_str(), O_RDONLY, AZ_METHOD_BLOCK) == 0)
    return HA_ERR_CRASHED_ON_USAGE;

  proto_string= (char*)malloc(sizeof(char) * proto_stream.frm_length);
  if (proto_string == NULL)
  {
    azclose(&proto_stream);
    return HA_ERR_OUT_OF_MEM;
  }

  azread_frm(&proto_stream, proto_string);

  if (table_proto->ParseFromArray(proto_string, proto_stream.frm_length) == false)
    error= HA_ERR_CRASHED_ON_USAGE;

  azclose(&proto_stream);
  free(proto_string);

  return error;
}

static ArchiveEngine *archive_engine= NULL;

/*
  Initialize the archive Cursor.

  SYNOPSIS
    archive_db_init()

  RETURN
    false       OK
    true        Error
*/

static int archive_db_init(drizzled::plugin::Registry &registry)
{
  pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST);
  archive_engine= new ArchiveEngine(engine_name);
  registry.add(archive_engine);

  /* When the engine starts up set the first version */
  global_version= 1;

  return false;
}

/*
  Release the archive Cursor.

  SYNOPSIS
    archive_db_done()

  RETURN
    false       OK
*/

static int archive_db_done(drizzled::plugin::Registry &registry)
{
  registry.remove(archive_engine);
  delete archive_engine;

  pthread_mutex_destroy(&archive_mutex);

  return false;
}

ha_archive::ha_archive(drizzled::plugin::StorageEngine &engine_arg,
                       TableShare &table_arg)
  :Cursor(engine_arg, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(my_off_t);
  archive_reader_open= false;
}

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/

ArchiveShare::ArchiveShare():
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
}

ArchiveShare::ArchiveShare(const char *name):
  use_count(0), archive_write_open(false), dirty(false), crashed(false),
  mean_rec_length(0), version(0), rows_recorded(0), version_rows(0)
{
  memset(&archive_write, 0, sizeof(azio_stream));     /* Archive file we are working with */
  table_name.append(name);
  fn_format(data_file_name, table_name.c_str(), "",
            ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  /*
    We will use this lock for rows.
  */
  pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST);
}

ArchiveShare::~ArchiveShare()
{
  thr_lock_delete(&lock);
  pthread_mutex_destroy(&mutex);
  /*
    We need to make sure we don't reset the crashed state.
    If we open a crashed file, we need to close it as crashed unless
    it has been repaired.
    Since we will close the data down after this, we go on and count
    the flush on close.
  */
  if (archive_write_open == true)
    (void)azclose(&archive_write);
}

bool ArchiveShare::prime(uint64_t *auto_increment)
{
  azio_stream archive_tmp;

  /*
    We read the meta file, but do not mark it dirty. Since we are not
    doing a write we won't mark it dirty (and we won't open it for
    anything but reading... open it for write and we will generate null
    compression writes).
  */
  if (!(azopen(&archive_tmp, data_file_name, O_RDONLY,
               AZ_METHOD_BLOCK)))
    return false;

  *auto_increment= archive_tmp.auto_increment + 1;
  rows_recorded= (ha_rows)archive_tmp.rows;
  crashed= archive_tmp.dirty;
  if (version < global_version)
  {
    version_rows= rows_recorded;
    version= global_version;
  }
  azclose(&archive_tmp);

  return true;
}

/*
  We create the shared memory space that we will use for the open table.
  No matter what we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ArchiveShare *ha_archive::get_share(const char *table_name, int *rc)
{
  pthread_mutex_lock(&archive_mutex);

  ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
  share= a_engine->findOpenTable(table_name);

  if (!share)
  {
    share= new ArchiveShare(table_name);

    if (share == NULL)
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;
      return(NULL);
    }

    if (share->prime(&stats.auto_increment_value) == false)
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_CRASHED_ON_REPAIR;
      delete share;

      return NULL;
    }

    a_engine->addOpenTable(share->table_name, share);
    thr_lock_init(&share->lock);
  }
  share->use_count++;

  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&archive_mutex);

  return(share);
}

/*
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    ArchiveEngine *a_engine= static_cast<ArchiveEngine *>(engine);
    a_engine->deleteOpenTable(share->table_name);
    delete share;
  }
  pthread_mutex_unlock(&archive_mutex);

  return 0;
}

int ha_archive::init_archive_writer()
{
  /*
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name,
               O_RDWR, AZ_METHOD_BLOCK)))
  {
    share->crashed= true;
    return(1);
  }
  share->archive_write_open= true;

  return(0);
}

/*
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int, uint32_t)
{
  int rc= 0;

  share= get_share(name, &rc);

  /*
    We either fix it ourselves, or we just take it offline

    @todo Create some documentation in the recovery tools shipped with the engine.
  */
  if (rc == HA_ERR_CRASHED_ON_USAGE)
  {
    /* purecov: begin inspected */
    free_share();
    return(rc);
    /* purecov: end */
  }
  else if (rc == HA_ERR_OUT_OF_MEM)
  {
    return(rc);
  }

/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the order
  of creation.
*/
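
/*
  A hedged sketch of the format just described: each row is a 4-byte length
  prefix (ARCHIVE_ROW_HEADER_SIZE, defined above), then the row image, then
  any blob data in creation order. A writer does roughly the following
  (row_length stands for the packed size of the current record; this is an
  outline, not this file's literal code):

    unsigned char header[ARCHIVE_ROW_HEADER_SIZE];
    int4store(header, row_length);                       // length prefix
    azwrite(&share->archive_write, header, ARCHIVE_ROW_HEADER_SIZE);
    azwrite(&share->archive_write, record, row_length);  // row, then blobs
*/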

int ArchiveEngine::doCreateTable(Session *,
                                 const char *table_name,
                                 Table& table_arg,
                                 drizzled::message::Table& proto)
{
  char name_buff[FN_REFLEN];
  int error= 0;
  azio_stream create_stream;            /* Archive file we are working with */
  uint64_t auto_increment_value;
  string serialized_proto;

  auto_increment_value= proto.options().auto_increment_value();

  for (uint32_t key= 0; key < table_arg.sizeKeys(); key++)
  {
    KEY *pos= table_arg.key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
        goto error;
      }
    }
  }

  /*
    We reuse name_buff since it is available.
  */
  fn_format(name_buff, table_name, "", ARZ,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (azopen(&create_stream, name_buff, O_CREAT|O_RDWR,
             AZ_METHOD_BLOCK) == 0)
  {
    error= errno;
    goto error;
  }

  proto.SerializeToString(&serialized_proto);

  if (azwrite_frm(&create_stream, serialized_proto.c_str(),
                  serialized_proto.length()))
    goto error;

  if (proto.options().has_comment())
  {
    int write_length;

    write_length= azwrite_comment(&create_stream,
                                  proto.options().comment().c_str(),
                                  proto.options().comment().length());

    if (write_length < 0)
    {
      error= errno;
      goto error;
    }
  }

  /*
    Yes you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream.auto_increment= auto_increment_value ?
                                  auto_increment_value - 1 : 0;

  if (azclose(&create_stream))
  {
    error= errno;
    goto error;
  }

  return(0);

error:
  /* Return error number, if we got one */
  return(error ? error : -1);
}

/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only saving
  for implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::write_row(unsigned char *buf)
{
  int rc;
  unsigned char *read_buf= NULL;
  uint64_t temp_auto;
  unsigned char *record= table->record[0];

  if (share->crashed)
    return(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);
  pthread_mutex_lock(&share->mutex);

  if (share->archive_write_open == false)
    if (init_archive_writer())
      return(HA_ERR_CRASHED_ON_USAGE);

  KEY *mkey= &table->s->key_info[0]; // We only support one key right now

  /*
    We don't support decrementing auto_increment. They make the performance
    ...
  */
  if (temp_auto <= share->archive_write.auto_increment &&
      mkey->flags & HA_NOSAME)
  {
    rc= HA_ERR_FOUND_DUPP_KEY;
    goto error;
  }

  if (temp_auto > share->archive_write.auto_increment)
    stats.auto_increment_value=
      (share->archive_write.auto_increment= temp_auto) + 1;

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));

error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    free((unsigned char*) read_buf);

  return(rc);
}

void ha_archive::get_auto_increment(uint64_t, uint64_t, uint64_t,
                                    uint64_t *first_value, uint64_t *nb_reserved_values)
{
  *nb_reserved_values= UINT64_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint32_t keynr, bool)
{
  active_index= keynr;
  return(0);
}

int ha_archive::optimize(Session* session, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  azio_stream writer;
  char writer_filename[FN_REFLEN];

  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= false;
  }

  char* proto_string;
  proto_string= (char*)malloc(sizeof(char) * archive.frm_length);
  if (proto_string == NULL)
    return HA_ERR_OUT_OF_MEM;

  azread_frm(&archive, proto_string);

  /* Let's create a file to contain the new data */
  fn_format(writer_filename, share->table_name.c_str(), "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR, AZ_METHOD_BLOCK)))
  {
    free(proto_string);
    return(HA_ERR_CRASHED_ON_USAGE);
  }

  azwrite_frm(&writer, proto_string, archive.frm_length);

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed; before this
    version it was just done on T_EXTEND.
  */
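
  /*
    In outline, the rebuild loop reads each live row and re-records it
    (a hedged sketch; get_row() and read_data_header() are this file's
    helpers, real_write_row() is assumed from the upstream engine):

      read_data_header(&archive);                   // position at first row
      while (!get_row(&archive, table->record[0]))
        real_write_row(table->record[0], &writer);  // re-record live rows
  */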

  azclose(&writer);
  share->dirty= false;

  azclose(&archive);

  // make the file we just wrote be our data file
  rc= my_rename(writer_filename, share->data_file_name, MYF(0));

  free(proto_string);
  return(rc);

error:
  azclose(&writer);
  free(proto_string);

  return(rc);
}
1202
Below is an example of how to setup row level locking.
1243
THR_LOCK_DATA **ha_archive::store_lock(Session *session,
1204
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1244
1205
THR_LOCK_DATA **to,
1245
1206
enum thr_lock_type lock_type)
1247
delayed_insert= false;
1208
if (lock_type == TL_WRITE_DELAYED)
1209
delayed_insert= true;
1211
delayed_insert= false;
1249
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1213
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1252
1216
Here is where we get into the guts of a row level lock.
1254
If we are not doing a LOCK Table or DISCARD/IMPORT
1255
TABLESPACE, then allow multiple writers
1218
If we are not doing a LOCK TABLE or DISCARD/IMPORT
1219
TABLESPACE, then allow multiple writers
1258
1222
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1259
lock_type <= TL_WRITE)
1260
&& !session_tablespace_op(session))
1223
lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1224
&& !thd_tablespace_op(thd))
1261
1225
lock_type = TL_WRITE_ALLOW_WRITE;
1264
1228
In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1265
1229
MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1266
1230
would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1267
1231
to t2. Convert the lock to a normal read lock to allow
1268
concurrent inserts to t2.
1232
concurrent inserts to t2.
1271
if (lock_type == TL_READ_NO_INSERT)
1235
if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1272
1236
lock_type = TL_READ;
1274
1238
lock.type=lock_type;

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  return(share->crashed);
}

/*
  Simple scan of the tables to make sure everything is ok.
*/
int ha_archive::check(Session* session)
{
  const char *old_proc_info;

  old_proc_info= get_session_proc_info(session);
  set_session_proc_info(session, "Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  init_archive_reader();

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(Session *session)
{
  HA_CHECK_OPT check_opt;

  check_opt.init();

  return(repair(session, &check_opt));
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  archive_record_buffer *r;

  if (!(r= (archive_record_buffer*) malloc(sizeof(archive_record_buffer))))
  {
    return(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (unsigned char*) malloc(r->length)))
  {
    free((char*) r);
    return(NULL); /* purecov: inspected */
  }

  return(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  free((char*) r->buffer);
  free((char*) r);
}