~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/blitzdb/ha_blitz.cc

Merged vcol stuff.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2009 - 2010 Toru Maesaka
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
#include "config.h"
21
 
 
22
 
#include "ha_blitz.h"
23
 
#include <drizzled/plugin/storage_engine.h>
24
 
 
25
 
using namespace std;
26
 
using namespace drizzled;
27
 
namespace po= boost::program_options;
28
 
 
29
 
static pthread_mutex_t blitz_utility_mutex;
30
 
 
31
 
static const char *ha_blitz_exts[] = {
32
 
  BLITZ_DATA_EXT,
33
 
  BLITZ_INDEX_EXT,
34
 
  BLITZ_SYSTEM_EXT,
35
 
  NULL
36
 
};
37
 
 
38
 
/* Global Variables for Startup Options */
39
 
uint64_t blitz_estimated_rows;
40
 
 
41
 
class BlitzEngine : public drizzled::plugin::StorageEngine {
42
 
private:
43
 
  TCMAP *blitz_table_cache;
44
 
 
45
 
public:
46
 
  BlitzEngine(const std::string &name_arg) :
47
 
    drizzled::plugin::StorageEngine(name_arg,
48
 
                                    drizzled::HTON_NULL_IN_KEY |
49
 
                                    drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
50
 
                                    drizzled::HTON_STATS_RECORDS_IS_EXACT |
51
 
                                    drizzled::HTON_SKIP_STORE_LOCK) {
52
 
    table_definition_ext = BLITZ_SYSTEM_EXT;
53
 
  }
54
 
 
55
 
  virtual ~BlitzEngine() {
56
 
    pthread_mutex_destroy(&blitz_utility_mutex);
57
 
    tcmapdel(blitz_table_cache);
58
 
  }
59
 
 
60
 
  virtual drizzled::Cursor *create(drizzled::Table &table) {
61
 
    return new ha_blitz(*this, table);
62
 
  }
63
 
 
64
 
  const char **bas_ext() const {
65
 
    return ha_blitz_exts;
66
 
  }
67
 
 
68
 
  int doCreateTable(drizzled::Session &session,
69
 
                    drizzled::Table &table_arg,
70
 
                    const drizzled::identifier::Table &identifier,
71
 
                    drizzled::message::Table &table_proto);
72
 
 
73
 
  int doRenameTable(drizzled::Session &session,
74
 
                    const drizzled::identifier::Table &from_identifier,
75
 
                    const drizzled::identifier::Table &to_identifier);
76
 
 
77
 
  int doDropTable(drizzled::Session &session,
78
 
                  const drizzled::identifier::Table &identifier);
79
 
 
80
 
  int doGetTableDefinition(drizzled::Session &session,
81
 
                           const drizzled::identifier::Table &identifier,
82
 
                           drizzled::message::Table &table_proto);
83
 
 
84
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
85
 
                             const drizzled::identifier::Schema &schema_identifier,
86
 
                             drizzled::identifier::Table::vector &set_of_identifiers);
87
 
 
88
 
  bool doDoesTableExist(drizzled::Session &session,
89
 
                        const drizzled::identifier::Table &identifier);
90
 
 
91
 
  bool validateCreateTableOption(const std::string &key,
92
 
                                 const std::string &state);
93
 
 
94
 
  bool doCreateTableCache(void);
95
 
 
96
 
  BlitzShare *getTableShare(const std::string &name);
97
 
  void cacheTableShare(const std::string &name, BlitzShare *share);
98
 
  void deleteTableShare(const std::string &name);
99
 
 
100
 
  uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
101
 
  uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
102
 
  uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
103
 
 
104
 
  uint32_t index_flags(enum drizzled::ha_key_alg) const {
105
 
    return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
106
 
            HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
107
 
  }
108
 
};
109
 
 
110
 
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
111
 
   a key to that row in the data dictionary. Two keys are merged and
112
 
   stored as a key because we want to avoid reading the leaf node and
113
 
   thus save disk IO and some computation in the tree. Note that the
114
 
   comparison function of BlitzDB's btree only takes into accound the
115
 
   actual index key. See blitzcmp.cc for details.
116
 
 
117
 
   With the above in mind, this helper function returns a pointer to
118
 
   the dictionary key by calculating the offset. */
119
 
static char *skip_btree_key(const char *key, const size_t skip_len,
120
 
                            int *return_klen);
121
 
 
122
 
static bool str_is_numeric(const std::string &str);
123
 
 
124
 
int BlitzEngine::doCreateTable(drizzled::Session &,
125
 
                               drizzled::Table &table,
126
 
                               const drizzled::identifier::Table &identifier,
127
 
                               drizzled::message::Table &proto) {
128
 
  BlitzData dict;
129
 
  BlitzTree btree;
130
 
  int ecode;
131
 
 
132
 
  /* Temporary fix for blocking composite keys. We need to add this
133
 
     check because version 1 doesn't handle composite indexes. */
134
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
135
 
    if (table.key_info[i].key_parts > 1)
136
 
      return HA_ERR_UNSUPPORTED;
137
 
  }
138
 
 
139
 
  /* Create relevant files for a new table and close them immediately.
140
 
     All we want to do here is somewhat like UNIX touch(1). */
141
 
  if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
142
 
    return ecode;
143
 
 
144
 
  if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
145
 
    return ecode;
146
 
 
147
 
  /* Create b+tree index(es) for this table. */
148
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
149
 
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
150
 
      return ecode;
151
 
  }
152
 
 
153
 
  /* Write the table definition to system table. */
154
 
  if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
155
 
    return ecode;
156
 
 
157
 
  if (!dict.write_table_definition(proto)) {
158
 
    dict.close_system_table();
159
 
    return HA_ERR_CRASHED_ON_USAGE;
160
 
  }
161
 
 
162
 
  dict.close_system_table();
163
 
  return 0;
164
 
}
165
 
 
166
 
int BlitzEngine::doRenameTable(drizzled::Session &,
167
 
                               const drizzled::identifier::Table &from,
168
 
                               const drizzled::identifier::Table &to) {
169
 
  int rv = 0;
170
 
 
171
 
  BlitzData blitz_table;
172
 
  uint32_t nkeys;
173
 
 
174
 
  BlitzData dict;
175
 
  int ecode;
176
 
  /* Write the table definition to system table. */
177
 
  if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
178
 
    return ecode;
179
 
 
180
 
  drizzled::message::Table proto;
181
 
  char *proto_string;
182
 
  int proto_string_len;
183
 
 
184
 
  proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
185
 
                                       BLITZ_TABLE_PROTO_KEY.length(),
186
 
                                       &proto_string_len);
187
 
 
188
 
  if (proto_string == NULL) {
189
 
    return ENOMEM;
190
 
  }
191
 
 
192
 
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
193
 
    free(proto_string);
194
 
    return HA_ERR_CRASHED_ON_USAGE;
195
 
  }
196
 
 
197
 
  free(proto_string);
198
 
 
199
 
  proto.set_name(to.getTableName());
200
 
  proto.set_schema(to.getSchemaName());
201
 
  proto.set_catalog(to.getCatalogName());
