1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 - 2010 Toru Maesaka
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
using namespace drizzled;
25
namespace po= boost::program_options;
27
/* File-level mutex. Within this file it is locked around the cursor and
   buffer setup in ha_blitz::open(), around the auto-increment bookkeeping
   in ha_blitz::doInsertRecord(), and destroyed in ~BlitzEngine(). */
static pthread_mutex_t blitz_utility_mutex;
/* NOTE(review): the initializer of this array is not visible in this
   listing; presumably it lists the file extensions owned by the engine. */
29
static const char *ha_blitz_exts[] = {
36
/* Global Variables for Startup Options */
37
uint64_t blitz_estimated_rows;
39
/* Storage engine plugin class for BlitzDB. It declares the table-level
   hooks (create, rename, drop, definition lookup, existence check), the
   helpers that maintain the table share cache, and the engine's key
   limits and index capability flags. */
class BlitzEngine : public drizzled::plugin::StorageEngine {
/* TCMAP used as the table share cache. It is created by
   doCreateTableCache() and released in the destructor. */
41
TCMAP *blitz_table_cache;
/* Registers the engine under name_arg with the HTON_* flags listed below
   and sets the table definition extension to BLITZ_SYSTEM_EXT. */
44
BlitzEngine(const std::string &name_arg) :
45
drizzled::plugin::StorageEngine(name_arg,
46
drizzled::HTON_NULL_IN_KEY |
47
drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
48
drizzled::HTON_STATS_RECORDS_IS_EXACT |
49
drizzled::HTON_SKIP_STORE_LOCK) {
50
table_definition_ext = BLITZ_SYSTEM_EXT;
/* Destroys the file-level utility mutex and frees the share cache map. */
53
virtual ~BlitzEngine() {
54
pthread_mutex_destroy(&blitz_utility_mutex);
55
tcmapdel(blitz_table_cache);
/* Returns a newly allocated ha_blitz cursor bound to this engine and
   the given table. */
58
virtual drizzled::Cursor *create(drizzled::Table &table) {
59
return new ha_blitz(*this, table);
/* NOTE(review): the body of bas_ext() is not visible in this listing. */
62
const char **bas_ext() const {
/* Table-level hooks; each is defined later in this file. */
66
int doCreateTable(drizzled::Session &session,
67
drizzled::Table &table_arg,
68
const drizzled::TableIdentifier &identifier,
69
drizzled::message::Table &table_proto);
71
int doRenameTable(drizzled::Session &session,
72
const drizzled::TableIdentifier &from_identifier,
73
const drizzled::TableIdentifier &to_identifier);
75
int doDropTable(drizzled::Session &session,
76
const drizzled::TableIdentifier &identifier);
78
int doGetTableDefinition(drizzled::Session &session,
79
const drizzled::TableIdentifier &identifier,
80
drizzled::message::Table &table_proto);
82
void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
83
const drizzled::SchemaIdentifier &schema_identifier,
84
drizzled::TableIdentifier::vector &set_of_identifiers);
86
bool doDoesTableExist(drizzled::Session &session,
87
const drizzled::TableIdentifier &identifier);
89
bool validateCreateTableOption(const std::string &key,
90
const std::string &state);
/* Share cache management: create the map, then look up, store and
   remove BlitzShare pointers by table name. */
92
bool doCreateTableCache(void);
94
BlitzShare *getTableShare(const std::string &name);
95
void cacheTableShare(const std::string &name, BlitzShare *share);
96
void deleteTableShare(const std::string &name);
/* Key limits reported by the engine; all come from BLITZ_* constants. */
98
uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
99
uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
100
uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
/* Returns the same set of HA_* index capability flags regardless of the
   key algorithm passed in (the argument is unnamed and unused). */
102
uint32_t index_flags(enum drizzled::ha_key_alg) const {
103
return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
104
HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
108
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
109
a key to that row in the data dictionary. Two keys are merged and
110
stored as a key because we want to avoid reading the leaf node and
111
thus save disk IO and some computation in the tree. Note that the
112
comparison function of BlitzDB's btree only takes into account the
113
actual index key. See blitzcmp.cc for details.
115
With the above in mind, this helper function returns a pointer to
116
the dictionary key by calculating the offset. */
117
static char *skip_btree_key(const char *key, const size_t skip_len,
120
static bool str_is_numeric(const std::string &str);
122
/* Creates the files that make up a new table: the data table, the
   system table and one B+Tree file per key. The table definition
   (proto) is then written into the system table.

   Returns HA_ERR_UNSUPPORTED when any key has more than one key part
   and HA_ERR_CRASHED_ON_USAGE when the definition cannot be written.
   NOTE(review): the handling of a non-zero ecode and the final return
   statement are not visible in this listing. */
int BlitzEngine::doCreateTable(drizzled::Session &,
123
drizzled::Table &table,
124
const drizzled::TableIdentifier &identifier,
125
drizzled::message::Table &proto) {
130
/* Temporary fix for blocking composite keys. We need to add this
131
check because version 1 doesn't handle composite indexes. */
132
for (uint32_t i = 0; i < table.getShare()->keys; i++) {
133
if (table.key_info[i].key_parts > 1)
134
return HA_ERR_UNSUPPORTED;
137
/* Create relevant files for a new table and close them immediately.
138
All we want to do here is somewhat like UNIX touch(1). */
139
if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
142
if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
145
/* Create b+tree index(es) for this table. */
146
for (uint32_t i = 0; i < table.getShare()->keys; i++) {
147
if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
151
/* Write the table definition to system table. */
152
if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
/* The system table is closed on both the failure and success path. */
155
if (!dict.write_table_definition(proto)) {
156
dict.close_system_table();
157
return HA_ERR_CRASHED_ON_USAGE;
160
dict.close_system_table();
164
/* Renames a table in three stages:
     1. Rewrite the name, schema and catalog stored in the table
        definition that lives in the source table's system table.
     2. Read the number of keys from the source data table.
     3. Rename the data file, the system file and every index file.

   Returns HA_ERR_CRASHED_ON_USAGE on the failures visible below.
   NOTE(review): several local declarations, the handling of a NULL
   proto_string and of rename errors, and the final return are not
   visible in this listing. */
int BlitzEngine::doRenameTable(drizzled::Session &,
165
const drizzled::TableIdentifier &from,
166
const drizzled::TableIdentifier &to) {
169
BlitzData blitz_table;
174
/* Open the source table's system table for writing so that the stored
   table definition can be read and then overwritten. */
175
if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
178
drizzled::message::Table proto;
180
int proto_string_len;
/* Fetch the serialized definition stored under BLITZ_TABLE_PROTO_KEY. */
182
proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
183
BLITZ_TABLE_PROTO_KEY.length(),
186
if (proto_string == NULL) {
190
if (!proto.ParseFromArray(proto_string, proto_string_len)) {
192
return HA_ERR_CRASHED_ON_USAGE;
/* Point the definition at the destination identifier. */
197
proto.set_name(to.getTableName());
198
proto.set_schema(to.getSchemaName());
199
proto.set_catalog(to.getCatalogName());
201
if (!dict.write_table_definition(proto)) {
202
dict.close_system_table();
203
return HA_ERR_CRASHED_ON_USAGE;
206
dict.close_system_table();
208
/* Find out the number of indexes in this table. This information
209
is required because BlitzDB creates a file for each index. */
210
if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
211
return HA_ERR_CRASHED_ON_USAGE;
213
nkeys = blitz_table.read_meta_keycount();
215
if (blitz_table.close_data_table() != 0)
216
return HA_ERR_CRASHED_ON_USAGE;
218
/* We're now ready to rename the file(s) for this table. Start by
219
attempting to rename the data and system files. */
220
if (rename_file_ext(from.getPath().c_str(),
221
to.getPath().c_str(), BLITZ_DATA_EXT)) {
/* errno is captured into rv; ENOENT is treated differently from other
   errors (the statement guarded by this test is not visible here). */
222
if ((rv = errno) != ENOENT)
226
if (rename_file_ext(from.getPath().c_str(),
227
to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
228
if ((rv = errno) != ENOENT)
232
/* So far so good. Rename the index file(s) and we're done. */
235
for (uint32_t i = 0; i < nkeys; i++) {
236
if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
237
return HA_ERR_CRASHED_ON_USAGE;
243
/* Drops a table: reads the key count from the data table, then removes
   the data file, the system file and each index file.
   NOTE(review): local declarations, the bodies of the error branches
   and the final return are not visible in this listing. */
int BlitzEngine::doDropTable(drizzled::Session &,
244
const drizzled::TableIdentifier &identifier) {
251
/* We open the dictionary to extract meta data from it */
252
if ((err = dict.open_data_table(identifier.getPath().c_str(),
/* The key count tells us how many index files have to be dropped. */
257
nkeys = dict.read_meta_keycount();
259
/* We no longer need the dictionary to be open */
260
dict.close_data_table();
262
/* Drop the Data Dictionary */
263
snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
264
if ((err = unlink(buf)) == -1) {
268
/* Drop the System Table */
269
snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
270
if ((err = unlink(buf)) == -1) {
274
/* Drop Index file(s) */
275
for (uint32_t i = 0; i < nkeys; i++) {
276
if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
284
/* Loads the table definition for identifier into proto. The system file
   (path + BLITZ_SYSTEM_EXT) is checked with stat(), then the system
   table is opened read-only, the entry stored under
   BLITZ_TABLE_PROTO_KEY is fetched and parsed into proto.

   Returns HA_ERR_CRASHED_ON_USAGE when the system table cannot be
   opened or closed, or when the stored definition does not parse.
   NOTE(review): the branches taken when stat() fails or when the entry
   is NULL, and the success return, are not visible in this listing. */
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
285
const drizzled::TableIdentifier &identifier,
286
drizzled::message::Table &proto) {
287
struct stat stat_info;
288
std::string path(identifier.getPath());
290
path.append(BLITZ_SYSTEM_EXT);
/* stat() returns non-zero on failure. */
292
if (stat(path.c_str(), &stat_info)) {
298
int proto_string_len;
300
if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
301
return HA_ERR_CRASHED_ON_USAGE;
304
proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
305
BLITZ_TABLE_PROTO_KEY.length(),
/* The system table is closed before the fetched entry is inspected. */
308
if (db.close_system_table() != 0) {
309
return HA_ERR_CRASHED_ON_USAGE;
312
if (proto_string == NULL) {
316
if (!proto.ParseFromArray(proto_string, proto_string_len)) {
318
return HA_ERR_CRASHED_ON_USAGE;
326
/* Walks the entries of a cached schema directory and appends a
   TableIdentifier to ids for each file that belongs to this engine.
   NOTE(review): the body of the filter branch and the remaining
   arguments of filename_to_tablename() are not visible in this
   listing. */
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
327
const drizzled::SchemaIdentifier &schema_id,
328
drizzled::TableIdentifier::vector &ids) {
329
drizzled::CachedDirectory::Entries entries = directory.getEntries();
331
for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
332
entry_iter != entries.end(); ++entry_iter) {
334
drizzled::CachedDirectory::Entry *entry = *entry_iter;
335
const std::string *filename = &entry->filename;
337
assert(filename->size());
/* ext points at the first '.' in the file name, or is NULL if none. */
339
const char *ext = strchr(filename->c_str(), '.');
/* This condition is true for names with no extension, names whose
   extension differs from BLITZ_SYSTEM_EXT (case-insensitively), and
   names that start with TMP_FILE_PREFIX; presumably such entries are
   skipped. */
341
if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
342
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
344
char uname[NAME_LEN + 1];
345
uint32_t file_name_len;
347
file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
/* Terminate the converted name so that the extension is cut off.
   NOTE(review): the offset is computed from sizeof(BLITZ_DATA_EXT)
   although the filter above matched BLITZ_SYSTEM_EXT; confirm that
   both extensions have the same length. */
351
uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
352
ids.push_back(TableIdentifier(schema_id, uname));
357
/* Returns true when the table's data file (identifier path followed by
   BLITZ_DATA_EXT) exists, false otherwise. */
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
358
const drizzled::TableIdentifier &identifier) {
359
std::string proto_path(identifier.getPath());
360
proto_path.append(BLITZ_DATA_EXT);
/* access() returns 0 when the file exists, hence the inversion. */
362
return (access(proto_path.c_str(), F_OK)) ? false : true;
365
/* Validates an engine-specific CREATE TABLE option. The only key tested
   in the visible code is "ESTIMATED_ROWS" (also accepted in lower
   case), whose value is checked with str_is_numeric().
   NOTE(review): the return statements are not visible in this listing. */
bool BlitzEngine::validateCreateTableOption(const std::string &key,
366
const std::string &state) {
367
if (key == "ESTIMATED_ROWS" || key == "estimated_rows") {
368
if (str_is_numeric(state))
374
/* Allocates the table share cache with tcmapnew(). Returns false when
   the allocation yields NULL and true otherwise. */
bool BlitzEngine::doCreateTableCache(void) {
375
return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
378
/* Looks up table_name in the share cache. The map stores the address of
   a BlitzShare object as its value (see cacheTableShare()), so a hit is
   turned back into a pointer by dereferencing the stored bytes.
   NOTE(review): the declarations of fetched and vlen, the check for a
   missing entry and the return statement are not visible in this
   listing; rv starts out as NULL. */
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
381
BlitzShare *rv = NULL;
383
fetched = tcmapget(blitz_table_cache, table_name.c_str(),
384
table_name.length(), &vlen);
386
/* dereference the object */
388
rv = *(BlitzShare **)fetched;
393
/* Stores a share in the cache under table_name. The value written to
   the map is the pointer itself (sizeof(share) bytes copied from
   &share), not a copy of the BlitzShare object. */
void BlitzEngine::cacheTableShare(const std::string &table_name,
395
/* Cache the memory address of the share object */
396
tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
397
&share, sizeof(share));
400
/* Removes the cache entry stored under table_name. Only the map entry
   is removed here; the BlitzShare object itself is not touched. */
void BlitzEngine::deleteTableShare(const std::string &table_name) {
401
tcmapout2(blitz_table_cache, table_name.c_str());
404
/* Constructs a cursor for table_arg on top of the Cursor base class.
   NOTE(review): only part of the member initializer list is visible in
   this listing; thread_locked starts out as false. */
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
405
Table &table_arg) : Cursor(engine_arg, table_arg),
409
thread_locked(false),
417
/* Opens the cursor on table_name: obtains the table share, creates one
   B+Tree cursor per key, allocates the key buffers and sets ref_length.
   Everything after the share lookup runs with blitz_utility_mutex held,
   and every visible return after that point unlocks it first.

   Returns HA_ERR_CRASHED_ON_USAGE when no share can be obtained and
   HA_ERR_OUT_OF_MEM when a cursor or buffer cannot be created.
   NOTE(review): some cleanup statements on the failure paths and the
   success return are not visible in this listing. */
int ha_blitz::open(const char *table_name, int, uint32_t) {
418
if ((share = get_share(table_name)) == NULL)
419
return HA_ERR_CRASHED_ON_USAGE;
421
pthread_mutex_lock(&blitz_utility_mutex);
/* One cursor object per index of the table. */
423
btree_cursor = new BlitzCursor[share->nkeys];
425
for (uint32_t i = 0; i < share->nkeys; i++) {
426
if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
428
pthread_mutex_unlock(&blitz_utility_mutex);
429
return HA_ERR_OUT_OF_MEM;
/* Three buffers of BLITZ_MAX_KEY_LEN bytes each: one for building keys,
   one for merged keys and one for the held key. */
433
if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
435
pthread_mutex_unlock(&blitz_utility_mutex);
436
return HA_ERR_OUT_OF_MEM;
439
if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
441
pthread_mutex_unlock(&blitz_utility_mutex);
442
return HA_ERR_OUT_OF_MEM;
445
if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
448
free(key_merge_buffer);
449
pthread_mutex_unlock(&blitz_utility_mutex);
450
return HA_ERR_OUT_OF_MEM;
/* The secondary row buffer starts out unallocated. */
453
secondary_row_buffer = NULL;
454
secondary_row_buffer_size = 0;
455
key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
457
/* 'ref_length' determines the size of the buffer that the kernel
458
will use to uniquely identify a row. The actual allocation is
459
done by the kernel so all we do here is specify the size of it.*/
460
if (share->primary_key_exists) {
461
ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
/* Without a primary key: room for the key length field plus a 64-bit
   value. */
463
ref_length = sizeof(held_key_len) + sizeof(uint64_t);
466
pthread_mutex_unlock(&blitz_utility_mutex);
470
/* Closes the cursor: destroys every per-index B+Tree cursor, deletes
   the cursor array and frees the buffers.
   NOTE(review): some statements and the return are not visible in this
   listing; only the frees of key_merge_buffer and secondary_row_buffer
   can be seen. */
int ha_blitz::close(void) {
471
for (uint32_t i = 0; i < share->nkeys; i++) {
472
share->btrees[i].destroy_cursor(&btree_cursor[i]);
474
delete [] btree_cursor;
477
free(key_merge_buffer);
479
free(secondary_row_buffer);
483
/* Fills in the cursor's stats members according to the bits set in
   flag.
   NOTE(review): the statement guarded by HA_STATUS_ERRKEY and the
   return are not visible in this listing. */
int ha_blitz::info(uint32_t flag) {
/* Row count and data file size are read from the data dictionary. */
484
if (flag & HA_STATUS_VARIABLE) {
485
stats.records = share->dict.nrecords();
486
stats.data_file_length = share->dict.table_size();
/* The reported value is one past the share's stored value. */
489
if (flag & HA_STATUS_AUTO)
490
stats.auto_increment_value = share->auto_increment_value + 1;
492
if (flag & HA_STATUS_ERRKEY)
498
/* Prepares a table scan: records the SQL command type of the session,
   acquires a lock via blitz_optimal_lock() and fetches the first key
   and row from the data dictionary.
   NOTE(review): the remaining arguments of next_key_and_row() and the
   return are not visible in this listing. */
int ha_blitz::doStartTableScan(bool scan) {
499
/* Obtain the query type for this scan */
500
sql_command_type = session_sql_command(getTable()->getSession());
504
/* Obtain the most suitable lock for the given statement type. */
505
blitz_optimal_lock();
507
/* Get the first record from TCHDB. Let the scanner take
508
care of checking return value errors. */
510
current_key = share->dict.next_key_and_row(NULL, 0,
518
/* Returns the next row of a table scan in drizzle_buf. The row that was
   prefetched as "current" is unpacked into the output buffer, the
   following key and row are prefetched, and the key that was just
   returned is remembered as the held key.

   Returns HA_ERR_END_OF_FILE (with status STATUS_NOT_FOUND) when there
   is no current key.
   NOTE(review): some declarations, the freeing of the previous held key
   and the success return are not visible in this listing. */
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
520
const char *next_row;
527
if (current_key == NULL) {
528
getTable()->status = STATUS_NOT_FOUND;
529
return HA_ERR_END_OF_FILE;
532
ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
534
/* Unpack and copy the current row to Drizzle's result buffer. */
535
unpack_row(drizzle_buf, current_row, current_row_len);
537
/* Retrieve both key and row of the next record with one allocation. */
538
next_key = share->dict.next_key_and_row(current_key, current_key_len,
539
&next_key_len, &next_row,
542
/* Memory region for "current_row" will be freed as "held key" on
543
the next iteration. This is because "current_key" points to the
544
region of memory that contains "current_row" and "held_key" points
545
to it. If there isn't another iteration then it is freed in doEndTableScan(). */
546
current_row = next_row;
547
current_row_len = next_row_len;
549
/* Remember the current row because delete, update or replace
550
function could be called after this function. This pointer is
551
also used to free the previous key and row, which resides on
553
held_key = current_key;
554
held_key_len = current_key_len;
556
/* It is now memory-leak-safe to point current_key to next_key. */
557
current_key = next_key;
558
current_key_len = next_key_len;
559
getTable()->status = 0;
563
/* Ends a table scan and releases the lock with blitz_optimal_unlock().
   NOTE(review): the statements guarded by the two conditions below are
   not visible in this listing; presumably they release the memory of
   the current and held keys. The return is not visible either. */
int ha_blitz::doEndTableScan() {
564
if (table_scan && current_key)
566
if (table_scan && held_key)
577
blitz_optimal_unlock();
582
/* Fetches the row identified by pos into copy_to. pos is read as an int
   key length followed by the key bytes, which is the layout written by
   position().

   Returns HA_ERR_KEY_NOT_FOUND on the failure paths visible below.
   NOTE(review): the conditions guarding both early returns, some
   declarations and the success return are not visible in this listing. */
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
585
int key_len, row_len;
/* Decode the length prefix, then point key at the bytes after it. */
587
memcpy(&key_len, pos, sizeof(key_len));
588
key = (char *)(pos + sizeof(key_len));
590
/* TODO: Find a better error type. */
592
return HA_ERR_KEY_NOT_FOUND;
594
row = share->dict.get_row(key, key_len, &row_len);
597
return HA_ERR_KEY_NOT_FOUND;
599
unpack_row(copy_to, row, row_len);
601
/* Remember the key location on memory if the thread is not doing
602
a table scan. This is because either update_row() or delete_row()
603
might be called after this function. */
606
held_key_len = key_len;
613
/* Saves the position of the current row into ref as the held key length
   (an int-sized field) followed by the held key bytes. The row argument
   is unused. */
void ha_blitz::position(const unsigned char *) {
614
int length = sizeof(held_key_len);
615
memcpy(ref, &held_key_len, length);
616
memcpy(ref + length, (unsigned char *)held_key, held_key_len);
619
/* Returns a name for the index type; the key number is ignored.
   NOTE(review): the body is not visible in this listing. */
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
623
/* Prepares an index scan on key_num: makes it the active index, records
   the SQL command type of the session and re-acquires the lock.
   NOTE(review): the return is not visible in this listing. */
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
624
active_index = key_num;
625
sql_command_type = session_sql_command(getTable()->getSession());
627
/* This is unlikely to happen but just for assurance, re-obtain
628
the lock if this thread already has a certain lock. This makes
629
sure that this thread will get the most appropriate lock for
630
the current statement. */
632
blitz_optimal_unlock();
634
blitz_optimal_lock();
638
/* Reads the first row of the active index into buf. The B+Tree key is
   split into its index part and the trailing dictionary key, the row is
   fetched from the data dictionary with the latter, unpacked, and the
   B+Tree key is remembered as the held key.

   Returns HA_ERR_END_OF_FILE or HA_ERR_KEY_NOT_FOUND on the failure
   paths visible below.
   NOTE(review): the condition guarding the first return and the success
   return are not visible in this listing. */
int ha_blitz::index_first(unsigned char *buf) {
639
char *dict_key, *bt_key, *row;
640
int dict_klen, bt_klen, prefix_len, rlen;
642
bt_key = btree_cursor[active_index].first_key(&bt_klen);
645
return HA_ERR_END_OF_FILE;
/* prefix_len is the length of the index part of the key; the
   dictionary key follows it. */
647
prefix_len = btree_key_length(bt_key, active_index);
648
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
650
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
652
return HA_ERR_KEY_NOT_FOUND;
655
unpack_row(buf, row, rlen);
656
keep_track_of_key(bt_key, bt_klen);
663
/* Reads the next row of the active index into buf, using the same
   split-key, fetch, unpack and remember sequence as the other index
   readers.

   Returns HA_ERR_END_OF_FILE when the cursor yields no key and
   HA_ERR_KEY_NOT_FOUND when the row is missing from the dictionary;
   both paths set the table status to STATUS_NOT_FOUND.
   NOTE(review): the success return is not visible in this listing. */
int ha_blitz::index_next(unsigned char *buf) {
664
char *dict_key, *bt_key, *row;
665
int dict_klen, bt_klen, prefix_len, rlen;
667
bt_key = btree_cursor[active_index].next_key(&bt_klen);
669
if (bt_key == NULL) {
670
getTable()->status = STATUS_NOT_FOUND;
671
return HA_ERR_END_OF_FILE;
/* Locate the dictionary key that follows the index part of the key. */
674
prefix_len = btree_key_length(bt_key, active_index);
675
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
677
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
679
getTable()->status = STATUS_NOT_FOUND;
680
return HA_ERR_KEY_NOT_FOUND;
683
unpack_row(buf, row, rlen);
684
keep_track_of_key(bt_key, bt_klen);
691
/* Reads the previous row of the active index into buf, using the same
   split-key, fetch, unpack and remember sequence as the other index
   readers.

   Returns HA_ERR_END_OF_FILE or HA_ERR_KEY_NOT_FOUND on the failure
   paths visible below.
   NOTE(review): the condition guarding the first return and the success
   return are not visible in this listing. */
int ha_blitz::index_prev(unsigned char *buf) {
692
char *dict_key, *bt_key, *row;
693
int dict_klen, bt_klen, prefix_len, rlen;
695
bt_key = btree_cursor[active_index].prev_key(&bt_klen);
698
return HA_ERR_END_OF_FILE;
/* Locate the dictionary key that follows the index part of the key. */
700
prefix_len = btree_key_length(bt_key, active_index);
701
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
703
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
705
return HA_ERR_KEY_NOT_FOUND;
708
unpack_row(buf, row, rlen);
709
keep_track_of_key(bt_key, bt_klen);
716
/* Reads the last row of the active index into buf, using the same
   split-key, fetch, unpack and remember sequence as the other index
   readers.

   Both visible failure paths return HA_ERR_KEY_NOT_FOUND; when the row
   is missing from the dictionary, errkey_id is set to the active index.
   NOTE(review): the condition guarding the first return and the success
   return are not visible in this listing. The first failure return
   differs from index_first(), which returns HA_ERR_END_OF_FILE at the
   corresponding place; confirm that this is intended. */
int ha_blitz::index_last(unsigned char *buf) {
717
char *dict_key, *bt_key, *row;
718
int dict_klen, bt_klen, prefix_len, rlen;
720
bt_key = btree_cursor[active_index].final_key(&bt_klen);
723
return HA_ERR_KEY_NOT_FOUND;
/* Locate the dictionary key that follows the index part of the key. */
725
prefix_len = btree_key_length(bt_key, active_index);
726
dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
728
if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
730
errkey_id = active_index;
731
return HA_ERR_KEY_NOT_FOUND;
734
unpack_row(buf, row, rlen);
735
keep_track_of_key(bt_key, bt_klen);
742
/* Looks up key in the active index. This simply forwards to
   index_read_idx() with active_index as the key number. */
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
743
uint32_t key_len, enum ha_rkey_function find_flag) {
744
return index_read_idx(buf, active_index, key, key_len, find_flag);
747
/* This is where the read related index logic lives. It is used by both
748
BlitzDB and the Database Kernel (specifically, by the optimizer). */
749
/* Looks up key in index key_num using search_mode and returns the
   matching row in buf. The native key is converted to BlitzDB's key
   format, the B+Tree is searched for the full ("master") key, and the
   dictionary key embedded in it is used to fetch the row.

   Returns HA_ERR_KEY_NOT_FOUND when either the B+Tree lookup or the
   dictionary lookup yields nothing. The key length argument is unused.
   NOTE(review): some declarations, the condition guarding the
   index_first() shortcut and the success return are not visible in
   this listing. */
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
750
const unsigned char *key, uint32_t,
751
enum ha_rkey_function search_mode) {
753
/* If the provided key is NULL, we are required to return the first
754
row in the active_index. */
756
return this->index_first(buf);
758
/* Otherwise we search for it. Prepare the key to look up the tree. */
760
char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
762
/* Lookup the tree and get the master key. */
766
unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
767
packed_klen, &unique_klen);
769
if (unique_key == NULL) {
771
return HA_ERR_KEY_NOT_FOUND;
774
/* Got the master key. Prepare it to lookup the data dictionary. */
776
int skip_len = btree_key_length(unique_key, key_num);
777
char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
779
/* Fetch the packed row from the data dictionary. */
781
char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
783
if (fetched_row == NULL) {
786
return HA_ERR_KEY_NOT_FOUND;
789
/* Unpack it into Drizzle's return buffer and keep track of the
790
master key for future use (before index_end() is called). */
791
unpack_row(buf, fetched_row, row_len);
792
keep_track_of_key(unique_key, unique_klen);
799
/* Ends an index scan: clears the "moved" flag of the active index's
   cursor and releases the lock with blitz_optimal_unlock().
   NOTE(review): other statements and the return are not visible in
   this listing. */
int ha_blitz::doEndIndexScan(void) {
803
btree_cursor[active_index].moved = false;
806
blitz_optimal_unlock();
811
/* Enabling indexes is not supported; always returns
   HA_ERR_UNSUPPORTED. */
int ha_blitz::enable_indexes(uint32_t) {
812
return HA_ERR_UNSUPPORTED;
815
/* Disabling indexes is not supported; always returns
   HA_ERR_UNSUPPORTED. */
int ha_blitz::disable_indexes(uint32_t) {
816
return HA_ERR_UNSUPPORTED;
819
/* Find the estimated number of rows between min_key and max_key.
820
Leave the proper implementation of this for now since there are
821
too many exceptions to cover. */
822
/* Row-count estimate for a key range. All three arguments are ignored
   and the constant BLITZ_WORST_CASE_RANGE is returned. */
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
823
drizzled::key_range * /*min_key*/,
824
drizzled::key_range * /*max_key*/) {
825
return BLITZ_WORST_CASE_RANGE;
828
/* Inserts drizzle_row into the table. The steps are:
     1. Update the auto-increment field and the share's counter while
        holding blitz_utility_mutex.
     2. Build the primary key (or a hidden row id) and pack the row.
     3. When the table has keys, take a slotted lock chosen from the
        primary key and write one entry per index.
     4. Write the packed row to the data dictionary and release the
        slotted lock.
   NOTE(review): several declarations, the bodies of some error
   branches, the loop increment and the final return are not visible in
   this listing. */
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
831
ha_statistic_increment(&system_status_var::ha_write_count);
833
/* Prepare Auto Increment field if one exists. */
834
if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
835
pthread_mutex_lock(&blitz_utility_mutex);
836
if ((rv = update_auto_increment()) != 0) {
837
pthread_mutex_unlock(&blitz_utility_mutex);
841
uint64_t next_val = getTable()->next_number_field->val_int();
/* Only ever move the share's counter forward. */
843
if (next_val > share->auto_increment_value) {
844
share->auto_increment_value = next_val;
845
stats.auto_increment_value = share->auto_increment_value + 1;
847
pthread_mutex_unlock(&blitz_utility_mutex);
850
/* Serialize a primary key for this row. If a PK doesn't exist,
851
an internal hidden ID will be generated. We obtain the PK here
852
and pack it to this function's local buffer instead of the
853
thread's own 'key_buffer' because the PK value needs to be
854
remembered when writing non-PK keys AND because the 'key_buffer'
855
will be used to generate these non-PK keys. */
856
char temp_pkbuf[BLITZ_MAX_KEY_LEN];
857
size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
859
/* Obtain a buffer that can accommodate this row. We then pack
860
the provided row into it. Note that this code works most
861
efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
862
unsigned char *row_buf = get_pack_buffer(max_row_length());
863
size_t row_len = pack_row(row_buf, drizzle_row);
865
uint32_t curr_key = 0;
866
uint32_t lock_id = 0;
/* The lock slot is derived from the primary key bytes. */
868
if (share->nkeys > 0) {
869
lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
870
share->blitz_lock.slotted_lock(lock_id);
873
/* We isolate this condition outside the key loop to avoid the CPU
874
from going through unnecessary conditional branching on heavy
875
insertion load. TODO: Optimize this block. PK should not need
876
to go through merge_key() since this information is redundant. */
877
if (share->primary_key_exists) {
/* For the primary key index the PK is used as both prefix and suffix. */
881
key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
883
rv = share->btrees[curr_key].write_unique(key, klen);
885
if (rv == HA_ERR_FOUND_DUPP_KEY) {
886
errkey_id = curr_key;
887
share->blitz_lock.slotted_unlock(lock_id);
893
/* Loop over the keys and write them to its exclusive tree. */
894
while (curr_key < share->nkeys) {
896
size_t prefix_len = 0;
/* Index value as prefix, primary key as suffix. */
899
prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
900
key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
902
if (share->btrees[curr_key].unique) {
903
rv = share->btrees[curr_key].write_unique(key, klen);
905
rv = share->btrees[curr_key].write(key, klen);
909
errkey_id = curr_key;
910
share->blitz_lock.slotted_unlock(lock_id);
917
/* Write the row to the Data Dictionary. */
918
rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
920
if (share->nkeys > 0)
921
share->blitz_lock.slotted_unlock(lock_id);
926
/* Replaces old_row with new_row. When the table has keys, every index
   entry built from old_row is deleted and a new entry built from
   new_row is written, all under a slotted lock chosen from the held
   key. The packed new row is then written to the data dictionary.
   NOTE(review): several declarations, the conditions that select
   between the branches below and the final return are not visible in
   this listing. */
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
927
unsigned char *new_row) {
929
uint32_t lock_id = 0;
931
ha_statistic_increment(&system_status_var::ha_update_count);
934
if (share->nkeys > 0) {
935
/* BlitzDB cannot update an indexed row on table scan. */
937
return HA_ERR_UNSUPPORTED;
939
if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
942
lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
943
share->blitz_lock.slotted_lock(lock_id);
945
/* Update all relevant index entries. Start by deleting
946
the existing key then write the new key. Something we should
947
consider in the future is to take a diff of the keys and only
948
update changed keys. */
949
int skip = btree_key_length(held_key, active_index);
/* The suffix of the held key is a 2-byte length followed by the
   dictionary key; step past the length field to reach the key bytes. */
950
char *suffix = held_key + skip;
951
uint16_t suffix_len = uint2korr(suffix);
953
suffix += sizeof(suffix_len);
955
for (uint32_t i = 0; i < share->nkeys; i++) {
957
size_t prefix_len, klen;
/* Rebuild the old entry for this index and delete it. */
960
prefix_len = make_index_key(key_buffer, i, old_row);
961
key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
963
if (share->btrees[i].delete_key(key, klen) != 0) {
965
share->blitz_lock.slotted_unlock(lock_id);
966
return HA_ERR_KEY_NOT_FOUND;
969
/* Now write the new key. */
970
prefix_len = make_index_key(key_buffer, i, new_row);
/* For the primary key index the new key value is also the suffix. */
972
if (i == getTable()->getShare()->getPrimaryKey()) {
973
key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
974
rv = share->btrees[i].write(key, klen);
976
key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
977
rv = share->btrees[i].write(key, klen);
982
share->blitz_lock.slotted_unlock(lock_id);
988
/* Getting this far means that the index has been successfully
989
updated. We now update the Data Dictionary. This implementation
990
is admittedly far from optimal and will be revisited. */
991
size_t row_len = max_row_length();
992
unsigned char *row_buf = get_pack_buffer(row_len);
993
row_len = pack_row(row_buf, new_row);
995
/* This is a basic case where we can simply overwrite the key. */
997
rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
999
int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
1001
/* Delete with the old key. */
1002
share->dict.delete_row(key_buffer, klen);
1004
/* Write with the new key. */
1005
klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
1006
rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
1009
if (share->nkeys > 0)
1010
share->blitz_lock.slotted_unlock(lock_id);
1015
/* Deletes row_to_delete. When the table has keys, the entry of the
   active index is removed through its cursor and the entries of all
   other indexes are rebuilt from the row and deleted by key, all under
   a slotted lock chosen from the held key. The row itself is then
   removed from the data dictionary.
   NOTE(review): the branch structure around the cursor delete, the
   declaration of rv and the final return are not visible in this
   listing. The visible HA_ERR_KEY_NOT_FOUND return inside the loop has
   no visible slotted_unlock() before it; confirm against the full
   source whether the lock is released on that path. */
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
1018
ha_statistic_increment(&system_status_var::ha_delete_count);
1020
char *dict_key = held_key;
1021
int dict_klen = held_key_len;
1022
uint32_t lock_id = 0;
1024
if (share->nkeys > 0) {
1025
lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
1026
share->blitz_lock.slotted_lock(lock_id);
1028
/* Loop over the indexes and delete all relevant entries for
1029
this row. We do this by reproducing the key in BlitzDB's
1030
unique key format. The procedure is simple.
1032
(1): Compute the key value for this index from the row then
1033
pack it into key_buffer (not unique at this point).
1035
(2): Append the suffix of the held_key to the key generated
1036
in step 1. The key is then guaranteed to be unique. */
1037
for (uint32_t i = 0; i < share->nkeys; i++) {
1038
/* In this case, we don't need to search for the key because
1039
TC's cursor is already pointing at the key that we want
1040
to delete. We wouldn't be here otherwise. */
1041
if (i == active_index) {
1042
btree_cursor[active_index].delete_position();
1046
int klen = make_index_key(key_buffer, i, row_to_delete);
1047
int skip_len = btree_key_length(held_key, active_index);
1048
uint16_t suffix_len = uint2korr(held_key + skip_len);
1050
/* Append the suffix to the key */
1051
memcpy(key_buffer + klen, held_key + skip_len,
1052
sizeof(suffix_len) + suffix_len);
1054
/* Update the key length to cover the generated key. */
1055
klen = klen + sizeof(suffix_len) + suffix_len;
1057
if (share->btrees[i].delete_key(key_buffer, klen) != 0)
1058
return HA_ERR_KEY_NOT_FOUND;
1061
/* Skip to the data dictionary key. */
1062
int dict_key_offset = btree_key_length(dict_key, active_index);
1063
dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
1066
rv = share->dict.delete_row(dict_key, dict_klen);
1068
if (share->nkeys > 0)
1069
share->blitz_lock.slotted_unlock(lock_id);
1074
/* Reports the next auto-increment value. The first three arguments are
   ignored. *first_value is set to one past the share's stored value and
   *nb_reserved_values to UINT64_MAX. */
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
1075
uint64_t, uint64_t *first_value,
1076
uint64_t *nb_reserved_values) {
1077
*first_value = share->auto_increment_value + 1;
1078
*nb_reserved_values = UINT64_MAX;
1081
/* Sets the share's auto-increment value to value, except that 0 is
   replaced by 1.
   NOTE(review): the return is not visible in this listing. */
int ha_blitz::reset_auto_increment(uint64_t value) {
1082
share->auto_increment_value = (value == 0) ? 1 : value;
1086
/* Empties the table: clears every B+Tree index, then the data
   dictionary. Returns HA_ERR_CRASHED_ON_USAGE when an index cannot be
   cleared, otherwise 0 when the dictionary was cleared and -1 when it
   was not. */
int ha_blitz::delete_all_rows(void) {
1087
for (uint32_t i = 0; i < share->nkeys; i++) {
1088
if (share->btrees[i].delete_all() != 0) {
1090
return HA_ERR_CRASHED_ON_USAGE;
1093
return (share->dict.delete_all_rows()) ? 0 : -1;
1096
/* Computes an upper bound for the packed size of the current row: the
   record length plus two bytes per field, plus, for each blob field,
   two bytes and the blob's current length.
   NOTE(review): the loop increment and the return are not visible in
   this listing. */
uint32_t ha_blitz::max_row_length(void) {
1097
uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
/* pos walks the table's list of blob field numbers. */
1098
uint32_t *pos = getTable()->getBlobField();
1099
uint32_t *end = pos + getTable()->sizeBlobFields();
1101
while (pos != end) {
1102
length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
1109
/* Writes the primary key for row into pack_to and returns its length in
   bytes. When the table has no primary key, the next hidden row id from
   the data dictionary is stored as an 8-byte value instead. */
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
1110
if (!share->primary_key_exists) {
1111
uint64_t next_id = share->dict.next_hidden_row_id();
1112
int8store(pack_to, next_id);
1113
return sizeof(next_id);
1116
/* Getting here means that there is a PK in this table. Get the
1117
binary representation of the PK, pack it to BlitzDB's key buffer
1118
and return the size of it. */
1119
return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
1122
/* Builds the BlitzDB key for index key_num from row into pack_to and
   returns the number of bytes written. pack_to is zero-filled for
   BLITZ_MAX_KEY_LEN bytes first, so it must be at least that large.
   NOTE(review): the handling of NULL key parts, the advancing of pos
   and the declaration of end are not visible in this listing. */
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
1123
const unsigned char *row) {
1124
KeyInfo *key = &getTable()->key_info[key_num];
1125
KeyPartInfo *key_part = key->key_part;
1126
KeyPartInfo *key_part_end = key_part + key->key_parts;
1128
unsigned char *pos = (unsigned char *)pack_to;
1132
memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
1134
/* Loop through key part(s) and pack them as we go. */
1135
for (; key_part != key_part_end; key_part++) {
/* A non-zero null_bit means this key part is nullable; the row's null
   flag for it is tested next. */
1136
if (key_part->null_bit) {
1137
if (row[key_part->null_offset] & key_part->null_bit) {
1144
/* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
1145
if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1146
/* Extract the length of the string from the row. */
1147
uint16_t data_len = *(uint8_t *)(row + key_part->offset);
1149
/* Copy the length of the string. Use 2 bytes. */
1150
int2store(pos, data_len);
1151
pos += sizeof(data_len);
1153
/* Copy the string data */
1154
memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
/* Other key types are packed by the field itself. */
1157
end = key_part->field->pack(pos, row + key_part->offset);
1163
return ((char *)pos - pack_to);
1166
/* Concatenates a, a 2-byte length of b, and b into key_merge_buffer.
   The buffer is grown with realloc() when the result does not fit. The
   total length is stored in *merged_len and the buffer is returned, so
   the result is only valid until the next call on this cursor.
   NOTE(review): the statement that advances pos past the prefix and the
   return on allocation failure are not visible in this listing. */
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
1167
const size_t b_len, size_t *merged_len) {
1169
size_t total = a_len + sizeof(uint16_t) + b_len;
1171
if (total > key_merge_buffer_len) {
1172
key_merge_buffer = (char *)realloc(key_merge_buffer, total);
1174
if (key_merge_buffer == NULL) {
1175
errno = HA_ERR_OUT_OF_MEM;
1178
key_merge_buffer_len = total;
1181
char *pos = key_merge_buffer;
1183
/* Copy the prefix. */
1184
memcpy(pos, a, a_len);
1187
/* Copy the length of b. */
1188
int2store(pos, (uint16_t)b_len);
1189
pos += sizeof(uint16_t);
1191
/* Copy the suffix and we're done. */
1192
memcpy(pos, b, b_len);
1194
*merged_len = total;
1195
return key_merge_buffer;
1198
/* Computes the length of the index part of a stored B+Tree key for
   index key_num by walking its key parts. Variable-length text parts
   contribute their 2-byte length prefix plus the string length; other
   parts contribute the field's key length.
   NOTE(review): the declarations of rv and len, the handling of the
   null byte, the advancing of pos and the return are not visible in
   this listing. */
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
1199
KeyInfo *key_info = &getTable()->key_info[key_num];
1200
KeyPartInfo *key_part = key_info->key_part;
1201
KeyPartInfo *key_part_end = key_part + key_info->key_parts;
1202
char *pos = (char *)key;
1206
for (; key_part != key_part_end; key_part++) {
1207
if (key_part->null_bit) {
1214
if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
1215
key_part->type == HA_KEYTYPE_VARTEXT2) {
1216
len = uint2korr(pos);
1217
rv += len + sizeof(uint16_t);
1219
len = key_part->field->key_length();
1229
/* Copies klen bytes of key into held_key_buf and makes held_key and
   held_key_len refer to that copy. No bounds check is done here, so
   klen must not exceed the size of held_key_buf. */
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
1230
memcpy(held_key_buf, key, klen);
1231
held_key = held_key_buf;
1232
held_key_len = klen;
1235
/* Converts a native Drizzle index key to BlitzDB's format.

   The converted key is built in key_buffer (zeroed up to
   BLITZ_MAX_KEY_LEN first) by visiting each key part of index `key_num`;
   the resulting size is reported through *return_key_len.

   NOTE(review): the declarations of `end`, `offset` and `key_size`, the
   statement guarded by the NULL test, the `else` that presumably
   separates the VARTEXT1 path from the generic pack() path, the update
   of `key_size` and the return statement are not present in this text
   -- confirm against the complete file. */
1236
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
1237
const int key_num, int *return_key_len) {
1238
KeyInfo *key = &getTable()->key_info[key_num];
1239
KeyPartInfo *key_part = key->key_part;
1240
KeyPartInfo *key_part_end = key_part + key->key_parts;
1242
/* Read cursor over the native key. */
unsigned char *key_pos = (unsigned char *)native_key;
1243
/* Write cursor over the converted key. */
unsigned char *keybuf_pos = (unsigned char *)key_buffer;
1248
memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
1250
for (; key_part != key_part_end; key_part++) {
1251
if (key_part->null_bit) {
1254
/* Consume the leading indicator byte of the native key and emit one
   byte to the converted key: 1 when the native byte is 0, otherwise 0.
   The guarded statement runs when 0 was emitted, i.e. this key part
   is NULL. */
1255
if (!(*keybuf_pos++ = (*key_pos++ == 0)))
1259
/* Normalize a VARTEXT1 key to VARTEXT2. */
1260
if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1261
/* NOTE(review): reads 2 bytes through a casted pointer, in host byte
   order and without an alignment guarantee -- confirm this matches how
   the native key stores the length. */
uint16_t str_len = *(uint16_t *)key_pos;
1263
/* Copy the length of the string over to key buffer. */
1264
int2store(keybuf_pos, str_len);
1265
keybuf_pos += sizeof(str_len);
1267
/* Copy the actual value over to the key buffer. */
1268
memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
1269
keybuf_pos += str_len;
1271
/* NULL byte + Length of str (2 byte) + Actual String. */
1272
offset = 1 + sizeof(str_len) + str_len;
1274
/* Generic path: let the field pack itself and measure how many bytes
   it produced. */
end = key_part->field->pack(keybuf_pos, key_pos);
1275
offset = end - keybuf_pos;
1276
keybuf_pos += offset;
1280
/* Step the native cursor by the field's key length. */
key_pos += key_part->field->key_length();
1283
*return_key_len = key_size;
1287
size_t ha_blitz::pack_row(unsigned char *row_buffer,
1288
unsigned char *row_to_pack) {
1291
/* Nothing special to do if the table is fixed length */
1292
if (share->fixed_length_table) {
1293
memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
1294
return (size_t)getTable()->getShare()->getRecordLength();
1297
/* Copy NULL bits */
1298
memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
1299
pos = row_buffer + getTable()->getShare()->null_bytes;
1301
/* Pack each field into the buffer */
1302
for (Field **field = getTable()->getFields(); *field; field++) {
1303
if (!((*field)->is_null()))
1304
pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1307
return (size_t)(pos - row_buffer);
1310
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
1311
const size_t from_len) {
1312
const unsigned char *pos;
1314
/* Nothing special to do */
1315
if (share->fixed_length_table) {
1316
memcpy(to, from, from_len);
1320
/* Start by copying NULL bits which is the beginning block
1321
of a Drizzle row. */
1322
pos = (const unsigned char *)from;
1323
memcpy(to, pos, getTable()->getShare()->null_bytes);
1324
pos += getTable()->getShare()->null_bytes;
1326
/* Unpack all fields in the provided row. */
1327
for (Field **field = getTable()->getFields(); *field; field++) {
1328
if (!((*field)->is_null())) {
1329
pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
1336
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
1337
unsigned char *buf = pack_buffer;
1339
/* This is a shitty case where the row size is larger than 2KB. */
1340
if (size > BLITZ_MAX_ROW_STACK) {
1341
if (size > secondary_row_buffer_size) {
1342
void *new_ptr = realloc(secondary_row_buffer, size);
1344
if (new_ptr == NULL) {
1345
errno = HA_ERR_OUT_OF_MEM;
1349
secondary_row_buffer_size = size;
1350
secondary_row_buffer = (unsigned char *)new_ptr;
1352
buf = secondary_row_buffer;
1357
/* File-scope pointer to the BlitzDB engine object; assigned in
   blitz_init(). */
static BlitzEngine *blitz_engine = NULL;
1359
/* Obtains the BlitzShare for the table at path `name`.

   Under blitz_utility_mutex the engine's table cache is consulted
   first; the statements that follow the lookup bump use_count and
   release the mutex. Otherwise a new BlitzShare is built: the data
   dictionary is started, one BlitzTree per index is opened and filled
   with per-key-part metadata, the share's bookkeeping fields are set
   and the share is stored in the engine's cache.

   NOTE(review): the condition guarding the cache-hit statements, the
   remainder of the dictionary-failure path, the guard (if any) around
   the null_bitmask/null_pos assignments, the `else` that presumably
   separates the two primary_key_exists assignments, and every return
   statement are not present in this text -- confirm against the
   complete file. */
BlitzShare *ha_blitz::get_share(const char *name) {
1360
BlitzShare *share_ptr;
1361
BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1362
std::string table_path(name);
1364
/* Everything below runs with the utility mutex held. */
pthread_mutex_lock(&blitz_utility_mutex);
1366
/* Look up the table cache to see if the table resource is available */
1367
share_ptr = bz_engine->getTableShare(table_path);
1370
/* Register one more user of the share and release the mutex. */
share_ptr->use_count++;
1371
pthread_mutex_unlock(&blitz_utility_mutex);
1375
/* Table wasn't cached so create a new table handler */
1376
share_ptr = new BlitzShare();
1378
/* Prepare the Data Dictionary */
1379
if (share_ptr->dict.startup(table_path.c_str()) != 0) {
1381
/* Dictionary startup failed: release the mutex. */
pthread_mutex_unlock(&blitz_utility_mutex);
1385
/* Prepare Index Structure(s) */
1386
/* `curr` starts at the first key definition and advances in step
   with `i`. */
KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
1387
share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
1389
for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
1390
/* NOTE(review): the result of open() is not examined here. */
share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
1391
share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
1393
/* HA_NOSAME on the key marks the tree as unique. */
if (getTable()->key_info[i].flags & HA_NOSAME)
1394
share_ptr->btrees[i].unique = true;
1396
share_ptr->btrees[i].length = curr->key_length;
1397
share_ptr->btrees[i].nparts = curr->key_parts;
1399
/* Record Meta Data of the Key Segments */
1400
for (uint32_t j = 0; j < curr->key_parts; j++) {
1401
Field *f = curr->key_part[j].field;
1404
share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
1405
/* Byte distance of the field's null_ptr from the start of the
   table's insert record. */
share_ptr->btrees[i].parts[j].null_pos
1406
= (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
1409
share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
1411
/* BLOB fields additionally carry HA_BLOB_PART in their flag. */
if (f->type() == DRIZZLE_TYPE_BLOB) {
1412
share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
1415
share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
1416
share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
1417
share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
1422
/* Bookkeeping for the freshly built share. */
share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
1423
share_ptr->table_name = table_path;
1424
share_ptr->nkeys = getTable()->getShare()->keys;
1425
/* This cursor is the first user. */
share_ptr->use_count = 1;
1427
/* Fixed-length when HA_OPTION_PACK_RECORD is not set. */
share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
1428
& HA_OPTION_PACK_RECORD);
1430
/* A primary key index of MAX_KEY or above means there is no
   primary key. */
if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
1431
share_ptr->primary_key_exists = false;
1433
share_ptr->primary_key_exists = true;
1435
/* Done creating the share object. Cache it for later
1436
use by another cursor object.*/
1437
bz_engine->cacheTableShare(table_path, share_ptr);
1439
pthread_mutex_unlock(&blitz_utility_mutex);
1443
/* Drops this cursor's reference to its BlitzShare, under
   blitz_utility_mutex. When the reference count reaches zero the
   auto-increment value is written to the dictionary, the dictionary is
   shut down, every B+Tree's key-part array is freed and the tree
   closed, the share is removed from the engine's cache and the tree
   array is freed.

   Returns HA_ERR_CRASHED_ON_USAGE if the dictionary fails to shut down.

   NOTE(review): on that failure path the function returns with
   use_count already at 0, before the trees are closed and before the
   share is removed from the cache -- verify this is intended.
   NOTE(review): the release of the share object itself and the final
   return value are not present in this text -- confirm against the
   complete file. */
int ha_blitz::free_share(void) {
1444
pthread_mutex_lock(&blitz_utility_mutex);
1446
/* BlitzShare could still be used by another thread. Check the
1447
reference counter to see if it's safe to free it */
1448
if (--share->use_count == 0) {
1449
/* Persist the auto-increment counter before shutting down. */
share->dict.write_meta_autoinc(share->auto_increment_value);
1451
if (share->dict.shutdown() != 0) {
1452
pthread_mutex_unlock(&blitz_utility_mutex);
1453
return HA_ERR_CRASHED_ON_USAGE;
1456
/* Tear down every index: free its key-part metadata, then close it. */
for (uint32_t i = 0; i < share->nkeys; i++) {
1457
delete[] share->btrees[i].parts;
1458
share->btrees[i].close();
1461
/* Remove the share from the engine's cache. */
BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1462
bz_engine->deleteTableShare(share->table_name);
1464
delete[] share->btrees;
1468
pthread_mutex_unlock(&blitz_utility_mutex);
1472
/* Plugin initialization hook.

   Initializes blitz_utility_mutex, creates the "BLITZDB" engine and its
   table cache, hands the engine to the module context and registers the
   "estimated-rows" system variable backed by blitz_estimated_rows.

   Returns 0 on success and HA_ERR_OUT_OF_MEM if the mutex or the table
   cache cannot be set up. */
static int blitz_init(drizzled::module::Context &context) {
  /* The mutex has to exist before the engine does: ~BlitzEngine()
     destroys blitz_utility_mutex, and the failure path below deletes
     the engine. Initializing it afterwards would make that path destroy
     a mutex that was never initialized. */
  if (pthread_mutex_init(&blitz_utility_mutex, NULL) != 0) {
    return HA_ERR_OUT_OF_MEM;
  }

  blitz_engine = new BlitzEngine("BLITZDB");

  if (!blitz_engine->doCreateTableCache()) {
    delete blitz_engine;
    /* Do not leave the file-scope pointer dangling. */
    blitz_engine = NULL;
    return HA_ERR_OUT_OF_MEM;
  }

  context.add(blitz_engine);
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
                                                    &blitz_estimated_rows));
  return 0;
}
1487
/* Read the prototype of this function for details.

   From the visible body: starting at `key`, skips `skip_len` bytes,
   reads a 2-byte length at that position into *return_klen, and
   returns a pointer to the byte that follows the 2-byte length.
   NOTE(review): the second line of the signature (the declaration of
   `return_klen`) is not present in this text -- confirm its type
   against the prototype. */
1488
static char *skip_btree_key(const char *key, const size_t skip_len,
1490
char *pos = (char *)key;
1491
*return_klen = uint2korr(pos + skip_len);
1492
return pos + skip_len + sizeof(uint16_t);
1495
/* Reports whether every character of str is a decimal digit.

   Returns true when no non-digit character is found, which includes
   the empty string; returns false as soon as a non-digit is seen. */
static bool str_is_numeric(const std::string &str) {
  for (std::string::const_iterator it = str.begin(); it != str.end(); ++it) {
    /* The <cctype> classifiers require a value representable as
       unsigned char; handing them a negative plain char is undefined
       behaviour, so convert first. */
    if (!std::isdigit(static_cast<unsigned char>(*it)))
      return false;
  }

  return true;
}
1503
/* Declares BlitzDB's startup options on the supplied option context.
   Currently a single option, "estimated-rows", stored in
   blitz_estimated_rows with a default of 0. */
static void blitz_init_options(drizzled::module::option_context &context)
{
  context(
    "estimated-rows",
    po::value<uint64_t>(&blitz_estimated_rows)->default_value(0),
    N_("Estimated number of rows that a BlitzDB table will store.")
  );
}
1510
/* Plugin declaration: hands blitz_init and blitz_init_options to the
   DRIZZLE_PLUGIN macro; the middle argument is left NULL. */
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);