~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/blitzdb/ha_blitz.cc

  • Committer: Olaf van der Spek
  • Date: 2011-10-10 09:27:50 UTC
  • mto: (2430.1.6 rf)
  • mto: This revision was merged to the branch mainline in revision 2436.
  • Revision ID: olafvdspek@gmail.com-20111010092750-ryxgmn7zj5yvxfkf
Refactor

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2009 - 2010 Toru Maesaka
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
#include "config.h"
21
 
#include "ha_blitz.h"
22
 
 
23
 
using namespace std;
24
 
using namespace drizzled;
25
 
namespace po= boost::program_options;
26
 
 
27
 
static pthread_mutex_t blitz_utility_mutex;
28
 
 
29
 
static const char *ha_blitz_exts[] = {
30
 
  BLITZ_DATA_EXT,
31
 
  BLITZ_INDEX_EXT,
32
 
  BLITZ_SYSTEM_EXT,
33
 
  NULL
34
 
};
35
 
 
36
 
/* Global Variables for Startup Options */
37
 
uint64_t blitz_estimated_rows;
38
 
 
39
 
class BlitzEngine : public drizzled::plugin::StorageEngine {
40
 
private:
41
 
  TCMAP *blitz_table_cache;
42
 
 
43
 
public:
44
 
  BlitzEngine(const std::string &name_arg) :
45
 
    drizzled::plugin::StorageEngine(name_arg,
46
 
                                    drizzled::HTON_NULL_IN_KEY |
47
 
                                    drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
48
 
                                    drizzled::HTON_STATS_RECORDS_IS_EXACT |
49
 
                                    drizzled::HTON_SKIP_STORE_LOCK) {
50
 
    table_definition_ext = BLITZ_SYSTEM_EXT;
51
 
  }
52
 
 
53
 
  virtual ~BlitzEngine() {
54
 
    pthread_mutex_destroy(&blitz_utility_mutex);
55
 
    tcmapdel(blitz_table_cache);
56
 
  }
57
 
 
58
 
  virtual drizzled::Cursor *create(drizzled::Table &table) {
59
 
    return new ha_blitz(*this, table);
60
 
  }
61
 
 
62
 
  const char **bas_ext() const {
63
 
    return ha_blitz_exts;
64
 
  }
65
 
 
66
 
  int doCreateTable(drizzled::Session &session,
67
 
                    drizzled::Table &table_arg,
68
 
                    const drizzled::TableIdentifier &identifier,
69
 
                    drizzled::message::Table &table_proto);
70
 
 
71
 
  int doRenameTable(drizzled::Session &session,
72
 
                    const drizzled::TableIdentifier &from_identifier,
73
 
                    const drizzled::TableIdentifier &to_identifier);
74
 
 
75
 
  int doDropTable(drizzled::Session &session,
76
 
                  const drizzled::TableIdentifier &identifier);
77
 
 
78
 
  int doGetTableDefinition(drizzled::Session &session,
79
 
                           const drizzled::TableIdentifier &identifier,
80
 
                           drizzled::message::Table &table_proto);
81
 
 
82
 
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
83
 
                             const drizzled::SchemaIdentifier &schema_identifier,
84
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
85
 
 
86
 
  bool doDoesTableExist(drizzled::Session &session,
87
 
                        const drizzled::TableIdentifier &identifier);
88
 
 
89
 
  bool validateCreateTableOption(const std::string &key,
90
 
                                 const std::string &state);
91
 
 
92
 
  bool doCreateTableCache(void);
93
 
 
94
 
  BlitzShare *getTableShare(const std::string &name);
95
 
  void cacheTableShare(const std::string &name, BlitzShare *share);
96
 
  void deleteTableShare(const std::string &name);
97
 
 
98
 
  uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
99
 
  uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
100
 
  uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
101
 
 
102
 
  uint32_t index_flags(enum drizzled::ha_key_alg) const {
103
 
    return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
104
 
            HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
105
 
  }
106
 
};
107
 
 
108
 
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
109
 
   a key to that row in the data dictionary. Two keys are merged and
110
 
   stored as a key because we want to avoid reading the leaf node and
111
 
   thus save disk IO and some computation in the tree. Note that the
112
 
   comparison function of BlitzDB's btree only takes into accound the
113
 
   actual index key. See blitzcmp.cc for details.
114
 
 
115
 
   With the above in mind, this helper function returns a pointer to
116
 
   the dictionary key by calculating the offset. */
117
 
static char *skip_btree_key(const char *key, const size_t skip_len,
118
 
                            int *return_klen);
119
 
 
120
 
static bool str_is_numeric(const std::string &str);
121
 
 
122
 
int BlitzEngine::doCreateTable(drizzled::Session &,
123
 
                               drizzled::Table &table,
124
 
                               const drizzled::TableIdentifier &identifier,
125
 
                               drizzled::message::Table &proto) {
126
 
  BlitzData dict;
127
 
  BlitzTree btree;
128
 
  int ecode;
129
 
 
130
 
  /* Temporary fix for blocking composite keys. We need to add this
131
 
     check because version 1 doesn't handle composite indexes. */
132
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
133
 
    if (table.key_info[i].key_parts > 1)
134
 
      return HA_ERR_UNSUPPORTED;
135
 
  }
136
 
 
137
 
  /* Create relevant files for a new table and close them immediately.
138
 
     All we want to do here is somewhat like UNIX touch(1). */
139
 
  if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
140
 
    return ecode;
141
 
 
142
 
  if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
143
 
    return ecode;
144
 
 
145
 
  /* Create b+tree index(es) for this table. */
146
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
147
 
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
148
 
      return ecode;
149
 
  }
150
 
 
151
 
  /* Write the table definition to system table. */
152
 
  if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
153
 
    return ecode;
154
 
 
155
 
  if (!dict.write_table_definition(proto)) {
156
 
    dict.close_system_table();
157
 
    return HA_ERR_CRASHED_ON_USAGE;
158
 
  }
159
 
 
160
 
  dict.close_system_table();
161
 
  return 0;
162
 
}
163
 
 
164
 
int BlitzEngine::doRenameTable(drizzled::Session &,
165
 
                               const drizzled::TableIdentifier &from,
166
 
                               const drizzled::TableIdentifier &to) {
167
 
  int rv = 0;
168
 
 
169
 
  BlitzData blitz_table;
170
 
  uint32_t nkeys;
171
 
 
172
 
  BlitzData dict;
173
 
  int ecode;
174
 
  /* Write the table definition to system table. */
175
 
  if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
176
 
    return ecode;
177
 
 
178
 
  drizzled::message::Table proto;
179
 
  char *proto_string;
180
 
  int proto_string_len;
181
 
 
182
 
  proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
183
 
                                       BLITZ_TABLE_PROTO_KEY.length(),
184
 
                                       &proto_string_len);
185
 
 
186
 
  if (proto_string == NULL) {
187
 
    return ENOMEM;
188
 
  }
189
 
 
190
 
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
191
 
    free(proto_string);
192
 
    return HA_ERR_CRASHED_ON_USAGE;
193
 
  }
194
 
 
195
 
  free(proto_string);