202
 
 
203
 
  if (!dict.write_table_definition(proto)) {
204
 
    dict.close_system_table();
205
 
    return HA_ERR_CRASHED_ON_USAGE;
206
 
  }
207
 
 
208
 
  dict.close_system_table();
209
 
 
210
 
  /* Find out the number of indexes in this table. This information
211
 
     is required because BlitzDB creates a file for each indexes.*/
212
 
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
213
 
    return HA_ERR_CRASHED_ON_USAGE;
214
 
 
215
 
  nkeys = blitz_table.read_meta_keycount();
216
 
 
217
 
  if (blitz_table.close_data_table() != 0)
218
 
    return HA_ERR_CRASHED_ON_USAGE;
219
 
 
220
 
  /* We're now ready to rename the file(s) for this table. Start by
221
 
     attempting to rename the data and system files. */
222
 
  if (rename_file_ext(from.getPath().c_str(),
223
 
                      to.getPath().c_str(), BLITZ_DATA_EXT)) {
224
 
    if ((rv = errno) != ENOENT)
225
 
      return rv;
226
 
  }
227
 
 
228
 
  if (rename_file_ext(from.getPath().c_str(),
229
 
                      to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
230
 
    if ((rv = errno) != ENOENT)
231
 
      return rv;
232
 
  }
233
 
 
234
 
  /* So far so good. Rename the index file(s) and we're done. */
235
 
  BlitzTree btree;
236
 
 
237
 
  for (uint32_t i = 0; i < nkeys; i++) {
238
 
    if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
239
 
      return HA_ERR_CRASHED_ON_USAGE;
240
 
  }
241
 
 
242
 
  return rv;
243
 
}
244
 
 
245
 
int BlitzEngine::doDropTable(drizzled::Session &,
246
 
                             const drizzled::identifier::Table &identifier) {
247
 
  BlitzData dict;
248
 
  BlitzTree btree;
249
 
  char buf[FN_REFLEN];
250
 
  uint32_t nkeys;
251
 
  int err;
252
 
 
253
 
  /* We open the dictionary to extract meta data from it */
254
 
  if ((err = dict.open_data_table(identifier.getPath().c_str(),
255
 
                                  HDBOREADER)) != 0) {
256
 
    return err;
257
 
  }
258
 
 
259
 
  nkeys = dict.read_meta_keycount();
260
 
 
261
 
  /* We no longer need the dictionary to be open */
262
 
  dict.close_data_table();
263
 
 
264
 
  /* Drop the Data Dictionary */
265
 
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
266
 
  if ((err = unlink(buf)) == -1) {
267
 
    return err;
268
 
  }
269
 
 
270
 
  /* Drop the System Table */
271
 
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
272
 
  if ((err = unlink(buf)) == -1) {
273
 
    return err;
274
 
  }
275
 
 
276
 
  /* Drop Index file(s) */
277
 
  for (uint32_t i = 0; i < nkeys; i++) {
278
 
    if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
279
 
      return err;
280
 
    }
281
 
  }
282
 
 
283
 
  return 0;
284
 
}
285
 
 
286
 
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
287
 
                                      const drizzled::identifier::Table &identifier,
288
 
                                      drizzled::message::Table &proto) {
289
 
  struct stat stat_info;
290
 
  std::string path(identifier.getPath());
291
 
 
292
 
  path.append(BLITZ_SYSTEM_EXT);
293
 
 
294
 
  if (stat(path.c_str(), &stat_info)) {
295
 
    return errno;
296
 
  }
297
 
 
298
 
  BlitzData db;
299
 
  char *proto_string;
300
 
  int proto_string_len;
301
 
 
302
 
  if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
303
 
    return HA_ERR_CRASHED_ON_USAGE;
304
 
  }
305
 
 
306
 
  proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
307
 
                                     BLITZ_TABLE_PROTO_KEY.length(),
308
 
                                     &proto_string_len);
309
 
 
310
 
  if (db.close_system_table() != 0) {
311
 
    return HA_ERR_CRASHED_ON_USAGE;
312
 
  }
313
 
 
314
 
  if (proto_string == NULL) {
315
 
    return ENOMEM;
316
 
  }
317
 
 
318
 
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
319
 
    free(proto_string);
320
 
    return HA_ERR_CRASHED_ON_USAGE;
321
 
  }
322
 
 
323
 
  free(proto_string);
324
 
 
325
 
  return EEXIST;
326
 
}
327
 
 
328
 
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
329
 
                                        const drizzled::identifier::Schema &schema_id,
330
 
                                        drizzled::identifier::Table::vector &ids) {
331
 
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
332
 
 
333
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
334
 
       entry_iter != entries.end(); ++entry_iter) {
335
 
 
336
 
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
337
 
    const std::string *filename = &entry->filename;
338
 
 
339
 
    assert(filename->size());
340
 
 
341
 
    const char *ext = strchr(filename->c_str(), '.');
342
 
 
343
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
344
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
345
 
    } else {
346
 
      char uname[NAME_LEN + 1];
347
 
      uint32_t file_name_len;
348
 
 
349
 
      file_name_len = identifier::Table::filename_to_tablename(filename->c_str(),
350
 
                                                             uname,
351
 
                                                             sizeof(uname));
352
 
 
353
 
      uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
354
 
      ids.push_back(identifier::Table(schema_id, uname));
355
 
    }
356
 
  }
357
 
}
358
 
 
359
 
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
360
 
                                   const drizzled::identifier::Table &identifier) {
361
 
  std::string proto_path(identifier.getPath());
362
 
  proto_path.append(BLITZ_DATA_EXT);
363
 
 
364
 
  return (access(proto_path.c_str(), F_OK)) ? false : true;
365
 
}
366
 
 
367
 
bool BlitzEngine::validateCreateTableOption(const std::string &key,
368
 
                                            const std::string &state) {
369
 
  if (key == "ESTIMATED_ROWS" || key == "estimated_rows") {
370
 
    if (str_is_numeric(state))
371
 
      return true;
372
 
  }
373
 
  return false;
374
 
}
375
 
 
376
 
bool BlitzEngine::doCreateTableCache(void) {
377
 
  return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
378
 
}
379
 
 
380
 
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
381
 
  int vlen;
382
 
  const void *fetched;
383
 
  BlitzShare *rv = NULL;
384
 
 
385
 
  fetched = tcmapget(blitz_table_cache, table_name.c_str(),
386
 
                     table_name.length(), &vlen);
387
 
 
388
 
  /* dereference the object */
389
 
  if (fetched)
390
 
    rv = *(BlitzShare **)fetched;
391
 
 
392
 
  return rv;
393
 
}
394
 
 
395
 
void BlitzEngine::cacheTableShare(const std::string &table_name,
396
 
                                  BlitzShare *share) {
397
 
  /* Cache the memory address of the share object */
398
 
  tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
399
 
           &share, sizeof(share));
400
 
}
401
 
 
402
 
void BlitzEngine::deleteTableShare(const std::string &table_name) {
403
 
  tcmapout2(blitz_table_cache, table_name.c_str());
404
 
}
405
 
 
406
 
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
407
 
                   Table &table_arg) : Cursor(engine_arg, table_arg),
