/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 *
 * Copyright (C) 2009 - 2010 Toru Maesaka
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
using namespace drizzled;
26
static pthread_mutex_t blitz_utility_mutex;
28
static const char *ha_blitz_exts[] = {
35
/* BlitzDB storage engine plugin class.  Implements the engine-level DDL
   hooks (create/rename/drop/definition lookup) and owns a process-wide
   cache of BlitzShare objects keyed by table name, backed by a Tokyo
   Cabinet in-memory map (TCMAP).
   NOTE(review): this excerpt elides lines of the original file; gaps
   are marked with [elided] comments below. */
class BlitzEngine : public drizzled::plugin::StorageEngine {
  /* Process-wide table share cache: table name -> BlitzShare*. */
  TCMAP *blitz_table_cache;
  /* [elided] */

  BlitzEngine(const std::string &name_arg) :
    drizzled::plugin::StorageEngine(name_arg,
                                    drizzled::HTON_NULL_IN_KEY |
                                    drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
                                    drizzled::HTON_STATS_RECORDS_IS_EXACT |
                                    drizzled::HTON_SKIP_STORE_LOCK) {
    /* The ".system" file is the one that defines the table. */
    table_definition_ext = BLITZ_SYSTEM_EXT;
  /* [elided] */

  virtual ~BlitzEngine() {
    pthread_mutex_destroy(&blitz_utility_mutex);
    tcmapdel(blitz_table_cache);
  /* [elided] */

  /* Factory for per-session cursors, allocated on the kernel's root. */
  virtual drizzled::Cursor *create(drizzled::TableShare &table,
                                   drizzled::memory::Root *mem_root) {
    return new (mem_root) ha_blitz(*this, table);
  /* [elided] */

  const char **bas_ext() const {
  /* [elided] */

  int doCreateTable(drizzled::Session &session,
                    drizzled::Table &table_arg,
                    drizzled::TableIdentifier &identifier,
                    drizzled::message::Table &table_proto);
  /* [elided] */

  int doRenameTable(drizzled::Session &session,
                    drizzled::TableIdentifier &from_identifier,
                    drizzled::TableIdentifier &to_identifier);
  /* [elided] */

  int doDropTable(drizzled::Session &session,
                  drizzled::TableIdentifier &identifier);
  /* [elided] */

  int doGetTableDefinition(drizzled::Session &session,
                           drizzled::TableIdentifier &identifier,
                           drizzled::message::Table &table_proto);
  /* [elided] */

  void doGetTableNames(drizzled::CachedDirectory &directory,
                       drizzled::SchemaIdentifier &schema_identifier,
                       std::set<std::string>& set_of_names);
  /* [elided] */

  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                             drizzled::SchemaIdentifier &schema_identifier,
                             drizzled::TableIdentifiers &set_of_identifiers);
  /* [elided] */

  bool doDoesTableExist(drizzled::Session &session,
                        drizzled::TableIdentifier &identifier);
  /* [elided] */

  /* Allocates the process-wide table share cache. */
  bool doCreateTableCache(void);
  /* [elided] */

  /* Share-cache accessors (lookup / insert / evict by table name). */
  BlitzShare *getTableShare(const std::string &name);
  void cacheTableShare(const std::string &name, BlitzShare *share);
  void deleteTableShare(const std::string &name);
  /* [elided] */

  uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
  uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
  uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
  /* [elided] */

  uint32_t index_flags(enum drizzled::ha_key_alg) const {
    return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
            HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
  /* [elided] */

/* A key stored in BlitzDB's B+Tree is a byte array that also includes
   a key to that row in the data dictionary. Two keys are merged and
   stored as a key because we want to avoid reading the leaf node and
   thus save disk IO and some computation in the tree. Note that the
   comparison function of BlitzDB's btree only takes into accound the
   actual index key. See blitzcmp.cc for details.
   [elided]
   With the above in mind, this helper function returns a pointer to
   the dictionary key by calculating the offset. */
static char *skip_btree_key(const char *key, const size_t skip_len,
/* [elided: remainder of declaration] */
/* Engine hook: create the on-disk files for a new table — the data
   file, the system file (which stores the serialized table proto), and
   one B+Tree file per index.
   [excerpt: variable declarations, error returns and closing braces
   are elided; gaps are marked] */
int BlitzEngine::doCreateTable(drizzled::Session &,
                               drizzled::Table &table,
                               drizzled::TableIdentifier &identifier,
                               drizzled::message::Table &proto) {
  /* [elided] */
  /* Temporary fix for blocking composite keys. We need to add this
     check because version 1 doesn't handle composite indexes. */
  for (uint32_t i = 0; i < table.s->keys; i++) {
    if (table.key_info[i].key_parts > 1)
      return HA_ERR_UNSUPPORTED;
  /* [elided] */

  /* Create relevant files for a new table and close them immediately.
     All we want to do here is somewhat like UNIX touch(1). */
  if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
  /* [elided] */

  if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
  /* [elided] */

  /* Create b+tree index(es) for this table. */
  for (uint32_t i = 0; i < table.s->keys; i++) {
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
  /* [elided] */

  /* Write the table definition to system table. */
  if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
  /* [elided] */

  if (!dict.write_table_definition(proto)) {
    dict.close_system_table();
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  dict.close_system_table();
  /* [elided: return] */
/* Engine hook: rename a table's data and system files plus each of its
   per-index B+Tree files.  The index count must be read from the data
   file's metadata first.
   [excerpt: declarations (nkeys, rv, btree) and some returns elided] */
int BlitzEngine::doRenameTable(drizzled::Session &,
                               drizzled::TableIdentifier &from,
                               drizzled::TableIdentifier &to) {
  /* [elided] */
  BlitzData blitz_table;
  /* [elided] */

  /* Find out the number of indexes in this table. This information
     is required because BlitzDB creates a file for each indexes.*/
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  nkeys = blitz_table.read_meta_keycount();
  /* [elided] */

  if (blitz_table.close_data_table() != 0)
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  /* We're now ready to rename the file(s) for this table. Start by
     attempting to rename the data and system files. */
  if (rename_file_ext(from.getPath().c_str(),
                      to.getPath().c_str(), BLITZ_DATA_EXT)) {
    /* A missing file (ENOENT) is tolerated; other errnos are not. */
    if ((rv = errno) != ENOENT)
  /* [elided] */

  if (rename_file_ext(from.getPath().c_str(),
                      to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
    if ((rv = errno) != ENOENT)
  /* [elided] */

  /* So far so good. Rename the index file(s) and we're done. */
  for (uint32_t i = 0; i < nkeys; i++) {
    if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
      return HA_ERR_CRASHED_ON_USAGE;
  /* [elided: return] */
/* Engine hook: drop a table by unlinking its data file, system file
   and every per-index B+Tree file.  The index count is read from the
   data file's metadata before deletion.
   [excerpt: declarations (err, nkeys, buf, btree) and error-handling
   bodies elided] */
int BlitzEngine::doDropTable(drizzled::Session &,
                             drizzled::TableIdentifier &identifier) {
  /* [elided] */
  /* We open the dictionary to extract meta data from it */
  if ((err = dict.open_data_table(identifier.getPath().c_str(),
  /* [elided] */

  nkeys = dict.read_meta_keycount();
  /* [elided] */

  /* We no longer need the dictionary to be open */
  dict.close_data_table();
  /* [elided] */

  /* Drop the Data Dictionary */
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
  if ((err = unlink(buf)) == -1) {
  /* [elided] */

  /* Drop the System Table */
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
  if ((err = unlink(buf)) == -1) {
  /* [elided] */

  /* Drop Index file(s) */
  for (uint32_t i = 0; i < nkeys; i++) {
    if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
  /* [elided: error handling and return] */
/* Engine hook: load a table's definition by reading the serialized
   proto out of the system file and parsing it into 'proto'.
   [excerpt: declarations (db, proto_string) and some branches elided] */
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
                                      drizzled::TableIdentifier &identifier,
                                      drizzled::message::Table &proto) {
  struct stat stat_info;
  std::string path(identifier.getPath());
  /* [elided] */
  path.append(BLITZ_SYSTEM_EXT);
  /* [elided] */

  /* No system file on disk — the table doesn't exist here. */
  if (stat(path.c_str(), &stat_info)) {
  /* [elided] */

  int proto_string_len;
  /* [elided] */
  if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
                                     BLITZ_TABLE_PROTO_KEY.length(),
  /* [elided] */

  if (db.close_system_table() != 0) {
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  if (proto_string == NULL) {
  /* [elided] */

  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided: cleanup and return] */
/* Engine hook: collect the table names present in a schema directory
   by scanning its entries and converting each matching filename back
   to a table name (extension stripped).
   [excerpt: filter conditions and some lines elided] */
void BlitzEngine::doGetTableNames(drizzled::CachedDirectory &directory,
                                  drizzled::SchemaIdentifier &,
                                  std::set<string> &set_of_names) {
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
  /* [elided] */

  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
       entry_iter != entries.end(); ++entry_iter) {
    /* [elided] */
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
    std::string *filename = &entry->filename;
    /* [elided] */
    assert(filename->size());
    /* [elided] */
    const char *ext = strchr(filename->c_str(), '.');
    /* [elided: extension filter] */

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;
    /* [elided] */
    file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
    /* [elided] */
    /* Chop the file extension off the converted name. */
    uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
    set_of_names.insert(uname);
  /* [elided] */
/* Engine hook: like doGetTableNames(), but emits TableIdentifier
   objects.  Entries that are not BlitzDB system files, or that are
   temporary files, are skipped.
   [excerpt: some lines elided] */
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
                                        drizzled::SchemaIdentifier &schema_id,
                                        drizzled::TableIdentifiers &ids) {
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
  /* [elided] */

  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
       entry_iter != entries.end(); ++entry_iter) {
    /* [elided] */
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
    const std::string *filename = &entry->filename;
    /* [elided] */
    assert(filename->size());
    /* [elided] */
    const char *ext = strchr(filename->c_str(), '.');
    /* [elided] */

    /* Skip non-system files and temporary files. */
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
    /* [elided: continue branch] */

    char uname[NAME_LEN + 1];
    uint32_t file_name_len;
    /* [elided] */
    file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
    /* [elided] */
    uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
    ids.push_back(TableIdentifier(schema_id, uname));
  /* [elided] */
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
347
drizzled::TableIdentifier &identifier) {
348
std::string proto_path(identifier.getPath());
349
proto_path.append(BLITZ_DATA_EXT);
351
return (access(proto_path.c_str(), F_OK)) ? false : true;
354
bool BlitzEngine::doCreateTableCache(void) {
355
return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
358
/* Look up a cached BlitzShare by table name.  The cache stores the
   pointer value itself as the mapped data, so a hit is dereferenced
   back into a BlitzShare*.  Presumably returns NULL on a miss — the
   miss branch is elided in this excerpt.
   [excerpt: declarations (fetched, vlen) and branches elided] */
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
  /* [elided] */
  BlitzShare *rv = NULL;
  /* [elided] */
  fetched = tcmapget(blitz_table_cache, table_name.c_str(),
                     table_name.length(), &vlen);
  /* [elided] */
  /* dereference the object */
  /* [elided] */
  rv = *(BlitzShare **)fetched;
  /* [elided: return rv] */
/* Insert a share object into the cache keyed by table name.  Only the
   pointer (its address value) is stored, not the object itself.
   [excerpt: the second parameter line of the signature — the
   BlitzShare *share argument — is elided] */
void BlitzEngine::cacheTableShare(const std::string &table_name,
/* [elided: BlitzShare *share parameter] */
  /* Cache the memory address of the share object */
  tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
           &share, sizeof(share));
  /* [elided] */
void BlitzEngine::deleteTableShare(const std::string &table_name) {
381
tcmapout2(blitz_table_cache, table_name.c_str());
384
/* Cursor constructor: binds this cursor to its engine and table share.
   [excerpt: most of the member initializer list is elided] */
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
                   TableShare &table_arg) : Cursor(engine_arg, table_arg),
                                            /* [elided initializers] */
                                            thread_locked(false),
                                            /* [elided: remainder] */
/* Cursor open: attach to the table share, create one B+Tree cursor per
   index, and allocate the per-cursor key buffers.  All setup happens
   under blitz_utility_mutex.
   [excerpt: some cleanup statements on the error paths and closing
   braces are elided — the error paths may free earlier allocations in
   the elided lines; verify against the full source] */
int ha_blitz::open(const char *table_name, int, uint32_t) {
  if ((share = get_share(table_name)) == NULL)
    return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */

  pthread_mutex_lock(&blitz_utility_mutex);
  /* [elided] */

  btree_cursor = new BlitzCursor[share->nkeys];
  /* [elided] */
  for (uint32_t i = 0; i < share->nkeys; i++) {
    if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
      /* [elided: cleanup] */
      pthread_mutex_unlock(&blitz_utility_mutex);
      return HA_ERR_OUT_OF_MEM;
  /* [elided] */

  if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
    /* [elided: cleanup] */
    pthread_mutex_unlock(&blitz_utility_mutex);
    return HA_ERR_OUT_OF_MEM;
  /* [elided] */

  if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
    /* [elided: cleanup] */
    pthread_mutex_unlock(&blitz_utility_mutex);
    return HA_ERR_OUT_OF_MEM;
  /* [elided] */

  if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
    /* [elided: cleanup] */
    free(key_merge_buffer);
    pthread_mutex_unlock(&blitz_utility_mutex);
    return HA_ERR_OUT_OF_MEM;
  /* [elided] */

  secondary_row_buffer = NULL;
  secondary_row_buffer_size = 0;
  key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
  /* [elided] */

  /* 'ref_length' determines the size of the buffer that the kernel
     will use to uniquely identify a row. The actual allocation is
     done by the kernel so all we do here is specify the size of it.*/
  if (share->primary_key_exists) {
    ref_length = table->key_info[table->s->getPrimaryKey()].key_length;
  /* [elided: else branch — hidden-row-id tables use a fixed size: */
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
  /* [elided] */

  pthread_mutex_unlock(&blitz_utility_mutex);
  /* [elided: return] */
/* Cursor close: destroy the per-index B+Tree cursors and release the
   buffers allocated in open().
   [excerpt: some free() calls (e.g. key_buffer, held_key_buf), share
   release and the return statement are elided] */
int ha_blitz::close(void) {
  for (uint32_t i = 0; i < share->nkeys; i++) {
    share->btrees[i].destroy_cursor(&btree_cursor[i]);
  /* [elided] */
  delete [] btree_cursor;
  /* [elided] */
  free(key_merge_buffer);
  /* [elided] */
  free(secondary_row_buffer);
  /* [elided: remaining cleanup and return] */
/* Report table statistics to the kernel, depending on which flags are
   requested.  [excerpt: HA_STATUS_ERRKEY body and return elided] */
int ha_blitz::info(uint32_t flag) {
  if (flag & HA_STATUS_VARIABLE) {
    /* Row count and on-disk size come from the data dictionary. */
    stats.records = share->dict.nrecords();
    stats.data_file_length = share->dict.table_size();
  /* [elided] */

  if (flag & HA_STATUS_AUTO)
    stats.auto_increment_value = share->auto_increment_value + 1;
  /* [elided] */

  if (flag & HA_STATUS_ERRKEY)
  /* [elided: errkey reporting and return] */
/* Begin a full table scan: take the lock appropriate for the current
   statement type and fetch the first key/row pair from the data
   dictionary (TCHDB).
   [excerpt: remaining arguments to next_key_and_row and the return
   are elided] */
int ha_blitz::doStartTableScan(bool scan) {
  /* Obtain the query type for this scan */
  sql_command_type = session_sql_command(current_session);
  /* [elided] */

  /* Obtain the most suitable lock for the given statement type. */
  critical_section_enter();
  /* [elided] */

  /* Get the first record from TCHDB. Let the scanner take
     care of checking return value errors. */
  current_key = share->dict.next_key_and_row(NULL, 0,
  /* [elided: remaining args, tail of function] */
/* Table-scan step: return the current row to the kernel and prefetch
   the next key/row pair, carefully juggling ownership of the malloc'd
   regions so nothing leaks between iterations.
   [excerpt: some declarations and the tail are elided] */
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
  const char *next_row;
  /* [elided: more declarations] */

  /* A NULL current key means the previous step hit the end. */
  if (current_key == NULL) {
    table->status = STATUS_NOT_FOUND;
    return HA_ERR_END_OF_FILE;
  /* [elided] */

  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
  /* [elided] */

  /* Unpack and copy the current row to Drizzle's result buffer. */
  unpack_row(drizzle_buf, current_row, current_row_len);
  /* [elided] */

  /* Retrieve both key and row of the next record with one allocation. */
  next_key = share->dict.next_key_and_row(current_key, current_key_len,
                                          &next_key_len, &next_row,
  /* [elided: remaining args] */

  /* Memory region for "current_row" will be freed as "held key" on
     the next iteration. This is because "current_key" points to the
     region of memory that contains "current_row" and "held_key" points
     to it. If there isn't another iteration then it is freed in doEndTableScan(). */
  current_row = next_row;
  current_row_len = next_row_len;
  /* [elided] */

  /* Remember the current row because delete, update or replace
     function could be called after this function. This pointer is
     also used to free the previous key and row, which resides on
     [comment continues on an elided line] */
  held_key = current_key;
  held_key_len = current_key_len;
  /* [elided] */

  /* It is now memory-leak-safe to point current_key to next_key. */
  current_key = next_key;
  current_key_len = next_key_len;
  /* [elided: return] */
/* Finish a table scan: free the outstanding key/row allocations held
   over from rnd_next() and release the scan lock.
   [excerpt: the free() statements and return are elided] */
int ha_blitz::doEndTableScan() {
  if (table_scan && current_key)
  /* [elided: free current key] */
  if (table_scan && held_key)
  /* [elided: free held key] */
  critical_section_exit();
  /* [elided: return] */
/* Fetch a row by the position token previously produced by position():
   a [key length][key bytes] pair.  The row is looked up in the data
   dictionary and unpacked into 'copy_to'.
   [excerpt: declarations, the if-conditions guarding the bare error
   returns below, and the tail are elided] */
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
  /* [elided] */
  int key_len, row_len;
  /* [elided] */

  memcpy(&key_len, pos, sizeof(key_len));
  key = (char *)(pos + sizeof(key_len));
  /* [elided: validity check: */
  /* TODO: Find a better error type. */
  return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  row = share->dict.get_row(key, key_len, &row_len);
  /* [elided: if (row == NULL) */
  return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  unpack_row(copy_to, row, row_len);
  /* [elided] */

  /* Remember the key location on memory if the thread is not doing
     a table scan. This is because either update_row() or delete_row()
     might be called after this function. */
  /* [elided] */
  held_key_len = key_len;
  /* [elided: cleanup and return] */
void ha_blitz::position(const unsigned char *) {
594
int length = sizeof(held_key_len);
595
memcpy(ref, &held_key_len, length);
596
memcpy(ref + length, (unsigned char *)held_key, held_key_len);
599
/* Name of the index implementation reported to the kernel.
   [excerpt: body elided — presumably returns a B+Tree type string,
   given the b+tree index files created in doCreateTable; confirm
   against the full source] */
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
/* Begin an index scan on key_num.  Re-acquires the statement lock so
   the thread holds the most appropriate lock type.
   [excerpt: the guard around the exit/enter pair and the return are
   elided] */
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
  active_index = key_num;
  sql_command_type = session_sql_command(current_session);
  /* [elided] */

  /* This is unlikely to happen but just for assurance, re-obtain
     the lock if this thread already has a certain lock. This makes
     sure that this thread will get the most appropriate lock for
     the current statement. */
  /* [elided: guard condition] */
  critical_section_exit();
  /* [elided] */
  critical_section_enter();
  /* [elided: return] */
/* Position at the first key of the active index, fetch the row it
   points to from the data dictionary, and unpack it into 'buf'.
   [excerpt: the if-guards around the bare error returns and the tail
   are elided] */
int ha_blitz::index_first(unsigned char *buf) {
  char *dict_key, *bt_key, *row;
  int dict_klen, bt_klen, prefix_len, rlen;
  /* [elided] */

  bt_key = btree_cursor[active_index].first_key(&bt_klen);
  /* [elided: if (bt_key == NULL) */
  return HA_ERR_END_OF_FILE;
  /* [elided] */

  /* The B+Tree key is [index key][dictionary key]; skip the prefix
     to obtain the dictionary key. */
  prefix_len = btree_key_length(bt_key, active_index);
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
  /* [elided] */

  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
    /* [elided] */
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  unpack_row(buf, row, rlen);
  keep_track_of_key(bt_key, bt_klen);
  /* [elided: cleanup and return] */
/* Advance the active index cursor and return the next row, or
   HA_ERR_END_OF_FILE when the cursor is exhausted.
   [excerpt: some lines and the tail are elided] */
int ha_blitz::index_next(unsigned char *buf) {
  char *dict_key, *bt_key, *row;
  int dict_klen, bt_klen, prefix_len, rlen;
  /* [elided] */

  bt_key = btree_cursor[active_index].next_key(&bt_klen);
  /* [elided] */
  if (bt_key == NULL) {
    table->status = STATUS_NOT_FOUND;
    return HA_ERR_END_OF_FILE;
  /* [elided] */

  /* Skip the index-key prefix to reach the dictionary key. */
  prefix_len = btree_key_length(bt_key, active_index);
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
  /* [elided] */

  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
    /* [elided] */
    table->status = STATUS_NOT_FOUND;
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  unpack_row(buf, row, rlen);
  keep_track_of_key(bt_key, bt_klen);
  /* [elided: cleanup and return] */
/* Step the active index cursor backwards and return the previous row.
   [excerpt: the if-guards around the bare error returns and the tail
   are elided] */
int ha_blitz::index_prev(unsigned char *buf) {
  char *dict_key, *bt_key, *row;
  int dict_klen, bt_klen, prefix_len, rlen;
  /* [elided] */

  bt_key = btree_cursor[active_index].prev_key(&bt_klen);
  /* [elided: if (bt_key == NULL) */
  return HA_ERR_END_OF_FILE;
  /* [elided] */

  prefix_len = btree_key_length(bt_key, active_index);
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
  /* [elided] */

  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
    /* [elided] */
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  unpack_row(buf, row, rlen);
  keep_track_of_key(bt_key, bt_klen);
  /* [elided: cleanup and return] */
/* Position at the last key of the active index and return its row.
   [excerpt: the if-guards around the bare error returns and the tail
   are elided] */
int ha_blitz::index_last(unsigned char *buf) {
  char *dict_key, *bt_key, *row;
  int dict_klen, bt_klen, prefix_len, rlen;
  /* [elided] */

  bt_key = btree_cursor[active_index].final_key(&bt_klen);
  /* [elided: if (bt_key == NULL) */
  return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  prefix_len = btree_key_length(bt_key, active_index);
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
  /* [elided] */

  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
    /* [elided] */
    errkey_id = active_index;
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  unpack_row(buf, row, rlen);
  keep_track_of_key(bt_key, bt_klen);
  /* [elided: cleanup and return] */
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
723
uint32_t key_len, enum ha_rkey_function find_flag) {
724
return index_read_idx(buf, active_index, key, key_len, find_flag);
727
/* This is where the read related index logic lives. It is used by both
   BlitzDB and the Database Kernel (specifically, by the optimizer).
   Looks up 'key' in index 'key_num' with the given search mode, fetches
   the matching row from the data dictionary and unpacks it into 'buf'.
   [excerpt: declarations and some branches are elided] */
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
                             const unsigned char *key, uint32_t,
                             enum ha_rkey_function search_mode) {
  /* [elided] */
  /* If the provided key is NULL, we are required to return the first
     row in the active_index. */
  /* [elided: NULL-key guard] */
  return this->index_first(buf);
  /* [elided] */

  /* Otherwise we search for it. Prepare the key to look up the tree. */
  /* [elided] */
  char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
  /* [elided] */

  /* Lookup the tree and get the master key. */
  unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
                                              packed_klen, &unique_klen);
  /* [elided] */

  if (unique_key == NULL) {
    /* [elided] */
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  /* Got the master key. Prepare it to lookup the data dictionary. */
  int skip_len = btree_key_length(unique_key, key_num);
  char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
  /* [elided] */

  /* Fetch the packed row from the data dictionary. */
  char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
  /* [elided] */

  if (fetched_row == NULL) {
    /* [elided] */
    return HA_ERR_KEY_NOT_FOUND;
  /* [elided] */

  /* Unpack it into Drizzle's return buffer and keep track of the
     master key for future use (before index_end() is called). */
  unpack_row(buf, fetched_row, row_len);
  keep_track_of_key(unique_key, unique_klen);
  /* [elided: cleanup and return] */
/* End an index scan: reset the cursor's moved flag and release the
   statement lock.  [excerpt: guards and return elided] */
int ha_blitz::doEndIndexScan(void) {
  /* [elided] */
  btree_cursor[active_index].moved = false;
  /* [elided] */
  critical_section_exit();
  /* [elided: return] */
int ha_blitz::enable_indexes(uint32_t) {
792
return HA_ERR_UNSUPPORTED;
795
int ha_blitz::disable_indexes(uint32_t) {
796
return HA_ERR_UNSUPPORTED;
799
/* Find the estimated number of rows between min_key and max_key.
800
Leave the proper implementation of this for now since there are
801
too many exceptions to cover. */
802
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
803
drizzled::key_range * /*min_key*/,
804
drizzled::key_range * /*max_key*/) {
805
return BLITZ_WORST_CASE_RANGE;
808
/* Insert one row: maintain the auto-increment counter, serialize the
   primary key, write every index entry under a slotted lock, then
   write the packed row to the data dictionary.
   [excerpt: declarations (rv, key, klen), several error returns and
   closing braces are elided; gaps are marked] */
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
  /* [elided] */
  ha_statistic_increment(&system_status_var::ha_write_count);
  /* [elided] */

  /* Prepare Auto Increment field if one exists. */
  if (table->next_number_field && drizzle_row == table->record[0]) {
    pthread_mutex_lock(&blitz_utility_mutex);
    if ((rv = update_auto_increment()) != 0) {
      pthread_mutex_unlock(&blitz_utility_mutex);
    /* [elided: return rv] */

    uint64_t next_val = table->next_number_field->val_int();
    /* [elided] */
    /* Only ratchet the shared counter forward, never back. */
    if (next_val > share->auto_increment_value) {
      share->auto_increment_value = next_val;
      stats.auto_increment_value = share->auto_increment_value + 1;
    /* [elided] */
    pthread_mutex_unlock(&blitz_utility_mutex);
  /* [elided] */

  /* Serialize a primary key for this row. If a PK doesn't exist,
     an internal hidden ID will be generated. We obtain the PK here
     and pack it to this function's local buffer instead of the
     thread's own 'key_buffer' because the PK value needs to be
     remembered when writing non-PK keys AND because the 'key_buffer'
     will be used to generate these non-PK keys. */
  char temp_pkbuf[BLITZ_MAX_KEY_LEN];
  size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
  /* [elided] */

  /* Obtain a buffer that can accommodate this row. We then pack
     the provided row into it. Note that this code works most
     efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
  unsigned char *row_buf = get_pack_buffer(max_row_length());
  size_t row_len = pack_row(row_buf, drizzle_row);
  /* [elided] */

  uint32_t curr_key = 0;
  uint32_t lock_id = 0;
  /* [elided] */

  if (share->nkeys > 0) {
    lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
    share->blitz_lock.slotted_lock(lock_id);
  /* [elided] */

  /* We isolate this condition outside the key loop to avoid the CPU
     from going through unnecessary conditional branching on heavy
     insertion load. TODO: Optimize this block. PK should not need
     to go through merge_key() since this information is redundant. */
  if (share->primary_key_exists) {
    /* [elided] */
    key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
    /* [elided] */
    rv = share->btrees[curr_key].write_unique(key, klen);
    /* [elided] */
    if (rv == HA_ERR_FOUND_DUPP_KEY) {
      errkey_id = curr_key;
      share->blitz_lock.slotted_unlock(lock_id);
  /* [elided: return and advance past PK] */

  /* Loop over the keys and write them to it's exclusive tree. */
  while (curr_key < share->nkeys) {
    /* [elided] */
    size_t prefix_len = 0;
    /* [elided] */
    prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
    key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
    /* [elided] */
    if (share->btrees[curr_key].unique) {
      rv = share->btrees[curr_key].write_unique(key, klen);
    /* [elided: else branch */
      rv = share->btrees[curr_key].write(key, klen);
    /* [elided: error check */
      errkey_id = curr_key;
      share->blitz_lock.slotted_unlock(lock_id);
  /* [elided: return, loop increment] */

  /* Write the row to the Data Dictionary. */
  rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
  /* [elided] */

  if (share->nkeys > 0)
    share->blitz_lock.slotted_unlock(lock_id);
  /* [elided: return rv] */
/* Update one row: for indexed tables, delete and re-insert each index
   entry under a slotted lock, then rewrite the dictionary row.  For
   keyless tables (or when only the dictionary key changes), delete the
   old row and write the new one.
   [excerpt: declarations (rv, key), several guards, returns and
   closing braces are elided; gaps are marked] */
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
                             unsigned char *new_row) {
  /* [elided] */
  uint32_t lock_id = 0;
  /* [elided] */
  ha_statistic_increment(&system_status_var::ha_update_count);
  /* [elided] */

  if (share->nkeys > 0) {
    /* BlitzDB cannot update an indexed row on table scan. */
    /* [elided: guard */
    return HA_ERR_UNSUPPORTED;
    /* [elided] */

    if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
    /* [elided: return rv] */

    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
    share->blitz_lock.slotted_lock(lock_id);
    /* [elided] */

    /* Update all relevant index entries. Start by deleting the
       the existing key then write the new key. Something we should
       consider in the future is to take a diff of the keys and only
       update changed keys. */
    int skip = btree_key_length(held_key, active_index);
    char *suffix = held_key + skip;
    uint16_t suffix_len = uint2korr(suffix);
    /* [elided] */
    suffix += sizeof(suffix_len);
    /* [elided] */

    for (uint32_t i = 0; i < share->nkeys; i++) {
      /* [elided] */
      size_t prefix_len, klen;
      /* [elided] */
      prefix_len = make_index_key(key_buffer, i, old_row);
      key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
      /* [elided] */

      if (share->btrees[i].delete_key(key, klen) != 0) {
        /* [elided] */
        share->blitz_lock.slotted_unlock(lock_id);
        return HA_ERR_KEY_NOT_FOUND;
      /* [elided] */

      /* Now write the new key. */
      prefix_len = make_index_key(key_buffer, i, new_row);
      /* [elided] */
      /* A PK entry is merged with itself; others keep the old suffix. */
      if (i == table->s->getPrimaryKey()) {
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
        rv = share->btrees[i].write(key, klen);
      /* [elided: else branch */
        key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
        rv = share->btrees[i].write(key, klen);
      /* [elided: error check */

      share->blitz_lock.slotted_unlock(lock_id);
  /* [elided] */

  /* Getting this far means that the index has been successfully
     updated. We now update the Data Dictionary. This implementation
     is admittedly far from optimial and will be revisited. */
  size_t row_len = max_row_length();
  unsigned char *row_buf = get_pack_buffer(row_len);
  row_len = pack_row(row_buf, new_row);
  /* [elided] */

  /* This is a basic case where we can simply overwrite the key. */
  /* [elided: guard */
  rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
  /* [elided: else branch — PK value changed: */
  int klen = make_index_key(key_buffer, table->s->getPrimaryKey(), old_row);
  /* [elided] */

  /* Delete with the old key. */
  share->dict.delete_row(key_buffer, klen);
  /* [elided] */

  /* Write with the new key. */
  klen = make_index_key(key_buffer, table->s->getPrimaryKey(), new_row);
  rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
  /* [elided] */

  if (share->nkeys > 0)
    share->blitz_lock.slotted_unlock(lock_id);
  /* [elided: return rv] */
/* Delete one row: remove every index entry — reconstructing each
   unique B+Tree key from the row's column values plus the held key's
   dictionary-key suffix — then remove the dictionary row itself.
   [excerpt: declarations (rv), several closing braces and the return
   are elided; gaps are marked] */
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
  /* [elided] */
  ha_statistic_increment(&system_status_var::ha_delete_count);
  /* [elided] */

  char *dict_key = held_key;
  int dict_klen = held_key_len;
  uint32_t lock_id = 0;
  /* [elided] */

  if (share->nkeys > 0) {
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
    share->blitz_lock.slotted_lock(lock_id);
    /* [elided] */

    /* Loop over the indexes and delete all relevant entries for
       this row. We do this by reproducing the key in BlitzDB's
       unique key format. The procedure is simple.
       [elided]
       (1): Compute the key value for this index from the row then
            pack it into key_buffer (not unique at this point).
       [elided]
       (2): Append the suffix of the held_key to the key generated
            in step 1. The key is then guaranteed to be unique. */
    for (uint32_t i = 0; i < share->nkeys; i++) {
      /* In this case, we don't need to search for the key because
         TC's cursor is already pointing at the key that we want
         to delete. We wouldn't be here otherwise. */
      if (i == active_index) {
        btree_cursor[active_index].delete_position();
      /* [elided: continue] */

      int klen = make_index_key(key_buffer, i, row_to_delete);
      int skip_len = btree_key_length(held_key, active_index);
      uint16_t suffix_len = uint2korr(held_key + skip_len);
      /* [elided] */

      /* Append the suffix to the key */
      memcpy(key_buffer + klen, held_key + skip_len,
             sizeof(suffix_len) + suffix_len);
      /* [elided] */

      /* Update the key length to cover the generated key. */
      klen = klen + sizeof(suffix_len) + suffix_len;
      /* [elided] */

      /* NOTE(review): no visible slotted_unlock before this return —
         unless an elided line releases it, this error path appears to
         leak the slotted lock (compare doUpdateRecord, which unlocks
         before returning).  Verify against the full source. */
      if (share->btrees[i].delete_key(key_buffer, klen) != 0)
        return HA_ERR_KEY_NOT_FOUND;
    /* [elided] */

    /* Skip to the data dictionary key. */
    int dict_key_offset = btree_key_length(dict_key, active_index);
    dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
  /* [elided] */

  rv = share->dict.delete_row(dict_key, dict_klen);
  /* [elided] */

  if (share->nkeys > 0)
    share->blitz_lock.slotted_unlock(lock_id);
  /* [elided: return rv] */
/* Hand the kernel the next auto-increment value and reserve
   effectively the whole remaining range for this cursor. */
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
                                  uint64_t, uint64_t *first_value,
                                  uint64_t *nb_reserved_values) {
  *nb_reserved_values = UINT64_MAX;
  *first_value = share->auto_increment_value + 1;
}
int ha_blitz::reset_auto_increment(uint64_t value) {
1062
share->auto_increment_value = (value == 0) ? 1 : value;
1066
/* Truncate the table: clear every index tree, then clear the data
   dictionary.  [excerpt: one line inside the error branch and the
   closing braces are elided] */
int ha_blitz::delete_all_rows(void) {
  for (uint32_t i = 0; i < share->nkeys; i++) {
    if (share->btrees[i].delete_all() != 0) {
      /* [elided] */
      return HA_ERR_CRASHED_ON_USAGE;
  /* [elided] */
  return (share->dict.delete_all_rows()) ? 0 : -1;
  /* [elided] */
/* Upper bound on a packed row's size: the fixed record length plus two
   length bytes per field, plus each BLOB's current data length (+2).
   [excerpt: the loop increment and the return are elided] */
uint32_t ha_blitz::max_row_length(void) {
  uint32_t length = (table->getRecordLength() + table->sizeFields() * 2);
  uint32_t *pos = table->getBlobField();
  uint32_t *end = pos + table->sizeBlobFields();
  /* [elided] */

  while (pos != end) {
    length += 2 + ((Field_blob *)table->getField(*pos))->get_length();
  /* [elided: pos advance, loop close, return] */
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
1090
if (!share->primary_key_exists) {
1091
uint64_t next_id = share->dict.next_hidden_row_id();
1092
int8store(pack_to, next_id);
1093
return sizeof(next_id);
1096
/* Getting here means that there is a PK in this table. Get the
1097
binary representation of the PK, pack it to BlitzDB's key buffer
1098
and return the size of it. */
1099
return make_index_key(pack_to, table->s->getPrimaryKey(), row);
1102
/* Pack the value of index 'key_num' from 'row' into pack_to using each
   key part's Field::pack(), returning the packed length.
   [excerpt: the NULL-handling branch bodies, the 'end' declaration and
   loop tail are elided] */
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
                                const unsigned char *row) {
  KeyInfo *key = &table->key_info[key_num];
  KeyPartInfo *key_part = key->key_part;
  KeyPartInfo *key_part_end = key_part + key->key_parts;
  /* [elided] */
  unsigned char *pos = (unsigned char *)pack_to;
  /* [elided] */
  memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
  /* [elided] */

  /* Loop through key part(s) and pack them as we go. */
  for (; key_part != key_part_end; key_part++) {
    if (key_part->null_bit) {
      /* NULL-bit set in the row means this part's value is NULL. */
      if (row[key_part->null_offset] & key_part->null_bit) {
      /* [elided: NULL handling] */

    end = key_part->field->pack(pos, row + key_part->offset);
    /* [elided: pos advance, loop close] */

  return ((char *)pos - pack_to);
  /* [elided] */
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
1133
const size_t b_len, size_t *merged_len) {
1135
size_t total = a_len + sizeof(uint16_t) + b_len;
1137
if (total > key_merge_buffer_len) {
1138
key_merge_buffer = (char *)realloc(key_merge_buffer, total);
1140
if (key_merge_buffer == NULL) {
1141
errno = HA_ERR_OUT_OF_MEM;
1144
key_merge_buffer_len = total;
1147
char *pos = key_merge_buffer;
1149
/* Copy the prefix. */
1150
memcpy(pos, a, a_len);
1153
/* Copy the length of b. */
1154
int2store(pos, (uint16_t)b_len);
1155
pos += sizeof(uint16_t);
1157
/* Copy the suffix and we're done. */
1158
memcpy(pos, b, b_len);
1160
*merged_len = total;
1161
return key_merge_buffer;
1164
/* Compute the byte length of the index-key prefix (i.e. excluding the
   appended dictionary key) for index 'key_num', honouring the 1- and
   2-byte length prefixes of variable-length text parts.
   [excerpt: declarations (rv, len), NULL handling, the fixed-length
   else-branch tail, pos advancement and return are elided] */
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
  KeyInfo *key_info = &table->key_info[key_num];
  KeyPartInfo *key_part = key_info->key_part;
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
  char *pos = (char *)key;
  /* [elided] */

  for (; key_part != key_part_end; key_part++) {
    if (key_part->null_bit) {
    /* [elided: NULL-bit handling] */

    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
      /* 1-byte length prefix. */
      len = *(uint8_t *)pos;
      rv += len + sizeof(uint8_t);
    } else if (key_part->type == HA_KEYTYPE_VARTEXT2) {
      /* 2-byte little-endian length prefix. */
      len = uint2korr(pos);
      rv += len + sizeof(uint16_t);
    /* [elided: else — fixed-length key part: */
      len = key_part->field->key_length();
    /* [elided: accumulate, advance pos, loop close, return rv] */
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
1197
memcpy(held_key_buf, key, klen);
1198
held_key = held_key_buf;
1199
held_key_len = klen;
1202
/* Converts a native Drizzle index key to BlitzDB's format.
   Packs each key part of 'native_key' into this cursor's key_buffer,
   handling NULL indicator bytes and a VARTEXT1 layout quirk, and
   stores the packed size in *return_key_len.
   [excerpt: declarations (offset, end, key_size), some branches and
   the return are elided] */
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
                                    const int key_num, int *return_key_len) {
  KeyInfo *key = &table->key_info[key_num];
  KeyPartInfo *key_part = key->key_part;
  KeyPartInfo *key_part_end = key_part + key->key_parts;
  /* [elided] */
  unsigned char *key_pos = (unsigned char *)native_key;
  unsigned char *keybuf_pos = (unsigned char *)key_buffer;
  /* [elided] */
  memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
  /* [elided] */

  for (; key_part != key_part_end; key_part++) {
    if (key_part->null_bit) {
      /* [elided] */
      /* This key is NULL */
      if (!(*keybuf_pos++ = (*key_pos++ == 0)))
      /* [elided: NULL short-circuit] */

    /* This is a temporary workaround for a bug in Drizzle's VARCHAR
       where a 1 byte representable length varchar's actual data is
       positioned 2 bytes ahead of the beginning of the buffer. The
       correct behavior is to be positioned 1 byte ahead. Furthermore,
       this is only applicable with varchar keys on READ. */
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
      /* Dereference the 1 byte length of the value. */
      uint8_t varlen = *(uint8_t *)key_pos;
      *keybuf_pos++ = varlen;
      /* [elided] */

      /* Read the value by skipping 2 bytes. This is the workaround. */
      memcpy(keybuf_pos, key_pos + sizeof(uint16_t), varlen);
      offset = (sizeof(uint8_t) + varlen);
      keybuf_pos += varlen;
    /* [elided: else — ordinary parts go through Field::pack: */
      end = key_part->field->pack(keybuf_pos, key_pos);
      offset = end - keybuf_pos;
      keybuf_pos += offset;
    /* [elided] */

    key_pos += key_part->field->key_length();
  /* [elided: key_size accounting, loop close] */

  *return_key_len = key_size;
  /* [elided: return key_buffer] */
size_t ha_blitz::pack_row(unsigned char *row_buffer,
1255
unsigned char *row_to_pack) {
1258
/* Nothing special to do if the table is fixed length */
1259
if (share->fixed_length_table) {
1260
memcpy(row_buffer, row_to_pack, table->s->getRecordLength());
1261
return (size_t)table->s->getRecordLength();
1264
/* Copy NULL bits */
1265
memcpy(row_buffer, row_to_pack, table->s->null_bytes);
1266
pos = row_buffer + table->s->null_bytes;
1268
/* Pack each field into the buffer */
1269
for (Field **field = table->getFields(); *field; field++) {
1270
if (!((*field)->is_null()))
1271
pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1274
return (size_t)(pos - row_buffer);
1277
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
1278
const size_t from_len) {
1279
const unsigned char *pos;
1281
/* Nothing special to do */
1282
if (share->fixed_length_table) {
1283
memcpy(to, from, from_len);
1287
/* Start by copying NULL bits which is the beginning block
1288
of a Drizzle row. */
1289
pos = (const unsigned char *)from;
1290
memcpy(to, pos, table->s->null_bytes);
1291
pos += table->s->null_bytes;
1293
/* Unpack all fields in the provided row. */
1294
for (Field **field = table->getFields(); *field; field++) {
1295
if (!((*field)->is_null())) {
1296
pos = (*field)->unpack(to + (*field)->offset(table->record[0]), pos);
1303
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
1304
unsigned char *buf = pack_buffer;
1306
/* This is a shitty case where the row size is larger than 2KB. */
1307
if (size > BLITZ_MAX_ROW_STACK) {
1308
if (size > secondary_row_buffer_size) {
1309
void *new_ptr = realloc(secondary_row_buffer, size);
1311
if (new_ptr == NULL) {
1312
errno = HA_ERR_OUT_OF_MEM;
1316
secondary_row_buffer_size = size;
1317
secondary_row_buffer = (unsigned char *)new_ptr;
1318
buf = secondary_row_buffer;
1324
/* Singleton engine instance; created and registered in blitz_init(). */
static BlitzEngine *blitz_engine = NULL;
1326
/* Obtains the BlitzShare for the given table, either from the
   engine's table cache or by building a fresh one (opening the data
   dictionary and every index B+Tree). The share's reference count is
   incremented under blitz_utility_mutex. Returns NULL if the data
   dictionary cannot be started. */
BlitzShare *ha_blitz::get_share(const char *name) {
  BlitzShare *share_ptr;
  BlitzEngine *bz_engine = (BlitzEngine *)engine;
  std::string table_path(name);

  pthread_mutex_lock(&blitz_utility_mutex);

  /* Look up the table cache to see if the table resource is available */
  share_ptr = bz_engine->getTableShare(table_path);

  if (share_ptr) {
    share_ptr->use_count++;
    pthread_mutex_unlock(&blitz_utility_mutex);
    return share_ptr;
  }

  /* Table wasn't cached so create a new table handler */
  share_ptr = new BlitzShare();

  /* Prepare the Data Dictionary */
  if (share_ptr->dict.startup(table_path.c_str()) != 0) {
    /* Don't leak the share we just allocated. */
    delete share_ptr;
    pthread_mutex_unlock(&blitz_utility_mutex);
    return NULL;
  }

  /* Prepare Index Structure(s) */
  KeyInfo *curr = &table->s->getKeyInfo(0);
  share_ptr->btrees = new BlitzTree[table->s->keys];

  for (uint32_t i = 0; i < table->s->keys; i++, curr++) {
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];

    if (table->key_info[i].flags & HA_NOSAME)
      share_ptr->btrees[i].unique = true;

    share_ptr->btrees[i].length = curr->key_length;
    share_ptr->btrees[i].nparts = curr->key_parts;

    /* Record Meta Data of the Key Segments */
    for (uint32_t j = 0; j < curr->key_parts; j++) {
      Field *f = curr->key_part[j].field;

      /* Only nullable fields carry NULL bookkeeping. */
      if (f->null_ptr) {
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
        share_ptr->btrees[i].parts[j].null_pos
          = (uint32_t)(f->null_ptr - (unsigned char *)table->record[0]);
      }

      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;

      if (f->type() == DRIZZLE_TYPE_BLOB) {
        share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
      }

      share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
      share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
      share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
    }
  }

  /* Set table-wide metadata on the share. */
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
  share_ptr->table_name = table_path;
  share_ptr->nkeys = table->s->keys;
  share_ptr->use_count = 1;

  share_ptr->fixed_length_table = !(table->s->db_create_options
                                    & HA_OPTION_PACK_RECORD);

  share_ptr->primary_key_exists = (table->s->getPrimaryKey() < MAX_KEY);

  /* Done creating the share object. Cache it for later
     use by another cursor object.*/
  bz_engine->cacheTableShare(table_path, share_ptr);

  pthread_mutex_unlock(&blitz_utility_mutex);
  return share_ptr;
}
int ha_blitz::free_share(void) {
1411
pthread_mutex_lock(&blitz_utility_mutex);
1413
/* BlitzShare could still be used by another thread. Check the
1414
reference counter to see if it's safe to free it */
1415
if (--share->use_count == 0) {
1416
share->dict.write_meta_autoinc(share->auto_increment_value);
1418
if (share->dict.shutdown() != 0) {
1419
pthread_mutex_unlock(&blitz_utility_mutex);
1420
return HA_ERR_CRASHED_ON_USAGE;
1423
for (uint32_t i = 0; i < share->nkeys; i++) {
1424
delete[] share->btrees[i].parts;
1425
share->btrees[i].close();
1428
BlitzEngine *bz_engine = (BlitzEngine *)engine;
1429
bz_engine->deleteTableShare(share->table_name);
1431
delete[] share->btrees;
1435
pthread_mutex_unlock(&blitz_utility_mutex);
1439
/* Plugin initialization hook: creates the singleton BlitzEngine, its
   table cache, and the utility mutex, then registers the engine with
   Drizzle. Returns 0 on success or HA_ERR_OUT_OF_MEM if the table
   cache cannot be created. */
static int blitz_init(drizzled::module::Context &context) {
  blitz_engine = new BlitzEngine("BLITZDB");

  if (!blitz_engine->doCreateTableCache()) {
    delete blitz_engine;
    return HA_ERR_OUT_OF_MEM;
  }

  pthread_mutex_init(&blitz_utility_mutex, NULL);
  context.add(blitz_engine);
  return 0;
}
/* Read the prototype of this function for details. */
1453
static char *skip_btree_key(const char *key, const size_t skip_len,
1455
char *pos = (char *)key;
1456
*return_klen = uint2korr(pos + skip_len);
1457
return pos + skip_len + sizeof(uint16_t);
1460
/* Register the plugin with Drizzle: blitz_init is the module init
   hook; the second argument (presumably system variables) is unused. */
DRIZZLE_PLUGIN(blitz_init, NULL);