196
 
 
197
 
  proto.set_name(to.getTableName());
198
 
  proto.set_schema(to.getSchemaName());
199
 
  proto.set_catalog(to.getCatalogName());
200
 
 
201
 
  if (!dict.write_table_definition(proto)) {
202
 
    dict.close_system_table();
203
 
    return HA_ERR_CRASHED_ON_USAGE;
204
 
  }
205
 
 
206
 
  dict.close_system_table();
207
 
 
208
 
  /* Find out the number of indexes in this table. This information
209
 
     is required because BlitzDB creates a file for each indexes.*/
210
 
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
211
 
    return HA_ERR_CRASHED_ON_USAGE;
212
 
 
213
 
  nkeys = blitz_table.read_meta_keycount();
214
 
 
215
 
  if (blitz_table.close_data_table() != 0)
216
 
    return HA_ERR_CRASHED_ON_USAGE;
217
 
 
218
 
  /* We're now ready to rename the file(s) for this table. Start by
219
 
     attempting to rename the data and system files. */
220
 
  if (rename_file_ext(from.getPath().c_str(),
221
 
                      to.getPath().c_str(), BLITZ_DATA_EXT)) {
222
 
    if ((rv = errno) != ENOENT)
223
 
      return rv;
224
 
  }
225
 
 
226
 
  if (rename_file_ext(from.getPath().c_str(),
227
 
                      to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
228
 
    if ((rv = errno) != ENOENT)
229
 
      return rv;
230
 
  }
231
 
 
232
 
  /* So far so good. Rename the index file(s) and we're done. */
233
 
  BlitzTree btree;
234
 
 
235
 
  for (uint32_t i = 0; i < nkeys; i++) {
236
 
    if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
237
 
      return HA_ERR_CRASHED_ON_USAGE;
238
 
  }
239
 
 
240
 
  return rv;
241
 
}
242
 
 
243
 
int BlitzEngine::doDropTable(drizzled::Session &,
244
 
                             const drizzled::TableIdentifier &identifier) {
245
 
  BlitzData dict;
246
 
  BlitzTree btree;
247
 
  char buf[FN_REFLEN];
248
 
  uint32_t nkeys;
249
 
  int err;
250
 
 
251
 
  /* We open the dictionary to extract meta data from it */
252
 
  if ((err = dict.open_data_table(identifier.getPath().c_str(),
253
 
                                  HDBOREADER)) != 0) {
254
 
    return err;
255
 
  }
256
 
 
257
 
  nkeys = dict.read_meta_keycount();
258
 
 
259
 
  /* We no longer need the dictionary to be open */
260
 
  dict.close_data_table();
261
 
 
262
 
  /* Drop the Data Dictionary */
263
 
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
264
 
  if ((err = unlink(buf)) == -1) {
265
 
    return err;
266
 
  }
267
 
 
268
 
  /* Drop the System Table */
269
 
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
270
 
  if ((err = unlink(buf)) == -1) {
271
 
    return err;
272
 
  }
273
 
 
274
 
  /* Drop Index file(s) */
275
 
  for (uint32_t i = 0; i < nkeys; i++) {
276
 
    if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
277
 
      return err;
278
 
    }
279
 
  }
280
 
 
281
 
  return 0;
282
 
}
283
 
 
284
 
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
285
 
                                      const drizzled::TableIdentifier &identifier,
286
 
                                      drizzled::message::Table &proto) {
287
 
  struct stat stat_info;
288
 
  std::string path(identifier.getPath());
289
 
 
290
 
  path.append(BLITZ_SYSTEM_EXT);
291
 
 
292
 
  if (stat(path.c_str(), &stat_info)) {
293
 
    return errno;
294
 
  }
295
 
 
296
 
  BlitzData db;
297
 
  char *proto_string;
298
 
  int proto_string_len;
299
 
 
300
 
  if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
301
 
    return HA_ERR_CRASHED_ON_USAGE;
302
 
  }
303
 
 
304
 
  proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
305
 
                                     BLITZ_TABLE_PROTO_KEY.length(),
306
 
                                     &proto_string_len);
307
 
 
308
 
  if (db.close_system_table() != 0) {
309
 
    return HA_ERR_CRASHED_ON_USAGE;
310
 
  }
311
 
 
312
 
  if (proto_string == NULL) {
313
 
    return ENOMEM;
314
 
  }
315
 
 
316
 
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
317
 
    free(proto_string);
318
 
    return HA_ERR_CRASHED_ON_USAGE;
319
 
  }
320
 
 
321
 
  free(proto_string);
322
 
 
323
 
  return EEXIST;
324
 
}
325
 
 
326
 
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
327
 
                                        const drizzled::SchemaIdentifier &schema_id,
328
 
                                        drizzled::TableIdentifier::vector &ids) {
329
 
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
330
 
 
331
 
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
332
 
       entry_iter != entries.end(); ++entry_iter) {
333
 
 
334
 
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
335
 
    const std::string *filename = &entry->filename;
336
 
 
337
 
    assert(filename->size());
338
 
 
339
 
    const char *ext = strchr(filename->c_str(), '.');
340
 
 
341
 
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
342
 
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
343
 
    } else {
344
 
      char uname[NAME_LEN + 1];
345
 
      uint32_t file_name_len;
346
 
 
347
 
      file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
348
 
                                                             uname,
349
 
                                                             sizeof(uname));
350
 
 
351
 
      uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
352
 
      ids.push_back(TableIdentifier(schema_id, uname));
353
 
    }
354
 
  }
355
 
}
356
 
 
357
 
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
358
 
                                   const drizzled::TableIdentifier &identifier) {
359
 
  std::string proto_path(identifier.getPath());
360
 
  proto_path.append(BLITZ_DATA_EXT);
361
 
 
362
 
  return (access(proto_path.c_str(), F_OK)) ? false : true;
363
 
}
364
 
 
365
 
bool BlitzEngine::validateCreateTableOption(const std::string &key,
366
 
                                            const std::string &state) {
367
 
  if (key == "ESTIMATED_ROWS" || key == "estimated_rows") {
368
 
    if (str_is_numeric(state))
369
 
      return true;
370
 
  }
371
 
  return false;
372
 
}
373
 
 
374
 
bool BlitzEngine::doCreateTableCache(void) {
375
 
  return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
376
 
}
377
 
 
378
 
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
379
 
  int vlen;
380
 
  const void *fetched;
381
 
  BlitzShare *rv = NULL;
382
 
 
383
 
  fetched = tcmapget(blitz_table_cache, table_name.c_str(),
384
 
                     table_name.length(), &vlen);
385
 
 
386
 
  /* dereference the object */
387
 
  if (fetched)
388
 
    rv = *(BlitzShare **)fetched;
389
 
 
390
 
  return rv;
391
 
}
392
 
 
393
 
void BlitzEngine::cacheTableShare(const std::string &table_name,
394
 
                                  BlitzShare *share) {
395
 
  /* Cache the memory address of the share object */
396
 
  tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
397
 
           &share, sizeof(share));
398
 
}
399
 
 
400
 
