~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/blitzdb/ha_blitz.cc

  • Committer: Monty Taylor
  • Date: 2010-06-18 17:03:01 UTC
  • mfrom: (1239.3.124 merger)
  • Revision ID: mordred@inaugust.com-20100618170301-2rr20efiqgi1zdme
Merged in BlitzDB.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 - 2010 Toru Maesaka
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
 
20
#include "config.h"
 
21
#include "ha_blitz.h"
 
22
 
 
23
using namespace std;
 
24
using namespace drizzled;
 
25
 
 
26
static pthread_mutex_t blitz_utility_mutex;
 
27
 
 
28
static const char *ha_blitz_exts[] = {
 
29
  BLITZ_DATA_EXT,
 
30
  BLITZ_INDEX_EXT,
 
31
  BLITZ_SYSTEM_EXT,
 
32
  NULL
 
33
};
 
34
 
 
35
class BlitzEngine : public drizzled::plugin::StorageEngine {
 
36
private:
 
37
  TCMAP *blitz_table_cache;
 
38
 
 
39
public:
 
40
  BlitzEngine(const std::string &name_arg) :
 
41
    drizzled::plugin::StorageEngine(name_arg,
 
42
                                    drizzled::HTON_NULL_IN_KEY |
 
43
                                    drizzled::HTON_PRIMARY_KEY_IN_READ_INDEX |
 
44
                                    drizzled::HTON_STATS_RECORDS_IS_EXACT |
 
45
                                    drizzled::HTON_SKIP_STORE_LOCK) {
 
46
    table_definition_ext = BLITZ_SYSTEM_EXT;
 
47
  }
 
48
 
 
49
  virtual ~BlitzEngine() {
 
50
    pthread_mutex_destroy(&blitz_utility_mutex);
 
51
    tcmapdel(blitz_table_cache);
 
52
  }
 
53
 
 
54
  virtual drizzled::Cursor *create(drizzled::TableShare &table,
 
55
                                   drizzled::memory::Root *mem_root) {
 
56
    return new (mem_root) ha_blitz(*this, table);
 
57
  }
 
58
 
 
59
  const char **bas_ext() const {
 
60
    return ha_blitz_exts;
 
61
  }
 
62
 
 
63
  int doCreateTable(drizzled::Session &session,
 
64
                    drizzled::Table &table_arg,
 
65
                    drizzled::TableIdentifier &identifier,
 
66
                    drizzled::message::Table &table_proto);
 
67
 
 
68
  int doRenameTable(drizzled::Session &session,
 
69
                    drizzled::TableIdentifier &from_identifier,
 
70
                    drizzled::TableIdentifier &to_identifier);
 
71
 
 
72
  int doDropTable(drizzled::Session &session,
 
73
                  drizzled::TableIdentifier &identifier);
 
74
 
 
75
  int doGetTableDefinition(drizzled::Session &session,
 
76
                           drizzled::TableIdentifier &identifier,
 
77
                           drizzled::message::Table &table_proto);
 
78
 
 
79
  void doGetTableNames(drizzled::CachedDirectory &directory,
 
80
                       drizzled::SchemaIdentifier &schema_identifier,
 
81
                       std::set<std::string>& set_of_names);
 
82
 
 
83
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
84
                             drizzled::SchemaIdentifier &schema_identifier,
 
85
                             drizzled::TableIdentifiers &set_of_identifiers);
 
86
 
 
87
  bool doDoesTableExist(drizzled::Session &session,
 
88
                        drizzled::TableIdentifier &identifier);
 
89
 
 
90
  bool doCreateTableCache(void);
 
91
 
 
92
  BlitzShare *getTableShare(const std::string &name);
 
93
  void cacheTableShare(const std::string &name, BlitzShare *share);
 
94
  void deleteTableShare(const std::string &name);
 
95
 
 
96
  uint32_t max_supported_keys() const { return BLITZ_MAX_INDEX; }
 
97
  uint32_t max_supported_key_length() const { return BLITZ_MAX_KEY_LEN; }
 
98
  uint32_t max_supported_key_part_length() const { return BLITZ_MAX_KEY_LEN; }
 
99
 
 
100
  uint32_t index_flags(enum drizzled::ha_key_alg) const {
 
101
    return (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
 
102
            HA_READ_RANGE | HA_ONLY_WHOLE_INDEX | HA_KEYREAD_ONLY);
 
103
  }
 
104
};
 
105
 
 
106
/* A key stored in BlitzDB's B+Tree is a byte array that also includes
 
107
   a key to that row in the data dictionary. Two keys are merged and
 
108
   stored as a key because we want to avoid reading the leaf node and
 
109
   thus save disk IO and some computation in the tree. Note that the
 
110
   comparison function of BlitzDB's btree only takes into accound the
 
111
   actual index key. See blitzcmp.cc for details.
 
112
 
 
113
   With the above in mind, this helper function returns a pointer to
 
114
   the dictionary key by calculating the offset. */
 
115
static char *skip_btree_key(const char *key, const size_t skip_len,
 
116
                            int *return_klen);
 
117
 
 
118
int BlitzEngine::doCreateTable(drizzled::Session &,
 
119
                               drizzled::Table &table,
 
120
                               drizzled::TableIdentifier &identifier,
 
121
                               drizzled::message::Table &proto) {
 
122
  BlitzData dict;
 
123
  BlitzTree btree;
 
124
  int ecode;
 
125
 
 
126
  /* Temporary fix for blocking composite keys. We need to add this
 
127
     check because version 1 doesn't handle composite indexes. */
 
128
  for (uint32_t i = 0; i < table.s->keys; i++) {
 
129
    if (table.key_info[i].key_parts > 1)
 
130
      return HA_ERR_UNSUPPORTED;
 
131
  }
 
132
 
 
133
  /* Create relevant files for a new table and close them immediately.
 
134
     All we want to do here is somewhat like UNIX touch(1). */
 
135
  if ((ecode = dict.create_data_table(proto, table, identifier)) != 0)
 
136
    return ecode;
 
137
 
 
138
  if ((ecode = dict.create_system_table(identifier.getPath())) != 0)
 
139
    return ecode;
 
140
 
 
141
  /* Create b+tree index(es) for this table. */
 
142
  for (uint32_t i = 0; i < table.s->keys; i++) {
 
143
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
 
144
      return ecode;
 
145
  }
 
146
 
 
147
  /* Write the table definition to system table. */
 
148
  if ((ecode = dict.open_system_table(identifier.getPath(), HDBOWRITER)) != 0)
 
149
    return ecode;
 
150
 
 
151
  if (!dict.write_table_definition(proto)) {
 
152
    dict.close_system_table();
 
153
    return HA_ERR_CRASHED_ON_USAGE;
 
154
  }
 
155
 
 
156
  dict.close_system_table();
 
157
  return 0;
 
158
}
 
