12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
16
#include "heap_priv.h"
17
17
#include <drizzled/error.h>
18
18
#include <drizzled/table.h>
19
19
#include <drizzled/session.h>
20
#include <drizzled/current_session.h>
20
21
#include <drizzled/field/timestamp.h>
21
22
#include <drizzled/field/varstring.h>
22
#include "drizzled/plugin/daemon.h"
24
#include <boost/thread/mutex.hpp>
27
25
#include "ha_heap.h"
43
41
class HeapEngine : public plugin::StorageEngine
46
explicit HeapEngine(string name_arg) :
47
plugin::StorageEngine(name_arg,
48
HTON_STATS_RECORDS_IS_EXACT |
53
HTON_SKIP_STORE_LOCK |
60
hp_panic(HA_PANIC_CLOSE);
63
virtual Cursor *create(Table &table)
65
return new ha_heap(*this, table);
44
HeapEngine(string name_arg)
45
: plugin::StorageEngine(name_arg,
46
HTON_STATS_RECORDS_IS_EXACT |
51
HTON_SKIP_STORE_LOCK |
55
virtual Cursor *create(TableShare &table,
56
memory::Root *mem_root)
58
return new (mem_root) ha_heap(*this, table);
68
61
const char **bas_ext() const {
69
62
return ha_heap_exts;
72
int doCreateTable(Session &session,
74
const TableIdentifier &identifier,
65
int doCreateTable(Session *session,
66
const char *table_name,
75
68
message::Table &create_proto);
77
70
/* For whatever reason, internal tables can be created by Cursor::open()
85
78
message::Table &create_proto,
86
79
HP_SHARE **internal_share);
88
int doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to);
81
int doRenameTable(Session*, const char * from, const char * to);
90
int doDropTable(Session&, const TableIdentifier &identifier);
83
int doDropTable(Session&, const string table_path);
92
85
int doGetTableDefinition(Session& session,
93
const TableIdentifier &identifier,
94
message::Table &table_message);
88
const char *table_name,
90
message::Table *table_proto);
92
/* Temp only engine, so do not return values. */
93
void doGetTableNames(CachedDirectory &, string& , set<string>&) { };
96
95
uint32_t max_supported_keys() const { return MAX_KEY; }
97
96
uint32_t max_supported_key_part_length() const { return MAX_KEY_LENGTH; }
99
uint32_t index_flags(enum ha_key_alg ) const
98
uint32_t index_flags(enum ha_key_alg algorithm) const
101
return ( HA_ONLY_WHOLE_INDEX | HA_KEY_SCAN_NOT_ROR);
100
return ((algorithm == HA_KEY_ALG_BTREE) ?
105
HA_ONLY_WHOLE_INDEX |
106
HA_KEY_SCAN_NOT_ROR);
104
bool doDoesTableExist(Session& session, const TableIdentifier &identifier);
105
void doGetTableIdentifiers(CachedDirectory &directory,
106
const SchemaIdentifier &schema_identifier,
107
TableIdentifier::vector &set_of_identifiers);
110
void HeapEngine::doGetTableIdentifiers(CachedDirectory&,
111
const SchemaIdentifier&,
112
TableIdentifier::vector&)
116
bool HeapEngine::doDoesTableExist(Session& session, const TableIdentifier &identifier)
118
return session.getMessageCache().doesTableMessageExist(identifier);
121
int HeapEngine::doGetTableDefinition(Session &session,
122
const TableIdentifier &identifier,
123
message::Table &table_proto)
125
if (session.getMessageCache().getTableMessage(identifier, table_proto))
111
int HeapEngine::doGetTableDefinition(Session&,
116
message::Table *table_proto)
119
ProtoCache::iterator iter;
121
pthread_mutex_lock(&proto_cache_mutex);
122
iter= proto_cache.find(path);
124
if (iter!= proto_cache.end())
127
table_proto->CopyFrom(((*iter).second));
130
pthread_mutex_unlock(&proto_cache_mutex);
131
135
We have to ignore ENOENT entries as the MEMORY table is created on open and
132
136
not when doing a CREATE on the table.
134
int HeapEngine::doDropTable(Session &session, const TableIdentifier &identifier)
138
int HeapEngine::doDropTable(Session&, const string table_path)
136
session.getMessageCache().removeTableMessage(identifier);
138
int error= heap_delete_table(identifier.getPath().c_str());
140
ProtoCache::iterator iter;
142
pthread_mutex_lock(&proto_cache_mutex);
143
iter= proto_cache.find(table_path.c_str());
145
if (iter!= proto_cache.end())
146
proto_cache.erase(iter);
147
pthread_mutex_unlock(&proto_cache_mutex);
149
return heap_delete_table(table_path.c_str());
146
152
static HeapEngine *heap_storage_engine= NULL;
148
static int heap_init(module::Context &context)
154
static int heap_init(plugin::Registry ®istry)
150
156
heap_storage_engine= new HeapEngine(engine_name);
151
context.add(heap_storage_engine);
157
registry.add(heap_storage_engine);
158
pthread_mutex_init(&THR_LOCK_heap, MY_MUTEX_INIT_FAST);
162
static int heap_deinit(plugin::Registry ®istry)
164
registry.remove(heap_storage_engine);
165
delete heap_storage_engine;
167
int ret= hp_panic(HA_PANIC_CLOSE);
169
pthread_mutex_destroy(&THR_LOCK_heap);
156
176
/*****************************************************************************
158
178
*****************************************************************************/
160
180
ha_heap::ha_heap(plugin::StorageEngine &engine_arg,
181
TableShare &table_arg)
162
182
:Cursor(engine_arg, table_arg), file(0), records_changed(0), key_stat_version(0),
163
183
internal_table(0)
177
197
#define MEMORY_STATS_UPDATE_THRESHOLD 10
179
int ha_heap::doOpen(const drizzled::TableIdentifier &identifier, int mode, uint32_t test_if_locked)
199
int ha_heap::open(const char *name, int mode, uint32_t test_if_locked)
181
if ((test_if_locked & HA_OPEN_INTERNAL_TABLE) || (!(file= heap_open(identifier.getPath().c_str(), mode)) && errno == ENOENT))
201
if ((test_if_locked & HA_OPEN_INTERNAL_TABLE) || (!(file= heap_open(name, mode)) && errno == ENOENT))
203
HA_CREATE_INFO create_info;
183
204
internal_table= test(test_if_locked & HA_OPEN_INTERNAL_TABLE);
205
memset(&create_info, 0, sizeof(create_info));
185
207
HP_SHARE *internal_share= NULL;
186
208
message::Table create_proto;
188
if (not heap_storage_engine->heap_create_table(getTable()->in_use,
189
identifier.getPath().c_str(),
210
if (!heap_storage_engine->heap_create_table(ha_session(), name, table,
195
215
file= internal_table ?
196
216
heap_open_from_share(internal_share, mode) :
235
256
Do same as default implementation but use file->s->name instead of
236
table->getShare()->path. This is needed by Windows where the clone() call sees
237
'/'-delimited path in table->getShare()->path, while ha_heap::open() was called
257
table->s->path. This is needed by Windows where the clone() call sees
258
'/'-delimited path in table->s->path, while ha_heap::open() was called
238
259
with '\'-delimited path.
241
Cursor *ha_heap::clone(memory::Root *)
262
Cursor *ha_heap::clone(memory::Root *mem_root)
243
Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
244
TableIdentifier identifier(getTable()->getShare()->getSchemaName(),
245
getTable()->getShare()->getTableName(),
246
getTable()->getShare()->getPath());
264
Cursor *new_handler= table->s->db_type()->getCursor(*table->s, mem_root);
248
if (new_handler && !new_handler->ha_open(identifier, getTable()->db_stat,
266
if (new_handler && !new_handler->ha_open(table, file->s->name, table->db_stat,
249
267
HA_OPEN_IGNORE_IF_LOCKED))
250
268
return new_handler;
255
const char *ha_heap::index_type(uint32_t )
273
const char *ha_heap::index_type(uint32_t inx)
275
return ((table_share->key_info[inx].algorithm == HA_KEY_ALG_BTREE) ?
277
296
void ha_heap::set_keys_for_scanning(void)
299
for (uint32_t i= 0 ; i < table->s->keys ; i++)
301
if (table->key_info[i].algorithm == HA_KEY_ALG_BTREE)
282
307
void ha_heap::update_key_stats()
284
for (uint32_t i= 0; i < getTable()->getShare()->sizeKeys(); i++)
309
for (uint32_t i= 0; i < table->s->keys; i++)
286
KeyInfo *key= &getTable()->key_info[i];
311
KEY *key=table->key_info+i;
288
312
if (!key->rec_per_key)
314
if (key->algorithm != HA_KEY_ALG_BTREE)
292
316
if (key->flags & HA_NOSAME)
293
317
key->rec_per_key[key->key_parts-1]= 1;
296
ha_rows hash_buckets= file->getShare()->keydef[i].hash_buckets;
297
uint32_t no_records= hash_buckets ? (uint) (file->getShare()->records/hash_buckets) : 2;
320
ha_rows hash_buckets= file->s->keydef[i].hash_buckets;
321
uint32_t no_records= hash_buckets ? (uint) (file->s->records/hash_buckets) : 2;
298
322
if (no_records < 2)
300
324
key->rec_per_key[key->key_parts-1]= no_records;
304
328
records_changed= 0;
305
329
/* At the end of update_key_stats() we can proudly claim they are OK. */
306
key_stat_version= file->getShare()->key_stat_version;
330
key_stat_version= file->s->key_stat_version;
310
int ha_heap::doInsertRecord(unsigned char * buf)
334
int ha_heap::write_row(unsigned char * buf)
313
if (getTable()->next_number_field && buf == getTable()->getInsertRecord())
337
ha_statistic_increment(&SSV::ha_write_count);
338
if (table->next_number_field && buf == table->record[0])
315
340
if ((res= update_auto_increment()))
318
343
res= heap_write(file,buf);
319
344
if (!res && (++records_changed*MEMORY_STATS_UPDATE_THRESHOLD >
320
file->getShare()->records))
323
348
We can perform this safely since only one writer at the time is
324
349
allowed on the table.
326
file->getShare()->key_stat_version++;
351
file->s->key_stat_version++;
331
int ha_heap::doUpdateRecord(const unsigned char * old_data, unsigned char * new_data)
356
int ha_heap::update_row(const unsigned char * old_data, unsigned char * new_data)
359
ha_statistic_increment(&SSV::ha_update_count);
360
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
361
table->timestamp_field->set_time();
335
362
res= heap_update(file,old_data,new_data);
336
363
if (!res && ++records_changed*MEMORY_STATS_UPDATE_THRESHOLD >
337
file->getShare()->records)
340
367
We can perform this safely since only one writer at the time is
341
368
allowed on the table.
343
file->getShare()->key_stat_version++;
370
file->s->key_stat_version++;
348
int ha_heap::doDeleteRecord(const unsigned char * buf)
375
int ha_heap::delete_row(const unsigned char * buf)
378
ha_statistic_increment(&SSV::ha_delete_count);
352
379
res= heap_delete(file,buf);
353
if (!res && getTable()->getShare()->getType() == message::Table::STANDARD &&
354
++records_changed*MEMORY_STATS_UPDATE_THRESHOLD > file->getShare()->records)
380
if (!res && table->s->tmp_table == NO_TMP_TABLE &&
381
++records_changed*MEMORY_STATS_UPDATE_THRESHOLD > file->s->records)
357
384
We can perform this safely since only one writer at the time is
358
385
allowed on the table.
360
file->getShare()->key_stat_version++;
387
file->s->key_stat_version++;
388
415
key_part_map keypart_map,
389
416
enum ha_rkey_function find_flag)
391
ha_statistic_increment(&system_status_var::ha_read_key_count);
418
ha_statistic_increment(&SSV::ha_read_key_count);
392
419
int error = heap_rkey(file, buf, index, key, keypart_map, find_flag);
393
getTable()->status = error ? STATUS_NOT_FOUND : 0;
420
table->status = error ? STATUS_NOT_FOUND : 0;
397
424
int ha_heap::index_next(unsigned char * buf)
399
426
assert(inited==INDEX);
400
ha_statistic_increment(&system_status_var::ha_read_next_count);
427
ha_statistic_increment(&SSV::ha_read_next_count);
401
428
int error=heap_rnext(file,buf);
402
getTable()->status=error ? STATUS_NOT_FOUND: 0;
429
table->status=error ? STATUS_NOT_FOUND: 0;
406
433
int ha_heap::index_prev(unsigned char * buf)
408
435
assert(inited==INDEX);
409
ha_statistic_increment(&system_status_var::ha_read_prev_count);
436
ha_statistic_increment(&SSV::ha_read_prev_count);
410
437
int error=heap_rprev(file,buf);
411
getTable()->status=error ? STATUS_NOT_FOUND: 0;
438
table->status=error ? STATUS_NOT_FOUND: 0;
415
442
int ha_heap::index_first(unsigned char * buf)
417
444
assert(inited==INDEX);
418
ha_statistic_increment(&system_status_var::ha_read_first_count);
445
ha_statistic_increment(&SSV::ha_read_first_count);
419
446
int error=heap_rfirst(file, buf, active_index);
420
getTable()->status=error ? STATUS_NOT_FOUND: 0;
447
table->status=error ? STATUS_NOT_FOUND: 0;
424
451
int ha_heap::index_last(unsigned char * buf)
426
453
assert(inited==INDEX);
427
ha_statistic_increment(&system_status_var::ha_read_last_count);
454
ha_statistic_increment(&SSV::ha_read_last_count);
428
455
int error=heap_rlast(file, buf, active_index);
429
getTable()->status=error ? STATUS_NOT_FOUND: 0;
456
table->status=error ? STATUS_NOT_FOUND: 0;
433
int ha_heap::doStartTableScan(bool scan)
460
int ha_heap::rnd_init(bool scan)
435
462
return scan ? heap_scan_init(file) : 0;
438
465
int ha_heap::rnd_next(unsigned char *buf)
440
ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
467
ha_statistic_increment(&SSV::ha_read_rnd_next_count);
441
468
int error=heap_scan(file, buf);
442
getTable()->status=error ? STATUS_NOT_FOUND: 0;
469
table->status=error ? STATUS_NOT_FOUND: 0;
620
656
void ha_heap::drop_table(const char *)
622
file->getShare()->delete_on_close= 1;
658
file->s->delete_on_close= 1;
627
int HeapEngine::doRenameTable(Session &session, const TableIdentifier &from, const TableIdentifier &to)
663
int HeapEngine::doRenameTable(Session*,
664
const char *from, const char *to)
629
session.getMessageCache().renameTableMessage(from, to);
630
return heap_rename(from.getPath().c_str(), to.getPath().c_str());
666
return heap_rename(from,to);
634
670
ha_rows ha_heap::records_in_range(uint32_t inx, key_range *min_key,
635
671
key_range *max_key)
637
KeyInfo *key= &getTable()->key_info[inx];
673
KEY *key=table->key_info+inx;
674
if (key->algorithm == HA_KEY_ALG_BTREE)
675
return hp_rb_records_in_range(file, inx, min_key, max_key);
639
677
if (!min_key || !max_key ||
640
678
min_key->length != max_key->length ||
647
685
return stats.records;
649
687
/* Assert that info() did run. We need current statistics here. */
650
assert(key_stat_version == file->getShare()->key_stat_version);
688
assert(key_stat_version == file->s->key_stat_version);
651
689
return key->rec_per_key[key->key_parts-1];
654
int HeapEngine::doCreateTable(Session &session,
692
int HeapEngine::doCreateTable(Session *session,
693
const char *table_name,
655
694
Table &table_arg,
656
const TableIdentifier &identifier,
657
695
message::Table& create_proto)
660
698
HP_SHARE *internal_share;
661
const char *table_name= identifier.getPath().c_str();
663
error= heap_create_table(&session, table_name, &table_arg,
700
error= heap_create_table(session, table_name, &table_arg,
666
703
&internal_share);
670
session.getMessageCache().storeTableMessage(identifier, create_proto);
707
pthread_mutex_lock(&proto_cache_mutex);
708
proto_cache.insert(make_pair(table_name, create_proto));
709
pthread_mutex_unlock(&proto_cache_mutex);
680
719
message::Table &create_proto,
681
720
HP_SHARE **internal_share)
683
uint32_t key, parts, mem_per_row_keys= 0;
684
uint32_t keys= table_arg->getShare()->sizeKeys();
722
uint32_t key, parts, mem_per_row_keys= 0, keys= table_arg->s->keys;
685
723
uint32_t auto_key= 0, auto_key_type= 0;
686
724
uint32_t max_key_fieldnr = 0, key_part_size = 0, next_field_pos = 0;
687
uint32_t column_count= table_arg->getShare()->sizeFields();
688
std::vector<HP_KEYDEF> keydef;
725
uint32_t column_idx, column_count= table_arg->s->fields;
726
HP_COLUMNDEF *columndef;
729
char buff[FN_REFLEN];
731
TableShare *share= table_arg->s;
690
732
bool found_real_auto_increment= 0;
695
737
* can return a number more than that, we trap it here instead of casting
696
738
* to a truncated integer.
698
uint64_t num_rows= table_arg->getShare()->getMaxRows();
740
uint64_t num_rows= share->getMaxRows();
699
741
if (num_rows > UINT32_MAX)
744
if (!(columndef= (HP_COLUMNDEF*) malloc(column_count * sizeof(HP_COLUMNDEF))))
747
for (column_idx= 0; column_idx < column_count; column_idx++)
749
Field* field= *(table_arg->field + column_idx);
750
HP_COLUMNDEF* column= columndef + column_idx;
751
column->type= (uint16_t)field->type();
752
column->length= field->pack_length();
753
column->offset= field->offset(field->table->record[0]);
757
column->null_bit= field->null_bit;
758
column->null_pos= (uint) (field->null_ptr - (unsigned char*) table_arg->record[0]);
766
if (field->type() == DRIZZLE_TYPE_VARCHAR)
768
column->length_bytes= (uint8_t)(((Field_varstring*)field)->length_bytes);
772
column->length_bytes= 0;
702
776
for (key= parts= 0; key < keys; key++)
703
777
parts+= table_arg->key_info[key].key_parts;
706
std::vector<HA_KEYSEG> seg_buffer;
707
seg_buffer.resize(parts);
708
HA_KEYSEG *seg= &seg_buffer[0];
779
if (!(keydef= (HP_KEYDEF*) malloc(keys * sizeof(HP_KEYDEF) +
780
parts * sizeof(HA_KEYSEG))))
782
free((void *) columndef);
786
seg= reinterpret_cast<HA_KEYSEG*> (keydef + keys);
710
787
for (key= 0; key < keys; key++)
712
KeyInfo *pos= &table_arg->key_info[key];
713
KeyPartInfo *key_part= pos->key_part;
714
KeyPartInfo *key_part_end= key_part + pos->key_parts;
789
KEY *pos= table_arg->key_info+key;
790
KEY_PART_INFO *key_part= pos->key_part;
791
KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
716
793
keydef[key].keysegs= (uint) pos->key_parts;
717
794
keydef[key].flag= (pos->flags & (HA_NOSAME | HA_NULL_ARE_EQUAL));
718
795
keydef[key].seg= seg;
720
mem_per_row_keys+= sizeof(char*) * 2; // = sizeof(HASH_INFO)
797
switch (pos->algorithm) {
798
case HA_KEY_ALG_UNDEF:
799
case HA_KEY_ALG_HASH:
800
keydef[key].algorithm= HA_KEY_ALG_HASH;
801
mem_per_row_keys+= sizeof(char*) * 2; // = sizeof(HASH_INFO)
803
case HA_KEY_ALG_BTREE:
804
keydef[key].algorithm= HA_KEY_ALG_BTREE;
805
mem_per_row_keys+=sizeof(TREE_ELEMENT)+pos->key_length+sizeof(char*);
808
assert(0); // cannot happen
722
811
for (; key_part != key_part_end; key_part++, seg++)
724
813
Field *field= key_part->field;
815
if (pos->algorithm == HA_KEY_ALG_BTREE)
816
seg->type= field->key_type();
727
819
if ((seg->type = field->key_type()) != (int) HA_KEYTYPE_TEXT &&
728
820
seg->type != HA_KEYTYPE_VARTEXT1 &&
738
830
next_field_pos= seg->start + seg->length;
739
831
if (field->type() == DRIZZLE_TYPE_VARCHAR)
741
next_field_pos+= (uint8_t)(((Field_varstring*)field)->pack_length_no_ptr());
833
next_field_pos+= (uint8_t)(((Field_varstring*)field)->length_bytes);
744
836
if (next_field_pos > key_part_size) {
745
837
key_part_size= next_field_pos;
748
if (field->flags & ENUM_FLAG)
840
if (field->flags & (ENUM_FLAG | SET_FLAG))
749
841
seg->charset= &my_charset_bin;
751
843
seg->charset= field->charset();
752
844
if (field->null_ptr)
754
846
seg->null_bit= field->null_bit;
755
seg->null_pos= (uint) (field->null_ptr - (unsigned char*) table_arg->getInsertRecord());
847
seg->null_pos= (uint) (field->null_ptr - (unsigned char*) table_arg->record[0]);
770
862
auto_key= key+ 1;
771
863
auto_key_type= field->key_type();
773
if ((uint)field->position() + 1 > max_key_fieldnr)
865
if ((uint)field->field_index + 1 > max_key_fieldnr)
775
867
/* Do not use seg->fieldnr as it's not reliable in case of temp tables */
776
max_key_fieldnr= field->position() + 1;
868
max_key_fieldnr= field->field_index + 1;
781
if (key_part_size < table_arg->getShare()->null_bytes + ((table_arg->getShare()->last_null_bit_pos+7) >> 3))
873
if (key_part_size < share->null_bytes + ((share->last_null_bit_pos+7) >> 3))
783
875
/* Make sure to include null fields regardless of the presence of keys */
784
key_part_size = table_arg->getShare()->null_bytes + ((table_arg->getShare()->last_null_bit_pos+7) >> 3);
876
key_part_size = share->null_bytes + ((share->last_null_bit_pos+7) >> 3);
789
881
if (table_arg->found_next_number_field)
791
keydef[table_arg->getShare()->next_number_index].flag|= HA_AUTO_KEY;
792
found_real_auto_increment= table_arg->getShare()->next_number_key_offset == 0;
883
keydef[share->next_number_index].flag|= HA_AUTO_KEY;
884
found_real_auto_increment= share->next_number_key_offset == 0;
794
886
HP_CREATE_INFO hp_create_info;
795
887
hp_create_info.auto_key= auto_key;
799
891
hp_create_info.max_table_size=session->variables.max_heap_table_size;
800
892
hp_create_info.with_auto_increment= found_real_auto_increment;
801
893
hp_create_info.internal_table= internal_table;
802
hp_create_info.max_chunk_size= table_arg->getShare()->block_size;
804
error= heap_create(table_name,
808
table_arg->getShare()->getRecordLength(), mem_per_row_keys,
809
static_cast<uint32_t>(num_rows), /* We check for overflow above, so cast is fine here. */
811
&hp_create_info, internal_share);
894
hp_create_info.max_chunk_size= share->block_size;
895
hp_create_info.is_dynamic= (share->row_type == ROW_TYPE_DYNAMIC);
897
error= heap_create(internal::fn_format(buff,table_name,"","",
898
MY_REPLACE_EXT|MY_UNPACK_FILENAME),
900
column_count, columndef,
901
max_key_fieldnr, key_part_size,
902
share->reclength, mem_per_row_keys,
903
static_cast<uint32_t>(num_rows), /* We check for overflow above, so cast is fine here. */
905
&hp_create_info, internal_share);
907
free((unsigned char*) keydef);
908
free((void *) columndef);