~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/blitzdb/ha_blitz.cc

  • Committer: Monty Taylor
  • Date: 2010-10-13 17:53:36 UTC
  • mto: This revision was merged to the branch mainline in revision 1845.
  • Revision ID: mordred@inaugust.com-20101013175336-amzhjftgztblvua5
Updated pandora-build files to version 0.161

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
using namespace std;
24
24
using namespace drizzled;
25
 
namespace po= boost::program_options;
26
25
 
27
26
static pthread_mutex_t blitz_utility_mutex;
28
27
 
55
54
    tcmapdel(blitz_table_cache);
56
55
  }
57
56
 
58
 
  virtual drizzled::Cursor *create(drizzled::Table &table) {
 
57
  virtual drizzled::Cursor *create(drizzled::TableShare &table) {
59
58
    return new ha_blitz(*this, table);
60
59
  }
61
60
 
81
80
 
82
81
  void doGetTableIdentifiers(drizzled::CachedDirectory &directory,
83
82
                             const drizzled::SchemaIdentifier &schema_identifier,
84
 
                             drizzled::TableIdentifier::vector &set_of_identifiers);
 
83
                             drizzled::TableIdentifiers &set_of_identifiers);
85
84
 
86
85
  bool doDoesTableExist(drizzled::Session &session,
87
86
                        const drizzled::TableIdentifier &identifier);
129
128
 
130
129
  /* Temporary fix for blocking composite keys. We need to add this
131
130
     check because version 1 doesn't handle composite indexes. */
132
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
 
131
  for (uint32_t i = 0; i < table.getMutableShare()->keys; i++) {
133
132
    if (table.key_info[i].key_parts > 1)
134
133
      return HA_ERR_UNSUPPORTED;
135
134
  }
143
142
    return ecode;
144
143
 
145
144
  /* Create b+tree index(es) for this table. */