159
 
 
160
int BlitzEngine::doRenameTable(drizzled::Session &,
 
161
                               drizzled::TableIdentifier &from,
 
162
                               drizzled::TableIdentifier &to) {
 
163
  int rv = 0;
 
164
 
 
165
  BlitzData blitz_table;
 
166
  uint32_t nkeys;
 
167
 
 
168
  /* Find out the number of indexes in this table. This information
 
169
     is required because BlitzDB creates a file for each indexes.*/
 
170
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
 
171
    return HA_ERR_CRASHED_ON_USAGE;
 
172
 
 
173
  nkeys = blitz_table.read_meta_keycount();
 
174
 
 
175
  if (blitz_table.close_data_table() != 0)
 
176
    return HA_ERR_CRASHED_ON_USAGE;
 
177
 
 
178
  /* We're now ready to rename the file(s) for this table. Start by
 
179
     attempting to rename the data and system files. */
 
180
  if (rename_file_ext(from.getPath().c_str(),
 
181
                      to.getPath().c_str(), BLITZ_DATA_EXT)) {
 
182
    if ((rv = errno) != ENOENT)
 
183
      return rv;
 
184
  }
 
185
 
 
186
  if (rename_file_ext(from.getPath().c_str(),
 
187
                      to.getPath().c_str(), BLITZ_SYSTEM_EXT)) {
 
188
    if ((rv = errno) != ENOENT)
 
189
      return rv;
 
190
  }
 
191
 
 
192
  /* So far so good. Rename the index file(s) and we're done. */
 
193
  BlitzTree btree;
 
194
 
 
195
  for (uint32_t i = 0; i < nkeys; i++) {
 
196
    if (btree.rename(from.getPath().c_str(), to.getPath().c_str(), i) != 0)
 
197
      return HA_ERR_CRASHED_ON_USAGE;
 
198
  }
 
199
 
 
200
  return rv;
 
201
}
 
202
 
 
203
int BlitzEngine::doDropTable(drizzled::Session &,
 
204
                             drizzled::TableIdentifier &identifier) {
 
205
  BlitzData dict;
 
206
  BlitzTree btree;
 
207
  char buf[FN_REFLEN];
 
208
  uint32_t nkeys;
 
209
  int err;
 
210
 
 
211
  /* We open the dictionary to extract meta data from it */
 
212
  if ((err = dict.open_data_table(identifier.getPath().c_str(),
 
213
                                  HDBOREADER)) != 0) {
 
214
    return err;
 
215
  }
 
216
 
 
217
  nkeys = dict.read_meta_keycount();
 
218
 
 
219
  /* We no longer need the dictionary to be open */
 
220
  dict.close_data_table();
 
221
 
 
222
  /* Drop the Data Dictionary */
 
223
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_DATA_EXT);
 
224
  if ((err = unlink(buf)) == -1) {
 
225
    return err;
 
226
  }
 
227
 
 
228
  /* Drop the System Table */
 
229
  snprintf(buf, FN_REFLEN, "%s%s", identifier.getPath().c_str(), BLITZ_SYSTEM_EXT);
 
230
  if ((err = unlink(buf)) == -1) {
 
231
    return err;
 
232
  }
 
233
 
 
234
  /* Drop Index file(s) */
 
235
  for (uint32_t i = 0; i < nkeys; i++) {
 
236
    if ((err = btree.drop(identifier.getPath().c_str(), i)) != 0) {
 
237
      return err;
 
238
    }
 
239
  }
 
240
 
 
241
  return 0;
 
242
}
 
243
 
 
244
int BlitzEngine::doGetTableDefinition(drizzled::Session &,
 
245
                                      drizzled::TableIdentifier &identifier,
 
246
                                      drizzled::message::Table &proto) {
 
247
  struct stat stat_info;
 
248
  std::string path(identifier.getPath());
 
249
 
 
250
  path.append(BLITZ_SYSTEM_EXT);
 
251
 
 
252
  if (stat(path.c_str(), &stat_info)) {
 
253
    return errno;
 
254
  }
 
255
 
 
256
  BlitzData db;
 
257
  char *proto_string;
 
258
  int proto_string_len;
 
259
 
 
260
  if (db.open_system_table(identifier.getPath(), HDBOREADER) != 0) {
 
261
    return HA_ERR_CRASHED_ON_USAGE;
 
262
  }
 
263
 
 
264
  proto_string = db.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
 
265
                                     BLITZ_TABLE_PROTO_KEY.length(),
 
266
                                     &proto_string_len);
 
267
 
 
268
  if (db.close_system_table() != 0) {
 
269
    return HA_ERR_CRASHED_ON_USAGE;
 
270
  }
 
271
 
 
272
  if (proto_string == NULL) {
 
273
    return ENOMEM;
 
274
  }
 
275
 
 
276
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
 
277
    free(proto_string);
 
278
    return HA_ERR_CRASHED_ON_USAGE;
 
279
  }
 
280
 
 
281
  free(proto_string);
 
282
 
 
283
  return EEXIST;
 
284
}
 
285
 
 
286
void BlitzEngine::doGetTableNames(drizzled::CachedDirectory &directory,
 
287
                                  drizzled::SchemaIdentifier &,
 
288
                                  std::set<string> &set_of_names) {
 
289
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
 
290
 
 
291
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
 
292
       entry_iter != entries.end(); ++entry_iter) {
 
293
 
 
294
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
 
295
    std::string *filename = &entry->filename;
 
296
 
 
297
    assert(filename->size());
 
298
 
 
299
    const char *ext = strchr(filename->c_str(), '.');
 
300
 
 
301
    if (ext != NULL) {
 
302
      char uname[NAME_LEN + 1];
 
303
      uint32_t file_name_len;
 
304
 
 
305
      file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
 
306
                                                             uname,
 
307
                                                             sizeof(uname));
 
308
 
 
309
      uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
 
310
      set_of_names.insert(uname);
 
311
    }
 
312
  }
 
313
}
 
314
 
 
315
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
 
316
                                        drizzled::SchemaIdentifier &schema_id,
 
317
                                        drizzled::TableIdentifiers &ids) {
 
318
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
 
319
 
 
320
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
 
321
       entry_iter != entries.end(); ++entry_iter) {
 
322
 
 
323
    drizzled::CachedDirectory::Entry *entry = *entry_iter;
 
324
    const std::string *filename = &entry->filename;
 
325
 
 
326
    assert(filename->size());
 
327
 
 
328
    const char *ext = strchr(filename->c_str(), '.');
 
329
 
 
330
    if (ext == NULL || my_strcasecmp(system_charset_info, ext, BLITZ_SYSTEM_EXT) ||
 
331
        (filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0)) {
 
332
    } else {
 
333
      char uname[NAME_LEN + 1];
 
334
      uint32_t file_name_len;
 
335
 
 
336
      file_name_len = TableIdentifier::filename_to_tablename(filename->c_str(),
 
337
                                                             uname,
 
338
                                                             sizeof(uname));
 
339
 
 
340
      uname[file_name_len - sizeof(BLITZ_DATA_EXT) + 1]= '\0';
 
341
      ids.push_back(TableIdentifier(schema_id, uname));
 
342
    }
 
343
  }
 
344
}
 
345
 
 
346
bool BlitzEngine::doDoesTableExist(drizzled::Session &,
 
347
                                   drizzled::TableIdentifier &identifier) {
 
348
  std::string proto_path(identifier.getPath());
 
349
  proto_path.append(BLITZ_DATA_EXT);
 
350
 
 
351
  return (access(proto_path.c_str(), F_OK)) ? false : true;
 
352
}
 
353
 
 
354
bool BlitzEngine::doCreateTableCache(void) {
 
355
  return ((blitz_table_cache = tcmapnew()) == NULL) ? false : true;
 
356
}
 
357
 
 
358
BlitzShare *BlitzEngine::getTableShare(const std::string &table_name) {
 
359
  int vlen;
 
360
  const void *fetched;
 
361
  BlitzShare *rv = NULL;
 
362
 
 
363
  fetched = tcmapget(blitz_table_cache, table_name.c_str(),
 
364
                     table_name.length(), &vlen);
 
365
 
 
366
  /* dereference the object */
 
367
  if (fetched)
 
368
    rv = *(BlitzShare **)fetched;
 
369
 
 
370
  return rv;
 
371
}
 
372
 
 
373
void BlitzEngine::cacheTableShare(const std::string &table_name,
 
374
                                  BlitzShare *share) {
 
375
  /* Cache the memory address of the share object */
 
376
  tcmapput(blitz_table_cache, table_name.c_str(), table_name.length(),
 
377
           &share, sizeof(share));
 
378
}
 