408
 
                                            btree_cursor(NULL),
409
 
                                            table_scan(false),
410
 
                                            table_based(false),
411
 
                                            thread_locked(false),
412
 
                                            held_key(NULL),
413
 
                                            held_key_len(0),
414
 
                                            current_key(NULL),
415
 
                                            current_key_len(0),
416
 
                                            key_buffer(NULL),
417
 
                                            errkey_id(0) {}
418
 
 
419
 
int ha_blitz::open(const char *table_name, int, uint32_t) {
420
 
  if ((share = get_share(table_name)) == NULL)
421
 
    return HA_ERR_CRASHED_ON_USAGE;
422
 
 
423
 
  pthread_mutex_lock(&blitz_utility_mutex);
424
 
 
425
 
  btree_cursor = new BlitzCursor[share->nkeys];
426
 
 
427
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
428
 
    if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
429
 
      free_share();
430
 
      pthread_mutex_unlock(&blitz_utility_mutex);
431
 
      return HA_ERR_OUT_OF_MEM;
432
 
    }
433
 
  }
434
 
 
435
 
  if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
436
 
    free_share();
437
 
    pthread_mutex_unlock(&blitz_utility_mutex);
438
 
    return HA_ERR_OUT_OF_MEM;
439
 
  }
440
 
 
441
 
  if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
442
 
    free_share();
443
 
    pthread_mutex_unlock(&blitz_utility_mutex);
444
 
    return HA_ERR_OUT_OF_MEM;
445
 
  }
446
 
 
447
 
  if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
448
 
    free_share();
449
 
    free(key_buffer);
450
 
    free(key_merge_buffer);
451
 
    pthread_mutex_unlock(&blitz_utility_mutex);
452
 
    return HA_ERR_OUT_OF_MEM;
453
 
  }
454
 
 
455
 
  secondary_row_buffer = NULL;
456
 
  secondary_row_buffer_size = 0;
457
 
  key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
458
 
 
459
 
  /* 'ref_length' determines the size of the buffer that the kernel
460
 
     will use to uniquely identify a row. The actual allocation is
461
 
     done by the kernel so all we do here is specify the size of it.*/
462
 
  if (share->primary_key_exists) {
463
 
    ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
464
 
  } else {
465
 
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
466
 
  }
467
 
 
468
 
  pthread_mutex_unlock(&blitz_utility_mutex);
469
 
  return 0;
470
 
}
471
 
 
472
 
int ha_blitz::close(void) {
473
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
474
 
    share->btrees[i].destroy_cursor(&btree_cursor[i]);
475
 
  }
476
 
  delete [] btree_cursor;
477
 
 
478
 
  free(key_buffer);
479
 
  free(key_merge_buffer);
480
 
  free(held_key_buf);
481
 
  free(secondary_row_buffer);
482
 
  return free_share();
483
 
}
484
 
 
485
 
int ha_blitz::info(uint32_t flag) {
486
 
  if (flag & HA_STATUS_VARIABLE) {
487
 
    stats.records = share->dict.nrecords();
488
 
    stats.data_file_length = share->dict.table_size();
489
 
  }
490
 
 
491
 
  if (flag & HA_STATUS_AUTO)
492
 
    stats.auto_increment_value = share->auto_increment_value + 1;
493
 
 
494
 
  if (flag & HA_STATUS_ERRKEY)
495
 
    errkey = errkey_id;
496
 
 
497
 
  return 0;
498
 
}
499
 
 
500
 
int ha_blitz::doStartTableScan(bool scan) {
501
 
  /* Obtain the query type for this scan */
502
 
  sql_command_type = getTable()->getSession()->getSqlCommand();
503
 
  table_scan = scan;
504
 
  table_based = true;
505
 
 
506
 
  /* Obtain the most suitable lock for the given statement type. */
507
 
  blitz_optimal_lock();
508
 
 
509
 
  /* Get the first record from TCHDB. Let the scanner take
510
 
     care of checking return value errors. */
511
 
  if (table_scan) {
512
 
    current_key = share->dict.next_key_and_row(NULL, 0,
513
 
                                               &current_key_len,
514
 
                                               &current_row,
515
 
                                               &current_row_len);
516
 
  }
517
 
  return 0;
518
 
}
519
 
 
520
 
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
521
 
  char *next_key;
522
 
  const char *next_row;
523
 
  int next_row_len;
524
 
  int next_key_len;
525
 
 
526
 
  free(held_key);
527
 
  held_key = NULL;
528
 
 
529
 
  if (current_key == NULL) {
530
 
    getTable()->status = STATUS_NOT_FOUND;
531
 
    return HA_ERR_END_OF_FILE;
532
 
  }
533
 
 
534
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
535
 
 
536
 
  /* Unpack and copy the current row to Drizzle's result buffer. */
537
 
  unpack_row(drizzle_buf, current_row, current_row_len);
538
 
 
539
 
  /* Retrieve both key and row of the next record with one allocation. */
540
 
  next_key = share->dict.next_key_and_row(current_key, current_key_len,
541
 
                                          &next_key_len, &next_row,
542
 
                                          &next_row_len);
543
 
 
544
 
  /* Memory region for "current_row" will be freed as "held key" on
545
 
     the next iteration. This is because "current_key" points to the
546
 
     region of memory that contains "current_row" and "held_key" points
547
 
     to it. If there isn't another iteration then it is freed in doEndTableScan(). */
548
 
  current_row = next_row;
549
 
  current_row_len = next_row_len;
550
 
 
551
 
  /* Remember the current row because delete, update or replace
552
 
     function could be called after this function. This pointer is
553
 
     also used to free the previous key and row, which resides on
554
 
     the same buffer. */
555
 
  held_key = current_key;
556
 
  held_key_len = current_key_len;
557
 
 
558
 
  /* It is now memory-leak-safe to point current_key to next_key. */
559
 
  current_key = next_key;
560
 
  current_key_len = next_key_len;
561
 
  getTable()->status = 0;
562
 
  return 0;
563
 
}
564
 
 
565
 
int ha_blitz::doEndTableScan() {
566
 
  if (table_scan && current_key)
567
 
    free(current_key);
568
 
  if (table_scan && held_key)
569
 
    free(held_key);
570
 
 
571
 
  current_key = NULL;
572
 
  held_key = NULL;
573
 
  current_key_len = 0;
574
 
  held_key_len = 0;
575
 
  table_scan = false;
576
 
  table_based = false;
577
 
 
578
 
  if (thread_locked)
579
 
    blitz_optimal_unlock();
580
 
 
581
 
  return 0;
582
 
}
583
 
 
584
 
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
585
 
  char *row;
586
 
  char *key = NULL;
587
 
  int key_len, row_len;
588
 
 
589
 
  memcpy(&key_len, pos, sizeof(key_len));
590
 
  key = (char *)(pos + sizeof(key_len));
591
 
 
592
 
  /* TODO: Find a better error type. */
593
 
  if (key == NULL)
594
 
    return HA_ERR_KEY_NOT_FOUND;