void BlitzEngine::deleteTableShare(const std::string &table_name) {
401
 
  tcmapout2(blitz_table_cache, table_name.c_str());
402
 
}
403
 
 
404
 
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
405
 
                   Table &table_arg) : Cursor(engine_arg, table_arg),
406
 
                                            btree_cursor(NULL),
407
 
                                            table_scan(false),
408
 
                                            table_based(false),
409
 
                                            thread_locked(false),
410
 
                                            held_key(NULL),
411
 
                                            held_key_len(0),
412
 
                                            current_key(NULL),
413
 
                                            current_key_len(0),
414
 
                                            key_buffer(NULL),
415
 
                                            errkey_id(0) {}
416
 
 
417
 
int ha_blitz::open(const char *table_name, int, uint32_t) {
418
 
  if ((share = get_share(table_name)) == NULL)
419
 
    return HA_ERR_CRASHED_ON_USAGE;
420
 
 
421
 
  pthread_mutex_lock(&blitz_utility_mutex);
422
 
 
423
 
  btree_cursor = new BlitzCursor[share->nkeys];
424
 
 
425
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
426
 
    if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
427
 
      free_share();
428
 
      pthread_mutex_unlock(&blitz_utility_mutex);
429
 
      return HA_ERR_OUT_OF_MEM;
430
 
    }
431
 
  }
432
 
 
433
 
  if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
434
 
    free_share();
435
 
    pthread_mutex_unlock(&blitz_utility_mutex);
436
 
    return HA_ERR_OUT_OF_MEM;
437
 
  }
438
 
 
439
 
  if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
440
 
    free_share();
441
 
    pthread_mutex_unlock(&blitz_utility_mutex);
442
 
    return HA_ERR_OUT_OF_MEM;
443
 
  }
444
 
 
445
 
  if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
446
 
    free_share();
447
 
    free(key_buffer);
448
 
    free(key_merge_buffer);
449
 
    pthread_mutex_unlock(&blitz_utility_mutex);
450
 
    return HA_ERR_OUT_OF_MEM;
451
 
  }
452
 
 
453
 
  secondary_row_buffer = NULL;
454
 
  secondary_row_buffer_size = 0;
455
 
  key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
456
 
 
457
 
  /* 'ref_length' determines the size of the buffer that the kernel
458
 
     will use to uniquely identify a row. The actual allocation is
459
 
     done by the kernel so all we do here is specify the size of it.*/
460
 
  if (share->primary_key_exists) {
461
 
    ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
462
 
  } else {
463
 
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
464
 
  }
465
 
 
466
 
  pthread_mutex_unlock(&blitz_utility_mutex);
467
 
  return 0;
468
 
}
469
 
 
470
 
int ha_blitz::close(void) {
471
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
472
 
    share->btrees[i].destroy_cursor(&btree_cursor[i]);
473
 
  }
474
 
  delete [] btree_cursor;
475
 
 
476
 
  free(key_buffer);
477
 
  free(key_merge_buffer);
478
 
  free(held_key_buf);
479
 
  free(secondary_row_buffer);
480
 
  return free_share();
481
 
}
482
 
 
483
 
int ha_blitz::info(uint32_t flag) {
484
 
  if (flag & HA_STATUS_VARIABLE) {
485
 
    stats.records = share->dict.nrecords();
486
 
    stats.data_file_length = share->dict.table_size();
487
 
  }
488
 
 
489
 
  if (flag & HA_STATUS_AUTO)
490
 
    stats.auto_increment_value = share->auto_increment_value + 1;
491
 
 
492
 
  if (flag & HA_STATUS_ERRKEY)
493
 
    errkey = errkey_id;
494
 
 
495
 
  return 0;
496
 
}
497
 
 
498
 
int ha_blitz::doStartTableScan(bool scan) {
499
 
  /* Obtain the query type for this scan */
500
 
  sql_command_type = session_sql_command(getTable()->getSession());
501
 
  table_scan = scan;
502
 
  table_based = true;
503
 
 
504
 
  /* Obtain the most suitable lock for the given statement type. */
505
 
  blitz_optimal_lock();
506
 
 
507
 
  /* Get the first record from TCHDB. Let the scanner take
508
 
     care of checking return value errors. */
509
 
  if (table_scan) {
510
 
    current_key = share->dict.next_key_and_row(NULL, 0,
511
 
                                               &current_key_len,
512
 
                                               &current_row,
513
 
                                               &current_row_len);
514
 
  }
515
 
  return 0;
516
 
}
517
 
 
518
 
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
519
 
  char *next_key;
520
 
  const char *next_row;
521
 
  int next_row_len;
522
 
  int next_key_len;
523
 
 
524
 
  free(held_key);
525
 
  held_key = NULL;
526
 
 
527
 
  if (current_key == NULL) {
528
 
    getTable()->status = STATUS_NOT_FOUND;
529
 
    return HA_ERR_END_OF_FILE;
530
 
  }
531
 
 
532
 
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
533
 
 
534
 
  /* Unpack and copy the current row to Drizzle's result buffer. */
535
 
  unpack_row(drizzle_buf, current_row, current_row_len);
536
 
 
537
 
  /* Retrieve both key and row of the next record with one allocation. */
538
 
  next_key = share->dict.next_key_and_row(current_key, current_key_len,
539
 
                                          &next_key_len, &next_row,
540
 
                                          &next_row_len);
541
 
 
542
 
  /* Memory region for "current_row" will be freed as "held key" on
543
 
     the next iteration. This is because "current_key" points to the
544
 
     region of memory that contains "current_row" and "held_key" points
545
 
     to it. If there isn't another iteration then it is freed in doEndTableScan(). */
546
 
  current_row = next_row;
547
 
  current_row_len = next_row_len;
548
 
 
549
 
  /* Remember the current row because delete, update or replace
550
 
     function could be called after this function. This pointer is
551
 
     also used to free the previous key and row, which resides on
552
 
     the same buffer. */
553
 
  held_key = current_key;
554
 
  held_key_len = current_key_len;
555
 
 
556
 
  /* It is now memory-leak-safe to point current_key to next_key. */
557
 
  current_key = next_key;
558
 
  current_key_len = next_key_len;
559
 
  getTable()->status = 0;
560
 
  return 0;
561
 
}
562
 
 
563
 
int ha_blitz::doEndTableScan() {
564
 
  if (table_scan && current_key)
565
 
    free(current_key);
566
 
  if (table_scan && held_key)
567
 
    free(held_key);
568
 
 
569
 
  current_key = NULL;
570
 
  held_key = NULL;
571
 
  current_key_len = 0;
572
 
  held_key_len = 0;
573
 
  table_scan = false;
574
 
  table_based = false;
575
 
 
576
 
  if (thread_locked)
577
 
    blitz_optimal_unlock();
578
 
 
579
 
  return 0;
580
 
}
581
 
 
582
 
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
583
 
  char *row;
584
 
  char *key = NULL;
585
 
  int key_len, row_len;
586
 
 
587
 
  memcpy(&key_len, pos, sizeof(key_len));
588
 
  key = (char *)(pos + sizeof(key_len));
589
 
 
590
 
  /* TODO: Find a better error type. */
591
 
  if (key == NULL)
592
 
    return HA_ERR_KEY_NOT_FOUND;
