~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/blitzdb/ha_blitz.cc

  • Committer: Brian Aker
  • Date: 2010-12-08 22:35:56 UTC
  • mfrom: (1819.9.158 update-innobase)
  • Revision ID: brian@tangent.org-20101208223556-37mi4omqg7lkjzf3
Merge in Stewart's changes, 1.3 changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 - 2010 Toru Maesaka
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
 
20
#include "config.h"
 
21
#include "ha_blitz.h"
 
22
 
 
23
using namespace std;
 
24
using namespace drizzled;
 
25
namespace po= boost::program_options;
 
26
 
 
27
static pthread_mutex_t blitz_utility_mutex;
 
28
 
 
29
static const char *ha_blitz_exts[] = {
 
30
  BLITZ_DATA_EXT,
 
31
  BLITZ_INDEX_EXT,
 
32
  BLITZ_SYSTEM_EXT,
 
33
  NULL
 
34
};
 
35
 
 
36
/* Global Variables for Startup Options */
 
37
uint64_t blitz_estimated_rows;
 
38
 
 
39
class BlitzEngine : public drizzled::plugin::StorageEngine {
 
40
private:
 
41
  TCMAP *blitz_table_cache;
 
42
 
 
43
public:
 
44
  BlitzEngine(const std::string &name_arg) :
 
45
    drizzled::plugin::StorageEngine(name_arg,
 
46
                                    drizzled::HTON_NULL_IN_KEY |
 
47
                                    drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
 
48
                                    drizzled::HTON_STATS_RECORDS_IS_EXACT |
 
49
                                    drizzled::HTON_SKIP_STORE_LOCK) {
 
50
    table_definition_ext = BLITZ_SYSTEM_EXT;
 
51
  }
 
52
 
 
53
  virtual ~BlitzEngine() {
 
54
    pthread_mutex_destroy(&blitz_utility_mutex);
 
55
    tcmapdel(blitz_table_cache);
 
56
  }
 
57
 
 
58
  virtual drizzled::Cursor *create(drizzled::Table &table) {
 
59
    return new ha_blitz(*this, table);
 
60
  }
 
61
 
 
62
  const char **bas_ext() const {
 
63
    return ha_blitz_exts;
 
64
  }
 
65
 
 
66
  int doCreateTable(drizzled::Session &session,
 
67
                    drizzled::Table &table_arg,
 
68
                    const drizzled::TableIdentifier &identifier,
 
69
                    drizzled::message::Table &table_proto);
 
70
 
 
71
  int doRenameTable(drizzled::Session &session,
 
72
                    const drizzled::TableIdentifier &from_identifier,
 
73
                    const drizzled::TableIdentifier &to_identifier);
 
74
 
 
75
  int doDropTable(drizzled::Session &session,
 
76
                  const drizzled::TableIdentifier &identifier);
 
77
 
 
78
  int doGetTableDefinition(drizzled::Session &session,
 
79
                           const drizzled::TableIdentifier &identifier,
 
80
                           drizzled::message::Table &table_proto);
 
81
 
 
82
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
83
                             const drizzled::SchemaIdentifier &schema_identifier,
 
84
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
85
 
 
86
  bool doDoesTableExist(drizzled::Session &session,
 
87
                        const drizzled::TableIdentifier &identifier);
 
88
 
 
89
  bool validateCreateTableOption(const std::string &key,
 
90
                                 const std::string &state);
 
91
 
 
92
  bool doCreateTableCache(void);
 
93
 
 
94
  BlitzShare *getTableShare(const std::string &name);
 
95
  void cacheTableShare(const std::string &name, BlitzShare *share);
 
96
  void deleteTableShare(const std::string &name);
 
97
 
 
98
  uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
 
99
  uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
 
100
  uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
 
101
 
 
102
  uint32_t index_flags(enum drizzled::ha_key_alg) const {
 
103
    return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
 
104
            HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
 
105
  }
 
106
};
 
107
 
 
108
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
 
109
   a key to that row in the data dictionary. Two keys are merged and
 
110
   stored as a key because we want to avoid reading the leaf node and
 
111
   thus save disk IO and some computation in the tree. Note that the
 
112
   comparison function of BlitzDB's btree only takes into accound the
 
113
   actual index key. See blitzcmp.cc for details.
 
114
 
 
115
   With the above in mind, this helper function returns a pointer to
 
116
   the dictionary key by calculating the offset. */
 
117
static char *skip_btree_key(const char *key, const size_t skip_len,
 
118
                            int *return_klen);
 
119
 
 
120
static bool str_is_numeric(const std::string &str);
 
121
 
 
122
int BlitzEngine::doCreateTable(drizzled::Session &,
 
123
                               drizzled::Table &table,
 
124
                               const drizzled::TableIdentifier &identifier,
 
125
                               drizzled::message::Table &proto) {
 
126
  BlitzData dict;
 
127
  BlitzTree btree;
 
128
  int ecode;
 
129
 
 
130
  /* Temporary fix for blocking composite keys. We need to add this
 
131
     check because version 1 doesn't handle composite indexes. */
 
132
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
 
133
    if (table.key_info[i].key_parts > 1)
 
134
      return HA_ERR_UNSUPPORTED;
 
135
  }
 
136
 
 
137
  /* Create relevant files for a new table and close them immediately.
 
138
     All we want to do here is somewhat like UNIX touch(1). */
 
139
  if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
 
140
    return ecode;
 
141
 
 
142
  if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
 
143
    return ecode;
 
144
 
 
145
  /* Create b+tree index(es) for this table. */
 
146
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
 
147
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
 
148
      return ecode;
 
149
  }
 
150
 
 
151
  /* Write the table definition to system table. */
 
152
  if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
 
153
    return ecode;
 
154
 
 
155
  if (!dict.write_table_definition(proto)) {
 
156
    dict.close_system_table();
 
157
    return HA_ERR_CRASHED_ON_USAGE;
 
158
  }
 
159
 
 
160
  dict.close_system_table();
 
161
  return 0;
 
162
}
 
163
 
 
164
int BlitzEngine::doRenameTable(drizzled::Session &,
 
165
                               const drizzled::TableIdentifier &from,
 
166
                               const drizzled::TableIdentifier &to) {
 
167
  int rv = 0;
 
168
 
 
169
  BlitzData blitz_table;
 
170
  uint32_t nkeys;
 
171
 
 
172
  BlitzData dict;
 
173
  int ecode;
 
174
  /* Write the table definition to system table. */
 
175
  if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
 
176
    return ecode;
 
177
 
 
178
  drizzled::message::Table proto;
 
179
  char *proto_string;
 
180
  int proto_string_len;
 
181
 
 
182
  proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
 
183
                                       BLITZ_TABLE_PROTO_KEY.length(),
 
184
                                       &proto_string_len);
 
185
 
 
186
  if (proto_string == NULL) {
 
187
    return ENOMEM;
 
188
  }
 
189
 
 
190
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
 
191
    free(proto_string);
 
192
    return HA_ERR_CRASHED_ON_USAGE;
 
193
  }
 
194
 
 
195
  free(proto_string);
 
196
 
 
197
  proto.set_name(to.getTableName());
 