379
 
 
380
void BlitzEngine::deleteTableShare(const std::string &table_name) {
 
381
  tcmapout2(blitz_table_cache, table_name.c_str());
 
382
}
 
383
 
 
384
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
 
385
                   TableShare &table_arg) : Cursor(engine_arg, table_arg),
 
386
                                            btree_cursor(NULL),
 
387
                                            table_scan(false),
 
388
                                            table_based(false),
 
389
                                            thread_locked(false),
 
390
                                            held_key(NULL),
 
391
                                            held_key_len(0),
 
392
                                            current_key(NULL),
 
393
                                            current_key_len(0),
 
394
                                            key_buffer(NULL),
 
395
                                            errkey_id(0) {}
 
396
 
 
397
int ha_blitz::open(const char *table_name, int, uint32_t) {
 
398
  if ((share = get_share(table_name)) == NULL)
 
399
    return HA_ERR_CRASHED_ON_USAGE;
 
400
 
 
401
  pthread_mutex_lock(&blitz_utility_mutex);
 
402
 
 
403
  btree_cursor = new BlitzCursor[share->nkeys];
 
404
 
 
405
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
406
    if (!share->btrees[i].create_cursor(&btree_cursor[i])) {
 
407
      free_share();
 
408
      pthread_mutex_unlock(&blitz_utility_mutex);
 
409
      return HA_ERR_OUT_OF_MEM;
 
410
    }
 
411
  }
 
412
 
 
413
  if ((key_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
414
    free_share();
 
415
    pthread_mutex_unlock(&blitz_utility_mutex);
 
416
    return HA_ERR_OUT_OF_MEM;
 
417
  }
 
418
 
 
419
  if ((key_merge_buffer = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
420
    free_share();
 
421
    pthread_mutex_unlock(&blitz_utility_mutex);
 
422
    return HA_ERR_OUT_OF_MEM;
 
423
  }
 
424
 
 
425
  if ((held_key_buf = (char *)malloc(BLITZ_MAX_KEY_LEN)) == NULL) {
 
426
    free_share();
 
427
    free(key_buffer);
 
428
    free(key_merge_buffer);
 
429
    pthread_mutex_unlock(&blitz_utility_mutex);
 
430
    return HA_ERR_OUT_OF_MEM;
 
431
  }
 
432
 
 
433
  secondary_row_buffer = NULL;
 
434
  secondary_row_buffer_size = 0;
 
435
  key_merge_buffer_len = BLITZ_MAX_KEY_LEN;
 
436
 
 
437
  /* 'ref_length' determines the size of the buffer that the kernel
 
438
     will use to uniquely identify a row. The actual allocation is
 
439
     done by the kernel so all we do here is specify the size of it.*/
 
440
  if (share->primary_key_exists) {
 
441
    ref_length = table->key_info[table->s->getPrimaryKey()].key_length;
 
442
  } else {
 
443
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
 
444
  }
 
445
 
 
446
  pthread_mutex_unlock(&blitz_utility_mutex);
 
447
  return 0;
 
448
}
 
449
 
 
450
int ha_blitz::close(void) {
 
451
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
452
    share->btrees[i].destroy_cursor(&btree_cursor[i]);
 
453
  }
 
454
  delete [] btree_cursor;
 
455
 
 
456
  free(key_buffer);
 
457
  free(key_merge_buffer);
 
458
  free(held_key_buf);
 
459
  free(secondary_row_buffer);
 
460
  return free_share();
 
461
}
 
462
 
 
463
int ha_blitz::info(uint32_t flag) {
 
464
  if (flag & HA_STATUS_VARIABLE) {
 
465
    stats.records = share->dict.nrecords();
 
466
    stats.data_file_length = share->dict.table_size();
 
467
  }
 
468
 
 
469
  if (flag & HA_STATUS_AUTO)
 
470
    stats.auto_increment_value = share->auto_increment_value + 1;
 
471
 
 
472
  if (flag & HA_STATUS_ERRKEY)
 
473
    errkey = errkey_id;
 
474
 
 
475
  return 0;
 
476
}
 
477
 
 
478
int ha_blitz::doStartTableScan(bool scan) {
 
479
  /* Obtain the query type for this scan */
 
480
  sql_command_type = session_sql_command(current_session);
 
481
  table_scan = scan;
 
482
  table_based = true;
 
483
 
 
484
  /* Obtain the most suitable lock for the given statement type. */
 
485
  critical_section_enter();
 
486
 
 
487
  /* Get the first record from TCHDB. Let the scanner take
 
488
     care of checking return value errors. */
 
489
  if (table_scan) {
 
490
    current_key = share->dict.next_key_and_row(NULL, 0,
 
491
                                               &current_key_len,
 
492
                                               &current_row,
 
493
                                               &current_row_len);
 
494
  }
 
495
  return 0;
 
496
}
 
497
 
 
498
int ha_blitz::rnd_next(unsigned char *drizzle_buf) {
 
499
  char *next_key;
 
500
  const char *next_row;
 
501
  int next_row_len;
 
502
  int next_key_len;
 
503
 
 
504
  free(held_key);
 
505
  held_key = NULL;
 
506
 
 
507
  if (current_key == NULL) {
 
508
    table->status = STATUS_NOT_FOUND;
 
509
    return HA_ERR_END_OF_FILE;
 
510
  }
 
511
 
 
512
  ha_statistic_increment(&system_status_var::ha_read_rnd_next_count);
 
513
 
 
514
  /* Unpack and copy the current row to Drizzle's result buffer. */
 
515
  unpack_row(drizzle_buf, current_row, current_row_len);
 
516
 
 
517
  /* Retrieve both key and row of the next record with one allocation. */
 
518
  next_key = share->dict.next_key_and_row(current_key, current_key_len,
 
519
                                          &next_key_len, &next_row,
 
520
                                          &next_row_len);
 
521
 
 
522
  /* Memory region for "current_row" will be freed as "held key" on
 
523
     the next iteration. This is because "current_key" points to the
 
524
     region of memory that contains "current_row" and "held_key" points
 
525
     to it. If there isn't another iteration then it is freed in doEndTableScan(). */
 
526
  current_row = next_row;
 
527
  current_row_len = next_row_len;
 
528
 
 
529
  /* Remember the current row because delete, update or replace
 
530
     function could be called after this function. This pointer is
 
531
     also used to free the previous key and row, which resides on
 
532
     the same buffer. */
 
533
  held_key = current_key;
 
534
  held_key_len = current_key_len;
 
535
 
 
536
  /* It is now memory-leak-safe to point current_key to next_key. */
 
537
  current_key = next_key;
 
538
  current_key_len = next_key_len;
 
539
  table->status = 0;
 
540
  return 0;
 
541
}
 
542
 
 
543
int ha_blitz::doEndTableScan() {
 
544
  if (table_scan && current_key)
 
545
    free(current_key);
 
546
  if (table_scan && held_key)
 
547
    free(held_key);
 
548
 
 
549
  current_key = NULL;
 
550
  held_key = NULL;
 
551
  current_key_len = 0;
 
552
  held_key_len = 0;
 
553
  table_scan = false;
 
554
  table_based = false;
 
555
 
 
556
  if (thread_locked)
 
557
    critical_section_exit();
 
558
 
 
559
  return 0;
 
560
}
 