595
 
 
596
 
  row = share->dict.get_row(key, key_len, &row_len);
597
 
 
598
 
  if (row == NULL)
599
 
    return HA_ERR_KEY_NOT_FOUND;
600
 
 
601
 
  unpack_row(copy_to, row, row_len);
602
 
 
603
 
  /* Remember the key location on memory if the thread is not doing
604
 
     a table scan. This is because either update_row() or delete_row()
605
 
     might be called after this function. */
606
 
  if (!table_scan) {
607
 
    held_key = key;
608
 
    held_key_len = key_len;
609
 
  }
610
 
 
611
 
  free(row);
612
 
  return 0;
613
 
}
614
 
 
615
 
void ha_blitz::position(const unsigned char *) {
616
 
  int length = sizeof(held_key_len);
617
 
  memcpy(ref, &held_key_len, length);
618
 
  memcpy(ref + length, (unsigned char *)held_key, held_key_len);
619
 
}
620
 
 
621
 
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
622
 
  return "BTREE";
623
 
}
624
 
 
625
 
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
626
 
  active_index = key_num;
627
 
  sql_command_type = getTable()->getSession()->getSqlCommand();
628
 
 
629
 
  /* This is unlikely to happen but just for assurance, re-obtain
630
 
     the lock if this thread already has a certain lock. This makes
631
 
     sure that this thread will get the most appropriate lock for
632
 
     the current statement. */
633
 
  if (thread_locked)
634
 
    blitz_optimal_unlock();
635
 
 
636
 
  blitz_optimal_lock();
637
 
  return 0;
638
 
}
639
 
 
640
 
int ha_blitz::index_first(unsigned char *buf) {
641
 
  char *dict_key, *bt_key, *row;
642
 
  int dict_klen, bt_klen, prefix_len, rlen;
643
 
 
644
 
  bt_key = btree_cursor[active_index].first_key(&bt_klen);
645
 
 
646
 
  if (bt_key == NULL)
647
 
    return HA_ERR_END_OF_FILE;
648
 
 
649
 
  prefix_len = btree_key_length(bt_key, active_index);
650
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
651
 
 
652
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
653
 
    free(bt_key);
654
 
    return HA_ERR_KEY_NOT_FOUND;
655
 
  }
656
 
 
657
 
  unpack_row(buf, row, rlen);
658
 
  keep_track_of_key(bt_key, bt_klen);
659
 
 
660
 
  free(bt_key);
661
 
  free(row);
662
 
  return 0;
663
 
}
664
 
 
665
 
int ha_blitz::index_next(unsigned char *buf) {
666
 
  char *dict_key, *bt_key, *row;
667
 
  int dict_klen, bt_klen, prefix_len, rlen;
668
 
 
669
 
  bt_key = btree_cursor[active_index].next_key(&bt_klen);
670
 
 
671
 
  if (bt_key == NULL) {
672
 
    getTable()->status = STATUS_NOT_FOUND;
673
 
    return HA_ERR_END_OF_FILE;
674
 
  }
675
 
 
676
 
  prefix_len = btree_key_length(bt_key, active_index);
677
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
678
 
 
679
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
680
 
    free(bt_key);
681
 
    getTable()->status = STATUS_NOT_FOUND;
682
 
    return HA_ERR_KEY_NOT_FOUND;
683
 
  }
684
 
 
685
 
  unpack_row(buf, row, rlen);
686
 
  keep_track_of_key(bt_key, bt_klen);
687
 
 
688
 
  free(bt_key);
689
 
  free(row);
690
 
  return 0;
691
 
}
692
 
 
693
 
int ha_blitz::index_prev(unsigned char *buf) {
694
 
  char *dict_key, *bt_key, *row;
695
 
  int dict_klen, bt_klen, prefix_len, rlen;
696
 
 
697
 
  bt_key = btree_cursor[active_index].prev_key(&bt_klen);
698
 
 
699
 
  if (bt_key == NULL)
700
 
    return HA_ERR_END_OF_FILE;
701
 
 
702
 
  prefix_len = btree_key_length(bt_key, active_index);
703
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
704
 
 
705
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
706
 
    free(bt_key);
707
 
    return HA_ERR_KEY_NOT_FOUND;
708
 
  }
709
 
 
710
 
  unpack_row(buf, row, rlen);
711
 
  keep_track_of_key(bt_key, bt_klen);
712
 
 
713
 
  free(bt_key);
714
 
  free(row);
715
 
  return 0;
716
 
}
717
 
 
718
 
int ha_blitz::index_last(unsigned char *buf) {
719
 
  char *dict_key, *bt_key, *row;
720
 
  int dict_klen, bt_klen, prefix_len, rlen;
721
 
 
722
 
  bt_key = btree_cursor[active_index].final_key(&bt_klen);
723
 
 
724
 
  if (bt_key == NULL)
725
 
    return HA_ERR_KEY_NOT_FOUND;
726
 
 
727
 
  prefix_len = btree_key_length(bt_key, active_index);
728
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
729
 
 
730
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
731
 
    free(bt_key);
732
 
    errkey_id = active_index;
733
 
    return HA_ERR_KEY_NOT_FOUND;
734
 
  }
735
 
 
736
 
  unpack_row(buf, row, rlen);
737
 
  keep_track_of_key(bt_key, bt_klen);
738
 
 
739
 
  free(bt_key);
740
 
  free(row);
741
 
  return 0;
742
 
}
743
 
 
744
 
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
745
 
                         uint32_t key_len, enum ha_rkey_function find_flag) {
746
 
  return index_read_idx(buf, active_index, key, key_len, find_flag);
747
 
}
748
 
 
749
 
/* This is where the read related index logic lives. It is used by both
750
 
   BlitzDB and the Database Kernel (specifically, by the optimizer). */
751
 
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
752
 
                             const unsigned char *key, uint32_t,
753
 
                             enum ha_rkey_function search_mode) {
754
 
 
755
 
  /* If the provided key is NULL, we are required to return the first
756
 
     row in the active_index. */
757
 
  if (key == NULL)
758
 
    return this->index_first(buf);
759
 
 
760
 
  /* Otherwise we search for it. Prepare the key to look up the tree. */
761
 
  int packed_klen;
762
 
  char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
763
 
 
764
 
  /* Lookup the tree and get the master key. */
765
 
  int unique_klen;
766
 
  char *unique_key;
767
 
 
768
 
  unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
769
 
                                              packed_klen, &unique_klen);
770
 
 
771
 
  if (unique_key == NULL) {
772
 
    errkey_id = key_num;
773
 
    return HA_ERR_KEY_NOT_FOUND;
774
 
  }
775
 
 
776
 
  /* Got the master key. Prepare it to lookup the data dictionary. */
777
 
  int dict_klen;
778
 
  int skip_len = btree_key_length(unique_key, key_num);
779
 
  char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
780
 
 
781
 
  /* Fetch the packed row from the data dictionary. */
782
 
  int row_len;
783
 
  char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
784
 
 
785
 
  if (fetched_row == NULL) {
786
 
    errkey_id = key_num;
787
 
    free(unique_key);
788
 
    return HA_ERR_KEY_NOT_FOUND;
789
 
  }