198
  proto.set_schema(to.getSchemaName());
 
199
  proto.set_catalog(to.getCatalogName());
 
200
 
 
201
  if (!dict.write_table_definition(proto)) {
 
202
    dict.close_system_table();
 
203
    return HA_ERR_CRASHED_ON_USAGE;
 
204
  }
 
205
 
 
206
  dict.close_system_table();
 
207
 
 
208
  /* Find out the number of indexes in this table. This information
 
209
     is required because BlitzDB creates a file for each indexes.*/
 
210
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
 
211
    return HA_ERR_CRASHED_ON_USAGE;
 
212
 
 
213
  nkeys = blitz_table.read_meta_keycount();
 
214
 
 
215
  if (blitz_table.close_data_table() != 0)
 
216
    return HA_ERR_CRASHED_ON_USAGE;
 
217
 
 
218
  /* We're now ready to rename the file(s) for this table. Start by
 
219
     attempting to rename the data and system files. */
 
220
  if (rename_file_ext(from.getPath().c_str(),
 
221
                      to.getPath().c_str(), BLITZ_DATA_EXT)) {
 
222
    if ((rv = errno) != ENOENT)
 
223
      return rv;
 
224
  }
 
225
 
 
226
  if (rename_file_ext(from.getPath().c_str(),
 
227
                      to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
 
228
    if ((rv = errno) != ENOENT)
 
229
      return rv;
 
230
  }
 
231
 
 
232
  /* So far so good. Rename the index file(s) and we're done. */
 
233
  BlitzTree btree;
 
234
 
 
235
  for (uint32_t i = 0; i < nkeys; i++) {
 
236
    if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
 
237
      return HA_ERR_CRASHED_ON_USAGE;
 
238
  }
 
239
 
 
240
  return rv;
 
241
}
 
242
 
 
243
int BlitzEngine::doDropTable(drizzled::Session &,
 
244
                             const drizzled::TableIdentifier &identifier) {
 
245
  BlitzData dict;
 
246
  BlitzTree btree;
 
247
  char buf[FN_REFLEN];
 
248
  uint32_t nkeys;
 
249
  int err;
 
250
 
 
251
  /* We open the dictionary to extract meta data from it */
 
252
  if ((err = dict.open_data_table(identifier.getPath().c_str(),
 
253
                                  HDBOREADER)) != 0) {
 
254
    return err;
 
255
  }
 
256
 
 
257
  nkeys = dict.read_meta_keycount();
 
258
 
 
259
  /* We no longer need the dictionary to be open */
 
260
  dict.close_data_table();
 
261
 
 
262
  /* Drop the Data Dictionary */
 
263
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
 
264
  if ((err = unlink(buf)) == -1) {
 
265
    return err;
 
266
  }
 
267
 
 
268
  /* Drop the System Table */
 
269
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
 
270
  if ((err = unlink(buf)) == -1) {
 
271
    return err;
 
272
  }
 
273
 
 
274
  /* Drop Index file(s) */
 
275
  for (uint32_t i = 0; i < nkeys; i++) {
 
276
    if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
 
277
      return err;
 
278
    }
 
279
  }
 
280
 
 
281
  return 0;
 
282
}
 
283
 
 
284
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
 
285
                                      const drizzled::TableIdentifier &identifier,
 
286
                                      drizzled::message::Table &proto) {
 
287
  struct stat stat_info;
 
288
  std::string path(identifier.getPath());
 
289
 
 
290
  path.append(BLITZ_SYSTEM_EXT);
 
291
 
 
292
  if (stat(path.c_str(), &stat_info)) {
 
293
    return errno;
 
294
  }
 
295
 
 
296
  BlitzData db;
 
297
  char *proto_string;
 
298
  int proto_string_len;
 
299
 
 
300
  if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
 
301
    return HA_ERR_CRASHED_ON_USAGE;
 
302
  }
 
303
 
 
304
  proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
 
305
                                     BLITZ_TABLE_PROTO_KEY.length(),
 
306
                                     &proto_string_len);
 
307
 
 
308
  if (db.close_system_table() != 0) {
 
309
    return HA_ERR_CRASHED_ON_USAGE;
 
310
  }
 
311
 
 
312
  if (proto_string == NULL) {
 
313
    return ENOMEM;
 
314
  }
 
315
 
 
316
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
 
317
    free(proto_string);
 
318
    return HA_ERR_CRASHED_ON_USAGE;
 
319
  }
 
320
 
 
321
  free(proto_string);
 
322
 
 
323
  return EEXIST;
 
324
}
 
325
 
 
326
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
327
                                        const drizzled::SchemaIdentifier &schema_id,
 
328
                                        drizzled::TableIdentifier::vector &ids) {
 
329
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
 
330
 
 
331
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
 
332
       entry_iter != entries.end(); ++entry_iter) {
 
333
 
 
334
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
 
335
    const std::string *filename = &entry->filename;
 
336
 
 
337
    assert(filename->size());
 
338
 
 
339
    const char *ext = strchr(filename->c_str(), '.');
 
340
 
 
341
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
 
342
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
 
343
    } else {
 
344
      char uname[NAME_LEN + 1];
 
345
      uint32_t file_name_len;
 
346
 
 
347
      file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
 
348
                                                             uname,
 
349
                                                             sizeof(uname));
 
350
 
 
351
      uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
 
352
      ids.push_back(TableIdentifier(schema_id, uname));
 
353
    }
 
354
  }
 
355
}
 
356
 
 
357
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
 
358
                                   const drizzled::TableIdentifier &identifier) {
 
359
  std::string proto_path(identifier.getPath());
 
360
  proto_path.append(BLITZ_DATA_EXT);
 
361
 
 
362
  return (access(proto_path.c_str(), F_OK)) ? false : true;
 
363
}
 
364
 
 
365
bool BlitzEngine::validateCreateTableOption(const std::string &key,
 
366
                                            const std::string &state) {
 
367
  if (key == "ESTIMATED_ROWS" || key == "estimated_rows") {
 
368
    if (str_is_numeric(state))
 
369
      return true;
 
370
  }
 
371
  return false;
 
372
}
 
373
 
 
374
bool BlitzEngine::doCreateTableCache(void) {
 
375
  return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
 
376
}
 
377
 
 
378
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
 
379
  int vlen;
 
380
  const void *fetched;
 
381
  BlitzShare *rv = NULL;
 
382
 
 
383
  fetched = tcmapget(blitz_table_cache, table_name.c_str(),
 
384
                     table_name.length(), &vlen);
 
385
 
 
386
  /* dereference the object */
 
387
  if (fetched)
 
388
    rv = *(BlitzShare **)fetched;
 
389
 
 
390
  return rv;
 
391
}
 
392
 
 
393
void BlitzEngine::cacheTableShare(const std::string &table_name,
 
394
                                  BlitzShare *share) {
 
395
  /* Cache the memory address of the share object */
 
396
  tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
 
397
           &share, sizeof(share));
 
398
}
 
399
 
 
400
void BlitzEngine::deleteTableShare(const std::string &table_name) {
 
401
  tcmapout2(blitz_table_cache, table_name.c_str());
 
402
}
 