561
 
 
562
int ha_blitz::rnd_pos(unsigned char *copy_to, unsigned char *pos) {
 
563
  char *row;
 
564
  char *key = NULL;
 
565
  int key_len, row_len;
 
566
 
 
567
  memcpy(&key_len, pos, sizeof(key_len));
 
568
  key = (char *)(pos + sizeof(key_len));
 
569
 
 
570
  /* TODO: Find a better error type. */
 
571
  if (key == NULL)
 
572
    return HA_ERR_KEY_NOT_FOUND;
 
573
 
 
574
  row = share->dict.get_row(key, key_len, &row_len);
 
575
 
 
576
  if (row == NULL)
 
577
    return HA_ERR_KEY_NOT_FOUND;
 
578
 
 
579
  unpack_row(copy_to, row, row_len);
 
580
 
 
581
  /* Remember the key location on memory if the thread is not doing
 
582
     a table scan. This is because either update_row() or delete_row()
 
583
     might be called after this function. */
 
584
  if (!table_scan) {
 
585
    held_key = key;
 
586
    held_key_len = key_len;
 
587
  }
 
588
 
 
589
  free(row);
 
590
  return 0;
 
591
}
 
592
 
 
593
void ha_blitz::position(const unsigned char *) {
 
594
  int length = sizeof(held_key_len);
 
595
  memcpy(ref, &held_key_len, length);
 
596
  memcpy(ref + length, (unsigned char *)held_key, held_key_len);
 
597
}
 
598
 
 
599
const char *ha_blitz::index_type(uint32_t /*key_num*/) {
 
600
  return "BTREE";
 
601
}
 
602
 
 
603
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
 
604
  active_index = key_num;
 
605
  sql_command_type = session_sql_command(current_session);
 
606
 
 
607
  /* This is unlikely to happen but just for assurance, re-obtain
 
608
     the lock if this thread already has a certain lock. This makes
 
609
     sure that this thread will get the most appropriate lock for
 
610
     the current statement. */
 
611
  if (thread_locked)
 
612
    critical_section_exit();
 
613
 
 
614
  critical_section_enter();
 
615
  return 0;
 
616
}
 
617
 
 
618
int ha_blitz::index_first(unsigned char *buf) {
 
619
  char *dict_key, *bt_key, *row;
 
620
  int dict_klen, bt_klen, prefix_len, rlen;
 
621
 
 
622
  bt_key = btree_cursor[active_index].first_key(&bt_klen);
 
623
 
 
624
  if (bt_key == NULL)
 
625
    return HA_ERR_END_OF_FILE;
 
626
 
 
627
  prefix_len = btree_key_length(bt_key, active_index);
 
628
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
629
 
 
630
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
631
    free(bt_key);
 
632
    return HA_ERR_KEY_NOT_FOUND;
 
633
  }
 
634
 
 
635
  unpack_row(buf, row, rlen);
 
636
  keep_track_of_key(bt_key, bt_klen);
 
637
 
 
638
  free(bt_key);
 
639
  free(row);
 
640
  return 0;
 
641
}
 
642
 
 
643
int ha_blitz::index_next(unsigned char *buf) {
 
644
  char *dict_key, *bt_key, *row;
 
645
  int dict_klen, bt_klen, prefix_len, rlen;
 
646
 
 
647
  bt_key = btree_cursor[active_index].next_key(&bt_klen);
 
648
 
 
649
  if (bt_key == NULL) {
 
650
    table->status = STATUS_NOT_FOUND;
 
651
    return HA_ERR_END_OF_FILE;
 
652
  }
 
653
 
 
654
  prefix_len = btree_key_length(bt_key, active_index);
 
655
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
656
 
 
657
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
658
    free(bt_key);
 
659
    table->status = STATUS_NOT_FOUND;
 
660
    return HA_ERR_KEY_NOT_FOUND;
 
661
  }
 
662
 
 
663
  unpack_row(buf, row, rlen);
 
664
  keep_track_of_key(bt_key, bt_klen);
 
665
 
 
666
  free(bt_key);
 
667
  free(row);
 
668
  return 0;
 
669
}
 
670
 
 
671
int ha_blitz::index_prev(unsigned char *buf) {
 
672
  char *dict_key, *bt_key, *row;
 
673
  int dict_klen, bt_klen, prefix_len, rlen;
 
674
 
 
675
  bt_key = btree_cursor[active_index].prev_key(&bt_klen);
 
676
 
 
677
  if (bt_key == NULL)
 
678
    return HA_ERR_END_OF_FILE;
 
679
 
 
680
  prefix_len = btree_key_length(bt_key, active_index);
 
681
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
682
 
 
683
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
684
    free(bt_key);
 
685
    return HA_ERR_KEY_NOT_FOUND;
 
686
  }
 
687
 
 
688
  unpack_row(buf, row, rlen);
 
689
  keep_track_of_key(bt_key, bt_klen);
 
690
 
 
691
  free(bt_key);
 
692
  free(row);
 
693
  return 0;
 
694
}
 
695
 
 
696
int ha_blitz::index_last(unsigned char *buf) {
 
697
  char *dict_key, *bt_key, *row;
 
698
  int dict_klen, bt_klen, prefix_len, rlen;
 
699
 
 
700
  bt_key = btree_cursor[active_index].final_key(&bt_klen);
 
701
 
 
702
  if (bt_key == NULL)
 
703
    return HA_ERR_KEY_NOT_FOUND;
 
704
 
 
705
  prefix_len = btree_key_length(bt_key, active_index);
 
706
  dict_key = skip_btree_key(bt_key, prefix_len, &dict_klen);
 
707
 
 
708
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
 
709
    free(bt_key);
 
710
    errkey_id = active_index;
 
711
    return HA_ERR_KEY_NOT_FOUND;
 
712
  }
 
713
 
 
714
  unpack_row(buf, row, rlen);
 
715
  keep_track_of_key(bt_key, bt_klen);
 
716
 
 
717
  free(bt_key);
 
718
  free(row);
 
719
  return 0;
 
720
}
 
721
 
 
722
int ha_blitz::index_read(unsigned char *buf, const unsigned char *key,
 
723
                         uint32_t key_len, enum ha_rkey_function find_flag) {
 
724
  return index_read_idx(buf, active_index, key, key_len, find_flag);
 
725
}
 
726
 
 
727
/* This is where the read related index logic lives. It is used by both
 
728
   BlitzDB and the Database Kernel (specifically, by the optimizer). */
 
729
int ha_blitz::index_read_idx(unsigned char *buf, uint32_t key_num,
 
730
                             const unsigned char *key, uint32_t,
 
731
                             enum ha_rkey_function search_mode) {
 
732
 
 
733
  /* If the provided key is NULL, we are required to return the first
 
734
     row in the active_index. */
 
735
  if (key == NULL)
 
736
    return this->index_first(buf);
 
737
 
 
738
  /* Otherwise we search for it. Prepare the key to look up the tree. */
 
739
  int packed_klen;
 
740
  char *packed_key = native_to_blitz_key(key, key_num, &packed_klen);
 
741
 
 
742
  /* Lookup the tree and get the master key. */
 
743
  int unique_klen;
 
744
  char *unique_key;
 
745
 
 
746
  unique_key = btree_cursor[key_num].find_key(search_mode, packed_key,
 
747
                                              packed_klen, &unique_klen);
 
748
 
 
749
  if (unique_key == NULL) {
 
750
    errkey_id = key_num;
 
751
    return HA_ERR_KEY_NOT_FOUND;
 
752
  }
 
753
 
 
754
  /* Got the master key. Prepare it to lookup the data dictionary. */
 
755
  int dict_klen;
 
756
  int skip_len = btree_key_length(unique_key, key_num);
 
757
  char *dict_key = skip_btree_key(unique_key, skip_len, &dict_klen);
 
758
 
 
759
  /* Fetch the packed row from the data dictionary. */
 
760
  int row_len;
 
761
  char *fetched_row = share->dict.get_row(dict_key, dict_klen, &row_len);
 
762
 
 
763
  if (fetched_row == NULL) {
 
764
    errkey_id = key_num;
 
765
    free(unique_key);
 
766
    return HA_ERR_KEY_NOT_FOUND;
 
767
  }
 
768
 
 
769
  /* Unpack it into Drizzle's return buffer and keep track of the
 
770
     master key for future use (before index_end() is called). */
 
771
  unpack_row(buf, fetched_row, row_len);
 
772
  keep_track_of_key(unique_key, unique_klen);
 
773
 
 
774
  free(unique_key);
 
775
  free(fetched_row);
 
776
  return 0;
 
777
}
 