790
 
 
791
 
  /* Unpack it into Drizzle's return buffer and keep track of the
792
 
     master key for future use (before index_end() is called). */
793
 
  unpack_row(buf, fetched_row, row_len);
794
 
  keep_track_of_key(unique_key, unique_klen);
795
 
 
796
 
  free(unique_key);
797
 
  free(fetched_row);
798
 
  return 0;
799
 
}
800
 
 
801
 
int ha_blitz::doEndIndexScan(void) {
802
 
  held_key = NULL;
803
 
  held_key_len = 0;
804
 
 
805
 
  btree_cursor[active_index].moved = false;
806
 
 
807
 
  if (thread_locked)
808
 
    blitz_optimal_unlock();
809
 
 
810
 
  return 0;
811
 
}
812
 
 
813
 
int ha_blitz::enable_indexes(uint32_t) {
814
 
  return HA_ERR_UNSUPPORTED;
815
 
}
816
 
 
817
 
int ha_blitz::disable_indexes(uint32_t) {
818
 
  return HA_ERR_UNSUPPORTED;
819
 
}
820
 
 
821
 
/* Find the estimated number of rows between min_key and max_key.
822
 
   Leave the proper implementation of this for now since there are
823
 
   too many exceptions to cover. */
824
 
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
825
 
                                   drizzled::key_range * /*min_key*/,
826
 
                                   drizzled::key_range * /*max_key*/) {
827
 
  return BLITZ_WORST_CASE_RANGE;
828
 
}
829
 
 
830
 
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
831
 
  int rv;
832
 
 
833
 
  ha_statistic_increment(&system_status_var::ha_write_count);
834
 
 
835
 
  /* Prepare Auto Increment field if one exists. */
836
 
  if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
837
 
    pthread_mutex_lock(&blitz_utility_mutex);
838
 
    if ((rv = update_auto_increment()) != 0) {
839
 
      pthread_mutex_unlock(&blitz_utility_mutex);
840
 
      return rv;
841
 
    }
842
 
 
843
 
    uint64_t next_val = getTable()->next_number_field->val_int();
844
 
 
845
 
    if (next_val > share->auto_increment_value) {
846
 
      share->auto_increment_value = next_val;
847
 
      stats.auto_increment_value = share->auto_increment_value + 1;
848
 
    }
849
 
    pthread_mutex_unlock(&blitz_utility_mutex);
850
 
  }
851
 
 
852
 
  /* Serialize a primary key for this row. If a PK doesn't exist,
853
 
     an internal hidden ID will be generated. We obtain the PK here
854
 
     and pack it to this function's local buffer instead of the
855
 
     thread's own 'key_buffer' because the PK value needs to be
856
 
     remembered when writing non-PK keys AND because the 'key_buffer'
857
 
     will be used to generate these non-PK keys. */
858
 
  char temp_pkbuf[BLITZ_MAX_KEY_LEN];
859
 
  size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
860
 
 
861
 
  /* Obtain a buffer that can accommodate this row. We then pack
862
 
     the provided row into it. Note that this code works most
863
 
     efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
864
 
  unsigned char *row_buf = get_pack_buffer(max_row_length());
865
 
  size_t row_len = pack_row(row_buf, drizzle_row);
866
 
 
867
 
  uint32_t curr_key = 0;
868
 
  uint32_t lock_id = 0;
869
 
 
870
 
  if (share->nkeys > 0) {
871
 
    lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
872
 
    share->blitz_lock.slotted_lock(lock_id);
873
 
  }
874
 
 
875
 
  /* We isolate this condition outside the key loop to avoid the CPU
876
 
     from going through unnecessary conditional branching on heavy
877
 
     insertion load. TODO: Optimize this block. PK should not need
878
 
     to go through merge_key() since this information is redundant. */
879
 
  if (share->primary_key_exists) {
880
 
    char *key = NULL;
881
 
    size_t klen = 0;
882
 
 
883
 
    key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
884
 
 
885
 
    rv = share->btrees[curr_key].write_unique(key, klen);
886
 
 
887
 
    if (rv == HA_ERR_FOUND_DUPP_KEY) {
888
 
      errkey_id = curr_key;
889
 
      share->blitz_lock.slotted_unlock(lock_id);
890
 
      return rv;
891
 
    }
892
 
    curr_key = 1;
893
 
  }
894
 
 
895
 
  /* Loop over the keys and write them to it's exclusive tree. */
896
 
  while (curr_key < share->nkeys) {
897
 
    char *key = NULL;
898
 
    size_t prefix_len = 0;
899
 
    size_t klen = 0;
900
 
 
901
 
    prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
902
 
    key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
903
 
 
904
 
    if (share->btrees[curr_key].unique) {
905
 
      rv = share->btrees[curr_key].write_unique(key, klen);
906
 
    } else {
907
 
      rv = share->btrees[curr_key].write(key, klen);
908
 
    }
909
 
 
910
 
    if (rv != 0) {
911
 
      errkey_id = curr_key;
912
 
      share->blitz_lock.slotted_unlock(lock_id);
913
 
      return rv;
914
 
    }
915
 
 
916
 
    curr_key++;
917
 
  }
918
 
 
919
 
  /* Write the row to the Data Dictionary. */
920
 
  rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
921
 
 
922
 
  if (share->nkeys > 0)
923
 
    share->blitz_lock.slotted_unlock(lock_id);
924
 
 
925
 
  return rv;
926
 
}
927
 
 
928
 
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
929
 
                             unsigned char *new_row) {
930
 
  int rv;
931
 
  uint32_t lock_id = 0;
932
 
 
933
 
  ha_statistic_increment(&system_status_var::ha_update_count);
934
 
 
935
 
  /* Handle Indexes */
936
 
  if (share->nkeys > 0) {
937
 
    /* BlitzDB cannot update an indexed row on table scan. */
938
 
    if (table_scan)
939
 
      return HA_ERR_UNSUPPORTED;
940
 
 
941
 
    if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
942
 
      return rv;
943
 
 
944
 
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
945
 
    share->blitz_lock.slotted_lock(lock_id);
946
 
 
947
 
    /* Update all relevant index entries. Start by deleting the
948
 
       the existing key then write the new key. Something we should
949
 
       consider in the future is to take a diff of the keys and only
950
 
       update changed keys. */
951
 
    int skip = btree_key_length(held_key, active_index);
952
 
    char *suffix = held_key + skip;
953
 
    uint16_t suffix_len = uint2korr(suffix);
954
 
 
955
 
    suffix += sizeof(suffix_len);
956
 
 
957
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
958
 
      char *key;
959
 
      size_t prefix_len, klen;
960
 
 
961
 
      klen = 0;
962
 
      prefix_len = make_index_key(key_buffer, i, old_row);
963
 
      key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
964
 
 
965
 
      if (share->btrees[i].delete_key(key, klen) != 0) {
966
 
        errkey_id = i;
967
 
        share->blitz_lock.slotted_unlock(lock_id);
968
 
        return HA_ERR_KEY_NOT_FOUND;
969
 
      }
970
 
 
971
 
      /* Now write the new key. */
972
 
      prefix_len = make_index_key(key_buffer, i, new_row);
973
 
 
974
 
      if (i == getTable()->getShare()->getPrimaryKey()) {
975
 
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
976
 
        rv = share->btrees[i].write(key, klen);
977
 
      } else {
978
 
        key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
979
 
        rv = share->btrees[i].write(key, klen);
980
 
      }
981
 
 
982
 
      if (rv != 0) {
983
 
        errkey_id = i;
984
 
        share->blitz_lock.slotted_unlock(lock_id);
985
 
        return rv;
986
 
      }
987
 
    }
988
 
  }