593
 
 
594
 
  row = share->dict.get_row(key, key_len, &row_len);
595
 
 
596
 
  if (row == NULL)
597
 
    return HA_ERR_KEY_NOT_FOUND;
598
 
 
599
 
  unpack_row(copy_to, row, row_len);
600
 
 
601
 
  /* Remember the key location on memory if the thread is not doing
602
 
     a table scan. This is because either update_row() or delete_row()
603
 
     might be called after this function. */
604
 
  if (!table_scan) {
605
 
    held_key = key;
606
 
    held_key_len = key_len;
607
 
  }
608
 
 
609
 
  free(row);
610
 
  return 0;
611
 
}
612
 
 
613
 
void ha_blitz::position(const unsigned char *) {
614
 
  int length = sizeof(held_key_len);
615
 
  memcpy(ref, &held_key_len, length);
616
 
  memcpy(ref + length, (unsigned char *)held_key, held_key_len);
617
 
}
618
 
 
619
 
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
620
 
  return "BTREE";
621
 
}
622
 
 
623
 
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
624
 
  active_index = key_num;
625
 
  sql_command_type = session_sql_command(getTable()->getSession());
626
 
 
627
 
  /* This is unlikely to happen but just for assurance, re-obtain
628
 
     the lock if this thread already has a certain lock. This makes
629
 
     sure that this thread will get the most appropriate lock for
630
 
     the current statement. */
631
 
  if (thread_locked)
632
 
    blitz_optimal_unlock();
633
 
 
634
 
  blitz_optimal_lock();
635
 
  return 0;
636
 
}
637
 
 
638
 
int ha_blitz::index_first(unsigned char *buf) {
639
 
  char *dict_key, *bt_key, *row;
640
 
  int dict_klen, bt_klen, prefix_len, rlen;
641
 
 
642
 
  bt_key = btree_cursor[active_index].first_key(&bt_klen);
643
 
 
644
 
  if (bt_key == NULL)
645
 
    return HA_ERR_END_OF_FILE;
646
 
 
647
 
  prefix_len = btree_key_length(bt_key, active_index);
648
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
649
 
 
650
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
651
 
    free(bt_key);
652
 
    return HA_ERR_KEY_NOT_FOUND;
653
 
  }
654
 
 
655
 
  unpack_row(buf, row, rlen);
656
 
  keep_track_of_key(bt_key, bt_klen);
657
 
 
658
 
  free(bt_key);
659
 
  free(row);
660
 
  return 0;
661
 
}
662
 
 
663
 
int ha_blitz::index_next(unsigned char *buf) {
664
 
  char *dict_key, *bt_key, *row;
665
 
  int dict_klen, bt_klen, prefix_len, rlen;
666
 
 
667
 
  bt_key = btree_cursor[active_index].next_key(&bt_klen);
668
 
 
669
 
  if (bt_key == NULL) {
670
 
    getTable()->status = STATUS_NOT_FOUND;
671
 
    return HA_ERR_END_OF_FILE;
672
 
  }
673
 
 
674
 
  prefix_len = btree_key_length(bt_key, active_index);
675
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
676
 
 
677
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
678
 
    free(bt_key);
679
 
    getTable()->status = STATUS_NOT_FOUND;
680
 
    return HA_ERR_KEY_NOT_FOUND;
681
 
  }
682
 
 
683
 
  unpack_row(buf, row, rlen);
684
 
  keep_track_of_key(bt_key, bt_klen);
685
 
 
686
 
  free(bt_key);
687
 
  free(row);
688
 
  return 0;
689
 
}
690
 
 
691
 
int ha_blitz::index_prev(unsigned char *buf) {
692
 
  char *dict_key, *bt_key, *row;
693
 
  int dict_klen, bt_klen, prefix_len, rlen;
694
 
 
695
 
  bt_key = btree_cursor[active_index].prev_key(&bt_klen);
696
 
 
697
 
  if (bt_key == NULL)
698
 
    return HA_ERR_END_OF_FILE;
699
 
 
700
 
  prefix_len = btree_key_length(bt_key, active_index);
701
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
702
 
 
703
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
704
 
    free(bt_key);
705
 
    return HA_ERR_KEY_NOT_FOUND;
706
 
  }
707
 
 
708
 
  unpack_row(buf, row, rlen);
709
 
  keep_track_of_key(bt_key, bt_klen);
710
 
 
711
 
  free(bt_key);
712
 
  free(row);
713
 
  return 0;
714
 
}
715
 
 
716
 
int ha_blitz::index_last(unsigned char *buf) {
717
 
  char *dict_key, *bt_key, *row;
718
 
  int dict_klen, bt_klen, prefix_len, rlen;
719
 
 
720
 
  bt_key = btree_cursor[active_index].final_key(&bt_klen);
721
 
 
722
 
  if (bt_key == NULL)
723
 
    return HA_ERR_KEY_NOT_FOUND;
724
 
 
725
 
  prefix_len = btree_key_length(bt_key, active_index);
726
 
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
727
 
 
728
 
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
729
 
    free(bt_key);
730
 
    errkey_id = active_index;
731
 
    return HA_ERR_KEY_NOT_FOUND;
732
 
  }
733
 
 
734
 
  unpack_row(buf, row, rlen);
735
 
  keep_track_of_key(bt_key, bt_klen);
736
 
 
737
 
  free(bt_key);
738
 
  free(row);
739
 
  return 0;
740
 
}
741
 
 
742
 
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
743
 
                         uint32_t key_len, enum ha_rkey_function find_flag) {
744
 
  return index_read_idx(buf, active_index, key, key_len, find_flag);
745
 
}
746
 
 
747
 
/* This is where the read related index logic lives. It is used by both
748
 
   BlitzDB and the Database Kernel (specifically, by the optimizer). */
749
 
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
750
 
                             const unsigned char *key, uint32_t,
751
 
                             enum ha_rkey_function search_mode) {
752
 
 
753
 
  /* If the provided key is NULL, we are required to return the first
754
 
     row in the active_index. */
755
 
  if (key == NULL)
756
 
    return this->index_first(buf);
757
 
 
758
 
  /* Otherwise we search for it. Prepare the key to look up the tree. */
759
 
  int packed_klen;
760
 
  char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
761
 
 
762
 
  /* Lookup the tree and get the master key. */
763
 
  int unique_klen;
764
 
  char *unique_key;
765
 
 
766
 
  unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
767
 
                                              packed_klen, &unique_klen);
768
 
 
769
 
  if (unique_key == NULL) {
770
 
    errkey_id = key_num;
771
 
    return HA_ERR_KEY_NOT_FOUND;
772
 
  }
773
 
 
774
 
  /* Got the master key. Prepare it to lookup the data dictionary. */
775
 
  int dict_klen;
776
 
  int skip_len = btree_key_length(unique_key, key_num);
777
 
  char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
778
 
 
779
 
  /* Fetch the packed row from the data dictionary. */
780
 
  int row_len;
781
 
  char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
782
 
 
783
 
  if (fetched_row == NULL) {
784
 
    errkey_id = key_num;
785
 
    free(unique_key);
786
 
    return HA_ERR_KEY_NOT_FOUND;
787
 
  }