778
 
 
779
int ha_blitz::doEndIndexScan(void) {
 
780
  held_key = NULL;
 
781
  held_key_len = 0;
 
782
 
 
783
  btree_cursor[active_index].moved = false;
 
784
 
 
785
  if (thread_locked)
 
786
    critical_section_exit();
 
787
 
 
788
  return 0;
 
789
}
 
790
 
 
791
int ha_blitz::enable_indexes(uint32_t) {
 
792
  return HA_ERR_UNSUPPORTED;
 
793
}
 
794
 
 
795
int ha_blitz::disable_indexes(uint32_t) {
 
796
  return HA_ERR_UNSUPPORTED;
 
797
}
 
798
 
 
799
/* Find the estimated number of rows between min_key and max_key.
 
800
   Leave the proper implementation of this for now since there are
 
801
   too many exceptions to cover. */
 
802
ha_rows ha_blitz::records_in_range(uint32_t /*key_num*/,
 
803
                                   drizzled::key_range * /*min_key*/,
 
804
                                   drizzled::key_range * /*max_key*/) {
 
805
  return BLITZ_WORST_CASE_RANGE;
 
806
}
 
807
 
 
808
int ha_blitz::doInsertRecord(unsigned char *drizzle_row) {
 
809
  int rv;
 
810
 
 
811
  ha_statistic_increment(&system_status_var::ha_write_count);
 
812
 
 
813
  /* Prepare Auto Increment field if one exists. */
 
814
  if (table->next_number_field && drizzle_row == table->record[0]) {
 
815
    pthread_mutex_lock(&blitz_utility_mutex);
 
816
    if ((rv = update_auto_increment()) != 0) {
 
817
      pthread_mutex_unlock(&blitz_utility_mutex);
 
818
      return rv;
 
819
    }
 
820
 
 
821
    uint64_t next_val = table->next_number_field->val_int();
 
822
 
 
823
    if (next_val > share->auto_increment_value) {
 
824
      share->auto_increment_value = next_val;
 
825
      stats.auto_increment_value = share->auto_increment_value + 1;
 
826
    }
 
827
    pthread_mutex_unlock(&blitz_utility_mutex);
 
828
  }
 
829
 
 
830
  /* Serialize a primary key for this row. If a PK doesn't exist,
 
831
     an internal hidden ID will be generated. We obtain the PK here
 
832
     and pack it to this function's local buffer instead of the
 
833
     thread's own 'key_buffer' because the PK value needs to be
 
834
     remembered when writing non-PK keys AND because the 'key_buffer'
 
835
     will be used to generate these non-PK keys. */
 
836
  char temp_pkbuf[BLITZ_MAX_KEY_LEN];
 
837
  size_t pk_len = make_primary_key(temp_pkbuf, drizzle_row);
 
838
 
 
839
  /* Obtain a buffer that can accommodate this row. We then pack
 
840
     the provided row into it. Note that this code works most
 
841
     efficiently for rows smaller than BLITZ_MAX_ROW_STACK */
 
842
  unsigned char *row_buf = get_pack_buffer(max_row_length());
 
843
  size_t row_len = pack_row(row_buf, drizzle_row);
 
844
 
 
845
  uint32_t curr_key = 0;
 
846
  uint32_t lock_id = 0;
 
847
 
 
848
  if (share->nkeys > 0) {
 
849
    lock_id = share->blitz_lock.slot_id(temp_pkbuf, pk_len);
 
850
    share->blitz_lock.slotted_lock(lock_id);
 
851
  }
 
852
 
 
853
  /* We isolate this condition outside the key loop to avoid the CPU
 
854
     from going through unnecessary conditional branching on heavy
 
855
     insertion load. TODO: Optimize this block. PK should not need
 
856
     to go through merge_key() since this information is redundant. */
 
857
  if (share->primary_key_exists) {
 
858
    char *key = NULL;
 
859
    size_t klen = 0;
 
860
 
 
861
    key = merge_key(temp_pkbuf, pk_len, temp_pkbuf, pk_len, &klen);
 
862
 
 
863
    rv = share->btrees[curr_key].write_unique(key, klen);
 
864
 
 
865
    if (rv == HA_ERR_FOUND_DUPP_KEY) {
 
866
      errkey_id = curr_key;
 
867
      share->blitz_lock.slotted_unlock(lock_id);
 
868
      return rv;
 
869
    }
 
870
    curr_key = 1;
 
871
  }
 
872
 
 
873
  /* Loop over the keys and write them to it's exclusive tree. */
 
874
  while (curr_key < share->nkeys) {
 
875
    char *key = NULL;
 
876
    size_t prefix_len = 0;
 
877
    size_t klen = 0;
 
878
 
 
879
    prefix_len = make_index_key(key_buffer, curr_key, drizzle_row);
 
880
    key = merge_key(key_buffer, prefix_len, temp_pkbuf, pk_len, &klen);
 
881
 
 
882
    if (share->btrees[curr_key].unique) {
 
883
      rv = share->btrees[curr_key].write_unique(key, klen);
 
884
    } else {
 
885
      rv = share->btrees[curr_key].write(key, klen);
 
886
    }
 
887
 
 
888
    if (rv != 0) {
 
889
      errkey_id = curr_key;
 
890
      share->blitz_lock.slotted_unlock(lock_id);
 
891
      return rv;
 
892
    }
 
893
 
 
894
    curr_key++;
 
895
  }
 
896
 
 
897
  /* Write the row to the Data Dictionary. */
 
898
  rv = share->dict.write_row(temp_pkbuf, pk_len, row_buf, row_len);
 
899
 
 
900
  if (share->nkeys > 0)
 
901
    share->blitz_lock.slotted_unlock(lock_id);
 
902
 
 
903
  return rv;
 
904
}
 
905
 
 
906
int ha_blitz::doUpdateRecord(const unsigned char *old_row,
 
907
                             unsigned char *new_row) {
 
908
  int rv;
 
909
  uint32_t lock_id = 0;
 
910
 
 
911
  ha_statistic_increment(&system_status_var::ha_update_count);
 
912
 
 
913
  /* Handle Indexes */
 
914
  if (share->nkeys > 0) {
 
915
    /* BlitzDB cannot update an indexed row on table scan. */
 
916
    if (table_scan)
 
917
      return HA_ERR_UNSUPPORTED;
 
918
 
 
919
    if ((rv = compare_rows_for_unique_violation(old_row, new_row)) != 0)
 
920
      return rv;
 
921
 
 
922
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
 
923
    share->blitz_lock.slotted_lock(lock_id);
 
924
 
 
925
    /* Update all relevant index entries. Start by deleting the
 
926
       the existing key then write the new key. Something we should
 
927
       consider in the future is to take a diff of the keys and only
 
928
       update changed keys. */
 
929
    int skip = btree_key_length(held_key, active_index);
 
930
    char *suffix = held_key + skip;
 
931
    uint16_t suffix_len = uint2korr(suffix);
 
932
 
 
933
    suffix += sizeof(suffix_len);
 
934
 
 
935
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
936
      char *key;
 
937
      size_t prefix_len, klen;
 
938
 
 
939
      klen = 0;
 
940
      prefix_len = make_index_key(key_buffer, i, old_row);
 
941
      key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
 
942
 
 
943
      if (share->btrees[i].delete_key(key, klen) != 0) {
 
944
        errkey_id = i;
 
945
        share->blitz_lock.slotted_unlock(lock_id);
 
946
        return HA_ERR_KEY_NOT_FOUND;
 
947
      }
 
948
 
 
949
      /* Now write the new key. */
 
950
      prefix_len = make_index_key(key_buffer, i, new_row);
 
951
 
 
952
      if (i == table->s->getPrimaryKey()) {
 
953
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
 
954
        rv = share->btrees[i].write(key, klen);
 
955
      } else {
 
956
        key = merge_key(key_buffer, prefix_len, suffix, suffix_len, &klen);
 
957
        rv = share->btrees[i].write(key, klen);
 
958
      }
 
959
 
 
960
      if (rv != 0) {
 
961
        errkey_id = i;
 
962
        share->blitz_lock.slotted_unlock(lock_id);
 
963
        return rv;
 
964
      }
 
965
    }
 
966
  }
 