403
 
 
404
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
 
405
                   Table &table_arg) : Cursor(engine_arg, table_arg),
 
406
                                            btree_cursor(NULL),
 
407
                                            table_scan(false),
 
408
                                            table_based(false),
 
409
                                            thread_locked(false),
 
410
                                            held_key(NULL),
 
411
                                            held_key_len(0),
 
412
                                            current_key(NULL),
 
413
                                            current_key_len(0),
 
414
                                            key_buffer(NULL),
 
415
                                            errkey_id(0) {}
 
416
 
 
417
int ha_blitz::open(const char *table_name, int, uint32_t) {
 
418
  if ((share = get_share(table_name)) == NULL)
 
419
    return HA_ERR_CRASHED_ON_USAGE;
 
420
 
 
421
  pthread_mutex_lock(&blitz_utility_mutex);
 
422
 
 
423
  btree_cursor = new BlitzCursor[share->nkeys];
 
424
 
 
425
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
426
    if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
 
427
      free_share();
 
428
      pthread_mutex_unlock(&blitz_utility_mutex);
 
429
      return HA_ERR_OUT_OF_MEM;
 
430
    }
 
431
  }
 
432
 
 
433
  if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
434
    free_share();
 
435
    pthread_mutex_unlock(&blitz_utility_mutex);
 
436
    return HA_ERR_OUT_OF_MEM;
 
437
  }
 
438
 
 
439
  if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
440
    free_share();
 
441
    pthread_mutex_unlock(&blitz_utility_mutex);
 
442
    return HA_ERR_OUT_OF_MEM;
 
443
  }
 
444
 
 
445
  if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
446
    free_share();
 
447
    free(key_buffer);
 
448
    free(key_merge_buffer);
 
449
    pthread_mutex_unlock(&blitz_utility_mutex);
 
450
    return HA_ERR_OUT_OF_MEM;
 
451
  }
 
452
 
 
453
  secondary_row_buffer = NULL;
 
454
  secondary_row_buffer_size = 0;
 
455
  key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
 
456
 
 
457
  /* 'ref_length' determines the size of the buffer that the kernel
 
458
     will use to uniquely identify a row. The actual allocation is
 
459
     done by the kernel so all we do here is specify the size of it.*/
 
460
  if (share->primary_key_exists) {
 
461
    ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
 
462
  } else {
 
463
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
 
464
  }
 
465
 
 
466
  pthread_mutex_unlock(&blitz_utility_mutex);
 
467
  return 0;
 
468
}
 
469
 
 
470
int ha_blitz::close(void) {
 
471
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
472
    share->btrees[i].destroy_cursor(&btree_cursor[i]);
 
473
  }
 
474
  delete [] btree_cursor;
 
475
 
 
476
  free(key_buffer);
 
477
  free(key_merge_buffer);
 
478
  free(held_key_buf);
 
479
  free(secondary_row_buffer);
 
480
  return free_share();
 
481
}
 
482
 
 
483
int ha_blitz::info(uint32_t flag) {
 
484
  if (flag & HA_STATUS_VARIABLE) {
 
485
    stats.records = share->dict.nrecords();
 
486
    stats.data_file_length = share->dict.table_size();
 
487
  }
 
488
 
 
489
  if (flag & HA_STATUS_AUTO)
 
490
    stats.auto_increment_value = share->auto_increment_value + 1;
 
491
 
 
492
  if (flag & HA_STATUS_ERRKEY)
 
493
    errkey = errkey_id;
 
494
 
 
495
  return 0;
 
496
}
 
497
 
 
498
int ha_blitz::doStartTableScan(bool scan) {
 
499
  /* Obtain the query type for this scan */
 
500
  sql_command_type = session_sql_command(getTable()->getSession());
 
501
  table_scan = scan;
 
502
  table_based = true;
 
503
 
 
504
  /* Obtain the most suitable lock for the given statement type. */
 
505
  blitz_optimal_lock();
 
506
 
 
507
  /* Get the first record from TCHDB. Let the scanner take
 
508
     care of checking return value errors. */
 
509
  if (table_scan) {
 
510
    current_key = share->dict.next_key_and_row(NULL, 0,
 
511
                                               &current_key_len,
 
512
                                               &current_row,
 
513
                                               &current_row_len);
 
514
  }
 
515
  return 0;
 
516
}
 
517
 
 
518
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
 
519
  char *next_key;
 
520
  const char *next_row;
 
521
  int next_row_len;
 
522
  int next_key_len;
 
523
 
 
524
  free(held_key);
 
525
  held_key = NULL;
 
526
 
 
527
  if (current_key == NULL) {
 
528
    getTable()->status = STATUS_NOT_FOUND;
 
529
    return HA_ERR_END_OF_FILE;
 
530
  }
 
531
 
 
532
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
533
 
 
534
  /* Unpack and copy the current row to Drizzle's result buffer. */
 
535
  unpack_row(drizzle_buf, current_row, current_row_len);
 
536
 
 
537
  /* Retrieve both key and row of the next record with one allocation. */
 
538
  next_key = share->dict.next_key_and_row(current_key, current_key_len,
 
539
                                          &next_key_len, &next_row,
 
540
                                          &next_row_len);
 
541
 
 
542
  /* Memory region for "current_row" will be freed as "held key" on
 
543
     the next iteration. This is because "current_key" points to the
 
544
     region of memory that contains "current_row" and "held_key" points
 
545
     to it. If there isn't another iteration then it is freed in doEndTableScan(). */
 
546
  current_row = next_row;
 
547
  current_row_len = next_row_len;
 
548
 
 
549
  /* Remember the current row because delete, update or replace
 
550
     function could be called after this function. This pointer is
 
551
     also used to free the previous key and row, which resides on
 
552
     the same buffer. */
 
553
  held_key = current_key;
 
554
  held_key_len = current_key_len;
 
555
 
 
556
  /* It is now memory-leak-safe to point current_key to next_key. */
 
557
  current_key = next_key;
 
558
  current_key_len = next_key_len;
 
559
  getTable()->status = 0;
 
560
  return 0;
 
561
}
 
562
 
 
563
int ha_blitz::doEndTableScan() {
 
564
  if (table_scan && current_key)
 
565
    free(current_key);
 
566
  if (table_scan && held_key)
 
567
    free(held_key);
 
568
 
 
569
  current_key = NULL;
 
570
  held_key = NULL;
 
571
  current_key_len = 0;
 
572
  held_key_len = 0;
 
573
  table_scan = false;
 
574
  table_based = false;
 
575
 
 
576
  if (thread_locked)
 
577
    blitz_optimal_unlock();
 
578
 
 
579
  return 0;
 
580
}
 
581
 
 
582
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
 
583
  char *row;
 
584
  char *key = NULL;
 
585
  int key_len, row_len;
 
586
 
 
587
  memcpy(&key_len, pos, sizeof(key_len));
 
588
  key = (char *)(pos + sizeof(key_len));
 
589
 
 
590
  /* TODO: Find a better error type. */
 
591
  if (key == NULL)
 
592
    return HA_ERR_KEY_NOT_FOUND;
 