146
 
  for (uint32_t i = 0; i < table.getShare()->keys; i++) {
 
145
  for (uint32_t i = 0; i < table.getMutableShare()->keys; i++) {
147
146
    if ((ecode = btree.create(identifier.getPath().c_str(), i)) != 0)
148
147
      return ecode;
149
148
  }
169
168
  BlitzData blitz_table;
170
169
  uint32_t nkeys;
171
170
 
172
 
  BlitzData dict;
173
 
  int ecode;
174
 
  /* Write the table definition to system table. */
175
 
  if ((ecode = dict.open_system_table(from.getPath(), HDBOWRITER)) != 0)
176
 
    return ecode;
177
 
 
178
 
  drizzled::message::Table proto;
179
 
  char *proto_string;
180
 
  int proto_string_len;
181
 
 
182
 
  proto_string = dict.get_system_entry(BLITZ_TABLE_PROTO_KEY.c_str(),
183
 
                                       BLITZ_TABLE_PROTO_KEY.length(),
184
 
                                       &proto_string_len);
185
 
 
186
 
  if (proto_string == NULL) {
187
 
    return ENOMEM;
188
 
  }
189
 
 
190
 
  if (!proto.ParseFromArray(proto_string, proto_string_len)) {
191
 
    free(proto_string);
192
 
    return HA_ERR_CRASHED_ON_USAGE;
193
 
  }
194
 
 
195
 
  free(proto_string);
196
 
 
197
 
  proto.set_name(to.getTableName());
198
 
  proto.set_schema(to.getSchemaName());
199
 
  proto.set_catalog(to.getCatalogName());
200
 
 
201
 
  if (!dict.write_table_definition(proto)) {
202
 
    dict.close_system_table();
203
 
    return HA_ERR_CRASHED_ON_USAGE;
204
 
  }
205
 
 
206
 
  dict.close_system_table();
207
 
 
208
171
  /* Find out the number of indexes in this table. This information
209
172
     is required because BlitzDB creates a file for each indexes.*/
210
173
  if (blitz_table.open_data_table(from.getPath().c_str(), HDBOREADER) != 0)
325
288
 
326
289
void BlitzEngine::doGetTableIdentifiers(drizzled::CachedDirectory &directory,
327
290
                                        const drizzled::SchemaIdentifier &schema_id,
328
 
                                        drizzled::TableIdentifier::vector &ids) {
 
291
                                        drizzled::TableIdentifiers &ids) {
329
292
  drizzled::CachedDirectory::Entries entries = directory.getEntries();
330
293
 
331
294
  for (drizzled::CachedDirectory::Entries::iterator entry_iter = entries.begin();
402
365
}
403
366
 
404
367
ha_blitz::ha_blitz(drizzled::plugin::StorageEngine &engine_arg,
405
 
                   Table &table_arg) : Cursor(engine_arg, table_arg),
 
368
                   TableShare &table_arg) : Cursor(engine_arg, table_arg),
406
369
                                            btree_cursor(NULL),
407
370
                                            table_scan(false),
408
371
                                            table_based(false),
458
421
     will use to uniquely identify a row. The actual allocation is
459
422
     done by the kernel so all we do here is specify the size of it.*/
460
423
  if (share->primary_key_exists) {
461
 
    ref_length = getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
 
424
    ref_length = table->key_info[table->getMutableShare()->getPrimaryKey()].key_length;
462
425
  } else {
463
426
    ref_length = sizeof(held_key_len) + sizeof(uint64_t);
464
427
  }
497
460
 
498
461
int ha_blitz::doStartTableScan(bool scan) {
499
462
  /* Obtain the query type for this scan */
500
 
  sql_command_type = session_sql_command(getTable()->getSession());
 
463
  sql_command_type = session_sql_command(table->getSession());
501
464
  table_scan = scan;
502
465
  table_based = true;
503
466
 
525
488
  held_key = NULL;
526
489
 
527
490
  if (current_key == NULL) {
528
 
    getTable()->status = STATUS_NOT_FOUND;
 
491
    table->status = STATUS_NOT_FOUND;
529
492
    return HA_ERR_END_OF_FILE;
530
493
  }
531
494
 
556
519
  /* It is now memory-leak-safe to point current_key to next_key. */
557
520
  current_key = next_key;
558
521
  current_key_len = next_key_len;
559
 
  getTable()->status = 0;
 
522
  table->status = 0;
560
523
  return 0;
561
524
}
562
525
 
622
585
 
623
586
int ha_blitz::doStartIndexScan(uint32_t key_num, bool) {
624
587
  active_index = key_num;
625
 
  sql_command_type = session_sql_command(getTable()->getSession());
 
588
  sql_command_type = session_sql_command(table->getSession());
626
589
 
627
590
  /* This is unlikely to happen but just for assurance, re-obtain
628
591
     the lock if this thread already has a certain lock. This makes
667
630
  bt_key = btree_cursor[active_index].next_key(&bt_klen);
668
631
 
669
632
  if (bt_key == NULL) {
670
 
    getTable()->status = STATUS_NOT_FOUND;
 
633
    table->status = STATUS_NOT_FOUND;
671
634
    return HA_ERR_END_OF_FILE;
672
635
  }
673
636
 
676
639
 
677
640
  if ((row = share->dict.get_row(dict_key, dict_klen, &rlen)) == NULL) {
678
641
    free(bt_key);
679
 
    getTable()->status = STATUS_NOT_FOUND;
 
642
    table->status = STATUS_NOT_FOUND;
680
643
    return HA_ERR_KEY_NOT_FOUND;
681
644
  }
682
645
 
831
794
  ha_statistic_increment(&system_status_var::ha_write_count);
832
795
 
833
796
  /* Prepare Auto Increment field if one exists. */
834
 
  if (getTable()->next_number_field && drizzle_row == getTable()->getInsertRecord()) {
 
797
  if (table->next_number_field && drizzle_row == table->getInsertRecord()) {
835
798
    pthread_mutex_lock(&blitz_utility_mutex);
836
799
    if ((rv = update_auto_increment()) != 0) {
837
800
      pthread_mutex_unlock(&blitz_utility_mutex);
838
801
      return rv;
839
802
    }
840
803
 
841
 
    uint64_t next_val = getTable()->next_number_field->val_int();
 
804
    uint64_t next_val = table->next_number_field->val_int();
842
805
 
843
806
    if (next_val > share->auto_increment_value) {
844
807
      share->auto_increment_value = next_val;
969
932
      /* Now write the new key. */
970
933
      prefix_len = make_index_key(key_buffer, i, new_row);
971
934
 
972
 
      if (i == getTable()->getShare()->getPrimaryKey()) {
 
935
      if (i == table->getMutableShare()->getPrimaryKey()) {
973
936
        key = merge_key(key_buffer, prefix_len, key_buffer, prefix_len, &klen);
974
937
        rv = share->btrees[i].write(key, klen);
975
938
      } else {
996
959
  if (table_based) {
997
960
    rv = share->dict.write_row(held_key, held_key_len, row_buf, row_len);
998
961
  } else {
999
 
    int klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), old_row);
 
962
    int klen = make_index_key(key_buffer, table->getMutableShare()->getPrimaryKey(), old_row);
1000
963
 
1001
964
    /* Delete with the old key. */
1002
965
    share->dict.delete_row(key_buffer, klen);
1003
966
 
1004
967
    /* Write with the new key. */
1005
 
    klen = make_index_key(key_buffer, getTable()->getShare()->getPrimaryKey(), new_row);
 
968
    klen = make_index_key(key_buffer, table->getMutableShare()->getPrimaryKey(), new_row);
1006
969
    rv = share->dict.write_row(key_buffer, klen, row_buf, row_len);
1007
970
  }
1008
971
 
1094
1057
}
1095
1058
 
1096
1059
uint32_t ha_blitz::max_row_length(void) {
1097
 
  uint32_t length = (getTable()->getRecordLength() + getTable()->sizeFields() * 2);
1098
 
  uint32_t *pos = getTable()->getBlobField();
1099
 
  uint32_t *end = pos + getTable()->sizeBlobFields();
 
1060
  uint32_t length = (table->getRecordLength() + table->sizeFields() * 2);
 
1061
  uint32_t *pos = table->getBlobField();
 
1062
  uint32_t *end = pos + table->sizeBlobFields();
1100
1063
 
1101
1064
  while (pos != end) {
1102
 
    length += 2 + ((Field_blob *)getTable()->getField(*pos))->get_length();
 
1065
    length += 2 + ((Field_blob *)table->getField(*pos))->get_length();
1103
1066
    pos++;
1104
1067
  }
1105
1068
 
1116
1079
  /* Getting here means that there is a PK in this table. Get the
1117
1080
     binary representation of the PK, pack it to BlitzDB's key buffer
1118
1081
     and return the size of it. */
1119
 
  return make_index_key(pack_to, getTable()->getShare()->getPrimaryKey(), row);
 
1082
  return make_index_key(pack_to, table->getMutableShare()->getPrimaryKey(), row);
1120
1083
}
1121
1084
 
1122
1085
size_t ha_blitz::make_index_key(char *pack_to, int key_num,
1123
1086
                                const unsigned char *row) {
1124
 
  KeyInfo *key = &getTable()->key_info[key_num];
 
1087
  KeyInfo *key = &table->key_info[key_num];
1125
1088
  KeyPartInfo *key_part = key->key_part;
1126
1089
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1127
1090
 
1141
1104
      *pos++ = 1;
1142
1105
    }
1143
1106
 
1144
 
    /* Here we normalize VARTEXT1 to VARTEXT2 for simplicity. */
1145
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1146
 
      /* Extract the length of the string from the row. */
1147
 
      uint16_t data_len = *(uint8_t *)(row + key_part->offset);
1148
 
 
1149
 
      /* Copy the length of the string. Use 2 bytes. */
1150
 
      int2store(pos, data_len);
1151
 
      pos += sizeof(data_len);
1152
 
 
1153
 
      /* Copy the string data */
1154
 
      memcpy(pos, row + key_part->offset + sizeof(uint8_t), data_len);
1155
 
      pos += data_len;
1156
 
    } else {
1157
 
      end = key_part->field->pack(pos, row + key_part->offset);
1158
 
      offset = end - pos;
1159
 
      pos += offset;
1160
 
    }
 
1107
    end = key_part->field->pack(pos, row + key_part->offset);
 
1108
    offset = end - pos;
 
1109
    pos += offset;
1161
1110
  }
1162
1111
 
1163
1112
  return ((char *)pos - pack_to);
1196
1145
}
1197
1146
 
1198
1147
size_t ha_blitz::btree_key_length(const char *key, const int key_num) {
1199
 
  KeyInfo *key_info = &getTable()->key_info[key_num];
 
1148
  KeyInfo *key_info = &table->key_info[key_num];
1200
1149
  KeyPartInfo *key_part = key_info->key_part;
1201
1150
  KeyPartInfo *key_part_end = key_part + key_info->key_parts;
1202
1151
  char *pos = (char *)key;
1205
1154
 
1206
1155
  for (; key_part != key_part_end; key_part++) {
1207
1156
    if (key_part->null_bit) {
1208
 
      pos++;
1209
1157
      rv++;
1210
1158
      if (*key == 0)
1211
1159
        continue;
1212
1160
    }
1213
1161
 
1214
 
    if (key_part->type == HA_KEYTYPE_VARTEXT1 ||
1215
 
        key_part->type == HA_KEYTYPE_VARTEXT2) {
 
1162
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
 
1163
      len = *(uint8_t *)pos;
 
1164
      rv += len + sizeof(uint8_t);
 
1165
    } else if (key_part->type == HA_KEYTYPE_VARTEXT2) {
1216
1166
      len = uint2korr(pos);
1217
1167
      rv += len + sizeof(uint16_t);
1218
1168
    } else {
1235
1185
/* Converts a native Drizzle index key to BlitzDB's format. */
1236
1186
char *ha_blitz::native_to_blitz_key(const unsigned char *native_key,
1237
1187
                                    const int key_num, int *return_key_len) {
1238
 
  KeyInfo *key = &getTable()->key_info[key_num];
 
1188
  KeyInfo *key = &table->key_info[key_num];
1239
1189
  KeyPartInfo *key_part = key->key_part;
1240
1190
  KeyPartInfo *key_part_end = key_part + key->key_parts;
1241
1191
 
1256
1206
        continue;
1257
1207
    }
1258
1208
 
1259
 
    /* Normalize a VARTEXT1 key to VARTEXT2. */
 
1209
    /* This is a temporary workaround for a bug in Drizzle's VARCHAR
 
1210
       where a 1 byte representable length varchar's actual data is
 
1211
       positioned 2 bytes ahead of the beginning of the buffer. The
 
1212
       correct behavior is to be positioned 1 byte ahead. Furthermore,
 
1213
       this is only applicable with varchar keys on READ. */
1260
1214
    if (key_part->type == HA_KEYTYPE_VARTEXT1) {
1261
 
      uint16_t str_len = *(uint16_t *)key_pos;
1262
 
 
1263
 
      /* Copy the length of the string over to key buffer. */
1264
 
      int2store(keybuf_pos, str_len);
1265
 
      keybuf_pos += sizeof(str_len);
1266
 
 
1267
 
      /* Copy the actual value over to the key buffer. */
1268
 
      memcpy(keybuf_pos, key_pos + sizeof(str_len), str_len);
1269
 
      keybuf_pos += str_len;
1270
 
 
1271
 
      /* NULL byte + Length of str (2 byte) + Actual String. */
1272
 
      offset = 1 + sizeof(str_len) + str_len;
 
1215
      /* Dereference the 1 byte length of the value. */
 
1216
      uint8_t varlen = *(uint8_t *)key_pos;
 
1217
      *keybuf_pos++ = varlen;
 
1218
 
 
1219
      /* Read the value by skipping 2 bytes. This is the workaround. */
 
1220
      memcpy(keybuf_pos, key_pos + sizeof(uint16_t), varlen);
 
1221
      offset = (sizeof(uint8_t) + varlen);
 
1222
      keybuf_pos += varlen;
1273
1223
    } else {
1274
1224
      end = key_part->field->pack(keybuf_pos, key_pos);
1275
1225
      offset = end - keybuf_pos;
1290
1240
 
1291
1241
  /* Nothing special to do if the table is fixed length */
1292
1242
  if (share->fixed_length_table) {
1293
 
    memcpy(row_buffer, row_to_pack, getTable()->getShare()->getRecordLength());
1294
 
    return (size_t)getTable()->getShare()->getRecordLength();
 
1243
    memcpy(row_buffer, row_to_pack, table->getMutableShare()->getRecordLength());
 
1244
    return (size_t)table->getMutableShare()->getRecordLength();
1295
1245
  }
1296
1246
 
1297
1247
  /* Copy NULL bits */
1298
 
  memcpy(row_buffer, row_to_pack, getTable()->getShare()->null_bytes);
1299
 
  pos = row_buffer + getTable()->getShare()->null_bytes;
 
1248
  memcpy(row_buffer, row_to_pack, table->getMutableShare()->null_bytes);
 
1249
  pos = row_buffer + table->getMutableShare()->null_bytes;
1300
1250
 
1301
1251
  /* Pack each field into the buffer */
1302
 
  for (Field **field = getTable()->getFields(); *field; field++) {
 
1252
  for (Field **field = table->getFields(); *field; field++) {
1303
1253
    if (!((*field)->is_null()))
1304
1254
      pos = (*field)->pack(pos, row_to_pack + (*field)->offset(row_to_pack));
1305
1255
  }
1320
1270
  /* Start by copying NULL bits which is the beginning block
1321
1271
     of a Drizzle row. */
1322
1272
  pos = (const unsigned char *)from;
1323
 
  memcpy(to, pos, getTable()->getShare()->null_bytes);
1324
 
  pos += getTable()->getShare()->null_bytes;
 
1273
  memcpy(to, pos, table->getMutableShare()->null_bytes);
 
1274
  pos += table->getMutableShare()->null_bytes;
1325
1275
 
1326
1276
  /* Unpack all fields in the provided row. */
1327
 
  for (Field **field = getTable()->getFields(); *field; field++) {
 
1277
  for (Field **field = table->getFields(); *field; field++) {
1328
1278
    if (!((*field)->is_null())) {
1329
 
      pos = (*field)->unpack(to + (*field)->offset(getTable()->getInsertRecord()), pos);
 
1279
      pos = (*field)->unpack(to + (*field)->offset(table->getInsertRecord()), pos);
1330
1280
    }
1331
1281
  }
1332
1282
 
1358
1308
 
1359
1309
BlitzShare *ha_blitz::get_share(const char *name) {
1360
1310
  BlitzShare *share_ptr;
1361
 
  BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
 
1311
  BlitzEngine *bz_engine = (BlitzEngine *)engine;
1362
1312
  std::string table_path(name);
1363
1313
 
1364
1314
  pthread_mutex_lock(&blitz_utility_mutex);
1383
1333
  }
1384
1334
 
1385
1335
  /* Prepare Index Structure(s) */
1386
 
  KeyInfo *curr = &getTable()->getMutableShare()->getKeyInfo(0);
1387
 
  share_ptr->btrees = new BlitzTree[getTable()->getShare()->keys];
 
1336
  KeyInfo *curr = &table->getMutableShare()->getKeyInfo(0);
 
1337
  share_ptr->btrees = new BlitzTree[table->getMutableShare()->keys];
1388
1338
 
1389
 
  for (uint32_t i = 0; i < getTable()->getShare()->keys; i++, curr++) {
 
1339
  for (uint32_t i = 0; i < table->getMutableShare()->keys; i++, curr++) {
1390
1340
    share_ptr->btrees[i].open(table_path.c_str(), i, BDBOWRITER);
1391
1341
    share_ptr->btrees[i].parts = new BlitzKeyPart[curr->key_parts];
1392
1342
 
1393
 
    if (getTable()->key_info[i].flags & HA_NOSAME)
 
1343
    if (table->key_info[i].flags & HA_NOSAME)
1394
1344
      share_ptr->btrees[i].unique = true;
1395
1345
 
1396
1346
    share_ptr->btrees[i].length = curr->key_length;
1403
1353
      if (f->null_ptr) {
1404
1354
        share_ptr->btrees[i].parts[j].null_bitmask = f->null_bit;
1405
1355
        share_ptr->btrees[i].parts[j].null_pos
1406
 
          = (uint32_t)(f->null_ptr - (unsigned char *)getTable()->getInsertRecord());
 
1356
          = (uint32_t)(f->null_ptr - (unsigned char *)table->getInsertRecord());
1407
1357
      }
1408
1358
 
1409
1359
      share_ptr->btrees[i].parts[j].flag = curr->key_part[j].key_part_flag;
1421
1371
  /* Set Meta Data */
1422
1372
  share_ptr->auto_increment_value = share_ptr->dict.read_meta_autoinc();
1423
1373
  share_ptr->table_name = table_path;
1424
 
  share_ptr->nkeys = getTable()->getShare()->keys;
 
1374
  share_ptr->nkeys = table->getMutableShare()->keys;
1425
1375
  share_ptr->use_count = 1;
1426
1376
 
1427
 
  share_ptr->fixed_length_table = !(getTable()->getShare()->db_create_options
 
1377
  share_ptr->fixed_length_table = !(table->getMutableShare()->db_create_options
1428
1378
                                    & HA_OPTION_PACK_RECORD);
1429
1379
 
1430
 
  if (getTable()->getShare()->getPrimaryKey() >= MAX_KEY)
 
1380
  if (table->getMutableShare()->getPrimaryKey() >= MAX_KEY)
1431
1381
    share_ptr->primary_key_exists = false;
1432
1382
  else
1433
1383
    share_ptr->primary_key_exists = true;
1458
1408
      share->btrees[i].close();
1459
1409
    }
1460
1410
 
1461
 
    BlitzEngine *bz_engine = (BlitzEngine *)getEngine();
 
1411
    BlitzEngine *bz_engine = (BlitzEngine *)engine;
1462
1412
    bz_engine->deleteTableShare(share->table_name);
1463
1413
 
1464
1414
    delete[] share->btrees;
1479
1429
 
1480
1430
  pthread_mutex_init(&blitz_utility_mutex, NULL);
1481
1431
  context.add(blitz_engine);
1482
 
  context.registerVariable(new sys_var_uint64_t_ptr("estimated-rows",
1483
 
                                                    &blitz_estimated_rows));
1484
1432
  return 0;
1485
1433
}
1486
1434
 
1500
1448
  return true;
1501
1449
}
1502
1450
 
1503
 
static void blitz_init_options(drizzled::module::option_context &context)
1504
 
{
1505
 
  context("estimated-rows",
1506
 
          po::value<uint64_t>(&blitz_estimated_rows)->default_value(0),
1507
 
          N_("Estimated number of rows that a BlitzDB table will store."));
1508
 
}
1509
 
 
1510
 
DRIZZLE_PLUGIN(blitz_init, NULL, blitz_init_options);
 
1451
static DRIZZLE_SYSVAR_ULONGLONG (
 
1452
  estimated_rows,
 
1453
  blitz_estimated_rows,
 
1454
  PLUGIN_VAR_RQCMDARG,
 
1455
  "Estimated number of rows that a BlitzDB table will store.",
 
1456
  NULL,
 
1457
  NULL,
 
1458
  0,
 
1459
  0,
 
1460
  UINT64_MAX,
 
1461
  0
 
1462
);
 
1463
 
 
1464
static drizzle_sys_var *blitz_system_variables[] = {
 
1465
  DRIZZLE_SYSVAR(estimated_rows),
 
1466
  NULL
 
1467
};
 
1468
 
 
1469
DRIZZLE_PLUGIN(blitz_init, blitz_system_variables, NULL);