967
 
 
968
  /* Getting this far means that the index has been successfully
 
969
     updated. We now update the Data Dictionary. This implementation
 
970
     is admittedly far from optimial and will be revisited. */
 
971
  size_t row_len = max_row_length();
 
972
  unsigned char *row_buf = get_pack_buffer(row_len);
 
973
  row_len = pack_row(row_buf, new_row);
 
974
 
 
975
  /* This is a basic case where we can simply overwrite the key. */
 
976
  if (table_based) {
 
977
    rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
 
978
  } else {
 
979
    int klen = make_index_key(key_buffer, table->s->getPrimaryKey(), old_row);
 
980
 
 
981
    /* Delete with the old key. */
 
982
    share->dict.delete_row(key_buffer, klen);
 
983
 
 
984
    /* Write with the new key. */
 
985
    klen = make_index_key(key_buffer, table->s->getPrimaryKey(), new_row);
 
986
    rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
 
987
  }
 
988
 
 
989
  if (share->nkeys > 0)
 
990
    share->blitz_lock.slotted_unlock(lock_id);
 
991
 
 
992
  return rv;
 
993
}
 
994
 
 
995
int ha_blitz::doDeleteRecord(const unsigned char *row_to_delete) {
 
996
  int rv;
 
997
 
 
998
  ha_statistic_increment(&system_status_var::ha_delete_count);
 
999
 
 
1000
  char *dict_key = held_key;
 
1001
  int dict_klen = held_key_len;
 
1002
  uint32_t lock_id = 0;
 
1003
 
 
1004
  if (share->nkeys > 0) {
 
1005
    lock_id = share->blitz_lock.slot_id(held_key, held_key_len);
 
1006
    share->blitz_lock.slotted_lock(lock_id);
 
1007
 
 
1008
    /* Loop over the indexes and delete all relevant entries for
 
1009
       this row. We do this by reproducing the key in BlitzDB's
 
1010
       unique key format. The procedure is simple.
 
1011
 
 
1012
       (1): Compute the key value for this index from the row then
 
1013
            pack it into key_buffer (not unique at this point).
 
1014
 
 
1015
       (2): Append the suffix of the held_key to the key generated
 
1016
            in step 1. The key is then guaranteed to be unique. */
 
1017
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
1018
      /* In this case, we don't need to search for the key because
 
1019
         TC's cursor is already pointing at the key that we want
 
1020
         to delete. We wouldn't be here otherwise. */
 
1021
      if (i == active_index) {
 
1022
        btree_cursor[active_index].delete_position();
 
1023
        continue;
 
1024
      }
 
1025
 
 
1026
      int klen = make_index_key(key_buffer, i, row_to_delete);
 
1027
      int skip_len = btree_key_length(held_key, active_index);
 
1028
      uint16_t suffix_len = uint2korr(held_key + skip_len);
 
1029
 
 
1030
      /* Append the suffix to the key */
 
1031
      memcpy(key_buffer + klen, held_key + skip_len,
 
1032
             sizeof(suffix_len) + suffix_len);
 
1033
 
 
1034
      /* Update the key length to cover the generated key. */
 
1035
      klen = klen + sizeof(suffix_len) + suffix_len;
 
1036
 
 
1037
      if (share->btrees[i].delete_key(key_buffer, klen) != 0)
 
1038
        return HA_ERR_KEY_NOT_FOUND;
 
1039
    }
 
1040
 
 
1041
    /* Skip to the data dictionary key. */
 
1042
    int dict_key_offset = btree_key_length(dict_key, active_index);
 
1043
    dict_key = skip_btree_key(dict_key, dict_key_offset, &dict_klen);
 
1044
  }
 
1045
 
 
1046
  rv = share->dict.delete_row(dict_key, dict_klen);
 
1047
 
 
1048
  if (share->nkeys > 0)
 
1049
    share->blitz_lock.slotted_unlock(lock_id);
 
1050
 
 
1051
  return rv;
 
1052
}
 
1053
 
 
1054
void ha_blitz::get_auto_increment(uint64_t, uint64_t,
 
1055
                                  uint64_t, uint64_t *first_value,
 
1056
                                  uint64_t *nb_reserved_values) {
 
1057
  *first_value = share->auto_increment_value + 1;
 
1058
  *nb_reserved_values = UINT64_MAX;
 
1059
}
 
1060
 
 
1061
int ha_blitz::reset_auto_increment(uint64_t value) {
 
1062
  share->auto_increment_value = (value == 0) ? 1 : value;
 
1063
  return 0;
 
1064
}
 
1065
 
 
1066
int ha_blitz::delete_all_rows(void) {
 
1067
  for (uint32_t i = 0; i < share->nkeys; i++) {
 
1068
    if (share->btrees[i].delete_all() != 0) {
 
1069
      errkey = i;
 
1070
      return HA_ERR_CRASHED_ON_USAGE;
 
1071
    }
 
1072
  }
 
1073
  return (share->dict.delete_all_rows()) ? 0 : -1;
 
1074
}
 
1075
 
 
1076
uint32_t ha_blitz::max_row_length(void) {
 
1077
  uint32_t length = (table->getRecordLength() + table->sizeFields() * 2);
 
1078
  uint32_t *pos = table->getBlobField();
 
1079
  uint32_t *end = pos + table->sizeBlobFields();
 
1080
 
 
1081
  while (pos != end) {
 
1082
    length += 2 + ((Field_blob *)table->getField(*pos))->get_length();
 
1083
    pos++;
 
1084
  }
 
1085
 
 
1086
  return length;
 
1087
}
 
1088
 
 
1089
size_t ha_blitz::make_primary_key(char *pack_to, const unsigned char *row) {
 
1090
  if (!share->primary_key_exists) {
 
1091
    uint64_t next_id = share->dict.next_hidden_row_id();
 
1092
    int8store(pack_to, next_id);
 
1093
    return sizeof(next_id);
 
1094
  }
 
1095
 
 
1096
  /* Getting here means that there is a PK in this table. Get the
 
1097
     binary representation of the PK, pack it to BlitzDB's key buffer
 
1098
     and return the size of it. */
 
1099
  return make_index_key(pack_to, table->s->getPrimaryKey(), row);
 
1100
}
 
1101
 
 
1102
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
 