593
 
 
594
  row = share->dict.get_row(key, key_len, &row_len);
 
595
 
 
596
  if (row == NULL)
 
597
    return HA_ERR_KEY_NOT_FOUND;
 
598
 
 
599
  unpack_row(copy_to, row, row_len);
 
600
 
 
601
  /* Remember the key location on memory if the thread is not doing
 
602
     a table scan. This is because either update_row() or delete_row()
 
603
     might be called after this function. */
 
604
  if (!table_scan) {
 
605
    held_key = key;
 
606
    held_key_len = key_len;
 
607
  }
 
608
 
 
609
  free(row);
 
610
  return 0;
 
611
}
 
612
 
 
613
void ha_blitz::position(const unsigned char *) {
 
614
  int length = sizeof(held_key_len);
 
615
  memcpy(ref, &held_key_len, length);
 
616
  memcpy(ref + length, (unsigned char *)held_key, held_key_len);
 
617
}
 
618
 
 
619
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
 
620
  return "BTREE";
 
621
}
 
622
 
 
623
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
 
624
  active_index = key_num;
 
625
  sql_command_type = session_sql_command(getTable()->getSession());
 
626
 
 
627
  /* This is unlikely to happen but just for assurance, re-obtain
 
628
     the lock if this thread already has a certain lock. This makes
 
629
     sure that this thread will get the most appropriate lock for
 
630
     the current statement. */
 
631
  if (thread_locked)
 
632
    blitz_optimal_unlock();
 
633
 
 
634
  blitz_optimal_lock();
 
635
  return 0;
 
636
}
 
637
 
 
638
int ha_blitz::index_first(unsigned char *buf) {
 
639
  char *dict_key, *bt_key, *row;
 
640
  int dict_klen, bt_klen, prefix_len, rlen;
 
641
 
 
642
  bt_key = btree_cursor[active_index].first_key(&bt_klen);
 
643
 
 
644
  if (bt_key == NULL)
 
645
    return HA_ERR_END_OF_FILE;
 
646
 
 
647
  prefix_len = btree_key_length(bt_key, active_index);
 
648
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
649
 
 
650
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
651
    free(bt_key);
 
652
    return HA_ERR_KEY_NOT_FOUND;
 
653
  }
 
654
 
 
655
  unpack_row(buf, row, rlen);
 
656
  keep_track_of_key(bt_key, bt_klen);
 
657
 
 
658
  free(bt_key);
 
659
  free(row);
 
660
  return 0;
 
661
}
 
662
 
 
663
int ha_blitz::index_next(unsigned char *buf) {
 
664
  char *dict_key, *bt_key, *row;
 
665
  int dict_klen, bt_klen, prefix_len, rlen;
 
666
 
 
667
  bt_key = btree_cursor[active_index].next_key(&bt_klen);
 
668
 
 
669
  if (bt_key == NULL) {
 
670
    getTable()->status = STATUS_NOT_FOUND;
 
671
    return HA_ERR_END_OF_FILE;
 
672
  }
 
673
 
 
674
  prefix_len = btree_key_length(bt_key, active_index);
 
675
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
676
 
 
677
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
678
    free(bt_key);
 
679
    getTable()->status = STATUS_NOT_FOUND;
 
680
    return HA_ERR_KEY_NOT_FOUND;
 
681
  }
 
682
 
 
683
  unpack_row(buf, row, rlen);
 
684
  keep_track_of_key(bt_key, bt_klen);
 
685
 
 
686
  free(bt_key);
 
687
  free(row);
 
688
  return 0;
 
689
}
 
690
 
 
691
int ha_blitz::index_prev(unsigned char *buf) {
 
692
  char *dict_key, *bt_key, *row;
 
693
  int dict_klen, bt_klen, prefix_len, rlen;
 
694
 
 
695
  bt_key = btree_cursor[active_index].prev_key(&bt_klen);
 
696
 
 
697
  if (bt_key == NULL)
 
698
    return HA_ERR_END_OF_FILE;
 
699
 
 
700
  prefix_len = btree_key_length(bt_key, active_index);
 
701
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
702
 
 
703
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
704
    free(bt_key);
 
705
    return HA_ERR_KEY_NOT_FOUND;
 
706
  }
 
707
 
 
708
  unpack_row(buf, row, rlen);
 
709
  keep_track_of_key(bt_key, bt_klen);
 
710
 
 
711
  free(bt_key);
 
712
  free(row);
 
713
  return 0;
 
714
}
 
715
 
 
716
int ha_blitz::index_last(unsigned char *buf) {
 
717
  char *dict_key, *bt_key, *row;
 
718
  int dict_klen, bt_klen, prefix_len, rlen;
 
719
 
 
720
  bt_key = btree_cursor[active_index].final_key(&bt_klen);
 
721
 
 
722
  if (bt_key == NULL)
 
723
    return HA_ERR_KEY_NOT_FOUND;
 
724
 
 
725
  prefix_len = btree_key_length(bt_key, active_index);
 
726
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
727
 
 
728
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
729
    free(bt_key);
 
730
    errkey_id = active_index;
 
731
    return HA_ERR_KEY_NOT_FOUND;
 
732
  }
 
733
 
 
734
  unpack_row(buf, row, rlen);
 
735
  keep_track_of_key(bt_key, bt_klen);
 
736
 
 
737
  free(bt_key);
 
738
  free(row);
 
739
  return 0;
 
740
}
 
741
 
 
742
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
 
743
                         uint32_t key_len, enum ha_rkey_function find_flag) {
 
744
  return index_read_idx(buf, active_index, key, key_len, find_flag);
 
745
}
 
746
 
 
747
/* This is where the read related index logic lives. It is used by both
 
748
   BlitzDB and the Database Kernel (specifically, by the optimizer). */
 
749
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
 
750
                             const unsigned char *key, uint32_t,
 
751
                             enum ha_rkey_function search_mode) {
 
752
 
 
753
  /* If the provided key is NULL, we are required to return the first
 
754
     row in the active_index. */
 
755
  if (key == NULL)
 
756
    return this->index_first(buf);
 
757
 
 
758
  /* Otherwise we search for it. Prepare the key to look up the tree. */
 
759
  int packed_klen;
 
760
  char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
 
761
 
 
762
  /* Lookup the tree and get the master key. */
 
763
  int unique_klen;
 
764
  char *unique_key;
 
765
 
 
766
  unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
 
767
                                              packed_klen, &unique_klen);
 
768
 
 
769
  if (unique_key == NULL) {
 
770
    errkey_id = key_num;
 
771
    return HA_ERR_KEY_NOT_FOUND;
 
772
  }
 
773
 
 
774
  /* Got the master key. Prepare it to lookup the data dictionary. */
 
775
  int dict_klen;
 
776
  int skip_len = btree_key_length(unique_key, key_num);
 
777
  char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
 
778
 
 
779
  /* Fetch the packed row from the data dictionary. */
 
780
  int row_len;
 
781
  char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
 
782
 
 
783
  if (fetched_row == NULL) {
 
784
    errkey_id = key_num;
 
785
    free(unique_key);
 
786
    return HA_ERR_KEY_NOT_FOUND;
 
787
  }
 