788
 
 
789
 
  /* Unpack it into Drizzle's return buffer and keep track of the
790
 
     master key for future use (before index_end() is called). */
791
 
  unpack_row(buf, fetched_row, row_len);
792
 
  keep_track_of_key(unique_key, unique_klen);
793
 
 
794
 
  free(unique_key);
795
 
  free(fetched_row);
796
 
  return 0;
797
 
}
798
 
 
799
 
int ha_blitz::doEndIndexScan(void) {
800
 
  held_key = NULL;
801
 
  held_key_len = 0;
802
 
 
803
 
  btree_cursor[active_index].moved = false;
804
 
 
805
 
  if (thread_locked)
806
 
    blitz_optimal_unlock();
807
 
 
808
 
  return 0;
809
 
}
810
 
 
811
 
int ha_blitz::enable_indexes(uint32_t) {
812
 
  return HA_ERR_UNSUPPORTED;
813
 
}
814
 
 
815
 
int ha_blitz::disable_indexes(uint32_t) {
816
 
  return HA_ERR_UNSUPPORTED;
817
 
}
818
 
 
819
 
/* Find the estimated number of rows between min_key and max_key.
820
 
   Leave the proper implementation of this for now since there are
821
 
   too many exceptions to cover. */
822
 
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
823
 
                                   drizzled::key_range * /*min_key*/,
824
 
                                   drizzled::key_range * /*max_key*/) {
825
 
  return BLITZ_WORST_CASE_RANGE;
826
 
}
827
 
 
828
 
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
829
 
  int rv;
830
 
 
831
 
  ha_statistic_increment(&system_status_var::ha_write_count);
832
 
 
833
 
  /* Prepare Auto Increment field if one exists. */
834
 
  if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
835
 
    pthread_mutex_lock(&blitz_utility_mutex);
836
 
    if ((rv = update_auto_increment()) != 0) {
837
 
      pthread_mutex_unlock(&blitz_utility_mutex);
838
 
      return rv;
839
 
    }
840
 
 
841
 
    uint64_t next_val = getTable()->next_number_field->val_int();
842
 
 
843
 
    if (next_val > share->auto_increment_value) {
844
 
      share->auto_increment_value = next_val;
845
 
      stats.auto_increment_value = share->auto_increment_value + 1;
846
 
    }
847
 
    pthread_mutex_unlock(&blitz_utility_mutex);
848
 
  }
849
 
 
850
 
  /* Serialize a primary key for this row. If a PK doesn't exist,
851
 
     an internal hidden ID will be generated. We obtain the PK here
852
 
     and pack it to this function's local buffer instead of the
853
 
     thread's own 'key_buffer' because the PK value needs to be
854
 
     remembered when writing non-PK keys AND because the 'key_buffer'
855
 
     will be used to generate these non-PK keys. */
856
 
  char temp_pkbuf[BLITZ_MAX_KEY_LEN];
857
 
  size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
858
 
 
859
 
  /* Obtain a buffer that can accommodate this row. We then pack
860
 
     the provided row into it. Note that this code works most
861
 
     efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
862
 
  unsigned char *row_buf = get_pack_buffer(max_row_length());
863
 
  size_t row_len = pack_row(row_buf, drizzle_row);
864
 
 
865
 
  uint32_t curr_key = 0;
866
 
  uint32_t lock_id = 0;
867
 
 
868
 
  if (share->nkeys > 0) {
869
 
    lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
870
 
    share->blitz_lock.slotted_lock(lock_id);
871
 
  }
872
 
 
873
 
  /* We isolate this condition outside the key loop to avoid the CPU
874
 
     from going through unnecessary conditional branching on heavy
875
 
     insertion load. TODO: Optimize this block. PK should not need
876
 
     to go through merge_key() since this information is redundant. */
877
 
  if (share->primary_key_exists) {
878
 
    char *key = NULL;
879
 
    size_t klen = 0;
880
 
 
881
 
    key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
882
 
 
883
 
    rv = share->btrees[curr_key].write_unique(key, klen);
884
 
 
885
 
    if (rv == HA_ERR_FOUND_DUPP_KEY) {
886
 
      errkey_id = curr_key;
887
 
      share->blitz_lock.slotted_unlock(lock_id);
888
 
      return rv;
889
 
    }
890
 
    curr_key = 1;
891
 
  }
892
 
 
893
 
  /* Loop over the keys and write them to it's exclusive tree. */
894
 
  while (curr_key < share->nkeys) {
895
 
    char *key = NULL;
896
 
    size_t prefix_len = 0;
897
 
    size_t klen = 0;
898
 
 
899
 
    prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
900
 
    key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
901
 
 
902
 
    if (share->btrees[curr_key].unique) {
903
 
      rv = share->btrees[curr_key].write_unique(key, klen);
904
 
    } else {
905
 
      rv = share->btrees[curr_key].write(key, klen);
906
 
    }
907
 
 
908
 
    if (rv != 0) {
909
 
      errkey_id = curr_key;
910
 
      share->blitz_lock.slotted_unlock(lock_id);
911
 
      return rv;
912
 
    }
913
 
 
914
 
    curr_key++;
915
 
  }
916
 
 
917
 
  /* Write the row to the Data Dictionary. */
918
 
  rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
919
 
 
920
 
  if (share->nkeys > 0)
921
 
    share->blitz_lock.slotted_unlock(lock_id);
922
 
 
923
 
  return rv;
924
 
}
925
 
 
926
 
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
927
 
                             unsigned char *new_row) {
928
 
  int rv;
929
 
  uint32_t lock_id = 0;
930
 
 
931
 
  ha_statistic_increment(&system_status_var::ha_update_count);
932
 
 
933
 
  /* Handle Indexes */
934
 
  if (share->nkeys > 0) {
935
 
    /* BlitzDB cannot update an indexed row on table scan. */
936
 
    if (table_scan)
937
 
      return HA_ERR_UNSUPPORTED;
938
 
 
939
 
    if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
940
 
      return rv;
941
 
 
942
 
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
943
 
    share->blitz_lock.slotted_lock(lock_id);
944
 
 
945
 
    /* Update all relevant index entries. Start by deleting the
946
 
       the existing key then write the new key. Something we should
947
 
       consider in the future is to take a diff of the keys and only
948
 
       update changed keys. */
949
 
    int skip = btree_key_length(held_key, active_index);
950
 
    char *suffix = held_key + skip;
951
 
    uint16_t suffix_len = uint2korr(suffix);
952
 
 
953
 
    suffix += sizeof(suffix_len);
954
 
 
955
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
956
 
      char *key;
957
 
      size_t prefix_len, klen;
958
 
 
959
 
      klen = 0;
960
 
      prefix_len = make_index_key(key_buffer, i, old_row);
961
 
      key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
962
 
 
963
 
      if (share->btrees[i].delete_key(key, klen) != 0) {
964
 
        errkey_id = i;
965
 
        share->blitz_lock.slotted_unlock(lock_id);
966
 
        return HA_ERR_KEY_NOT_FOUND;
967
 
      }
968
 
 
969
 
      /* Now write the new key. */
970
 
      prefix_len = make_index_key(key_buffer, i, new_row);
971
 
 
972
 
      if (i == getTable()->getShare()->getPrimaryKey()) {
973
 
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
974
 
        rv = share->btrees[i].write(key, klen);
975
 
      } else {
976
 
        key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
977
 
        rv = share->btrees[i].write(key, klen);
978
 
      }
979
 
 
980
 
      if (rv != 0) {
981
 
        errkey_id = i;
982
 
        share->blitz_lock.slotted_unlock(lock_id);
983
 
        return rv;
984
 
      }
985
 
    }
986
 
  }