1103
                                const unsigned char *row) {
 
1104
  KeyInfo *key = &table->key_info[key_num];
 
1105
  KeyPartInfo *key_part = key->key_part;
 
1106
  KeyPartInfo *key_part_end = key_part + key->key_parts;
 
1107
 
 
1108
  unsigned char *pos = (unsigned char *)pack_to;
 
1109
  unsigned char *end;
 
1110
  int offset = 0;
 
1111
 
 
1112
  memset(pack_to, 0, BLITZ_MAX_KEY_LEN);
 
1113
 
 
1114
  /* Loop through key part(s) and pack them as we go. */
 
1115
  for (; key_part != key_part_end; key_part++) {
 
1116
    if (key_part->null_bit) {
 
1117
      if (row[key_part->null_offset] & key_part->null_bit) {
 
1118
        *pos++ = 0;
 
1119
        continue;
 
1120
      }
 
1121
      *pos++ = 1;
 
1122
    }
 
1123
 
 
1124
    end = key_part->field->pack(pos, row + key_part->offset);
 
1125
    offset = end - pos;
 
1126
    pos += offset;
 
1127
  }
 
1128
 
 
1129
  return ((char *)pos - pack_to);
 
1130
}
 
1131
 
 
1132
char *ha_blitz::merge_key(const char *a, const size_t a_len, const char *b,
 
1133
                          const size_t b_len, size_t *merged_len) {
 
1134
 
 
1135
  size_t total = a_len + sizeof(uint16_t) + b_len;
 
1136
 
 
1137
  if (total > key_merge_buffer_len) {
 
1138
    key_merge_buffer = (char *)realloc(key_merge_buffer, total);
 
1139
 
 
1140
    if (key_merge_buffer == NULL) {
 
1141
      errno = HA_ERR_OUT_OF_MEM;
 
1142
      return NULL;
 
1143
    }
 
1144
    key_merge_buffer_len = total;
 
1145
  }
 
1146
 
 
1147
  char *pos = key_merge_buffer;
 
1148
 
 
1149
  /* Copy the prefix. */
 
1150
  memcpy(pos, a, a_len);
 
1151
  pos += a_len;
 
1152
 
 
1153
  /* Copy the length of b. */
 
1154
  int2store(pos, (uint16_t)b_len);
 
1155
  pos += sizeof(uint16_t);
 
1156
 
 
1157
  /* Copy the suffix and we're done. */
 
1158
  memcpy(pos, b, b_len);
 
1159
 
 
1160
  *merged_len = total;
 
1161
  return key_merge_buffer;
 
1162
}
 
1163
 
 
1164
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
 
1165
  KeyInfo *key_info = &table->key_info[key_num];
 
1166
  KeyPartInfo *key_part = key_info->key_part;
 
1167
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
 
1168
  char *pos = (char *)key;
 
1169
  uint64_t len = 0;
 
1170
  size_t rv = 0;
 
1171
 
 
1172
  for (; key_part != key_part_end; key_part++) {
 
1173
    if (key_part->null_bit) {
 
1174
      rv++;
 
1175
      if (*key == 0)
 
1176
        continue;
 
1177
    }
 
1178
 
 
1179
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
 
1180
      len = *(uint8_t *)pos;
 
1181
      rv += len + sizeof(uint8_t);
 
1182
    } else if (key_part->type == HA_KEYTYPE_VARTEXT2) {
 
1183
      len = uint2korr(pos);
 
1184
      rv += len + sizeof(uint16_t);
 
1185
    } else {
 
1186
      len = key_part->field->key_length();
 
1187
      rv += len;
 
1188
    }
 
1189
    pos += len;
 
1190
    len = 0;
 
1191
  }
 
1192
 
 
1193
  return rv;
 
1194
}
 
1195
 
 
1196
void ha_blitz::keep_track_of_key(const char *key, const int klen) {
 
1197
  memcpy(held_key_buf, key, klen);
 
1198
  held_key = held_key_buf;
 
1199
  held_key_len = klen;
 
1200
}
 
1201
 
 
1202
/* Converts a native Drizzle index key to BlitzDB's format. */
 
1203
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
 
1204
                                    const int key_num, int *return_key_len) {
 
1205
  KeyInfo *key = &table->key_info[key_num];
 
1206
  KeyPartInfo *key_part = key->key_part;
 
1207
  KeyPartInfo *key_part_end = key_part + key->key_parts;
 
1208
 
 
1209
  unsigned char *key_pos = (unsigned char *)native_key;
 
1210
  unsigned char *keybuf_pos = (unsigned char *)key_buffer;
 
1211
  unsigned char *end;
 
1212
  int key_size = 0;
 
1213
  int offset = 0;
 
1214
 
 
1215
  memset(key_buffer, 0, BLITZ_MAX_KEY_LEN);
 
1216
 
 
1217
  for (; key_part != key_part_end; key_part++) {
 
1218
    if (key_part->null_bit) {
 
1219
      key_size++;
 
1220
 
 
1221
      /* This key is NULL */
 
1222
      if (!(*keybuf_pos++ = (*key_pos++ == 0)))
 
1223
        continue;
 
1224
    }
 
1225
 
 
1226
    /* This is a temporary workaround for a bug in Drizzle's VARCHAR
 
1227
       where a 1 byte representable length varchar's actual data is
 
1228
       positioned 2 bytes ahead of the beginning of the buffer. The
 
1229
       correct behavior is to be positioned 1 byte ahead. Furthermore,
 
1230
       this is only applicable with varchar keys on READ. */
 
1231
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
 
1232
      /* Dereference the 1 byte length of the value. */
 
1233
      uint8_t varlen = *(uint8_t *)key_pos;
 
1234
      *keybuf_pos++ = varlen;
 
1235
 
 
1236
      /* Read the value by skipping 2 bytes. This is the workaround. */
 
1237
      memcpy(keybuf_pos, key_pos + sizeof(uint16_t), varlen);
 
1238
      offset = (sizeof(uint8_t) + varlen);
 
1239
      keybuf_pos += varlen;
 
1240
    } else {
 
1241
      end = key_part->field->pack(keybuf_pos, key_pos);
 
1242
      offset = end - keybuf_pos;
 
1243
      keybuf_pos += offset;
 
1244
    }
 
1245
 
 
1246
    key_size += offset;
 
1247
    key_pos += key_part->field->key_length();
 
1248
  }
 
1249
 
 
1250
  *return_key_len = key_size;
 
1251
  return key_buffer;
 
1252
}
 
1253
 
 
1254
size_t ha_blitz::pack_row(unsigned char *row_buffer,
 
1255
                          unsigned char *row_to_pack) {
 
1256
  unsigned char *pos;
 
1257
 
 
1258
  /* Nothing special to do if the table is fixed length */
 
1259
  if (share->fixed_length_table) {
 
1260
    memcpy(row_buffer, row_to_pack, table->s->getRecordLength());
 
1261
    return (size_t)table->s->getRecordLength();
 
1262
  }
 
1263
 
 
1264
  /* Copy NULL bits */
 
1265
  memcpy(row_buffer, row_to_pack, table->s->null_bytes);
 
1266
  pos = row_buffer + table->s->null_bytes;
 
1267
 
 
1268
  /* Pack each field into the buffer */
 
1269
  for (Field **field = table->getFields(); *field; field++) {
 
1270
    if (!((*field)->is_null()))
 
1271
      pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
 
1272
  }
 
1273
 
 
1274
  return (size_t)(pos - row_buffer);
 
1275
}
 
1276
 
 
1277
bool ha_blitz::unpack_row(unsigned char *to, const char *from,
 
1278
                          const size_t from_len) {
 
1279
  const unsigned char *pos;
 
1280
 
 
1281
  /* Nothing special to do */
 
1282
  if (share->fixed_length_table) {
 
1283
    memcpy(to, from, from_len);
 
1284
    return true;
 
1285
  }
 
1286
 
 
1287
  /* Start by copying NULL bits which is the beginning block
 
1288
     of a Drizzle row. */
 
1289
  pos = (const unsigned char *)from;
 
1290
  memcpy(to, pos, table->s->null_bytes);
 
1291
  pos += table->s->null_bytes;
 
1292
 
 
1293
  /* Unpack all fields in the provided row. */
 
1294
  for (Field **field = table->getFields(); *field; field++) {
 
1295
    if (!((*field)->is_null())) {
 
1296
      pos = (*field)->unpack(to + (*field)->offset(table->record[0]), pos);
 
1297
    }
 
1298
  }
 
1299
 
 
1300
  return true;
 
1301
}
 