788
 
 
789
  /* Unpack it into Drizzle's return buffer and keep track of the
 
790
     master key for future use (before index_end() is called). */
 
791
  unpack_row(buf, fetched_row, row_len);
 
792
  keep_track_of_key(unique_key, unique_klen);
 
793
 
 
794
  free(unique_key);
 
795
  free(fetched_row);
 
796
  return 0;
 
797
}
 
798
 
 
799
int ha_blitz::doEndIndexScan(void) {
 
800
  held_key = NULL;
 
801
  held_key_len = 0;
 
802
 
 
803
  btree_cursor[active_index].moved = false;
 
804
 
 
805
  if (thread_locked)
 
806
    blitz_optimal_unlock();
 
807
 
 
808
  return 0;
 
809
}
 
810
 
 
811
int ha_blitz::enable_indexes(uint32_t) {
 
812
  return HA_ERR_UNSUPPORTED;
 
813
}
 
814
 
 
815
int ha_blitz::disable_indexes(uint32_t) {
 
816
  return HA_ERR_UNSUPPORTED;
 
817
}
 
818
 
 
819
/* Find the estimated number of rows between min_key and max_key.
 
820
   Leave the proper implementation of this for now since there are
 
821
   too many exceptions to cover. */
 
822
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
 
823
                                   drizzled::key_range * /*min_key*/,
 
824
                                   drizzled::key_range * /*max_key*/) {
 
825
  return BLITZ_WORST_CASE_RANGE;
 
826
}
 
827
 
 
828
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
 
829
  int rv;
 
830
 
 
831
  ha_statistic_increment(&system_status_var::ha_write_count);
 
832
 
 
833
  /* Prepare Auto Increment field if one exists. */
 
834
  if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
 
835
    pthread_mutex_lock(&blitz_utility_mutex);
 
836
    if ((rv = update_auto_increment()) != 0) {
 
837
      pthread_mutex_unlock(&blitz_utility_mutex);
 
838
      return rv;
 
839
    }
 
840
 
 
841
    uint64_t next_val = getTable()->next_number_field->val_int();
 
842
 
 
843
    if (next_val > share->auto_increment_value) {
 
844
      share->auto_increment_value = next_val;
 
845
      stats.auto_increment_value = share->auto_increment_value + 1;
 
846
    }
 
847
    pthread_mutex_unlock(&blitz_utility_mutex);
 
848
  }
 
849
 
 
850
  /* Serialize a primary key for this row. If a PK doesn't exist,
 
851
     an internal hidden ID will be generated. We obtain the PK here
 
852
     and pack it to this function's local buffer instead of the
 
853
     thread's own 'key_buffer' because the PK value needs to be
 
854
     remembered when writing non-PK keys AND because the 'key_buffer'
 
855
     will be used to generate these non-PK keys. */
 
856
  char temp_pkbuf[BLITZ_MAX_KEY_LEN];
 
857
  size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
 
858
 
 
859
  /* Obtain a buffer that can accommodate this row. We then pack
 
860
     the provided row into it. Note that this code works most
 
861
     efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
 
862
  unsigned char *row_buf = get_pack_buffer(max_row_length());
 
863
  size_t row_len = pack_row(row_buf, drizzle_row);
 
864
 
 
865
  uint32_t curr_key = 0;
 
866
  uint32_t lock_id = 0;
 
867
 
 
868
  if (share->nkeys > 0) {
 
869
    lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
 
870
    share->blitz_lock.slotted_lock(lock_id);
 
871
  }
 
872
 
 
873
  /* We isolate this condition outside the key loop to avoid the CPU
 
874
     from going through unnecessary conditional branching on heavy
 
875
     insertion load. TODO: Optimize this block. PK should not need
 
876
     to go through merge_key() since this information is redundant. */
 
877
  if (share->primary_key_exists) {
 
878
    char *key = NULL;
 
879
    size_t klen = 0;
 
880
 
 
881
    key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
 
882
 
 
883
    rv = share->btrees[curr_key].write_unique(key, klen);
 
884
 
 
885
    if (rv == HA_ERR_FOUND_DUPP_KEY) {
 
886
      errkey_id = curr_key;
 
887
      share->blitz_lock.slotted_unlock(lock_id);
 
888
      return rv;
 
889
    }
 
890
    curr_key = 1;
 
891
  }
 
892
 
 
893
  /* Loop over the keys and write them to it's exclusive tree. */
 
894
  while (curr_key < share->nkeys) {
 
895
    char *key = NULL;
 
896
    size_t prefix_len = 0;
 
897
    size_t klen = 0;
 
898
 
 
899
    prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
 
900
    key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
 
901
 
 
902
    if (share->btrees[curr_key].unique) {
 
903
      rv = share->btrees[curr_key].write_unique(key, klen);
 
904
    } else {
 
905
      rv = share->btrees[curr_key].write(key, klen);
 
906
    }
 
907
 
 
908
    if (rv != 0) {
 
909
      errkey_id = curr_key;
 
910
      share->blitz_lock.slotted_unlock(lock_id);
 
911
      return rv;
 
912
    }
 
913
 
 
914
    curr_key++;
 
915
  }
 
916
 
 
917
  /* Write the row to the Data Dictionary. */
 
918
  rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
 
919
 
 
920
  if (share->nkeys > 0)
 
921
    share->blitz_lock.slotted_unlock(lock_id);
 
922
 
 
923
  return rv;
 
924
}
 
925
 
 
926
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
 
927
                             unsigned char *new_row) {
 
928
  int rv;
 
929
  uint32_t lock_id = 0;
 
930
 
 
931
  ha_statistic_increment(&system_status_var::ha_update_count);
 
932
 
 
933
  /* Handle Indexes */
 
934
  if (share->nkeys > 0) {
 
935
    /* BlitzDB cannot update an indexed row on table scan. */
 
936
    if (table_scan)
 
937
      return HA_ERR_UNSUPPORTED;
 
938
 
 
939
    if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
 
940
      return rv;
 
941
 
 
942
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
 
943
    share->blitz_lock.slotted_lock(lock_id);
 
944
 
 
945
    /* Update all relevant index entries. Start by deleting the
 
946
       the existing key then write the new key. Something we should
 
947
       consider in the future is to take a diff of the keys and only
 
948
       update changed keys. */
 
949
    int skip = btree_key_length(held_key, active_index);
 
950
    char *suffix = held_key + skip;
 
951
    uint16_t suffix_len = uint2korr(suffix);
 
952
 
 
953
    suffix += sizeof(suffix_len);
 
954
 
 
955
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
956
      char *key;
 
957
      size_t prefix_len, klen;
 
958
 
 
959
      klen = 0;
 
960
      prefix_len = make_index_key(key_buffer, i, old_row);
 
961
      key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
 
962
 
 
963
      if (share->btrees[i].delete_key(key, klen) != 0) {
 
964
        errkey_id = i;
 
965
        share->blitz_lock.slotted_unlock(lock_id);
 
966
        return HA_ERR_KEY_NOT_FOUND;
 
967
      }
 
968
 
 
969
      /* Now write the new key. */
 
970
      prefix_len = make_index_key(key_buffer, i, new_row);
 
971
 
 
972
      if (i == getTable()->getShare()->getPrimaryKey()) {
 
973
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
 
974
        rv = share->btrees[i].write(key, klen);
 
975
      } else {
 
976
        key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
 
977
        rv = share->btrees[i].write(key, klen);
 
978
      }
 
979
 
 
980
      if (rv != 0) {
 
981
        errkey_id = i;
 
982
        share->blitz_lock.slotted_unlock(lock_id);
 
983
        return rv;
 
984
      }
 
985
    }
 
986
  }
 