987
 
 
988
 
  /* Getting this far means that the index has been successfully
989
 
     updated. We now update the Data Dictionary. This implementation
990
 
     is admittedly far from optimial and will be revisited. */
991
 
  size_t row_len = max_row_length();
992
 
  unsigned char *row_buf = get_pack_buffer(row_len);
993
 
  row_len = pack_row(row_buf, new_row);
994
 
 
995
 
  /* This is a basic case where we can simply overwrite the key. */
996
 
  if (table_based) {
997
 
    rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
998
 
  } else {
999
 
    int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
1000
 
 
1001
 
    /* Delete with the old key. */
1002
 
    share->dict.delete_row(key_buffer, klen);
1003
 
 
1004
 
    /* Write with the new key. */
1005
 
    klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
1006
 
    rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
1007
 
  }
1008
 
 
1009
 
  if (share->nkeys > 0)
1010
 
    share->blitz_lock.slotted_unlock(lock_id);
1011
 
 
1012
 
  return rv;
1013
 
}
1014
 
 
1015
 
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
1016
 
  int rv;
1017
 
 
1018
 
  ha_statistic_increment(&system_status_var::ha_delete_count);
1019
 
 
1020
 
  char *dict_key = held_key;
1021
 
  int dict_klen = held_key_len;
1022
 
  uint32_t lock_id = 0;
1023
 
 
1024
 
  if (share->nkeys > 0) {
1025
 
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
1026
 
    share->blitz_lock.slotted_lock(lock_id);
1027
 
 
1028
 
    /* Loop over the indexes and delete all relevant entries for
1029
 
       this row. We do this by reproducing the key in BlitzDB's
1030
 
       unique key format. The procedure is simple.
1031
 
 
1032
 
       (1): Compute the key value for this index from the row then
1033
 
            pack it into key_buffer (not unique at this point).
1034
 
 
1035
 
       (2): Append the suffix of the held_key to the key generated
1036
 
            in step 1. The key is then guaranteed to be unique. */
1037
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
1038
 
      /* In this case, we don't need to search for the key because
1039
 
         TC's cursor is already pointing at the key that we want
1040
 
         to delete. We wouldn't be here otherwise. */
1041
 
      if (i == active_index) {
1042
 
        btree_cursor[active_index].delete_position();
1043
 
        continue;
1044
 
      }
1045
 
 
1046
 
      int klen = make_index_key(key_buffer, i, row_to_delete);
1047
 
      int skip_len = btree_key_length(held_key, active_index);
1048
 
      uint16_t suffix_len = uint2korr(held_key + skip_len);
1049
 
 
1050
 
      /* Append the suffix to the key */
1051
 
      memcpy(key_buffer + klen, held_key + skip_len,
1052
 
             sizeof(suffix_len) + suffix_len);
1053
 
 
1054
 
      /* Update the key length to cover the generated key. */
1055
 
      klen = klen + sizeof(suffix_len) + suffix_len;
1056
 
 
1057
 
      if (share->btrees[i].delete_key(key_buffer, klen) != 0)
1058
 
        return HA_ERR_KEY_NOT_FOUND;
1059
 
    }
1060
 
 
1061
 
    /* Skip to the data dictionary key. */
1062
 
    int dict_key_offset = btree_key_length(dict_key, active_index);
1063
 
    dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
1064
 
  }
1065
 
 
1066
 
  rv = share->dict.delete_row(dict_key, dict_klen);
1067
 
 
1068
 
  if (share->nkeys > 0)
1069
 
    share->blitz_lock.slotted_unlock(lock_id);
1070
 
 
1071
 
  return rv;
1072
 
}
1073
 
 
1074
 
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
1075
 
                                  uint64_t, uint64_t *first_value,
1076
 
                                  uint64_t *nb_reserved_values) {
1077
 
  *first_value = share->auto_increment_value + 1;
1078
 
  *nb_reserved_values = UINT64_MAX;
1079
 
}
1080
 
 
1081
 
int ha_blitz::reset_auto_increment(uint64_t value) {
1082
 
  share->auto_increment_value = (value == 0) ? 1 : value;
1083
 
  return 0;
1084
 
}
1085
 
 
1086
 
int ha_blitz::delete_all_rows(void) {
1087
 
  for (uint32_t i = 0; i < share->nkeys; i++) {
1088
 
    if (share->btrees[i].delete_all() != 0) {
1089
 
      errkey = i;
1090
 
      return HA_ERR_CRASHED_ON_USAGE;
1091
 
    }
1092
 
  }
1093
 
  return (share->dict.delete_all_rows()) ? 0 : -1;
1094
 
}
1095
 
 
1096
 
uint32_t ha_blitz::max_row_length(void) {
1097
 
  uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
1098
 
  uint32_t *pos = getTable()->getBlobField();
1099
 
  uint32_t *end = pos + getTable()->sizeBlobFields();
1100
 
 
1101
 
  while (pos != end) {
1102
 
    length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
1103
 
    pos++;
1104
 
  }
1105
 
 
1106
 
  return length;
1107
 
}
1108
 
 
1109
 
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
1110
 
  if (!share->primary_key_exists) {
1111
 
    uint64_t next_id = share->dict.next_hidden_row_id();
1112
 
    int8store(pack_to, next_id);
1113
 
    return sizeof(next_id);
1114
 
  }
1115
 
 
1116
 
  /* Getting here means that there is a PK in this table. Get the
1117
 
     binary representation of the PK, pack it to BlitzDB's key buffer
1118
 
     and return the size of it. */
1119
 
  return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
1120
 
}
1121
 
 
1122
 
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
1123
 
                                const unsigned char *row) {
1124
 
  KeyInfo *key = &getTable()->key_info[key_num];
1125
 
  KeyPartInfo *key_part = key->key_part;
1126
 
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1127
 
 
1128
 
  unsigned char *pos = (unsigned char *)pack_to;
1129
 
  unsigned char *end;
1130
 
  int offset = 0;
1131
 
 
1132
 
  memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
1133
 
 
1134
 
  /* Loop through key part(s) and pack them as we go. */
1135
 
  for (; key_part != key_part_end; key_part++) {
1136
 
    if (key_part->null_bit) {
1137
 
      if (row[key_part->null_offset] & key_part->null_bit) {
1138
 
        *pos++ = 0;
1139
 
        continue;
1140
 
      }
1141
 
      *pos++ = 1;
1142
 
    }
1143
 
 
1144
 
    /* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
1145
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1146
 
      /* Extract the length of the string from the row. */
1147
 
      uint16_t data_len = *(uint8_t *)(row + key_part->offset);
1148
 
 
1149
 
      /* Copy the length of the string. Use 2 bytes. */
1150
 
      int2store(pos, data_len);
1151
 
      pos += sizeof(data_len);
1152
 
 
1153
 
      /* Copy the string data */
1154
 
      memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
1155
 
      pos += data_len;
1156
 
    } else {
1157
 
      end = key_part->field->pack(pos, row + key_part->offset);
1158
 
      offset = end - pos;
1159
 
      pos += offset;
1160
 
    }
1161
 
  }