989
 
 
990
 
  /* Getting this far means that the index has been successfully
991
 
     updated. We now update the Data Dictionary. This implementation
992
 
     is admittedly far from optimial and will be revisited. */
993
 
  size_t row_len = max_row_length();
994
 
  unsigned char *row_buf = get_pack_buffer(row_len);
995
 
  row_len = pack_row(row_buf, new_row);
996
 
 
997
 
  /* This is a basic case where we can simply overwrite the key. */
998
 
  if (table_based) {
999
 
    rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
1000
 
  } else {
1001
 
    int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
1002
 
 
1003
 
    /* Delete with the old key. */
1004
 
    share->dict.delete_row(key_buffer, klen);
1005
 
 
1006
 
    /* Write with the new key. */
1007
 
    klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
1008
 
    rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
1009
 
  }
1010
 
 
1011
 
  if (share->nkeys > 0)
1012
 
    share->blitz_lock.slotted_unlock(lock_id);
1013
 
 
1014
 
  return rv;
1015
 
}
1016
 
 
1017
 
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
1018
 
  int rv;
1019
 
 
1020
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
1021
 
 
1022
 
  char *dict_key = held_key;
1023
 
  int dict_klen = held_key_len;
1024
 
  uint32_t lock_id = 0;
1025
 
 
1026
 
  if (share->nkeys > 0) {
1027
 
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
1028
 
    share->blitz_lock.slotted_lock(lock_id);
1029
 
 
1030
 
    /* Loop over the indexes and delete all relevant entries for
1031
 
       this row. We do this by reproducing the key in BlitzDB's
1032
 
       unique key format. The procedure is simple.
1033
 
 
1034
 
       (1): Compute the key value for this index from the row then
1035
 
            pack it into key_buffer (not unique at this point).
1036
 
 
1037
 
       (2): Append the suffix of the held_key to the key generated
1038
 
            in step 1. The key is then guaranteed to be unique. */
1039
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
1040
 
      /* In this case, we don't need to search for the key because
1041
 
         TC's cursor is already pointing at the key that we want
1042
 
         to delete. We wouldn't be here otherwise. */
1043
 
      if (i == active_index) {
1044
 
        btree_cursor[active_index].delete_position();
1045
 
        continue;
1046
 
      }
1047
 
 
1048
 
      int klen = make_index_key(key_buffer, i, row_to_delete);
1049
 
      int skip_len = btree_key_length(held_key, active_index);
1050
 
      uint16_t suffix_len = uint2korr(held_key + skip_len);
1051
 
 
1052
 
      /* Append the suffix to the key */
1053
 
      memcpy(key_buffer + klen, held_key + skip_len,
1054
 
             sizeof(suffix_len) + suffix_len);
1055
 
 
1056
 
      /* Update the key length to cover the generated key. */
1057
 
      klen = klen + sizeof(suffix_len) + suffix_len;
1058
 
 
1059
 
      if (share->btrees[i].delete_key(key_buffer, klen) != 0)
1060
 
        return HA_ERR_KEY_NOT_FOUND;
1061
 
    }
1062
 
 
1063
 
    /* Skip to the data dictionary key. */
1064
 
    int dict_key_offset = btree_key_length(dict_key, active_index);
1065
 
    dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
1066
 
  }
1067
 
 
1068
 
  rv = share->dict.delete_row(dict_key, dict_klen);
1069
 
 
1070
 
  if (share->nkeys > 0)
1071
 
    share->blitz_lock.slotted_unlock(lock_id);
1072
 
 
1073
 
  return rv;
1074
 
}
1075
 
 
1076
 
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
1077
 
                                  uint64_t, uint64_t *first_value,
1078
 
                                  uint64_t *nb_reserved_values) {
1079
 
  *first_value = share->auto_increment_value + 1;
1080
 
  *nb_reserved_values = UINT64_MAX;
1081
 
}
1082
 
 
1083
 
int ha_blitz::reset_auto_increment(uint64_t value) {
1084
 
  share->auto_increment_value = (value == 0) ? 1 : value;
1085
 
  return 0;
1086
 
}
1087
 
 
1088
 
int ha_blitz::delete_all_rows(void) {
1089
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
1090
 
    if (share->btrees[i].delete_all() != 0) {
1091
 
      errkey = i;
1092
 
      return HA_ERR_CRASHED_ON_USAGE;
1093
 
    }
1094
 
  }
1095
 
  return (share->dict.delete_all_rows()) ? 0 : -1;
1096
 
}
1097
 
 
1098
 
uint32_t ha_blitz::max_row_length(void) {
1099
 
  uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
1100
 
  uint32_t *pos = getTable()->getBlobField();
1101
 
  uint32_t *end = pos + getTable()->sizeBlobFields();
1102
 
 
1103
 
  while (pos != end) {
1104
 
    length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
1105
 
    pos++;
1106
 
  }
1107
 
 
1108
 
  return length;
1109
 
}
1110
 
 
1111
 
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
1112
 
  if (!share->primary_key_exists) {
1113
 
    uint64_t next_id = share->dict.next_hidden_row_id();
1114
 
    int8store(pack_to, next_id);
1115
 
    return sizeof(next_id);
1116
 
  }
1117
 
 
1118
 
  /* Getting here means that there is a PK in this table. Get the
1119
 
     binary representation of the PK, pack it to BlitzDB's key buffer
1120
 
     and return the size of it. */
1121
 
  return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
1122
 
}
1123
 
 
1124
 
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
1125
 
                                const unsigned char *row) {
1126
 
  KeyInfo *key = &getTable()->key_info[key_num];
1127
 
  KeyPartInfo *key_part = key->key_part;
1128
 
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1129
 
 
1130
 
  unsigned char *pos = (unsigned char *)pack_to;
1131
 
  unsigned char *end;
1132
 
  int offset = 0;
1133
 
 
1134
 
  memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
1135
 
 
1136
 
  /* Loop through key part(s) and pack them as we go. */
1137
 
  for (; key_part != key_part_end; key_part++) {
1138
 
    if (key_part->null_bit) {
1139
 
      if (row[key_part->null_offset] & key_part->null_bit) {
1140
 
        *pos++ = 0;
1141
 
        continue;
1142
 
      }
1143
 
      *pos++ = 1;
1144
 
    }
1145
 
 
1146
 
    /* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
1147
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1148
 
      /* Extract the length of the string from the row. */
1149
 
      uint16_t data_len = *(uint8_t *)(row + key_part->offset);
1150
 
 
1151
 
      /* Copy the length of the string. Use 2 bytes. */
1152
 
      int2store(pos, data_len);
1153
 
      pos += sizeof(data_len);
1154
 
 
1155
 
      /* Copy the string data */
1156
 
      memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
1157
 
      pos += data_len;
1158
 
    } else {
1159
 
      end = key_part->field->pack(pos, row + key_part->offset);
1160
 
      offset = end - pos;
1161
 
      pos += offset;
1162
 
    }
1163
 
  }