987
 
 
988
  /* Getting this far means that the index has been successfully
 
989
     updated. We now update the Data Dictionary. This implementation
 
990
     is admittedly far from optimial and will be revisited. */
 
991
  size_t row_len = max_row_length();
 
992
  unsigned char *row_buf = get_pack_buffer(row_len);
 
993
  row_len = pack_row(row_buf, new_row);
 
994
 
 
995
  /* This is a basic case where we can simply overwrite the key. */
 
996
  if (table_based) {
 
997
    rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
 
998
  } else {
 
999
    int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
 
1000
 
 
1001
    /* Delete with the old key. */
 
1002
    share->dict.delete_row(key_buffer, klen);
 
1003
 
 
1004
    /* Write with the new key. */
 
1005
    klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
 
1006
    rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
 
1007
  }
 
1008
 
 
1009
  if (share->nkeys > 0)
 
1010
    share->blitz_lock.slotted_unlock(lock_id);
 
1011
 
 
1012
  return rv;
 
1013
}
 
1014
 
 
1015
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
 
1016
  int rv;
 
1017
 
 
1018
  ha_statistic_increment(&system_status_var::ha_delete_count);
 
1019
 
 
1020
  char *dict_key = held_key;
 
1021
  int dict_klen = held_key_len;
 
1022
  uint32_t lock_id = 0;
 
1023
 
 
1024
  if (share->nkeys > 0) {
 
1025
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
 
1026
    share->blitz_lock.slotted_lock(lock_id);
 
1027
 
 
1028
    /* Loop over the indexes and delete all relevant entries for
 
1029
       this row. We do this by reproducing the key in BlitzDB's
 
1030
       unique key format. The procedure is simple.
 
1031
 
 
1032
       (1): Compute the key value for this index from the row then
 
1033
            pack it into key_buffer (not unique at this point).
 
1034
 
 
1035
       (2): Append the suffix of the held_key to the key generated
 
1036
            in step 1. The key is then guaranteed to be unique. */
 
1037
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
1038
      /* In this case, we don't need to search for the key because
 
1039
         TC's cursor is already pointing at the key that we want
 
1040
         to delete. We wouldn't be here otherwise. */
 
1041
      if (i == active_index) {
 
1042
        btree_cursor[active_index].delete_position();
 
1043
        continue;
 
1044
      }
 
1045
 
 
1046
      int klen = make_index_key(key_buffer, i, row_to_delete);
 
1047
      int skip_len = btree_key_length(held_key, active_index);
 
1048
      uint16_t suffix_len = uint2korr(held_key + skip_len);
 
1049
 
 
1050
      /* Append the suffix to the key */
 
1051
      memcpy(key_buffer + klen, held_key + skip_len,
 
1052
             sizeof(suffix_len) + suffix_len);
 
1053
 
 
1054
      /* Update the key length to cover the generated key. */
 
1055
      klen = klen + sizeof(suffix_len) + suffix_len;
 
1056
 
 
1057
      if (share->btrees[i].delete_key(key_buffer, klen) != 0)
 
1058
        return HA_ERR_KEY_NOT_FOUND;
 
1059
    }
 
1060
 
 
1061
    /* Skip to the data dictionary key. */
 
1062
    int dict_key_offset = btree_key_length(dict_key, active_index);
 
1063
    dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
 
1064
  }
 
1065
 
 
1066
  rv = share->dict.delete_row(dict_key, dict_klen);
 
1067
 
 
1068
  if (share->nkeys > 0)
 
1069
    share->blitz_lock.slotted_unlock(lock_id);
 
1070
 
 
1071
  return rv;
 
1072
}
 
1073
 
 
1074
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
 
1075
                                  uint64_t, uint64_t *first_value,
 
1076
                                  uint64_t *nb_reserved_values) {
 
1077
  *first_value = share->auto_increment_value + 1;
 
1078
  *nb_reserved_values = UINT64_MAX;
 
1079
}
 
1080
 
 
1081
int ha_blitz::reset_auto_increment(uint64_t value) {
 
1082
  share->auto_increment_value = (value == 0) ? 1 : value;
 
1083
  return 0;
 
1084
}
 
1085
 
 
1086
int ha_blitz::delete_all_rows(void) {
 
1087
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
1088
    if (share->btrees[i].delete_all() != 0) {
 
1089
      errkey = i;
 
1090
      return HA_ERR_CRASHED_ON_USAGE;
 
1091
    }
 
1092
  }
 
1093
  return (share->dict.delete_all_rows()) ? 0 : -1;
 
1094
}
 
1095
 
 
1096
uint32_t ha_blitz::max_row_length(void) {
 
1097
  uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
 
1098
  uint32_t *pos = getTable()->getBlobField();
 
1099
  uint32_t *end = pos + getTable()->sizeBlobFields();
 
1100
 
 
1101
  while (pos != end) {
 
1102
    length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
 
1103
    pos++;
 
1104
  }
 
1105
 
 
1106
  return length;
 
1107
}
 
1108
 
 
1109
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
 
1110
  if (!share->primary_key_exists) {
 
1111
    uint64_t next_id = share->dict.next_hidden_row_id();
 
1112
    int8store(pack_to, next_id);
 
1113
    return sizeof(next_id);
 
1114
  }
 
1115
 
 
1116
  /* Getting here means that there is a PK in this table. Get the
 
1117
     binary representation of the PK, pack it to BlitzDB's key buffer
 
1118
     and return the size of it. */
 
1119
  return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
 
1120
}
 
1121
 
 
1122
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
 
1123
                                const unsigned char *row) {
 
1124
  KeyInfo *key = &getTable()->key_info[key_num];
 
1125
  KeyPartInfo *key_part = key->key_part;
 
1126
  KeyPartInfo *key_part_end = key_part + key->key_parts;
 
1127
 
 
1128
  unsigned char *pos = (unsigned char *)pack_to;
 
1129
  unsigned char *end;
 
1130
  int offset = 0;
 
1131
 
 
1132
  memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
 
1133
 
 
1134
  /* Loop through key part(s) and pack them as we go. */
 
1135
  for (; key_part != key_part_end; key_part++) {
 
1136
    if (key_part->null_bit) {
 
1137
      if (row[key_part->null_offset] & key_part->null_bit) {
 
1138
        *pos++ = 0;
 
1139
        continue;
 
1140
      }
 
1141
      *pos++ = 1;
 
1142
    }
 
1143
 
 
1144
    /* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
 
1145
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
 
1146
      /* Extract the length of the string from the row. */
 
1147
      uint16_t data_len = *(uint8_t *)(row + key_part->offset);
 
1148
 
 
1149
      /* Copy the length of the string. Use 2 bytes. */
 
1150
      int2store(pos, data_len);
 
1151
      pos += sizeof(data_len);
 
1152
 
 
1153
      /* Copy the string data */
 
1154
      memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
 
1155
      pos += data_len;
 
1156
    } else {
 
1157
      end = key_part->field->pack(pos, row + key_part->offset);
 
1158
      offset = end - pos;
 
1159
      pos += offset;
 
1160
    }
 
1161
  }
 