1162
 
 
1163
 
  return ((char *)pos - pack_to);
1164
 
}
1165
 
 
1166
 
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
1167
 
                          const size_t b_len, size_t *merged_len) {
1168
 
 
1169
 
  size_t total = a_len + sizeof(uint16_t) + b_len;
1170
 
 
1171
 
  if (total > key_merge_buffer_len) {
1172
 
    key_merge_buffer = (char *)realloc(key_merge_buffer, total);
1173
 
 
1174
 
    if (key_merge_buffer == NULL) {
1175
 
      errno = HA_ERR_OUT_OF_MEM;
1176
 
      return NULL;
1177
 
    }
1178
 
    key_merge_buffer_len = total;
1179
 
  }
1180
 
 
1181
 
  char *pos = key_merge_buffer;
1182
 
 
1183
 
  /* Copy the prefix. */
1184
 
  memcpy(pos, a, a_len);
1185
 
  pos += a_len;
1186
 
 
1187
 
  /* Copy the length of b. */
1188
 
  int2store(pos, (uint16_t)b_len);
1189
 
  pos += sizeof(uint16_t);
1190
 
 
1191
 
  /* Copy the suffix and we're done. */
1192
 
  memcpy(pos, b, b_len);
1193
 
 
1194
 
  *merged_len = total;
1195
 
  return key_merge_buffer;
1196
 
}
1197
 
 
1198
 
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
1199
 
  KeyInfo *key_info = &getTable()->key_info[key_num];
1200
 
  KeyPartInfo *key_part = key_info->key_part;
1201
 
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
1202
 
  char *pos = (char *)key;
1203
 
  uint64_t len = 0;
1204
 
  size_t rv = 0;
1205
 
 
1206
 
  for (; key_part != key_part_end; key_part++) {
1207
 
    if (key_part->null_bit) {
1208
 
      pos++;
1209
 
      rv++;
1210
 
      if (*key == 0)
1211
 
        continue;
1212
 
    }
1213
 
 
1214
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
1215
 
        key_part->type == HA_KEYTYPE_VARTEXT2) {
1216
 
      len = uint2korr(pos);
1217
 
      rv += len + sizeof(uint16_t);
1218
 
    } else {
1219
 
      len = key_part->field->key_length();
1220
 
      rv += len;
1221
 
    }
1222
 
    pos += len;
1223
 
    len = 0;
1224
 
  }
1225
 
 
1226
 
  return rv;
1227
 
}
1228
 
 
1229
 
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
1230
 
  memcpy(held_key_buf, key, klen);
1231
 
  held_key = held_key_buf;
1232
 
  held_key_len = klen;
1233
 
}
1234
 
 
1235
 
/* Converts a native Drizzle index key to BlitzDB's format. */
1236
 
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
1237
 
                                    const int key_num, int *return_key_len) {
1238
 
  KeyInfo *key = &getTable()->key_info[key_num];
1239
 
  KeyPartInfo *key_part = key->key_part;
1240
 
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1241
 
 
1242
 
  unsigned char *key_pos = (unsigned char *)native_key;
1243
 
  unsigned char *keybuf_pos = (unsigned char *)key_buffer;
1244
 
  unsigned char *end;
1245
 
  int key_size = 0;
1246
 
  int offset = 0;
1247
 
 
1248
 
  memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
1249
 
 
1250
 
  for (; key_part != key_part_end; key_part++) {
1251
 
    if (key_part->null_bit) {
1252
 
      key_size++;
1253
 
 
1254
 
      /* This key is NULL */
1255
 
      if (!(*keybuf_pos++ = (*key_pos++ == 0)))
1256
 
        continue;
1257
 
    }
1258
 
 
1259
 
    /* Normalize a VARTEXT1 key to VARTEXT2. */
1260
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1261
 
      uint16_t str_len = *(uint16_t *)key_pos;
1262
 
 
1263
 
      /* Copy the length of the string over to key buffer. */
1264
 
      int2store(keybuf_pos, str_len);
1265
 
      keybuf_pos += sizeof(str_len);
1266
 
 
1267
 
      /* Copy the actual value over to the key buffer. */
1268
 
      memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
1269
 
      keybuf_pos += str_len;
1270
 
 
1271
 
      /* NULL byte + Length of str (2 byte) + Actual String. */
1272
 
      offset = 1 + sizeof(str_len) + str_len;
1273
 
    } else {
1274
 
      end = key_part->field->pack(keybuf_pos, key_pos);
1275
 
      offset = end - keybuf_pos;
1276
 
      keybuf_pos += offset;
1277
 
    }
1278
 
 
1279
 
    key_size += offset;
1280
 
    key_pos += key_part->field->key_length();
1281
 
  }
1282
 
 
1283
 
  *return_key_len = key_size;
1284
 
  return key_buffer;
1285
 
}
1286
 
 
1287
 
size_t ha_blitz::pack_row(unsigned char *row_buffer,
1288
 
                          unsigned char *row_to_pack) {
1289
 
  unsigned char *pos;
1290
 
 
1291
 
  /* Nothing special to do if the table is fixed length */
1292
 
  if (share->fixed_length_table) {
1293
 
    memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
1294
 
    return (size_t)getTable()->getShare()->getRecordLength();
1295
 
  }
1296
 
 
1297
 
  /* Copy NULL bits */
1298
 
  memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
1299
 
  pos = row_buffer + getTable()->getShare()->null_bytes;
1300
 
 
1301
 
  /* Pack each field into the buffer */
1302
 
  for (Field **field = getTable()->getFields(); *field; field++) {
1303
 
    if (!((*field)->is_null()))
1304
 
      pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1305
 
  }
1306
 
 
1307
 
  return (size_t)(pos - row_buffer);
1308
 
}
1309
 
 
1310
 
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
1311
 
                          const size_t from_len) {
1312
 
  const unsigned char *pos;
1313
 
 
1314
 
  /* Nothing special to do */
1315
 
  if (share->fixed_length_table) {
1316
 
    memcpy(to, from, from_len);
1317
 
    return true;
1318
 
  }
1319
 
 
1320
 
  /* Start by copying NULL bits which is the beginning block
1321
 
     of a Drizzle row. */
1322
 
  pos = (const unsigned char *)from;
1323
 
  memcpy(to, pos, getTable()->getShare()->null_bytes);
1324
 
  pos += getTable()->getShare()->null_bytes;
1325
 
 
1326
 
  /* Unpack all fields in the provided row. */
1327
 
  for (Field **field = getTable()->getFields(); *field; field++) {
1328
 
    if (!((*field)->is_null())) {
1329
 
      pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
1330
 
    }
1331
 
  }
1332
 
 
1333
 
  return true;
1334
 
}
1335
 
 
1336
 
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
1337
 
  unsigned char *buf = pack_buffer;