1164
 
 
1165
 
  return ((char *)pos - pack_to);
1166
 
}
1167
 
 
1168
 
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
1169
 
                          const size_t b_len, size_t *merged_len) {
1170
 
 
1171
 
  size_t total = a_len + sizeof(uint16_t) + b_len;
1172
 
 
1173
 
  if (total > key_merge_buffer_len) {
1174
 
    key_merge_buffer = (char *)realloc(key_merge_buffer, total);
1175
 
 
1176
 
    if (key_merge_buffer == NULL) {
1177
 
      errno = HA_ERR_OUT_OF_MEM;
1178
 
      return NULL;
1179
 
    }
1180
 
    key_merge_buffer_len = total;
1181
 
  }
1182
 
 
1183
 
  char *pos = key_merge_buffer;
1184
 
 
1185
 
  /* Copy the prefix. */
1186
 
  memcpy(pos, a, a_len);
1187
 
  pos += a_len;
1188
 
 
1189
 
  /* Copy the length of b. */
1190
 
  int2store(pos, (uint16_t)b_len);
1191
 
  pos += sizeof(uint16_t);
1192
 
 
1193
 
  /* Copy the suffix and we're done. */
1194
 
  memcpy(pos, b, b_len);
1195
 
 
1196
 
  *merged_len = total;
1197
 
  return key_merge_buffer;
1198
 
}
1199
 
 
1200
 
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
1201
 
  KeyInfo *key_info = &getTable()->key_info[key_num];
1202
 
  KeyPartInfo *key_part = key_info->key_part;
1203
 
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
1204
 
  char *pos = (char *)key;
1205
 
  uint64_t len = 0;
1206
 
  size_t rv = 0;
1207
 
 
1208
 
  for (; key_part != key_part_end; key_part++) {
1209
 
    if (key_part->null_bit) {
1210
 
      pos++;
1211
 
      rv++;
1212
 
      if (*key == 0)
1213
 
        continue;
1214
 
    }
1215
 
 
1216
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
1217
 
        key_part->type == HA_KEYTYPE_VARTEXT2) {
1218
 
      len = uint2korr(pos);
1219
 
      rv += len + sizeof(uint16_t);
1220
 
    } else {
1221
 
      len = key_part->field->key_length();
1222
 
      rv += len;
1223
 
    }
1224
 
    pos += len;
1225
 
    len = 0;
1226
 
  }
1227
 
 
1228
 
  return rv;
1229
 
}
1230
 
 
1231
 
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
1232
 
  memcpy(held_key_buf, key, klen);
1233
 
  held_key = held_key_buf;
1234
 
  held_key_len = klen;
1235
 
}
1236
 
 
1237
 
/* Converts a native Drizzle index key to BlitzDB's format. */
1238
 
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
1239
 
                                    const int key_num, int *return_key_len) {
1240
 
  KeyInfo *key = &getTable()->key_info[key_num];
1241
 
  KeyPartInfo *key_part = key->key_part;
1242
 
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1243
 
 
1244
 
  unsigned char *key_pos = (unsigned char *)native_key;
1245
 
  unsigned char *keybuf_pos = (unsigned char *)key_buffer;
1246
 
  unsigned char *end;
1247
 
  int key_size = 0;
1248
 
  int offset = 0;
1249
 
 
1250
 
  memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
1251
 
 
1252
 
  for (; key_part != key_part_end; key_part++) {
1253
 
    if (key_part->null_bit) {
1254
 
      key_size++;
1255
 
 
1256
 
      /* This key is NULL */
1257
 
      if (!(*keybuf_pos++ = (*key_pos++ == 0)))
1258
 
        continue;
1259
 
    }
1260
 
 
1261
 
    /* Normalize a VARTEXT1 key to VARTEXT2. */
1262
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1263
 
      uint16_t str_len = *(uint16_t *)key_pos;
1264
 
 
1265
 
      /* Copy the length of the string over to key buffer. */
1266
 
      int2store(keybuf_pos, str_len);
1267
 
      keybuf_pos += sizeof(str_len);
1268
 
 
1269
 
      /* Copy the actual value over to the key buffer. */
1270
 
      memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
1271
 
      keybuf_pos += str_len;
1272
 
 
1273
 
      /* NULL byte + Length of str (2 byte) + Actual String. */
1274
 
      offset = 1 + sizeof(str_len) + str_len;
1275
 
    } else {
1276
 
      end = key_part->field->pack(keybuf_pos, key_pos);
1277
 
      offset = end - keybuf_pos;
1278
 
      keybuf_pos += offset;
1279
 
    }
1280
 
 
1281
 
    key_size += offset;
1282
 
    key_pos += key_part->field->key_length();
1283
 
  }
1284
 
 
1285
 
  *return_key_len = key_size;
1286
 
  return key_buffer;
1287
 
}
1288
 
 
1289
 
size_t ha_blitz::pack_row(unsigned char *row_buffer,
1290
 
                          unsigned char *row_to_pack) {
1291
 
  unsigned char *pos;
1292
 
 
1293
 
  /* Nothing special to do if the table is fixed length */
1294
 
  if (share->fixed_length_table) {
1295
 
    memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
1296
 
    return (size_t)getTable()->getShare()->getRecordLength();
1297
 
  }
1298
 
 
1299
 
  /* Copy NULL bits */
1300
 
  memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
1301
 
  pos = row_buffer + getTable()->getShare()->null_bytes;
1302
 
 
1303
 
  /* Pack each field into the buffer */
1304
 
  for (Field **field = getTable()->getFields(); *field; field++) {
1305
 
    if (!((*field)->is_null()))
1306
 
      pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1307
 
  }
1308
 
 
1309
 
  return (size_t)(pos - row_buffer);
1310
 
}
1311
 
 
1312
 
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
1313
 
                          const size_t from_len) {
1314
 
  const unsigned char *pos;
1315
 
 
1316
 
  /* Nothing special to do */
1317
 
  if (share->fixed_length_table) {
1318
 
    memcpy(to, from, from_len);
1319
 
    return true;
1320
 
  }
1321
 
 
1322
 
  /* Start by copying NULL bits which is the beginning block
1323
 
     of a Drizzle row. */
1324
 
  pos = (const unsigned char *)from;
1325
 
  memcpy(to, pos, getTable()->getShare()->null_bytes);
1326
 
  pos += getTable()->getShare()->null_bytes;
1327
 
 
1328
 
  /* Unpack all fields in the provided row. */
1329
 
  for (Field **field = getTable()->getFields(); *field; field++) {
1330
 
    if (!((*field)->is_null())) {
1331
 
      pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
1332
 
    }
1333
 
  }
1334
 
 
1335
 
  return true;
1336
 
}
1337
 
 
1338
 
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
1339
 
  unsigned char *buf = pack_buffer;