1162
 
 
1163
  return ((char *)pos - pack_to);
 
1164
}
 
1165
 
 
1166
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
 
1167
                          const size_t b_len, size_t *merged_len) {
 
1168
 
 
1169
  size_t total = a_len + sizeof(uint16_t) + b_len;
 
1170
 
 
1171
  if (total > key_merge_buffer_len) {
 
1172
    key_merge_buffer = (char *)realloc(key_merge_buffer, total);
 
1173
 
 
1174
    if (key_merge_buffer == NULL) {
 
1175
      errno = HA_ERR_OUT_OF_MEM;
 
1176
      return NULL;
 
1177
    }
 
1178
    key_merge_buffer_len = total;
 
1179
  }
 
1180
 
 
1181
  char *pos = key_merge_buffer;
 
1182
 
 
1183
  /* Copy the prefix. */
 
1184
  memcpy(pos, a, a_len);
 
1185
  pos += a_len;
 
1186
 
 
1187
  /* Copy the length of b. */
 
1188
  int2store(pos, (uint16_t)b_len);
 
1189
  pos += sizeof(uint16_t);
 
1190
 
 
1191
  /* Copy the suffix and we're done. */
 
1192
  memcpy(pos, b, b_len);
 
1193
 
 
1194
  *merged_len = total;
 
1195
  return key_merge_buffer;
 
1196
}
 
1197
 
 
1198
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
 
1199
  KeyInfo *key_info = &getTable()->key_info[key_num];
 
1200
  KeyPartInfo *key_part = key_info->key_part;
 
1201
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
 
1202
  char *pos = (char *)key;
 
1203
  uint64_t len = 0;
 
1204
  size_t rv = 0;
 
1205
 
 
1206
  for (; key_part != key_part_end; key_part++) {
 
1207
    if (key_part->null_bit) {
 
1208
      pos++;
 
1209
      rv++;
 
1210
      if (*key == 0)
 
1211
        continue;
 
1212
    }
 
1213
 
 
1214
    if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
 
1215
        key_part->type == HA_KEYTYPE_VARTEXT2) {
 
1216
      len = uint2korr(pos);
 
1217
      rv += len + sizeof(uint16_t);
 
1218
    } else {
 
1219
      len = key_part->field->key_length();
 
1220
      rv += len;
 
1221
    }
 
1222
    pos += len;
 
1223
    len = 0;
 
1224
  }
 
1225
 
 
1226
  return rv;
 
1227
}
 
1228
 
 
1229
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
 
1230
  memcpy(held_key_buf, key, klen);
 
1231
  held_key = held_key_buf;
 
1232
  held_key_len = klen;
 
1233
}
 
1234
 
 
1235
/* Converts a native Drizzle index key to BlitzDB's format. */
 
1236
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
 
1237
                                    const int key_num, int *return_key_len) {
 
1238
  KeyInfo *key = &getTable()->key_info[key_num];
 
1239
  KeyPartInfo *key_part = key->key_part;
 
1240
  KeyPartInfo *key_part_end = key_part + key->key_parts;
 
1241
 
 
1242
  unsigned char *key_pos = (unsigned char *)native_key;
 
1243
  unsigned char *keybuf_pos = (unsigned char *)key_buffer;
 
1244
  unsigned char *end;
 
1245
  int key_size = 0;
 
1246
  int offset = 0;
 
1247
 
 
1248
  memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
 
1249
 
 
1250
  for (; key_part != key_part_end; key_part++) {
 
1251
    if (key_part->null_bit) {
 
1252
      key_size++;
 
1253
 
 
1254
      /* This key is NULL */
 
1255
      if (!(*keybuf_pos++ = (*key_pos++ == 0)))
 
1256
        continue;
 
1257
    }
 
1258
 
 
1259
    /* Normalize a VARTEXT1 key to VARTEXT2. */
 
1260
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
 
1261
      uint16_t str_len = *(uint16_t *)key_pos;
 
1262
 
 
1263
      /* Copy the length of the string over to key buffer. */
 
1264
      int2store(keybuf_pos, str_len);
 
1265
      keybuf_pos += sizeof(str_len);
 
1266
 
 
1267
      /* Copy the actual value over to the key buffer. */
 
1268
      memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
 
1269
      keybuf_pos += str_len;
 
1270
 
 
1271
      /* NULL byte + Length of str (2 byte) + Actual String. */
 
1272
      offset = 1 + sizeof(str_len) + str_len;
 
1273
    } else {
 
1274
      end = key_part->field->pack(keybuf_pos, key_pos);
 
1275
      offset = end - keybuf_pos;
 
1276
      keybuf_pos += offset;
 
1277
    }
 
1278
 
 
1279
    key_size += offset;
 
1280
    key_pos += key_part->field->key_length();
 
1281
  }
 
1282
 
 
1283
  *return_key_len = key_size;
 
1284
  return key_buffer;
 
1285
}
 
1286
 
 
1287
size_t ha_blitz::pack_row(unsigned char *row_buffer,
 
1288
                          unsigned char *row_to_pack) {
 
1289
  unsigned char *pos;
 
1290
 
 
1291
  /* Nothing special to do if the table is fixed length */
 
1292
  if (share->fixed_length_table) {
 
1293
    memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
 
1294
    return (size_t)getTable()->getShare()->getRecordLength();
 
1295
  }
 
1296
 
 
1297
  /* Copy NULL bits */
 
1298
  memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
 
1299
  pos = row_buffer + getTable()->getShare()->null_bytes;
 
1300
 
 
1301
  /* Pack each field into the buffer */
 
1302
  for (Field **field = getTable()->getFields(); *field; field++) {
 
1303
    if (!((*field)->is_null()))
 
1304
      pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
 
1305
  }
 
1306
 
 
1307
  return (size_t)(pos - row_buffer);
 
1308
}
 
1309
 
 
1310
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
 
1311
                          const size_t from_len) {
 
1312
  const unsigned char *pos;
 
1313
 
 
1314
  /* Nothing special to do */
 
1315
  if (share->fixed_length_table) {
 
1316
    memcpy(to, from, from_len);
 
1317
    return true;
 
1318
  }
 
1319
 
 
1320
  /* Start by copying NULL bits which is the beginning block
 
1321
     of a Drizzle row. */
 
1322
  pos = (const unsigned char *)from;
 
1323
  memcpy(to, pos, getTable()->getShare()->null_bytes);
 
1324
  pos += getTable()->getShare()->null_bytes;
 
1325
 
 
1326
  /* Unpack all fields in the provided row. */
 
1327
  for (Field **field = getTable()->getFields(); *field; field++) {
 
1328
    if (!((*field)->is_null())) {
 
1329
      pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
 
1330
    }
 
1331
  }
 
1332
 
 
1333
  return true;
 
1334
}
 
