/* Copyright (c) 2005 PrimeBase Technologies GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * 2005-09-30	Paul McCullagh
 *
 */
#include "xt_config.h"
37
/* This header not available on suse-11-amd64, ubuntu-9.10-amd64 */
41
#include <drizzled/base.h>
42
using namespace drizzled;
44
#include "mysql_priv.h"
47
#include "pthread_xt.h"
48
#include "memory_xt.h"
51
#include "database_xt.h"
52
#include "strutil_xt.h"
59
#ifdef DEBUG
#define MAX_SEARCH_DEPTH	32
//#define CHECK_AND_PRINT
//#define CHECK_NODE_REFERENCE
//#define TRACE_FLUSH_INDEX
//#define CHECK_PRINTS_RECORD_REFERENCES
//#define DO_COMP_TEST
#else
#define MAX_SEARCH_DEPTH	100
#endif

//#define TRACE_FLUSH_TIMES
typedef struct IdxStackItem {
	XTIdxItemRec		i_pos;
	xtIndexNodeID		i_branch;
} IdxStackItemRec, *IdxStackItemPtr;

typedef struct IdxBranchStack {
	u_int				s_top;
	IdxStackItemRec		s_elements[MAX_SEARCH_DEPTH];
} IdxBranchStackRec, *IdxBranchStackPtr;

static void idx_check_on_key(XTOpenTablePtr ot);
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock);
static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch);
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value);
#ifdef XT_TRACK_INDEX_UPDATES
static xtBool ind_track_write(struct XTOpenTable *ot, struct XTIndex *ind, xtIndexNodeID offset, size_t size, xtWord1 *data)
{
	ot->ot_ind_changed++;
	return xt_ind_write(ot, ind, offset, size, data);
}

#define XT_IND_WRITE		ind_track_write
#else
#define XT_IND_WRITE		xt_ind_write
#endif

#ifdef CHECK_NODE_REFERENCE
#define IDX_GET_NODE_REF(t, x, o)	idx_get_node_ref(t, x, o)
#else
#define IDX_GET_NODE_REF(t, x, o)	XT_GET_NODE_REF(t, (x) - (o))
#endif
/*
 * -----------------------------------------------------------------------
 */

//#define TRACK_ACTIVITY

#ifdef TRACK_ACTIVITY
#define TRACK_MAX_BLOCKS	2000

typedef struct TrackBlock {
	xtBool				exists;
	char				*activity;
} TrackBlockRec, *TrackBlockPtr;

TrackBlockRec			blocks[TRACK_MAX_BLOCKS];

xtPublic void track_work(u_int block, char *what)
{
	size_t	len = 0, len2;

	ASSERT_NS(block > 0 && block <= TRACK_MAX_BLOCKS);
	block--;
	if (blocks[block].activity)
		len = strlen(blocks[block].activity);
	len2 = strlen(what);
	xt_realloc_ns((void **) &blocks[block].activity, len + len2 + 1);
	memcpy(blocks[block].activity + len, what, len2 + 1);
}

static void track_block_exists(xtIndexNodeID block)
{
	if (XT_NODE_ID(block) > 0 && XT_NODE_ID(block) <= TRACK_MAX_BLOCKS)
		blocks[XT_NODE_ID(block)-1].exists = TRUE;
}

static void track_reset_missing()
{
	for (u_int i=0; i<TRACK_MAX_BLOCKS; i++)
		blocks[i].exists = FALSE;
}

static void track_dump_missing(xtIndexNodeID eof_block)
{
	for (u_int i=0; i<XT_NODE_ID(eof_block)-1; i++) {
		if (!blocks[i].exists)
			printf("block missing = %04d %s\n", i+1, blocks[i].activity);
	}
}

static void track_dump_all(u_int max_block)
{
	for (u_int i=0; i<max_block; i++) {
		if (blocks[i].exists)
			printf(" %04d %s\n", i+1, blocks[i].activity);
		else
			printf("-%04d %s\n", i+1, blocks[i].activity);
	}
}
#endif

xtPublic void xt_ind_track_dump_block(XTTableHPtr XT_UNUSED(tab), xtIndexNodeID XT_UNUSED(address))
{
#ifdef TRACK_ACTIVITY
	u_int i = XT_NODE_ID(address)-1;

	printf("BLOCK %04d %s\n", i+1, blocks[i].activity);
#endif
}
#ifdef CHECK_NODE_REFERENCE
static xtIndexNodeID idx_get_node_ref(XTTableHPtr tab, xtWord1 *ref, u_int node_ref_size)
{
	xtIndexNodeID node;

	/* Node is invalid by default: */
	XT_NODE_ID(node) = 0xFFFFEEEE;

	if (node_ref_size) {
		ref -= node_ref_size;
		node = XT_RET_NODE_ID(XT_GET_DISK_4(ref));
		if (node >= tab->tab_ind_eof) {
			xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
			XT_NODE_ID(node) = 0xFFFFEEEE;
		}
	}
	return node;
}
#endif
/*
 * -----------------------------------------------------------------------
 */

static void idx_newstack(IdxBranchStackPtr stack)
{
	stack->s_top = 0;
}

static xtBool idx_push(IdxBranchStackPtr stack, xtIndexNodeID n, XTIdxItemPtr pos)
{
	if (stack->s_top == MAX_SEARCH_DEPTH) {
		xt_register_error(XT_REG_CONTEXT, XT_ERR_STACK_OVERFLOW, 0, "Index node stack overflow");
		return FAILED;
	}
	stack->s_elements[stack->s_top].i_branch = n;
	if (pos)
		stack->s_elements[stack->s_top].i_pos = *pos;
	stack->s_top++;
	return OK;
}

static IdxStackItemPtr idx_pop(IdxBranchStackPtr stack)
{
	if (stack->s_top == 0)
		return NULL;
	stack->s_top--;
	return &stack->s_elements[stack->s_top];
}

static IdxStackItemPtr idx_top(IdxBranchStackPtr stack)
{
	if (stack->s_top == 0)
		return NULL;
	return &stack->s_elements[stack->s_top-1];
}
/*
 * -----------------------------------------------------------------------
 * Allocation of nodes
 *
 * Allocating and freeing blocks for an index is safe because this is a structural
 * change which requires an exclusive lock on the index!
 */

static xtBool idx_new_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID *address)
{
	register XTTableHPtr	tab;
	xtIndexNodeID			wrote_pos;
	XTIndFreeBlockRec		free_block;
	XTIndFreeListPtr		list_ptr;

	//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
	if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
		ind->mi_free_list->fl_free_count--;
		*address = ind->mi_free_list->fl_page_id[ind->mi_free_list->fl_free_count];
		TRACK_BLOCK_ALLOC(*address);
		return OK;
	}

	tab = ot->ot_table;
	xt_lock_mutex_ns(&tab->tab_ind_lock);

	/* Check the cached free list: */
	while ((list_ptr = tab->tab_ind_free_list)) {
		if (list_ptr->fl_start < list_ptr->fl_free_count) {
			wrote_pos = list_ptr->fl_page_id[list_ptr->fl_start];
			list_ptr->fl_start++;
			xt_unlock_mutex_ns(&tab->tab_ind_lock);
			*address = wrote_pos;
			TRACK_BLOCK_ALLOC(wrote_pos);
			return OK;
		}
		tab->tab_ind_free_list = list_ptr->fl_next_list;
		xt_free_ns(list_ptr);
	}

	if ((XT_NODE_ID(wrote_pos) = XT_NODE_ID(tab->tab_ind_free))) {
		xtIndexNodeID next_node;

		/* Use the block on the free list: */
		if (!xt_ind_read_bytes(ot, NULL, wrote_pos, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block))
			goto failed;
		XT_NODE_ID(next_node) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
		if (XT_NODE_ID(next_node) >= XT_NODE_ID(tab->tab_ind_eof)) {
			xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
			goto failed;
		}
		XT_NODE_ID(tab->tab_ind_free) = XT_NODE_ID(next_node);
		xt_unlock_mutex_ns(&tab->tab_ind_lock);
		*address = wrote_pos;
		TRACK_BLOCK_ALLOC(wrote_pos);
		return OK;
	}

	/* PMC - Don't allow overflow! */
	if (XT_NODE_ID(tab->tab_ind_eof) >= 0xFFFFFFF) {
		xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_FILE_TO_LARGE, xt_file_path(ot->ot_ind_file));
		goto failed;
	}
	*address = tab->tab_ind_eof;
	XT_NODE_ID(tab->tab_ind_eof)++;
	xt_unlock_mutex_ns(&tab->tab_ind_lock);
	TRACK_BLOCK_ALLOC(*address);
	return OK;

	failed:
	xt_unlock_mutex_ns(&tab->tab_ind_lock);
	return FAILED;
}
/* Add the block to the private free list of the index.
 * On flush, this list will be transferred to the global list.
 */
static xtBool idx_free_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node_id)
{
	register u_int		count;
	register u_int		i;
	register u_int		guess;

	TRACK_BLOCK_FREE(node_id);
	//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
	if (!ind->mi_free_list) {
		count = 0;
		if (!(ind->mi_free_list = (XTIndFreeListPtr) xt_calloc_ns(offsetof(XTIndFreeListRec, fl_page_id) + 10 * sizeof(xtIndexNodeID))))
			return FAILED;
	}
	else {
		count = ind->mi_free_list->fl_free_count;
		if (!xt_realloc_ns((void **) &ind->mi_free_list, offsetof(XTIndFreeListRec, fl_page_id) + (count + 1) * sizeof(xtIndexNodeID)))
			return FAILED;
	}

	i = 0;
	while (i < count) {
		guess = (i + count - 1) >> 1;
		if (XT_NODE_ID(node_id) == XT_NODE_ID(ind->mi_free_list->fl_page_id[guess])) {
			// Should not happen...
			ASSERT_NS(FALSE);
			break;
		}
		if (XT_NODE_ID(node_id) < XT_NODE_ID(ind->mi_free_list->fl_page_id[guess]))
			count = guess;
		else
			i = guess + 1;
	}

	/* Insert at position i */
	memmove(ind->mi_free_list->fl_page_id + i + 1, ind->mi_free_list->fl_page_id + i, (ind->mi_free_list->fl_free_count - i) * sizeof(xtIndexNodeID));
	ind->mi_free_list->fl_page_id[i] = node_id;
	ind->mi_free_list->fl_free_count++;

	/* Set the cache page to clean: */
	return xt_ind_free_block(ot, ind, node_id);
}
/*
 * -----------------------------------------------------------------------
 * Simple compare functions
 */

xtPublic int xt_compare_2_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
{
	int r;

	ASSERT_NS(key_length == 4 || key_length == 8);
	r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	}
	return r;
}

xtPublic int xt_compare_3_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
{
	int r;

	ASSERT_NS(key_length == 4 || key_length == 8 || key_length == 12);
	r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		if (r == 0 && key_length > 8) {
			key_value += 4;
			b_value += 4;
			r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		}
	}
	return r;
}
/*
 * -----------------------------------------------------------------------
 * Tree branch scanning (searching nodes and leaves)
 */
406
xtPublic void xt_scan_branch_single(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
411
u_int full_item_size;
413
register xtWord1 *base;
415
register xtWord1 *bitem;
418
branch_size = XT_GET_DISK_2(branch->tb_size_2);
419
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
421
result->sr_found = FALSE;
422
result->sr_duplicate = FALSE;
423
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
424
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
426
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
427
full_item_size = result->sr_item.i_item_size + node_ref_size;
428
result->sr_item.i_node_ref_size = node_ref_size;
430
search_flags = value->sv_flags;
431
base = branch->tb_data + node_ref_size;
432
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
433
if (search_flags & XT_SEARCH_FIRST_FLAG)
435
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
438
register u_int guess;
439
register u_int count;
441
xtRecordID key_record;
443
key_record = value->sv_rec_id;
449
guess = (i + count - 1) >> 1;
451
bitem = base + guess * full_item_size;
453
switch (ind->mi_single_type) {
454
case HA_KEYTYPE_LONG_INT: {
455
register xtInt4 a, b;
457
a = XT_GET_DISK_4(value->sv_key);
458
b = XT_GET_DISK_4(bitem);
459
r = (a < b) ? -1 : (a == b ? 0 : 1);
462
case HA_KEYTYPE_ULONG_INT: {
463
register xtWord4 a, b;
465
a = XT_GET_DISK_4(value->sv_key);
466
b = XT_GET_DISK_4(bitem);
467
r = (a < b) ? -1 : (a == b ? 0 : 1);
471
/* Should not happen: */
476
if (search_flags & XT_SEARCH_WHOLE_KEY) {
477
xtRecordID item_record;
480
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
482
/* This should not happen because we should never
 * try to insert the same record twice into the
 * index!
 */
486
result->sr_duplicate = TRUE;
487
if (key_record == item_record) {
488
result->sr_found = TRUE;
489
result->sr_rec_id = item_record;
490
result->sr_row_id = row_id;
491
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
492
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
495
if (key_record < item_record)
501
result->sr_found = TRUE;
502
/* -1 causes a search to the beginning of the duplicate list of keys.
503
* 1 causes a search to just after the key.
505
if (search_flags & XT_SEARCH_AFTER_KEY)
519
bitem = base + i * full_item_size;
520
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
521
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
522
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
523
#ifdef IND_SKEW_SPLIT_ON_APPEND
524
if (i != total_count)
525
result->sr_last_item = FALSE;
530
/*
 * We use a special binary search here. It basically assumes that the values
 * in the index are not unique.
 *
 * Even if they are unique, when we search for part of a key, then it is
 * effectively the case.
 *
 * So in the situation where we find duplicates in the index we usually
 * want to position ourselves at the beginning of the duplicate list.
 *
 * Alternatively a search can find the position just after a given key.
 *
 * To achieve this we make the following modifications:
 * - The result of the comparison always returns 1 or -1. We only stop
 *   the search early in the case of an exact match when inserting (but this
 *   should not happen anyway).
 * - The search never actually fails, but sets 'found' to TRUE if it
 *   sees the search key in the index.
 *
 * If the search value exists in the index we know that
 * this method will take us to the first occurrence of the key in the
 * index (in the case of -1) or to the first value after the
 * search key in the case of 1.
 */
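/*
 * Illustrative sketch only (not part of the original source): a minimal
 * version of the "never fail" search described above, over a plain array
 * of xtWord4 keys rather than an index page. A match only sets 'found'
 * and then keeps steering left, so the loop always ends on the first
 * occurrence of the key (or on the first larger key). The guard macro
 * XT_DOC_EXAMPLES and the example_* names are hypothetical.
 */
#ifdef XT_DOC_EXAMPLES	/* never defined, so this sketch is compiled out */
static u_int example_find_first(xtWord4 *keys, u_int n_keys, xtWord4 search, xtBool *found)
{
	u_int	i = 0, count = n_keys, guess;

	*found = FALSE;
	while (i < count) {
		guess = (i + count - 1) >> 1;
		if (search <= keys[guess]) {
			if (search == keys[guess])
				*found = TRUE;		/* seen, but keep searching to the left */
			count = guess;
		}
		else
			i = guess + 1;			/* strictly greater: continue to the right */
	}
	return i;						/* position of the first item >= search */
}
#endif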
553
xtPublic void xt_scan_branch_fix(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
558
u_int full_item_size;
565
branch_size = XT_GET_DISK_2(branch->tb_size_2);
566
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
568
result->sr_found = FALSE;
569
result->sr_duplicate = FALSE;
570
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
571
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
573
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
574
full_item_size = result->sr_item.i_item_size + node_ref_size;
575
result->sr_item.i_node_ref_size = node_ref_size;
577
search_flags = value->sv_flags;
578
base = branch->tb_data + node_ref_size;
579
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
580
if (search_flags & XT_SEARCH_FIRST_FLAG)
582
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
585
register u_int guess;
586
register u_int count;
587
xtRecordID key_record;
590
key_record = value->sv_rec_id;
596
guess = (i + count - 1) >> 1;
598
bitem = base + guess * full_item_size;
600
r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);
603
if (search_flags & XT_SEARCH_WHOLE_KEY) {
604
xtRecordID item_record;
607
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
609
/* This should not happen because we should never
 * try to insert the same record twice into the
 * index!
 */
613
result->sr_duplicate = TRUE;
614
if (key_record == item_record) {
615
result->sr_found = TRUE;
616
result->sr_rec_id = item_record;
617
result->sr_row_id = row_id;
618
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
619
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
622
if (key_record < item_record)
628
result->sr_found = TRUE;
629
/* -1 causes a search to the beginning of the duplicate list of keys.
630
* 1 causes a search to just after the key.
632
if (search_flags & XT_SEARCH_AFTER_KEY)
646
bitem = base + i * full_item_size;
647
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
648
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
649
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
650
#ifdef IND_SKEW_SPLIT_ON_APPEND
651
if (i != total_count)
652
result->sr_last_item = FALSE;
656
xtPublic void xt_scan_branch_fix_simple(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
661
u_int full_item_size;
668
branch_size = XT_GET_DISK_2(branch->tb_size_2);
669
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
671
result->sr_found = FALSE;
672
result->sr_duplicate = FALSE;
673
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
674
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
676
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
677
full_item_size = result->sr_item.i_item_size + node_ref_size;
678
result->sr_item.i_node_ref_size = node_ref_size;
680
search_flags = value->sv_flags;
681
base = branch->tb_data + node_ref_size;
682
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
683
if (search_flags & XT_SEARCH_FIRST_FLAG)
685
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
688
register u_int guess;
689
register u_int count;
690
xtRecordID key_record;
693
key_record = value->sv_rec_id;
699
guess = (i + count - 1) >> 1;
701
bitem = base + guess * full_item_size;
703
r = ind->mi_simple_comp_key(ind, value->sv_length, value->sv_key, bitem);
706
if (search_flags & XT_SEARCH_WHOLE_KEY) {
707
xtRecordID item_record;
710
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
712
/* This should not happen because we should never
 * try to insert the same record twice into the
 * index!
 */
716
result->sr_duplicate = TRUE;
717
if (key_record == item_record) {
718
result->sr_found = TRUE;
719
result->sr_rec_id = item_record;
720
result->sr_row_id = row_id;
721
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
722
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
725
if (key_record < item_record)
731
result->sr_found = TRUE;
732
/* -1 causes a search to the beginning of the duplicate list of keys.
733
* 1 causes a search to just after the key.
735
if (search_flags & XT_SEARCH_AFTER_KEY)
749
bitem = base + i * full_item_size;
750
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
751
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
752
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
753
#ifdef IND_SKEW_SPLIT_ON_APPEND
754
if (i != total_count)
755
result->sr_last_item = FALSE;
760
/*
 * Variable length key values are stored as a sorted list. Since each list item has a variable length, we
 * must scan the list sequentially in order to find a key.
 */
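/*
 * Illustrative sketch only (not part of the original source): stepping
 * through a variable length item list. The key length can only be found
 * by decoding the key itself, so each step must call myxt_get_key_length()
 * before it can advance past the item (node_ref_size is 0 on a leaf).
 * The guard macro XT_DOC_EXAMPLES and the example_* name are hypothetical.
 */
#ifdef XT_DOC_EXAMPLES	/* never defined, so this sketch is compiled out */
static xtWord1 *example_scan_var_items(XTIndexPtr ind, xtWord1 *bitem, xtWord1 *bend, u_int node_ref_size, XTIdxKeyValuePtr value)
{
	u_int ilen;

	while (bitem < bend) {
		ilen = myxt_get_key_length(ind, bitem);
		if (myxt_compare_key(ind, value->sv_flags, value->sv_length, value->sv_key, bitem) <= 0)
			break;					/* first item that is not less than the search key */
		bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
	}
	return bitem;					/* == bend when the key belongs after the last item */
}
#endif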
763
xtPublic void xt_scan_branch_var(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
774
branch_size = XT_GET_DISK_2(branch->tb_size_2);
775
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
777
result->sr_found = FALSE;
778
result->sr_duplicate = FALSE;
779
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
780
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
782
result->sr_item.i_node_ref_size = node_ref_size;
784
search_flags = value->sv_flags;
785
base = branch->tb_data + node_ref_size;
787
bend = &branch->tb_data[result->sr_item.i_total_size];
792
if (search_flags & XT_SEARCH_FIRST_FLAG)
793
ilen = myxt_get_key_length(ind, bitem);
794
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG) {
799
xtRecordID key_record;
802
key_record = value->sv_rec_id;
805
while (bitem < bend) {
806
ilen = myxt_get_key_length(ind, bitem);
807
r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);
809
if (search_flags & XT_SEARCH_WHOLE_KEY) {
810
xtRecordID item_record;
813
xt_get_record_ref(bitem + ilen, &item_record, &row_id);
815
/* This should not happen because we should never
 * try to insert the same record twice into the
 * index!
 */
819
result->sr_duplicate = TRUE;
820
if (key_record == item_record) {
821
result->sr_found = TRUE;
822
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
823
result->sr_rec_id = item_record;
824
result->sr_row_id = row_id;
825
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
826
result->sr_item.i_item_offset = bitem - branch->tb_data;
829
if (key_record < item_record)
835
result->sr_found = TRUE;
836
/* -1 causes a search to the beginning of the duplicate list of keys.
837
* 1 causes a search to just after the key.
839
if (search_flags & XT_SEARCH_AFTER_KEY)
847
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
852
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
853
xt_get_res_record_ref(bitem + ilen, result);
854
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
855
result->sr_item.i_item_offset = bitem - branch->tb_data;
856
#ifdef IND_SKEW_SPLIT_ON_APPEND
858
result->sr_last_item = FALSE;
862
/* Go to the next item in the node. */
863
static void idx_next_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
869
result->sr_item.i_item_offset += result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
870
bitem = branch->tb_data + result->sr_item.i_item_offset;
871
if (result->sr_item.i_item_offset < result->sr_item.i_total_size) {
873
ilen = result->sr_item.i_item_size;
875
ilen = myxt_get_key_length(ind, bitem) + XT_RECORD_REF_SIZE;
876
result->sr_item.i_item_size = ilen;
878
xt_get_res_record_ref(bitem + ilen - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
881
result->sr_item.i_item_size = 0;
882
result->sr_rec_id = 0;
883
result->sr_row_id = 0;
885
if (result->sr_item.i_node_ref_size)
886
/* IDX_GET_NODE_REF() loads the branch reference to the LEFT of the item. */
887
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
889
result->sr_branch = 0;
892
xtPublic void xt_prev_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultRec *result)
895
ASSERT_NS(result->sr_item.i_item_offset >= result->sr_item.i_item_size + result->sr_item.i_node_ref_size + result->sr_item.i_node_ref_size);
896
result->sr_item.i_item_offset -= (result->sr_item.i_item_size + result->sr_item.i_node_ref_size);
897
xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
898
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, result->sr_item.i_node_ref_size);
901
xtPublic void xt_prev_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
908
bitem = branch->tb_data + result->sr_item.i_node_ref_size;
909
bend = &branch->tb_data[result->sr_item.i_item_offset];
911
ilen = myxt_get_key_length(ind, bitem);
912
if (bitem + ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size >= bend)
914
bitem += ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
917
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
918
xt_get_res_record_ref(bitem + ilen, result); /* (Only valid if i_item_offset < i_total_size) */
919
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
920
result->sr_item.i_item_offset = bitem - branch->tb_data;
923
static void idx_reload_item_fix(XTIndexPtr XT_NDEBUG_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultPtr result)
927
branch_size = XT_GET_DISK_2(branch->tb_size_2);
928
ASSERT_NS(result->sr_item.i_node_ref_size == (XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0));
929
ASSERT_NS(result->sr_item.i_item_size == ind->mi_key_size + XT_RECORD_REF_SIZE);
930
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
931
if (result->sr_item.i_item_offset > result->sr_item.i_total_size)
932
result->sr_item.i_item_offset = result->sr_item.i_total_size;
933
xt_get_res_record_ref(&branch->tb_data[result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE], result);
936
static void idx_first_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
943
branch_size = XT_GET_DISK_2(branch->tb_size_2);
944
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
946
result->sr_found = FALSE;
947
result->sr_duplicate = FALSE;
948
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
949
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
952
key_data_size = ind->mi_key_size;
956
bitem = branch->tb_data + node_ref_size;
957
if (bitem < &branch->tb_data[result->sr_item.i_total_size])
958
key_data_size = myxt_get_key_length(ind, bitem);
963
result->sr_item.i_item_size = key_data_size + XT_RECORD_REF_SIZE;
964
result->sr_item.i_node_ref_size = node_ref_size;
966
xt_get_res_record_ref(branch->tb_data + node_ref_size + key_data_size, result);
967
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + node_ref_size, node_ref_size); /* Only valid if this is a node. */
968
result->sr_item.i_item_offset = node_ref_size;
972
* Last means different things for leaf or node!
974
xtPublic void xt_last_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
980
branch_size = XT_GET_DISK_2(branch->tb_size_2);
981
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
983
result->sr_found = FALSE;
984
result->sr_duplicate = FALSE;
985
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
986
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
988
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
989
result->sr_item.i_node_ref_size = node_ref_size;
992
result->sr_item.i_item_offset = result->sr_item.i_total_size;
993
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
996
if (result->sr_item.i_total_size) {
997
result->sr_item.i_item_offset = result->sr_item.i_total_size - result->sr_item.i_item_size;
998
xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + ind->mi_key_size, result);
1001
/* Leaf is empty: */
1002
result->sr_item.i_item_offset = 0;
1006
xtPublic void xt_last_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
1010
u_int node_ref_size;
1012
branch_size = XT_GET_DISK_2(branch->tb_size_2);
1013
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
1015
result->sr_found = FALSE;
1016
result->sr_duplicate = FALSE;
1017
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
1018
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
1020
result->sr_item.i_node_ref_size = node_ref_size;
1022
if (node_ref_size) {
1023
result->sr_item.i_item_offset = result->sr_item.i_total_size;
1024
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
1025
result->sr_item.i_item_size = 0;
1028
if (result->sr_item.i_total_size) {
1033
bitem = branch->tb_data + node_ref_size;
1034
bend = &branch->tb_data[result->sr_item.i_total_size];
1038
ilen = myxt_get_key_length(ind, bitem);
1039
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend)
1041
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1045
result->sr_item.i_item_offset = bitem - branch->tb_data;
1046
xt_get_res_record_ref(bitem + ilen, result);
1047
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
1050
/* Leaf is empty: */
1051
result->sr_item.i_item_offset = 0;
1052
result->sr_item.i_item_size = 0;
1057
xtPublic xtBool xt_idx_lazy_delete_on_leaf(XTIndexPtr ind, XTIndBlockPtr block, xtWord2 branch_size)
1059
ASSERT_NS(ind->mi_fix_key);
1061
/* Compact the leaf if more than half the items that fit on the page
1063
if (block->cp_del_count >= ind->mi_max_items/2)
1066
/* Compact the page if there is only 1 (or less) valid item left: */
1067
if ((u_int) block->cp_del_count+1 >= ((u_int) branch_size - 2)/(ind->mi_key_size + XT_RECORD_REF_SIZE))
1073
static xtBool idx_lazy_delete_on_node(XTIndexPtr ind, XTIndBlockPtr block, register XTIdxItemPtr item)
1075
ASSERT_NS(ind->mi_fix_key);
1077
/* Compact the node if more than 1/4 of the items that fit on the page
1079
if (block->cp_del_count >= ind->mi_max_items/4)
1082
/* Compact the page if there is only 1 (or less) valid item left: */
1083
if ((u_int) block->cp_del_count+1 >= (item->i_total_size - item->i_node_ref_size)/(item->i_item_size + item->i_node_ref_size))
1089
inline static xtBool idx_cmp_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
1093
data = &iref->ir_branch->tb_data[item->i_item_offset];
1094
return memcmp(data, value->sv_key, value->sv_length) == 0;
1097
inline static void idx_set_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
1101
data = &iref->ir_branch->tb_data[item->i_item_offset];
1102
memcpy(data, value->sv_key, value->sv_length);
1103
xt_set_val_record_ref(data + value->sv_length, value);
1104
#ifdef IND_OPT_DATA_WRITTEN
1105
if (item->i_item_offset < iref->ir_block->cb_min_pos)
1106
iref->ir_block->cb_min_pos = item->i_item_offset;
1107
if (item->i_item_offset + value->sv_length > iref->ir_block->cb_max_pos)
1108
iref->ir_block->cb_max_pos = item->i_item_offset + value->sv_length;
1109
ASSERT_NS(iref->ir_block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1110
ASSERT_NS(iref->ir_block->cb_min_pos <= iref->ir_block->cb_max_pos);
1112
iref->ir_updated = TRUE;
1115
static xtBool idx_set_item_row_id(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item, xtRowID row_id)
1117
register XTIndBlockPtr block = iref->ir_block;
1121
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1122
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1123
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1124
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1130
/* This is the offset of the reference in the item we found: */
1131
item->i_item_offset +item->i_item_size - XT_RECORD_REF_SIZE +
1132
/* This is the offset of the row id in the reference: */
1134
data = &iref->ir_branch->tb_data[offset];
1136
/* This update does not change the structure of page, so we do it without
1137
* copying the page before we write.
1139
XT_SET_DISK_4(data, row_id);
1140
#ifdef IND_OPT_DATA_WRITTEN
1141
if (offset < block->cb_min_pos)
1142
block->cb_min_pos = offset;
1143
if (offset + XT_ROW_ID_SIZE > block->cb_max_pos)
1144
block->cb_max_pos = offset + XT_ROW_ID_SIZE;
1145
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1146
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1148
iref->ir_updated = TRUE;
1152
inline static xtBool idx_is_item_deleted(register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
1156
data = &branch->tb_data[item->i_item_offset + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE];
1157
return XT_GET_DISK_4(data) == (xtRowID) -1;
1160
static xtBool idx_set_item_deleted(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1162
if (!idx_set_item_row_id(ot, ind, iref, item, (xtRowID) -1))
1165
/* This should be safe because there is only one thread,
1166
* the sweeper, that does this!
1168
* Threads that decrement this value have an xlock on
1169
* the page, or the index.
1171
iref->ir_block->cp_del_count++;
1176
/* {LAZY-DEL-INDEX-ITEMS}
 *
 * Do a lazy delete of an item by just setting the Row ID
 * to the delete indicator: row ID -1.
 */
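/*
 * Illustrative sketch only (not part of the original source): the on-page
 * effect of a lazy delete. Only the row ID field of the record reference
 * at the end of the item is overwritten with (xtRowID) -1; the key stays
 * in place, so the page layout is unchanged until a later compaction.
 * The guard macro XT_DOC_EXAMPLES and the example_* name are hypothetical.
 */
#ifdef XT_DOC_EXAMPLES	/* never defined, so this sketch is compiled out */
static void example_mark_item_deleted(xtWord1 *item, u_int item_size)
{
	/* The record reference trails the key: <record ID><row ID>. */
	xtWord1 *row_ptr = item + item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;

	XT_SET_DISK_4(row_ptr, (xtRowID) -1);
}
#endif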
1180
static xtBool idx_lazy_delete_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1182
if (!idx_set_item_deleted(ot, ind, iref, item))
1184
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_R_UPDATE, iref);
1189
/*
 * This function compacts the leaf, but preserves the
 * position of the item.
 */
1192
static xtBool idx_compact_leaf(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1194
register XTIndBlockPtr block = iref->ir_block;
1195
register XTIdxBranchDPtr branch = iref->ir_branch;
1196
int item_idx, count, i, idx;
1203
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1204
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1205
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1206
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1211
if (block->cb_handle_count) {
1212
if (!xt_ind_copy_on_write(iref)) {
1213
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1218
ASSERT_NS(!item->i_node_ref_size);
1219
ASSERT_NS(ind->mi_fix_key);
1220
size = item->i_item_size;
1221
count = item->i_total_size / size;
1222
item_idx = item->i_item_offset / size;
1223
s_data = d_data = branch->tb_data;
1225
for (i=0; i<count; i++) {
1226
data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
1227
row_id = XT_GET_DISK_4(data);
1228
if (row_id == (xtRowID) -1) {
1233
if (d_data != s_data)
1234
memcpy(d_data, s_data, size);
1240
block->cp_del_count = 0;
1241
item->i_total_size = d_data - branch->tb_data;
1242
ASSERT_NS(idx * size == item->i_total_size);
1243
item->i_item_offset = item_idx * size;
1244
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
1245
#ifdef IND_OPT_DATA_WRITTEN
1246
block->cb_header = TRUE;
1247
block->cb_min_pos = 0;
1248
block->cb_max_pos = item->i_total_size;
1249
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1250
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1252
iref->ir_updated = TRUE;
1256
static xtBool idx_lazy_remove_leaf_item_right(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1258
register XTIndBlockPtr block = iref->ir_block;
1259
register XTIdxBranchDPtr branch = iref->ir_branch;
1260
int item_idx, count, i;
1267
ASSERT_NS(!item->i_node_ref_size);
1269
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1270
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1271
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1272
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1277
if (block->cb_handle_count) {
1278
if (!xt_ind_copy_on_write(iref)) {
1279
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
1284
ASSERT_NS(ind->mi_fix_key);
1285
size = item->i_item_size;
1286
count = item->i_total_size / size;
1287
item_idx = item->i_item_offset / size;
1288
s_data = d_data = branch->tb_data;
1289
for (i=0; i<count; i++) {
1291
item->i_item_offset = d_data - branch->tb_data;
1293
data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
1294
row_id = XT_GET_DISK_4(data);
1295
if (row_id != (xtRowID) -1) {
1296
if (d_data != s_data)
1297
memcpy(d_data, s_data, size);
1303
block->cp_del_count = 0;
1304
item->i_total_size = d_data - branch->tb_data;
1305
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
1306
#ifdef IND_OPT_DATA_WRITTEN
1307
block->cb_header = TRUE;
1308
block->cb_min_pos = 0;
1309
block->cb_max_pos = item->i_total_size;
1310
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1311
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1313
iref->ir_updated = TRUE;
1314
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, iref);
1319
* Remove an item and save to disk.
1321
static xtBool idx_remove_branch_item_right(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item)
1323
register XTIndBlockPtr block = iref->ir_block;
1324
register XTIdxBranchDPtr branch = iref->ir_branch;
1325
u_int size = item->i_item_size + item->i_node_ref_size;
1327
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1328
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1329
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1330
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1335
/* {HANDLE-COUNT-USAGE}
1336
* This access is safe because we have the right to update
1337
* the page, so no other thread can modify the page.
1340
* We either have an Xlock on the index, or we have
1341
* an Xlock on the cache block.
1343
if (block->cb_handle_count) {
1344
if (!xt_ind_copy_on_write(iref)) {
1345
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
1349
if (ind->mi_lazy_delete) {
1350
if (idx_is_item_deleted(branch, item))
1351
block->cp_del_count--;
1353
/* Remove the node reference to the left of the item: */
1354
memmove(&branch->tb_data[item->i_item_offset],
1355
&branch->tb_data[item->i_item_offset + size],
1356
item->i_total_size - item->i_item_offset - size);
1357
item->i_total_size -= size;
1358
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
1359
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
1360
#ifdef IND_OPT_DATA_WRITTEN
1361
block->cb_header = TRUE;
1362
if (item->i_item_offset < block->cb_min_pos)
1363
block->cb_min_pos = item->i_item_offset;
1364
block->cb_max_pos = item->i_total_size;
1365
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1366
ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
1368
iref->ir_updated = TRUE;
1369
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
1373
static xtBool idx_remove_branch_item_left(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item, xtBool *lazy_delete_cleanup_required)
1375
register XTIndBlockPtr block = iref->ir_block;
1376
register XTIdxBranchDPtr branch = iref->ir_branch;
1377
u_int size = item->i_item_size + item->i_node_ref_size;
1379
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1380
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1381
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1382
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1387
ASSERT_NS(item->i_node_ref_size);
1388
if (block->cb_handle_count) {
1389
if (!xt_ind_copy_on_write(iref)) {
1390
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
1394
if (ind->mi_lazy_delete) {
1395
if (idx_is_item_deleted(branch, item))
1396
block->cp_del_count--;
1397
if (lazy_delete_cleanup_required)
1398
*lazy_delete_cleanup_required = idx_lazy_delete_on_node(ind, block, item);
1400
/* Remove the node reference to the left of the item: */
1401
memmove(&branch->tb_data[item->i_item_offset - item->i_node_ref_size],
1402
&branch->tb_data[item->i_item_offset + item->i_item_size],
1403
item->i_total_size - item->i_item_offset - item->i_item_size);
1404
item->i_total_size -= size;
1405
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
1406
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
1407
#ifdef IND_OPT_DATA_WRITTEN
1408
block->cb_header = TRUE;
1409
if (item->i_item_offset - item->i_node_ref_size < block->cb_min_pos)
1410
block->cb_min_pos = item->i_item_offset - item->i_node_ref_size;
1411
block->cb_max_pos = item->i_total_size;
1412
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1413
ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
1415
iref->ir_updated = TRUE;
1416
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
1420
static void idx_insert_leaf_item(XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result)
1424
/* This will ensure we do not overwrite the end of the buffer: */
1425
ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
1426
memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE],
1427
&leaf->tb_data[result->sr_item.i_item_offset],
1428
result->sr_item.i_total_size - result->sr_item.i_item_offset);
1429
item = &leaf->tb_data[result->sr_item.i_item_offset];
1430
memcpy(item, value->sv_key, value->sv_length);
1431
xt_set_val_record_ref(item + value->sv_length, value);
1432
result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE;
1433
XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_LEAF_SIZE(result->sr_item.i_total_size));
1436
static void idx_insert_node_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result, xtIndexNodeID branch)
1440
/* This will ensure we do not overwrite the end of the buffer: */
1441
ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
1442
memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size],
1443
&leaf->tb_data[result->sr_item.i_item_offset],
1444
result->sr_item.i_total_size - result->sr_item.i_item_offset);
1445
item = &leaf->tb_data[result->sr_item.i_item_offset];
1446
memcpy(item, value->sv_key, value->sv_length);
1447
xt_set_val_record_ref(item + value->sv_length, value);
1448
XT_SET_NODE_REF(tab, item + value->sv_length + XT_RECORD_REF_SIZE, branch);
1449
result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
1450
XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_NODE_SIZE(result->sr_item.i_total_size));
1453
static xtBool idx_get_middle_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result)
1457
ASSERT_NS(result->sr_item.i_node_ref_size == 0 || result->sr_item.i_node_ref_size == XT_NODE_REF_SIZE);
1458
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE*2);
1459
if (ind->mi_fix_key) {
1460
u_int full_item_size = result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
1462
result->sr_item.i_item_offset = ((result->sr_item.i_total_size - result->sr_item.i_node_ref_size)
1463
/ full_item_size / 2 * full_item_size) + result->sr_item.i_node_ref_size;
1464
#ifdef IND_SKEW_SPLIT_ON_APPEND
1465
if (result->sr_last_item) {
1468
offset = result->sr_item.i_total_size - full_item_size * 2;
1469
/* We actually split at the item before last! */
1470
if (offset > result->sr_item.i_item_offset)
1471
result->sr_item.i_item_offset = offset;
1475
bitem = &branch->tb_data[result->sr_item.i_item_offset];
1476
value->sv_flags = XT_SEARCH_WHOLE_KEY;
1477
value->sv_length = result->sr_item.i_item_size - XT_RECORD_REF_SIZE;
1478
xt_get_record_ref(bitem + value->sv_length, &value->sv_rec_id, &value->sv_row_id);
1479
memcpy(value->sv_key, bitem, value->sv_length);
1482
u_int node_ref_size;
1486
node_ref_size = result->sr_item.i_node_ref_size;
1487
bitem = branch->tb_data + node_ref_size;
1488
bend = &branch->tb_data[(result->sr_item.i_total_size - node_ref_size) / 2 + node_ref_size];
1489
#ifdef IND_SKEW_SPLIT_ON_APPEND
1490
if (result->sr_last_item)
1491
bend = &branch->tb_data[XT_INDEX_PAGE_DATA_SIZE];
1493
u_int prev_ilen = 0;
1494
xtWord1 *prev_bitem = NULL;
1500
ilen = myxt_get_key_length(ind, bitem);
1501
tlen += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1502
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) {
1503
if (ilen > XT_INDEX_PAGE_SIZE || tlen > result->sr_item.i_total_size) {
1504
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
1509
#ifdef IND_SKEW_SPLIT_ON_APPEND
1513
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1517
#ifdef IND_SKEW_SPLIT_ON_APPEND
1518
/* We actually want the item before last! */
1519
if (result->sr_last_item && prev_bitem) {
1524
result->sr_item.i_item_offset = bitem - branch->tb_data;
1525
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
1527
value->sv_flags = XT_SEARCH_WHOLE_KEY;
1528
value->sv_length = ilen;
1529
xt_get_record_ref(bitem + ilen, &value->sv_rec_id, &value->sv_row_id);
1530
memcpy(value->sv_key, bitem, value->sv_length);
1535
static size_t idx_write_branch_item(XTIndexPtr XT_UNUSED(ind), xtWord1 *item, XTIdxKeyValuePtr value)
1537
memcpy(item, value->sv_key, value->sv_length);
1538
xt_set_val_record_ref(item + value->sv_length, value);
1539
return value->sv_length + XT_RECORD_REF_SIZE;
1542
static xtBool idx_replace_node_key(XTOpenTablePtr ot, XTIndexPtr ind, IdxStackItemPtr item, IdxBranchStackPtr stack, u_int item_size, xtWord1 *item_buf)
1544
XTIndReferenceRec iref;
1545
xtIndexNodeID new_branch;
1546
XTIdxResultRec result;
1547
xtIndexNodeID current = item->i_branch;
1549
XTIdxBranchDPtr new_branch_ptr;
1550
XTIdxKeyValueRec key_value;
1551
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
1555
iref.ir_updated = 2;
1557
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_WRITE, &iref))
1560
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1561
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1562
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1566
if (iref.ir_block->cb_handle_count) {
1567
if (!xt_ind_copy_on_write(&iref))
1571
if (ind->mi_lazy_delete) {
1572
ASSERT_NS(item_size == item->i_pos.i_item_size);
1573
if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
1574
iref.ir_block->cp_del_count--;
1577
if (item->i_pos.i_total_size + item_size - item->i_pos.i_item_size <= XT_INDEX_PAGE_DATA_SIZE) {
1578
/* The new item is larger than the old, this can result
1579
* in overflow of the node!
1581
memmove(&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item_size],
1582
&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item->i_pos.i_item_size],
1583
item->i_pos.i_total_size - item->i_pos.i_item_offset - item->i_pos.i_item_size);
1584
memcpy(&iref.ir_branch->tb_data[item->i_pos.i_item_offset],
1585
item_buf, item_size);
1586
if (ind->mi_lazy_delete) {
1587
if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
1588
iref.ir_block->cp_del_count++;
1590
item->i_pos.i_total_size = item->i_pos.i_total_size + item_size - item->i_pos.i_item_size;
1591
XT_SET_DISK_2(iref.ir_branch->tb_size_2, XT_MAKE_NODE_SIZE(item->i_pos.i_total_size));
1592
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(iref.ir_branch->tb_size_2));
1593
#ifdef IND_OPT_DATA_WRITTEN
1594
iref.ir_block->cb_header = TRUE;
1595
if (item->i_pos.i_item_offset < iref.ir_block->cb_min_pos)
1596
iref.ir_block->cb_min_pos = item->i_pos.i_item_offset;
1597
iref.ir_block->cb_max_pos = item->i_pos.i_total_size;
1598
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1600
iref.ir_updated = TRUE;
1603
ASSERT_NS(item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
1605
return xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
1608
/* The node has overflowed!! */
1609
#ifdef IND_SKEW_SPLIT_ON_APPEND
1610
result.sr_last_item = FALSE;
1612
result.sr_item = item->i_pos;
1614
memcpy(ot->ot_ind_wbuf.tb_data, iref.ir_branch->tb_data, item->i_pos.i_item_offset); // First part of the buffer
1615
memcpy(&ot->ot_ind_wbuf.tb_data[item->i_pos.i_item_offset], item_buf, item_size); // The new item
1616
memcpy(&ot->ot_ind_wbuf.tb_data[item->i_pos.i_item_offset + item_size],
1617
&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item->i_pos.i_item_size],
1618
item->i_pos.i_total_size - item->i_pos.i_item_offset - item->i_pos.i_item_size);
1619
item->i_pos.i_total_size += item_size - item->i_pos.i_item_size;
1620
item->i_pos.i_item_size = item_size;
1621
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(item->i_pos.i_total_size));
1622
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1623
ASSERT_NS(item->i_pos.i_total_size > XT_INDEX_PAGE_DATA_SIZE && item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2);
1625
/* Adjust the stack (we want the parents of the delete node): */
1627
if (idx_pop(stack) == item)
1631
/* We assume that value can be overwritten (which is the case) */
1632
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
1633
key_value.sv_key = key_buf;
1634
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result))
1637
if (!idx_new_branch(ot, ind, &new_branch))
1640
/* Split the node: */
1641
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
1642
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
1643
memmove(new_branch_ptr->tb_data, &iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
1645
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
1646
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
1647
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
1650
/* Change the size of the old branch: */
1651
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
1652
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1653
memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
1654
#ifdef IND_OPT_DATA_WRITTEN
1655
iref.ir_block->cb_header = TRUE;
1656
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
1657
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
1658
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
1659
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1660
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1662
iref.ir_updated = TRUE;
1663
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
1665
/* Insert the new branch into the parent node, using the new middle key value: */
1666
if (!idx_insert_node(ot, ind, stack, FALSE, &key_value, new_branch)) {
1668
* TODO: Mark the index as corrupt.
1669
* This should not fail because everything has been
1671
* However, if it does fail the index
1673
* I could modify and release the branch above,
1675
* But that would mean holding the lock longer,
1676
* and also may not help because idx_insert_node()
1679
idx_free_branch(ot, ind, new_branch);
1686
idx_free_branch(ot, ind, new_branch);
1689
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
1695
/*
 * -----------------------------------------------------------------------
 * Standard b-tree insert
 *
 * Insert the given branch into the node on the top of the stack. If the stack
 * is empty we need to add a new root.
 */
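/*
 * Illustrative sketch only (not part of the original source): the fit test
 * that decides between a plain insert and a node split below. A node item
 * costs its key length plus a record reference plus one node reference,
 * and the branch must stay within XT_INDEX_PAGE_DATA_SIZE; when it does
 * not fit, the node is split around its middle key and that key is pushed
 * into the parent taken from the stack (or into a new root if the stack
 * is empty). The guard macro XT_DOC_EXAMPLES and the example_* name are
 * hypothetical.
 */
#ifdef XT_DOC_EXAMPLES	/* never defined, so this sketch is compiled out */
static xtBool example_node_item_fits(u_int total_size, u_int key_length, u_int node_ref_size)
{
	return total_size + key_length + XT_RECORD_REF_SIZE + node_ref_size <= XT_INDEX_PAGE_DATA_SIZE;
}
#endif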
1703
static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch)
1705
IdxStackItemPtr stack_item;
1706
xtIndexNodeID new_branch;
1708
xtIndexNodeID current;
1709
XTIndReferenceRec iref;
1710
XTIdxResultRec result;
1712
XTIdxBranchDPtr new_branch_ptr;
1713
#ifdef IND_OPT_DATA_WRITTEN
1719
iref.ir_updated = 2;
1721
/* Insert a new branch (key, data)... */
1722
if (!(stack_item = idx_pop(stack))) {
1726
if (!idx_new_branch(ot, ind, &new_branch))
1729
ditem = ot->ot_ind_wbuf.tb_data;
1730
XT_SET_NODE_REF(ot->ot_table, ditem, ind->mi_root);
1731
ditem += XT_NODE_REF_SIZE;
1732
ditem += idx_write_branch_item(ind, ditem, key_value);
1733
XT_SET_NODE_REF(ot->ot_table, ditem, branch);
1734
ditem += XT_NODE_REF_SIZE;
1735
size = ditem - ot->ot_ind_wbuf.tb_data;
1736
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(size));
1737
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1738
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
1740
ind->mi_root = new_branch;
1744
current = stack_item->i_branch;
1745
/* This read does not count (towards ot_ind_reads), because we are only
1746
* counting each loaded page once. We assume that the page is in
1747
* cache, and will remain in cache when we read again below for the
1748
* purpose of update.
1750
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
1752
ASSERT_NS(XT_IS_NODE(XT_GET_DISK_2(iref.ir_branch->tb_size_2)));
1753
#ifdef IND_SKEW_SPLIT_ON_APPEND
1754
result.sr_last_item = last_item;
1756
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
1758
if (result.sr_item.i_total_size + key_value->sv_length + XT_RECORD_REF_SIZE + result.sr_item.i_node_ref_size <= XT_INDEX_PAGE_DATA_SIZE) {
1759
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1760
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1761
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1765
if (iref.ir_block->cb_handle_count) {
1766
if (!xt_ind_copy_on_write(&iref))
1770
idx_insert_node_item(ot->ot_table, ind, iref.ir_branch, key_value, &result, branch);
1771
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1772
#ifdef IND_OPT_DATA_WRITTEN
1773
iref.ir_block->cb_header = TRUE;
1774
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
1775
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
1776
iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
1777
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1778
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1780
iref.ir_updated = TRUE;
1781
ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
1782
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
1786
memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
1787
idx_insert_node_item(ot->ot_table, ind, &ot->ot_ind_wbuf, key_value, &result, branch);
1788
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1789
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE);
1790
#ifdef IND_OPT_DATA_WRITTEN
1791
new_min_pos = result.sr_item.i_item_offset;
1794
/* We assume that value can be overwritten (which is the case) */
1795
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, key_value, &result))
1798
if (!idx_new_branch(ot, ind, &new_branch))
1801
/* Split the node: */
1802
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
1803
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
1804
memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
1806
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
1807
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
1808
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
1811
/* Change the size of the old branch: */
1812
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
1813
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1815
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1816
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1817
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1821
if (iref.ir_block->cb_handle_count) {
1822
if (!xt_ind_copy_on_write(&iref))
1826
#ifdef IND_OPT_DATA_WRITTEN
1827
if (result.sr_item.i_item_offset < new_min_pos)
1828
new_min_pos = result.sr_item.i_item_offset;
1830
memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
1831
#ifdef IND_OPT_DATA_WRITTEN
1832
iref.ir_block->cb_header = TRUE;
1833
if (new_min_pos < iref.ir_block->cb_min_pos)
1834
iref.ir_block->cb_min_pos = new_min_pos;
1835
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
1836
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1837
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1839
iref.ir_updated = TRUE;
1840
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
1842
/* Insert the new branch into the parent node, using the new middle key value: */
1843
if (!idx_insert_node(ot, ind, stack, last_item, key_value, new_branch)) {
1844
// Index may be inconsistent now...
1845
idx_free_branch(ot, ind, new_branch);
1853
idx_free_branch(ot, ind, new_branch);
1856
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
1862
#define IDX_MAX_INDEX_FLUSH_COUNT 10
1864
struct IdxTableItem {
1865
xtTableID ti_tab_id;
1866
u_int ti_dirty_blocks;
1869
inline u_int idx_dirty_blocks(XTTableHPtr tab)
1876
indp = tab->tab_dic.dic_keys;
1877
for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
1879
dirty_blocks += ind->mi_dirty_blocks;
1881
return dirty_blocks;
1884
static xtBool idx_out_of_memory_failure(XTOpenTablePtr ot)
1886
#ifdef XT_TRACK_INDEX_UPDATES
1887
/* If the index has been changed when we run out of memory, we
1888
* will corrupt the index!
1890
ASSERT_NS(ot->ot_ind_changed == 0);
1892
if (ot->ot_thread->t_exception.e_xt_err == XT_ERR_NO_INDEX_CACHE) {
1893
u_int block_total = xt_ind_get_blocks();
1895
/* Flush index and retry! */
1896
xt_clear_exception(ot->ot_thread);
1898
if (idx_dirty_blocks(ot->ot_table) >= block_total / 4) {
1899
if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
1901
if (!xt_wait_for_async_task_results(ot->ot_thread))
1905
XTDatabaseHPtr db = ot->ot_table->tab_db;
1906
IdxTableItem table_list[IDX_MAX_INDEX_FLUSH_COUNT];
1910
XTTableEntryPtr tab_ptr;
1912
u_int dirty_total = 0;
1914
xt_ht_lock(NULL, db->db_tables);
1915
xt_enum_tables_init(&edx);
1916
while ((tab_ptr = xt_enum_tables_next(NULL, db, &edx))) {
1917
if (tab_ptr->te_table) {
1918
if (tab_ptr->te_table->tab_ind_flush_task->tk_is_running()) {
1919
if (!(dirty_blocks = tab_ptr->te_table->tab_ind_flush_task->fit_dirty_blocks))
1920
dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
1923
dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
1924
dirty_total += dirty_blocks;
1926
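/* Keep table_list sorted by dirty block count (largest first), truncated
* to the IDX_MAX_INDEX_FLUSH_COUNT dirtiest tables: */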
for (i=0; i<item_count; i++) {
1927
if (table_list[i].ti_dirty_blocks < dirty_blocks)
1930
if (i < IDX_MAX_INDEX_FLUSH_COUNT) {
1933
if (item_count < IDX_MAX_INDEX_FLUSH_COUNT) {
1934
cnt = item_count - i;
1938
cnt = item_count - i - 1;
1939
memmove(&table_list[i], &table_list[i+1], sizeof(IdxTableItem) * cnt);
1940
table_list[i].ti_tab_id = tab_ptr->te_table->tab_id;
1941
table_list[i].ti_dirty_blocks = dirty_blocks;
1944
if (dirty_total >= block_total / 4)
1948
xt_ht_unlock(NULL, db->db_tables);
1949
if (dirty_total >= block_total / 4) {
1950
for (i=0; i<item_count; i++) {
1951
if (table_list[i].ti_tab_id == ot->ot_table->tab_id) {
1952
if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
1959
if ((tab = xt_use_table_by_id_ns(db, table_list[i].ti_tab_id))) {
1960
ok = xt_async_flush_indices(tab, FALSE, TRUE, ot->ot_thread);
1961
xt_heap_release_ns(tab);
1965
if (!xt_wait_for_async_task_results(ot->ot_thread))
1976
* Check all the duplicate variations in an index.
* If one of them is visible, then we have a duplicate key
* GOTCHA: This routine must use the write index buffer!
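*
* Roughly: we re-search for the key value (ignoring the record reference)
* and walk forward over all entries that still compare equal, asking
* xt_tab_maybe_committed() whether the referenced record is visible. A
* committed (or own) match raises XT_ERR_DUPLICATE_KEY, an uncommitted
* match is waited for, and deleted or rolled-back entries are skipped.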
1982
static xtBool idx_check_duplicates(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
1984
IdxBranchStackRec stack;
1985
xtIndexNodeID current;
1986
XTIndReferenceRec iref;
1987
XTIdxResultRec result;
1988
xtBool on_key = FALSE;
1995
iref.ir_updated = 2;
1998
idx_newstack(&stack);
2000
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2003
save_flags = key_value->sv_flags;
2004
key_value->sv_flags = 0;
2006
while (XT_NODE_ID(current)) {
2007
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref)) {
2008
key_value->sv_flags = save_flags;
2011
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
2012
if (result.sr_found)
2013
/* If we have found the key in a node: */
2015
if (!result.sr_item.i_node_ref_size)
2017
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2018
if (!idx_push(&stack, current, &result.sr_item)) {
2019
key_value->sv_flags = save_flags;
2022
current = result.sr_branch;
2025
key_value->sv_flags = save_flags;
2028
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2033
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
2034
IdxStackItemPtr node;
2036
/* We are at the end of a leaf node.
* Go up the stack to find the start position of the next key.
* If we find none, then we are at the end of the index.
2040
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2041
while ((node = idx_pop(&stack))) {
2042
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
2043
current = node->i_branch;
2044
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2046
xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);
2047
result.sr_item = node->i_pos;
2055
/* Quit the loop if the key is no longer matched! */
2056
if (myxt_compare_key(ind, 0, key_value->sv_length, key_value->sv_key, &iref.ir_branch->tb_data[result.sr_item.i_item_offset]) != 0) {
2057
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2061
if (ind->mi_lazy_delete) {
2062
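/* A row-id of (xtRowID) -1 marks the entry as lazy deleted
* ({LAZY-DEL-INDEX-ITEMS}); such entries are ignored here. */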
if (result.sr_row_id == (xtRowID) -1)
2066
switch (xt_tab_maybe_committed(ot, result.sr_rec_id, &xn_id, NULL, NULL)) {
2068
/* Record is not committed, wait for the transaction. */
2069
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2070
XT_INDEX_UNLOCK(ind, ot);
2071
xw.xw_xn_id = xn_id;
2072
if (!xt_xn_wait_for_xact(ot->ot_thread, &xw, NULL)) {
2073
XT_INDEX_WRITE_LOCK(ind, ot);
2076
XT_INDEX_WRITE_LOCK(ind, ot);
2079
/* Error while reading... */
2082
/* Record is committed or belongs to me, duplicate key: */
2083
XT_DEBUG_TRACE(("DUPLICATE KEY tx=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) result.sr_rec_id));
2084
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DUPLICATE_KEY);
2087
/* Record is deleted or rolled-back: */
2092
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
2094
if (result.sr_item.i_node_ref_size) {
2095
/* Go down to the bottom: */
2096
while (XT_NODE_ID(current)) {
2097
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2098
if (!idx_push(&stack, current, &result.sr_item))
2100
current = result.sr_branch;
2101
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2103
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
2104
if (!result.sr_item.i_node_ref_size)
2113
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2117
inline static void idx_still_on_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
{
	if (search_key && search_key->sk_on_key) {
		search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
			search_key->sk_key_value.sv_key, &branch->tb_data[item->i_item_offset]) == 0;
	}
}
2126
* Insert a value into the given index. Return FALSE if an error occurs.
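*
* In outline: the tree is searched top-down under a read lock on the
* index, x-locking leaf pages as they are reached. If the key fits into
* the target leaf it is inserted in place. Otherwise the leaf is split:
* the right half is written to a newly allocated branch, the left half is
* truncated, and the middle key is pushed into the parent via
* idx_insert_node(). Because splitting changes the structure of the index,
* the search is restarted under XT_INDEX_WRITE_LOCK when necessary.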
2128
xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_buf, xtWord1 *bef_buf, xtBool allow_dups)
2130
XTIdxKeyValueRec key_value;
2131
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
2132
IdxBranchStackRec stack;
2133
xtIndexNodeID current;
2134
XTIndReferenceRec iref;
2135
xtIndexNodeID new_branch;
2136
XTIdxBranchDPtr new_branch_ptr;
2138
XTIdxResultRec result;
2140
xtBool check_for_dups = ind->mi_flags & (HA_UNIQUE_CHECK | HA_NOSAME) && !allow_dups;
2141
xtBool lock_structure = FALSE;
2142
xtBool updated = FALSE;
2143
#ifdef IND_OPT_DATA_WRITTEN
2149
iref.ir_updated = 2;
2151
#ifdef CHECK_AND_PRINT
2152
//idx_check_index(ot, ind, TRUE);
2156
#ifdef XT_TRACK_INDEX_UPDATES
2157
ot->ot_ind_changed = 0;
2159
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2160
key_value.sv_rec_id = rec_id;
2161
key_value.sv_row_id = row_id; /* Should always be zero on insert (will be updated by the sweeper later).
2162
* Non-zero only during recovery, assuming that sweeper will process such records right after recovery.
2164
key_value.sv_key = key_buf;
2165
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, &check_for_dups);
2167
if (bef_buf && check_for_dups) {
2168
/* If we have a before image, and we are required to check for duplicates,
2169
* then compare the before image key with the after image key.
2171
xtWord1 bef_key_buf[XT_INDEX_MAX_KEY_SIZE];
2173
xtBool has_no_null = TRUE;
2175
len = myxt_create_key_from_row(ind, bef_key_buf, bef_buf, &has_no_null);
2177
/* If the before key has no null values, then compare with the after key value.
2178
* We only have to check for duplicates if the key has changed!
2180
check_for_dups = myxt_compare_key(ind, 0, len, bef_key_buf, key_buf) != 0;
2184
/* The index appears to have no root: */
2185
if (!XT_NODE_ID(ind->mi_root))
2186
lock_structure = TRUE;
2189
idx_newstack(&stack);
2191
/* A write lock is only required if we are going to change the
* structure of the index!
2195
XT_INDEX_WRITE_LOCK(ind, ot);
2197
XT_INDEX_READ_LOCK(ind, ot);
2200
/* Create a root node if required: */
2201
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
2202
/* Index is empty, create a new one: */
2203
ASSERT_NS(lock_structure);
2204
if (!xt_ind_reserve(ot, 1, NULL))
2206
if (!idx_new_branch(ot, ind, &new_branch))
2208
size = idx_write_branch_item(ind, ot->ot_ind_wbuf.tb_data, &key_value);
2209
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(size));
2210
IDX_TRACE("%d-> %x\n", (int) new_branch, (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2211
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
2213
ind->mi_root = new_branch;
2217
/* Search down the tree for the insertion point. */
2218
#ifdef IND_SKEW_SPLIT_ON_APPEND
2219
result.sr_last_item = TRUE;
2221
while (XT_NODE_ID(current)) {
2222
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, &iref))
2224
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
2225
if (result.sr_duplicate) {
2226
if (check_for_dups) {
2227
/* Duplicates are not allowed, at least one has been
2231
/* Leaf nodes (i_node_ref_size == 0) are write locked,
2232
* non-leaf nodes are read locked.
2234
xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);
2236
if (!idx_check_duplicates(ot, ind, &key_value))
2238
/* We have checked all the "duplicate" variations. None of them are
2239
* relevant. So this will cause a correct insert.
2241
check_for_dups = FALSE;
2242
idx_newstack(&stack);
2246
if (result.sr_found) {
2247
/* Node found, can happen during recovery of indexes!
2248
* We have found an exact match of both key and record.
2250
XTPageUnlockType utype;
2251
xtBool overwrite = FALSE;
2253
/* {LAZY-DEL-INDEX-ITEMS}
2254
* If the item has been lazy deleted, then just overwrite!
2256
if (result.sr_row_id == (xtRowID) -1) {
2259
/* This is safe because we have an xlock on the leaf. */
2260
if ((del_count = iref.ir_block->cp_del_count))
2261
iref.ir_block->cp_del_count = del_count-1;
2265
if (!result.sr_row_id && row_id) {
2266
/* {INDEX-RECOV_ROWID} Set the row-id
2267
* during recovery, even if the index entry
2269
* It will be removed later by the sweeper.
2275
if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
2277
utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE;
2280
utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE;
2281
xt_ind_release(ot, ind, utype, &iref);
2284
/* Stop when we get to a leaf: */
2285
if (!result.sr_item.i_node_ref_size)
2287
xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);
2288
if (!idx_push(&stack, current, NULL))
2290
current = result.sr_branch;
2292
ASSERT_NS(XT_NODE_ID(current));
2294
/* Must be a leaf!: */
2295
ASSERT_NS(!result.sr_item.i_node_ref_size);
2298
if (ind->mi_lazy_delete && iref.ir_block->cp_del_count) {
2299
/* There are a number of possibilities:
2300
* - We could just replace a lazy deleted slot.
2301
* - We could compact and insert.
2302
* - We could just insert
2305
if (result.sr_item.i_item_offset > 0) {
2306
/* Check if it can go into the previous node: */
2307
XTIdxResultRec t_res;
2309
t_res.sr_item = result.sr_item;
2310
xt_prev_branch_item_fix(ot->ot_table, ind, iref.ir_branch, &t_res);
2311
if (t_res.sr_row_id != (xtRowID) -1)
2314
/* Yup, it can, but first check to see if it would be
2315
* better to put it in the current node.
2316
* This is the case if the previous node key is not the
2317
* same as the key we are adding...
2319
if (result.sr_item.i_item_offset < result.sr_item.i_total_size &&
2320
result.sr_row_id == (xtRowID) -1) {
2321
if (!idx_cmp_item_key_fix(&iref, &t_res.sr_item, &key_value))
2325
idx_set_item_key_fix(&iref, &t_res.sr_item, &key_value);
2326
iref.ir_block->cp_del_count--;
2327
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2332
if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
2333
if (result.sr_row_id == (xtRowID) -1) {
2334
idx_set_item_key_fix(&iref, &result.sr_item, &key_value);
2335
iref.ir_block->cp_del_count--;
2336
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2341
/* Check if we must compact...
2342
* It makes no sense to split as long as there are lazy deleted items
2343
* in the page. So, delete them if a split would otherwise be required!
2345
ASSERT_NS(key_value.sv_length + XT_RECORD_REF_SIZE == result.sr_item.i_item_size);
2346
if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE > XT_INDEX_PAGE_DATA_SIZE) {
2347
if (!idx_compact_leaf(ot, ind, &iref, &result.sr_item))
2352
/* Fall through to the insert code... */
2353
/* NOTE: if there were no lazy deleted items in the leaf, then
2354
* idx_compact_leaf is a NOP. This is the only case in which it may not
2355
* fall through and do the insert below.
2357
* Normally, if the cp_del_count is correct then the insert
2358
* will work below, and the assertion here will not fail.
2360
* In this case, the xt_ind_release() will correctly indicate an update.
2362
ASSERT_NS(result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE);
2365
if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE) {
2366
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
2367
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
2368
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
2372
if (iref.ir_block->cb_handle_count) {
2373
if (!xt_ind_copy_on_write(&iref))
2377
idx_insert_leaf_item(ind, iref.ir_branch, &key_value, &result);
2378
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2379
ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
2380
#ifdef IND_OPT_DATA_WRITTEN
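/* Track which part of the block was modified (cb_min_pos..cb_max_pos),
* presumably so that only the changed range of the page has to be written
* out when the block is flushed: */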
2381
iref.ir_block->cb_header = TRUE;
2382
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
2383
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
2384
iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
2385
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
2386
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
2388
iref.ir_updated = TRUE;
2389
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2393
/* Key does not fit. Must split the node.
2394
* Make sure we have a structural lock:
2396
if (!lock_structure) {
2397
xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);
2398
XT_INDEX_UNLOCK(ind, ot);
2399
lock_structure = TRUE;
2400
goto lock_and_retry;
2403
memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
2404
idx_insert_leaf_item(ind, &ot->ot_ind_wbuf, &key_value, &result);
2405
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2406
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE && result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2);
2407
#ifdef IND_OPT_DATA_WRITTEN
2408
new_min_pos = result.sr_item.i_item_offset;
2411
/* This is the number of potential writes. In other words, the total number
2412
* of blocks that may be accessed.
2414
* Note that this assumes that if a block is read and written soon after, the block
* will not be freed in-between (a safe assumption?)
2417
if (!xt_ind_reserve(ot, stack.s_top * 2 + 3, iref.ir_branch))
2420
/* Key does not fit, must split... */
2421
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result))
2424
if (!idx_new_branch(ot, ind, &new_branch))
2427
if (XT_NODE_ID(current) == XT_NODE_ID(new_branch)) {
2428
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
2432
/* Copy and write the rest of the data to the new node: */
2433
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
2434
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
2435
memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
2437
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_LEAF_SIZE(new_size));
2438
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
2439
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
2442
/* Modify the first node: */
2443
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(result.sr_item.i_item_offset));
2444
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2446
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
2447
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
2448
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
2452
if (iref.ir_block->cb_handle_count) {
2453
if (!xt_ind_copy_on_write(&iref))
2456
#ifdef IND_OPT_DATA_WRITTEN
2457
if (result.sr_item.i_item_offset < new_min_pos)
2458
new_min_pos = result.sr_item.i_item_offset;
2460
memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
2461
#ifdef IND_OPT_DATA_WRITTEN
2462
iref.ir_block->cb_header = TRUE;
2463
if (new_min_pos < iref.ir_block->cb_min_pos)
2464
iref.ir_block->cb_min_pos = new_min_pos;
2465
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
2466
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
2467
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
2469
iref.ir_updated = TRUE;
2470
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2472
/* Insert the new branch into the parent node, using the new middle key value: */
2473
if (!idx_insert_node(ot, ind, &stack, result.sr_last_item, &key_value, new_branch)) {
2474
// Index may be inconsistent now...
2475
idx_free_branch(ot, ind, new_branch);
2479
#ifdef XT_TRACK_INDEX_UPDATES
2480
ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
2484
XT_INDEX_UNLOCK(ind, ot);
2487
//printf("INSERT OK\n");
2488
//idx_check_index(ot, ind, TRUE);
2490
xt_ind_unreserve(ot);
2494
idx_free_branch(ot, ind, new_branch);
2497
xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);
2500
XT_INDEX_UNLOCK(ind, ot);
2501
if (idx_out_of_memory_failure(ot))
2502
goto retry_after_oom;
2505
//printf("INSERT FAILED\n");
2506
//idx_check_index(ot, ind, TRUE);
2508
xt_ind_unreserve(ot);
2513
/* Remove the given item in the node.
2514
* This is done by going down the tree to find a replacement
2515
* for the deleted item!
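*
* In outline: follow the branch to the right of the item being removed and
* descend its left-most path down to a leaf. The first key in that leaf is
* the successor of the deleted key; it is removed from the leaf and used to
* replace the key in the node. Branches that become empty on the way are
* freed.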
2517
static xtBool idx_remove_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
2519
IdxStackItemPtr delete_node;
2520
XTIdxResultRec result;
2521
xtIndexNodeID current;
2522
xtBool lazy_delete_cleanup_required = FALSE;
2523
IdxStackItemPtr current_top;
2525
delete_node = idx_top(stack);
2526
current = delete_node->i_branch;
2527
result.sr_item = delete_node->i_pos;
2529
/* Follow the branch after this item: */
2530
idx_next_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
2531
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2533
/* Go down the left-hand side until we reach a leaf: */
2534
while (XT_NODE_ID(current)) {
2535
current = result.sr_branch;
2536
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
2538
idx_first_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
2539
if (!result.sr_item.i_node_ref_size)
2541
xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
2542
if (!idx_push(stack, current, &result.sr_item))
2546
ASSERT_NS(XT_NODE_ID(current));
2547
ASSERT_NS(!result.sr_item.i_node_ref_size);
2549
if (!xt_ind_reserve(ot, stack->s_top + 2, iref->ir_branch)) {
2550
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
2554
/* This code removes lazy deleted items from the leaf,
2555
* before we promote an item to a node.
2556
* This is not essential, but prevents lazy deleted
2557
* items from being propagated up the tree.
2559
if (ind->mi_lazy_delete) {
2560
if (iref->ir_block->cp_del_count) {
2561
if (!idx_compact_leaf(ot, ind, iref, &result.sr_item))
2566
/* Crawl back up the stack trace, looking for a key
2567
* that can be used to replace the deleted key.
2569
* Any empty nodes on the way up can be removed!
2571
if (result.sr_item.i_total_size > 0) {
2572
/* There is a key in the leaf, extract it, and put it in the node: */
2573
memcpy(key_value->sv_key, &iref->ir_branch->tb_data[result.sr_item.i_item_offset], result.sr_item.i_item_size);
2574
/* This call also frees the iref.ir_branch page! */
2575
if (!idx_remove_branch_item_right(ot, ind, current, iref, &result.sr_item))
2577
if (!idx_replace_node_key(ot, ind, delete_node, stack, result.sr_item.i_item_size, key_value->sv_key))
2582
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, iref);
2585
/* The current node/leaf is empty, remove it: */
2586
idx_free_branch(ot, ind, current);
2588
current_top = idx_pop(stack);
2589
current = current_top->i_branch;
2590
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
2593
if (current_top == delete_node) {
2594
/* All children have been removed. Delete the key and done: */
2595
if (!idx_remove_branch_item_right(ot, ind, current, iref, &current_top->i_pos))
2600
if (current_top->i_pos.i_total_size > current_top->i_pos.i_node_ref_size) {
2602
memcpy(key_value->sv_key, &iref->ir_branch->tb_data[current_top->i_pos.i_item_offset], current_top->i_pos.i_item_size);
2603
/* This function also frees the cache page: */
2604
if (!idx_remove_branch_item_left(ot, ind, current, iref, &current_top->i_pos, &lazy_delete_cleanup_required))
2606
if (!idx_replace_node_key(ot, ind, delete_node, stack, current_top->i_pos.i_item_size, key_value->sv_key))
2609
if (lazy_delete_cleanup_required) {
2610
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
2612
if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, iref, key_value))
2617
xt_ind_release(ot, ind, current_top->i_pos.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
2621
#ifdef XT_TRACK_INDEX_UPDATES
2622
ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
2628
* This function assumes we have a lock on the structure of the index.
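*
* Roughly: scan the node for items whose row-id is (xtRowID) -1 and remove
* each one via idx_remove_item_in_node(), re-fetching and re-reading the
* node after every removal, until no lazy deleted items remain.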
2630
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
2632
IdxBranchStackRec stack;
2633
XTIdxResultRec result;
2635
/* Now remove all lazy deleted items in this node.... */
2636
idx_first_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);
2639
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
2640
if (result.sr_row_id == (xtRowID) -1)
2642
idx_next_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);
2648
idx_newstack(&stack);
2649
if (!idx_push(&stack, current, &result.sr_item)) {
2650
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2654
if (!idx_remove_item_in_node(ot, ind, &stack, iref, key_value))
2657
/* Go back up to the node we are trying to
2660
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
2662
/* Load the data again: */
2663
idx_reload_item_fix(ind, iref->ir_branch, &result);
2666
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2670
static xtBool idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
2672
IdxBranchStackRec stack;
2673
xtIndexNodeID current;
2674
XTIndReferenceRec iref;
2675
XTIdxResultRec result;
2676
xtBool lock_structure = FALSE;
2680
iref.ir_updated = 2;
2682
/* The index appears to have no root: */
2683
if (!XT_NODE_ID(ind->mi_root))
2684
lock_structure = TRUE;
2687
idx_newstack(&stack);
2690
XT_INDEX_WRITE_LOCK(ind, ot);
2692
XT_INDEX_READ_LOCK(ind, ot);
2694
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2697
while (XT_NODE_ID(current)) {
2698
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_DEL_LEAF, &iref))
2700
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
2701
if (!result.sr_item.i_node_ref_size) {
2703
if (result.sr_found) {
2704
if (ind->mi_lazy_delete) {
2705
/* If we have a W lock, then fetch decided that we
2706
* need to compact the page.
2707
* The decision is made by xt_idx_lazy_delete_on_leaf()
2709
if (!iref.ir_xlock) {
2710
if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
2714
if (!iref.ir_block->cp_del_count) {
2715
if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
2719
if (!idx_lazy_remove_leaf_item_right(ot, ind, &iref, &result.sr_item))
2725
if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
2730
xt_ind_release(ot, ind, iref.ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, &iref);
2733
if (!idx_push(&stack, current, &result.sr_item)) {
2734
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2737
if (result.sr_found)
2738
/* If we have found the key in a node: */
2740
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2741
current = result.sr_branch;
2744
/* Must be a non-leaf!: */
2745
ASSERT_NS(result.sr_item.i_node_ref_size);
2747
if (ind->mi_lazy_delete) {
2748
if (!idx_lazy_delete_on_node(ind, iref.ir_block, &result.sr_item)) {
2749
/* We need to remove some items from this node: */
2751
if (!lock_structure) {
2752
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2753
XT_INDEX_UNLOCK(ind, ot);
2754
lock_structure = TRUE;
2755
goto lock_and_retry;
2758
if (!idx_set_item_deleted(ot, ind, &iref, &result.sr_item))
2760
if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, &iref, key_value))
2765
if (!ot->ot_table->tab_dic.dic_no_lazy_delete) {
2766
/* {LAZY-DEL-INDEX-ITEMS}
2767
* We just set item to deleted, this is a significant time
2769
* But this item can only be cleaned up when all
2770
* items on the node below are deleted.
2772
if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
2778
/* We will have to remove the key from a non-leaf node,
2779
* which means we are changing the structure of the index.
2780
* Make sure we have a structural lock:
2782
if (!lock_structure) {
2783
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2784
XT_INDEX_UNLOCK(ind, ot);
2785
lock_structure = TRUE;
2786
goto lock_and_retry;
2789
/* This is the item we will have to replace: */
2790
if (!idx_remove_item_in_node(ot, ind, &stack, &iref, key_value))
2794
XT_INDEX_UNLOCK(ind, ot);
2797
//printf("DELETE OK\n");
2798
//idx_check_index(ot, ind, TRUE);
2800
xt_ind_unreserve(ot);
2804
XT_INDEX_UNLOCK(ind, ot);
2805
xt_ind_unreserve(ot);
2809
xtPublic xtBool xt_idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtWord1 *rec_buf)
2811
XTIdxKeyValueRec key_value;
2812
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];
2815
#ifdef XT_TRACK_INDEX_UPDATES
2816
ot->ot_ind_changed = 0;
2819
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2820
key_value.sv_rec_id = rec_id;
2821
key_value.sv_row_id = 0;
2822
key_value.sv_key = key_buf;
2823
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);
2825
if (!idx_delete(ot, ind, &key_value)) {
2826
if (idx_out_of_memory_failure(ot))
2827
goto retry_after_oom;
2833
xtPublic xtBool xt_idx_update_row_id(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_buf)
2835
xtIndexNodeID current;
2836
XTIndReferenceRec iref;
2837
XTIdxResultRec result;
2838
XTIdxKeyValueRec key_value;
2839
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];
2843
iref.ir_updated = 2;
2845
#ifdef CHECK_AND_PRINT
2846
idx_check_index(ot, ind, TRUE);
2849
#ifdef XT_TRACK_INDEX_UPDATES
2850
ot->ot_ind_changed = 0;
2852
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2853
key_value.sv_rec_id = rec_id;
2854
key_value.sv_row_id = 0;
2855
key_value.sv_key = key_buf;
2856
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);
2858
/* NOTE: Only a read lock is required for this!!
* 09.05.2008 - This has changed because the dirty list now
* hangs on the index. And the dirty list may be updated
* by any change of the index.
* However, the advantage is that I should be able to read
* lock in the first phase of the flush.
*
* 18.02.2009 - This has changed again.
* I am now using a read lock, because this update does not
* require a structural change. In fact, it does not even
* need a WRITE LOCK on the page affected, because there
* is only ONE thread that can do this (the sweeper).
*
* This has the advantage that the sweeper (which uses this
* function) causes fewer conflicts.
*
* However, it does mean that the dirty list must be otherwise
* protected (which it now is, by a spin lock - mi_dirty_lock).
*
* It also has the disadvantage that I am going to have to
* take an xlock in the first phase of the flush.
2881
XT_INDEX_READ_LOCK(ind, ot);
2883
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2886
while (XT_NODE_ID(current)) {
2887
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2889
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
2890
if (result.sr_found || !result.sr_item.i_node_ref_size)
2892
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2893
current = result.sr_branch;
2896
if (result.sr_found) {
2897
/* TODO: Check that concurrent reads can handle this!
2898
* assuming the write is not atomic.
2900
if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
2902
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
2905
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2908
XT_INDEX_UNLOCK(ind, ot);
2911
//idx_check_index(ot, ind, TRUE);
2912
//idx_check_on_key(ot);
2917
XT_INDEX_UNLOCK(ind, ot);
2918
if (idx_out_of_memory_failure(ot))
2919
goto retry_after_oom;
2923
xtPublic void xt_idx_prep_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, int flags, xtWord1 *in_key_buf, size_t in_key_length)
{
	search_key->sk_key_value.sv_flags = flags;
	search_key->sk_key_value.sv_rec_id = 0;
	search_key->sk_key_value.sv_row_id = 0;
	search_key->sk_key_value.sv_key = search_key->sk_key_buf;
	search_key->sk_key_value.sv_length = myxt_create_key_from_key(ind, search_key->sk_key_buf, in_key_buf, in_key_length);
	search_key->sk_on_key = FALSE;
}
2933
xtPublic xtBool xt_idx_research(XTOpenTablePtr ot, XTIndexPtr ind)
2935
XTIdxSearchKeyRec search_key;
2937
xt_ind_lock_handle(ot->ot_ind_rhandle);
2938
search_key.sk_key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2939
xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE],
2940
&search_key.sk_key_value.sv_rec_id, &search_key.sk_key_value.sv_row_id);
2941
search_key.sk_key_value.sv_key = search_key.sk_key_buf;
2942
search_key.sk_key_value.sv_length = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
2943
search_key.sk_on_key = FALSE;
2944
memcpy(search_key.sk_key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset], search_key.sk_key_value.sv_length);
2945
xt_ind_unlock_handle(ot->ot_ind_rhandle);
2946
return xt_idx_search(ot, ind, &search_key);
2950
* Search for a given key and position the current pointer on the first
2951
* key in the list of duplicates. If the key is not found the current
2952
* pointer is placed at the first position after the key.
2954
xtPublic xtBool xt_idx_search(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
2956
IdxBranchStackRec stack;
2957
xtIndexNodeID current;
2958
XTIndReferenceRec iref;
2959
XTIdxResultRec result;
2963
iref.ir_updated = 2;
2965
if (ot->ot_ind_rhandle) {
2966
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
2967
ot->ot_ind_rhandle = NULL;
2970
//idx_check_index(ot, ind, TRUE);
2973
/* Calling from recovery, this is not the case.
2974
* But the index read does not require a transaction!
2975
* Only insert requires this to check for duplicates.
2976
if (!ot->ot_thread->st_xact_data) {
2977
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
2983
#ifdef XT_TRACK_INDEX_UPDATES
2984
ot->ot_ind_changed = 0;
2986
idx_newstack(&stack);
2988
ot->ot_curr_rec_id = 0;
2989
ot->ot_curr_row_id = 0;
2991
XT_INDEX_READ_LOCK(ind, ot);
2993
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2996
while (XT_NODE_ID(current)) {
2997
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2999
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
3000
if (result.sr_found)
3001
/* If we have found the key in a node: */
3002
search_key->sk_on_key = TRUE;
3003
if (!result.sr_item.i_node_ref_size)
3005
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3006
if (!idx_push(&stack, current, &result.sr_item))
3008
current = result.sr_branch;
3011
if (ind->mi_lazy_delete) {
3012
ignore_lazy_deleted_items:
3013
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3014
if (result.sr_row_id != (xtRowID) -1) {
3015
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
3018
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3022
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
3023
IdxStackItemPtr node;
3025
/* We are at the end of a leaf node.
* Go up the stack to find the start position of the next key.
* If we find none, then we are at the end of the index.
3029
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3030
while ((node = idx_pop(&stack))) {
3031
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
3032
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3034
xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);
3036
if (ind->mi_lazy_delete) {
3037
result.sr_item = node->i_pos;
3038
if (result.sr_row_id == (xtRowID) -1) {
3039
/* If this node pointer is lazy deleted, then
3040
* go down the next branch...
3042
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3044
/* Go down to the bottom: */
3045
current = node->i_branch;
3046
while (XT_NODE_ID(current)) {
3047
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3048
if (!idx_push(&stack, current, &result.sr_item))
3050
current = result.sr_branch;
3051
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3053
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3054
if (!result.sr_item.i_node_ref_size)
3058
goto ignore_lazy_deleted_items;
3060
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
3063
ot->ot_curr_rec_id = result.sr_rec_id;
3064
ot->ot_curr_row_id = result.sr_row_id;
3065
ot->ot_ind_state = node->i_pos;
3067
/* Convert the pointer to a handle which can be used in later operations: */
3068
ASSERT_NS(!ot->ot_ind_rhandle);
3069
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3071
/* Keep the node for next operations: */
3073
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3074
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3075
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3082
ot->ot_curr_rec_id = result.sr_rec_id;
3083
ot->ot_curr_row_id = result.sr_row_id;
3084
ot->ot_ind_state = result.sr_item;
3086
/* Convert the pointer to a handle which can be used in later operations: */
3087
ASSERT_NS(!ot->ot_ind_rhandle);
3088
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3090
/* Keep the node for next operations: */
3092
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3093
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3094
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3099
XT_INDEX_UNLOCK(ind, ot);
3102
//idx_check_index(ot, ind, TRUE);
3103
//idx_check_on_key(ot);
3105
ASSERT_NS(iref.ir_xlock == 2);
3106
ASSERT_NS(iref.ir_updated == 2);
3107
if (ind->mi_key_corrupted) {
3108
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3114
XT_INDEX_UNLOCK(ind, ot);
3115
if (idx_out_of_memory_failure(ot))
3116
goto retry_after_oom;
3117
ASSERT_NS(iref.ir_xlock == 2);
3118
ASSERT_NS(iref.ir_updated == 2);
3122
xtPublic xtBool xt_idx_search_prev(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3124
IdxBranchStackRec stack;
3125
xtIndexNodeID current;
3126
XTIndReferenceRec iref;
3127
XTIdxResultRec result;
3131
iref.ir_updated = 2;
3133
if (ot->ot_ind_rhandle) {
3134
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
3135
ot->ot_ind_rhandle = NULL;
3138
//idx_check_index(ot, ind, TRUE);
3141
/* see the comment above in xt_idx_search */
3143
if (!ot->ot_thread->st_xact_data) {
3144
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3150
#ifdef XT_TRACK_INDEX_UPDATES
3151
ot->ot_ind_changed = 0;
3153
idx_newstack(&stack);
3155
ot->ot_curr_rec_id = 0;
3156
ot->ot_curr_row_id = 0;
3158
XT_INDEX_READ_LOCK(ind, ot);
3160
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
3163
while (XT_NODE_ID(current)) {
3164
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3166
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
3167
if (result.sr_found)
3168
/* If we have found the key in a node: */
3169
search_key->sk_on_key = TRUE;
3170
if (!result.sr_item.i_node_ref_size)
3172
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3173
if (!idx_push(&stack, current, &result.sr_item))
3175
current = result.sr_branch;
3178
if (result.sr_item.i_item_offset == 0) {
3179
IdxStackItemPtr node;
3182
/* We are at the start of a leaf node.
* Go up the stack to find the start position of the next key.
* If we find none, then we are at the end of the index.
3186
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3187
while ((node = idx_pop(&stack))) {
3188
if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
3189
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3191
result.sr_item = node->i_pos;
3192
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3194
if (ind->mi_lazy_delete) {
3195
if (result.sr_row_id == (xtRowID) -1) {
3196
/* Go down to the bottom, in order to scan the leaf backwards: */
3197
current = node->i_branch;
3198
while (XT_NODE_ID(current)) {
3199
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3200
if (!idx_push(&stack, current, &result.sr_item))
3202
current = result.sr_branch;
3203
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3205
ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
3206
if (!result.sr_item.i_node_ref_size)
3210
/* If the leaf is empty we have to go up the stack again... */
3211
if (result.sr_item.i_total_size == 0)
3212
goto search_up_stack;
3214
goto scan_back_in_leaf;
3224
/* We must just step once to the left in this leaf node... */
3225
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3227
if (ind->mi_lazy_delete) {
3229
while (result.sr_row_id == (xtRowID) -1) {
3230
if (result.sr_item.i_item_offset == 0)
3231
goto search_up_stack;
3232
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3234
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
3238
ot->ot_curr_rec_id = result.sr_rec_id;
3239
ot->ot_curr_row_id = result.sr_row_id;
3240
ot->ot_ind_state = result.sr_item;
3242
/* Convert to handle for later operations: */
3243
ASSERT_NS(!ot->ot_ind_rhandle);
3244
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3246
/* Keep a copy of the node for previous operations... */
3250
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3251
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3252
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3256
XT_INDEX_UNLOCK(ind, ot);
3259
//idx_check_index(ot, ind, TRUE);
3260
//idx_check_on_key(ot);
3262
if (ind->mi_key_corrupted) {
3263
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3269
XT_INDEX_UNLOCK(ind, ot);
3270
if (idx_out_of_memory_failure(ot))
3271
goto retry_after_oom;
3276
* Copy the current index value to the record.
3278
xtPublic xtBool xt_idx_read(XTOpenTablePtr ot, XTIndexPtr ind, xtWord1 *rec_buf)
3283
//idx_check_on_key(ot);
3285
xt_ind_lock_handle(ot->ot_ind_rhandle);
3286
bitem = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;
3287
myxt_create_row_from_key(ot, ind, bitem, ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE, rec_buf);
3288
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3292
xtPublic xtBool xt_idx_next(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3294
XTIdxKeyValueRec key_value;
3295
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3296
XTIdxResultRec result;
3297
IdxBranchStackRec stack;
3298
xtIndexNodeID current;
3299
XTIndReferenceRec iref;
3303
iref.ir_updated = 2;
3305
ASSERT_NS(ot->ot_ind_rhandle);
3306
xt_ind_lock_handle(ot->ot_ind_rhandle);
3307
result.sr_item = ot->ot_ind_state;
3308
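/* Fast path: if the cached read handle still points into a leaf block,
* try to step to the next item inside that block without re-searching or
* re-locking the index; otherwise fall back to a full search below: */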
if (!result.sr_item.i_node_ref_size &&
3309
result.sr_item.i_item_offset < result.sr_item.i_total_size &&
3310
ot->ot_ind_rhandle->ih_cache_reference) {
3311
XTIdxItemRec prev_item;
3313
key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
3314
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3316
prev_item = result.sr_item;
3317
idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3319
if (ind->mi_lazy_delete) {
3320
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3321
if (result.sr_row_id != (xtRowID) -1)
3323
prev_item = result.sr_item;
3324
idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3328
if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3330
idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);
3331
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3332
goto checked_on_key;
3335
result.sr_item = prev_item;
3338
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
3339
xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &key_value.sv_rec_id, &key_value.sv_row_id);
3340
key_value.sv_key = key_buf;
3341
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3342
memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
3343
xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
3344
ot->ot_ind_rhandle = NULL;
3347
#ifdef XT_TRACK_INDEX_UPDATES
3348
ot->ot_ind_changed = 0;
3350
idx_newstack(&stack);
3352
XT_INDEX_READ_LOCK(ind, ot);
3354
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
3355
XT_INDEX_UNLOCK(ind, ot);
3356
if (ind->mi_key_corrupted) {
3357
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3363
while (XT_NODE_ID(current)) {
3364
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3366
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
3367
if (result.sr_item.i_node_ref_size) {
3368
if (result.sr_found) {
3369
/* If we have found the key in a node: */
3370
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3372
/* Go down to the bottom: */
3373
while (XT_NODE_ID(current)) {
3374
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3375
if (!idx_push(&stack, current, &result.sr_item))
3377
current = result.sr_branch;
3378
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3380
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3381
if (!result.sr_item.i_node_ref_size)
3385
/* If the leaf is not empty, then we are done... */
3390
/* We have reached the leaf. */
3391
if (result.sr_found)
3392
/* If we have found the key in a leaf: */
3393
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3394
/* If we did not find the key (although we should have), our
* position is automatically the next one.
3399
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3400
if (!idx_push(&stack, current, &result.sr_item))
3402
current = result.sr_branch;
3405
if (ind->mi_lazy_delete) {
3406
ignore_lazy_deleted_items:
3407
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3408
if (result.sr_row_id != (xtRowID) -1)
3410
idx_next_branch_item(NULL, ind, iref.ir_branch, &result);
3414
/* Check the current position in a leaf: */
3415
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
3417
IdxStackItemPtr node;
3419
/* We are at the end of a leaf node.
* Go up the stack to find the start position of the next key.
* If we find none, then we are at the end of the index.
3423
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3424
while ((node = idx_pop(&stack))) {
3425
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
3426
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3428
result.sr_item = node->i_pos;
3429
xt_get_res_record_ref(&iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &result);
3431
if (ind->mi_lazy_delete) {
3432
if (result.sr_row_id == (xtRowID) -1) {
3433
/* If this node pointer is lazy deleted, then
3434
* go down the next branch...
3436
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3438
/* Go down to the bottom: */
3439
current = node->i_branch;
3440
while (XT_NODE_ID(current)) {
3441
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3442
if (!idx_push(&stack, current, &result.sr_item))
3444
current = result.sr_branch;
3445
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3447
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3448
if (!result.sr_item.i_node_ref_size)
3452
/* And scan the leaf... */
3453
goto ignore_lazy_deleted_items;
3457
goto unlock_check_on_key;
3463
search_key->sk_on_key = FALSE;
3464
ot->ot_curr_rec_id = 0;
3465
ot->ot_curr_row_id = 0;
3466
XT_INDEX_UNLOCK(ind, ot);
3467
if (ind->mi_key_corrupted) {
3468
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3474
unlock_check_on_key:
3476
ASSERT_NS(!ot->ot_ind_rhandle);
3477
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3482
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3483
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3484
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3487
XT_INDEX_UNLOCK(ind, ot);
3490
if (search_key && search_key->sk_on_key) {
3491
/* GOTCHA: As a short-cut I was using a length compare
* and a memcmp() here to check whether we are still on
* the original search key.
* This does not work because it does not take into account
* trailing spaces (which are ignored in comparison).
* So lengths can be different, but values still equal.
3498
* NOTE: We have to use the original search flags for
3501
xt_ind_lock_handle(ot->ot_ind_rhandle);
3502
search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
3503
search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
3504
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3508
ot->ot_curr_rec_id = result.sr_rec_id;
3509
ot->ot_curr_row_id = result.sr_row_id;
3510
ot->ot_ind_state = result.sr_item;
3512
if (ind->mi_key_corrupted) {
3513
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3519
XT_INDEX_UNLOCK(ind, ot);
3520
if (idx_out_of_memory_failure(ot))
3521
goto retry_after_oom;
3525
xtPublic xtBool xt_idx_prev(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3527
XTIdxKeyValueRec key_value;
3528
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3529
XTIdxResultRec result;
3530
IdxBranchStackRec stack;
3531
xtIndexNodeID current;
3532
XTIndReferenceRec iref;
3533
IdxStackItemPtr node;
3537
iref.ir_updated = 2;
3539
ASSERT_NS(ot->ot_ind_rhandle);
3540
xt_ind_lock_handle(ot->ot_ind_rhandle);
3541
result.sr_item = ot->ot_ind_state;
3542
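/* Fast path: if we are in a leaf and not already at the first item, step
* one item to the left inside the cached read handle; otherwise fall back
* to a full search below: */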
if (!result.sr_item.i_node_ref_size && result.sr_item.i_item_offset > 0) {
3543
key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
3544
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3546
ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3548
if (ind->mi_lazy_delete) {
3549
while (result.sr_row_id == (xtRowID) -1) {
3550
if (result.sr_item.i_item_offset == 0)
3552
ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3556
idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);
3558
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3559
goto checked_on_key;
3563
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
3564
key_value.sv_rec_id = ot->ot_curr_rec_id;
3565
key_value.sv_row_id = 0;
3566
key_value.sv_key = key_buf;
3567
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3568
memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
3569
xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
3570
ot->ot_ind_rhandle = NULL;
3573
#ifdef XT_TRACK_INDEX_UPDATES
3574
ot->ot_ind_changed = 0;
3576
idx_newstack(&stack);
3578
XT_INDEX_READ_LOCK(ind, ot);
3580
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
3581
XT_INDEX_UNLOCK(ind, ot);
3582
if (ind->mi_key_corrupted) {
3583
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3589
while (XT_NODE_ID(current)) {
3590
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3592
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
3593
if (result.sr_item.i_node_ref_size) {
3594
if (result.sr_found) {
3595
/* If we have found the key in a node: */
3598
/* Go down to the bottom: */
3599
while (XT_NODE_ID(current)) {
3600
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3601
if (!idx_push(&stack, current, &result.sr_item))
3603
current = result.sr_branch;
3604
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3606
ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
3607
if (!result.sr_item.i_node_ref_size)
3611
/* If the leaf is empty we have to go up the stack again... */
3612
if (result.sr_item.i_total_size == 0)
3615
if (ind->mi_lazy_delete) {
3616
while (result.sr_row_id == (xtRowID) -1) {
3617
if (result.sr_item.i_item_offset == 0)
3618
goto search_up_stack;
3619
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3623
goto unlock_check_on_key;
3627
/* We have reached the leaf.
3628
* Whether we found the key or not, we have
3629
* to move one to the left.
3631
if (result.sr_item.i_item_offset == 0)
3633
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3635
if (ind->mi_lazy_delete) {
3636
while (result.sr_row_id == (xtRowID) -1) {
3637
if (result.sr_item.i_item_offset == 0)
3638
goto search_up_stack;
3639
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3643
goto unlock_check_on_key;
3645
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3646
if (!idx_push(&stack, current, &result.sr_item))
3648
current = result.sr_branch;
3652
/* We are at the start of a leaf node.
* Go up the stack to find the start position of the next key.
* If we find none, then we are at the end of the index.
3656
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3657
while ((node = idx_pop(&stack))) {
3658
if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
3659
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3661
result.sr_item = node->i_pos;
3662
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3664
if (ind->mi_lazy_delete) {
3665
if (result.sr_row_id == (xtRowID) -1) {
3666
current = node->i_branch;
3667
goto search_down_stack;
3671
goto unlock_check_on_key;
3677
search_key->sk_on_key = FALSE;
3678
ot->ot_curr_rec_id = 0;
3679
ot->ot_curr_row_id = 0;
3681
XT_INDEX_UNLOCK(ind, ot);
3682
if (ind->mi_key_corrupted) {
3683
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3688
unlock_check_on_key:
3689
ASSERT_NS(!ot->ot_ind_rhandle);
3690
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3695
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3696
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3697
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3700
XT_INDEX_UNLOCK(ind, ot);
3703
if (search_key && search_key->sk_on_key) {
3704
xt_ind_lock_handle(ot->ot_ind_rhandle);
3705
search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
3706
search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
3707
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3711
ot->ot_curr_rec_id = result.sr_rec_id;
3712
ot->ot_curr_row_id = result.sr_row_id;
3713
ot->ot_ind_state = result.sr_item;
3714
if (ind->mi_key_corrupted) {
3715
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
3721
XT_INDEX_UNLOCK(ind, ot);
3722
if (idx_out_of_memory_failure(ot))
3723
goto retry_after_oom;
3727
/* Return TRUE if the record matches the current index search! */
3728
xtPublic xtBool xt_idx_match_search(register XTOpenTablePtr XT_UNUSED(ot), register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, xtWord1 *buf, int mode)
3731
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3733
myxt_create_key_from_row(ind, key_buf, (xtWord1 *) buf, NULL);
3734
r = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length, search_key->sk_key_value.sv_key, key_buf);
3736
case XT_S_MODE_MATCH:
3738
case XT_S_MODE_NEXT:
3740
case XT_S_MODE_PREV:
3746
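/*
* Estimate the selectivity of each key segment by sampling up to
* MAX_RECORDS committed entries from the front and from the back of the
* index. For every segment we count how often the key prefix up to that
* segment changes between neighbouring entries (is_selectivity), and from
* that derive the expected number of rows per distinct prefix
* (is_recs_in_range).
*/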
static void idx_set_index_selectivity(XTOpenTablePtr ot, XTIndexPtr ind, XTThreadPtr thread)
3748
static const xtRecordID MAX_RECORDS = 100;
3750
XTIdxSearchKeyRec search_key;
3751
XTIndexSegPtr key_seg;
3752
u_int select_count[2] = {0, 0};
3753
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3755
xtWord1 *next_key_buf;
3760
/* these 2 vars are used to check for overlap if we have < 200 records */
3761
xtRecordID last_rec = 0; /* last record accounted in this iteration */
3762
xtRecordID last_iter_rec = 0; /* last record accounted in the previous iteration */
3764
xtBool (* xt_idx_iterator[2])(
3765
register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {
3771
xtBool (* xt_idx_begin[2])(
3772
struct XTOpenTable *ot, struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {
3778
ind->mi_select_total = 0;
3779
key_seg = ind->mi_seg;
3780
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3781
key_seg->is_selectivity = 1;
3782
key_seg->is_recs_in_range = 1;
3785
for (j=0; j < 2; j++) {
3786
xt_idx_prep_key(ind, &search_key, j == 0 ? XT_SEARCH_FIRST_FLAG : XT_SEARCH_AFTER_LAST_FLAG, NULL, 0);
3787
if (!(xt_idx_begin[j])(ot, ind, &search_key))
3790
/* Initialize the buffer with the first valid index entry: */
3791
while (!select_count[j] && ot->ot_curr_rec_id != last_iter_rec) {
3792
if (ot->ot_curr_row_id) {
3794
last_rec = ot->ot_curr_rec_id;
3796
key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3797
xt_ind_lock_handle(ot->ot_ind_rhandle);
3798
memcpy(key_buf, ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset, key_len);
3799
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3801
if (!(xt_idx_iterator[j])(ot, ind, &search_key))
3805
while (select_count[j] < MAX_RECORDS && ot->ot_curr_rec_id != last_iter_rec) {
3806
/* Check if the index entry is committed: */
3807
if (ot->ot_curr_row_id) {
3808
xt_ind_lock_handle(ot->ot_ind_rhandle);
3810
last_rec = ot->ot_curr_rec_id;
3812
next_key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3813
next_key_buf = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;
3817
key_seg = ind->mi_seg;
3818
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3819
curr_len += myxt_key_seg_length(key_seg, curr_len, key_buf);
3820
if (!diff && myxt_compare_key(ind, 0, curr_len, key_buf, next_key_buf) != 0)
3823
key_seg->is_selectivity++;
3826
/* Store the key for the next comparison: */
3827
key_len = next_key_len;
3828
memcpy(key_buf, next_key_buf, key_len);
3829
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3832
if (!(xt_idx_iterator[j])(ot, ind, &search_key))
3836
last_iter_rec = last_rec;
3838
if (ot->ot_ind_rhandle) {
3839
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
3840
ot->ot_ind_rhandle = NULL;
3846
select_total = select_count[0] + select_count[1];
3850
ind->mi_select_total = select_total;
3851
key_seg = ind->mi_seg;
3852
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3853
recs = (u_int) ((double) select_total / (double) key_seg->is_selectivity + (double) 0.5);
3854
key_seg->is_recs_in_range = recs ? recs : 1;
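/* Worked example (the numbers are illustrative, not from a real run): if the
* two scans above examined select_total = 200 entries and the key prefix up
* to this segment changed value 50 times, then is_selectivity = 50 and
* is_recs_in_range = (u_int) (200.0 / 50 + 0.5) = 4, i.e. about 4 index
* entries are expected to share a value for this segment prefix.
*/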
if (ot->ot_ind_rhandle) {
3861
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
3862
ot->ot_ind_rhandle = NULL;
3866
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
3867
xt_log_and_clear_exception_ns();
3871
xtPublic void xt_ind_set_index_selectivity(XTOpenTablePtr ot, XTThreadPtr thread)
3873
XTTableHPtr tab = ot->ot_table;
3879
xt_lock_mutex_ns(&tab->tab_ind_stat_lock);
3880
if (tab->tab_ind_stat_calc_time < now) {
3881
if (!tab->tab_dic.dic_disable_index) {
3882
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++)
3883
idx_set_index_selectivity(ot, *ind, thread);
3885
tab->tab_ind_stat_calc_time = time(NULL);
3887
xt_unlock_mutex_ns(&tab->tab_ind_stat_lock);
3891
* -----------------------------------------------------------------------
3896
static void idx_check_on_key(XTOpenTablePtr ot)
3898
u_int offs = ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3902
if (ot->ot_curr_rec_id && ot->ot_ind_state.i_item_offset < ot->ot_ind_state.i_total_size) {
3903
xt_get_record_ref(&ot->ot_ind_rbuf.tb_data[offs], &rec_id, &row_id);
3905
ASSERT_NS(rec_id == ot->ot_curr_rec_id);
3910
static void idx_check_space(int depth)
3917
for (int i=0; i<depth; i++)
3924
//#define FILL_COMPRESS_BLOCKS
3926
#ifdef FILL_COMPRESS_BLOCKS
3927
#define COMPRESS_BLOCK_SIZE (1024 * 128)
3929
#define COMPRESS_BLOCK_SIZE 16384
3946
unsigned char out[COMPRESS_BLOCK_SIZE];
3947
unsigned char uncomp[COMPRESS_BLOCK_SIZE];
3948
xtWord1 precomp[COMPRESS_BLOCK_SIZE+16000];
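/* idx_precompress() below applies a simple prefix compression before the
* generic compressors are measured: apart from the leading node reference,
* each item after the first is replaced by a count of the leading bytes it
* shares with the previous item, followed by only the differing bytes.
* A rough sketch (two 8-byte items, values invented for illustration):
*
*   in:  01 02 03 04 05 06 07 08   01 02 03 04 FF FF FF FF
*   out: 01 02 03 04 05 06 07 08   04 FF FF FF FF
*
* The first item is copied unchanged; the second begins with the count 04.
*/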
u_int idx_precompress(XTIndexPtr ind, u_int node_ref_size, u_int item_size, u_int insize, xtWord1 *in_data, xtWord1 *out_data)
3952
xtWord1 *prev_item = NULL;
3953
xtWord1 *in_ptr = in_data;
3954
xtWord1 *out_ptr = out_data;
3958
if (insize >= node_ref_size) {
3959
memcpy(out_ptr, in_ptr, node_ref_size);
3960
insize -= node_ref_size;
3961
out_size += node_ref_size;
3962
in_ptr += node_ref_size;
3963
out_ptr += node_ref_size;
3966
while (insize >= item_size + node_ref_size) {
3969
while (same_size < item_size + node_ref_size && *prev_item == *in_ptr) {
3974
ASSERT_NS(same_size < 256);
3975
*out_ptr = (xtWord1) same_size;
3978
same_size = item_size + node_ref_size - same_size;
3979
memcpy(out_ptr, in_ptr, same_size);
3980
out_size += same_size;
3981
out_ptr += same_size;
3982
in_ptr += same_size;
3983
prev_item += same_size;
3987
memcpy(out_ptr, in_ptr, item_size + node_ref_size);
3988
out_size += item_size + node_ref_size;
3989
out_ptr += item_size + node_ref_size;
3990
in_ptr += item_size + node_ref_size;
3992
insize -= (item_size + node_ref_size);
3997
u_int idx_compress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
4002
strm.zalloc = Z_NULL;
4003
strm.zfree = Z_NULL;
4004
strm.opaque = Z_NULL;
4006
strm.next_in = Z_NULL;
4007
ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
4008
strm.avail_out = outsize;
4009
strm.next_out = out_data;
4010
strm.avail_in = insize;
4011
strm.next_in = in_data;
4012
ret = deflate(&strm, Z_FINISH);
4014
return outsize - strm.avail_out;
4020
memset(&strm, 0, sizeof(strm));
4022
ret = BZ2_bzCompressInit(&strm, 1, 0, 0);
4023
strm.avail_out = outsize;
4024
strm.next_out = (char *) out_data;
4025
strm.avail_in = insize;
4026
strm.next_in = (char *) in_data;
4027
ret = BZ2_bzCompress(&strm, BZ_FINISH);
4029
BZ2_bzCompressEnd(&strm);
4030
return outsize - strm.avail_out;
4034
u_int idx_decompress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
4039
strm.zalloc = Z_NULL;
4040
strm.zfree = Z_NULL;
4041
strm.opaque = Z_NULL;
4043
strm.next_in = Z_NULL;
4044
ret = inflateInit(&strm);
4045
strm.avail_out = outsize;
4046
strm.next_out = out_data;
4047
strm.avail_in = insize;
4048
strm.next_in = in_data;
4049
ret = inflate(&strm, Z_FINISH);
4051
return outsize - strm.avail_out;
4057
memset(&strm, 0, sizeof(strm));
4059
ret = BZ2_bzDecompressInit(&strm, 0, 0);
4060
strm.avail_out = outsize;
4061
strm.next_out = (char *) out_data;
4062
strm.avail_in = insize;
4063
strm.next_in = (char *) in_data;
4064
ret = BZ2_bzDecompress(&strm);
4066
BZ2_bzDecompressEnd(&strm);
4067
return outsize - strm.avail_out;
4070
#endif // DO_COMP_TEST
4072
static u_int idx_check_node(XTOpenTablePtr ot, XTIndexPtr ind, int depth, xtIndexNodeID node)
4074
XTIdxResultRec result;
4075
u_int block_count = 1;
4076
XTIndReferenceRec iref;
4080
unsigned uncomp_size;
4087
iref.ir_updated = 2;
4089
ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
4091
now = xt_trace_clock();
4093
/* A deadlock can occur when taking a read lock
* because the XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)
* only takes WRITE locks into account.
* So, if we hold a READ lock on a page, and ind_free_block() tries to
* free the block, it hangs on its own read lock!
*
* So we change from a READ lock to a WRITE lock.
* If this proves too restrictive, then the locks need to handle TRY on a
* read lock as well.
*
* #3 0x00e576b6 in xt_yield at thread_xt.cc:1351
4104
* #4 0x00e7218e in xt_spinxslock_xlock at lock_xt.cc:1467
4105
* #5 0x00dee1a9 in ind_free_block at cache_xt.cc:901
4106
* #6 0x00dee500 in ind_cac_free_lru_blocks at cache_xt.cc:1054
4107
* #7 0x00dee88c in ind_cac_fetch at cache_xt.cc:1151
4108
* #8 0x00def6d4 in xt_ind_fetch at cache_xt.cc:1480
4109
* #9 0x00e1ce2e in idx_check_node at index_xt.cc:3996
4110
* #10 0x00e1cf2b in idx_check_node at index_xt.cc:4106
4111
* #11 0x00e1cf2b in idx_check_node at index_xt.cc:4106
4112
* #12 0x00e1cfdc in idx_check_index at index_xt.cc:4130
4113
* #13 0x00e1d11c in xt_check_indices at index_xt.cc:4181
4114
* #14 0x00e4aa82 in xt_check_table at table_xt.cc:2363
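*
* In other words (an illustrative sketch only, using the names from the
* trace above):
*
*   xt_ind_fetch(..., XT_LOCK_READ, ...)             we hold a READ lock
*   ind_cac_fetch() -> ind_cac_free_lru_blocks()     cache needs a free block
*     -> ind_free_block()
*        -> XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ...)
*           the TRY only detects WRITE holders, so it does not fail fast;
*           it spins (xt_spinxslock_xlock/xt_yield) on the READ lock that
*           we ourselves hold: a self-deadlock.
*
* Fetching with XT_LOCK_WRITE here closes that window.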
if (!xt_ind_fetch(ot, ind, node, XT_LOCK_WRITE, &iref))
4119
time = xt_trace_clock() - now;
4123
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4124
ASSERT_NS(result.sr_item.i_total_size + offsetof(XTIdxBranchDRec, tb_data) <= XT_INDEX_PAGE_SIZE);
4127
u_int size = result.sr_item.i_total_size;
4128
xtWord1 *data = iref.ir_branch->tb_data;
4131
size = idx_precompress(ind, result.sr_item.i_node_ref_size, result.sr_item.i_item_size, size, data, precomp);
4132
if (size > result.sr_item.i_total_size)
4133
size = result.sr_item.i_total_size;
4139
usage_total += result.sr_item.i_total_size;
4141
#ifdef FILL_COMPRESS_BLOCKS
4142
if (fill_size + size > COMPRESS_BLOCK_SIZE) {
4143
now = xt_trace_clock();
4144
comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
4145
time = xt_trace_clock() - now;
4148
zlib_total += comp_size;
4149
filled_size += fill_size;
4151
now = xt_trace_clock();
4152
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4153
time = xt_trace_clock() - now;
4154
uncomp_time += time;
4156
if (uncomp_size != fill_size)
4161
memcpy(precomp + fill_size, data, size);
4164
now = xt_trace_clock();
4165
comp_size = idx_compress(size, data, COMPRESS_BLOCK_SIZE, out);
4166
time = xt_trace_clock() - now;
4168
zlib_total += comp_size;
4170
now = xt_trace_clock();
4171
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4172
time = xt_trace_clock() - now;
4173
uncomp_time += time;
4174
if (uncomp_size != size)
4178
if (comp_size <= 1024)
4180
else if (comp_size <= 2048)
4182
else if (comp_size <= 4096)
4184
else if (comp_size <= 8192)
4189
#endif // DO_COMP_TEST
4191
if (result.sr_item.i_node_ref_size) {
4192
idx_check_space(depth);
4194
printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
4196
#ifdef TRACK_ACTIVITY
4197
track_block_exists(result.sr_branch);
4199
block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
4202
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4203
#ifdef CHECK_PRINTS_RECORD_REFERENCES
4204
idx_check_space(depth);
4205
if (result.sr_item.i_item_size == 12) {
4206
/* Assume this is a NOT-NULL INT!: */
4207
xtWord4 val = XT_GET_DISK_4(&iref.ir_branch->tb_data[result.sr_item.i_item_offset]);
4209
printf("(%6d) ", (int) val);
4213
printf("rec=%d row=%d ", (int) result.sr_rec_id, (int) result.sr_row_id);
4217
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4218
if (result.sr_item.i_node_ref_size) {
4219
idx_check_space(depth);
4221
printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
4223
#ifdef TRACK_ACTIVITY
4224
track_block_exists(result.sr_branch);
4226
block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
4230
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
4234
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock)
4236
xtIndexNodeID current;
4237
u_int block_count = 0;
4241
XT_INDEX_WRITE_LOCK(ind, ot);
4244
printf("INDEX (%d) %04d ---------------------------------------\n", (int) ind->mi_index_no, (int) XT_NODE_ID(ind->mi_root));
4246
if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
4247
#ifdef TRACK_ACTIVITY
4248
track_block_exists(ind->mi_root);
4250
block_count = idx_check_node(ot, ind, 0, current);
4253
if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
4255
printf("INDEX (%d) FREE ---------------------------------------", (int) ind->mi_index_no);
4257
ASSERT_NS(ind->mi_free_list->fl_start == 0);
4258
for (i=0; i<ind->mi_free_list->fl_free_count; i++) {
4264
#ifdef TRACK_ACTIVITY
4265
track_block_exists(ind->mi_free_list->fl_page_id[i]);
4268
printf("%2d ", (int) XT_NODE_ID(ind->mi_free_list->fl_page_id[i]));
4278
XT_INDEX_UNLOCK(ind, ot);
4283
xtPublic void xt_check_indices(XTOpenTablePtr ot)
4285
register XTTableHPtr tab = ot->ot_table;
4287
xtIndexNodeID current;
4288
XTIndFreeBlockRec free_block;
4289
u_int ind_count, block_count = 0;
4290
u_int free_count = 0;
4293
xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
4294
printf("CHECK INDICES %s ==============================\n", tab->tab_name->ps_path);
4295
#ifdef TRACK_ACTIVITY
4296
track_reset_missing();
4299
ind = tab->tab_dic.dic_keys;
4300
for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind++) {
4301
ind_count = idx_check_index(ot, *ind, TRUE);
4302
block_count += ind_count;
4308
#ifdef FILL_COMPRESS_BLOCKS
4309
if (fill_size > 0) {
4311
unsigned uncomp_size;
4315
now = xt_trace_clock();
4316
comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
4317
time = xt_trace_clock() - now;
4319
zlib_total += comp_size;
4320
filled_size += fill_size;
4322
now = xt_trace_clock();
4323
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4324
time = xt_trace_clock() - now;
4325
uncomp_time += time;
4327
if (filled_size != usage_total)
4331
printf("Total blocks = %d\n", blocks);
4332
printf("zlib <= 1024 = %d\n", zlib_1024);
4333
printf("zlib <= 2048 = %d\n", zlib_2048);
4334
printf("zlib <= 4096 = %d\n", zlib_4096);
4335
printf("zlib <= 8192 = %d\n", zlib_8192);
4336
printf("zlib <= 16384 = %d\n", zlib_16384);
4337
printf("zlib average size = %.2f\n", (double) zlib_total / (double) blocks);
4338
printf("zlib average time = %.2f\n", (double) zlib_time / (double) blocks);
4339
printf("read average time = %.2f\n", (double) read_time / (double) blocks);
4340
printf("uncompress time = %.2f\n", (double) uncomp_time / (double) blocks);
4341
block_total = (zlib_1024 + zlib_2048) * 8192;
4342
block_total += zlib_4096 * 8192;
4343
block_total += zlib_8192 * 8192;
4344
block_total += zlib_16384 * 16384;
4345
printf("block total = %d\n", block_total);
4346
printf("block %% compress = %.2f\n", ((double) block_total * (double) 100) / ((double) blocks * (double) 16384));
4347
printf("Total size = %d\n", blocks * 16384);
4348
printf("total before zlib = %d\n", usage_total);
4349
printf("total after zlib = %d\n", zlib_total);
4350
printf("zlib %% compress = %.2f\n", ((double) zlib_total * (double) 100) / (double) usage_total);
4351
printf("total %% compress = %.2f\n", ((double) zlib_total * (double) 100) / (double) (blocks * 16384));
4354
xt_lock_mutex_ns(&tab->tab_ind_lock);
4356
printf("\nFREE: ---------------------------------------\n");
4358
if (tab->tab_ind_free_list) {
4359
XTIndFreeListPtr ptr;
4361
ptr = tab->tab_ind_free_list;
4364
printf("Memory List:");
4367
for (j=ptr->fl_start; j<ptr->fl_free_count; j++, i++) {
4373
#ifdef TRACK_ACTIVITY
4374
track_block_exists(ptr->fl_page_id[j]);
4377
printf("%2d ", (int) XT_NODE_ID(ptr->fl_page_id[j]));
4384
ptr = ptr->fl_next_list;
4388
current = tab->tab_ind_free;
4389
if (XT_NODE_ID(current)) {
4392
printf("Disk List:");
4394
while (XT_NODE_ID(current)) {
4400
#ifdef TRACK_ACTIVITY
4401
track_block_exists(current);
4404
printf("%d ", (int) XT_NODE_ID(current));
4406
if (!xt_ind_read_bytes(ot, NULL, current, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block)) {
4407
xt_log_and_clear_exception_ns();
4410
XT_NODE_ID(current) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
4419
printf("\n-----------------------------\n");
4420
printf("used blocks %d + free blocks %d = %d\n", block_count, free_count, block_count + free_count);
4421
printf("EOF = %"PRIu64", total blocks = %d\n", (xtWord8) xt_ind_node_to_offset(tab, tab->tab_ind_eof), (int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
4422
printf("-----------------------------\n");
4424
xt_unlock_mutex_ns(&tab->tab_ind_lock);
4425
#ifdef TRACK_ACTIVITY
4426
track_dump_missing(tab->tab_ind_eof);
4427
printf("===================================================\n");
4428
track_dump_all((u_int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
4430
printf("===================================================\n");
4431
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
4435
* -----------------------------------------------------------------------
4439
static void idx_load_node(XTThreadPtr self, XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node)
4441
XTIdxResultRec result;
4442
XTIndReferenceRec iref;
4444
ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
4445
if (!xt_ind_fetch(ot, ind, node, XT_LOCK_READ, &iref))
4448
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4449
if (result.sr_item.i_node_ref_size)
4450
idx_load_node(self, ot, ind, result.sr_branch);
4451
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4452
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4453
if (result.sr_item.i_node_ref_size)
4454
idx_load_node(self, ot, ind, result.sr_branch);
4457
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
4460
xtPublic void xt_load_indices(XTThreadPtr self, XTOpenTablePtr ot)
4462
register XTTableHPtr tab = ot->ot_table;
4463
XTIndexPtr *ind_ptr;
4465
xtIndexNodeID current;
4467
xt_lock_mutex(self, &tab->tab_ind_flush_lock);
4468
pushr_(xt_unlock_mutex, &tab->tab_ind_flush_lock);
4470
ind_ptr = tab->tab_dic.dic_keys;
4471
for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind_ptr++) {
4473
XT_INDEX_WRITE_LOCK(ind, ot);
4474
if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
4475
idx_load_node(self, ot, ind, current);
4476
XT_INDEX_UNLOCK(ind, ot);
4479
freer_(); // xt_unlock_mutex(&tab->tab_ind_flush_lock)
4483
* -----------------------------------------------------------------------
4484
* Count the number of deleted entries in a node:
4488
* {LAZY-DEL-INDEX-ITEMS}
4490
* Use this function to count the number of deleted items
4491
* in a node when it is loaded.
4493
* The count helps us decide if the node should be "packed".
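*
* Note (based on the check below): an entry counts as lazily deleted when its
* row ID has been set to (xtRowID) -1. This function simply counts such
* entries and stores the result in block->cp_del_count.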
xtPublic void xt_ind_count_deleted_items(XTTableHPtr tab, XTIndexPtr ind, XTIndBlockPtr block)
4497
XTIdxResultRec result;
4499
xtWord2 branch_size;
4501
branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
4503
/* This is possible when reading free pages. */
4504
if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE)
4507
idx_first_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
4508
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4509
if (result.sr_row_id == (xtRowID) -1)
4511
idx_next_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
4513
block->cp_del_count = del_count;
4517
* -----------------------------------------------------------------------
4521
xtBool XTIndDirtyList::dl_add_block(XTIndBlockPtr block)
4523
XTIndDirtyBlocksPtr blocks;
4525
blocks = dl_block_lists;
4526
if (dl_list_usage == XT_DIRTY_BLOCK_LIST_SIZE || !blocks) {
4527
if (!(blocks = (XTIndDirtyBlocksPtr) xt_malloc_ns(sizeof(XTIndDirtyBlocksRec))))
4530
blocks->db_next = dl_block_lists;
4531
dl_block_lists = blocks;
4533
blocks->db_blocks[dl_list_usage] = block;
4539
static int idx_compare_blocks(const void *a, const void *b)
4541
XTIndBlockPtr b_a = *((XTIndBlockPtr *) a);
4542
XTIndBlockPtr b_b = *((XTIndBlockPtr *) b);
4544
if (b_a->cb_address == b_b->cb_address)
4546
if (b_a->cb_address < b_b->cb_address)
4551
void XTIndDirtyList::dl_sort_blocks()
4553
XTIndDirtyBlocksPtr blocks;
4556
size = dl_list_usage;
4557
blocks = dl_block_lists;
4559
qsort(blocks->db_blocks, size, sizeof(XTIndBlockPtr), idx_compare_blocks);
4560
blocks = blocks->db_next;
4561
size = XT_DIRTY_BLOCK_LIST_SIZE;
4565
void XTIndDirtyList::dl_free_all()
4567
XTIndDirtyBlocksPtr blocks, n_blocks;
4569
blocks = dl_block_lists;
4570
dl_block_lists = NULL;
4571
dl_total_blocks = 0;
4574
n_blocks = blocks->db_next;
4581
* -----------------------------------------------------------------------
4582
* Index consistent flush
4585
xtBool XTFlushIndexTask::tk_task(XTThreadPtr thread)
4589
fit_dirty_blocks = 0;
4590
fit_blocks_flushed = 0;
4592
/* See {TASK-TABLE-GONE} */
4593
if (!(xt_db_open_pool_table_ns(&ot, fit_table->tab_db, fit_table->tab_id)))
4597
/* Can happen if the table has been dropped: */
4598
if (thread->t_exception.e_xt_err)
4599
xt_log_and_clear_exception(thread);
4600
xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) fit_table->tab_id);
4601
xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
4605
if (ot->ot_table != fit_table) {
4606
/* Can happen if the table has been renamed: */
4607
if (thread->t_exception.e_xt_err)
4608
xt_log_and_clear_exception(thread);
4609
xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table has been renamed\n", (u_long) fit_table->tab_id);
4610
xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
4614
if (!xt_flush_indices(ot, NULL, FALSE, this)) {
4615
xt_db_return_table_to_pool_ns(ot);
4620
xt_db_return_table_to_pool_ns(ot);
4624
void XTFlushIndexTask::tk_reference()
4626
xt_heap_reference_ns(fit_table);
4629
void XTFlushIndexTask::tk_release()
4631
xt_heap_release_ns(fit_table);
4635
* Set notify_before_write to TRUE if the caller requires
4636
* notification before the index file is written.
4638
* This is used if the index is flushed due to a lack of index cache.
4640
xtPublic xtBool xt_async_flush_indices(XTTableHPtr tab, xtBool notify_complete, xtBool notify_before_write, XTThreadPtr thread)
4643
return xt_run_async_task(tab->tab_ind_flush_task, notify_complete, notify_before_write, thread, tab->tab_db);
4646
#if defined(PRINT_IND_FLUSH_STATS) || defined(TRACE_FLUSH_TIMES)
4648
static char *idx_format(char *buffer, double v)
4651
sprintf(buffer, "%9.2f", v);
4652
if (strcmp(buffer, " nan") == 0)
4653
strcpy(buffer, " ");
4656
strcpy(buffer, " ");
4660
static char *idx_format_mb(char *buffer, double v)
4663
sprintf(buffer, "%7.3f", v / (double) 1024);
4664
if (strcmp(buffer, " nan") == 0)
4665
strcpy(buffer, " ");
4668
strcpy(buffer, " ");
4673
#ifdef TRACE_FLUSH_TIMES
4675
#define ILOG_FLUSH 1
4676
#define INDEX_FLUSH 2
4685
static void idx_print(char *msg, XTThreadPtr thread, struct idxstats *st, xtWord8 *now, int flush)
4689
double dilogw, didxw;
4695
then = xt_trace_clock();
4697
ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_write) / (double) 1024;
4698
dilogw = ((double) ilogw * (double) 1000000) / (double) t;
4699
idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_write) / (double) 1024;
4700
didxw = ((double) idxw * (double) 1000000) / (double) t;
4702
printf("%26s | TIME: %7d ", msg, (int) t);
4703
printf("ILOG: %s - %s INDX: %s - %s\n",
4704
idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
4705
idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
4706
st->i_log_write = thread->st_statistics.st_ilog.ts_write;
4707
st->idx_write = thread->st_statistics.st_ind.ts_write;
4711
ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_flush) / (double) 1024;
4712
dilogw = ((double) ilogw * (double) 1000000) / (double) t;
4713
printf("%26s | TIME: %7s ", " ", " ");
4714
printf("ILOG: %s - %s INDX: %s - %s\n",
4715
idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
4716
idx_format_mb(buf3, 0.0), idx_format(buf4, 0.0));
4717
st->i_log_flush = thread->st_statistics.st_ilog.ts_write;
4720
idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_flush) / (double) 1024;
4721
didxw = ((double) idxw * (double) 1000000) / (double) t;
4722
printf("%26s | TIME: %7s ", " ", " ");
4723
printf("ILOG: %s - %s INDX: %s - %s\n",
4724
idx_format_mb(buf1, 0.0), idx_format(buf2, 0.0),
4725
idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
4726
st->idx_flush = thread->st_statistics.st_ind.ts_write;
4730
*now = xt_trace_clock();
4733
#define TRACE_FLUSH(a, b, c, d, e) idx_print(a, b, c, d, e)
4735
#else // TRACE_FLUSH_TIMES
4737
#define TRACE_FLUSH(a, b, c, d, e)
4739
#endif // TRACE_FLUSH_TIMES
4741
/* Flush the indexes of a table.
4742
* If a fit is given, then this means this is an asynchronous flush.
4744
xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool have_table_lock, XTFlushIndexTask *fit)
4746
register XTTableHPtr tab = ot->ot_table;
4751
XTIndBlockPtr block, fblock;
4753
xtIndexNodeID ind_free;
4754
xtBool block_on_free_list = FALSE;
4755
xtIndexNodeID last_address, next_address;
4756
XTIndFreeListPtr list_ptr;
4758
XTIndDirtyListItorRec it;
4759
//u_int dirty_count;
4760
#ifdef TRACE_FLUSH_INDEX
4764
#ifdef TRACE_FLUSH_TIMES
4765
XTThreadPtr thread = ot->ot_thread;
4768
st.i_log_flush = thread->st_statistics.st_ilog.ts_write;
4769
st.i_log_write = thread->st_statistics.st_ilog.ts_write;
4770
st.idx_flush = thread->st_statistics.st_ind.ts_write;
4771
st.idx_write = thread->st_statistics.st_ind.ts_write;
4772
now = xt_trace_clock();
4775
#ifdef DEBUG_CHECK_IND_CACHE
4776
xt_ind_check_cache(NULL);
4778
xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
4779
TRACE_FLUSH("LOCKED flush index lock", thread, &st, &now, 0);
4781
if (!xt_begin_checkpoint(tab->tab_db, have_table_lock, ot->ot_thread))
4784
ASSERT_NS(!tab->tab_ind_flush_ilog);
4785
if (!tab->tab_db->db_indlogs.ilp_get_log(&tab->tab_ind_flush_ilog, ot->ot_thread))
4787
il = tab->tab_ind_flush_ilog;
4789
if (!il->il_reset(ot))
4791
if (!il->il_write_byte(ot, XT_DT_LOG_HEAD))
4793
if (!il->il_write_word4(ot, tab->tab_id))
4795
if (!il->il_write_word4(ot, 0))
4797
TRACE_FLUSH("reset ilog", thread, &st, &now, 0);
4801
indp = tab->tab_dic.dic_keys;
4802
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4804
XT_INDEX_WRITE_LOCK(ind, ot);
4805
if (ind->mi_free_list && ind->mi_free_list->fl_free_count)
4806
block_on_free_list = TRUE;
4807
dirty_blocks += ind->mi_dirty_blocks;
4809
TRACE_FLUSH("LOCKED all indexes", thread, &st, &now, 0);
4811
if (!dirty_blocks && !block_on_free_list) {
4812
/* Nothing to flush... */
4813
indp = tab->tab_dic.dic_keys;
4814
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4816
XT_INDEX_UNLOCK(ind, ot);
4821
#ifdef TRACE_FLUSH_INDEX
4823
printf("FLUSH INDEX pages=%lu %s\n", (u_long) dirty_blocks, tab->tab_name->ps_path);
4827
fit->fit_dirty_blocks = dirty_blocks;
4829
// 128 dirty blocks == 2MB
4831
*bytes_flushed += (dirty_blocks * XT_INDEX_PAGE_SIZE);
4833
/* Collect the index roots: */
4834
data = tab->tab_index_head->tp_data;
4836
/* Collect a complete list of all dirty blocks: */
4837
indp = tab->tab_dic.dic_keys;
4838
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4840
xt_spinlock_lock(&ind->mi_dirty_lock);
4841
if ((block = ind->mi_dirty_list)) {
4843
ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
4844
#ifdef IND_OPT_DATA_WRITTEN
4845
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
4847
tab->tab_ind_dirty_list.dl_add_block(block);
4848
fblock = block->cb_dirty_next;
4849
block->cb_dirty_next = NULL;
4850
block->cb_dirty_prev = NULL;
4851
block->cb_state = IDX_CAC_BLOCK_FLUSHING;
4855
//dirty_count = ind->mi_dirty_blocks;
4856
ind->mi_dirty_blocks = 0;
4857
ind->mi_dirty_list = NULL;
4858
xt_spinlock_unlock(&ind->mi_dirty_lock);
4859
//ot->ot_thread->st_statistics.st_ind_cache_dirty -= dirty_count;
4860
XT_SET_NODE_REF(tab, data, ind->mi_root);
4861
data += XT_NODE_REF_SIZE;
4864
TRACE_FLUSH("Collected all blocks", thread, &st, &now, 0);
4866
xt_lock_mutex_ns(&tab->tab_ind_lock);
4867
TRACE_FLUSH("LOCKED table index lock", thread, &st, &now, 0);
4869
/* Write the free list: */
4870
if (block_on_free_list) {
4872
xtWord1 buffer[XT_BLOCK_SIZE_FOR_DIRECT_IO];
4873
XTIndFreeBlockRec free_block;
4875
memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));
4877
/* The old start of the free list: */
4878
XT_NODE_ID(ind_free) = 0;
4879
/* This is a list of lists: */
4880
while ((list_ptr = tab->tab_ind_free_list)) {
4881
/* If this free list still has unused blocks,
4882
* pick the first. That is the front of
4883
* the list of free blocks.
4885
if (list_ptr->fl_start < list_ptr->fl_free_count) {
4886
ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
4889
/* This list is empty, free it: */
4890
tab->tab_ind_free_list = list_ptr->fl_next_list;
4891
xt_free_ns(list_ptr);
4893
/* If nothing is on any list, then
4894
* take the value stored in the index header.
4895
* It is the front of the list on disk.
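*
* To summarize the two structures involved (a sketch of what the code below
* maintains): in memory, tab_ind_free_list is a chain of free-list records,
* each holding page IDs from fl_start up to fl_free_count and linked through
* fl_next_list; on disk, the free pages themselves form a chain, each free
* page storing the ID of the next free page in its first 8 bytes
* (if_next_block_8), starting from the page ID kept in the index header.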
if (!XT_NODE_ID(ind_free))
4898
ind_free = tab->tab_ind_free;
4900
if (!il->il_write_byte(ot, XT_DT_FREE_LIST))
4902
indp = tab->tab_dic.dic_keys;
4903
XT_NODE_ID(last_address) = 0;
4904
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4906
if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
4907
for (j=0; j<ind->mi_free_list->fl_free_count; j++) {
4908
next_address = ind->mi_free_list->fl_page_id[j];
4909
/* Write out the IDs of the free blocks. */
4910
if (!il->il_write_word4(ot, XT_NODE_ID(ind->mi_free_list->fl_page_id[j])))
4912
if (XT_NODE_ID(last_address)) {
4913
/* Update the page in cache, if it is in the cache! */
4914
XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(next_address));
4915
if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
4918
last_address = next_address;
4922
if (!il->il_write_word4(ot, XT_NODE_ID(ind_free)))
4924
if (XT_NODE_ID(last_address)) {
4925
XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(tab->tab_ind_free));
4926
if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
4929
if (!il->il_write_word4(ot, 0xFFFFFFFF))
4934
* Add the free list caches to the global free list cache.
4935
* Added backwards to match the write order.
4937
indp = tab->tab_dic.dic_keys + tab->tab_dic.dic_key_count-1;
4938
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp--) {
4940
if (ind->mi_free_list) {
4941
ind->mi_free_list->fl_next_list = tab->tab_ind_free_list;
4942
tab->tab_ind_free_list = ind->mi_free_list;
4944
ind->mi_free_list = NULL;
4948
* The new start of the free list is the first
4949
* item on the table free list:
4951
XT_NODE_ID(ind_free) = 0;
4952
while ((list_ptr = tab->tab_ind_free_list)) {
4953
if (list_ptr->fl_start < list_ptr->fl_free_count) {
4954
ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
4957
tab->tab_ind_free_list = list_ptr->fl_next_list;
4958
xt_free_ns(list_ptr);
4960
if (!XT_NODE_ID(ind_free))
4961
ind_free = tab->tab_ind_free;
4962
TRACE_FLUSH("did free block stuff", thread, &st, &now, 0);
4963
xt_unlock_mutex_ns(&tab->tab_ind_lock);
4965
XT_SET_DISK_6(tab->tab_index_head->tp_ind_eof_6, XT_NODE_ID(tab->tab_ind_eof));
4966
XT_SET_DISK_6(tab->tab_index_head->tp_ind_free_6, XT_NODE_ID(ind_free));
4968
TRACE_FLUSH("UN-LOCKED table index lock", thread, &st, &now, 0);
4969
if (!il->il_write_header(ot, XT_INDEX_HEAD_SIZE, (xtWord1 *) tab->tab_index_head))
4972
TRACE_FLUSH("wrote ilog header", thread, &st, &now, 0);
4973
indp = tab->tab_dic.dic_keys;
4974
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4976
XT_INDEX_UNLOCK(ind, ot);
4979
TRACE_FLUSH("UN-LOCKED all indexes", thread, &st, &now, 0);
4981
/* Write all blocks to the index log: */
4982
if (tab->tab_ind_dirty_list.dl_total_blocks) {
4983
tab->tab_ind_dirty_list.dl_sort_blocks();
4985
while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
4986
XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
4987
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
4988
if (!il->il_write_block(ot, block)) {
4989
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
4993
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
4997
/* {PAGE-NO-IN-INDEX-FILE}
4998
* At this point all blocks have been written to the index file.
4999
* It is not safe to release them from the cache.
5000
* It was not safe to do this before this point because
5001
* a read would read the wrong data.
5003
* The exception to this is freed blocks.
5004
* These are cached separately in the free block list.
5006
TRACE_FLUSH("Wrote all blocks to the ilog", thread, &st, &now, 0);
5008
if (il->il_data_written()) {
5009
/* Flush the log before we flush the index.
5011
* The reason is, we must make sure that changes that
5012
* will be in the index are already in the transaction
5015
* Only then are we able to undo those changes on
5019
* CREATE TABLE t1 (s1 INT PRIMARY KEY);
5020
* INSERT INTO t1 VALUES (1);
5023
* INSERT INTO t1 VALUES (2);
5025
* --- INDEX IS FLUSHED HERE ---
5027
* --- SERVER CRASH HERE ---
5030
* The INSERT VALUES (2) has been written
5031
* to the log, but not flushed.
5032
* But the index has been updated.
5033
* If the index is flushed it will contain
5034
* the entry for record with s1=2.
5036
* This entry must be removed on recovery.
5038
* To prevent this situation I flush the log
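*
* In short, the ordering used below is:
*   1. write the dirty index blocks to the index log (ilog),
*   2. flush the transaction log (xlog), so the undo information is durable,
*   3. flush the ilog,
*   4. apply the ilog to the index file, and finally flush the index file.
* With this ordering a crash after step 2 is recoverable: the xlog already
* contains what is needed to remove uncommitted entries from the index.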
if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
5042
/* Note, thread->st_database may not be set here:
5043
* #0 0x00defba3 in xt_spinlock_set at lock_xt.h:249
5044
* #1 0x00defbfd in xt_spinlock_lock at lock_xt.h:299
5045
* #2 0x00e65a98 in XTDatabaseLog::xlog_flush_pending at xactlog_xt.cc:737
5046
* #3 0x00e6a27f in XTDatabaseLog::xlog_flush at xactlog_xt.cc:727
5047
* #4 0x00e6a308 in xt_xlog_flush_log at xactlog_xt.cc:1476
5048
* #5 0x00e22fee in xt_flush_indices at index_xt.cc:4599
5049
* #6 0x00e49fe0 in xt_close_table at table_xt.cc:2678
5050
* #7 0x00df0f10 in xt_db_pool_exit at database_xt.cc:758
5051
* #8 0x00df3ca2 in db_finalize at database_xt.cc:342
5052
* #9 0x00e17037 in xt_heap_release at heap_xt.cc:110
5053
* #10 0x00df07c0 in db_hash_free at database_xt.cc:245
5054
* #11 0x00e0b145 in xt_free_hashtable at hashtab_xt.cc:84
5055
* #12 0x00df093e in xt_exit_databases at database_xt.cc:303
5056
* #13 0x00e0d3cf in pbxt_call_exit at ha_pbxt.cc:946
5057
* #14 0x00e0d498 in ha_exit at ha_pbxt.cc:975
5058
* #15 0x00e0dce6 in pbxt_end at ha_pbxt.cc:1274
5059
* #16 0x00e0dd03 in pbxt_panic at ha_pbxt.cc:1287
5060
* #17 0x002321a1 in ha_finalize_handlerton at handler.cc:392
5061
* #18 0x002f0567 in plugin_deinitialize at sql_plugin.cc:815
5062
* #19 0x002f370e in reap_plugins at sql_plugin.cc:903
5063
* #20 0x002f3eac in plugin_shutdown at sql_plugin.cc:1512
5064
* #21 0x000f3ba6 in clean_up at mysqld.cc:1238
5065
* #22 0x000f4046 in unireg_end at mysqld.cc:1166
5066
* #23 0x000fc2c5 in kill_server at mysqld.cc:1108
5067
* #24 0x000fd0d5 in kill_server_thread at mysqld.cc:1129
5069
if (!tab->tab_db->db_xlog.xlog_flush(ot->ot_thread))
5071
TRACE_FLUSH("FLUSHED xlog", thread, &st, &now, 0);
5074
if (!il->il_flush(ot))
5076
TRACE_FLUSH("FLUSHED ilog", thread, &st, &now, ILOG_FLUSH);
5078
if (!il->il_apply_log_write(ot))
5081
TRACE_FLUSH("wrote all blocks to index", thread, &st, &now, 0);
5084
* 1.0.01 - I have split apply log into flush and write
5087
* I have to write before waiting threads can continue
5088
* because otherwise incorrect data will be read
5089
* when the cache block is freed before it is written
5092
* Risk: (see below).
5095
while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
5096
XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
5097
if (block->cb_state == IDX_CAC_BLOCK_LOGGED) {
5098
block->cb_state = IDX_CAC_BLOCK_CLEAN;
5099
ot->ot_thread->st_statistics.st_ind_cache_dirty--;
5101
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
5104
/* This will give early notification to threads waiting for this operation
* to get to this point.
fit->fit_blocks_flushed = fit->fit_dirty_blocks;
5109
fit->fit_dirty_blocks = 0;
5110
xt_async_task_notify(fit);
5114
* 1.0.01 - Note: I have moved the flush to here.
5115
* It allows the calling thread to continue during the
5118
* The problem is, if the ilog flush fails then we have
5119
* lost the information to re-create a consistent flush again!
5121
if (!il->il_apply_log_flush(ot))
5123
TRACE_FLUSH("FLUSHED index file", thread, &st, &now, INDEX_FLUSH);
5127
tab->tab_ind_dirty_list.dl_free_all();
5128
tab->tab_ind_flush_ilog = NULL;
5131
/* Mark this table as index flushed: */
5132
xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_DONE_INDEX);
5133
TRACE_FLUSH("set index state", thread, &st, &now, 0);
5135
#ifdef TRACE_FLUSH_INDEX
5137
printf("flush index (%d) %s DONE\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
5142
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
5143
TRACE_FLUSH("UN-LOCKED flush index lock", thread, &st, &now, 0);
5145
if (!xt_end_checkpoint(tab->tab_db, ot->ot_thread, NULL))
5148
#ifdef DEBUG_CHECK_IND_CACHE
5149
xt_ind_check_cache((XTIndex *) 1);
5154
indp = tab->tab_dic.dic_keys;
5155
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
5157
XT_INDEX_UNLOCK(ind, ot);
5161
tab->tab_ind_dirty_list.dl_free_all();
5162
tab->tab_ind_flush_ilog = NULL;
5166
xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_STOP_INDEX);
5168
#ifdef TRACE_FLUSH_INDEX
5170
printf("flush index (%d) %s FAILED\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
5175
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
5176
#ifdef DEBUG_CHECK_IND_CACHE
5177
xt_ind_check_cache(NULL);
5182
void XTIndexLogPool::ilp_init(struct XTThread *self, struct XTDatabase *db, size_t log_buffer_size)
5184
char path[PATH_MAX];
5188
XTIndexLogPtr il = NULL;
5189
XTOpenTablePtr ot = NULL;
5192
ilp_log_buffer_size = log_buffer_size;
5193
xt_init_mutex_with_autoname(self, &ilp_lock);
5195
xt_strcpy(PATH_MAX, path, db->db_main_path);
5196
xt_add_system_dir(PATH_MAX, path);
5197
if (xt_fs_exists(path)) {
5198
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
5199
while (xt_dir_next(self, od)) {
5200
file = xt_dir_name(self, od);
5201
if (xt_starts_with(file, "ilog")) {
5202
if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
5203
if (!ilp_open_log(&il, log_id, FALSE, self))
5205
if (il->il_tab_id && il->il_log_eof) {
5206
char table_name[XT_IDENTIFIER_NAME_SIZE*3+3];
5208
if (!il->il_open_table(&ot))
5211
xt_tab_make_table_name(ot->ot_table->tab_name, table_name, sizeof(table_name));
5212
xt_logf(XT_NT_INFO, "PBXT: Recovering index, table: %s, bytes to read: %llu\n", table_name, (u_llong) il->il_log_eof);
5213
if (!il->il_apply_log_write(ot)) {
5214
/* If recovery of an index fails, then it is corrupt! */
5215
if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_NO_INDEX_CACHE)
5217
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
5218
xt_log_and_clear_exception_ns();
5221
if (!il->il_apply_log_flush(ot))
5224
ot->ot_thread = self;
5225
il->il_close_table(ot);
5232
freer_(); // xt_dir_close(od)
5238
il->il_close_table(ot);
5240
il->il_close(FALSE);
5244
void XTIndexLogPool::ilp_close(struct XTThread *XT_UNUSED(self), xtBool lock)
5249
xt_lock_mutex_ns(&ilp_lock);
5250
while ((il = ilp_log_pool)) {
5251
ilp_log_pool = il->il_next_in_pool;
5256
xt_unlock_mutex_ns(&ilp_lock);
5259
void XTIndexLogPool::ilp_exit(struct XTThread *self)
5261
ilp_close(self, FALSE);
5262
ASSERT_NS(il_pool_count == 0);
5263
xt_free_mutex(&ilp_lock);
5266
void XTIndexLogPool::ilp_name(size_t size, char *path, xtLogID log_id)
5270
sprintf(name, "ilog-%lu.xt", (u_long) log_id);
5271
xt_strcpy(size, path, ilp_db->db_main_path);
5272
xt_add_system_dir(size, path);
5273
xt_add_dir_char(size, path);
5274
xt_strcat(size, path, name);
5277
xtBool XTIndexLogPool::ilp_open_log(XTIndexLogPtr *ret_il, xtLogID log_id, xtBool excl, XTThreadPtr thread)
5279
char log_path[PATH_MAX];
5281
XTIndLogHeadDRec log_head;
5284
ilp_name(PATH_MAX, log_path, log_id);
5285
if (!(il = (XTIndexLogPtr) xt_calloc_ns(sizeof(XTIndexLogRec))))
5287
xt_spinlock_init_with_autoname(NULL, &il->il_write_lock);
5288
il->il_log_id = log_id;
5291
/* Writes will be rounded up to the nearest direct write block size (see {WRITE-IN-BLOCKS}),
5292
* so make sure we have space in the buffer for that:
5295
if (IND_WRITE_BLOCK_SIZE < XT_BLOCK_SIZE_FOR_DIRECT_IO)
5297
#ifdef IND_WRITE_IN_BLOCK_SIZES
5298
if (XT_INDEX_PAGE_SIZE < IND_WRITE_BLOCK_SIZE)
5302
#ifdef IND_FILL_BLOCK_TO_NEXT
5303
if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + XT_INDEX_PAGE_SIZE)))
5306
if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + IND_WRITE_BLOCK_SIZE)))
5309
il->il_buffer_size = ilp_log_buffer_size;
5311
if (!(il->il_of = xt_open_file_ns(log_path, XT_FT_STANDARD, (excl ? XT_FS_EXCLUSIVE : 0) | XT_FS_CREATE | XT_FS_MAKE_PATH, 0)))
5314
if (!xt_pread_file(il->il_of, 0, sizeof(XTIndLogHeadDRec), 0, &log_head, &read_size, &thread->st_statistics.st_ilog, thread))
5317
if (read_size == sizeof(XTIndLogHeadDRec)) {
5318
il->il_tab_id = XT_GET_DISK_4(log_head.ilh_tab_id_4);
5319
il->il_log_eof = XT_GET_DISK_4(log_head.ilh_log_eof_4);
5330
il->il_close(FALSE);
5334
xtBool XTIndexLogPool::ilp_get_log(XTIndexLogPtr *ret_il, XTThreadPtr thread)
5339
xt_lock_mutex_ns(&ilp_lock);
5340
if ((il = ilp_log_pool)) {
5341
ilp_log_pool = il->il_next_in_pool;
5346
log_id = ilp_next_log_id;
5348
xt_unlock_mutex_ns(&ilp_lock);
5350
if (!ilp_open_log(&il, log_id, TRUE, thread))
5357
void XTIndexLogPool::ilp_release_log(XTIndexLogPtr il)
5359
xt_lock_mutex_ns(&ilp_lock);
5360
if (il_pool_count == 5)
5364
il->il_next_in_pool = ilp_log_pool;
5367
xt_unlock_mutex_ns(&ilp_lock);
5370
xtBool XTIndexLog::il_reset(struct XTOpenTable *ot)
5372
XTIndLogHeadDRec log_head;
5373
xtTableID tab_id = ot->ot_table->tab_id;
5378
il_buffer_offset = 0;
5380
/* We must write the header and flush here, or the "previous" status (from the
* last flush run) could remain. A failure to write the file completely would
* leave the old header in place, while other parts of the file had changed.
* This would lead to index corruption.
log_head.ilh_data_type = XT_DT_LOG_HEAD;
5386
XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
5387
XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);
5389
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5392
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5398
xtBool XTIndexLog::il_data_written()
5400
return il_buffer_offset != 0 || il_buffer_len != 0;
5403
void XTIndexLog::il_close(xtBool delete_it)
5405
xtLogID log_id = il_log_id;
5408
xt_close_file_ns(il_of);
5412
if (delete_it && log_id) {
5413
char log_path[PATH_MAX];
5415
il_pool->ilp_name(PATH_MAX, log_path, log_id);
5416
xt_fs_delete(NULL, log_path);
5420
xt_free_ns(il_buffer);
5424
xt_spinlock_free(NULL, &il_write_lock);
5428
void XTIndexLog::il_release()
5430
il_pool->ilp_db->db_indlogs.ilp_release_log(this);
5433
xtBool XTIndexLog::il_require_space(size_t bytes, XTThreadPtr thread)
5435
if (il_buffer_len + bytes > il_buffer_size) {
5436
if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &thread->st_statistics.st_ilog, thread))
5438
il_buffer_offset += il_buffer_len;
5445
xtBool XTIndexLog::il_write_byte(struct XTOpenTable *ot, xtWord1 byte)
5447
if (!il_require_space(1, ot->ot_thread))
5449
*(il_buffer + il_buffer_len) = byte;
5454
xtBool XTIndexLog::il_write_word4(struct XTOpenTable *ot, xtWord4 value)
5458
if (!il_require_space(4, ot->ot_thread))
5460
buffer = il_buffer + il_buffer_len;
5461
XT_SET_DISK_4(buffer, value);
5467
* This function assumes that the block is xlocked!
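*
* Depending on which part of the page changed, one of several log record
* types is written below (a summary of the cases, not an exhaustive spec):
* - XT_DT_SET_PAGE_HEAD:       only the 2-byte page header changed,
* - XT_DT_SHORT_IND_PAGE:      the page is written from the start up to a
*                              given size,
* - XT_DT_INDEX_PAGE:          the whole used block is written,
* - XT_DT_MOD_IND_PAGE(_HEAD): a modified region at a given offset is
*                              written, optionally with the page header,
* - XT_DT_2_MOD_IND_PAGE:      the first write block plus a second modified
*                              region (IND_WRITE_IN_BLOCK_SIZES only).
* The _EOB variants mark that the region runs to the end of the used block,
* so the apply code may round the size up to a write block boundary.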
xtBool XTIndexLog::il_write_block(struct XTOpenTable *ot, XTIndBlockPtr block)
5471
xtIndexNodeID node_id;
5472
XTIdxBranchDPtr node;
5475
node_id = block->cb_address;
5476
node = (XTIdxBranchDPtr) block->cb_data;
5477
block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));
5479
//il_min_byte_count += (block->cb_max_pos - block->cb_min_pos) + (block->cb_header ? XT_INDEX_PAGE_HEAD_SIZE : 0);
5480
#ifdef IND_WRITE_MIN_DATA
5483
xtBool eo_block = FALSE;
5485
#ifdef IND_WRITE_IN_BLOCK_SIZES
5486
/* Round up to block boundary: */
5487
max_pos = ((block->cb_max_pos + XT_INDEX_PAGE_HEAD_SIZE + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5488
if (max_pos > block_len)
5489
max_pos = block_len;
5490
max_pos -= XT_INDEX_PAGE_HEAD_SIZE;
5491
/* Round down to block boundary: */
5492
min_pos = ((block->cb_min_pos + XT_INDEX_PAGE_HEAD_SIZE) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5494
min_pos -= XT_INDEX_PAGE_HEAD_SIZE;
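/* Illustration of the rounding above (all numbers assumed, including a 2-byte
* page head and IND_WRITE_BLOCK_SIZE = 1024): with cb_min_pos = 3000 and
* cb_max_pos = 5000, max_pos rounds up to 5118 and min_pos rounds down to
* 2046, so the logged region covers the aligned byte range 2048..5120 of the
* page once the page head is added back.
*/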
max_pos = block->cb_max_pos;
5497
min_pos = block->cb_min_pos;
5500
if (block_len == max_pos + XT_INDEX_PAGE_HEAD_SIZE)
5503
ASSERT_NS(max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
5504
ASSERT_NS(min_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
5505
ASSERT_NS(max_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
5506
ASSERT_NS(min_pos <= max_pos);
5508
xt_spinlock_lock(&il_write_lock);
5509
if (block->cb_min_pos == block->cb_max_pos) {
5510
/* This means just the header was changed. */
5511
#ifdef IND_WRITE_IN_BLOCK_SIZES
5512
XTIndShortPageDataDPtr sh_data;
5514
if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE, ot->ot_thread))
5517
sh_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
5518
sh_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
5519
XT_SET_DISK_4(sh_data->ild_page_id_4, XT_NODE_ID(node_id));
5520
XT_SET_DISK_2(sh_data->ild_size_2, IND_WRITE_BLOCK_SIZE);
5521
memcpy(sh_data->ild_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
5522
il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE;
5524
XTIndSetPageHeadDataDPtr sph_data;
5526
if (!il_require_space(sizeof(XTIndSetPageHeadDataDRec), ot->ot_thread))
5529
ASSERT_NS(sizeof(XTIndSetPageHeadDataDRec) <= il_buffer_size);
5531
sph_data = (XTIndSetPageHeadDataDPtr) (il_buffer + il_buffer_len);
5532
sph_data->ild_data_type = XT_DT_SET_PAGE_HEAD;
5533
XT_SET_DISK_4(sph_data->ild_page_id_4, XT_NODE_ID(node_id));
5534
XT_COPY_DISK_2(sph_data->ild_page_head_2, block->cb_data);
5535
il_buffer_len += sizeof(XTIndSetPageHeadDataDRec);
5538
#ifdef IND_WRITE_IN_BLOCK_SIZES
5539
else if (min_pos == 0 || (block->cb_header && min_pos == IND_WRITE_BLOCK_SIZE - XT_INDEX_PAGE_HEAD_SIZE))
5541
else if (min_pos < 16 - XT_INDEX_PAGE_HEAD_SIZE)
5544
/* Fuse, and write the whole block: */
5546
XTIndPageDataDPtr p_data;
5548
if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
5551
ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + block_len <= il_buffer_size);
5553
p_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
5554
p_data->ild_data_type = XT_DT_INDEX_PAGE;
5555
XT_SET_DISK_4(p_data->ild_page_id_4, XT_NODE_ID(node_id));
5556
memcpy(p_data->ild_data, block->cb_data, block_len);
5557
il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;
5560
XTIndShortPageDataDPtr sp_data;
5562
block_len = max_pos + XT_INDEX_PAGE_HEAD_SIZE;
5564
if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + block_len, ot->ot_thread))
5567
ASSERT_NS(offsetof(XTIndShortPageDataDRec, ild_data) + block_len <= il_buffer_size);
5569
sp_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
5570
sp_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
5571
XT_SET_DISK_4(sp_data->ild_page_id_4, XT_NODE_ID(node_id));
5572
XT_SET_DISK_2(sp_data->ild_size_2, block_len);
5573
memcpy(sp_data->ild_data, block->cb_data, block_len);
5574
il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + block_len;
5578
block_len = max_pos - min_pos;
5580
if (block->cb_header) {
5581
#ifdef IND_WRITE_IN_BLOCK_SIZES
5582
XTIndDoubleModPageDataDPtr dd_data;
5584
ASSERT_NS(min_pos + XT_INDEX_PAGE_HEAD_SIZE >= 2*IND_WRITE_BLOCK_SIZE);
5585
ASSERT_NS((min_pos + XT_INDEX_PAGE_HEAD_SIZE) % IND_WRITE_BLOCK_SIZE == 0);
5586
if (!il_require_space(offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len, ot->ot_thread))
5589
dd_data = (XTIndDoubleModPageDataDPtr) (il_buffer + il_buffer_len);
5590
dd_data->dld_data_type = eo_block ? XT_DT_2_MOD_IND_PAGE_EOB : XT_DT_2_MOD_IND_PAGE;
5591
XT_SET_DISK_4(dd_data->dld_page_id_4, XT_NODE_ID(node_id));
5592
XT_SET_DISK_2(dd_data->dld_size1_2, IND_WRITE_BLOCK_SIZE);
5593
XT_SET_DISK_2(dd_data->dld_offset2_2, min_pos);
5594
XT_SET_DISK_2(dd_data->dld_size2_2, block_len);
5595
memcpy(dd_data->dld_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
5596
memcpy(dd_data->dld_data + IND_WRITE_BLOCK_SIZE, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5597
il_buffer_len += offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len;
5599
XTIndModPageHeadDataDPtr mph_data;
5601
if (!il_require_space(offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len, ot->ot_thread))
5604
mph_data = (XTIndModPageHeadDataDPtr) (il_buffer + il_buffer_len);
5605
mph_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_HEAD_EOB : XT_DT_MOD_IND_PAGE_HEAD;
5606
XT_SET_DISK_4(mph_data->ild_page_id_4, XT_NODE_ID(node_id));
5607
XT_SET_DISK_2(mph_data->ild_size_2, block_len);
5608
XT_SET_DISK_2(mph_data->ild_offset_2, min_pos);
5609
XT_COPY_DISK_2(mph_data->ild_page_head_2, block->cb_data);
5610
memcpy(mph_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5611
il_buffer_len += offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len;
5615
XTIndModPageDataDPtr mp_data;
5617
if (!il_require_space(offsetof(XTIndModPageDataDRec, ild_data) + block_len, ot->ot_thread))
5620
mp_data = (XTIndModPageDataDPtr) (il_buffer + il_buffer_len);
5621
mp_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_EOB : XT_DT_MOD_IND_PAGE;
5622
XT_SET_DISK_4(mp_data->ild_page_id_4, XT_NODE_ID(node_id));
5623
XT_SET_DISK_2(mp_data->ild_size_2, block_len);
5624
XT_SET_DISK_2(mp_data->ild_offset_2, min_pos);
5625
memcpy(mp_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5626
il_buffer_len += offsetof(XTIndModPageDataDRec, ild_data) + block_len;
5630
block->cb_header = FALSE;
5631
block->cb_min_pos = 0xFFFF;
5632
block->cb_max_pos = 0;
5634
#else // IND_OPT_DATA_WRITTEN
5635
XTIndPageDataDPtr page_data;
5637
if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
5640
ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + XT_INDEX_PAGE_SIZE <= il_buffer_size);
5642
page_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
5643
TRACK_BLOCK_TO_FLUSH(node_id);
5644
page_data->ild_data_type = XT_DT_INDEX_PAGE;
5645
XT_SET_DISK_4(page_data->ild_page_id_4, XT_NODE_ID(node_id));
5646
memcpy(page_data->ild_data, block->cb_data, block_len);
5648
il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;
5650
#endif // IND_OPT_DATA_WRITTEN
5651
xt_spinlock_unlock(&il_write_lock);
5653
ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FLUSHING);
5654
block->cb_state = IDX_CAC_BLOCK_LOGGED;
5656
TRACK_BLOCK_TO_FLUSH(node_id);
5660
xt_spinlock_unlock(&il_write_lock);
5664
xtBool XTIndexLog::il_write_header(struct XTOpenTable *ot, size_t head_size, xtWord1 *head_buf)
5666
XTIndHeadDataDPtr head_data;
5668
if (!il_require_space(offsetof(XTIndHeadDataDRec, ilh_data) + head_size, ot->ot_thread))
5671
head_data = (XTIndHeadDataDPtr) (il_buffer + il_buffer_len);
5672
head_data->ilh_data_type = XT_DT_HEADER;
5673
XT_SET_DISK_2(head_data->ilh_head_size_2, head_size);
5674
memcpy(head_data->ilh_data, head_buf, head_size);
5676
il_buffer_len += offsetof(XTIndHeadDataDRec, ilh_data) + head_size;
5681
xtBool XTIndexLog::il_flush(struct XTOpenTable *ot)
5683
XTIndLogHeadDRec log_head;
5684
xtTableID tab_id = ot->ot_table->tab_id;
5686
if (il_buffer_len) {
5687
if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5689
il_buffer_offset += il_buffer_len;
5693
if (il_log_eof != il_buffer_offset) {
5694
log_head.ilh_data_type = XT_DT_LOG_HEAD;
5695
XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
5696
XT_SET_DISK_4(log_head.ilh_log_eof_4, il_buffer_offset);
5698
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
5699
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5703
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5706
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
5707
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5712
il_log_eof = il_buffer_offset;
5717
#ifdef CHECK_IF_WRITE_WAS_OK
5718
static void check_buff(void *in_a, void *in_b, int len)
5720
xtWord1 *a = (xtWord1 *) in_a;
5721
xtWord1 *b = (xtWord1 *) in_b;
5724
while (offset < len) {
5726
printf("Mismatch at offset = %d %x != %x\n", offset, (int) *a, (int) *b);
5737
xtBool XTIndexLog::il_apply_log_write(struct XTOpenTable *ot)
5740
register XTTableHPtr tab = ot->ot_table;
5745
xtIndexNodeID node_id;
5746
size_t req_size = 0;
5747
XTIdxBranchDPtr node;
5750
xtWord1 *block_header;
5751
xtWord1 *block_data;
5752
#ifdef CHECK_IF_WRITE_WAS_OK
5753
XTIndReferenceRec c_iref;
5754
XTIdxBranchDPtr c_node;
5759
while (offset < il_log_eof) {
5760
if (offset < il_buffer_offset ||
5761
offset >= il_buffer_offset + (off_t) il_buffer_len) {
5762
il_buffer_len = il_buffer_size;
5763
if (il_log_eof - offset < (off_t) il_buffer_len)
5764
il_buffer_len = (size_t) (il_log_eof - offset);
5767
if (il_buffer_len < req_size) {
5768
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
5771
if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5773
il_buffer_offset = offset;
5775
pos = (size_t) (offset - il_buffer_offset);
5776
ASSERT_NS(pos < il_buffer_len);
5777
#ifdef CHECK_IF_WRITE_WAS_OK
5780
buffer = il_buffer + pos;
5782
case XT_DT_LOG_HEAD:
5783
req_size = sizeof(XTIndLogHeadDRec);
5784
if (il_buffer_len - pos < req_size) {
5791
case XT_DT_SHORT_IND_PAGE: {
5792
XTIndShortPageDataDPtr sp_data;
5794
req_size = offsetof(XTIndShortPageDataDRec, ild_data);
5795
if (il_buffer_len - pos < req_size) {
5800
sp_data = (XTIndShortPageDataDPtr) buffer;
5801
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sp_data->ild_page_id_4));
5802
block_len = XT_GET_DISK_2(sp_data->ild_size_2);
5803
block_data = sp_data->ild_data;
5806
case XT_DT_INDEX_PAGE:
5807
XTIndPageDataDPtr p_data;
5809
req_size = offsetof(XTIndPageDataDRec, ild_data);
5810
if (il_buffer_len - pos < req_size + 2) {
5815
p_data = (XTIndPageDataDPtr) buffer;
5816
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(p_data->ild_page_id_4));
5817
node = (XTIdxBranchDPtr) p_data->ild_data;
5818
block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));
5819
block_data = p_data->ild_data;
5822
if (block_len < 2 || block_len > XT_INDEX_PAGE_SIZE) {
5823
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
5827
req_size += block_len;
5828
#ifdef IND_FILL_BLOCK_TO_NEXT
5829
/* Make sure we have the start of the next block in the buffer:
5830
* Should always work because a XT_DT_INDEX_PAGE is never the last
5833
if (il_buffer_len - pos < req_size + offsetof(XTIndPageDataDRec, ild_data))
5835
if (il_buffer_len - pos < req_size)
5842
TRACK_BLOCK_FLUSH_N(node_id);
5843
address = xt_ind_node_to_offset(tab, node_id);
5844
#ifdef IND_WRITE_IN_BLOCK_SIZES
5845
/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
5846
block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5848
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
5849
#ifdef IND_FILL_BLOCK_TO_NEXT
5850
if (block_len < XT_INDEX_PAGE_SIZE) {
5851
XTIndPageDataDPtr next_page_data;
5853
next_page_data = (XTIndPageDataDPtr) (buffer + req_size);
5854
if (next_page_data->ild_data_type == XT_DT_INDEX_PAGE) {
5855
xtIndexNodeID next_node_id;
5857
next_node_id = XT_RET_NODE_ID(XT_GET_DISK_4(next_page_data->ild_page_id_4));
5858
/* Write the whole page, if that means leaving no gaps! */
5859
if (next_node_id == node_id+1)
5860
block_len = XT_INDEX_PAGE_SIZE;
5864
ASSERT_NS(block_len >= 2 && block_len <= XT_INDEX_PAGE_SIZE);
5865
if (!il_pwrite_file(ot, address, block_len, block_data))
5871
case XT_DT_SET_PAGE_HEAD: {
5872
XTIndSetPageHeadDataDPtr sph_data;
5874
req_size = sizeof(XTIndSetPageHeadDataDRec);
5875
if (il_buffer_len - pos < req_size) {
5880
sph_data = (XTIndSetPageHeadDataDPtr) buffer;
5881
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sph_data->ild_page_id_4));
5884
block_header = sph_data->ild_page_head_2;
5888
case XT_DT_MOD_IND_PAGE_HEAD:
5889
case XT_DT_MOD_IND_PAGE_HEAD_EOB: {
5890
XTIndModPageHeadDataDPtr mph_data;
5892
req_size = offsetof(XTIndModPageHeadDataDRec, ild_data);
5893
if (il_buffer_len - pos < req_size) {
5898
mph_data = (XTIndModPageHeadDataDPtr) buffer;
5899
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mph_data->ild_page_id_4));
5900
block_offset = XT_GET_DISK_2(mph_data->ild_offset_2);
5901
block_len = XT_GET_DISK_2(mph_data->ild_size_2);
5902
block_header = mph_data->ild_page_head_2;
5903
block_data = mph_data->ild_data;
5906
case XT_DT_MOD_IND_PAGE:
5907
case XT_DT_MOD_IND_PAGE_EOB:
5908
XTIndModPageDataDPtr mp_data;
5910
req_size = offsetof(XTIndModPageDataDRec, ild_data);
5911
if (il_buffer_len - pos < req_size) {
5916
mp_data = (XTIndModPageDataDPtr) buffer;
5917
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mp_data->ild_page_id_4));
5918
block_offset = XT_GET_DISK_2(mp_data->ild_offset_2);
5919
block_len = XT_GET_DISK_2(mp_data->ild_size_2);
5920
block_header = NULL;
5921
block_data = mp_data->ild_data;
5924
if (block_offset + block_len > XT_INDEX_PAGE_DATA_SIZE) {
5925
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
5929
req_size += block_len;
5930
if (il_buffer_len - pos < req_size) {
5935
TRACK_BLOCK_FLUSH_N(node_id);
5936
address = xt_ind_node_to_offset(tab, node_id);
5937
/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
5938
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
5940
if (!il_pwrite_file(ot, address, 2, block_header))
5945
#ifdef IND_WRITE_IN_BLOCK_SIZES
5946
if (*buffer == XT_DT_MOD_IND_PAGE_HEAD_EOB || *buffer == XT_DT_MOD_IND_PAGE_EOB)
5947
block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5949
if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len, block_data))
5956
case XT_DT_FREE_LIST:
5957
xtWord4 block, nblock;
5959
xtWord1 buffer[IND_WRITE_BLOCK_SIZE];
5960
XTIndFreeBlockRec free_block;
5964
memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));
5971
if (il_buffer_len - pos < req_size) {
5972
il_buffer_len = il_buffer_size;
5973
if (il_log_eof - offset < (off_t) il_buffer_len)
5974
il_buffer_len = (size_t) (il_log_eof - offset);
5976
if (il_buffer_len < req_size) {
5977
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
5980
if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5984
block = XT_GET_DISK_4(il_buffer + pos);
5985
nblock = XT_GET_DISK_4(il_buffer + pos + 4);
5986
if (nblock == 0xFFFFFFFF)
5988
aoff = xt_ind_node_to_offset(tab, XT_RET_NODE_ID(block));
5989
XT_SET_DISK_8(x.free_block.if_next_block_8, nblock);
5990
IDX_TRACE("%d- *%x\n", (int) block, (int) XT_GET_DISK_2(x.buffer));
5991
if (!il_pwrite_file(ot, aoff, IND_WRITE_BLOCK_SIZE, x.buffer))
6001
XTIndHeadDataDPtr head_data;
6004
req_size = offsetof(XTIndHeadDataDRec, ilh_data);
6005
if (il_buffer_len - pos < req_size) {
6009
head_data = (XTIndHeadDataDPtr) buffer;
6010
len = XT_GET_DISK_2(head_data->ilh_head_size_2);
6012
req_size = offsetof(XTIndHeadDataDRec, ilh_data) + len;
6013
if (il_buffer_len - pos < req_size) {
6018
if (!il_pwrite_file(ot, 0, len, head_data->ilh_data))
6024
case XT_DT_2_MOD_IND_PAGE:
case XT_DT_2_MOD_IND_PAGE_EOB:
XTIndDoubleModPageDataDPtr dd_data;
req_size = offsetof(XTIndDoubleModPageDataDRec, dld_data);
if (il_buffer_len - pos < req_size) {
dd_data = (XTIndDoubleModPageDataDPtr) buffer;
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(dd_data->dld_page_id_4));
block_len = XT_GET_DISK_2(dd_data->dld_size1_2);
block_offset = XT_GET_DISK_2(dd_data->dld_offset2_2);
block_len2 = XT_GET_DISK_2(dd_data->dld_size2_2);
block_data = dd_data->dld_data;
req_size += block_len + block_len2;
if (il_buffer_len - pos < req_size)
TRACK_BLOCK_FLUSH_N(node_id);
address = xt_ind_node_to_offset(tab, node_id);
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
if (!il_pwrite_file(ot, address, block_len, block_data))
#ifdef IND_WRITE_IN_BLOCK_SIZES
if (*buffer == XT_DT_2_MOD_IND_PAGE_EOB)
block_len2 = ((block_len2 + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len2, block_data + block_len))
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
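/*
 * Debug-only verification: re-read the page that was just written and
 * compare it with the copy held in the index cache, so that a mismatching
 * write is caught immediately instead of on a later read.
 */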
#ifdef CHECK_IF_WRITE_WAS_OK
if (!xt_ind_get(ot, node_id, &c_iref))
if (c_iref.ir_block) {
c_node = (XTIdxBranchDPtr) c_iref.ir_block->cb_data;
c_block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(c_node->tb_size_2));
if (!xt_pread_file(ot->ot_ind_file, address, XT_INDEX_PAGE_SIZE, 0, &ot->ot_ind_tmp_buf, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
if (c_iref.ir_block->cb_min_pos == 0xFFFF)
check_buff(&ot->ot_ind_tmp_buf, c_node, c_block_len);
if (!c_iref.ir_block->cb_header)
check_buff(&ot->ot_ind_tmp_buf, c_node, 2);
check_buff(ot->ot_ind_tmp_buf.tb_data, c_node->tb_data, c_iref.ir_block->cb_min_pos);
check_buff(ot->ot_ind_tmp_buf.tb_data + c_iref.ir_block->cb_max_pos,
c_node->tb_data + c_iref.ir_block->cb_max_pos,
c_block_len - XT_INDEX_PAGE_HEAD_SIZE - c_iref.ir_block->cb_max_pos);
xt_ind_release(ot, NULL, XT_UNLOCK_WRITE, &c_iref);
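/*
 * Flush the index file whenever more than IND_FLUSH_THRESHOLD bytes have
 * been written since the last flush, which keeps the amount of unflushed
 * index data bounded while the log is being applied.
 */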
if (il_bytes_written >= IND_FLUSH_THRESHOLD) {
if (!il_flush_file(ot))
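/*
 * il_apply_log_flush() completes a log application: it flushes the index
 * file, resets the log header on disk (ilh_log_eof_4 = 0) and, for
 * non-temporary tables, flushes the log file as well, presumably so that
 * the records just applied are not replayed again.
 */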
xtBool XTIndexLog::il_apply_log_flush(struct XTOpenTable *ot)
register XTTableHPtr tab = ot->ot_table;
XTIndLogHeadDRec log_head;
#ifdef PRINT_IND_FLUSH_STATS
xtWord8 b_flush_time = ot->ot_thread->st_statistics.st_ind.ts_flush_time;
if (!il_flush_file(ot))
#ifdef PRINT_IND_FLUSH_STATS
ot->ot_table->tab_ind_flush_time += ot->ot_thread->st_statistics.st_ind.ts_flush_time - b_flush_time;
ot->ot_table->tab_ind_flush++;
time = (double) ot->ot_table->tab_ind_flush_time / (double) 1000000 / (double) ot->ot_table->tab_ind_flush;
kb = (double) ot->ot_table->tab_ind_write / (double) ot->ot_table->tab_ind_flush / (double) 1024;
printf("TIME: %s Kbytes: %s Mps: %s Flush Count: %d\n",
idx_format(buf1, time),
idx_format(buf2, kb),
idx_format_mb(buf3, kb / time),
(int) ot->ot_table->tab_ind_flush
log_head.ilh_data_type = XT_DT_LOG_HEAD;
XT_SET_DISK_4(log_head.ilh_tab_id_4, il_tab_id);
XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
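/*
 * il_pwrite_file() is a thin wrapper around xt_pwrite_file() on the
 * table's index file. It accounts the bytes written in il_bytes_written
 * and, with IND_WRITE_IN_BLOCK_SIZES, asserts that offset and size are
 * aligned to IND_WRITE_BLOCK_SIZE. With PRINT_IND_FLUSH_STATS it also
 * accumulates per-table write statistics.
 */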
inline xtBool XTIndexLog::il_pwrite_file(struct XTOpenTable *ot, off_t offs, size_t siz, void *dat)
#ifdef IND_WRITE_IN_BLOCK_SIZES
ASSERT_NS(((offs) % IND_WRITE_BLOCK_SIZE) == 0);
ASSERT_NS(((siz) % IND_WRITE_BLOCK_SIZE) == 0);
il_bytes_written += siz;
#ifdef PRINT_IND_FLUSH_STATS
u_int b_write = ot->ot_thread->st_statistics.st_ind.ts_write;
ok = xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
ot->ot_table->tab_ind_write += ot->ot_thread->st_statistics.st_ind.ts_write - b_write;
return xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
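/*
 * il_flush_file() resets the il_bytes_written counter and syncs the index
 * file to disk; temporary tables are not flushed here.
 */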
inline xtBool XTIndexLog::il_flush_file(struct XTOpenTable *ot)
il_bytes_written = 0;
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
ok = xt_flush_file(ot->ot_ind_file, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
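/*
 * il_open_table() and il_close_table() check a table handle for the log's
 * table (il_tab_id) out of, and back into, the database's open table pool.
 */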
xtBool XTIndexLog::il_open_table(struct XTOpenTable **ot)
{
	return xt_db_open_pool_table_ns(ot, il_pool->ilp_db, il_tab_id);
}

void XTIndexLog::il_close_table(struct XTOpenTable *ot)
{
	xt_db_return_table_to_pool_ns(ot);
}