1302
 
 
1303
unsigned char *ha_blitz::get_pack_buffer(const size_t size) {
 
1304
  unsigned char *buf = pack_buffer;
 
1305
 
 
1306
  /* This is a shitty case where the row size is larger than 2KB. */
 
1307
  if (size > BLITZ_MAX_ROW_STACK) {
 
1308
    if (size > secondary_row_buffer_size) {
 
1309
      void *new_ptr = realloc(secondary_row_buffer, size);
 
1310
 
 
1311
      if (new_ptr == NULL) {
 
1312
        errno = HA_ERR_OUT_OF_MEM;
 
1313
        return NULL;
 
1314
      }
 
1315
 
 
1316
      secondary_row_buffer_size = size;
 
1317
      secondary_row_buffer = (unsigned char *)new_ptr;
 
1318
      buf = secondary_row_buffer;
 
1319
    }
 
1320
  }
 
1321
  return buf;
 
1322
}
 
1323
 
 
1324
static BlitzEngine *blitz_engine = NULL;
 
1325
 
 
1326
BlitzShare *ha_blitz::get_share(const char *name) {
 
1327
  BlitzShare *share_ptr;
 
1328
  BlitzEngine *bz_engine = (BlitzEngine *)engine;
 
1329
  std::string table_path(name);
 
1330
 
 
1331
  pthread_mutex_lock(&blitz_utility_mutex);
 
1332
 
 
1333
  /* Look up the table cache to see if the table resource is available */
 
1334
  share_ptr = bz_engine->getTableShare(table_path);
 
1335
 
 
1336
  if (share_ptr) {
 
1337
    share_ptr->use_count++;
 
1338
    pthread_mutex_unlock(&blitz_utility_mutex);
 
1339
    return share_ptr;
 
1340
  }
 
1341
 
 
1342
  /* Table wasn't cached so create a new table handler */
 
1343
  share_ptr = new BlitzShare();
 
1344
 
 
1345
  /* Prepare the Data Dictionary */
 
1346
  if (share_ptr->dict.startup(table_path.c_str()) != 0) {
 
1347
    delete share_ptr;
 
1348
    pthread_mutex_unlock(&blitz_utility_mutex);
 
1349
    return NULL;
 
1350
  }
 
1351
 
 
1352
  /* Prepare Index Structure(s) */
 
1353
  KeyInfo *curr = &table->s->getKeyInfo(0);
 
1354
  share_ptr->btrees = new BlitzTree[table->s->keys];
 
1355
 
 
1356
  for (uint32_t i = 0; i < table->s->keys; i++, curr++) {
 
1357
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
 
1358
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
 
1359
 
 
1360
    if (table->key_info[i].flags & HA_NOSAME)
 
1361
      share_ptr->btrees[i].unique = true;
 
1362
 
 
1363
    share_ptr->btrees[i].length = curr->key_length;
 
1364
    share_ptr->btrees[i].nparts = curr->key_parts;
 
1365
 
 
1366
    /* Record Meta Data of the Key Segments */
 
1367
    for (uint32_t j = 0; j < curr->key_parts; j++) {
 
1368
      Field *f = curr->key_part[j].field;
 
1369
 
 
1370
      if (f->null_ptr) {
 
1371
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
 
1372
        share_ptr->btrees[i].parts[j].null_pos
 
1373
          = (uint32_t)(f->null_ptr - (unsigned char *)table->record[0]);
 
1374
      }
 
1375
 
 
1376
      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
 
1377
 
 
1378
      if (f->type() == DRIZZLE_TYPE_BLOB) {
 
1379
        share_ptr->btrees[i].parts[j].flag |= HA_BLOB_PART;
 
1380
      }
 
1381
 
 
1382
      share_ptr->btrees[i].parts[j].type = curr->key_part[j].type;
 
1383
      share_ptr->btrees[i].parts[j].offset = curr->key_part[j].offset;
 
1384
      share_ptr->btrees[i].parts[j].length = curr->key_part[j].length;
 
1385
    }
 
1386
  }
 
1387
 
 
1388
  /* Set Meta Data */
 
1389
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
 
1390
  share_ptr->table_name = table_path;
 
1391
  share_ptr->nkeys = table->s->keys;
 
1392
  share_ptr->use_count = 1;
 
1393
 
 
1394
  share_ptr->fixed_length_table = !(table->s->db_create_options
 
1395
                                    & HA_OPTION_PACK_RECORD);
 
1396
 
 
1397
  if (table->s->getPrimaryKey() >= MAX_KEY)
 
1398
    share_ptr->primary_key_exists = false;
 
1399
  else
 
1400
    share_ptr->primary_key_exists = true;
 
1401
 
 
1402
  /* Done creating the share object. Cache it for later
 
1403
     use by another cursor object.*/
 
1404
  bz_engine->cacheTableShare(table_path, share_ptr);
 
1405
 
 
1406
  pthread_mutex_unlock(&blitz_utility_mutex);
 
1407
  return share_ptr;
 
1408
}
 
1409
 
 
1410
int ha_blitz::free_share(void) {
 
1411
  pthread_mutex_lock(&blitz_utility_mutex);
 
1412
 
 
1413
  /* BlitzShare could still be used by another thread. Check the
 
1414
     reference counter to see if it's safe to free it */
 
1415
  if (--share->use_count == 0) {
 
1416
    share->dict.write_meta_autoinc(share->auto_increment_value);
 
1417
 
 
1418
    if (share->dict.shutdown() != 0) {
 
1419
      pthread_mutex_unlock(&blitz_utility_mutex);
 
1420
      return HA_ERR_CRASHED_ON_USAGE;
 
1421
    }
 
1422
 
 
1423
    for (uint32_t i = 0; i < share->nkeys; i++) {
 
1424
      delete[] share->btrees[i].parts;
 
1425
      share->btrees[i].close();
 
1426
    }
 
1427
 
 
1428
    BlitzEngine *bz_engine = (BlitzEngine *)engine;
 
1429
    bz_engine->deleteTableShare(share->table_name);
 
1430
 
 
1431
    delete[] share->btrees;
 
1432
    delete share;
 
1433
  }
 
1434
 
 
1435
  pthread_mutex_unlock(&blitz_utility_mutex);
 
1436
  return 0;
 
1437
}
 
1438
 
 
1439
static int blitz_init(drizzled::module::Context &context) {
 
1440
  blitz_engine = new BlitzEngine("BLITZDB");
 
1441
 
 
1442
  if (!blitz_engine->doCreateTableCache()) {
 
1443
    delete blitz_engine;
 
1444
    return HA_ERR_OUT_OF_MEM;
 
1445
  }
 
1446
 
 
1447
  pthread_mutex_init(&blitz_utility_mutex, NULL);
 
1448
  context.add(blitz_engine);
 
1449
  return 0;
 
1450
}
 
1451
 
 
1452
/* Read the prototype of this function for details. */
 
1453
static char *skip_btree_key(const char *key, const size_t skip_len,
 
1454
                            int *return_klen) {
 
1455
  char *pos = (char *)key;
 
1456
  *return_klen = uint2korr(pos + skip_len);
 
1457
  return pos + skip_len + sizeof(uint16_t);
 
1458
}
 
1459
 
 
1460
DRIZZLE_PLUGIN(blitz_init, NULL);