1335
 
 
1336
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
 
1337
  unsigned char *buf = pack_buffer;
 
1338
 
 
1339
  /* This is a shitty case where the row size is larger than 2KB. */
 
1340
  if (size > BLITZ_MAX_ROW_STACK) {
 
1341
    if (size > secondary_row_buffer_size) {
 
1342
      void *new_ptr = realloc(secondary_row_buffer, size);
 
1343
 
 
1344
      if (new_ptr == NULL) {
 
1345
        errno = HA_ERR_OUT_OF_MEM;
 
1346
        return NULL;
 
1347
      }
 
1348
 
 
1349
      secondary_row_buffer_size = size;
 
1350
      secondary_row_buffer = (unsigned char *)new_ptr;
 
1351
    }
 
1352
    buf = secondary_row_buffer;
 
1353
  }
 
1354
  return buf;
 
1355
}
 
1356
 
 
1357
static BlitzEngine *blitz_engine = NULL;
 
1358
 
 
1359
BlitzShare *ha_blitz::get_share(const char *name) {
 
1360
  BlitzShare *share_ptr;
 
1361
  BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
 
1362
  std::string table_path(name);
 
1363
 
 
1364
  pthread_mutex_lock(&blitz_utility_mutex);
 
1365
 
 
1366
  /* Look up the table cache to see if the table resource is available */
 
1367
  share_ptr = bz_engine->getTableShare(table_path);
 
1368
 
 
1369
  if (share_ptr) {
 
1370
    share_ptr->use_count++;
 
1371
    pthread_mutex_unlock(&blitz_utility_mutex);
 
1372
    return share_ptr;
 
1373
  }
 
1374
 
 
1375
  /* Table wasn't cached so create a new table handler */
 
1376
  share_ptr = new BlitzShare();
 
1377
 
 
1378
  /* Prepare the Data Dictionary */
 
1379
  if (share_ptr->dict.startup(table_path.c_str()) != 0) {
 
1380
    delete share_ptr;
 
1381
    pthread_mutex_unlock(&blitz_utility_mutex);
 
1382
    return NULL;
 
1383
  }
 
1384
 
 
1385
  /* Prepare Index Structure(s) */
 
1386
  KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
 
1387
  share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
 
1388
 
 
1389
  for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
 
1390
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
 
1391
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
 
1392
 
 
1393
    if (getTable()->key_info[i].flags & HA_NOSAME)
 
1394
      share_ptr->btrees[i].unique = true;
 
1395
 
 
1396
    share_ptr->btrees[i].length = curr->key_length;
 
1397
    share_ptr->btrees[i].nparts = curr->key_parts;
 
1398
 
 
1399
    /* Record Meta Data of the Key Segments */
 
1400
    for (uint32_t j = 0; j < curr->key_parts; j++) {
 
1401
      Field *f = curr->key_part[j].field;
 
1402
 
 
1403
      if (f->null_ptr) {
 
1404
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
 
1405
        share_ptr->btrees[i].parts[j].null_pos
 
1406
          = (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
 
1407
      }
 
1408
 
 
1409
      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
 
1410
 
 
1411
      if (f->type() == DRIZZLE_TYPE_BLOB) {
 
1412
        share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
 
1413
      }
 
1414
 
 
1415
      share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
 
1416
      share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
 
1417
      share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
 
1418
    }
 
1419
  }
 
1420
 
 
1421
  /* Set Meta Data */
 
1422
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
 
1423
  share_ptr->table_name = table_path;
 
1424
  share_ptr->nkeys = getTable()->getShare()->keys;
 
1425
  share_ptr->use_count = 1;
 
1426
 
 
1427
  share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
 
1428
                                    & HA_OPTION_PACK_RECORD);
 
1429
 
 
1430
  if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
 
1431
    share_ptr->primary_key_exists = false;
 
1432
  else
 
1433
    share_ptr->primary_key_exists = true;
 
1434
 
 
1435
  /* Done creating the share object. Cache it for later
 
1436
     use by another cursor object.*/
 
1437
  bz_engine->cacheTableShare(table_path, share_ptr);
 
1438
 
 
1439
  pthread_mutex_unlock(&blitz_utility_mutex);
 
1440
  return share_ptr;
 
1441
}
 
1442
 
 
1443
int ha_blitz::free_share(void) {
 
1444
  pthread_mutex_lock(&blitz_utility_mutex);
 
1445
 
 
1446
  /* BlitzShare could still be used by another thread. Check the
 
1447
     reference counter to see if it's safe to free it */
 
1448
  if (--share->use_count == 0) {
 
1449
    share->dict.write_meta_autoinc(share->auto_increment_value);
 
1450
 
 
1451
    if (share->dict.shutdown() != 0) {
 
1452
      pthread_mutex_unlock(&blitz_utility_mutex);
 
1453
      return HA_ERR_CRASHED_ON_USAGE;
 
1454
    }
 
1455
 
 
1456
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
1457
      delete[] share->btrees[i].parts;
 
1458
      share->btrees[i].close();
 
1459
    }
 
1460
 
 
1461
    BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
 
1462
    bz_engine->deleteTableShare(share->table_name);
 
1463
 
 
1464
    delete[] share->btrees;
 
1465
    delete share;
 
1466
  }
 
1467
 
 
1468
  pthread_mutex_unlock(&blitz_utility_mutex);
 
1469
  return 0;
 
1470
}
 
1471
 
 
1472
static int blitz_init(drizzled::module::Context &context) {
 
1473
  blitz_engine = new BlitzEngine("BLITZDB");
 
1474
 
 
1475
  if (!blitz_engine->doCreateTableCache()) {
 
1476
    delete blitz_engine;
 
1477
    return HA_ERR_OUT_OF_MEM;
 
1478
  }
 
1479
 
 
1480
  pthread_mutex_init(&blitz_utility_mutex, NULL);
 
1481
  context.add(blitz_engine);
 
1482
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
 
1483
                                                    &blitz_estimated_rows));
 
1484
  return 0;
 
1485
}
 
1486
 
 
1487
/* Read the prototype of this function for details. */
 
1488
static char *skip_btree_key(const char *key, const size_t skip_len,
 
1489
                            int *return_klen) {
 
1490
  char *pos = (char *)key;
 
1491
  *return_klen = uint2korr(pos + skip_len);
 
1492
  return pos + skip_len + sizeof(uint16_t);
 
1493
}
 
1494
 
 
1495
static bool str_is_numeric(const std::string &str) {
 
1496
  for (uint32_t i = 0; i < str.length(); i++) {
 
1497
    if (!std::isdigit(str[i]))
 
1498
      return false;
 
1499
  }
 
1500
  return true;
 
1501
}
 
1502
 
 
1503
static void blitz_init_options(drizzled::module::option_context &context)
 
1504
{
 
1505
  context("estimated-rows",
 
1506
          po::value<uint64_t>(&blitz_estimated_rows)->default_value(0),
 
1507
          N_("Estimated number of rows that a BlitzDB table will store."));
 
1508
}
 
1509
 
 
1510
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);