1338
 
 
1339
 
  /* This is a shitty case where the row size is larger than 2KB. */
1340
 
  if (size > BLITZ_MAX_ROW_STACK) {
1341
 
    if (size > secondary_row_buffer_size) {
1342
 
      void *new_ptr = realloc(secondary_row_buffer, size);
1343
 
 
1344
 
      if (new_ptr == NULL) {
1345
 
        errno = HA_ERR_OUT_OF_MEM;
1346
 
        return NULL;
1347
 
      }
1348
 
 
1349
 
      secondary_row_buffer_size = size;
1350
 
      secondary_row_buffer = (unsigned char *)new_ptr;
1351
 
    }
1352
 
    buf = secondary_row_buffer;
1353
 
  }
1354
 
  return buf;
1355
 
}
1356
 
 
1357
 
static BlitzEngine *blitz_engine = NULL;
1358
 
 
1359
 
BlitzShare *ha_blitz::get_share(const char *name) {
1360
 
  BlitzShare *share_ptr;
1361
 
  BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1362
 
  std::string table_path(name);
1363
 
 
1364
 
  pthread_mutex_lock(&blitz_utility_mutex);
1365
 
 
1366
 
  /* Look up the table cache to see if the table resource is available */
1367
 
  share_ptr = bz_engine->getTableShare(table_path);
1368
 
 
1369
 
  if (share_ptr) {
1370
 
    share_ptr->use_count++;
1371
 
    pthread_mutex_unlock(&blitz_utility_mutex);
1372
 
    return share_ptr;
1373
 
  }
1374
 
 
1375
 
  /* Table wasn't cached so create a new table handler */
1376
 
  share_ptr = new BlitzShare();
1377
 
 
1378
 
  /* Prepare the Data Dictionary */
1379
 
  if (share_ptr->dict.startup(table_path.c_str()) != 0) {
1380
 
    delete share_ptr;
1381
 
    pthread_mutex_unlock(&blitz_utility_mutex);
1382
 
    return NULL;
1383
 
  }
1384
 
 
1385
 
  /* Prepare Index Structure(s) */
1386
 
  KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
1387
 
  share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
1388
 
 
1389
 
  for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
1390
 
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
1391
 
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
1392
 
 
1393
 
    if (getTable()->key_info[i].flags & HA_NOSAME)
1394
 
      share_ptr->btrees[i].unique = true;
1395
 
 
1396
 
    share_ptr->btrees[i].length = curr->key_length;
1397
 
    share_ptr->btrees[i].nparts = curr->key_parts;
1398
 
 
1399
 
    /* Record Meta Data of the Key Segments */
1400
 
    for (uint32_t j = 0; j < curr->key_parts; j++) {
1401
 
      Field *f = curr->key_part[j].field;
1402
 
 
1403
 
      if (f->null_ptr) {
1404
 
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
1405
 
        share_ptr->btrees[i].parts[j].null_pos
1406
 
          = (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
1407
 
      }
1408
 
 
1409
 
      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
1410
 
 
1411
 
      if (f->type() == DRIZZLE_TYPE_BLOB) {
1412
 
        share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
1413
 
      }
1414
 
 
1415
 
      share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
1416
 
      share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
1417
 
      share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
1418
 
    }
1419
 
  }
1420
 
 
1421
 
  /* Set Meta Data */
1422
 
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
1423
 
  share_ptr->table_name = table_path;
1424
 
  share_ptr->nkeys = getTable()->getShare()->keys;
1425
 
  share_ptr->use_count = 1;
1426
 
 
1427
 
  share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
1428
 
                                    & HA_OPTION_PACK_RECORD);
1429
 
 
1430
 
  if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
1431
 
    share_ptr->primary_key_exists = false;
1432
 
  else
1433
 
    share_ptr->primary_key_exists = true;
1434
 
 
1435
 
  /* Done creating the share object. Cache it for later
1436
 
     use by another cursor object.*/
1437
 
  bz_engine->cacheTableShare(table_path, share_ptr);
1438
 
 
1439
 
  pthread_mutex_unlock(&blitz_utility_mutex);
1440
 
  return share_ptr;
1441
 
}
1442
 
 
1443
 
int ha_blitz::free_share(void) {
1444
 
  pthread_mutex_lock(&blitz_utility_mutex);
1445
 
 
1446
 
  /* BlitzShare could still be used by another thread. Check the
1447
 
     reference counter to see if it's safe to free it */
1448
 
  if (--share->use_count == 0) {
1449
 
    share->dict.write_meta_autoinc(share->auto_increment_value);
1450
 
 
1451
 
    if (share->dict.shutdown() != 0) {
1452
 
      pthread_mutex_unlock(&blitz_utility_mutex);
1453
 
      return HA_ERR_CRASHED_ON_USAGE;
1454
 
    }
1455
 
 
1456
 
    for (uint32_t i = 0; i < share->nkeys; i++) {
1457
 
      delete[] share->btrees[i].parts;
1458
 
      share->btrees[i].close();
1459
 
    }
1460
 
 
1461
 
    BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
1462
 
    bz_engine->deleteTableShare(share->table_name);
1463
 
 
1464
 
    delete[] share->btrees;
1465
 
    delete share;
1466
 
  }
1467
 
 
1468
 
  pthread_mutex_unlock(&blitz_utility_mutex);
1469
 
  return 0;
1470
 
}
1471
 
 
1472
 
static int blitz_init(drizzled::module::Context &context) {
1473
 
  blitz_engine = new BlitzEngine("BLITZDB");
1474
 
 
1475
 
  if (!blitz_engine->doCreateTableCache()) {
1476
 
    delete blitz_engine;
1477
 
    return HA_ERR_OUT_OF_MEM;
1478
 
  }
1479
 
 
1480
 
  pthread_mutex_init(&blitz_utility_mutex, NULL);
1481
 
  context.add(blitz_engine);
1482
 
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
1483
 
                                                    &blitz_estimated_rows));
1484
 
  return 0;
1485
 
}
1486
 
 
1487
 
/* Read the prototype of this function for details. */
1488
 
static char *skip_btree_key(const char *key, const size_t skip_len,
1489
 
                            int *return_klen) {
1490
 
  char *pos = (char *)key;
1491
 
  *return_klen = uint2korr(pos + skip_len);
1492
 
  return pos + skip_len + sizeof(uint16_t);
1493
 
}
1494
 
 
1495
 
static bool str_is_numeric(const std::string &str) {
1496
 
  for (uint32_t i = 0; i < str.length(); i++) {
1497
 
    if (!std::isdigit(str[i]))
1498
 
      return false;
1499
 
  }
1500
 
  return true;
1501
 
}
1502
 
 
1503
 
static void blitz_init_options(drizzled::module::option_context &context)
1504
 
{
1505
 
  context("estimated-rows",
1506
 
          po::value<uint64_t>(&blitz_estimated_rows)->default_value(0),
1507
 
          N_("Estimated number of rows that a BlitzDB table will store."));
1508
 
}
1509
 
 
1510
 
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);