1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 - 2010 Toru Maesaka
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
#include <drizzled/plugin/storage_engine.h>
26
using namespace drizzled;
27
namespace po= boost::program_options;
29
static pthread_mutex_t blitz_utility_mutex;
31
static const char *ha_blitz_exts[] = {
38
/* Global Variables for Startup Options */
39
uint64_t blitz_estimated_rows;
41
class BlitzEngine : public drizzled::plugin::StorageEngine {
43
TCMAP *blitz_table_cache;
46
BlitzEngine(const std::string &name_arg) :
47
drizzled::plugin::StorageEngine(name_arg,
48
drizzled::HTON_NULL_IN_KEY |
49
drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
50
drizzled::HTON_STATS_RECORDS_IS_EXACT |
51
drizzled::HTON_SKIP_STORE_LOCK) {
52
table_definition_ext = BLITZ_SYSTEM_EXT;
55
virtual ~BlitzEngine() {
56
pthread_mutex_destroy(&blitz_utility_mutex);
57
tcmapdel(blitz_table_cache);
60
virtual drizzled::Cursor *create(drizzled::Table &table) {
61
return new ha_blitz(*this, table);
64
const char **bas_ext() const {
68
int doCreateTable(drizzled::Session &session,
69
drizzled::Table &table_arg,
70
const drizzled::identifier::Table &identifier,
71
drizzled::message::Table &table_proto);
73
int doRenameTable(drizzled::Session &session,
74
const drizzled::identifier::Table &from_identifier,
75
const drizzled::identifier::Table &to_identifier);
77
int doDropTable(drizzled::Session &session,
78
const drizzled::identifier::Table &identifier);
80
int doGetTableDefinition(drizzled::Session &session,
81
const drizzled::identifier::Table &identifier,
82
drizzled::message::Table &table_proto);
84
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
85
const drizzled::identifier::Schema &schema_identifier,
86
drizzled::identifier::Table::vector &set_of_identifiers);
88
bool doDoesTableExist(drizzled::Session &session,
89
const drizzled::identifier::Table &identifier);
91
bool validateCreateTableOption(const std::string &key,
92
const std::string &state);
94
bool doCreateTableCache(void);
96
BlitzShare *getTableShare(const std::string &name);
97
void cacheTableShare(const std::string &name, BlitzShare *share);
98
void deleteTableShare(const std::string &name);
100
uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
101
uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
102
uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
104
uint32_t index_flags(enum drizzled::ha_key_alg) const {
105
return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
106
HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
110
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
111
a key to that row in the data dictionary. Two keys are merged and
112
stored as a key because we want to avoid reading the leaf node and
113
thus save disk IO and some computation in the tree. Note that the
114
comparison function of BlitzDB's btree only takes into account the
115
actual index key. See blitzcmp.cc for details.
117
With the above in mind, this helper function returns a pointer to
118
the dictionary key by calculating the offset. */
119
static char *skip_btree_key(const char *key, const size_t skip_len,
122
static bool str_is_numeric(const std::string &str);
124
int BlitzEngine::doCreateTable(drizzled::Session &,
125
drizzled::Table &table,
126
const drizzled::identifier::Table &identifier,
127
drizzled::message::Table &proto) {
132
/* Temporary fix for blocking composite keys. We need to add this
133
check because version 1 doesn't handle composite indexes. */
134
for (uint32_t i = 0; i < table.getShare()->keys; i++) {
135
if (table.key_info[i].key_parts > 1)
136
return HA_ERR_UNSUPPORTED;
139
/* Create relevant files for a new table and close them immediately.
140
All we want to do here is somewhat like UNIX touch(1). */
141
if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
144
if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
147
/* Create b+tree index(es) for this table. */
148
for (uint32_t i = 0; i < table.getShare()->keys; i++) {
149
if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
153
/* Write the table definition to system table. */
154
if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
157
if (!dict.write_table_definition(proto)) {
158
dict.close_system_table();
159
return HA_ERR_CRASHED_ON_USAGE;
162
dict.close_system_table();
166
int BlitzEngine::doRenameTable(drizzled::Session &,
167
const drizzled::identifier::Table &from,
168
const drizzled::identifier::Table &to) {
171
BlitzData blitz_table;
176
/* Write the table definition to system table. */
177
if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
180
drizzled::message::Table proto;
182
int proto_string_len;
184
proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
185
BLITZ_TABLE_PROTO_KEY.length(),
188
if (proto_string == NULL) {
192
if (!proto.ParseFromArray(proto_string, proto_string_len)) {
194
return HA_ERR_CRASHED_ON_USAGE;
199
proto.set_name(to.getTableName());
200
proto.set_schema(to.getSchemaName());
201
proto.set_catalog(to.getCatalogName());
203
if (!dict.write_table_definition(proto)) {
204
dict.close_system_table();
205
return HA_ERR_CRASHED_ON_USAGE;
208
dict.close_system_table();
210
/* Find out the number of indexes in this table. This information
211
is required because BlitzDB creates a file for each index. */
212
if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
213
return HA_ERR_CRASHED_ON_USAGE;
215
nkeys = blitz_table.read_meta_keycount();
217
if (blitz_table.close_data_table() != 0)
218
return HA_ERR_CRASHED_ON_USAGE;
220
/* We're now ready to rename the file(s) for this table. Start by
221
attempting to rename the data and system files. */
222
if (rename_file_ext(from.getPath().c_str(),
223
to.getPath().c_str(), BLITZ_DATA_EXT)) {
224
if ((rv = errno) != ENOENT)
228
if (rename_file_ext(from.getPath().c_str(),
229
to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
230
if ((rv = errno) != ENOENT)
234
/* So far so good. Rename the index file(s) and we're done. */
237
for (uint32_t i = 0; i < nkeys; i++) {
238
if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
239
return HA_ERR_CRASHED_ON_USAGE;
245
int BlitzEngine::doDropTable(drizzled::Session &,
246
const drizzled::identifier::Table &identifier) {
253
/* We open the dictionary to extract meta data from it */
254
if ((err = dict.open_data_table(identifier.getPath().c_str(),
259
nkeys = dict.read_meta_keycount();
261
/* We no longer need the dictionary to be open */
262
dict.close_data_table();
264
/* Drop the Data Dictionary */
265
snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
266
if ((err = unlink(buf)) == -1) {
270
/* Drop the System Table */
271
snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
272
if ((err = unlink(buf)) == -1) {
276
/* Drop Index file(s) */
277
for (uint32_t i = 0; i < nkeys; i++) {
278
if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
286
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
287
const drizzled::identifier::Table &identifier,
288
drizzled::message::Table &proto) {
289
struct stat stat_info;
290
std::string path(identifier.getPath());
292
path.append(BLITZ_SYSTEM_EXT);
294
if (stat(path.c_str(), &stat_info)) {
300
int proto_string_len;
302
if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
303
return HA_ERR_CRASHED_ON_USAGE;
306
proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
307
BLITZ_TABLE_PROTO_KEY.length(),
310
if (db.close_system_table() != 0) {
311
return HA_ERR_CRASHED_ON_USAGE;
314
if (proto_string == NULL) {
318
if (!proto.ParseFromArray(proto_string, proto_string_len)) {
320
return HA_ERR_CRASHED_ON_USAGE;
328
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
329
const drizzled::identifier::Schema &schema_id,
330
drizzled::identifier::Table::vector &ids) {
331
drizzled::CachedDirectory::Entries entries = directory.getEntries();
333
for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
334
entry_iter != entries.end(); ++entry_iter) {
336
drizzled::CachedDirectory::Entry *entry = *entry_iter;
337
const std::string *filename = &entry->filename;
339
assert(filename->size());
341
const char *ext = strchr(filename->c_str(), '.');
343
if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
344
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
346
char uname[NAME_LEN + 1];
347
uint32_t file_name_len;
349
file_name_len = identifier::Table::filename_to_tablename(filename->c_str(),
353
uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
354
ids.push_back(identifier::Table(schema_id, uname));
359
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
360
const drizzled::identifier::Table &identifier) {
361
std::string proto_path(identifier.getPath());
362
proto_path.append(BLITZ_DATA_EXT);
364
return (access(proto_path.c_str(), F_OK)) ? false : true;
367
bool BlitzEngine::validateCreateTableOption(const std::string &key,
368
const std::string &state) {
369
if (key == "ESTIMATED_ROWS" || key == "estimated_rows") {
370
if (str_is_numeric(state))
376
bool BlitzEngine::doCreateTableCache(void) {
377
return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
380
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
383
BlitzShare *rv = NULL;
385
fetched = tcmapget(blitz_table_cache, table_name.c_str(),
386
table_name.length(), &vlen);
388
/* dereference the object */
390
rv = *(BlitzShare **)fetched;
395
void BlitzEngine::cacheTableShare(const std::string &table_name,
397
/* Cache the memory address of the share object */
398
tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
399
&share, sizeof(share));
402
void BlitzEngine::deleteTableShare(const std::string &table_name) {
403
tcmapout2(blitz_table_cache, table_name.c_str());
406
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
407
Table &table_arg) : Cursor(engine_arg, table_arg),
411
thread_locked(false),
419
int ha_blitz::open(const char *table_name, int, uint32_t) {
420
if ((share = get_share(table_name)) == NULL)
421
return HA_ERR_CRASHED_ON_USAGE;
423
pthread_mutex_lock(&blitz_utility_mutex);
425
btree_cursor = new BlitzCursor[share->nkeys];
427
for (uint32_t i = 0; i < share->nkeys; i++) {
428
if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
430
pthread_mutex_unlock(&blitz_utility_mutex);
431
return HA_ERR_OUT_OF_MEM;
435
if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
437
pthread_mutex_unlock(&blitz_utility_mutex);
438
return HA_ERR_OUT_OF_MEM;
441
if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
443
pthread_mutex_unlock(&blitz_utility_mutex);
444
return HA_ERR_OUT_OF_MEM;
447
if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
450
free(key_merge_buffer);
451
pthread_mutex_unlock(&blitz_utility_mutex);
452
return HA_ERR_OUT_OF_MEM;
455
secondary_row_buffer = NULL;
456
secondary_row_buffer_size = 0;
457
key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
459
/* 'ref_length' determines the size of the buffer that the kernel
460
will use to uniquely identify a row. The actual allocation is
461
done by the kernel so all we do here is specify the size of it.*/
462
if (share->primary_key_exists) {
463
ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
465
ref_length = sizeof(held_key_len) + sizeof(uint64_t);
468
pthread_mutex_unlock(&blitz_utility_mutex);
472
int ha_blitz::close(void) {
473
for (uint32_t i = 0; i < share->nkeys; i++) {
474
share->btrees[i].destroy_cursor(&btree_cursor[i]);
476
delete [] btree_cursor;
479
free(key_merge_buffer);
481
free(secondary_row_buffer);
485
int ha_blitz::info(uint32_t flag) {
486
if (flag & HA_STATUS_VARIABLE) {
487
stats.records = share->dict.nrecords();
488
stats.data_file_length = share->dict.table_size();
491
if (flag & HA_STATUS_AUTO)
492
stats.auto_increment_value = share->auto_increment_value + 1;
494
if (flag & HA_STATUS_ERRKEY)
500
int ha_blitz::doStartTableScan(bool scan) {
501
/* Obtain the query type for this scan */
502
sql_command_type = getTable()->getSession()->getSqlCommand();
506
/* Obtain the most suitable lock for the given statement type. */
507
blitz_optimal_lock();
509
/* Get the first record from TCHDB. Let the scanner take
510
care of checking return value errors. */
512
current_key = share->dict.next_key_and_row(NULL, 0,
520
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
522
const char *next_row;
529
if (current_key == NULL) {
530
getTable()->status = STATUS_NOT_FOUND;
531
return HA_ERR_END_OF_FILE;
534
ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
536
/* Unpack and copy the current row to Drizzle's result buffer. */
537
unpack_row(drizzle_buf, current_row, current_row_len);
539
/* Retrieve both key and row of the next record with one allocation. */
540
next_key = share->dict.next_key_and_row(current_key, current_key_len,
541
&next_key_len, &next_row,
544
/* Memory region for "current_row" will be freed as "held key" on
545
the next iteration. This is because "current_key" points to the
546
region of memory that contains "current_row" and "held_key" points
547
to it. If there isn't another iteration then it is freed in doEndTableScan(). */
548
current_row = next_row;
549
current_row_len = next_row_len;
551
/* Remember the current row because delete, update or replace
552
function could be called after this function. This pointer is
553
also used to free the previous key and row, which resides on
555
held_key = current_key;
556
held_key_len = current_key_len;
558
/* It is now memory-leak-safe to point current_key to next_key. */
559
current_key = next_key;
560
current_key_len = next_key_len;
561
getTable()->status = 0;
565
int ha_blitz::doEndTableScan() {
566
if (table_scan && current_key)
568
if (table_scan && held_key)
579
blitz_optimal_unlock();
584
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
587
int key_len, row_len;
589
memcpy(&key_len, pos, sizeof(key_len));
590
key = (char *)(pos + sizeof(key_len));
592
/* TODO: Find a better error type. */
594
return HA_ERR_KEY_NOT_FOUND;
596
row = share->dict.get_row(key, key_len, &row_len);
599
return HA_ERR_KEY_NOT_FOUND;
601
unpack_row(copy_to, row, row_len);
603
/* Remember the key location on memory if the thread is not doing
604
a table scan. This is because either update_row() or delete_row()
605
might be called after this function. */
608
held_key_len = key_len;
615
void ha_blitz::position(const unsigned char *) {
616
int length = sizeof(held_key_len);
617
memcpy(ref, &held_key_len, length);
618
memcpy(ref + length, (unsigned char *)held_key, held_key_len);
621
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
625
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
626
active_index = key_num;
627
sql_command_type = getTable()->getSession()->getSqlCommand();
629
/* This is unlikely to happen but just for assurance, re-obtain
630
the lock if this thread already has a certain lock. This makes
631
sure that this thread will get the most appropriate lock for
632
the current statement. */
634
blitz_optimal_unlock();
636
blitz_optimal_lock();
640
int ha_blitz::index_first(unsigned char *buf) {
641
char *dict_key, *bt_key, *row;
642
int dict_klen, bt_klen, prefix_len, rlen;
644
bt_key = btree_cursor[active_index].first_key(&bt_klen);
647
return HA_ERR_END_OF_FILE;
649
prefix_len = btree_key_length(bt_key, active_index);
650
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
652
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
654
return HA_ERR_KEY_NOT_FOUND;
657
unpack_row(buf, row, rlen);
658
keep_track_of_key(bt_key, bt_klen);
665
int ha_blitz::index_next(unsigned char *buf) {
666
char *dict_key, *bt_key, *row;
667
int dict_klen, bt_klen, prefix_len, rlen;
669
bt_key = btree_cursor[active_index].next_key(&bt_klen);
671
if (bt_key == NULL) {
672
getTable()->status = STATUS_NOT_FOUND;
673
return HA_ERR_END_OF_FILE;
676
prefix_len = btree_key_length(bt_key, active_index);
677
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
679
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
681
getTable()->status = STATUS_NOT_FOUND;
682
return HA_ERR_KEY_NOT_FOUND;
685
unpack_row(buf, row, rlen);
686
keep_track_of_key(bt_key, bt_klen);
693
int ha_blitz::index_prev(unsigned char *buf) {
694
char *dict_key, *bt_key, *row;
695
int dict_klen, bt_klen, prefix_len, rlen;
697
bt_key = btree_cursor[active_index].prev_key(&bt_klen);
700
return HA_ERR_END_OF_FILE;
702
prefix_len = btree_key_length(bt_key, active_index);
703
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
705
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
707
return HA_ERR_KEY_NOT_FOUND;
710
unpack_row(buf, row, rlen);
711
keep_track_of_key(bt_key, bt_klen);
718
int ha_blitz::index_last(unsigned char *buf) {
719
char *dict_key, *bt_key, *row;
720
int dict_klen, bt_klen, prefix_len, rlen;
722
bt_key = btree_cursor[active_index].final_key(&bt_klen);
725
return HA_ERR_KEY_NOT_FOUND;
727
prefix_len = btree_key_length(bt_key, active_index);
728
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
730
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
732
errkey_id = active_index;
733
return HA_ERR_KEY_NOT_FOUND;
736
unpack_row(buf, row, rlen);
737
keep_track_of_key(bt_key, bt_klen);
744
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
745
uint32_t key_len, enum ha_rkey_function find_flag) {
746
return index_read_idx(buf, active_index, key, key_len, find_flag);
749
/* This is where the read related index logic lives. It is used by both
750
BlitzDB and the Database Kernel (specifically, by the optimizer). */
751
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
752
const unsigned char *key, uint32_t,
753
enum ha_rkey_function search_mode) {
755
/* If the provided key is NULL, we are required to return the first
756
row in the active_index. */
758
return this->index_first(buf);
760
/* Otherwise we search for it. Prepare the key to look up the tree. */
762
char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
764
/* Lookup the tree and get the master key. */
768
unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
769
packed_klen, &unique_klen);
771
if (unique_key == NULL) {
773
return HA_ERR_KEY_NOT_FOUND;
776
/* Got the master key. Prepare it to lookup the data dictionary. */
778
int skip_len = btree_key_length(unique_key, key_num);
779
char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
781
/* Fetch the packed row from the data dictionary. */
783
char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
785
if (fetched_row == NULL) {
788
return HA_ERR_KEY_NOT_FOUND;
791
/* Unpack it into Drizzle's return buffer and keep track of the
792
master key for future use (before index_end() is called). */
793
unpack_row(buf, fetched_row, row_len);
794
keep_track_of_key(unique_key, unique_klen);
801
int ha_blitz::doEndIndexScan(void) {
805
btree_cursor[active_index].moved = false;
808
blitz_optimal_unlock();
813
int ha_blitz::enable_indexes(uint32_t) {
814
return HA_ERR_UNSUPPORTED;
817
int ha_blitz::disable_indexes(uint32_t) {
818
return HA_ERR_UNSUPPORTED;
821
/* Find the estimated number of rows between min_key and max_key.
822
Leave the proper implementation of this for now since there are
823
too many exceptions to cover. */
824
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
825
drizzled::key_range * /*min_key*/,
826
drizzled::key_range * /*max_key*/) {
827
return BLITZ_WORST_CASE_RANGE;
830
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
833
ha_statistic_increment(&system_status_var::ha_write_count);
835
/* Prepare Auto Increment field if one exists. */
836
if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
837
pthread_mutex_lock(&blitz_utility_mutex);
838
if ((rv = update_auto_increment()) != 0) {
839
pthread_mutex_unlock(&blitz_utility_mutex);
843
uint64_t next_val = getTable()->next_number_field->val_int();
845
if (next_val > share->auto_increment_value) {
846
share->auto_increment_value = next_val;
847
stats.auto_increment_value = share->auto_increment_value + 1;
849
pthread_mutex_unlock(&blitz_utility_mutex);
852
/* Serialize a primary key for this row. If a PK doesn't exist,
853
an internal hidden ID will be generated. We obtain the PK here
854
and pack it to this function's local buffer instead of the
855
thread's own 'key_buffer' because the PK value needs to be
856
remembered when writing non-PK keys AND because the 'key_buffer'
857
will be used to generate these non-PK keys. */
858
char temp_pkbuf[BLITZ_MAX_KEY_LEN];
859
size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
861
/* Obtain a buffer that can accommodate this row. We then pack
862
the provided row into it. Note that this code works most
863
efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
864
unsigned char *row_buf = get_pack_buffer(max_row_length());
865
size_t row_len = pack_row(row_buf, drizzle_row);
867
uint32_t curr_key = 0;
868
uint32_t lock_id = 0;
870
if (share->nkeys > 0) {
871
lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
872
share->blitz_lock.slotted_lock(lock_id);
875
/* We isolate this condition outside the key loop to avoid the CPU
876
from going through unnecessary conditional branching on heavy
877
insertion load. TODO: Optimize this block. PK should not need
878
to go through merge_key() since this information is redundant. */
879
if (share->primary_key_exists) {
883
key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
885
rv = share->btrees[curr_key].write_unique(key, klen);
887
if (rv == HA_ERR_FOUND_DUPP_KEY) {
888
errkey_id = curr_key;
889
share->blitz_lock.slotted_unlock(lock_id);
895
/* Loop over the keys and write each of them to its own exclusive tree. */
896
while (curr_key < share->nkeys) {
898
size_t prefix_len = 0;
901
prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
902
key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
904
if (share->btrees[curr_key].unique) {
905
rv = share->btrees[curr_key].write_unique(key, klen);
907
rv = share->btrees[curr_key].write(key, klen);
911
errkey_id = curr_key;
912
share->blitz_lock.slotted_unlock(lock_id);
919
/* Write the row to the Data Dictionary. */
920
rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
922
if (share->nkeys > 0)
923
share->blitz_lock.slotted_unlock(lock_id);
928
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
929
unsigned char *new_row) {
931
uint32_t lock_id = 0;
933
ha_statistic_increment(&system_status_var::ha_update_count);
936
if (share->nkeys > 0) {
937
/* BlitzDB cannot update an indexed row on table scan. */
939
return HA_ERR_UNSUPPORTED;
941
if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
944
lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
945
share->blitz_lock.slotted_lock(lock_id);
947
/* Update all relevant index entries. Start by deleting the
948
existing key, then write the new key. Something we should
949
consider in the future is to take a diff of the keys and only
950
update changed keys. */
951
int skip = btree_key_length(held_key, active_index);
952
char *suffix = held_key + skip;
953
uint16_t suffix_len = uint2korr(suffix);
955
suffix += sizeof(suffix_len);
957
for (uint32_t i = 0; i < share->nkeys; i++) {
959
size_t prefix_len, klen;
962
prefix_len = make_index_key(key_buffer, i, old_row);
963
key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
965
if (share->btrees[i].delete_key(key, klen) != 0) {
967
share->blitz_lock.slotted_unlock(lock_id);
968
return HA_ERR_KEY_NOT_FOUND;
971
/* Now write the new key. */
972
prefix_len = make_index_key(key_buffer, i, new_row);
974
if (i == getTable()->getShare()->getPrimaryKey()) {
975
key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
976
rv = share->btrees[i].write(key, klen);
978
key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
979
rv = share->btrees[i].write(key, klen);
984
share->blitz_lock.slotted_unlock(lock_id);
990
/* Getting this far means that the index has been successfully
991
updated. We now update the Data Dictionary. This implementation
992
is admittedly far from optimal and will be revisited. */
993
size_t row_len = max_row_length();
994
unsigned char *row_buf = get_pack_buffer(row_len);
995
row_len = pack_row(row_buf, new_row);
997
/* This is a basic case where we can simply overwrite the key. */
999
rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
1001
int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
1003
/* Delete with the old key. */
1004
share->dict.delete_row(key_buffer, klen);
1006
/* Write with the new key. */
1007
klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
1008
rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
1011
if (share->nkeys > 0)
1012
share->blitz_lock.slotted_unlock(lock_id);
1017
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
1020
ha_statistic_increment(&system_status_var::ha_delete_count);
1022
char *dict_key = held_key;
1023
int dict_klen = held_key_len;
1024
uint32_t lock_id = 0;
1026
if (share->nkeys > 0) {
1027
lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
1028
share->blitz_lock.slotted_lock(lock_id);
1030
/* Loop over the indexes and delete all relevant entries for
1031
this row. We do this by reproducing the key in BlitzDB's
1032
unique key format. The procedure is simple.
1034
(1): Compute the key value for this index from the row then
1035
pack it into key_buffer (not unique at this point).
1037
(2): Append the suffix of the held_key to the key generated
1038
in step 1. The key is then guaranteed to be unique. */
1039
for (uint32_t i = 0; i < share->nkeys; i++) {
1040
/* In this case, we don't need to search for the key because
1041
TC's cursor is already pointing at the key that we want
1042
to delete. We wouldn't be here otherwise. */
1043
if (i == active_index) {
1044
btree_cursor[active_index].delete_position();
1048
int klen = make_index_key(key_buffer, i, row_to_delete);
1049
int skip_len = btree_key_length(held_key, active_index);
1050
uint16_t suffix_len = uint2korr(held_key + skip_len);
1052
/* Append the suffix to the key */
1053
memcpy(key_buffer + klen, held_key + skip_len,
1054
sizeof(suffix_len) + suffix_len);
1056
/* Update the key length to cover the generated key. */
1057
klen = klen + sizeof(suffix_len) + suffix_len;
1059
if (share->btrees[i].delete_key(key_buffer, klen) != 0)
1060
return HA_ERR_KEY_NOT_FOUND;
1063
/* Skip to the data dictionary key. */
1064
int dict_key_offset = btree_key_length(dict_key, active_index);
1065
dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
1068
rv = share->dict.delete_row(dict_key, dict_klen);
1070
if (share->nkeys > 0)
1071
share->blitz_lock.slotted_unlock(lock_id);
1076
/* Hands out the next auto increment value for this table.

   The first three arguments are ignored. On return:
     *first_value        = the share's current auto_increment_value + 1
     *nb_reserved_values = UINT64_MAX

   This function takes no lock itself. */
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
                                  uint64_t, uint64_t *first_value,
                                  uint64_t *nb_reserved_values) {
  *nb_reserved_values = UINT64_MAX;
  *first_value = share->auto_increment_value + 1;
}
1083
int ha_blitz::reset_auto_increment(uint64_t value) {
1084
share->auto_increment_value = (value == 0) ? 1 : value;
1088
int ha_blitz::delete_all_rows(void) {
1089
for (uint32_t i = 0; i < share->nkeys; i++) {
1090
if (share->btrees[i].delete_all() != 0) {
1092
return HA_ERR_CRASHED_ON_USAGE;
1095
return (share->dict.delete_all_rows()) ? 0 : -1;
1098
uint32_t ha_blitz::max_row_length(void) {
1099
uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
1100
uint32_t *pos = getTable()->getBlobField();
1101
uint32_t *end = pos + getTable()->sizeBlobFields();
1103
while (pos != end) {
1104
length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
1111
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
1112
if (!share->primary_key_exists) {
1113
uint64_t next_id = share->dict.next_hidden_row_id();
1114
int8store(pack_to, next_id);
1115
return sizeof(next_id);
1118
/* Getting here means that there is a PK in this table. Get the
1119
binary representation of the PK, pack it to BlitzDB's key buffer
1120
and return the size of it. */
1121
return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
1124
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
1125
const unsigned char *row) {
1126
KeyInfo *key = &getTable()->key_info[key_num];
1127
KeyPartInfo *key_part = key->key_part;
1128
KeyPartInfo *key_part_end = key_part + key->key_parts;
1130
unsigned char *pos = (unsigned char *)pack_to;
1134
memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
1136
/* Loop through key part(s) and pack them as we go. */
1137
for (; key_part != key_part_end; key_part++) {
1138
if (key_part->null_bit) {
1139
if (row[key_part->null_offset] & key_part->null_bit) {
1146
/* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
1147
if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1148
/* Extract the length of the string from the row. */
1149
uint16_t data_len = *(uint8_t *)(row + key_part->offset);
1151
/* Copy the length of the string. Use 2 bytes. */
1152
int2store(pos, data_len);
1153
pos += sizeof(data_len);
1155
/* Copy the string data */
1156
memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
1159
end = key_part->field->pack(pos, row + key_part->offset);
1165
return ((char *)pos - pack_to);
1168
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
1169
const size_t b_len, size_t *merged_len) {
1171
size_t total = a_len + sizeof(uint16_t) + b_len;
1173
if (total > key_merge_buffer_len) {
1174
key_merge_buffer = (char *)realloc(key_merge_buffer, total);
1176
if (key_merge_buffer == NULL) {
1177
errno = HA_ERR_OUT_OF_MEM;
1180
key_merge_buffer_len = total;
1183
char *pos = key_merge_buffer;
1185
/* Copy the prefix. */
1186
memcpy(pos, a, a_len);
1189
/* Copy the length of b. */
1190
int2store(pos, (uint16_t)b_len);
1191
pos += sizeof(uint16_t);
1193
/* Copy the suffix and we're done. */
1194
memcpy(pos, b, b_len);
1196
*merged_len = total;
1197
return key_merge_buffer;
1200
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
1201
KeyInfo *key_info = &getTable()->key_info[key_num];
1202
KeyPartInfo *key_part = key_info->key_part;
1203
KeyPartInfo *key_part_end = key_part + key_info->key_parts;
1204
char *pos = (char *)key;
1208
for (; key_part != key_part_end; key_part++) {
1209
if (key_part->null_bit) {
1216
if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
1217
key_part->type == HA_KEYTYPE_VARTEXT2) {
1218
len = uint2korr(pos);
1219
rv += len + sizeof(uint16_t);
1221
len = key_part->field->key_length();
1231
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
1232
memcpy(held_key_buf, key, klen);
1233
held_key = held_key_buf;
1234
held_key_len = klen;
1237
/* Converts a native Drizzle index key to BlitzDB's format.

   native_key       key bytes in Drizzle's native layout (read only).
   key_num          index number used to look up the key part layout.
   return_key_len   receives the value of `key_size` on exit.

   The converted key is assembled in the member key_buffer, which is
   zero-filled for BLITZ_MAX_KEY_LEN bytes before any part is written.

   NOTE(review): this extract omits several short lines of the function
   (the declarations of `end`, `key_size` and `offset`, parts of the
   null handling, the else between the two packing paths, the size
   accumulation and the return statement). Verify any change against
   the complete file. */
1238
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
1239
const int key_num, int *return_key_len) {
1240
KeyInfo *key = &getTable()->key_info[key_num];
1241
KeyPartInfo *key_part = key->key_part;
1242
KeyPartInfo *key_part_end = key_part + key->key_parts;
1244
/* key_pos reads from the native key, keybuf_pos writes into key_buffer. */
unsigned char *key_pos = (unsigned char *)native_key;
1245
unsigned char *keybuf_pos = (unsigned char *)key_buffer;
1250
memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
1252
for (; key_part != key_part_end; key_part++) {
1253
if (key_part->null_bit) {
1256
/* Emit a one-byte flag: 1 when the native indicator byte is 0, otherwise
   0. Both cursors advance by one byte. The condition below is true when
   the flag just written is 0, i.e. this key part is NULL. */
1257
if (!(*keybuf_pos++ = (*key_pos++ == 0)))
1261
/* Normalize a VARTEXT1 key to VARTEXT2. */
1262
if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1263
/* NOTE(review): the length is read with a direct uint16_t cast (host
   byte order, possibly unaligned) while it is written back with
   int2store() - confirm the two agree on every supported platform. */
uint16_t str_len = *(uint16_t *)key_pos;
1265
/* Copy the length of the string over to key buffer. */
1266
int2store(keybuf_pos, str_len);
1267
keybuf_pos += sizeof(str_len);
1269
/* Copy the actual value over to the key buffer. It starts right after
   the length field in the native key. */
1270
memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
1271
keybuf_pos += str_len;
1273
/* NULL byte + Length of str (2 byte) + Actual String. */
1274
offset = 1 + sizeof(str_len) + str_len;
1276
/* Other part types: let the field pack itself. The pointer returned by
   pack() is used to work out how far the write cursor moved. */
end = key_part->field->pack(keybuf_pos, key_pos);
1277
offset = end - keybuf_pos;
1278
keybuf_pos += offset;
1282
/* Step the native cursor by the length the field reports. */
key_pos += key_part->field->key_length();
1285
*return_key_len = key_size;
1289
size_t ha_blitz::pack_row(unsigned char *row_buffer,
1290
unsigned char *row_to_pack) {
1293
/* Nothing special to do if the table is fixed length */
1294
if (share->fixed_length_table) {
1295
memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
1296
return (size_t)getTable()->getShare()->getRecordLength();
1299
/* Copy NULL bits */
1300
memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
1301
pos = row_buffer + getTable()->getShare()->null_bytes;
1303
/* Pack each field into the buffer */
1304
for (Field **field = getTable()->getFields(); *field; field++) {
1305
if (!((*field)->is_null()))
1306
pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1309
return (size_t)(pos - row_buffer);
1312
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
1313
const size_t from_len) {
1314
const unsigned char *pos;
1316
/* Nothing special to do */
1317
if (share->fixed_length_table) {
1318
memcpy(to, from, from_len);
1322
/* Start by copying NULL bits which is the beginning block
1323
of a Drizzle row. */
1324
pos = (const unsigned char *)from;
1325
memcpy(to, pos, getTable()->getShare()->null_bytes);
1326
pos += getTable()->getShare()->null_bytes;
1328
/* Unpack all fields in the provided row. */
1329
for (Field **field = getTable()->getFields(); *field; field++) {
1330
if (!((*field)->is_null())) {
1331
pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
1338
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
1339
unsigned char *buf = pack_buffer;
1341
/* This is a shitty case where the row size is larger than 2KB. */
1342
if (size > BLITZ_MAX_ROW_STACK) {
1343
if (size > secondary_row_buffer_size) {
1344
void *new_ptr = realloc(secondary_row_buffer, size);
1346
if (new_ptr == NULL) {
1347
errno = HA_ERR_OUT_OF_MEM;
1351
secondary_row_buffer_size = size;
1352
secondary_row_buffer = (unsigned char *)new_ptr;
1354
buf = secondary_row_buffer;
1359
/* The engine instance of this plugin. It stays NULL until blitz_init()
   allocates it and adds it to the module context. */
static BlitzEngine *blitz_engine = NULL;
1361
/* Obtains the BlitzShare for the table at path `name`: the engine's
   table cache is consulted first, and a new share is built and cached
   when nothing is found there. Cache access and share construction
   happen between a lock and an unlock of blitz_utility_mutex.

   NOTE(review): this extract omits several short lines of the function
   (the test on the cache lookup result, the return statements, the
   cleanup in the dictionary failure branch, a guard in front of the
   null_bitmask / null_pos assignments, and the else of the primary key
   test). Verify any change against the complete file. */
BlitzShare *ha_blitz::get_share(const char *name) {
1362
BlitzShare *share_ptr;
1363
BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1364
std::string table_path(name);
1366
pthread_mutex_lock(&blitz_utility_mutex);
1368
/* Look up the table cache to see if the table resource is available */
1369
share_ptr = bz_engine->getTableShare(table_path);
1372
/* Cached share: take one more reference and release the lock. */
share_ptr->use_count++;
1373
pthread_mutex_unlock(&blitz_utility_mutex);
1377
/* Table wasn't cached so create a new table handler */
1378
share_ptr = new BlitzShare();
1380
/* Prepare the Data Dictionary. A non-zero result from startup() is
   treated as failure and the lock is released on that path. */
1381
if (share_ptr->dict.startup(table_path.c_str()) != 0) {
1383
pthread_mutex_unlock(&blitz_utility_mutex);
1387
/* Prepare Index Structure(s): one BlitzTree per key of the table.
   `curr` walks the KeyInfo entries in step with the loop index. */
1388
KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
1389
share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
1391
for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
1392
/* NOTE(review): the result of open() is not inspected in the visible
   code - confirm whether a failed open needs handling here. */
share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
1393
share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
1395
/* Keys flagged HA_NOSAME are recorded as unique. */
if (getTable()->key_info[i].flags & HA_NOSAME)
1396
share_ptr->btrees[i].unique = true;
1398
share_ptr->btrees[i].length = curr->key_length;
1399
share_ptr->btrees[i].nparts = curr->key_parts;
1401
/* Record Meta Data of the Key Segments */
1402
for (uint32_t j = 0; j < curr->key_parts; j++) {
1403
Field *f = curr->key_part[j].field;
1406
/* Null bit mask of the field, and the distance of its null byte from
   the start of the insert record. */
share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
1407
share_ptr->btrees[i].parts[j].null_pos
1408
= (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
1411
share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
1413
/* BLOB fields additionally get HA_BLOB_PART set in the part flag. */
if (f->type() == DRIZZLE_TYPE_BLOB) {
1414
share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
1417
share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
1418
share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
1419
share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
1424
/* Table-level bookkeeping; the new share starts with one reference. */
share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
1425
share_ptr->table_name = table_path;
1426
share_ptr->nkeys = getTable()->getShare()->keys;
1427
share_ptr->use_count = 1;
1429
/* Fixed length means HA_OPTION_PACK_RECORD is not set in the table's
   create options. */
share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
1430
& HA_OPTION_PACK_RECORD);
1432
/* A primary key index of MAX_KEY or above means "no primary key".
   Presumably an else, not visible in this extract, separates the two
   assignments below. */
if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
1433
share_ptr->primary_key_exists = false;
1435
share_ptr->primary_key_exists = true;
1437
/* Done creating the share object. Cache it for later
   use by another cursor object. */
1439
bz_engine->cacheTableShare(table_path, share_ptr);
1441
pthread_mutex_unlock(&blitz_utility_mutex);
1445
/* Drops this cursor's reference to its BlitzShare and, when it was the
   last one, tears the share's resources down. The whole operation runs
   between a lock and an unlock of blitz_utility_mutex.

   Returns HA_ERR_CRASHED_ON_USAGE when the data dictionary fails to shut
   down.

   NOTE(review): this extract omits several short lines of the function
   (closing braces, whatever follows `delete[] share->btrees`, and the
   final return). Verify any change against the complete file. */
int ha_blitz::free_share(void) {
1446
pthread_mutex_lock(&blitz_utility_mutex);
1448
/* BlitzShare could still be used by another thread. Check the
   reference counter to see if it's safe to free it */
1450
if (--share->use_count == 0) {
1451
/* Persist the auto increment counter before the dictionary closes. */
share->dict.write_meta_autoinc(share->auto_increment_value);
1453
/* NOTE(review): on this early-return path the counter has already
   reached zero, but none of the cleanup below has run and the share is
   still in the engine's cache - confirm that this is intended. */
if (share->dict.shutdown() != 0) {
1454
pthread_mutex_unlock(&blitz_utility_mutex);
1455
return HA_ERR_CRASHED_ON_USAGE;
1458
/* Release the key part array of every index, then close the index. */
for (uint32_t i = 0; i < share->nkeys; i++) {
1459
delete[] share->btrees[i].parts;
1460
share->btrees[i].close();
1463
/* Remove the share from the engine's table cache by table name. */
BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1464
bz_engine->deleteTableShare(share->table_name);
1466
delete[] share->btrees;
1470
pthread_mutex_unlock(&blitz_utility_mutex);
1474
/* Plugin start-up hook: creates the BlitzDB engine, prepares its table
   cache and registers the engine and the "estimated-rows" system
   variable with the module context.

   Returns 0 on success, or HA_ERR_OUT_OF_MEM when the table cache cannot
   be created. In that case the engine is destroyed again and
   blitz_engine is reset to NULL. */
static int blitz_init(drizzled::module::Context &context) {
  /* The mutex must exist before the engine can be destroyed:
     ~BlitzEngine() calls pthread_mutex_destroy() on it, and destroying a
     mutex that was never initialised is undefined behaviour. Initialising
     it first keeps the failure path below well defined. */
  pthread_mutex_init(&blitz_utility_mutex, NULL);

  blitz_engine = new BlitzEngine("BLITZDB");

  if (!blitz_engine->doCreateTableCache()) {
    delete blitz_engine;
    /* Do not leave the file-level pointer dangling. */
    blitz_engine = NULL;
    return HA_ERR_OUT_OF_MEM;
  }

  context.add(blitz_engine);
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
                                                    &blitz_estimated_rows));
  return 0;
}
1489
/* Read the prototype of this function for details.

   In short: steps over the first `skip_len` bytes of `key`, stores the
   value that uint2korr() reads at that position through `return_klen`,
   and returns the address just past that uint16_t-sized field.

   NOTE(review): the signature line that declares `return_klen` is not
   part of this extract, and neither is the closing brace. */
1490
static char *skip_btree_key(const char *key, const size_t skip_len,
1492
char *pos = (char *)key;
1493
*return_klen = uint2korr(pos + skip_len);
1494
return pos + skip_len + sizeof(uint16_t);
1497
/* Returns true when every character of `str` is a decimal digit as
   classified by std::isdigit(). An empty string has no offending
   character and therefore yields true.

   Each character is converted to unsigned char before classification:
   std::isdigit() has undefined behaviour for negative arguments other
   than EOF, which is what bytes above 0x7F become on platforms where
   plain char is signed. */
static bool str_is_numeric(const std::string &str) {
  for (std::string::size_type i = 0; i < str.length(); i++) {
    if (!std::isdigit(static_cast<unsigned char>(str[i])))
      return false;
  }
  return true;
}
1505
/* Declares the start-up options of the plugin. Only one exists:
   "estimated-rows", a uint64_t stored in blitz_estimated_rows with a
   default of 0. */
static void blitz_init_options(drizzled::module::option_context &context)
{
  /* Describe the value first, then attach it to the option name. */
  po::typed_value<uint64_t> *rows_value = po::value<uint64_t>(&blitz_estimated_rows);
  rows_value->default_value(0);

  context("estimated-rows",
          rows_value,
          N_("Estimated number of rows that a BlitzDB table will store."));
}
1512
/* Plugin declaration: hands blitz_init and blitz_init_options to the
   DRIZZLE_PLUGIN macro; the middle argument is left NULL. */
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);