1340
 
 
1341
 
  /* This is a shitty case where the row size is larger than 2KB. */
1342
 
  if (size > BLITZ_MAX_ROW_STACK) {
1343
 
    if (size > secondary_row_buffer_size) {
1344
 
      void *new_ptr = realloc(secondary_row_buffer, size);
1345
 
 
1346
 
      if (new_ptr == NULL) {
1347
 
        errno = HA_ERR_OUT_OF_MEM;
1348
 
        return NULL;
1349
 
      }
1350
 
 
1351
 
      secondary_row_buffer_size = size;
1352
 
      secondary_row_buffer = (unsigned char *)new_ptr;
1353
 
    }
1354
 
    buf = secondary_row_buffer;
1355
 
  }
1356
 
  return buf;
1357
 
}
1358
 
 
1359
 
static BlitzEngine *blitz_engine = NULL;
1360
 
 
1361
 
BlitzShare *ha_blitz::get_share(const char *name) {
1362
 
  BlitzShare *share_ptr;
1363
 
  BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1364
 
  std::string table_path(name);
1365
 
 
1366
 
  pthread_mutex_lock(&blitz_utility_mutex);
1367
 
 
1368
 
  /* Look up the table cache to see if the table resource is available */
1369
 
  share_ptr = bz_engine->getTableShare(table_path);
1370
 
 
1371
 
  if (share_ptr) {
1372
 
    share_ptr->use_count++;
1373
 
    pthread_mutex_unlock(&blitz_utility_mutex);
1374
 
    return share_ptr;
1375
 
  }
1376
 
 
1377
 
  /* Table wasn't cached so create a new table handler */
1378
 
  share_ptr = new BlitzShare();
1379
 
 
1380
 
  /* Prepare the Data Dictionary */
1381
 
  if (share_ptr->dict.startup(table_path.c_str()) != 0) {
1382
 
    delete share_ptr;
1383
 
    pthread_mutex_unlock(&blitz_utility_mutex);
1384
 
    return NULL;
1385
 
  }
1386
 
 
1387
 
  /* Prepare Index Structure(s) */
1388
 
  KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
1389
 
  share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
1390
 
 
1391
 
  for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
1392
 
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
1393
 
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
1394
 
 
1395
 
    if (getTable()->key_info[i].flags & HA_NOSAME)
1396
 
      share_ptr->btrees[i].unique = true;
1397
 
 
1398
 
    share_ptr->btrees[i].length = curr->key_length;
1399
 
    share_ptr->btrees[i].nparts = curr->key_parts;
1400
 
 
1401
 
    /* Record Meta Data of the Key Segments */
1402
 
    for (uint32_t j = 0; j < curr->key_parts; j++) {
1403
 
      Field *f = curr->key_part[j].field;
1404
 
 
1405
 
      if (f->null_ptr) {
1406
 
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
1407
 
        share_ptr->btrees[i].parts[j].null_pos
1408
 
          = (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
1409
 
      }
1410
 
 
1411
 
      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
1412
 
 
1413
 
      if (f->type() == DRIZZLE_TYPE_BLOB) {
1414
 
        share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
1415
 
      }
1416
 
 
1417
 
      share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
1418
 
      share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
1419
 
      share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
1420
 
    }
1421
 
  }
1422
 
 
1423
 
  /* Set Meta Data */
1424
 
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
1425
 
  share_ptr->table_name = table_path;
1426
 
  share_ptr->nkeys = getTable()->getShare()->keys;
1427
 
  share_ptr->use_count = 1;
1428
 
 
1429
 
  share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
1430
 
                                    & HA_OPTION_PACK_RECORD);
1431
 
 
1432
 
  if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
1433
 
    share_ptr->primary_key_exists = false;
1434
 
  else
1435
 
    share_ptr->primary_key_exists = true;
1436
 
 
1437
 
  /* Done creating the share object. Cache it for later
1438
 
     use by another cursor object.*/
1439
 
  bz_engine->cacheTableShare(table_path, share_ptr);
1440
 
 
1441
 
  pthread_mutex_unlock(&blitz_utility_mutex);
1442
 
  return share_ptr;
1443
 
}
1444
 
 
1445
 
int ha_blitz::free_share(void) {
1446
 
  pthread_mutex_lock(&blitz_utility_mutex);
1447
 
 
1448
 
  /* BlitzShare could still be used by another thread. Check the
1449
 
     reference counter to see if it's safe to free it */
1450
 
  if (--share->use_count == 0) {
1451
 
    share->dict.write_meta_autoinc(share->auto_increment_value);
1452
 
 
1453
 
    if (share->dict.shutdown() != 0) {
1454
 
      pthread_mutex_unlock(&blitz_utility_mutex);
1455
 
      return HA_ERR_CRASHED_ON_USAGE;
1456
 
    }
1457
 
 
1458
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
1459
 
      delete[] share->btrees[i].parts;
1460
 
      share->btrees[i].close();
1461
 
    }
1462
 
 
1463
 
    BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1464
 
    bz_engine->deleteTableShare(share->table_name);
1465
 
 
1466
 
    delete[] share->btrees;
1467
 
    delete share;
1468
 
  }
1469
 
 
1470
 
  pthread_mutex_unlock(&blitz_utility_mutex);
1471
 
  return 0;
1472
 
}
1473
 
 
1474
 
static int blitz_init(drizzled::module::Context &context) {
1475
 
  blitz_engine = new BlitzEngine("BLITZDB");
1476
 
 
1477
 
  if (!blitz_engine->doCreateTableCache()) {
1478
 
    delete blitz_engine;
1479
 
    return HA_ERR_OUT_OF_MEM;
1480
 
  }
1481
 
 
1482
 
  pthread_mutex_init(&blitz_utility_mutex, NULL);
1483
 
  context.add(blitz_engine);
1484
 
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
1485
 
                                                    &blitz_estimated_rows));
1486
 
  return 0;
1487
 
}
1488
 
 
1489
 
/* Read the prototype of this function for details. */
1490
 
static char *skip_btree_key(const char *key, const size_t skip_len,
1491
 
                            int *return_klen) {
1492
 
  char *pos = (char *)key;
1493
 
  *return_klen = uint2korr(pos + skip_len);
1494
 
  return pos + skip_len + sizeof(uint16_t);
1495
 
}
1496
 
 
1497
 
static bool str_is_numeric(const std::string &str) {
1498
 
  for (uint32_t i = 0; i < str.length(); i++) {
1499
 
    if (!std::isdigit(str[i]))
1500
 
      return false;
1501
 
  }
1502
 
  return true;
1503
 
}
1504
 
 
1505
 
static void blitz_init_options(drizzled::module::option_context &context)
1506
 
{
1507
 
  context("estimated-rows",
1508
 
          po::value<uint64_t>(&blitz_estimated_rows)->default_value(0),
1509
 
          N_("Estimated number of rows that a BlitzDB table will store."));
1510
 
}
1511
 
 
1512
 
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);